KAFKA-13790; ReplicaManager should be robust to all partition updates from kraft metadata log (apache#12085)

This patch refactors `Partition.makeLeader` and `Partition.makeFollower` to be robust to all partition updates from the KRaft metadata log. In particular, it enforces the following invariants:

- A partition update is accepted if its partition epoch is equal to or newer than the current one. The partition epoch is also updated by the AlterPartition path, so an update from the metadata log with the same partition epoch is accepted in order to fully update the partition state.
- The leader epoch start offset is only updated when the leader epoch is bumped.
- The follower states are only updated when the leader epoch is bumped.
- Fetchers are only restarted when the leader epoch is bumped. This was already the case, but this patch adds unit tests to verify and maintain it.

The patch also unifies the state change logs so that they are similar in both the ZK and KRaft worlds.
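The core of these invariants can be sketched outside of the actual `Partition` class. The following minimal Scala model is illustrative only; `PartitionUpdate`, `PartitionModel`, and their fields are hypothetical names, not Kafka APIs — the real logic lives in `kafka.cluster.Partition.makeLeader`/`makeFollower` shown in the diff below:

```scala
// Illustrative model of the update-acceptance rules described above.
final case class PartitionUpdate(partitionEpoch: Int, leaderEpoch: Int)

final class PartitionModel(
  private var partitionEpoch: Int,
  private var leaderEpoch: Int
) {
  /** Returns true iff the leader epoch was bumped by this update. */
  def tryApply(update: PartitionUpdate): Boolean = {
    // Invariant 1: reject updates with an older partition epoch. An update
    // with the *same* epoch is still accepted, because AlterPartition may
    // have bumped the epoch before the metadata log update arrived.
    if (update.partitionEpoch < partitionEpoch) return false

    val isNewLeaderEpoch = update.leaderEpoch > leaderEpoch
    if (isNewLeaderEpoch) {
      // Invariants 2-4: the leader epoch start offset, the follower states
      // and the fetchers are only touched when the leader epoch is bumped.
      leaderEpoch = update.leaderEpoch
    }

    // The partition epoch itself always moves forward.
    partitionEpoch = update.partitionEpoch
    isNewLeaderEpoch
  }
}
```

This is also why, in the diff below, `makeLeader` and `makeFollower` return whether the leader epoch changed: `ReplicaManager` uses that boolean to decide whether fetchers need to be restarted.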

Reviewers: Jason Gustafson <jason@confluent.io>
dajac committed May 9, 2022
1 parent df507e5 commit b485f92
Showing 4 changed files with 510 additions and 78 deletions.
143 changes: 88 additions & 55 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -257,7 +257,7 @@ class Partition(val topicPartition: TopicPartition,
@volatile private var leaderEpoch: Int = LeaderAndIsr.InitialLeaderEpoch - 1
// start offset for 'leaderEpoch' above (leader epoch of the current leader for this partition),
// defined when this broker is leader for partition
@volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
@volatile private[cluster] var leaderEpochStartOffsetOpt: Option[Long] = None
// Replica ID of the leader, defined when this broker is leader or follower for the partition.
@volatile var leaderReplicaIdOpt: Option[Int] = None
@volatile private[cluster] var partitionState: PartitionState = CommittedPartitionState(Set.empty, LeaderRecoveryState.RECOVERED)
@@ -548,21 +548,35 @@ class Partition(val topicPartition: TopicPartition,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid]): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
// Partition state changes are expected to have a partition epoch larger than or equal
// to the current partition epoch. The latter is allowed because the partition epoch
// is also updated by the AlterPartition response so the new epoch might be known
// before a LeaderAndIsr request is received or before an update is received via
// the metadata log.
if (partitionState.partitionEpoch < partitionEpoch) {
stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " +
s"and partition state $partitionState since the leader is already at a newer partition epoch $partitionEpoch.")
return false
}

// Record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path.
controllerEpoch = partitionState.controllerEpoch

val currentTimeMs = time.milliseconds
val isNewLeader = !isLeader
val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
val isr = partitionState.isr.asScala.map(_.toInt).toSet
val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)

if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING.value()) {
stateChangeLogger.info(
s"The topic partition $topicPartition was marked as RECOVERING. Leader log recovery is not implemented. " +
"Marking the topic partition as RECOVERED."
)
if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING.value) {
stateChangeLogger.info(s"The topic partition $topicPartition was marked as RECOVERING. " +
"Marking the topic partition as RECOVERED.")
}

// Updating the assignment and ISR state is safe if the partition epoch is
// larger than or equal to the current partition epoch.
updateAssignmentAndIsr(
assignment = partitionState.replicas.asScala.map(_.toInt),
isr = isr,
@@ -576,51 +590,60 @@ class Partition(val topicPartition: TopicPartition,
} catch {
case e: ZooKeeperClientException =>
stateChangeLogger.error(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " +
s"state change for the partition $topicPartition with leader epoch: $leaderEpoch ", e)

s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
return false
}

val leaderLog = localLogOrException
val leaderEpochStartOffset = leaderLog.logEndOffset
stateChangeLogger.info(s"Leader $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark} " +
s"ISR ${isr.mkString("[", ",", "]")} addingReplicas ${addingReplicas.mkString("[", ",", "]")} " +
s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. Previous leader epoch was $leaderEpoch.")

// We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
partitionEpoch = partitionState.partitionEpoch

// In the case of successive leader elections in a short time period, a follower may have
// entries in its log from a later epoch than any entry in the new leader's log. In order
// to ensure that these followers can truncate to the right offset, we must cache the new
// leader epoch and the start offset since it should be larger than any epoch that a follower
// would try to query.
leaderLog.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)

val isNewLeader = !isLeader
val currentTimeMs = time.milliseconds
// We update the epoch start offset and the replicas' state only if the leader epoch
// has changed.
if (isNewLeaderEpoch) {
val leaderEpochStartOffset = leaderLog.logEndOffset
stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId starts at " +
s"leader epoch ${partitionState.leaderEpoch} from offset $leaderEpochStartOffset " +
s"with partition epoch ${partitionState.partitionEpoch}, high watermark ${leaderLog.highWatermark}, " +
s"ISR ${isr.mkString("[", ",", "]")}, adding replicas ${addingReplicas.mkString("[", ",", "]")} and " +
s"removing replicas ${removingReplicas.mkString("[", ",", "]")}. Previous leader epoch was $leaderEpoch.")

// In the case of successive leader elections in a short time period, a follower may have
// entries in its log from a later epoch than any entry in the new leader's log. In order
// to ensure that these followers can truncate to the right offset, we must cache the new
// leader epoch and the start offset since it should be larger than any epoch that a follower
// would try to query.
leaderLog.maybeAssignEpochStartOffset(partitionState.leaderEpoch, leaderEpochStartOffset)

// Initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and
// lastFetchLeaderLogEndOffset.
remoteReplicas.foreach { replica =>
replica.resetReplicaState(
currentTimeMs = currentTimeMs,
leaderEndOffset = leaderEpochStartOffset,
isNewLeader = isNewLeader,
isFollowerInSync = partitionState.isr.contains(replica.brokerId)
)
}

// Initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and
// lastFetchLeaderLogEndOffset.
remoteReplicas.foreach { replica =>
replica.resetReplicaState(
currentTimeMs = currentTimeMs,
leaderEndOffset = leaderEpochStartOffset,
isNewLeader = isNewLeader,
isFollowerInSync = partitionState.isr.contains(replica.brokerId)
)
// We update the leader epoch and the leader epoch start offset iff the
// leader epoch changed.
leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
} else {
stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " +
s"and partition state $partitionState since it is already the leader with leader epoch $leaderEpoch. " +
s"Current high watermark ${leaderLog.highWatermark}, ISR ${isr.mkString("[", ",", "]")}, " +
s"adding replicas ${addingReplicas.mkString("[", ",", "]")} and " +
s"removing replicas ${removingReplicas.mkString("[", ",", "]")}.")
}

partitionEpoch = partitionState.partitionEpoch
leaderReplicaIdOpt = Some(localBrokerId)

// we may need to increment high watermark since ISR could be down to 1
// We may need to increment high watermark since ISR could be down to 1.
(maybeIncrementLeaderHW(leaderLog, currentTimeMs = currentTimeMs), isNewLeader)
}

// some delayed operations may be unblocked after HW changed
// Some delayed operations may be unblocked after HW changed.
if (leaderHWIncremented)
tryCompleteDelayedRequests()

@@ -631,15 +654,20 @@ class Partition(val topicPartition: TopicPartition,
* Make the local replica the follower by setting the new leader and ISR to empty
* If the leader replica id does not change and the new epoch is equal or one
* greater (that is, no updates have been missed), return false to indicate to the
* replica manager that state is already correct and the become-follower steps can be skipped
* replica manager that state is already correct and the become-follower steps can
* be skipped.
*/
def makeFollower(partitionState: LeaderAndIsrPartitionState,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid]): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val newLeaderBrokerId = partitionState.leader
val oldLeaderEpoch = leaderEpoch
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
if (partitionState.partitionEpoch < partitionEpoch) {
stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " +
s"and partition state $partitionState since the follower is already at a newer partition epoch $partitionEpoch.")
return false
}

// Record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionState.controllerEpoch

Expand All @@ -650,32 +678,37 @@ class Partition(val topicPartition: TopicPartition,
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt),
LeaderRecoveryState.of(partitionState.leaderRecoveryState)
)

try {
createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
} catch {
case e: ZooKeeperClientException =>
stateChangeLogger.error(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " +
s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)

return false
}

val followerLog = localLogOrException
val leaderEpochEndOffset = followerLog.logEndOffset
stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
s"offset $leaderEpochEndOffset with high watermark ${followerLog.highWatermark}. " +
s"Previous leader epoch was $leaderEpoch.")
val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch

if (isNewLeaderEpoch) {
val leaderEpochEndOffset = followerLog.logEndOffset
stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " +
s"high watermark ${followerLog.highWatermark}. Previous leader epoch was $leaderEpoch.")
} else {
stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " +
s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.")
}

leaderReplicaIdOpt = Some(partitionState.leader)
leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = None
partitionEpoch = partitionState.partitionEpoch

if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && leaderEpoch == oldLeaderEpoch) {
false
} else {
leaderReplicaIdOpt = Some(newLeaderBrokerId)
true
}
// We must restart the fetchers when the leader epoch changed regardless of
// whether the leader changed as well.
isNewLeaderEpoch
}
}

26 changes: 5 additions & 21 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1599,13 +1599,9 @@ class ReplicaManager(val config: KafkaConfig,
// Update the partition information to be the leader
partitionStates.forKeyValue { (partition, partitionState) =>
try {
if (partition.makeLeader(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName)))
if (partition.makeLeader(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
partitionsToMakeLeaders += partition
else
stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
s"partition ${partition.topicPartition} (last update controller epoch ${partitionState.controllerEpoch}) " +
s"since it is already the leader for the partition.")
}
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Skipped the become-leader state change with " +
@@ -1681,14 +1677,9 @@ class ReplicaManager(val config: KafkaConfig,
try {
if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
// Only change partition state when the leader is available
if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName)))
if (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {
partitionsToMakeFollower += partition
else
stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
s"follower with correlation id $correlationId from controller $controllerId epoch $controllerEpoch " +
s"for partition ${partition.topicPartition} (last update " +
s"controller epoch ${partitionState.controllerEpoch}) " +
s"since the new leader $newLeaderBrokerId is the same as the old leader")
}
} else {
// The leader broker should always be present in the metadata cache.
// If not, we should record the error message and abort the transition process for this partition
@@ -2180,11 +2171,7 @@ class ReplicaManager(val config: KafkaConfig,
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try {
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
if (!partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))) {
stateChangeLogger.info("Skipped the become-leader state change for " +
s"$tp with topic id ${info.topicId} because this partition is " +
"already a local leader.")
}
partition.makeLeader(state, offsetCheckpoints, Some(info.topicId))
changedPartitions.add(partition)
} catch {
case e: KafkaStorageException =>
@@ -2234,9 +2221,6 @@ class ReplicaManager(val config: KafkaConfig,
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))) {
partitionsToMakeFollower.put(tp, partition)
} else {
stateChangeLogger.info("Skipped the become-follower state change after marking its " +
s"partition as follower for partition $tp with id ${info.topicId} and partition state $state.")
}
}
}