Kafka必知必会18问:30+图带您看透Kafka原创
基于前面的消息队列文章,我们对消息队列发展历史有了一个比较全面的了解,并且最主流的两款消息队列:RabbotMQ和RocketMQ的用法,使用场景,常见问题以及解决方法,原理都有了比较深入的了解:
有网友提到RabbitMQ受限于开发语言,比较难以一探究竟,而RocketMQ对于Java开发人员来说更加触手可得。在发表了RocketMQ的文章之后,有几个网友反馈可否出一篇Kafka的文章,所以我就写了这篇文章。那么话不多说,我们开始吧。
本文将带您了解以下问题:
-
Kafka是如何存储和检索消息的?(log文件,index索引文件,timeindex索引文件)
-
Kafka是如何基于offset查找消息的?
-
Kafka有哪些日志清理策略?什么场景下会用到?
-
ISR是干嘛的?
-
Kafka总控制器是干嘛的?如何选举出来的?
-
Topic的最优Leader副本是如何选举出来的?
-
什么时候会触发消费的Rebalace?Kafka中有哪些Rebalance策略?
-
Rebalance是如何工作的?
-
Kafka是如何保证数据可靠性的?
-
Kafka是如何保证数据一致性的?
-
消费者是如何提交offset的?
-
有哪些消费历史消息的方法?
-
Kafka为啥性能这么高?
-
Kafka如何避免重复消费?
-
Kafka如何处理消息堆积?
-
如何保证消息顺序性?
-
Kafka如何实现消息传递保障?
-
Kafka有哪些关键的生产者和消费者参数?
-
…
本文主要内容:
Kafka是一个分布式实时事件流平台,主要提供了关键功能:
-
发布和订阅事件流,事件记录被存储起来,因此消费应用程序可以提取他们需要的信息,并跟踪历史保存的所有消息;
-
支持高吞吐量;
-
可以弹性和透明的扩容,无需停机;
-
将事件流存储在磁盘上,并在分布式集群中实现多副本存储,以实现容错,支持配置事件记录数据存储的时长;
-
基于Zookeeper的同步控制器,以保持主题、分区和元数据的高可用(不过在2.8版本之后,可以使用基于 Kafka Raft 的 Quorm 控制器取代基于Zookeeper的控制器)。
如果对Kafka不是很了解,看到上面功能列表,大家可能会比较茫然,不过没关系,接下来的文章保证给大家彻底讲明白,一看就懂,看不懂就当我没说。
下面看看Kafka的整体架构以及关键组件。
1. Kafka整体架构
Kafka整体架构图如下:
Kafka基本概念:
-
Broker
:Kafka以Broker集群的方式运行,一个Kafka节点就是一个Broker。理论上可以跨越多个数据中心。Broker负责数据复制,管理主题、分区、消费偏移量等。如果要跨越多个数据中心,数据中心之间的网络延迟需要非常低,因为Kafka Broker之间以及Broker和Zookeeper服务器之间有大量的通信。 -
在上图中,Kafka集群中包含3个Broker。
-
Topic
:即主题,与RocketMQ的Topic类似,使用Topic对消息进行分类,Kafka接收到的每条消息都会放入到一个Topic中。 -
Topic代表发布和消费记录的端点。生产者向主题发布消息,消费者订阅主题进行消费消息;
-
每条记录有一个键,一个值,一个时间戳和一些元数据组成;
-
在未指定分区的情况下发布消息时,将使用键的散列选择分区。
-
Producer
:消息生产者,负责向Broker发送消息; -
Consumer
:消息消费者,从负责Broker读取并消费消息; -
ConsumerGroup
:消费分组,对于同一个主题,可以被多个消费分组分别消费,每个消费分组有自己的消费偏移量,互不影响; -
Partition
:分区,对Topic的数据进行分布式存储的最小单位。
再次说明下关键点:Kafka每个分区的消息存在在CommitLog文件中,每个Consumer各自维护各自对CommitLog的消费进度(offset),可以从头到尾消费消息,也可以指定offset来重复消费消息,或者跳过某些消息。
接下来的文章会详细讲解这些特性。
2. Kafka存储架构
在介绍RocketMQ的时候,高并发异步解耦利器:RocketMQ究竟强在哪里?这篇文章中,我们介绍了RocketMQ的存储架构,由于RocketMQ是基于Kafka改造而来的,所以Rocket与Kafa的存储架构很相似。这里对比下:
-
RocketMQ是把Topic分片存储到各个Broker节点中,然后在把Broker节点中的Topic继续分片为若干等分的ConsumeQueue,从而提高消息的吞吐量。ConsumeQueue是作为负载均衡资源分配的基本单元;
-
类似的,Kafka的Topic以Partition为单位,分片存储到各个Broker节点中,一个Broker节点可以存储多个Partition,Partition是作为Kafka负载均衡资源分配的基本单元。
还没有深入了解RocketMQ的朋友,可以看看高并发异步解耦利器:RocketMQ究竟强在哪里?这篇文章,更多技术文章,欢迎关注我的博客IT宅(itzhai.com)或者Java架构杂谈公众号。
2.1 Kafka分区文件存储方式
Kafka的Partition类似于RocketMQ的ConsumeQueue。随便查看某一个Topic Partition下的文件:
我们重点看看index, log, timeindex这三个文件。
log文件有点像RocketMQ的commitlog文件,但是Kafka是以分区为维度进行存储的,RocketMQ存储的则是整个Broker的所有消息。
每个Partition分区下面是由多个Segment(段)组成的,Segment是逻辑概念,实际上会对应到上面的三个文件:
-
log
:数据文件,存储实际的消息数据; -
index
:索引文件,存储消息数据的索引; -
timeindex
:索引文件,提供时间维度的检索。
Segment文件的命名规则:Partition的第一个Segment文件从0开始,后续每生成一个新的Segment文件的时候,文件名以当前Partition的最大offset为基准,文件名长度为64位long类型。
Segment生成相关配置:
-
log.segment.bytes
: 每个segment的大小,达到这个大小后会创建一个新的segment,默认是1G; -
log.segment.ms
: 配置每隔多少毫秒产生一个新的segment,默认是7天。
2.2 log数据文件
log文件存储实际的消息数据,可以通过参数log.segment.bytes
指定一个log文件大小,log文件的消息是顺序写的。
2.3 index索引文件
index:是一个稀疏索引,默认的,Kafka每接收4k(可通过log.index.interval.bytes
参数配置)就记录当前一条消息的offset和消息在log日志中的实际位置到index索引文件。也就是说,Kafka是采用稀疏索引来实现信息检索的,如下图,Kafka会把offset为3,7,10的消息的offset以及在log文件中的实际位置存入index文件中:
我们可以通过以下命令查看index文件的内容:
log.index.interval.bytes
:索引条目区间密度,默认4k,每接收4k就记录当前一条消息的offset。增加索引条目的区间密度会影响索引文件的区间密度和查询效率。
2.3.1 Kafka是如何基于offset查找消息的 ?
当我们要根据offset在log文件中查找消息的时候,首先会根据offset定位到具体的Segment,然后去查找Segment中的index文件,通过二分查找快速定位到offset的存储范围在log文件中的起始地址;当拿到起始地址之后,从log文件的起始地址开始顺序查找,直到找到匹配的offset的消息:
index相关配置:
-
log.index.interval.bytes
:索引间隔,即每接收多少数据会记录一个索引,默认为4k;
2.4 timeindex索引文件
存储消息时,除了会维护index索引文件,也会维护timeindex索引文件,timeindex同样是稀疏索引,timeindex索引文件存储消息发送的时间点以及offset。
2.4.1 Kafka是如何基于时间查找消息的?
要通过时间戳a查找消息:
-
首先会根据时间戳a基于时间戳索引定位到具体的Segment,定位方法:
-
将时间戳a与每个Segment的timeindex中最大时间戳对比,找到最大时间戳不小于时间戳a的记录,如果找到了,则继续按以下步骤在这个Segment中查找消息;
-
使用二分法查找timeindex文件,找到不大于时间戳a的最大索引项,从而获取到该索引项存储的offset;
-
使用offset二分查找index文件,找到不大于offset的最大索引项的log文件物理位置p;
-
在log文件中定位到物理位置p,开始查找不小于时间戳a的消息。
如下图,要基于时间戳1636773676499
查找消息,先定位到具体的Segment,然后按以下步骤查找:
-
在timeindex中查找时间戳不大于1636773676499的最大记录,最终找到
1636773676498
,对应的offset为7; -
在index中查找offset不大于7的最大索引项的log文件物理位置,这里即为offset=7的索引的log文件物理地址p;
-
到log文件中定位到物理地址p,开始查找时间戳不小于
1636773676499
的记录,找到第一条,就是我们要找的消息。
2.5 Kafka的日志清理策略是怎样的?
Kafka的日志清理策略cleanup.policy
有两种:Delete策略和Compact策略。
2.5.1 delete策略
默认的的策略,当Segment的不活跃时间大于设置的时间的时候,就删除对应的Segment。具体配置参数:
-
retention.bytes
:总的segment的大小限制,超过这个值之后,会删除旧的segment。默认为-1,表示无大小限制; -
retention.ms
:Segment最后一次写入日志记录的时间与当前时间的时间差,如果超过配置的值,则删除这个Segment。默认是168h,即7天; -
log.retention.check.interval.ms
:检查是否有可删除日志的间隔时间,默认是300s,5分钟; -
file.delete.delay.ms
:删除延迟时间,在真正删除文件之前,继续保留文件的时间,默认为1分钟。
2.5.1.1 如果日志增长很慢,delete策略下如何配置才能触发文件清理?
在delete策略下,我们如果要日志保留3天,可以这样设置:
1retention.ms: 259200000 # 3天
但是如果日志文件增长很慢,3天之后,日志文件大小还没有达到retention.bytes
的值,那么就不会生成新的Segment文件,仍然用的是同一个Segment文件,所以不能直接删除Segment文件。
如果想要真正达到清理3天之前的日志的效果,就需要优化一下配置了,可以添加设置:
1segment.ms: 86400000 # 24小时
这样,每隔24小时,只要有新数据进来,就会产生新的Segment,从而可以触发retention.ms
的三天清除策略了。
总结:对于写速度很慢的Topic,为了优化存储,需要控制:segment.ms < retention.ms。
2.5.2 compact策略
在这种模式下,日志不会被删除,但会被去重清理。这种模式下要求每个日志记录都必须有key,kafka按照一定的时机清理Segment中的key:对于同一个key,只保留最新的那个key。
每个Partition的日志,以Segment为单位,会被分为两部分,已清理和未清理的部分。未清理的部分又可以分为可清理和不可清理。
对于compact清理策略,Segment可清理部分的清理思路是这样的:
Kafka根据key来去重合并,对于可清理的部分,每个key保留一个最新的值。如果清理后的Segment太小,Kafka会按照一定的策略合并这些Segment,避免Segment过于碎片化。
2.5.2.1 什么情况下会用到compact策略策略?
比如,当我们按照一定的逻辑计算到每个用户的粉丝数,并且每几分钟就更新一次,把用户的粉丝数都存到Kafka中,任何需要用户粉丝数的业务都可以从Kafka获取数据。
此时就不能使用delete策略了,因为数据不能删,但是每次重复计算之后,用户粉丝数都会多一份数据,我们只是需要最新的那一个粉丝数,为此,可以把用户id作为key,通过使用compact策略,把重复的历史用户粉丝数给清理掉。
更多关于compact测量队配置参数:
-
min.cleanable.dirty.ratio
:可以进行compact的脏数据的比例; -
dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)
,其中dirtyBytes表示可被清理部分的日志大小,cleanBytes表示已清理部分的日志大小。默认值是0.5
,即脏数据达到了总数据的50%才进行清理,这样配置可以减少清理次数,提高清理的性价比,如果需要更及时的清理策略,可用调低该值; -
min.compaction.lag.ms
:设置一条消息投递到Kafka后,多久时间内不会被compact。默认是0,表示不会根据消息投递的时间来决定消息是否应该被compacted。这个配置可用于支持获取一定时间内的历史快照的业务场景。
对于日志增值很慢的topic,同样需要配合segment.ms配置来配合清理日志。
看到这里,是不是对Kafka的存储原理有了比较深入的了解了呢?想看更多中间件的相关文章,欢迎关注我的博客IT宅(itzhai.com)
或者Java架构杂谈
公众号。
3. 集群
由于我的测试服务器内存比较小,我们先来配置Kafka启动内存,大家根据自己的实际情况进行配置:
1.修改bin
目录下的 zookeeper-server-start.sh
,将初始堆的大小(-Xms)设置小一些
1export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
-
修改
bin
目录下的kafka-server-start.sh
文件,将初始堆的大小(-Xms)设置小一些
1 export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
注意:listeners中的IP地址要配置为服务器的IP地址
listeners=PLAINTEXT://192.168.1.101:9093
然后启动三个Kafka实例,部署好之后,可以通过Zookeeper查看集群节点数:
即,目前的集群情况如下:
3.1 创建集群
每个Topic可以配置为多个分区,每个分区可以有多个副本,副本称为Replica,在副本集合中会存在一个Leader副本,Leader负责所有的读写请求,其余副本只负责从Leader同步备份数据。
3.1.1 ✨创建Topic
接下来新建一个Topic,副本数设置为3,分区数设置为2:
3.1.2 ✨查看所有Topic
3.1.3 ✨查看Topic分区详情
查看刚刚创建的 itzhai-com-topic-1 Topic的信息:
Topic信息属性说明:
-
Topic:主题名称
-
TopicId:主题id
-
PartitionCount:主题分区数量
-
ReplicationFactor:副本数量
-
Configs:主题详细配置,每一行配置表示一个分区信息
-
Topic:主题名称
-
Partition:分区号
-
Leader:分区的Leader副本,负责当前分区的所有读写请求
-
Replicas:存放分区备份的节点
-
Isr(In-Sync Replica):这个集合是Replicas的一个子集,列出当前还存活着,并且已经同步备份了该分区的节点
此时,服务器状态如下图所示:
3.1.3.1 ISR是干嘛的?
Isr(In-Sync Replica):是Replicas的一个子集,列出当前还存活着,并且已经同步备份了该分区的节点。
Isr中包括Leader副本,以及与Leader副本保持同步的Follower副本。
3.2 重新选主
副本会均匀分配到多个Broker节点上,当Leader节点挂了之后,会从副本集中选出一个新的副本作为Leader继续对外提供服务。
下面我们来测试一下,我们把Broker-1给停掉(找到servier.properties中broker.id=1的进程):
1ps aux | grep server-1.properties
2kill 28329
再次查看Topic状态:
发现Partition 1的Leader已经从Broker-1切换到了Broker-2,Broker-1已经从Isr副本集合中移除了。服务器状态如下所示:
3.3 总控制器
3.3.1 总控制器是干嘛的?
我们再来看一下Kafka的集群架构图:
在Broker集群中,会选举出一个Controller总控制器
。总控制器主要负责:
-
监听集群信息变更:
-
监听集群变更:为
Zookeeper的/brokers/ids
节点添加BrokerChangeListener,用于处理Broker节点增减变更; -
监听Topic变更:
-
为
Zookeeper的/brokers/topics
节点添加TopicChangeListener,用于处理Topic增减变更; -
为
Zookeeper的/admin/delete_topics
节点添加TopicDeletionListener,用于处理删除Topic的事件; -
为
Zookeeper的/brokers/topics/[topic]
节点添加PartitionModificationsListener,用于监听Topic分区分配变更; -
选举Partition分区Leader:分区的Leader副本宕机之后,Controller负责为该分区选举一个新的Leader副本;
-
更新集群元数据信息:感知到分区的ISR集合有变更之后,Controller通知所有的Broker更新其元数据信息。
Zookeeper中存储了Kafka集群信息,可以从Zookeeper中查看到当前总控制器是哪个Broker:
可以发现,id为0的Broker是当前的总控制器。
3.3.2 总控制器是如何选出来的?
首次选举Controller:集群启动的时候,每个Broker节点都会尝在Zookeeper中创建临时节点/controller
,最终只会有一个节点能够创建成功,这个节点就会作为Controller总控制器。
重新选举Controller:当Controller所在的Broker发生故障之后,Zookeeper中的/controller临时节点会被删除,/broker/ids中对应的Broker节点信息业会被删除。其他Broker节点监听这两个Zookeeper节点,当监听到/controller临时节点消失了,就会尝试往Zookeeper创建该节点,写成功的那个Broker将会成为新的Controller。
3.3.3 Topic的最优Leader副本是如何选举出来的?
一般的,在分布式系统中,Leader的选举算法很多,如Zab、Raft、Viewstamped Replication等。Kafka使用的Leader选举算法更像是微软的PacificA算法。
Controller负责为Topic选取Leader副本:Controller从ISR列表中选择第一个分区作为Leader,因为ISR第一个分区可能是同步数据最多的副本,可以尽可能保证数据不丢失。
重新选举Leader副本:当Controller监听到/brokers/ids
中的Broker节点消失的时候,会重新执行Leader选举流程。
相关参数:
-
unclean.leader.election.enable
:true表示当ISR列表所有副本都挂了之后,可以在ISR以外的副本选取Leader副本。从而可以提高可用性,但是可能会导致丢失更多的数据;false表示只能从ISR中选择Leader副本。
3.3.3.1 Kafka的Topic Leader选举机制有啥优势?
与一般的少数服从多数选举算法不同,Kafka通过使用ISR来实现选举的,ISR的数量不需要超过副本数量的一半,从而使得在可靠性和吞吐量上面取得平衡,一般我们设置为一个大于1的值。
3.4 Rebalance机制
Rebalance机制是Kafka消费机制的核心。
当消费组消费者数量发生变化、或者消费组消费主题数量变化、主题分区数量变化等的时候,Kafka会重新分配消费者和分区的关系,也就是做一次Rebalance。
Kafka保证一个Topic分区只会配给一个组内的消费者,而一个消费者可以消费多个分区。
关于Rebalance的具体原理,找到了一篇讲的比较好的文章,可以参考:Apache Kafka Rebalance Protocol, or the magic behind your streams applications[2]
3.4.1 什么时候会触发Rebalance机制?
当发生以下情况时,会触发Rebalance机制:
-
消费者的数量发生变化:
-
主题分区的数量发生变化:
-
消费组订阅的Topic数量发生了变化:
3.4.2 Kafka中有哪些Rebalance的策略?
Rebalance策略主要有三种:Range、RoundRobin、StickyAssignor(粘性分配器)。在声明消费者的时候可以指定:
1props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
属于同一组的所有消费者必须声明一个共同的策略。如果消费者尝试加入分配配置与其他组成员不一致的组,会引发如下异常:
1org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member’s supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
3.4.2.1 RangeAssignor[3]
范围分配器,这是默认的策略。
范围分配器是在每个主题基础上工作的,对于每个主题,按照数字顺序排列可用分区,使用组协调器分配的member_id按字典顺序排列消费者。然后将分区数除于消费者总数,以确定分配给每个消费者的分区数,如果没有均匀划分,那么前几个消费者将有一个额外的分区。
如下例子:
假设有两个消费者C0和C1,两个主题T0和T1,每个主题有3个分区,使用范围分配器最终分配结果:
-
C0 = {T0P0, T0P1, T1P0, T1P1}
-
C1 = {T0P2, T1P2}
RangeAssignor有何缺点?
我们假设消费组中的消费者数量多于主题的分区数量,则会出现以下情况:
可以发现C3消费者并没有在消费任何分区的消息,并没有尽可能地使用到所有的消费者。
3.4.2.2 RoundRobinAssignor[4]
循环分配器,让消费组中的所有消费者平均分配可用分区。同样的,首先按照顺序排列可用分区和消费者。循环分配器把所有主题的所有可用分区轮训地分配给订阅它们的消费者。
假设所有消费者实例订阅的主题都相同,则分区将均匀分布:
T0和T1的分区:T0P0, T0P1, T0P2, T1P0, T1P1, T1P2将以此轮训的分配给C0和C1。
可以发现,RoundRobin尽可能地使用到了所有的消费者,把分区更均匀的分配给消费者。
上一节的例子使用RoundRobin策略,结果如下图所示:
可以发现C2也被利用起来了。
RoundRobin有何缺点?
虽然RoundRobin尽可能的利用所有的消费者,但是一旦消费者数量发生变化触发Rebalance时,会导致更多的分区重分配。
3.4.2.3 StickyAssignor[5]
粘性分配策略,与RoundRobin类似,但是在Rebalance时,会遵循以下原则:
-
分区尽可能保证分布均匀;
-
分区分配尽可能保持不变更;
优先保证分布均匀。
使用StickyAssignor策略的情况下,假如C1挂了,那么只需要把原本C1的T0P1分区分配给C2即可:
3.4.3 集群消费Rebalance机制是如何工作的?[2]
Kafka的Rebalance流程会经历以下几个阶段:
3.4.3.1 选择组协调器阶段
Kafka会为每个消费组选择一个Broker来作为组协调器,组协调器负责监控消费组里所有消费者的心跳,判断机器是否下线,以及开启消费者Rebalance。
消费组中的每个消费者在启动的时候都会向Kafka集群中的某个节点发送FindCoordinator
请求来查找对应的组协调器GroupCoordinator
,并与之建立网络连接。
如何选择组协调器?
Kafka会选择消费分组正在使用的consumer_offsets分区对应的Broker作为ConsumerGroup的Coordinator。
消费分组写消息的consumer_offsets分区号:
hash(ConsumerGroupId) % __consumer_offsets 主题的分区数。
3.4.3.2 消费者加入消费组阶段
成功找到消费组对应的GroupCoordinator
之后,就进入加入消费组阶段。
此时消费者向GroupCoordinator
发送JoinGroup
请求,申请加入消费组,此时会启动Rebalance协议。
Join Group 包含了一些消费者客户端配置信息,如session.timeout.ms
和max.poll.interval.ms
等,组协调器使用这些属性进行消费者下线状态判断。另外,请求中包含了成员支持的客户端协议列表,以及用于执行客户端协议的元数据。
GroupCoordinator
会从Consumer Group中选择第一个加入消费组的消费者作为组长
(Leader),并把消费组的情况发送给这个组长,组长负责在本地制定分区方案。
3.4.3.3 同步与执行分区方案阶段
消费组的组长制定好分区方案后,给GroupCoordinator发送SyncGroup
请求,并附加上制定好的分区作业,非组长则简单的发送一个空请求。:
然后GroupCoordinator把分区方案响应给组里的所有消费者。最终消费者连接指定的分区,并进行消息消费:
每个消费者定期向组协调器发送心跳请求,以保持会话状态(相关配置:heartbeat.interval.ms)。如果此时正在进行Rebalance操作,组协调器会响应告知消费者需要重新加入组。
当集群节点比较多的时候,Rebalance可能会花费比较多的时间,导致消耗Broker服务器的资源,影响消费性能,为此,尽量选择在系统负载比较低的时候进行Rebalance。
注意:通过assign指定消费分区的情况下,Kafka不会进行Rebalance:
1consumer.assign(Collections.singletonList(new TopicPartition("itzhai-test-topic", 0)));
4. 消息的投递
消费的时候,会从集群的Leader节点进行读写请求。
4.1 生产消息如何投递?
生产者将消息发送到Topic的某一个分区中,一般通过round-robin
做简单的负载均衡,也可以通过自定义分区器根据消息中的某一个关键字来做分区,后者使用更广泛。
关于自定义分区器的例子,参考:Custom Partitioner in Kafka: Let’s Take a Quick Tour!. Retrieved from https://dzone.com/articles/custom-partitioner-in-kafka-lets-take-quick-tour[9]
我们可以直接使用Kafka提供的bash脚本来尝试发送消息,接下来演示下。
4.1.1 ✨发送消息例子
4.2 生产者相关参数
生产者最重要的三个参数[1]是:
-
acks
-
compression
-
batch size
4.2.1 投递ack: acks
消息投递的持久化机制。有如下几个配置策略:
acks=0
acks=0
:不需要ACK,生产者消息发出之后,不需要等待Broker的确认回复,就可以继续发送下一条消息。
优点:性能高;
缺点:容易丢消息;
使用场景:适合对性能要求比较高但是对数据可靠性要求比较低的场景,如写日志。
acks=1
acks=1
: 生产者发出消息后,需要Leader副本的ACK,Leader将数据持久化到本地log之后,就确认回复,而不需要等待Follower副本写入成功。这也是默认的配置值。
优点:比acks=0可靠,确保消息写入到了Leader;
缺点:Leader挂掉的时候,Follower没有成功备份数据,那么消息会丢失;
使用场景:对消息可靠性有一定要求,但是不是很高,消息丢失之后通过专门的补偿机制去保证数据的完整性,并且对性能要求高的场景。如订单状态更新消息,假如消息丢失了,还有定时任务去轮训补偿。
acks=all 或者 -1
acks=all或者-1
: 需要等到min.insync.replicas个副本(包括Leader副本)都成功写入消息,才进行确认回复。Leader挂了,触发选举机制,选举策略是优先选举同步成功的Follower节点为新的Leader。
假设min.insync.replicas=2,则有如下同步过程:
优点:消息的可靠性可以得到更大程度的保证;
缺点:性能更低;
使用场景:对消息可靠性要求很高的场景,不允许丢失消息,如金融业务。
min.insync.replicas
acks=all或者-1时,min.insync.replicas参数设置必须成功写入日志的最小副本个数,如果达不到这个数量,那么生产者将引发异常:NotEnoughReplicas 或者NotEnoughReplicasAfterAppend。
通过配合min.insync.replicas 和 acks 一起使用,你可以拥有更大的持久性保证。
如果要保证更高持久化可靠性,一般的,如果Topic的副本因子为3 ,那么一般将 min.insync.replicas 设置为 2,acks设置为all,如果大多数副本没有收到写入,这将确保生产者引发异常。
4.2.2 重试: retries和retry.backoff.ms
retries用于配置重试次数,配置为大于0,则在发送失败、网络异常等场景下回触发重新发送。重试可能会导致消息的重复投递,需要消费端做好消费幂等处理。
支持重试可能影响消息的顺序性,比如:
a b两个批次发送到Broker的单个分区中,a批次第一次发送失败了,但是b批次发送成功了,导致Broker先接收到b批次,然后重试发送a批次,最终导致Broker分区中的a b批次消息顺序改变了。
如果需要确保这种情况的顺序性,请配置max.in.flight.requests.per.connection
参数的值为1。
如果重试次数用完之前,就到达到了重试超时时间(达到了delivery.timeout.ms
配置的值),那么将不继续进行重试。一般的,用户更愿意使用delivery.timeout.ms
来控制重试行为。
retry.backoff.ms
参数则是用于控制重试间隔。
4.2.3 分批发送: buffer.memory, batch.size和linger.ms
设置分批发送每个批次的大小。
Kafka为了提高发送消息,将生产者请求传输的所有记录组合成一个一个的批次进行分批发送,这类似于TCP中的Nagle算法。
这个暂存消息的发送缓冲区大小是通过buffer.memory
参数进行设置的。
分批批次大小是通过batch.size
参数进行设置的。
一旦获取到batch.size大小的批次之后,就立刻发送出去。
linger.ms
参数控制最多每间隔多久发送一个批次,如果在linger.ms间隔内就获取到了完整的批次,那么就会立刻发送出去。如果等到linger.ms时间,还没有收集到完整的一批数据,那么也会强制发送出去。
linger.ms默认值为0,表示消息会立即被发送出去,发送效率相对较低。
如果消息生产的速度太慢,为了避免消息一直发送不出去,注意留一下linger.ms
配置的发送间隔,可以适当缩小发送间隔。
4.3 Kafka是如何保证数据的可靠性?
4.3.1 生产端
在Kafka 0.8.0之前,是没有副本的概念的,数据可能会丢失,只能存储一些不重要的数据。
从0.l8.0banb开始引入了分区副本,每个分区可以配置几个副本。Kafka的分区多副本机制是可靠性保证的核心。
为了保证可靠性,我们可以使用同步发送,根据不同的场景,配置合理的acks参数值。
为了严格保证可靠性,以下是需要的配置:
-
生产者
:acks=all,并且使用同步阻塞的方式发送消息; -
Topic
:replication.factor >= 3,min.insync.replicas >= 2; -
Broker
:unclean.leader.election.enable=false,确保ISR集合中没有可用的在线副本时,不会去选举ISR之外的副本作为新的Leader。
unclean.leader.election.enable设置为true,意味着允许选举非ISR集合的副本作为新的Leader,即使配置了acks=all,新选举出来的Leader也可能消息是落后的。
如下图,原本ISR中有三个副本,某个时间之后,Follower1脱离了ISR,并且落后Leader比较多:
此时ISR中的副本都下线了,unclean.leader.election.enable=true,那么,会把Follower1选举为新的 Leader:
此时新的Leader副本开始接收消息,假如原来的Leader此后又恢复了,称为了新的Follower副本,那么会开始尝试从新的Leader副本同步消息,此时这个新的 Follower副本的LEO比新的Leader还要大,最终会把这个新的Follower副本的日志进行截断,保持与心Leader一致,最终导致数据丢失:
4.3.2 消费端
对于消费端,为了避免丢失未处理完的消息,需要设置为手动提交。
4.4 Kafka是如何保证数据的一致性?
保证数据一致性,也就是无论是对于老的Leader,还是新选举出来的Leader,消费者都需要读到一样的数据。
为了支持以上特性,Kakfa引入了HW
(High Watermark)高水位的概念。ISR中每个副本最后的那个日志偏移量称为LEO
(Log End Offset),HW的取值为ISR集合中最小的LEO,消费者只能消费到HW对应的日志。有点抽象?IT宅来给大家上图,一看就懂:
如上图,ISR中有三个副本,Replica 0为Leader,副本0的消息3和消息4都没有完全同步给其他副本,所以HW在消息2处,消费者只能消费到消息2以及之前的消息。
通过引入HW,就避免让消费者消费到还没有完全同步到ISR中所有副本的消息,避免由于切换Leader导致能够读取到的消息变少了,从而导致数据不一致问题。
为了避免部分副本写入速度太慢,导致影响消费者消费消息的及时性,可以配置参数replica.lag.time.max.ms
参数,指定副本在复制消息时可被允许的最大延迟时间。如果超过这个时间副本还没有同步好消息,那么副本就会被剔出ISR集合。
HW是用于控制消费行为的,即使acks设置为0,超过HW的消息也是不能被消费者消费端。
讲完了消息的投递,我们接下来讲讲消息的消费。更多图解系列文章,欢迎关注我的博客IT宅(itzhai.com)
或者Java架构杂谈
公众号。
5. 消息的消费
5.1 集群消费与广播消费
5.1.1 Kafka中的集群消费(单播消费)
如上图,每个ConsumerGroup里面的消费者是一个集群,同一个ConsumerGroup的消费者共同消费Topic的消息,同一个Topic的一条消息只能被同一个ConsumerGroup的某一个Consumer消费,不能被重复消费,如果C0消费了一条消息,那么C1和C2就不会再消费这条消息了。要实现集群消费,只要把所有Consumer放到同一个ConsumerGroup中就可以了。
✨ 集群消费例子
我们启动了一个消费者,通过group.id参数指定消费分组arthinking
来消费消息了,从而达到了集群消费的效果。
5.1.2 Kafka中的广播消费
同一个Topic的一条消息可以被多个ConsumerGoup重复消费。如果要实现广播消费,只需要把Consumer放到不同的ConsumerGroup中就可以了。
✨ 广播消费例子
为了实现广播消费效果,我们继续启用新的消费组消费即可:
5.2 Kafka的消费进度如何维度
5.2.1 消费进度相关命令
我们现在来看一下消费组的消费进度:
✨ 查看消费组消费进度
可以发现,在arthinking消费分组中,P0和P1分区都正在被同一个消费者消费,这里可以看到详细的消费进度。
我们列出所有的Topic,发现有一个消费主题__consumer_offsets
,这个主题是用来维护消费进度的。
✨列出所有Topic
我们看看__consumer_offsets
这个Topic的详情:
可以发现,这个Topic有50个Partition,副本数为1。topic配置的清理策略是compact
,即总是保留最新的key。
5.2.3 __consumer_offsets
__consumer_offsets
这个Topic就是用于维护消费组的消费进度的。__consumer_offsets
中保存的也是普通的Kafka消息,主要保留三类消息消息:
-
Consumer group组元数据消息,如groupId,组成员状态,成员配置信息等;这类消息在Group Rebalance的时候写入;
-
Consumer group位移消息,存储消费组的消费进度;这类消息在提交消费进度的时候写入;
-
Tombstone消息或Delete mark消息。每当Consumer Group下已经没有任何激活的成员并且所有位移数据都被删除时,Kafka就会将该Group状态设置为Dead,并发送一条tombstone消息,表明要彻底删除这个Group的信息。这类消息在Kafka后台线程扫描并删除过期位移或者
__consumer_offsets
分区副本重分配的时候写入。
这里我们主要关注的就是Consumer group的位移消息。该消息的key的格式是:groupId + topic + partition分区号
,即,每个topic的每个分区,针对不同的消费分组,都会存储一个消费进度。value是消费偏移量offset。
__consumer_offset Topic
相关配置参数:
-
offsets.topic.num.partitions:分区数量,默认为50;
-
offsets.topic.replication.factor:副本因子,默认为1。
推荐副本因子设置成>1,以提供数据存储的可靠性。
5.3 消费者是如何提交offset的?
5.3.1 自动提交 enable.auto.commit
通过enable.auto.commit
参数,可以控制是否自动提交offset,默认为true。
如果设置为false,则消费完成之后,记得手动提交ack,否则,每次重启消费者之后,会继续从未提交的位置继续重复消费消息。
auto.commit.interval.ms
配置自动提交的时间间隔。
自动提交会有什么问题?
假设设置的自动提交时间间隔为1秒,取出一批数据之后,需要5秒才能消费完,但是还没消费完,程序就挂了。导致这批未被消费部分的数据再也没有机会被消费到了,即消息错过消费。
假设取出的一批数据为10条,假设成功处理了两条消息,还没有触发自动提交offset,消费程序就挂了,下次重启消费程序之后,会导致这两条消息再次被消费到,即消息重复消费。
5.3.2 同步提交&异步提交
如果设置为手动提交,需要调用提交的API。在kafka-clients的API中,kafka为我们提供了同步提交和异步提交的API。
同步提交:
1consumer.commitSync();
异步提交:
1consumer.commitAsync(new OffsetCommitCallback() {
2 @Override
3 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
4 if (exception != null) {
5 log.error("提交消费进度异常,offsets:{}", offsets, exception);
6 }
7 }
8});
5.4 有哪些消费历史消息的方法?
5.4.1 指定分区消费
指定消费0分区:
1String TOPIC_NAME = "itzhai-com-test1";
2// 指定0分区
3consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
5.4.2 消息回溯消费
指定0分区,从头消费:
1String TOPIC_NAME = "itzhai-com-test1";
2consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
3// 从头消费
4consumer.seekToBeginning(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
5.4.3 指定offset消费
1String TOPIC_NAME = "itzhai-com-test1";
2consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
3// 指定偏移量
4consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);、
5.4.4 指定时间点消费
从指定时间点往后找到第一条消息的偏移量,开始消费。最终都是调用指定offset进行消费。
相关例子:
1// 消费8小时前的消息
2long beginConsumeTime = System.currentTimeMillis() - 1000 * 60 * 60 * 8;
3
4for (PartitionInfo partitionInfo : partitionInfos) {
5 seekMap.put(new TopicPartition(TOPIC_NAME, partitionInfo.partition()), beginConsumeTime);
6}
7consumer.assign(seekMap.keySet());
8
9Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
10
11for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
12 TopicPartition key = entry.getKey();
13 OffsetAndTimestamp value = entry.getValue();
14 if (key == null || value == null) {
15 continue;
16 }
17 consumer.seek(key, value.offset());
18}
19
20while (true) {
21 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
22 for (ConsumerRecord<String, String> record : records) {
23 System.out.println("消费消息: partition={}, offset={}, key={}, value={}");
24 log.info("消费消息: partition={}, offset={}, key={}, value={}",
25 record.partition(), record.offset(), record.key(), record.value());
26 }
27}
5.4.4.1 基于时间点消费底层是如何实现的?
由上面的例子可以发现,基于时间的消费,也是先找到对应时间的消息offset,最终都是基于offset去消费的。
5.5 消费者相关参数
5.5.1 消费提交: enable.auto.commit
参考 5.3.1 自动提交 enable.auto.commit
5.5.2 最大拉取消息数: max.poll.interval.ms
每次poll拉取的最大消息数,根据消费处理速度进行配置。如果消费者消费速度很快,则可以设置的大点。
5.5.3 消费者在线判断: heartbeat.interval.ms 和 session.timeout.ms
heartbeat.interval.ms
参数配置消费者给Broker发送心跳的间隔时间。当Broker进行Rebalance的时候,接收到了消费者的心跳,将把Rebalance方案响应给Consumer。
session.timeout.ms
Broker等待消费者发送心跳的最大时间,如果超过了这个时间,消费者就会被判断为出问题,会被踢出消费组,导致该消费者占用的Partition被重新分配给其他消费者。
5.5.4 最大poll时间间隔: max.poll.interval.ms
如果两次poll时间超过这个间隔,Broker就会认为这个消费者消费太慢了,会把消费者剔除消费组,让出分区,并把分区分配给其他的消费者进行消费。
5.5.4.1 为什么生产的消费者突然就不消费消息了?
如果消费者每次启动了,消费若干条消息就不再消费消息了,而生产者是有不断生产消息的,就需要确认消费者是否被T掉了,可能是两次poll的时间超过了max.poll.interval.ms
配置的值。为了解决这个问题,可以:
-
增加
max.poll.interval.ms
配置的时间,建议不要配置的太大,不然就没办法基于这个参数判断消费者的消费能力了,导致没法把分区重分配给消费能力更好的消费者; -
减小
max.poll.interval.ms
,即每次poll拉取的消息数降低点,避免消费时间过长; -
检查消费者消费性能是否有瓶颈,根据实际情况进行优化。
5.5.5 新消费组是否从头消费: auto.offset.reset
auto.offset.reset
用于配置新的消费组的消费行为,配置新的消费组是否从头开始消费分区,还是只消费增量的消息,可选配置:
-
latest
:只消费增量的消息,即消费消费组启动后分区接收到的消息,默认为该配置; -
earliest
:从头开始消费分区消息,后续会根据offset记录消费进度,消费增量的消息。
6. Kafka常见问题
6.1 Kafka为啥性能这么高?
大家知道RocketMQ是基于Kafka改造而来的,因此Kafka的高性能原因与RocketMQ类似,以下是Kafka高性能的原因:
-
磁盘顺序读写:Kafka写消息都是直接追加到文件末尾的,不会有随机写的情况,另外,不会随机删除日志,只会按照删除策略删除一整段的历史消息。
-
与RocketMQ不同的是,kafka不会像RocketMQ那样预分配一个很大的文件来存储消息,Kafka的顺序写可以理解为分段顺序写的,一般一台服务器只部署Kafka就更接近与完全顺序;
-
批量读写数据,以及压缩传输:
-
Rocket发送消息底层是分批发送的,提高了传输和存储的效率;
-
数据零拷贝技术:通过mmap内存映射,以及sendfile,减少了数据拷贝次数,提高了数据发送效率。
6.2 Kafka支持延时队列吗?
很遗憾,Kafka中并没有像RocketMQ中提供的那种延时队列功能,不过可以参考RocketMQ自己实现一个延时队列。RocketMQ不正是基于Kafka演变而来的么。
参考做法:按延时时间分为不同的延时等级,分别创建对应的延时主题:delay_1s, delay_10s, delay_30s…通过定时任务轮训这些主题,根据消息的创建时间,对比判断主题的队列是否到期,如果到期了,就把消息转发给具体的业务处理的topic进行处理,由于排在前面的消息肯定时候最早到期的,所以可以很快的找到所有要处理的消息,处理完毕。
6.3 Kafka支持事务消息吗?
Kafka中有事务的概念,但是并不支持类似RocketMQ中的分布式事务消息,Kafka中的事务只是用于保证发送多条消息时候,同时成功或者失败。
有时候,我们在做完一次业务处理之后,需要发多条不同的消息给不同的消费方,这个时候要确保消息同时发送成功的话,就可以使用Kafka的事务了[6]:
1Properties producerProps = new Properties();
2producerProps.put("enable.idempotence", "true");
3producerProps.put("transactional.id", "prod-1");
4KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
5try {
6 // 开启事务
7 producer.beginTransaction();
8 producer.send(new ProducerRecord<>("itzhai‐topic", "123", "itzhai.com"));
9 producer.send(new ProducerRecord<>("arthinking‐topic", "123", "Java架构杂谈"));
10 producer.send(new ProducerRecord<>("itread‐topic", "123", "abc"));
11 // 提交事务
12 producer.commitTransaction();
13} catch (Exception e) {
14 // 回滚事务
15 producer.abortTransaction();
16} finally {
17 producer.close();
18}
6.4 Kafka如何避免重复消费?
避免重复消费,是任何消息队列中间件都不可避免遇到的问题,我么接下来说下在Kafka中导致重复消费的原因和解决方法。
6.4.1 生产端
如果生产端配置了重试机制,那么在网络不稳定,或者发送超时的情况下,就会尝试重新发送,这可能会导致Broker接收到重复的消息。
6.4.2 消费端
当消费端设置为自动提交Offset的时候,可能在消费一批数据过程中,还没来得及提交,服务就挂了,下次重启消费者,就会导致重复消费该批消息。
为了避免重复消费,在消费端,需要做好幂等处理。
6.5 消息堆积如何处理?
产生消息堆积的原因,不外乎两种:
-
消费端程序有bug,或者数据有问题,导致一直消费失败,消息一直得不到正确处理从而导致消息堆积;
-
消费者的消费性能太差,或者消费消息的时间太长了,导致消息堆积着来不及消费。
针对第一种情况,为了避免消息队列,可以把这种消息单独放到死信队列中做特殊处理。由于Kafka中并没有提供类似RocketMQ的那种死信队列[7],所以需要专门准备一个这样的主题充当死信队列。进入死信队列的消息需要进行分析并处理掉消费不成功的问题。
更进一步的,也可以参考RocketMQ,先把消费失败的消息放到一个专门负责重试的重试队列中,执行多次重试可以通过创建多个主题来完成,如果重试队列还是消费失败,则把消息放入死信队列。具体做法可以参考此文:Building Reliable Reprocessing and Dead Letter Queues with Apache Kafka. Retrieved from https://eng.uber.com/reliable-reprocessing/[8]
针对第二种情况,由于分区数量是固定的,即使增加消费者,也没办法加快消费速度。为了快速修复问题,可以修改消费者程序,把消息快速转发到另一个新的主题中,并给这个主题设置很多分区,最后启动对应数量的消费者进行消费:
6.6 消息顺序性如何保证?
为了保证消息消费的顺序性,最简单的做法就是:
-
发送端设置同步发送,避免异步发送导致乱序;
-
消费端消息统一发到同一个分区,通过一个消费者去消费消息。
但是这样会导致消息处理的效率很低,拖慢系统的吞吐量。
为了提高性能,需要考虑其他的思路。RocketMQ中,提供给了MessageQueueSelector接口,可以把具有相同标识(如订单号)的消息统一发到同一个消息队列中,参考IT宅上一篇文章(高并发异步解耦利器:RocketMQ究竟强在哪里?)。我们也可以考虑类似的思路:按照消息的某种标识,把相同标识的消息投递到同一个分区,从而保证同同一个标识的消息在分区中是顺序消费的。
6.7 消息如何回溯消费?
在某些场景下,如消费程序有问题时,修复了消费程序之后,想要重新消费之前已经消费过的消息,就需要用到回溯消息的功能更了。回溯消息支持指定offset消费,也支持指定时间点消费,参考5.4 有哪些消费历史消息的方法
。
6.8 如何实现消息传递保障?
对于消息中间件,可以提供多种传递保障:
-
最多一次,消息可能会丢失,但绝对不会重发;
-
至少一次,消息不会丢失,但有可能会导致重发;
-
正好一次,每个消息传递一次且仅一次。
在Kafka可以通过acks参数值控制传递保障行为:
-
最多一次:acks=0
-
至少一次:acks=all 或者 -1
-
正好一次:acks=all 或者 -1,消费端加上消费幂等性保证。当然,也可以使用Kafka的幂等性投递来实现。
Kafka中的幂等性投递消息是如何实现的?
相关参数:enable.idempotence
当设置为“true”时,生产者将确保只会投递一条消息到Broker中。如果为“false”,则生产者则可能会由于网络等问题导致重试投递,导致重复消息。请注意,启用幂等性要求
max.in.flight.requests.per.connection
小于或等于 5(保留任何允许值的消息排序),retries
大于 0,并且acks
必须为“all”。实现原理:Kafka每次发送消息的时候,会给消息生成PID和Sequence Number,一并发送给Broker,Broker根据PID和Sequence Number判断生产者发送过来的消息是否相同,只有不相同的才会接收并存储起来。
References
[1]: Kafka 3.0 Documentation. Retrieved from https://kafka.apache.org/documentation/#configuration
[2]: Apache Kafka Rebalance Protocol, or the magic behind your streams applications. Retrieved from https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2
[3]: Class RangeAssignor. Retrieved from https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
[4]: Class RoundRobinAssignor. Retrieved from https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html
[5]: Class StickyAssignor. Retrieved from https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html
[6]: Exactly Once Processing in Kafka with Java. Retrieved from https://www.baeldung.com/kafka-exactly-once
[7]: Kafka Connect 101: Error Handling and Dead Letter Queues. Retrieved from https://www.youtube.com/watch?v=KJUlnmEjbTY
[8]: Building Reliable Reprocessing and Dead Letter Queues with Apache Kafka. Retrieved from https://eng.uber.com/reliable-reprocessing/
[9]: Custom Partitioner in Kafka: Let’s Take a Quick Tour!. Retrieved from https://dzone.com/articles/custom-partitioner-in-kafka-lets-take-quick-tour