kafka入门

本文最后更新于:2023年9月25日 下午

kafka

Kafka是一个分布式流处理平台。它可以用于两大类别的应用:

  1. 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
  2. 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

kafka结构

Kafka作为一个集群,运行在一台或者多台服务器上。Kafka通过topic对存储的流数据进行分类。每个topic被划分为一个或多个partition,每个partition具有一个或多个副本。每条记录由topic-partition-offset唯一确定,其中包含一个key,一个value和一个timestamp.

topic结构

生产者可以将数据发布到所选择的topic中。消费者使用一个消费组名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例。消费者实例可以分布在多个进程中或者多个机器上。如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例(每一个partition在任意给定的时间内只能被每个订阅了这个topic的consumer组中的一个consumer消费。如果消费者多于分区数,会有消费者闲置)。如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程.

消费组

Kafka 只保证partition内的记录是有序的,而不保证topic中不同partition的顺序。

Kafka依赖Zookeeper:

  • 管理 broker 与 consumer 的动态加入与离开。
  • 当 broker 或 consumer 加入或离开时会触发负载均衡算法,使得一个 consumer group 内的多个 consumer 的消费负载平衡。
  • 维护消费关系及每个 partition 的消费信息。

kafka生产消费流程

producer发送消息

  1. 封装为 ProducerRecord 实例
  2. 序列化
  3. 由 partitioner 确定具体分区
  4. 发送到内存缓冲区
  5. 由 producer 的一个专属 I/O 线程去取消息,并将其封装到一个批次 ,发送给对应分区的 kafka broker
  6. leader 将消息写入本地 log
  7. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
  8. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

生产流程

consumer消费消息

  1. 连接 ZK 集群,拿到对应 topic 的 partition 信息和 partition 的 leader 的相关信息
  2. 连接到对应 leader 对应的 broker
  3. consumer 将自己保存的 offset 发送给 leader
  4. leader 根据 offset 等信息定位到 segment(索引文件和日志文件)
  5. 根据索引文件中的内容,定位到日志文件中该偏移量对应的开始位置读取相应长度的数据并返回给 consumer

安装

  • 使用docker安装。先创建docker虚拟网络:
1
docker network create app-tier --driver bridge
  • 安装zookeeper:
1
docker run -d --name zookeeper-server --network app-tier -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest
  • 安装一个单机的kafka:
1
docker run -d --network app-tier --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

springboot中使用kafka进行生产消费

  • maven依赖
1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
  • application.propertise配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=127.0.0.1:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时。当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka。linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=0
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
  • 手动初始化分区和副本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class KafkaInitialConfiguration {
// 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
@Bean
public NewTopic initialTopic() {
return new NewTopic("testtopic",8, (short) 2 );
}

// 如果要修改分区数,只需修改配置值重启项目即可。分区数只能增大不能减小
@Bean
public NewTopic updateTopic() {
return new NewTopic("testtopic",10, (short) 2 );
}
}
  • 简单生产和消费
1
2
3
4
5
6
7
8
9
10
11
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

// 发送消息
@GetMapping("/kafka/normal/{message}")
public void sendMessage1(@PathVariable("message") String normalMessage) {
kafkaTemplate.send("topic1", normalMessage);
}
}
1
2
3
4
5
6
7
8
@Component
public class KafkaConsumer {
// 消费topic1
@KafkaListener(topics = {"topic1"})
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}

get请求localhost:8080/kafka/normal/123,控制台打印生产消费结果

  • 指定消费者组、分区、offset
1
2
3
4
5
6
7
8
9
// 监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0" }),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets =
@PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
  • 批量消费
1
2
3
4
5
6
7
8
@Component
public class KafkaConsumer {
@KafkaListener(id = "consumer2", groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()=" + records.size());
records.forEach(System.out::println);
}
}

修改properties文件

1
2
3
4
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=5

批量消费结果

消费结果

为什么这里offset每次加2,不连续?

  • 未使用事务时,at-least-once语义,消息重发时,会占用offset
  • 使用事务时,每次事务的commit/abort,都会写一个标志,这个标志会占用offset
  • 带回调的生产者
1
2
3
4
5
6
7
8
9
10
11
12
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
String topic = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
// 发送失败时做补偿处理
System.out.println("发送消息失败:" + failure.getMessage());
});
}
  • 支持事务的生产者

先在application.propertise配置事务:

1
2
3
4
5
6
7
8
# 重试次数必须大于0
spring.kafka.producer.retries=1
# 事务前缀名
spring.kafka.producer.transaction-id-prefix=tx_
# 支持幂等性
spring.kafka.producer.properties.enable.idempotence=true
# 必须所有副本都写入成功才应答
spring.kafka.producer.acks=all
1
2
3
4
5
6
7
8
@GetMapping("/kafka/transaction")
public void sendMessage7() {
// 启用事务执行。也可使用 @Transactional 注解替代。
kafkaTemplate.executeInTransaction(operations -> {
operations.send("topic1", "test executeInTransaction");
throw new RuntimeException("fail");
});
}

事务回滚

生产结果

  • 消费者异常处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("消费异常:"+message.getPayload());
return null;
};
}

// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
throw new Exception("简单消费-模拟异常");
}
  • 自定义生产者分区策略
1
2
3
4
5
6
7
8
9
10
11
12
13
public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区规则(这里假设全部发到1号分区)
return 1;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
}

在application.propertise配置分区器:

1
spring.kafka.producer.properties.partitioner.class=com.example.demo.CustomizePartitioner

消费结果

消费结果


kafka入门
https://njuu.top/1970/01/01/java/kafka入门/
作者
Wayne
发布于
1970年1月1日
许可协议