-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14367; Add JoinGroup
to the new GroupCoordinator
interface
#12845
Conversation
e4e8ceb
to
dbe20a5
Compare
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
Show resolved
Hide resolved
testJoinGroupProtocolType(version.asInstanceOf[Short]) | ||
} | ||
} | ||
def testHandleJoinGroupRequestFutureFailed(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this also a new test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems reasonable so far. This is just an initial pass. I will take another pass soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the PR, left some comments.
is the idea to add all existing group coordinator apis to GroupCoordinatorAdapter, then introduce the new GroupCoordinator which implements GroupCoordinator interface with new design, then remove GroupCoordinatorAdapter and point KafkaApis to the new GroupCoordinator?
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
Show resolved
Hide resolved
dbe20a5
to
83b72f6
Compare
@jeffkbkim @jolshan Thanks for your comments. I have addressed your feedback. Note that I have extracted the changes about the JoinGroupResponse version handling in a separate PR: #12864. It is less risky this way. |
…ty in JoinGroupResponse (#12864) This is a small refactor extracted from #12845. It basically moves the logic to handle the backward compatibility of `JoinGroupResponseData.protocolName` from `KafkaApis` to `JoinGroupResponse`. The patch adds a new unit test for `JoinGroupResponse` and relies on existing tests as well. Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
Add GroupCoordinator interface
83b72f6
to
01baae5
Compare
Rebased the PR. Ready for second round. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks! left some comments.
ArgumentMatchers.eq(joinGroupRequest) | ||
)).thenReturn(future) | ||
|
||
val response = new AtomicReference[JoinGroupResponse]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you help me understand the reason for using an atomic reference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i just needed a container to store the response. i could have used a var initialized to null as well, i suppose.
createKafkaApis().handle( | ||
requestChannelRequest, | ||
RequestLocal.NoCaching | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm surely missing something here - shouldn't the test be blocked here? i can't seem to find the other thread running
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it does not. keep in mind that we use a future internally so the method is executed and returns immediately. then, the future is completed later on by future.completeExceptionally(Errors.NOT_COORDINATOR.exception)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gotcha. i think i was missing this from the CompletableFuture java docs
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
so the thread that completes the future will attempt to finish all (non-async) dependent chains
@jeffkbkim Thanks for the review. I have addressed your comments. |
createKafkaApis().handle( | ||
requestChannelRequest, | ||
RequestLocal.NoCaching | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gotcha. i think i was missing this from the CompletableFuture java docs
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
so the thread that completes the future will attempt to finish all (non-async) dependent chains
@dajac This is looking close to ready. I just had a few points.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, left a minor comment. the mirror maker test failures only exist in the last commit, so i think we're good.
@@ -184,6 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request) | |||
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request) | |||
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal) | |||
.exceptionally(handleError) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i personally think it's neater to merge this with the line above, since all the other cases are single-lined
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM! I understand why you introduce GroupCoordinatorAdapter
, to focus on the KafkaApi changes, but I don't think that would have less change. However, I don't have strong preference for it. Let's keep on it! :)
And thanks @jeffkbkim @jolshan for the review.
...dinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java
Outdated
Show resolved
Hide resolved
@jolshan Thanks. I thought about this and I feel like the test is self explanatory.
@showuon Yeah... The point that convinced me to do it like this was the impact on the tests of the group coordinator. The diff was quite large there. |
@jeffkbkim @jolshan @showuon Thanks for your comments. I have addressed them. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm -- thanks David!
...dinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java
Outdated
Show resolved
Hide resolved
...dinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRequestContext.java
Outdated
Show resolved
Hide resolved
I have addressed Jason's comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, LGTM
* apache-kafka/trunk: (31 commits) KAFKA-14413: Separate MirrorMaker configurations for each connector (apache#12899) MINOR: Fix config documentation formatting (apache#12921) KAFKA-13731: Allow standalone workers to be started without providing any connector configurations (apache#11890) MINOR: Fix docs in security.html (apache#12851) MINOR: Remove config/kraft/README.md from rat exclusion list (apache#12923) MINOR: Update Gradle to 7.6 (apache#12918) KAFKA-14414: Fix request/response header size calculation (apache#12917) KAFKA-14339 : Do not perform producerCommit on serializationError when trying offsetWriter flush (apache#12920) KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface (apache#12845) MINOR: buildoutput.log should be under the `build` directory (apache#12919) ...
…ty in JoinGroupResponse (apache#12864) This is a small refactor extracted from apache#12845. It basically moves the logic to handle the backward compatibility of `JoinGroupResponseData.protocolName` from `KafkaApis` to `JoinGroupResponse`. The patch adds a new unit test for `JoinGroupResponse` and relies on existing tests as well. Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
…pache#12845) This patch adds `joinGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it. For the context, I will do the same for all the other interactions with the current group coordinator. In order to limit the changes, I have chosen to introduce the `GroupCoordinatorAdapter` that translates the new interface to the old one. It is basically a wrapper. This allows keeping the current group coordinator untouched for now and focus on the `KafkaApis` changes. Eventually, we can remove `GroupCoordinatorAdapter`. Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch adds
joinGroup
to the newGroupCoordinator
interface and updatesKafkaApis
to use it.For the context, I will do the same for all the other interactions with the current group coordinator. In order to limit the changes, I have chosen to introduce the
GroupCoordinatorAdapter
that translates the new interface to the old one. It is basically a wrapper. This allows keeping the current group coordinator untouched for now and focus on theKafkaApis
changes. Eventually, we can removeGroupCoordinatorAdapter
.Committer Checklist (excluded from commit message)