Getting Started with Kafka and Spring Boot Integration

Preface

I had been using RabbitMQ as our message queue, but a recent project required switching to Kafka.

So I spent some time learning it, and this post records the process.

Kafka and RabbitMQ follow rather different design philosophies: Kafka puts more emphasis on high throughput and durable, persistent storage.

Kafka Overview

Kafka is a distributed messaging system, mainly used for:

  • Log collection
  • Stream processing
  • Event-driven architectures

Core concepts:

  • Producer: publishes messages
  • Consumer: reads messages
  • Broker: a Kafka server node
  • Topic: a named message stream
  • Partition: a topic is split into one or more partitions; the record key determines which partition a message goes to (see the sketch after this list)
  • Offset: a message's position within a partition
  • Consumer Group: a group of consumers in which each consumer is assigned a distinct subset of the partitions
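
To make the relationship between keys and partitions concrete, here is a rough sketch of what the default partitioner does with a record key. This is a simplified stand-in: the real Kafka client hashes the serialized key with murmur2, but the idea is the same.

// Simplified illustration of key-based partitioning (not the actual client code):
// records with the same key always land in the same partition, which preserves
// their relative order within that partition.
public class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // The real producer uses murmur2 over the serialized key;
        // String.hashCode() is used here only to keep the sketch short.
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        System.out.println("user-1001 -> partition " + partitionFor("user-1001", partitions));
        System.out.println("user-1001 -> partition " + partitionFor("user-1001", partitions)); // same as above
        System.out.println("user-2002 -> partition " + partitionFor("user-2002", partitions)); // may differ
    }
}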

Installing Kafka

Docker Installation

# Start ZooKeeper (Kafka depends on it)
docker run -d \
  --name zookeeper \
  -p 2181:2181 \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  confluentinc/cp-zookeeper:latest

# Start Kafka (--link lets the broker resolve the "zookeeper" hostname;
# on a user-defined docker network you can drop it)
docker run -d \
  --name kafka \
  --link zookeeper \
  -p 9092:9092 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:latest

Local Installation

# Download Kafka
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz

# Extract
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1

# Start ZooKeeper (keep it running in its own terminal)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka (in a second terminal)
bin/kafka-server-start.sh config/server.properties

Spring Boot Integration

Add the Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuration

spring:
  kafka:
    bootstrap-servers: localhost:9092

    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all                # acknowledgment level
      retries: 3               # number of retries
      batch-size: 16384        # batch size in bytes
      buffer-memory: 33554432  # send buffer size in bytes

    # Consumer settings
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: my-consumer-group   # consumer group
      auto-offset-reset: earliest   # offset reset policy
      enable-auto-commit: false     # disable auto commit
      max-poll-records: 100         # max records returned per poll

    # Listener settings
    listener:
      ack-mode: manual_immediate    # manual acknowledgment mode
      concurrency: 3                # number of listener threads

Producer

Sending Messages

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Send a simple message
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    // Send a keyed message (the key determines the partition)
    public void sendMessageWithKey(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }

    // Asynchronous send with a callback (ListenableFuture API, spring-kafka 2.x)
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(
            success -> {
                System.out.println("Message sent, offset=" + success.getRecordMetadata().offset());
            },
            failure -> {
                System.err.println("Message failed to send: " + failure.getMessage());
            }
        );
    }
}
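
Note that the addCallback style only compiles against spring-kafka 2.x, where send() returns a ListenableFuture. On Spring Boot 3 / spring-kafka 3.x, send() returns a CompletableFuture instead, so the async method above would be rewritten roughly like this (a drop-in replacement inside the same class):

    // spring-kafka 3.x: send() returns CompletableFuture<SendResult<K, V>>
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Message sent, offset=" + result.getRecordMetadata().offset());
            } else {
                System.err.println("Message failed to send: " + ex.getMessage());
            }
        });
    }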

Usage Example

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @PostMapping("/send")
    public String sendMessage(@RequestParam String message) {
        kafkaProducer.sendMessage("my-topic", message);
        return "Message sent";
    }
}

Consumer

Listening for Messages

@Component
public class KafkaConsumer {

    // Simple listener
    @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }

    // Manual acknowledgment
    @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
    public void consumeManualAck(ConsumerRecord<String, String> record,
                                 Acknowledgment acknowledgment) {
        try {
            String message = record.value();
            // Process the message
            System.out.println("Received message: " + message);

            // Acknowledge manually
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Processing failed: do not acknowledge; the offset stays uncommitted,
            // so the record will be redelivered (e.g. after a rebalance or restart)
            e.printStackTrace();
        }
    }

    // Batch consumption (the batch attribute requires spring-kafka 2.8+)
    @KafkaListener(topics = "my-topic", groupId = "my-consumer-group",
                   batch = "true")
    public void consumeBatch(List<String> messages) {
        System.out.println("Received " + messages.size() + " messages");
        messages.forEach(System.out::println);
    }
}
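
If your spring-kafka version predates the batch attribute on @KafkaListener (it was added in 2.8), batch listening can instead be enabled on a listener container factory. A minimal sketch, assuming String records; the bean names here are illustrative:

@Configuration
public class BatchListenerConfig {

    @Bean
    public ConsumerFactory<String, String> batchConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Containers built by this factory deliver records in batches, so
    // @KafkaListener methods using it can declare a List<String> parameter.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(batchConsumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
}

A listener then selects it with containerFactory = "batchFactory" on the @KafkaListener annotation.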

Reading Message Metadata

@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void consumeWithMetadata(ConsumerRecord<String, String> record) {
    String topic = record.topic();
    int partition = record.partition();
    long offset = record.offset();
    String key = record.key();
    String value = record.value();

    System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
            topic, partition, offset, key, value);
}

Partitions and Consumer Groups

Partition Count and Consumer Count

  • The partition count is the upper bound on how many consumers in one consumer group can consume in parallel
  • If there are more consumers than partitions, the extra consumers sit idle
  • If there are more partitions than consumers, a single consumer is assigned multiple partitions

Specifying the Partition Count When Creating a Topic

kafka-topics.sh --create \
  --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1
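
Alternatively, the topic can be declared as a bean and created on application startup by the KafkaAdmin that Spring Boot auto-configures. A minimal sketch using spring-kafka's TopicBuilder (class and bean names are illustrative):

@Configuration
public class TopicConfig {

    // KafkaAdmin creates this topic on startup if it does not already exist
    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name("my-topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}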

On the consumer side, the listener concurrency can then be matched to the partition count in the configuration file:

spring:
  kafka:
    listener:
      concurrency: 3   # one listener thread per partition

Message Serialization

Sending JSON Objects

Define the message class:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage {
    private Long id;
    private String username;
    private String email;
}

Configure the serializers:

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, UserMessage> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, UserMessage> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, UserMessage> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserMessage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Usage:

@Service
public class UserProducer {

    @Autowired
    private KafkaTemplate<String, UserMessage> kafkaTemplate;

    public void sendUser(UserMessage user) {
        kafkaTemplate.send("user-topic", user);
    }
}

@Component
public class UserConsumer {

    @KafkaListener(topics = "user-topic", groupId = "user-group",
                   containerFactory = "kafkaListenerContainerFactory")
    public void consumeUser(UserMessage user) {
        System.out.println("Received user message: " + user);
    }
}

Message Retries and Dead-Letter Topics

Configuring Retries

Spring Boot 3.0 and later expose the retry-topic support through configuration properties:

spring:
  kafka:
    retry:
      topic:
        enabled: true
        attempts: 3
        delay: 1s    # interval between retries
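
On older Spring Boot versions that lack these properties, retries can be wired programmatically instead. A minimal sketch, assuming Spring Boot 2.6+ where a CommonErrorHandler bean is picked up by the auto-configured listener container factory:

@Configuration
public class RetryConfig {

    // Each failed record is retried twice more, 1 second apart, before giving up.
    // To route exhausted records to a dead-letter topic, pass a
    // DeadLetterPublishingRecoverer as the first constructor argument.
    @Bean
    public CommonErrorHandler errorHandler() {
        // FixedBackOff(intervalMs, maxRetries)
        return new DefaultErrorHandler(new FixedBackOff(1000L, 2));
    }
}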

Dead-Letter Topic

@Service
public class KafkaDeadLetterService {

    // @RetryableTopic retries the message on dedicated retry topics; once the
    // attempts are exhausted, spring-kafka publishes the record to the
    // dead-letter topic "my-topic-dlt" automatically, no manual send needed.
    @RetryableTopic(attempts = "3")
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        // Throwing an exception here triggers the next retry attempt
        processMessage(message);
    }

    // Invoked for records that ended up on the dead-letter topic
    @DltHandler
    public void handleDeadLetter(String message) {
        System.err.println("Dead-letter message: " + message);
    }

    private void processMessage(String message) {
        // Business logic
    }
}

Monitoring

Kafka's Built-in CLI Tools

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe a topic
kafka-topics.sh --describe \
  --topic my-topic \
  --bootstrap-server localhost:9092

# List consumer groups
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# Describe a consumer group (shows partition assignments and lag)
kafka-consumer-groups.sh --describe \
  --group my-consumer-group \
  --bootstrap-server localhost:9092

# Read messages from a topic
kafka-console-consumer.sh \
  --topic my-topic \
  --from-beginning \
  --bootstrap-server localhost:9092

Spring Boot Actuator Integration

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics
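
With the actuator on the classpath, the Kafka client metrics (kafka.*) are published through Micrometer and show up under /actuator/metrics. Actuator does not, as far as I know, ship a Kafka health indicator out of the box, so /actuator/health will not reflect broker availability by default. If that matters, a small custom indicator can be added; a sketch, assuming the auto-configured KafkaAdmin bean (class name is illustrative):

@Component
public class KafkaHealthIndicator implements HealthIndicator {

    private final KafkaAdmin kafkaAdmin;

    public KafkaHealthIndicator(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
    }

    @Override
    public Health health() {
        // Reuse the broker addresses from the auto-configured KafkaAdmin
        try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            int nodeCount = client.describeCluster().nodes().get(3, TimeUnit.SECONDS).size();
            return Health.up().withDetail("nodes", nodeCount).build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}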

Summary

Overall, Kafka has been pleasant to work with; its main strengths are high throughput and durable, persistent storage.

Good fits:

  • Log collection
  • Stream processing
  • Event-driven architectures
  • Big-data pipelines

Less suitable scenarios:

  • Extremely strict per-message reliability requirements
  • Complex message routing rules
  • Delayed messages out of the box

That's all for now.