Kafka Controller: the Partition State Machine (PartitionStateMachine)

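PartitionStateMachine tracks the state of every TopicPartition in the cluster and drives each partition's state transitions. Kafka defines the following four partition states:

1. NonExistentPartition: the partition was never created, or was created and then deleted. The valid previous state is OfflinePartition.

2. NewPartition: the state a partition is in right after creation. At this point the partition has no leader and no ISR yet. The valid previous state is NonExistentPartition.

3. OnlinePartition: the state a partition enters once its leader has been elected. The valid previous states are NewPartition, OnlinePartition, and OfflinePartition.

4. OfflinePartition: the state a partition is moved to when its leader goes down. The valid previous states are NewPartition, OnlinePartition, and OfflinePartition.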
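The four states and their valid previous states can be modeled compactly. The following is a minimal, self-contained sketch rather than the controller's actual code: the wrapper object `PartitionStateModel` and the helper `isValidTransition` are names assumed here for illustration, while the state names and the previous-state sets come from the list above.

```scala
// Simplified model of the four partition states described above.
// PartitionStateModel and isValidTransition are illustrative names, not Kafka APIs.
object PartitionStateModel {
  sealed trait PartitionState {
    // States from which a partition may legally move into this state.
    def validPreviousStates: Set[PartitionState]
  }

  case object NonExistentPartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
  }

  case object NewPartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
  }

  case object OnlinePartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] =
      Set(NewPartition, OnlinePartition, OfflinePartition)
  }

  case object OfflinePartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] =
      Set(NewPartition, OnlinePartition, OfflinePartition)
  }

  // A transition is valid when the current state is one of the
  // target state's valid previous states.
  def isValidTransition(from: PartitionState, to: PartitionState): Boolean =
    to.validPreviousStates.contains(from)
}
```

With this model, `isValidTransition(NonExistentPartition, NewPartition)` holds, while a direct jump from OnlinePartition to NonExistentPartition is rejected because a partition has to go offline before it can be removed.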

PartitionStateMachine initialization

/**
 * Initialize the state of every partition (read from ZooKeeper), then trigger leader election
 * for partitions in NewPartition or OfflinePartition state (on success they become OnlinePartition).
 */
def startup(): Unit = {
  info("Initializing partition state")
  /** Set the initial state: OnlinePartition if the leader's broker is alive, otherwise OfflinePartition */
  initializePartitionState()
  info("Triggering online partition state changes")
  /** Elect a leader for every partition in NewPartition or OfflinePartition state */
  triggerOnlinePartitionStateChange()
  debug(s"Started partition state machine with initial state -> ${controllerContext.partitionStates}")
}

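PartitionStateMachine first calls initializePartitionState() to initialize the state of every partition in the cluster:

1) If the partition has LeaderAndIsr information, its state is set to OnlinePartition when the broker hosting the leader is alive, and to OfflinePartition otherwise;

2) If the partition has no LeaderAndIsr information, its state is set to NewPartition.

initializePartitionState() only records each partition's state (in the code below, through controllerContext.putPartitionState). It does not perform an actual state transition.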

/**
 * Initialize the state of all partitions read from ZooKeeper
 */
private def initializePartitionState(): Unit = {
  for (topicPartition <- controllerContext.allPartitions) {
    /** Check whether the partition has LeaderAndIsr information; if not, mark it as NewPartition */
    controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      case Some(currentLeaderIsrAndEpoch) =>
        if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
          /** LeaderAndIsr exists and the leader is alive: OnlinePartition */
          controllerContext.putPartitionState(topicPartition, OnlinePartition)
        else
          /** LeaderAndIsr exists but the leader is not alive: OfflinePartition */
          controllerContext.putPartitionState(topicPartition, OfflinePartition)
      case None =>
        /** No LeaderAndIsr information: NewPartition (the partition exists but has no leader or ISR yet) */
        controllerContext.putPartitionState(topicPartition, NewPartition)
    }
  }
}
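The classification rule above can be isolated as a small pure function. This is a sketch under stated assumptions: `InitialStateSketch`, `initialState`, and `initialize` are hypothetical names, `TopicPartition` is a local stand-in for Kafka's class, and leader liveness is reduced to membership in a set of live broker ids (the real `isReplicaOnline` also receives the partition).

```scala
// Sketch of the initial-state rule; all names here are illustrative assumptions.
object InitialStateSketch {
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Local stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // leader is None when the partition has no LeaderAndIsr information.
  def initialState(leader: Option[Int], liveBrokers: Set[Int]): PartitionState =
    leader match {
      case Some(id) if liveBrokers.contains(id) => OnlinePartition
      case Some(_)                              => OfflinePartition
      case None                                 => NewPartition
    }

  // Classify every known partition; nothing is elected or sent here.
  def initialize(
    allPartitions: Set[TopicPartition],
    leaders: Map[TopicPartition, Int],
    liveBrokers: Set[Int]
  ): Map[TopicPartition, PartitionState] =
    allPartitions.iterator.map(tp => tp -> initialState(leaders.get(tp), liveBrokers)).toMap
}
```

As in the original method, the result is only a record of states; election happens in the later trigger step.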

Next, triggerOnlinePartitionStateChange() is called to run leader election for all partitions in NewPartition or OfflinePartition state. Partitions whose election succeeds are set to OnlinePartition.

/**
 * This method is triggered after a controller election and when a broker comes online or goes offline
 */
def triggerOnlinePartitionStateChange(): Unit = {
  val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition))
  triggerOnlineStateChangeForPartitions(partitions)
}

private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Unit = {
  /** Drop partitions whose topic is queued for deletion; what remains is in NewPartition or OfflinePartition state */
  val partitionsToTrigger = partitions.filter { partition =>
    !controllerContext.isTopicQueuedUpForDeletion(partition.topic)
  }.toSeq

  handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)))
}
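The selection performed by these two methods amounts to two filters: keep partitions in NewPartition or OfflinePartition state, then drop those whose topic is queued for deletion. A minimal sketch follows; `TriggerSelectionSketch` and `partitionsToTrigger` are hypothetical names, and plain collections stand in for `controllerContext`.

```scala
// Sketch of which partitions are handed to the Online transition.
// Names and data structures are illustrative assumptions.
object TriggerSelectionSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Local stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  def partitionsToTrigger(
    states: Map[TopicPartition, PartitionState],
    topicsQueuedForDeletion: Set[String]
  ): Seq[TopicPartition] = {
    val candidateStates: Set[PartitionState] = Set(OfflinePartition, NewPartition)
    states.collect {
      // Keep New/Offline partitions whose topic is not being deleted.
      case (tp, state)
          if candidateStates.contains(state) && !topicsQueuedForDeletion.contains(tp.topic) =>
        tp
    }.toSeq
  }
}
```

Partitions already in OnlinePartition state are not selected, which is why this path only affects partitions that currently lack a usable leader.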

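/**
 * Entry point for partition state changes. It handles many partitions at once,
 * so requests to multiple brokers can be sent as one batch.
 * The leader is elected with the strategy given in partitionLeaderElectionStrategyOpt.
 */
override def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
  if (partitions.nonEmpty) {
    try {
      controllerBrokerRequestBatch.newBatch()
      /** Elect leaders for partitions in OfflinePartition/NewPartition state, then move them to OnlinePartition */
      val result = doHandleStateChanges(
        partitions,
        targetState,
        partitionLeaderElectionStrategyOpt
      )
      /** Send the batched requests to the brokers, including LeaderAndIsr and UpdateMetadata requests */
      controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
      result
    } catch {
      /** The controller has moved to another broker: log and rethrow */
      case e: ControllerMovedException =>
        error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
        throw e
      /** Any other failure: log it and report the error for every partition in the batch */
      case e: Throwable =>
        error(s"Error while moving some partitions to $targetState state", e)
        partitions.iterator.map(_ -> Left(e)).toMap
    }
  } else {
    Map.empty
  }
}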

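The purpose of the code above is to move all partitions in NewPartition or OfflinePartition state to OnlinePartition. It does two things:

1) performs the state transition;

2) sends the corresponding requests.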
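The two steps above follow a simple batch pattern: run the transition for all partitions, send the accumulated requests once, and return a per-partition result. The sketch below illustrates only that shape. `BatchSketch`, `handle`, and the `transition` and `send` parameters are hypothetical, and the sketch assumes that a failure is reported as a `Left` for every partition in the batch; it does not model controller-specific exceptions that the real code may rethrow.

```scala
import scala.util.control.NonFatal

// Sketch of "transition, then send once" with per-partition results.
// All names are illustrative assumptions, not Kafka APIs.
object BatchSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])
  type Result = Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

  def handle(partitions: Seq[TopicPartition])(
    transition: Seq[TopicPartition] => Result
  )(send: () => Unit): Result =
    if (partitions.isEmpty) Map.empty
    else
      try {
        val result = transition(partitions) // step 1: state transition
        send()                              // step 2: send batched requests
        result
      } catch {
        // On failure, every partition in the batch gets the same error.
        case NonFatal(e) =>
          partitions.map(tp => tp -> (Left(e): Either[Throwable, LeaderAndIsr])).toMap
      }
}
```

Returning an `Either` per partition lets a caller distinguish partitions that obtained a leader from those that did not, without aborting the whole batch.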

Partition state transitions

Transition to NewPartition

/** Before this match, partitions with no recorded state are set to NonExistentPartition;
  * validPartitions holds the partitions whose current state is a valid previous state for the target */
case NewPartition =>
  validPartitions.foreach { partition =>
    stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
      s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
    controllerContext.putPartitionState(partition, NewPartition)
  }
  Map.empty

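NewPartition is the state of a partition that has just been created. The handling logic is:

1) validate the previous state; the only valid previous state is NonExistentPartition;
2) set the partition's state to NewPartition.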
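The validation step can be pictured as splitting the requested partitions into those allowed to move to the target state and those that are not. The following sketch is an assumption-laden illustration: `TransitionCheckSketch` and `checkValid` are hypothetical names, and a partition with no recorded state is treated as NonExistentPartition.

```scala
// Sketch of previous-state validation before a transition.
// Names are illustrative; the rules come from the state list in this article.
object TransitionCheckSketch {
  sealed trait PartitionState { def validPreviousStates: Set[PartitionState] }
  case object NonExistentPartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
  }
  case object NewPartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
  }
  case object OnlinePartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] =
      Set(NewPartition, OnlinePartition, OfflinePartition)
  }
  case object OfflinePartition extends PartitionState {
    def validPreviousStates: Set[PartitionState] =
      Set(NewPartition, OnlinePartition, OfflinePartition)
  }

  // Local stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Returns (valid, invalid): only the valid ones may move to `target`.
  def checkValid(
    current: Map[TopicPartition, PartitionState],
    partitions: Seq[TopicPartition],
    target: PartitionState
  ): (Seq[TopicPartition], Seq[TopicPartition]) =
    partitions.partition { tp =>
      // Assumption: an untracked partition counts as NonExistentPartition.
      target.validPreviousStates.contains(current.getOrElse(tp, NonExistentPartition))
    }
}
```

For a target of NewPartition, an untracked partition passes the check, while a partition that is already OnlinePartition is rejected and keeps its current state.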

Transition to OnlinePartition

case OnlinePartition =>
  /** Partitions currently in NewPartition state */
  val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
  /** Partitions currently in OfflinePartition or OnlinePartition state */
  val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)
  if (uninitializedPartitions.nonEmpty) {
    /** Choose the initial leader and ISR and write them to ZK and the controller context;
      * only the partitions that were initialized successfully are returned */
    val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
    successfulInitializations.foreach { partition =>
      stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
        s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
      controllerContext.putPartitionState(partition, OnlinePartition)
    }
  }
  if (partitionsToElectLeader.nonEmpty) {
    /** Run leader election and write the result to ZK and the controller context;
      * a failed election is returned as Left for that partition */
    val electionResults = electLeaderForPartitions(
      partitionsToElectLeader,
      partitionLeaderElectionStrategyOpt.getOrElse(
        throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
      )
    )

    electionResults.foreach {
      case (partition, Right(leaderAndIsr)) =>
        stateChangeLog.trace(
          s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
        )
        controllerContext.putPartitionState(partition, OnlinePartition)
      case (_, Left(_)) => // Ignore; no need to update partition state on election error
    }

    electionResults
  } else {
    Map.empty
  }

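OnlinePartition is the state of a partition that is working normally, meaning its leader has been elected successfully. The logic is:

1) split the partitions into two groups: those in NewPartition state, and those in OfflinePartition or OnlinePartition state;

2) for partitions whose previous state is NewPartition, choose the initial leader and ISR and write them to ZK and the controller context; a partition that cannot be initialized (for example, because none of its replicas is alive) is not in the returned list and keeps its current state;

3) for partitions whose previous state is OnlinePartition or OfflinePartition, a leader election strategy must be passed in, and leader election is triggered with it (a missing strategy raises IllegalArgumentException);

4) set the state of every partition that obtained a leader to OnlinePartition.

In all of the cases above, whatever the previous state, the transition ends with a leader being chosen for the partition. On success, a LeaderAndIsr request is sent to all replicas of the partition.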
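The state update rule for this transition can be sketched independently of ZooKeeper: a partition becomes OnlinePartition only when its election result is a `Right`, and a `Left` leaves its state untouched. In the sketch below, `OnlineTransitionSketch`, `elect`, and `toOnline` are hypothetical names, and the election itself is a deliberately simple assumption (the first assigned replica that is alive becomes leader), which is not claimed to match any particular Kafka election strategy. The sketch also does not split NewPartition from the other previous states.

```scala
// Sketch: apply election results to partition states.
// The election rule here is an illustrative assumption, not Kafka's strategy.
object OnlineTransitionSketch {
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Hypothetical election: pick the first assigned replica that is alive.
  def elect(assigned: Seq[Int], live: Set[Int]): Either[Throwable, LeaderAndIsr] =
    assigned.filter(live.contains) match {
      case Seq() => Left(new IllegalStateException("no live replica"))
      case alive => Right(LeaderAndIsr(alive.head, alive.toList))
    }

  // Returns the updated states together with the per-partition election results.
  def toOnline(
    states: Map[TopicPartition, PartitionState],
    assignment: Map[TopicPartition, Seq[Int]],
    live: Set[Int]
  ): (Map[TopicPartition, PartitionState], Map[TopicPartition, Either[Throwable, LeaderAndIsr]]) = {
    val results = assignment.map { case (tp, replicas) => tp -> elect(replicas, live) }
    val updated = results.foldLeft(states) {
      case (acc, (tp, Right(_))) => acc.updated(tp, OnlinePartition) // leader found
      case (acc, (_, Left(_)))   => acc                              // state unchanged
    }
    (updated, results)
  }
}
```

Keeping failed partitions in their previous state means a later trigger (for example, when a broker comes back online) can retry the election for them.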

Transition to OfflinePartition

case OfflinePartition =>
  validPartitions.foreach { partition =>
    stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
    controllerContext.putPartitionState(partition, OfflinePartition)
  }
  Map.empty

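OfflinePartition is the state a partition moves to after its leader goes down. A partition in this state has no available leader.

1) validate the previous state; the valid previous states are NewPartition, OnlinePartition, and OfflinePartition;

2) set the partition's state to OfflinePartition.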

Transition to NonExistentPartition

case NonExistentPartition =>
  validPartitions.foreach { partition =>
    stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
    controllerContext.putPartitionState(partition, NonExistentPartition)
  }
  Map.empty

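NonExistentPartition means that a partition which was in OfflinePartition state has been deleted from the metadata and from ZK.

1) validate the previous state; the only valid previous state is OfflinePartition;

2) set the partition's state to NonExistentPartition.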


References: 《Kafka技术内幕》, 《Apache Kafka 源码剖析》, and the Kafka source code.