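Preface
I had been using RabbitMQ as my message queue, but a recent project is switching to Kafka, so I spent some time learning Kafka and am recording the process here.
Kafka and RabbitMQ follow rather different design philosophies: Kafka puts more emphasis on high throughput and persistence.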
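Introduction to Kafka
Kafka is a distributed messaging system, mainly used for:
Core concepts:
- Producer: publishes messages
- Consumer: reads messages
- Broker: a Kafka server node
- Topic: a named stream of messages
- Partition: a topic can be split into multiple partitions
- Offset: the position of a message within its partition
- Consumer Group: a set of consumers that share the work; each partition is read by only one consumer in the group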
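To make partitions, offsets and consumer groups more concrete, here is a minimal in-memory sketch. It is not how Kafka is implemented; `ToyTopic` and its methods are names I made up for illustration. It only shows that each partition is an append-only log, that an offset is a position in that log, and that every consumer group keeps its own read position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of one topic: a fixed set of append-only partitions (hypothetical). */
final class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();
    // Per consumer group: the next offset to read in each partition.
    private final Map<String, int[]> committed = new HashMap<>();

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Producer side: append to a partition and return the offset of the new message. */
    long append(int partition, String message) {
        List<String> log = partitions.get(partition);
        log.add(message);
        return log.size() - 1;
    }

    /** Consumer side: return everything this group has not read yet, then advance its position. */
    List<String> poll(String group, int partition) {
        int[] offsets = committed.computeIfAbsent(group, g -> new int[partitions.size()]);
        List<String> log = partitions.get(partition);
        List<String> batch = new ArrayList<>(log.subList(offsets[partition], log.size()));
        offsets[partition] = log.size();
        return batch;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(2);
        topic.append(0, "a");
        topic.append(0, "b");
        topic.append(1, "c");
        System.out.println(topic.poll("group-1", 0)); // first read by group-1
        System.out.println(topic.poll("group-1", 0)); // nothing new for group-1
        System.out.println(topic.poll("group-2", 0)); // group-2 has its own position
    }
}
```

In this model the second poll by `group-1` returns an empty list, while `group-2` still sees both messages, because the messages stay in the log and only the per-group position moves.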
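Installing Kafka
Installing with Docker

    # Put both containers on one user-defined network so the broker
    # can resolve the hostname "zookeeper".
    docker network create kafka-net

    # Image tags are pinned because newer cp-kafka releases (based on Kafka 4.x)
    # no longer support ZooKeeper mode; 7.5.0 is only an example tag.
    docker run -d \
      --name zookeeper \
      --network kafka-net \
      -p 2181:2181 \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      confluentinc/cp-zookeeper:7.5.0

    docker run -d \
      --name kafka \
      --network kafka-net \
      -p 9092:9092 \
      -e KAFKA_BROKER_ID=1 \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      confluentinc/cp-kafka:7.5.0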
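Installing locally

    # Download and unpack Kafka 3.5.1 (built for Scala 2.13).
    # Older releases are served from the Apache archive.
    wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz
    tar -xzf kafka_2.13-3.5.1.tgz
    cd kafka_2.13-3.5.1

    # Start ZooKeeper first. It runs in the foreground, so use a separate
    # terminal for the broker, or pass -daemon to run it in the background.
    bin/zookeeper-server-start.sh config/zookeeper.properties

    # Then start the Kafka broker.
    bin/kafka-server-start.sh config/server.properties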
Spring Boot integration
Adding the dependency

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
Configuration

    spring:
      kafka:
        bootstrap-servers: localhost:9092

        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          acks: all                 # wait for all in-sync replicas to confirm
          retries: 3
          batch-size: 16384         # bytes
          buffer-memory: 33554432   # bytes (32 MB)

        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: my-consumer-group
          auto-offset-reset: earliest
          enable-auto-commit: false # offsets are committed through the listener ack mode
          max-poll-records: 100

        listener:
          ack-mode: manual_immediate
          concurrency: 3
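Producer
Sending messages

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaProducer {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        // Send without a key; the producer chooses the partition.
        public void sendMessage(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }

        // Send with a key; messages with the same key land in the same partition.
        public void sendMessageWithKey(String topic, String key, String message) {
            kafkaTemplate.send(topic, key, message);
        }

        // Send and log the outcome once the broker responds.
        // In spring-kafka 3.x, send() returns a CompletableFuture;
        // on 2.x it returns a ListenableFuture, so use addCallback(...) there.
        public void sendMessageAsync(String topic, String message) {
            kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Message sent, offset: " + result.getRecordMetadata().offset());
                } else {
                    System.err.println("Failed to send message: " + ex.getMessage());
                }
            });
        }
    }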
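The reason `sendMessageWithKey` keeps messages with the same key in order is that the key decides the partition. Kafka's default partitioner hashes the key bytes (with murmur2) and takes the result modulo the partition count. The sketch below shows the same idea with `String.hashCode()` as a stand-in, so the partition numbers it prints will not match what a real broker would pick; `KeyPartitioningSketch` and `partitionFor` are names I made up.

```java
/** Simplified key-to-partition mapping; hashCode() stands in for Kafka's murmur2 hash. */
final class KeyPartitioningSketch {

    /** Returns a partition in the range [0, numPartitions) that depends only on the key. */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        String[] keys = {"user-1", "user-2", "user-1", "user-3", "user-1"};
        for (String key : keys) {
            System.out.printf("key=%s -> partition %d%n", key, partitionFor(key, numPartitions));
        }
    }
}
```

Every occurrence of `user-1` maps to the same partition, which is what gives per-key ordering. It also means that changing the partition count later changes where existing keys go.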
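Usage example

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @RequestMapping("/kafka")
    public class KafkaController {

        @Autowired
        private KafkaProducer kafkaProducer;

        // POST /kafka/send?message=... publishes the text to "my-topic".
        @PostMapping("/send")
        public String sendMessage(@RequestParam String message) {
            kafkaProducer.sendMessage("my-topic", message);
            return "Message sent";
        }
    }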
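Consumer
Listening for messages

    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    // The three listeners below are alternatives. They share one topic and one
    // group, so in a real application keep only the variant you need.
    @Component
    public class KafkaConsumer {

        // Simplest form: receive the payload only.
        @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
        public void consume(String message) {
            System.out.println("Received message: " + message);
        }

        // Manual acknowledgment (requires a manual ack-mode in the listener configuration).
        @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
        public void consumeManualAck(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            try {
                String message = record.value();
                System.out.println("Received message: " + message);

                // Commit the offset only after processing succeeded.
                acknowledgment.acknowledge();
            } catch (Exception e) {
                // Not acknowledged here. Note that acknowledging a later record in
                // the same partition also commits past this one.
                e.printStackTrace();
            }
        }

        // Batch form: receive all records of one poll as a list.
        @KafkaListener(topics = "my-topic", groupId = "my-consumer-group", batch = "true")
        public void consumeBatch(List<String> messages) {
            System.out.println("Received " + messages.size() + " messages");
            messages.forEach(System.out::println);
        }
    }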
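What manual acknowledgment buys you is at-least-once delivery: the committed offset only moves after processing has succeeded, so a consumer that stops before acknowledging resumes from the last committed offset and sees the message again. Below is a simplified model of that idea for a single partition. It is not spring-kafka's actual container logic, and `ManualAckSketch` and `pollOnce` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy single-partition consumer that commits an offset only after successful processing. */
final class ManualAckSketch {
    private final List<String> log;          // the partition's messages
    private int committedOffset = 0;         // next offset to deliver
    final List<String> processed = new ArrayList<>();

    ManualAckSketch(List<String> log) {
        this.log = log;
    }

    /**
     * Delivers messages starting at the committed offset. The handler returns true on success.
     * Stops at the first failure without committing, so that message is delivered again next time.
     */
    boolean pollOnce(Predicate<String> handler) {
        for (int offset = committedOffset; offset < log.size(); offset++) {
            String message = log.get(offset);
            if (!handler.test(message)) {
                return false;                 // no ack: the offset stays where it was
            }
            processed.add(message);
            committedOffset = offset + 1;     // plays the role of acknowledge()
        }
        return true;
    }

    int committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        ManualAckSketch consumer = new ManualAckSketch(List.of("m0", "m1", "m2"));
        boolean[] failNext = {true};
        // Fails on "m1" the first time it is seen, succeeds afterwards.
        Predicate<String> handler = message -> {
            if (message.equals("m1") && failNext[0]) {
                failNext[0] = false;
                return false;
            }
            return true;
        };
        System.out.println(consumer.pollOnce(handler) + " committed=" + consumer.committedOffset());
        System.out.println(consumer.pollOnce(handler) + " committed=" + consumer.committedOffset());
        System.out.println(consumer.processed);
    }
}
```

The first poll stops at `m1` with the committed offset at 1; the second poll redelivers `m1` and finishes at offset 3. The flip side of at-least-once is that processing has to tolerate seeing the same message twice.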
Reading message metadata

    // Accept a ConsumerRecord to access the topic, partition, offset and key
    // in addition to the payload.
    @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
    public void consumeWithMetadata(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        String key = record.key();
        String value = record.value();

        System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                topic, partition, offset, key, value);
    }
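Partitions and consumer groups
Partition count vs. consumer count
- The partition count caps how many consumers in one group can consume at the same time
- If there are more consumers than partitions, the extra consumers sit idle
- If there are more partitions than consumers, some consumers read several partitions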
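The three rules above can be sketched with a simple round-robin assignment. Kafka's real assignors (range, round-robin, sticky and so on) are more involved, so treat this as an illustration of the counting argument only; `GroupAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified round-robin assignment of partitions to the consumers of one group. */
final class GroupAssignmentSketch {

    /** Spreads partitions 0..partitionCount-1 over the consumers, one partition at a time. */
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // More partitions than consumers: c1 reads two partitions.
        System.out.println(assign(3, List.of("c1", "c2")));             // {c1=[0, 2], c2=[1]}
        // More consumers than partitions: c4 gets nothing and stays idle.
        System.out.println(assign(3, List.of("c1", "c2", "c3", "c4"))); // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```

Each partition has exactly one owner inside the group, which is why adding consumers beyond the partition count does not add throughput.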
Specifying the partition count when creating a topic

    kafka-topics.sh --create \
      --topic my-topic \
      --bootstrap-server localhost:9092 \
      --partitions 3 \
      --replication-factor 1
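On the consumer side, set the listener concurrency in the configuration file to match. Note that this controls the number of consumer threads, not the number of partitions:

    spring:
      kafka:
        listener:
          concurrency: 3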
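Message serialization
Sending JSON objects
Define the message class:

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    // Lombok generates getters, setters and constructors.
    // The no-args constructor is needed for JSON deserialization.
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class UserMessage {
        private Long id;
        private String username;
        private String email;
    }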
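Configure the serializers:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    @Configuration
    public class KafkaConfig {

        // Producer side: String keys, JSON values.
        @Bean
        public ProducerFactory<String, UserMessage> producerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

            return new DefaultKafkaProducerFactory<>(config);
        }

        @Bean
        public KafkaTemplate<String, UserMessage> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        // Consumer side: String keys, JSON values.
        @Bean
        public ConsumerFactory<String, UserMessage> consumerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            // "*" trusts every package, which is convenient for a demo;
            // restrict it to your own message package in production.
            config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

            return new DefaultKafkaConsumerFactory<>(config);
        }

        // Container factory referenced by @KafkaListener(containerFactory = ...).
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, UserMessage> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, UserMessage> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }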
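Usage:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.stereotype.Service;

    @Service
    public class UserProducer {

        @Autowired
        private KafkaTemplate<String, UserMessage> kafkaTemplate;

        // The JsonSerializer turns the object into JSON before sending.
        public void sendUser(UserMessage user) {
            kafkaTemplate.send("user-topic", user);
        }
    }

    @Component
    public class UserConsumer {

        // The payload arrives already deserialized into a UserMessage.
        @KafkaListener(topics = "user-topic", groupId = "user-group",
                containerFactory = "kafkaListenerContainerFactory")
        public void consumeUser(UserMessage user) {
            System.out.println("Received user message: " + user);
        }
    }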
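Message retries and dead-letter queues
Configuring retries

    # Non-blocking retries through retry topics (spring.kafka.retry.topic.*),
    # assuming a Spring Boot version that provides these properties.
    spring:
      kafka:
        retry:
          topic:
            enabled: true
            attempts: 3
            delay: 1s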
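Dead-letter queue

    import org.springframework.kafka.annotation.DltHandler;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.RetryableTopic;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaDeadLetterService {

        // A thrown exception triggers a retry. After 3 attempts in total, the
        // framework publishes the record to the dead-letter topic, which by
        // default is the original topic name plus "-dlt" (here: my-topic-dlt).
        // Sending to the DLT by hand inside a catch block would duplicate the
        // record on every failed attempt, so the listener just lets it fail.
        @RetryableTopic(attempts = "3")
        @KafkaListener(topics = "my-topic", groupId = "my-group")
        public void consume(String message) {
            processMessage(message);
        }

        // Called for records that ended up in the dead-letter topic.
        @DltHandler
        public void handleDeadLetter(String message) {
            System.err.println("Message moved to dead-letter topic: " + message);
        }

        private void processMessage(String message) {
            // Business logic goes here.
        }
    }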
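Stripped of the framework, the retry-then-dead-letter flow is a small loop: try up to a fixed number of times with a pause in between, and park the message somewhere else once the attempts run out. The sketch below is a blocking, in-memory version of that idea (spring-kafka's retry topics are non-blocking and work differently internally); `RetryThenDeadLetterSketch` and its members are names I made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy retry loop with a fixed back-off and an in-memory dead-letter list. */
final class RetryThenDeadLetterSketch {
    final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;
    private final long backOffMillis;

    RetryThenDeadLetterSketch(int maxAttempts, long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.backOffMillis = backOffMillis;
    }

    /** Processes the message with retries and returns the number of attempts used. */
    int handle(String message, Consumer<String> processor) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processor.accept(message);
                return attempt;                    // success, stop retrying
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    deadLetters.add(message);      // retries exhausted
                } else {
                    pause(backOffMillis);          // wait before the next attempt
                }
            }
        }
        return maxAttempts;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // preserve the interrupt flag
        }
    }

    public static void main(String[] args) {
        RetryThenDeadLetterSketch sketch = new RetryThenDeadLetterSketch(3, 10);
        int attempts = sketch.handle("bad-message", m -> {
            throw new IllegalStateException("processing failed");
        });
        System.out.println("attempts=" + attempts + ", deadLetters=" + sketch.deadLetters);
    }
}
```

The message reaches the dead-letter list exactly once, after the last failed attempt, which is the behavior to aim for with a real dead-letter topic as well.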
Monitoring
Kafka's built-in command-line tools

    # List all topics
    kafka-topics.sh --list --bootstrap-server localhost:9092

    # Show partitions and replicas of a topic
    kafka-topics.sh --describe \
      --topic my-topic \
      --bootstrap-server localhost:9092

    # List all consumer groups
    kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

    # Show offsets and lag of a consumer group
    kafka-consumer-groups.sh --describe \
      --group my-consumer-group \
      --bootstrap-server localhost:9092

    # Read a topic from the beginning in the console
    kafka-console-consumer.sh \
      --topic my-topic \
      --from-beginning \
      --bootstrap-server localhost:9092
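The number to watch in the `kafka-consumer-groups.sh --describe` output is LAG, which per partition is LOG-END-OFFSET minus CURRENT-OFFSET. A minimal sketch of that arithmetic follows; `ConsumerLagSketch` is a hypothetical helper, and treating a partition without a committed offset as offset 0 is my own simplification.

```java
import java.util.Map;

/** Computes consumer lag from log-end offsets and committed offsets. */
final class ConsumerLagSketch {

    /** Lag of one partition: how many messages are written but not yet consumed. */
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    /** Total lag over all partitions; a missing committed offset counts as 0 (assumption). */
    static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            total += lag(entry.getValue(), committed);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> logEnd = Map.of(0, 120L, 1, 80L, 2, 50L);
        Map<Integer, Long> committed = Map.of(0, 100L, 1, 80L, 2, 45L);
        System.out.println("total lag = " + totalLag(logEnd, committed)); // 20 + 0 + 5 = 25
    }
}
```

A lag that keeps growing means the group consumes more slowly than producers write, which is usually the cue to add partitions and consumers or to speed up processing.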
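Spring Boot Actuator integration

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    # Kafka client metrics are published through Micrometer and show up under
    # the metrics endpoint. (The bindings endpoint belongs to Spring Cloud
    # Stream, not to spring-kafka.)
    management:
      endpoints:
        web:
          exposure:
            include: health,info,metrics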
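Summary
Kafka is quite convenient to use; its main strengths are high throughput and persistence.
Good fit:
Not a good fit:
- Extremely strict message reliability requirements
- Complex routing rules
- Delayed messages
That is all I am recording for now.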