前面我们讲到了消费者端的网络模型,但是对于Kafka而言,在接收消息之前,消费者是必须要加入 group的。一个消费者 group 由一个或者多个消费者组成,原则上每个消费者都需要有一个 groupId。这个可以在KafkaConsumer创建的时候指定。当消费者组只有一个消费者时,此时可以认为就是点对点模式;当有多个消费者时,就可以认为是发布订阅模式。
对于Broker 端的TopicPartition 而言,一个Partition 只能被一个消费者消费。也就是说,假设一个Topic 有 3 个分区(TopicA),此时groupId 为test 的消费者组内有4 个消费者,此时组内的每个成员都订阅了TopicA。这个时候最多有3个消费者可以消费到数据,因为主题的分区只有3个。
Coordinator 组件
Coordinator 表示一类组件,其中包含了消费者端的 ConsumerCoordinator 和 Broker 端的 GroupCoordinator。在Broker 端,GroupCoordinator 负责的是:消费者 group 成员管理以及 offset 提交。消费者 offset 提交在老版本的 Kafka 中是存储在 Zookeeper 中的。新版本的 Kafka 中,将Topic 的消费 offset 存储在一个叫 __consumer_offsets 的主题中。这是一个 Kafka 内部主题,默认情况下会有 50 个分区,每个分区会有3个副本。
后面我们将会详细的讲解 ConsumerCoordinator 在客户端和 Broker 端的交互流程。其中主要包括,一个消费者如何加入 group 和 offset 提交。
ConsumerCoordinator 主体流程
首先我们在调用 KafkaConsumer.poll() 时,首先会去调用 ConsumerCoordinator.poll() ,然后也会去调用位移提交的相关操作。对于 ConsumerCoordinator.poll(),也就是上图中的入口,下面看下入口 ConsumerCoordinator.poll() 的代码实现。
简单的描述一下上面的流程:
1、ensureCoordinatorReady() 的主要作用是发送GroupCoordinator请求,并建立连接。
2、判断是否需要加入group,如果订阅主题分区发生变化,或者新消费者入组等,需要重新入组。此时是通过ensureActiveGroup() 发送JoinGroup、SyncGroup,并获取到分配给自身的TopicPartition。
3、检测心跳线程是否正常,心跳线程需要定时向GroupCoordinator发送心跳,超过约定阈值就会认为Consumer离组,触发Rebalance。
4、如果设置的是自动提交位移,达到时间阈值就会提交offset。
后面将对于 ensureCoordinatorReady() 和 ensureActiveGroup() 详细说明一下。
消费者group状态
对于消费者group的状态包含了下面几种
消费者group各个状态之间的流转如下所示
下面简单的描述一下状态流转过程:最开始消费者group是Empty 状态,当Rebalance 开启后,会被置于 RreparingRebalance 状态等待成员加入group。之后当有成员入组时,会变更到CompletingRebalance 状态等待分配方案。分配完成后会流转到Stable 状态完成充平衡。
当有新成员入组或者成员退出时,消费者group 状态从 Stable 直接变为 PreparingRebalance 状态,此时所有成员都需要重新加入group。当所有的成员都退出组时,状态会变为 Empty。Kafka 定期自动删除过期位移的条件就是,group要处于 Empty 状态。当消费者 group 停用了很长时间(超过7天),此时Kafka 就可能将其删除。
与Broker建立TCP连接
与Broker 建立TCP连接是通过 ensureCoordinatorReady() 方法实现的,下面我们看一下方法的具体实现。
上面方法入口中,与Broker 端建立TCP 连接的主要逻辑委派给了 lookupCoordinator() 去实现。
然后发送GroupCoordinator 请求的详情如下:
接下篇:《Kafka消费者加入group流程(下)》