说位移提交之前,我们首先简单的回顾一下位移和消费者位移之间的区别。我们通常所说的位移是指 TopicPartition 在 Broker 端的存储偏移量。而消费者位移是指某个消费者组在不同 TopicPartition 上面的消费偏移量。下面我们介绍一下消费者位移的提交方式,其中主要包含了自动提交和手动提交。
自动提交
对于启用自动提交位移,在 KafkaConsumer 实例初始化的时候,通过设置参数 enable.auto.commit 的值为 true 即可(默认为true)。同时与其相关联的参数 auto.commit.interval.ms,这个参数可以设置提交的时间间隔,这个值默认是5秒。
对于自动提交的触发条件,除了要满足时间的阈值,还需要Client端调用 KafkaConsumer.poll() 方法才能触发。每次执行都会调用 ConsumerCoordinator.poll() 执行消费者入组的流程,在方法执行的最后会执行一个异步的 offset 提交。实现代码如下:
手动提交
与自动提交相对应 就是手动提交了,此时在创建KafkaConsumer实例时,就需要指定 enable.auto.commit 的值为false。然后在处理完消息之后,自己手动执行 commit 操作。对于手动提交位移,又分为同步提交和异步提交。
同步提交
同步提交时通过 KafkaConsumer.commitSync()方法实现,其内部又调用了ConsumerCoordinator.commitOffsetsSync() 方法发送位移提交请求。同步的位移提交提供了重试的机制,其代码实现如下:
对于同步提交,官方文档上面提供了一次拉取,按照批次提交位移的方式,这样可以减少重复消费的批量。
另外一种方式是,可以按照TopicPartition 分区的维度去提交位移。
异步提交
对于同步的位移提交,通常情况下会影响系统的吞吐量。此时KafkaConsumer也提供了异步的提交方式,也就是commitAsync()。但是相对同步的位移提交,此时异步提交缺少了重试的机制,同步的重试机制可以在网络抖动的场景下,减少提交失败的场景。异步提交在这里没有重试机制,是因为重试的时候消费位移可能已经变化,此时提交已经没啥意义了。
在生产上面,还是建议使用异步的位移提交,这样也可以提升客户端的TPS。对于提交的方式,笔者从网上找到了如下的提交方式:
参考:《Apache Kafka 源码剖析》、《Kafka技术内幕》、《极客时间:Kafka核心技术与实战》