Kafka生产者分区策略、ISR、ACK、幂等性、事务机制

  • 2022-10-27
  • 浏览 (1200)

本篇介绍kafka生产者的分区策略、ISR、ACK机制,故障处理,一致性语义,Exactly Once精准一次性语义等

分区策略

kafka的topic通过分区(partition)提升可扩展性和并发度,生产者将数据写入topic时选择分区有下面三种策略:

  1. 指定分区:写到指定的分区;
  2. 没指定分区:将key进行hash后再与该topic的分区数量进行取余得到分区号,可以保证同样的key的消息会进入同一个分区内;
  3. 没指定分区又没有key值:先随机生成一个整数(之后每次在这个整数上递增1),将这个整数和topic的分区数量进行取余得到分区号(Round-Robin算法),是kafka默认的分区分配策略,能保证负载均衡不会出现倾斜的情况。

消息压缩

在前一篇文章中我们提到 消息压缩可以提升kafka的性能,生产者在发送消息的时候将消息按照指定的压缩算法(gzip,snappy,lz4,zstandard)进行压缩,达到降低网络IO、优化broker磁盘空间的目的,但这样也会有更多的CPU消耗,因此是个需要权衡的功能。

在存储的消息格式中除了K-V的消息体之外还有其他描述性字段如attributes,它保存了消息的编码,压缩算法等信息。消费者通过该值记录的压缩算法进行解压,得到消息体。

各压缩算法的性能对比请参考: Kafka 2.1.0压缩算法性能测试

ISR机制

在前面介绍了每个topic的分区可以设置若干个副本(Leader、Follower),其中Follower实时同步Leader的数据,为了保证生产者发送的数据,能可靠的发送到指定的 topic,topic 的每个分区收到生产者发送的数据后,都需要向生产者发送应答 ack(acknowledgement),如果生产者收到 ack,就会进行下一轮的发送,否则重新发送数据。在Kafka中必须要所有Follower都完成同步时才会发送ack,这样的好处是当重新选举Leader时,只需要有n+1个副本即可容忍n台节点故障,但缺点也很明显就是延迟很高,因为必须等待所有follower都完成同步才行。

但这样又会带来新的问题,如果其中有一个follower由于网络延迟或者某种原因,迟迟不能完成数据同步,Leader就会一直阻塞等待直到该follower完成同步,这非常的影响性能,于是Kafka引入了一个新的机制:ISR(In-Sync Replica)。

ISR(In-Sync Replica):和Leader保持同步的follower集合,Leader不需要等待所有Follower都完成同步,只要在ISR中的Follower完成数据同步就可以发送ack 给生产者。如果ISR集合里的follower 的延迟时间超过配置的参数( _replica.lag.time.max.ms_)就会从ISR 内剔除,只需要保证副本能够发送到这些ISR集合里的follower即可。一旦Leader发生故障,就会从ISR集合里选举一个Follower作为新的Leader。

在版本0.90以前还有一个参数,根据follower和leader之间相差的数据量来控制是否在ISR集合内,大于配置的阈值则踢出ISR集合。

ACK机制

对于某些不太重要的数据,我们能容忍少量数据的丢失,不需要等待ISR内的Follower都完成同步再发送ack,这样的话可以牺牲可靠性来换来数据吞吐量的提升。通过对可靠性和写入延迟的权衡,我们可以选择下面三种可靠性级别:

  • acks = 0 :生产者只负责发消息,不管Leader 和Follower 是否完成落盘就会发送ack 。这样能够最大降低延迟,但当Leader还未落盘时发生故障就会造成 数据丢失
  • acks = 1 :Leader将数据落盘后,不管Follower 是否落盘就会发送ack 。这样可以保证Leader节点内有一份数据,但当Follower还未同步时Leader发生故障就会造成 数据丢失
  • acks = -1(all) :生产者等待Leader 和ISR 集合内的所有Follower 都完成同步才会发送ack 。但当Follower 同步完之后,broker发送ack之前,Leader发生故障时,此时会重新从ISR内选举一个新的Leader,此时由于生产者没收到ack,于是生产者会重新发消息给新的Leader,此时就会造成 数据重复

故障处理

  • LEO(Log End Offset):每个副本内最大的offset;
  • HW(High Watermake):消费者能见到的最大的offset,为ISR集合内最小的LEO;

如上图Leader写15条,ISR内两个follower分别写了10条和13条,那当前分区数据的HW为9,LEO分别为14,9,12。

为了保证消费的一致性,避免出现当消费者消费到offset13时发生故障,然后从新leader消费时没有offset为13的数据,因此对于消费者来说只能看到并消费HW之前的数据,在这种机制下,当消费者消费到offset9时Leader出现故障,此时无论是ISR内的哪个follower作为新的Leader都能保证消费端的一致性。

故障处理

  • Follower:follower 发生故障后会被临时踢出 ISR集合,当该 follower 恢复后,它会先读取上次记录的HW,并将本地log 文件内高于该HW 的部分截取掉,然后从 HW 的offset开始向 Leader 进行重新同步,保证自己的数据和当前的Leader数据一致。当该 follower 的LEO 大于等于(重新超过)该 分区的当前HW,就可以重新加入 ISR 了。
  • Leader:Leader 发生故障之后,控制器会从ISR 集合内选举一个新的Leader,然后为了保证多个副本之间的数据一致性,其余的Follower 会先将各自的 log 文件内高于HW 的部分截取掉,然后从HW 的offset开始向 Leader 进行重新同步,保证自己的数据和当前新的Leader的数据一致。

可以看出,HW,LEO是为了保证消费者端的消费一致性,避免出现由于Leader更换导致的消费offset不一致或数据不一致的情况。

而上面的ACK机制是为了保证生产者端的数据一致性,当选择acks=all时可以保证数据一定不会丢。

一致性语义

在分布式消息传递一致性语义上有下面三种:

  • At Least Once:消息不会丢,但可能会重复;
  • At Most Once:消息会丢,但不会重复;
  • Exactly Once:消息不会丢,也不会重复;

在kafka0.11版本之前是无法在kafka内实现exactly once 精确一次性保证的语义的,在0.11之后的版本,我们可以结合新特性 幂等性以及 acks=-1 来实现kafka生产者的exactly once。

  • acks=-1:在上面ack机制里已经介绍过,他能实现at least once语义,保证数据不会丢,但可能会有重复数据;
  • 幂等性:0.11版本之后新增的特性,针对生产者,指生产者无论向broker 发送多少次重复数据,broker 只会持久化一条;

在0.11版本之前要实现exactly once语义只能通过外部系统如hbase的rowkey实现基于主键的去重。

幂等性解读:

在生产者配置文件 producer.properties 设置参数 enable.idompotence = true 即可启用幂等性。

Kafka的幂等性其实就是将原来需要在下游进行的去重操作放在了数据上游。开启幂等性的生产者在初始化时会被分配一个PID(producer ID),该生产者发往同一个分区(Partition)的消息会附带一个序列号(Sequence Number),Broker 端会对作为该消息的主键进行缓存,当有相同主键的消息提交时,Broker 只会持久化一条。但是生产者重启时PID 就会发生变化,同时不同的 分区(Partition)也具有不同的编号,所以生产者幂等性无法保证跨分区和跨会话的 Exactly Once。

事务: kafka在0.11 版本引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

生产者事务:

Kafka引入了一个新的组件Transaction Coordinator,它管理了一个全局唯一的事务ID(Transaction ID),并将生产者的PID和事务ID进行绑定,当生产者重启时虽然PID会变,但仍然可以和Transaction Coordinator交互,通过事务ID可以找回原来的PID,这样就保证了重启后的生产者也能保证Exactly Once 了。

同时,Transaction Coordinator 将事务信息写入 Kafka 的一个内部 Topic,即使整个kafka服务重启,由于事务状态已持久化到topic,进行中的事务状态也可以得到恢复,然后继续进行。

转自:https://blog.csdn.net/wsdc0521/article/details/108604420

3  赞