kafka深入浅出

  1. kafka
    1. 存储原理
    2. 性能
    3. 一些配置
    4. Kafka消息的可靠性
    5. Producer
    6. Consumer
    7. 深入分析Kafka高可用性
      1. producer

image-20190612200726343

kafka

存储原理

为什么快

​ producer->broker->comsumer

segment文件命名规则

​ 磁盘顺序读

​ topic (1)->partition(n) 分布在broker 负载均衡(根据key hash取模)

​ 文件分段 数据文件分段(顺序读写、分段命令、二分查找)

image-20200311212633833

​ 数据文件索引(分段索引、稀疏存储)

image-20200320180603193

查找的算法是

  1. 根据offset的值,查找segment段中的index索引文件。由于索引文件命名是以上一个文件的最后 一个offset进行命名的,所以,使用二分查找算法能够根据offset快速定位到指定的索引文件。

  2. 找到索引文件后,根据offset进行定位,找到索引文件中的符合范围的索引。(kafka采用稀疏索引的方式来提高查找性能)

  3. 得到position以后,再到对应的log文件中,从position出开始查找offset对应的消息,将每条消息

    的offset与目标offset进行比较,直到找到消息

比如说,我们要查找offset=2490这条消息,那么先找到00000000000000000000.index, 然后找到 [2487,49111]这个索引,再到log文件中,根据49111这个position开始查找,比较每条消息的offset是 否大于等于2490。最后查找到对应的消息以后返回

Log文件的消息内容分析

根据kafka提供的命令

 offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize:
-1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1
sequence: -1 isTransactional: false headerKeys: [] payload: message_5371

offset和position这两个前面已经讲过了、

createTime表示创建时间、

keysize和valuesize表示key和 value的大小、

compresscodec表示压缩编码、

payload:表示消息的具体内容

日志的清除策略以及压缩策略

日志清除策略

前面提到过,日志的分段存储,一方面能够减少单个文件内容的大小,另一方面,方便kafka进行日志清理。日志的清理策略有两个

  • 根据消息的保留时间,当消息在kafka中保存的时间超过了指定的时间,就会触发清理过程
  • 根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka会启动一个后台线程,定期检查是否存在可以删除的消息

通过log.retention.bytes和log.retention.hours这两个参数来设置,当其中任意一个达到要求,都会执行删除。

日志压缩策略

Kafka还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小, 缓解磁盘紧张的情况,在很多实际场景中,消息的key和value的值之间的对应关系是不断变化的,就像 数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。因此,我们可以开启kafka 的日志压缩功能,服务端会在后台启动启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value值。

性能

磁盘存储的性能优化 顺序写

​ 我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先 要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的 扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka采用顺序写 的方式存储数据。即使是这样,但是频繁的I/O操作仍然会造成磁盘的性能瓶颈

零拷贝

消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过socket发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。

操作系统将数据从磁盘读入到内核空间的页缓存

  • 应用程序将数据从内核空间读入到用户空间缓存中
  • 应用程序将数据写回到内核空间到socket缓存中
  • 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出

通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到socket;在Linux中,是通过sendfile系 统调用来完成的。

Java提供了访问这个系统调用的方法:FileChannel.transferTo API 使用sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的

页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,但凡设计到缓存的,基本都是为了提升i/o性能,所以页缓存是用来减少磁盘I/O操作的。

磁盘高速缓存有两个重要因素:

第一,访问磁盘的速度要远低于访问内存的速度,若从处理器L1和L2高速缓存访问则速度更快。

第二,数据一旦被访问,就很有可能短时间内再次访问。正是由于基于访问内存比磁盘快的多,所 以磁盘的内存缓存将给系统存储性能带来质的飞越。

Kafka中大量使用了页缓存, 这是Kafka实现高吞吐的重要因素之 一 。 虽然消息都是先被写入页缓存, 然后由操作系统负责具体的刷盘任务的, 但在Kafka中同样提供了同步刷盘及间断性强制刷盘(fsync), 可以通过 log.flush.interval.messages 和 log.flush.interval.ms 参数来控制。

同步刷盘能够保证消息的可靠性,避免因为宕机导致页缓存数据还未完成同步时造成的数据丢失。但是 实际使用上,我们没必要去考虑这样的因素以及这种问题带来的损失,消息可靠性可以由多副本来解决,同步刷盘会带来性能的影响。 刷盘的操作由操作系统去完成即可

一些配置

group.id

enable.auto.commit

auto.offset.reset

max.poll.records

Kafka消息的可靠性

副本机制

分区副本, 你可以创建更多的分区来提升可靠性,但是分区数过多也会带来性能上的开销,一般 来说,3个副本就能满足对大部分场景的可靠性要求

确认机制

Producer

acks,生产者发送消息的可靠性,也就是我要保证我这个消息一定是到了broker并且完成了多副 本的持久化,但这种要求也同样会带来性能上的开销。它有几个可选项 1, -1(全部) ,0

Consumer

保障消息到了broker之后,消费者也需要有一定的保证,因为消费者也可能出现某些问题导致消 息没有消费到

enable.auto.commit默认为true,也就是自动提交offset,自动提交是批量执行的,有一个时间窗口,这种方式会带来重复提交或者消息丢失的问题,所以对于高可靠性要求的程序,要使用手动提交。 对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失

Producer

异步发送

kafka客户端会积累一定量的消息统一组装成一个批量消息发送出 去,触发条件是前面提到的batch.size和linger.ms

batch.size

生产者发送多个消息到broker上的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式 来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是16384byte,也就是16kb, 意味着当一批消息大小达到指定的batch.size的时候会统一发送

linger.ms

Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞 吐量,而linger.ms就是为每次发送到broker的请求增加一些delay,以此来聚合更多的Message请求。 这个有点想TCP里面的Nagle算法,在TCP协议的传输中,为了减少大量小数据包的发送,采用了Nagle 算法,也就是基于小包的等-停协议。

batch.size和linger.ms这两个参数是kafka性能优化的关键参数,很多同学会发现batch.size和 linger.ms这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配 置的时候,只要满足其中一个要求,就会发送请求到broker上

​ 负载均衡

​ 批量

​ 压缩

0:表示producer不需要等待broker的消息确认。这个选项时延最小但同时风险最大(因为当server宕

机时,数据将会丢失)。

1:表示producer只需要获得kafka集群中的leader节点确认即可,这个选择时延较小同时确保了

leader节点确认接收成功。

all(-1):需要ISR中所有的Replica给予接收确认,速度最慢,安全性最高,但是由于ISR可能会缩小到仅

包含一个Replica,所以设置参数为all并不能一定避免数据丢失.

Consumer

在实际生产过程中,每个topic都会有多个partitions,多个partitions的好处在于,

一方面能够对 broker上的数据进行分片有效减少了消息的容量从而提升io性能。

另外一方面,为了提高消费端的消费 能力,一般会通过多个consumer去消费同一个topic ,也就是消费端的负载均衡机制

consumer和partition的数量建议

  1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的, 所以consumer数不要大于partition数
  2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配 consumer数和partition数,否则会导致partition里面的数据被取的不均匀。最好partiton数目是 consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
  3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition 上数据是有序的,但多个partition,根据你读的顺序会有不同
  4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的 partition会发生变化

什么时候会触发这个策略呢?

当出现以下几种情况时,kafka会进行一次分区分配操作,也就是kafka consumer的rebalance

1.同一个consumer group内新增了消费者

2.消费者离开当前所属的consumer group,比如主动停机或者宕机

3.topic新增了分区(也就是分区数量发生了变化)

分区分配策略

一种是Range(默认)、 另一种是RoundRobin(轮询)、 StickyAssignor(粘性)。

StrickyAssignor分配策略

** kafka在0.11.x版本支持了StrickyAssignor, 翻译过来叫粘滞策略,它主要有两个目的

分区的分配尽可能的均匀
分区的分配尽可能和上次分配保持相同

谁来执行Rebalance以及管理consumer的group呢?

Kafka提供了一个角色: coordinator来执行对于consumer group的管理,

当consumer group的第一个consumer启动的时 候,它会去和kafka server确定谁是它们组的coordinator。

之后该group内的所有成员都会和该 coordinator进行协调通信

如何保存消费端的消费位置

什么是 offset

前面在讲解partition的时候,提到过offset, 每个topic可以划分多个分区(每个Topic至少有一个分 区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺 序,offset的顺序不跨分区,即kafka只保证在同一个分区内的消息是有序的; 对于应用层的消费来 说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。那么offset保存在哪 里?

offset在哪里维护

在kafka中,提供了一个**consumer_offsets_***的一个topic, 把offset信息写入到这个topic中. consumer_offsets——按保存了每个consumer group某一时刻提交的offset信息。 __consumer_offsets 默认有50个分区。

Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ;

**分区的副本机制 **(副本机制,导致kafka的partition不能过多, 文件句柄,同步,写入确认)

我们已经知道Kafka的每个topic都可以分为多个Partition,并且多个partition会均匀分布在集群的各个 节点下。虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说,都是单点的,当其中 一个partition不可用的时候,那么这部分消息就没办法消费。所以kafka为了提高partition的可靠性而 提供了副本的概念(Replica),通过副本机制来实现冗余备份。

每个分区可以有多个副本,并且在副本集合中会存在一个leader的副本,所有的读写请求都是由leader 副本来进行处理。剩余的其他副本都做为follower副本,follower副本会从leader副本同步消息日志。 这个有点类似zookeeper中leader和follower的概念,但是具体的时间方式还是有比较大的差异。所以 我们可以认为,副本集会存在一主多从的关系。

一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同broker上,当leader副本所在的 broker出现故障后,可以重新选举新的leader副本继续对外提供服务。通过这样的副本机制来提高 kafka集群的可用性

Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不同

可分为3类:
leader副本:响应clients端读写请求的副本

follower副本:被动地备份leader副本中的数据,不能响应clients端读写请求。

ISR副本:包含了leader副本和所有与leader副本保持同步的follower副本——如何判定是否与leader同 步后面会提到每个Kafka副本对象都有两个重要的属性:LEO和HW。注意是所有的副本,而不只是 leader副本。

LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下 一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。另外, leader LEO和follower LEO的更新是有区别的。我们后面会详细说

HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所 有消息都被认为是“已备份”的(replicated)。同理,leader副本和follower副本的HW更新是有区别的

从生产者发出的 一 条消息首先会被写入分区的leader 副本,不过还需要等待ISR集合中的所有 follower副本都同步完之后才能被认为已经提交,之后才会更新分区的HW, 进而消费者可以消费 到这条消息。

ISR集合中的副本必须满足两个条件

\1. 副本所在节点必须维持着与zookeeper的连接

\2. 副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值 (replica.lag.time.max.ms) replica.lag.time.max.ms:如果该follower在此时间间隔内一直没有追 上过leader的所有消息,则该follower就会被剔除isr列表

ISR数据保存在Zookeeper的 /brokers/topics//partitions//state 节点中

参考:

Kafka权威指南

实践: https://www.infoq.cn/article/Q0o*QzLQiay31MWiOBJH

版本https://blog.csdn.net/lidazhou/article/details/95909496?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task

0.7

0.8 副本机制

0.9

0.10 Kafka Streams

0.11 一个是提供幂等性 Producer API 以及事务(Transaction) API;另一个是对 Kafka 消息格式做了重构。

1.0 Kafka Streams改进

2.0

官方文档

https://kafka.apache.org/documentation/

rocketmq的优化

https://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/

https://rocketmq.apache.org/docs/motivation/

区别

https://zhuanlan.zhihu.com/p/73431430?utm_source=wechat_session&utm_medium=social&utm_oi=557574927448846336

怎么合理选择分区

https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/

高可用 复制机制

https://zhuanlan.zhihu.com/p/46658003?utm_source=wechat_session&utm_medium=social&utm_oi=557574927448846336

深入分析Kafka高可用性

producer

①kafka复制机制

②分区leader副本宕掉怎么选新的leader

③高水位与leader epoch的详细分析

④一些相关配置

所有的生产者请求和消费者请求都经过leader副本,leader副本以外的副本都是follower副本,follower副本不处理来自客户端的请求,它们唯一的任务就是从leader副本那里复制消息,保持与leader副本一致的状态。如果leader 副本发生崩溃,其中的一个follower副本会被提升为新的leader副本。

Kafka复制协议有两个阶段,

第一阶段,follower从leader获取到消息;

第二阶段,在下一轮的RPC中向leader发送fetch request确认收到消息。假设其他的follower也都确认了,那么leader会更新HW,并在接下来的RPC中响应给follower。

同时,在重启一个follower时,这个follower可能会把日志截断到HW处(意味着此follower将会删除一些消息),然后从leader获取消息。

会导致数据不一致 或者数据丢失

在0.11版本使用leader epoch解决这两个问题。

rocketmq和kafka


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 951488791@qq.com

文章标题:kafka深入浅出

字数:4.7k

本文作者:zhengyumin

发布时间:2019-06-12, 20:05:37

最后更新:2020-06-27, 11:18:57

原始链接:http://zyumin.github.io/2019/06/12/Overview_Kafka/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。