Kafka消费者加入group流程(下)

接上篇:《Kafka消费者加入group流程(上)

发送加入组请求(Rebalance流程)

消费者首次加入group也可以认为是Rebalance的一种,其中包含了两类请求:JoinGroup 和 SyncGroup 请求。我们先看一下两次请求的流程:

当组内成员加入group时,它会向协调者发送一个JoinGroup请求。请求中会将自己要订阅的Topic 上报,这样协调者就可以收集到所有成员的订阅信息。收集完订阅信息之后,通常情况下,第一个发送JoinGroup 请求的成员将会自动称为Leader。这里面的Leader 和 分区的Leader 副本不是一个概念,这里面的Leader 是消费者group 的 Leader,它将会负责具体的分区分配方案制定。下面我们看一下源代码的实现:

/**
 * 确保Group是active,并且加入该group
 * 向GroupCoordinator发送JoinGroup、SyncGroup请求,并获取分配的主题分区
 */
boolean ensureActiveGroup(final Timer timer) {
	// 确保 GroupCoordinator 已经连接,防止之前建立的连接断开
	if (!ensureCoordinatorReady(timer)) {
		return false;
	}
	// 启动心跳发送线程(并不一定发送心跳,满足条件后才会发送心跳)
	startHeartbeatThreadIfNeeded();
	// 发送 JoinGroup 请求,并对返回的信息进行处理
	return joinGroupIfNeeded(timer);
}

JoinGroup 的请求发送是在 joinGroupIfNeeded() 中实现的:

/**
 * 发送 JoinGroup + SyncGroup 请求
 */
boolean joinGroupIfNeeded(final Timer timer) {
	while (rejoinNeededOrPending()) {
		if (!ensureCoordinatorReady(timer)) {
			return false;
		}

		/** 触发 onJoinPrepare, 包括 offset commit 和 rebalance listener */
		if (needsJoinPrepare) {
			onJoinPrepare(generation.generationId, generation.memberId);
			needsJoinPrepare = false;
		}
		/** 初始化 JoinGroup 请求,并发送该请求 */
		final RequestFuture<ByteBuffer> future = initiateJoinGroup();
		client.poll(future, timer);
		if (!future.isDone()) {
			// we ran out of time
			return false;
		}
		/** 到这一步,时间上SyncGroup 已经成功了 */
		if (future.succeeded()) {
			ByteBuffer memberAssignment = future.value().duplicate();
			onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);

			// 重置 joinFuture 为空
			resetJoinGroupFuture();
			needsJoinPrepare = true;
		} else {
			resetJoinGroupFuture();
			final RuntimeException exception = future.exception();
			if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException ||
					exception instanceof IllegalGenerationException || exception instanceof MemberIdRequiredException)
				continue;
			else if (!future.isRetriable())
				throw exception;

			timer.sleep(rebalanceConfig.retryBackoffMs);
		}
	}
	return true;
}

下面我们看下 initiateJoinGroup() 的实现:

/**
 * 发送JoinGroup请求,并添加listener
 */
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
	if (joinFuture == null) {
		/** rebalance 期间,心跳线程停止 */
		disableHeartbeatThread();
		/** 标记为 rebalance */
		state = MemberState.REBALANCING;
		joinFuture = sendJoinGroupRequest(); /** 发送 JoinGroup 请求 */
		joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
			@Override
			public void onSuccess(ByteBuffer value) {
				synchronized (AbstractCoordinator.this) {
					log.info("Successfully joined group with generation {}", generation.generationId);
					/** 标记 Consumer 为 stable */
					state = MemberState.STABLE;
					rejoinNeeded = false;

					if (heartbeatThread != null)
						heartbeatThread.enable();
				}
			}

			@Override
			public void onFailure(RuntimeException e) {
				synchronized (AbstractCoordinator.this) {
					/** 标记 Consumer 为 Unjoined */
					state = MemberState.UNJOINED;
				}
			}
		});
	}
	return joinFuture;
}

下面我们看一下发送请求的方法 sendJoinGroupRequest():

/**
 * 发送 JoinGroup 请求并返回分配结果(在 JoinGroupResponseHandler中实现)
 */
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
	if (coordinatorUnknown())
		return RequestFuture.coordinatorNotAvailable();

	// 发送JoinGroup请求
	log.info("(Re-)joining group");
	JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
			new JoinGroupRequestData()
					.setGroupId(rebalanceConfig.groupId)
					.setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
					.setMemberId(this.generation.memberId)
					.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
					.setProtocolType(protocolType())
					.setProtocols(metadata())
					.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
	);
	log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);

	int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
	return client.send(coordinator, requestBuilder, joinGroupTimeoutMs).compose(new JoinGroupResponseHandler());
}

/**
 * 处理 JoinGroup response 的 handler(同步 group 信息)
 */
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
	@Override
	public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
		Errors error = joinResponse.error();
		if (error == Errors.NONE) {
			log.debug("Received successful JoinGroup response: {}", joinResponse);
			sensors.joinLatency.record(response.requestLatencyMs());

			synchronized (AbstractCoordinator.this) {
				/** 如果此时 Consumer 的状态不是 rebalacing,就引起异常 */
				if (state != MemberState.REBALANCING) {
					// if the consumer was woken up before a rebalance completes, we may have already left
					// the group. In this case, we do not want to continue with the sync group.
					future.raise(new UnjoinedGroupException());
				} else {
					AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName());
					/** JoinGroup成功,下面需要进行SyncGroup,获取分配的主题分区 */
					if (joinResponse.isLeader()) {
						// Leader 将会执行分配方案,并发送SyncGroup请求
						onJoinLeader(joinResponse).chain(future);
					} else {
						onJoinFollower().chain(future);
					}
				}
			}
		} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
			log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
			// backoff and retry
			future.raise(error);
		} else if (error == Errors.UNKNOWN_MEMBER_ID) {
			// reset the member id and retry immediately
			resetGeneration();
			log.debug("Attempt to join group failed due to unknown member id.");
			future.raise(error);
		} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
				|| error == Errors.NOT_COORDINATOR) {
			// re-discover the coordinator and retry with backoff
			markCoordinatorUnknown();
			log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
			future.raise(error);
		} else if (error == Errors.FENCED_INSTANCE_ID) {
			log.error("Received fatal exception: group.instance.id gets fenced");
			future.raise(error);
		} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
				|| error == Errors.INVALID_SESSION_TIMEOUT
				|| error == Errors.INVALID_GROUP_ID
				|| error == Errors.GROUP_AUTHORIZATION_FAILED
				|| error == Errors.GROUP_MAX_SIZE_REACHED) {
			// log the error and re-throw the exception
			log.error("Attempt to join group failed due to fatal error: {}", error.message());
			if (error == Errors.GROUP_MAX_SIZE_REACHED) {
				future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
			} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
				future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
			} else {
				future.raise(error);
			}
		} else if (error == Errors.UNSUPPORTED_VERSION) {
			log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" + "to see if the problem resolves");
			future.raise(error);
		} else if (error == Errors.MEMBER_ID_REQUIRED) {
			// Broker requires a concrete member id to be allowed to join the group. Update member id
			// and send another join group request in next cycle.
			synchronized (AbstractCoordinator.this) {
				AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, joinResponse.data().memberId(), null);
				AbstractCoordinator.this.rejoinNeeded = true;
				AbstractCoordinator.this.state = MemberState.UNJOINED;
			}
			future.raise(error);
		} else {
			// unexpected error, throw the exception
			log.error("Attempt to join group failed due to unexpected error: {}", error.message());
			future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
		}
	}
}

下面我们看一下 SyncGroup 的请求发送流程:

/**
 * 当consumer为follower时,发送SyncGroup获取分配结果
 */
private RequestFuture<ByteBuffer> onJoinFollower() {
	// 发送空消息的SyncGroup
	SyncGroupRequest.Builder requestBuilder =
			new SyncGroupRequest.Builder(
					new SyncGroupRequestData()
							.setGroupId(rebalanceConfig.groupId)
							.setMemberId(generation.memberId)
							.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
							.setGenerationId(generation.generationId)
							.setAssignments(Collections.emptyList())
			);
	log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
	return sendSyncGroupRequest(requestBuilder);
}

/**
 * 当consumer为leader时,对group下的所有实例进行分配,将assign的结果通过SyncGroup请求发送到GroupCoordinator
 */
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
	try {
		// perform the leader synchronization and send back the assignment for the group
		/** 进行 assign 操作 */
		Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), joinResponse.data().members());

		List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
		for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
			groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(assignment.getKey()).setAssignment(Utils.toArray(assignment.getValue())));
		}

		SyncGroupRequest.Builder requestBuilder =
				new SyncGroupRequest.Builder(
						new SyncGroupRequestData()
								.setGroupId(rebalanceConfig.groupId)
								.setMemberId(generation.memberId)
								.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
								.setGenerationId(generation.generationId)
								.setAssignments(groupAssignmentList)
				);
		log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
		/** 发送 sync-group 请求 */
		return sendSyncGroupRequest(requestBuilder);
	} catch (RuntimeException e) {
		return RequestFuture.failure(e);
	}
}

/**
 * 发送 SyncGroup 请求,获取对 partition 分配的安排
 */
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
	if (coordinatorUnknown())
		return RequestFuture.coordinatorNotAvailable();
	return client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
}

private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
	@Override
	public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
		Errors error = syncResponse.error();
		if (error == Errors.NONE) {
			sensors.syncLatency.record(response.requestLatencyMs());
			future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
		} else {
			// join的标志位设置为true
			requestRejoin();

			if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
				future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
			} else if (error == Errors.REBALANCE_IN_PROGRESS) {
				// group正在rebalance,任务失败
				log.debug("SyncGroup failed because the group began another rebalance");
				future.raise(error);
			} else if (error == Errors.FENCED_INSTANCE_ID) {
				log.error("Received fatal exception: group.instance.id gets fenced");
				future.raise(error);
			} else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
				log.debug("SyncGroup failed: {}", error.message());
				resetGeneration();
				future.raise(error);
			} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
				log.debug("SyncGroup failed: {}", error.message());
				markCoordinatorUnknown();
				future.raise(error);
			} else {
				future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
			}
		}
	}
}

消费者Rebalance的几种场景

我们先看一下Rebalance触发的几个提交件: 1、组成员数量发生变化;2、订阅主题数量发生变化;3、订阅主题的分区数发生变化。对于一个运行中的应用,上面3 中场景中,第一种场景触发Rebalance的可能性比较大。下面我们看一下Rebalance的各种场景:

新成员入组

组成员主动离组

组成员崩溃离组

Rebalance时组内成员需要提交offset


参考:《Apache Kafka 源码剖析》、《Kafka技术内幕》、《极客时间:Kafka核心技术与实战》、http://matt33.com/2017/10/22/consumer-join-group/