Skip to content

Commit

Permalink
KAFKA-2738: Replica FetcherThread should connect to leader endpoint m…
Browse files Browse the repository at this point in the history
…atching its inter-broker security protocol.

…atching its inter-broker security protocol

Author: Gwen Shapira <cshapi@gmail.com>

Reviewers: Jun Rao, Guozhang Wang

Closes apache#428 from gwenshap/KAFKA-2738
  • Loading branch information
gwenshap authored and guozhangwang committed Nov 5, 2015
1 parent b30d68a commit d23785f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,

try {
// call replica manager to handle updating partitions to become leader or follower
val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)
// for each new leader or follower, call coordinator to handle
// consumer group migration
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ class ReplicaManager(val config: KafkaConfig,
}
}

def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): BecomeLeaderOrFollowerResult = {
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, metadataCache: MetadataCache): BecomeLeaderOrFollowerResult = {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
.format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
Expand Down Expand Up @@ -639,7 +639,7 @@ class ReplicaManager(val config: KafkaConfig,
else
Set.empty[Partition]
val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.correlationId, responseMap, metadataCache)
else
Set.empty[Partition]

Expand Down Expand Up @@ -731,9 +731,9 @@ class ReplicaManager(val config: KafkaConfig,
private def makeFollowers(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[BrokerEndPoint],
correlationId: Int,
responseMap: mutable.Map[(String, Int), Short]) : Set[Partition] = {
responseMap: mutable.Map[(String, Int), Short],
metadataCache: MetadataCache) : Set[Partition] = {
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition %s")
Expand All @@ -751,7 +751,7 @@ class ReplicaManager(val config: KafkaConfig,
partitionState.foreach{ case (partition, partitionStateInfo) =>
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
leaders.find(_.id == newLeaderBrokerId) match {
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
case Some(leaderBroker) =>
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
Expand All @@ -762,7 +762,7 @@ class ReplicaManager(val config: KafkaConfig,
.format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
partition.topic, partition.partitionId, newLeaderBrokerId))
case None =>
// The leader broker should always be present in the leaderAndIsrRequest.
// The leader broker should always be present in the metadata cache.
// If not, we should record the error message and abort the transition process for this partition
stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
" %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
Expand Down Expand Up @@ -800,7 +800,7 @@ class ReplicaManager(val config: KafkaConfig,
// we do not need to check if the leader exists again since this has been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
new TopicAndPartition(partition) -> BrokerAndInitialOffset(
leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
partition.getReplica().get.logEndOffset.messageOffset)).toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

Expand Down

0 comments on commit d23785f

Please sign in to comment.