Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14367; Add JoinGroup to the new GroupCoordinator interface #12845

Merged
merged 8 commits into from
Nov 29, 2022

Conversation

dajac
Copy link
Contributor

@dajac dajac commented Nov 14, 2022

This patch adds joinGroup to the new GroupCoordinator interface and updates KafkaApis to use it.

For the context, I will do the same for all the other interactions with the current group coordinator. In order to limit the changes, I have chosen to introduce the GroupCoordinatorAdapter that translates the new interface to the old one. It is basically a wrapper. This allows keeping the current group coordinator untouched for now and focus on the KafkaApis changes. Eventually, we can remove GroupCoordinatorAdapter.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

testJoinGroupProtocolType(version.asInstanceOf[Short])
}
}
def testHandleJoinGroupRequestFutureFailed(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this also a new test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right.

Copy link
Contributor

@jolshan jolshan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems reasonable so far. This is just an initial pass. I will take another pass soon.

Copy link
Contributor

@jeffkbkim jeffkbkim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the PR, left some comments.

is the idea to add all existing group coordinator apis to GroupCoordinatorAdapter, then introduce the new GroupCoordinator which implements GroupCoordinator interface with new design, then remove GroupCoordinatorAdapter and point KafkaApis to the new GroupCoordinator?

@dajac
Copy link
Contributor Author

dajac commented Nov 16, 2022

@jeffkbkim @jolshan Thanks for your comments. I have addressed your feedback. Note that I have extracted the changes about the JoinGroupResponse version handling in a separate PR: #12864. It is less risky this way.

hachikuji pushed a commit that referenced this pull request Nov 16, 2022
…ty in JoinGroupResponse (#12864)

This is a small refactor extracted from #12845. It basically moves the logic to handle the backward compatibility of `JoinGroupResponseData.protocolName` from `KafkaApis` to `JoinGroupResponse`.

The patch adds a new unit test for `JoinGroupResponse` and relies on existing tests as well.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
@dajac
Copy link
Contributor Author

dajac commented Nov 17, 2022

Rebased the PR. Ready for second round.

Copy link
Contributor

@jeffkbkim jeffkbkim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! left some comments.

core/src/main/scala/kafka/server/KafkaApis.scala Outdated Show resolved Hide resolved
ArgumentMatchers.eq(joinGroupRequest)
)).thenReturn(future)

val response = new AtomicReference[JoinGroupResponse]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you help me understand the reason for using an atomic reference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i just needed a container to store the response. i could have used a var initialized to null as well, i suppose.

Comment on lines 2728 to 2731
createKafkaApis().handle(
requestChannelRequest,
RequestLocal.NoCaching
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm surely missing something here - shouldn't the test be blocked here? i can't seem to find the other thread running

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does not. keep in mind that we use a future internally so the method is executed and returns immediately. then, the future is completed later on by future.completeExceptionally(Errors.NOT_COORDINATOR.exception).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha. i think i was missing this from the CompletableFuture java docs

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

so the thread that completes the future will attempt to finish all (non-async) dependent chains

@dajac
Copy link
Contributor Author

dajac commented Nov 18, 2022

@jeffkbkim Thanks for the review. I have addressed your comments.

Comment on lines 2728 to 2731
createKafkaApis().handle(
requestChannelRequest,
RequestLocal.NoCaching
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha. i think i was missing this from the CompletableFuture java docs

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

so the thread that completes the future will attempt to finish all (non-async) dependent chains

@jolshan
Copy link
Contributor

jolshan commented Nov 21, 2022

@dajac This is looking close to ready. I just had a few points.

  1. I left a comment about leaving more comments on tests, not a big deal either way, but just wanted to remind.
  2. I see a lot of mirror maker tests failing that I couldn't find on other branches. It is likely unrelated, but wanted to confirm.

Copy link
Contributor

@jeffkbkim jeffkbkim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left a minor comment. the mirror maker test failures only exist in the last commit, so i think we're good.

@@ -184,6 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
.exceptionally(handleError)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i personally think it's neater to merge this with the line above, since all the other cases are single-lined

Copy link
Contributor

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM! I understand why you introduce GroupCoordinatorAdapter, to focus on the KafkaApi changes, but I don't think that would have less change. However, I don't have strong preference for it. Let's keep on it! :)

And thanks @jeffkbkim @jolshan for the review.

@dajac
Copy link
Contributor Author

dajac commented Nov 22, 2022

I left a comment about leaving more comments on tests, not a big deal either way, but just wanted to remind.

@jolshan Thanks. I thought about this and I feel like the test is self explanatory.

I understand why you introduce GroupCoordinatorAdapter, to focus on the KafkaApi changes, but I don't think that would have less change

@showuon Yeah... The point that convinced me to do it like this was the impact on the tests of the group coordinator. The diff was quite large there.

@dajac
Copy link
Contributor Author

dajac commented Nov 22, 2022

@jeffkbkim @jolshan @showuon Thanks for your comments. I have addressed them.

Copy link
Contributor

@jolshan jolshan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm -- thanks David!

@dajac
Copy link
Contributor Author

dajac commented Nov 23, 2022

I have addressed Jason's comments.

Copy link
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM

@dajac dajac merged commit 98e19b3 into apache:trunk Nov 29, 2022
@dajac dajac deleted the KAFKA-14367-join-group branch November 29, 2022 20:06
honshu added a commit to confluentinc/kafka that referenced this pull request Nov 30, 2022
* apache-kafka/trunk: (31 commits)
  KAFKA-14413: Separate MirrorMaker configurations for each connector (apache#12899)
  MINOR: Fix config documentation formatting (apache#12921)
  KAFKA-13731: Allow standalone workers to be started without providing any connector configurations (apache#11890)
  MINOR: Fix docs in security.html (apache#12851)
  MINOR: Remove config/kraft/README.md from rat exclusion list (apache#12923)
  MINOR: Update Gradle to 7.6 (apache#12918)
  KAFKA-14414: Fix request/response header size calculation (apache#12917)
  KAFKA-14339 : Do not perform producerCommit on serializationError when trying offsetWriter flush (apache#12920)
  KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface (apache#12845)
  MINOR: buildoutput.log should be under the `build` directory (apache#12919)
  ...
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
…ty in JoinGroupResponse (apache#12864)

This is a small refactor extracted from apache#12845. It basically moves the logic to handle the backward compatibility of `JoinGroupResponseData.protocolName` from `KafkaApis` to `JoinGroupResponse`.

The patch adds a new unit test for `JoinGroupResponse` and relies on existing tests as well.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
…pache#12845)

This patch adds `joinGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

For the context, I will do the same for all the other interactions with the current group coordinator. In order to limit the changes, I have chosen to introduce the `GroupCoordinatorAdapter` that translates the new interface to the old one. It is basically a wrapper. This allows keeping the current group coordinator untouched for now and focus on the `KafkaApis` changes. Eventually, we can remove `GroupCoordinatorAdapter`.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
5 participants