kafka
存储原理
为什么快
producer->broker->comsumer
segment文件命名规则
磁盘顺序读
topic (1)->partition(n) 分布在broker 负载均衡(根据key hash取模)
文件分段 数据文件分段(顺序读写、分段命令、二分查找)
数据文件索引(分段索引、稀疏存储)
查找的算法是
根据offset的值,查找segment段中的index索引文件。由于索引文件命名是以上一个文件的最后 一个offset进行命名的,所以,使用二分查找算法能够根据offset快速定位到指定的索引文件。
找到索引文件后,根据offset进行定位,找到索引文件中的符合范围的索引。(kafka采用稀疏索引的方式来提高查找性能)
得到position以后,再到对应的log文件中,从position出开始查找offset对应的消息,将每条消息
的offset与目标offset进行比较,直到找到消息
比如说,我们要查找offset=2490这条消息,那么先找到00000000000000000000.index, 然后找到 [2487,49111]这个索引,再到log文件中,根据49111这个position开始查找,比较每条消息的offset是 否大于等于2490。最后查找到对应的消息以后返回
Log文件的消息内容分析
根据kafka提供的命令
offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize:
-1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1
sequence: -1 isTransactional: false headerKeys: [] payload: message_5371
offset和position这两个前面已经讲过了、
createTime表示创建时间、
keysize和valuesize表示key和 value的大小、
compresscodec表示压缩编码、
payload:表示消息的具体内容
日志的清除策略以及压缩策略
日志清除策略
前面提到过,日志的分段存储,一方面能够减少单个文件内容的大小,另一方面,方便kafka进行日志清理。日志的清理策略有两个
- 根据消息的保留时间,当消息在kafka中保存的时间超过了指定的时间,就会触发清理过程
- 根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka会启动一个后台线程,定期检查是否存在可以删除的消息
通过log.retention.bytes和log.retention.hours这两个参数来设置,当其中任意一个达到要求,都会执行删除。
日志压缩策略
Kafka还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小, 缓解磁盘紧张的情况,在很多实际场景中,消息的key和value的值之间的对应关系是不断变化的,就像 数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。因此,我们可以开启kafka 的日志压缩功能,服务端会在后台启动启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value值。
性能
磁盘存储的性能优化 顺序写
我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先 要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的 扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka采用顺序写 的方式存储数据。即使是这样,但是频繁的I/O操作仍然会造成磁盘的性能瓶颈
零拷贝
消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过socket发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。
操作系统将数据从磁盘读入到内核空间的页缓存
- 应用程序将数据从内核空间读入到用户空间缓存中
- 应用程序将数据写回到内核空间到socket缓存中
- 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出
通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到socket;在Linux中,是通过sendfile系 统调用来完成的。
Java提供了访问这个系统调用的方法:FileChannel.transferTo API 使用sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
页缓存
页缓存是操作系统实现的一种主要的磁盘缓存,但凡设计到缓存的,基本都是为了提升i/o性能,所以页缓存是用来减少磁盘I/O操作的。
磁盘高速缓存有两个重要因素:
第一,访问磁盘的速度要远低于访问内存的速度,若从处理器L1和L2高速缓存访问则速度更快。
第二,数据一旦被访问,就很有可能短时间内再次访问。正是由于基于访问内存比磁盘快的多,所 以磁盘的内存缓存将给系统存储性能带来质的飞越。
Kafka中大量使用了页缓存, 这是Kafka实现高吞吐的重要因素之 一 。 虽然消息都是先被写入页缓存, 然后由操作系统负责具体的刷盘任务的, 但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync), 可以通过 log.flush.interval.messages 和 log.flush.interval.ms 参数来控制。
同步刷盘能够保证消息的可靠性,避免因为宕机导致页缓存数据还未完成同步时造成的数据丢失。但是 实际使用上,我们没必要去考虑这样的因素以及这种问题带来的损失,消息可靠性可以由多副本来解决,同步刷盘会带来性能的影响。 刷盘的操作由操作系统去完成即可
一些配置
group.id
enable.auto.commit
auto.offset.reset
max.poll.records
Kafka消息的可靠性
副本机制
分区副本, 你可以创建更多的分区来提升可靠性,但是分区数过多也会带来性能上的开销,一般 来说,3个副本就能满足对大部分场景的可靠性要求
确认机制
Producer
acks,生产者发送消息的可靠性,也就是我要保证我这个消息一定是到了broker并且完成了多副 本的持久化,但这种要求也同样会带来性能上的开销。它有几个可选项 1, -1(全部) ,0
Consumer
保障消息到了broker之后,消费者也需要有一定的保证,因为消费者也可能出现某些问题导致消 息没有消费到
enable.auto.commit默认为true,也就是自动提交offset,自动提交是批量执行的,有一个时间窗口,这种方式会带来重复提交或者消息丢失的问题,所以对于高可靠性要求的程序,要使用手动提交。 对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失
Producer
异步发送
kafka客户端会积累一定量的消息统一组装成一个批量消息发送出 去,触发条件是前面提到的batch.size和linger.ms
batch.size
生产者发送多个消息到broker上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式 来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是16384byte,也就是16kb, 意味着当一批消息大小达到指定的batch.size的时候会统一发送
linger.ms
Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞 吐量,而linger.ms就是为每次发送到broker的请求增加一些delay,以此来聚合更多的Message请求。 这个有点想TCP里面的Nagle算法,在TCP协议的传输中,为了减少大量小数据包的发送,采用了Nagle 算法,也就是基于小包的等-停协议。
batch.size和linger.ms这两个参数是kafka性能优化的关键参数,很多同学会发现batch.size和 linger.ms这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配 置的时候,只要满足其中一个要求,就会发送请求到broker上
负载均衡
批量
压缩
0:表示producer不需要等待broker的消息确认。这个选项时延最小但同时风险最大(因为当server宕
机时,数据将会丢失)。
1:表示producer只需要获得kafka集群中的leader节点确认即可,这个选择时延较小同时确保了
leader节点确认接收成功。
all(-1):需要ISR中所有的Replica给予接收确认,速度最慢,安全性最高,但是由于ISR可能会缩小到仅
包含一个Replica,所以设置参数为all并不能一定避免数据丢失.
Consumer
在实际生产过程中,每个topic都会有多个partitions,多个partitions的好处在于,
一方面能够对 broker上的数据进行分片有效减少了消息的容量从而提升io性能。
另外一方面,为了提高消费端的消费 能力,一般会通过多个consumer去消费同一个topic ,也就是消费端的负载均衡机制
consumer和partition的数量建议
- 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的, 所以consumer数不要大于partition数
- 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配 consumer数和partition数,否则会导致partition里面的数据被取的不均匀。最好partiton数目是 consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
- 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition 上数据是有序的,但多个partition,根据你读的顺序会有不同
- 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的 partition会发生变化
什么时候会触发这个策略呢?
当出现以下几种情况时,kafka会进行一次分区分配操作,也就是kafka consumer的rebalance
1.同一个consumer group内新增了消费者
2.消费者离开当前所属的consumer group,比如主动停机或者宕机
3.topic新增了分区(也就是分区数量发生了变化)
分区分配策略
一种是Range(默认)、 另一种是RoundRobin(轮询)、 StickyAssignor(粘性)。
StrickyAssignor分配策略
** kafka在0.11.x版本支持了StrickyAssignor, 翻译过来叫粘滞策略,它主要有两个目的
分区的分配尽可能的均匀
分区的分配尽可能和上次分配保持相同
谁来执行Rebalance以及管理consumer的group呢?
Kafka提供了一个角色: coordinator来执行对于consumer group的管理,
当consumer group的第一个consumer启动的时 候,它会去和kafka server确定谁是它们组的coordinator。
之后该group内的所有成员都会和该 coordinator进行协调通信
如何保存消费端的消费位置
什么是 offset
前面在讲解partition的时候,提到过offset, 每个topic可以划分多个分区(每个Topic至少有一个分 区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺 序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的; 对于应用层的消费来 说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。那么offset保存在哪 里?
offset在哪里维护
在kafka中,提供了一个**consumer_offsets_***的一个topic, 把offset信息写入到这个topic中. consumer_offsets——按保存了每个consumer group某一时刻提交的offset信息。 __consumer_offsets 默认有50个分区。
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ;
**分区的副本机制 **(副本机制,导致kafka的partition不能过多, 文件句柄,同步,写入确认)
我们已经知道Kafka的每个topic都可以分为多个Partition,并且多个partition会均匀分布在集群的各个 节点下。虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说,都是单点的,当其中 一个partition不可用的时候,那么这部分消息就没办法消费。所以kafka为了提高partition的可靠性而 提供了副本的概念(Replica),通过副本机制来实现冗余备份。
每个分区可以有多个副本,并且在副本集合中会存在一个leader的副本,所有的读写请求都是由leader 副本来进行处理。剩余的其他副本都做为follower副本,follower副本会从leader副本同步消息日志。 这个有点类似zookeeper中leader和follower的概念,但是具体的时间方式还是有比较大的差异。所以 我们可以认为,副本集会存在一主多从的关系。
一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在的 broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提高 kafka集群的可用性
Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同
可分为3类:
leader副本:响应clients端读写请求的副本
follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。
ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本——如何判定是否与leader同 步后面会提到每个Kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,而不只是 leader副本。
LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下 一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外, leader LEO和follower LEO的更新是有区别的。我们后面会详细说
HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所 有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别的
从生产者发出的 一 条消息首先会被写入分区的leader 副本,不过还需要等待ISR集合中的所有 follower副本都同步完之后才能被认为已经提交,之后才会更新分区的HW, 进而消费者可以消费 到这条消息。
ISR集合中的副本必须满足两个条件
\1. 副本所在节点必须维持着与zookeeper的连接
\2. 副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值 (replica.lag.time.max.ms) replica.lag.time.max.ms:如果该follower在此时间间隔内一直没有追 上过leader的所有消息,则该follower就会被剔除isr列表
ISR数据保存在Zookeeper的 /brokers/topics/
参考:
Kafka权威指南
实践: https://www.infoq.cn/article/Q0o*QzLQiay31MWiOBJH
0.7
0.8 副本机制
0.9
0.10 Kafka Streams
0.11 一个是提供幂等性 Producer API 以及事务(Transaction) API;另一个是对 Kafka 消息格式做了重构。
1.0 Kafka Streams改进
2.0
官方文档
https://kafka.apache.org/documentation/
rocketmq的优化
https://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/
https://rocketmq.apache.org/docs/motivation/
区别
怎么合理选择分区
https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/
高可用 复制机制
深入分析Kafka高可用性
producer
①kafka复制机制
②分区leader副本宕掉怎么选新的leader
③高水位与leader epoch的详细分析
④一些相关配置
所有的生产者请求和消费者请求都经过leader副本,leader副本以外的副本都是follower副本,follower副本不处理来自客户端的请求,它们唯一的任务就是从leader副本那里复制消息,保持与leader副本一致的状态。如果leader 副本发生崩溃,其中的一个follower副本会被提升为新的leader副本。
Kafka复制协议有两个阶段,
第一阶段,follower从leader获取到消息;
第二阶段,在下一轮的RPC中向leader发送fetch request确认收到消息。假设其他的follower也都确认了,那么leader会更新HW,并在接下来的RPC中响应给follower。
同时,在重启一个follower时,这个follower可能会把日志截断到HW处(意味着此follower将会删除一些消息),然后从leader获取消息。
会导致数据不一致 或者数据丢失
在0.11版本使用leader epoch解决这两个问题。
rocketmq和kafka
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 951488791@qq.com