Skip to content

Commit

Permalink
MINOR: JoinGroup and SyncGroup callbacks should catch exceptions (apa…
Browse files Browse the repository at this point in the history
…che#12910)

We recently had a bug that caused the JoinGroup callback to throw an exception (apache#12909). When this happens, the exception is propagated to the caller and the JoinGroup callback is never completed. To make matters worse, the member whose callback failed becomes a zombie because the group coordinator does not expire members that have a pending callback.

This patch catches exceptions thrown when invoking the JoinGroup and SyncGroup callbacks and, if an invocation fails, retries completing the callback with an `UNKNOWN_SERVER_ERROR` error.

Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
dajac committed Nov 29, 2022
1 parent be03273 commit c2c8b24
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 5 deletions.
24 changes: 19 additions & 5 deletions core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,16 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
// Completes the pending JoinGroup callback of `member` with `joinGroupResult`, but only when
// `member.isAwaitingJoin` holds; otherwise nothing happens.
//
// NOTE(review): this span is a rendered diff hunk whose +/- markers were stripped, so the pre-patch
// statements and their post-patch replacement appear back to back. It should not be read as a single
// executable method body.
def maybeInvokeJoinCallback(member: MemberMetadata,
joinGroupResult: JoinGroupResult): Unit = {
if (member.isAwaitingJoin) {
// Presumably the three lines removed by the patch: invoke the callback, clear it, and decrement
// the awaiting-join counter, with no protection against the callback throwing.
member.awaitingJoinCallback(joinGroupResult)
member.awaitingJoinCallback = null
numMembersAwaitingJoin -= 1
// Presumably the lines added by the patch: the same steps wrapped in try/catch/finally.
try {
member.awaitingJoinCallback(joinGroupResult)
} catch {
case t: Throwable =>
// Log the failure, then invoke the same callback a second time with an UNKNOWN_SERVER_ERROR result.
// If this second invocation throws as well, the exception propagates out of the catch block.
error(s"Failed to invoke join callback for $member due to ${t.getMessage}.", t)
member.awaitingJoinCallback(JoinGroupResult(member.memberId, Errors.UNKNOWN_SERVER_ERROR))
} finally {
// Runs whether or not the callback threw, so the callback is always cleared and the counter
// always decremented.
member.awaitingJoinCallback = null
numMembersAwaitingJoin -= 1
}
}
}

Expand All @@ -558,8 +565,15 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def maybeInvokeSyncCallback(member: MemberMetadata,
syncGroupResult: SyncGroupResult): Boolean = {
if (member.isAwaitingSync) {
member.awaitingSyncCallback(syncGroupResult)
member.awaitingSyncCallback = null
try {
member.awaitingSyncCallback(syncGroupResult)
} catch {
case t: Throwable =>
error(s"Failed to invoke sync callback for $member due to ${t.getMessage}.", t)
member.awaitingSyncCallback(SyncGroupResult(Errors.UNKNOWN_SERVER_ERROR))
} finally {
member.awaitingSyncCallback = null
}
true
} else {
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,30 @@ class GroupMetadataTest {
assertFalse(member.isAwaitingJoin)
}

@Test
def testInvokeJoinCallbackFails(): Unit = {
  // Two supported protocols with empty metadata; the payload itself plays no role in this test.
  val supportedProtocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
  val member = new MemberMetadata(memberId, None, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
    protocolType, supportedProtocols)

  // The callback throws on its first invocation only; any later invocation records what it was given.
  var firstAttempt = true
  var captured: Option[JoinGroupResult] = None
  def throwOnceThenCapture(response: JoinGroupResult): Unit = {
    if (!firstAttempt) {
      captured = Some(response)
    } else {
      firstAttempt = false
      throw new Exception("Something went wrong!")
    }
  }

  group.add(member, throwOnceThenCapture)

  // The first delivery (Errors.NONE) blows up inside the callback.
  val successfulJoin = JoinGroupResult(member.memberId, Errors.NONE)
  group.maybeInvokeJoinCallback(member, successfulJoin)

  // The second delivery must carry UNKNOWN_SERVER_ERROR, and the member must no longer be awaiting a join.
  assertEquals(Errors.UNKNOWN_SERVER_ERROR, captured.get.error)
  assertFalse(member.isAwaitingJoin)
}

@Test
def testNotInvokeJoinCallback(): Unit = {
val member = new MemberMetadata(memberId, None, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
Expand All @@ -631,6 +655,31 @@ class GroupMetadataTest {
assertFalse(member.isAwaitingJoin)
}

@Test
def testInvokeSyncCallbackFails(): Unit = {
  // Two supported protocols with empty metadata; the payload itself plays no role in this test.
  val supportedProtocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
  val member = new MemberMetadata(memberId, None, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
    protocolType, supportedProtocols)

  // The callback throws on its first invocation only; any later invocation records what it was given.
  var firstAttempt = true
  var captured: Option[SyncGroupResult] = None
  def throwOnceThenCapture(response: SyncGroupResult): Unit = {
    if (!firstAttempt) {
      captured = Some(response)
    } else {
      firstAttempt = false
      throw new Exception("Something went wrong!")
    }
  }

  // Register the member first, then attach the pending sync callback directly on it.
  group.add(member)
  member.awaitingSyncCallback = throwOnceThenCapture

  // The first delivery (Errors.NONE) blows up inside the callback.
  val successfulSync = SyncGroupResult(Errors.NONE)
  val invoked = group.maybeInvokeSyncCallback(member, successfulSync)

  // The invocation is still reported, the retry carries UNKNOWN_SERVER_ERROR, and the callback is cleared.
  assertTrue(invoked)
  assertEquals(Errors.UNKNOWN_SERVER_ERROR, captured.get.error)
  assertFalse(member.isAwaitingSync)
}

@Test
def testInvokeSyncCallback(): Unit = {
val member = new MemberMetadata(memberId, None, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
Expand Down

0 comments on commit c2c8b24

Please sign in to comment.