-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14500; [2/N] Rewrite GroupMetadata in Java #13663
Conversation
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeffkbkim Thanks for the patch. I made a first pass on it and left some small comments.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
...-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java
Outdated
Show resolved
Hide resolved
...-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java
Outdated
Show resolved
Hide resolved
|
||
@BeforeEach | ||
public void initialize() { | ||
group = new GenericGroup(new LogContext(), "groupId", Empty, Time.SYSTEM); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I usually prefer to avoid global variable like this in tests but I leave it up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'll keep this as is since this variable is used for all tests.
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
Outdated
Show resolved
Hide resolved
...-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
private GenericGroupState state; | ||
|
||
/** | ||
* The timestamp of when the group transitioned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:- of -> from*
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks grammatically correct
private final Map<String, String> staticMembers = new HashMap<>(); | ||
|
||
/** | ||
* Members who have yet to (re)join the group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: have -> are*
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks grammatically correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oya yep mb
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jeffkbkim Thanks for the update. I left some comments for consideration.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
public boolean isGenericGroup() { | ||
return protocolType.map(type -> | ||
type.equals(ConsumerProtocol.PROTOCOL_TYPE) | ||
).orElse(false); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this method in the old implementation? The name is a bit weird because a generic group could use different protocol types (e.g. connect). Should it be named useConsumerGroupProtocol
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it. this is from
def isConsumerGroup: Boolean = protocolType.contains(ConsumerProtocol.PROTOCOL_TYPE)
the naming is confusing because we both consumer and generic groups but a generic group can expect a group using the consumer protocol
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
Outdated
Show resolved
Hide resolved
* The timestamp of when the group transitioned | ||
* to its current state. | ||
*/ | ||
private Optional<Long> currentStateTimestamp; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary to be an Optional? As far as I see we immediately define it in the constructor and we never set it equal to something which is empty. Am I missing something obvious?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit awkward as the existing GroupMetadata updates this field when we read the group metadata record (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1225).
So we should expect the new group metadata manager introduced in #13639 to perform this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I do not understand. This field is currently private so the only way we can set it now is either in the constructor or via a setter. Do you mean that in the near future either we will make this field public or we will add a setter which will then be called by the GroupMetadataManager to possibly set this to an empty Optional? If so, then okay, this makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks.
Rewrites GroupMetadata as GenericGroup that will be used with the new group coordinator. Offset related fields, classes, and methods will not be included as they will be reworked as a separate offset manager component.
Committer Checklist (excluded from commit message)