Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14500; [2/N] Rewrite GroupMetadata in Java #13663

Merged
merged 15 commits into from
May 12, 2023
Prev Previous commit
Next Next commit
address comments
  • Loading branch information
jeffkbkim committed May 11, 2023
commit 12385306d5f403c9eb0899e7f4d7a8812ae03de5
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;

/**
*
* This class holds metadata for a generic group where the
* member assignment is driven solely from the client side.
*
Expand Down Expand Up @@ -80,11 +79,6 @@ public class GenericGroup implements Group {
*/
private static final String MEMBER_ID_DELIMITER = "-";

/**
* The slf4j log context, used to create new loggers.
*/
private final LogContext logContext;

/**
* The slf4j logger.
*/
Expand Down Expand Up @@ -180,7 +174,7 @@ public GenericGroup(
GenericGroupState initialState,
Time time
) {
this.logContext = Objects.requireNonNull(logContext);
Objects.requireNonNull(logContext);
this.log = logContext.logger(GenericGroup.class);
this.groupId = Objects.requireNonNull(groupId);
this.state = Objects.requireNonNull(initialState);
Expand Down Expand Up @@ -249,7 +243,7 @@ public GenericGroupState currentState() {
* @param groupState the state to match against.
* @return true if the state matches, false otherwise.
*/
public boolean isState(GenericGroupState groupState) {
public boolean isInState(GenericGroupState groupState) {
return this.state == groupState;
}

Expand Down Expand Up @@ -307,12 +301,17 @@ public long currentStateTimestampOrDefault() {
/**
* @return whether the group is using the consumer protocol.
*/
public boolean isGenericGroup() {
public boolean usesConsumerGroupProtocol() {
return protocolType.map(type ->
type.equals(ConsumerProtocol.PROTOCOL_TYPE)
).orElse(false);
}

/**
* Add a member to this group.
*
* @param member the member to add.
*/
public void add(GenericGroupMember member) {
dajac marked this conversation as resolved.
Show resolved Hide resolved
add(member, null);
}
Expand Down Expand Up @@ -341,10 +340,7 @@ public void add(GenericGroupMember member, CompletableFuture<JoinGroupResponseDa
throw new IllegalStateException("The group and member's protocol type must be the same.");
}

if (!supportsProtocols(
member.protocolType(),
GenericGroupMember.plainProtocolSet(member.supportedProtocols()))) {

if (!supportsProtocols(member)) {
throw new IllegalStateException("None of the member's protocols can be supported.");
}

Expand Down Expand Up @@ -397,7 +393,8 @@ public void remove(String memberId) {
* 1. the group is currently empty (has no designated leader)
* 2. no member rejoined
*
* @return whether a new leader was elected.
* @return true if a new leader was elected or the existing
* leader rejoined, false otherwise.
*/
public boolean maybeElectNewJoinedLeader() {
if (leaderId.isPresent()) {
Expand All @@ -407,8 +404,8 @@ public boolean maybeElectNewJoinedLeader() {
if (member.isAwaitingJoin()) {
leaderId = Optional.of(member.memberId());
log.info("Group leader [memberId: {}, groupInstanceId: {}] " +
"failed to join before the rebalance timeout. Member {} " +
"was elected as the new leader.",
"failed to join before the rebalance timeout. Member {} " +
"was elected as the new leader.",
currentLeader.memberId(),
currentLeader.groupInstanceId().orElse("None"),
member
Expand All @@ -425,7 +422,7 @@ public boolean maybeElectNewJoinedLeader() {
);
return false;
}
return false;
return true;
}
return false;
}
Expand Down Expand Up @@ -642,7 +639,7 @@ Set<String> allDynamicMemberIds() {
/**
* @return number of members waiting for a join group response.
*/
public int numAwaiting() {
public int numAwaitingJoinResponse() {
return numMembersAwaitingJoinResponse;
}

Expand Down Expand Up @@ -780,6 +777,20 @@ private Set<String> candidateProtocols() {
.map(Map.Entry::getKey).collect(Collectors.toSet());
}

/**
* Checks whether at least one of the given protocols can be supported. A
* protocol can be supported if it is supported by all members.
*
* @param member the member to check.
* @return a boolean based on the condition mentioned above.
*/
public boolean supportsProtocols(GenericGroupMember member) {
return supportsProtocols(
member.protocolType(),
GenericGroupMember.plainProtocolSet(member.supportedProtocols())
);
}

/**
* Checks whether at least one of the given protocols can be supported. A
* protocol can be supported if it is supported by all members.
Expand All @@ -789,7 +800,7 @@ private Set<String> candidateProtocols() {
* @return a boolean based on the condition mentioned above.
*/
public boolean supportsProtocols(String memberProtocolType, Set<String> memberProtocols) {
if (isState(EMPTY)) {
if (isInState(EMPTY)) {
return !memberProtocolType.isEmpty() && !memberProtocols.isEmpty();
} else {
return protocolType.map(type -> type.equals(memberProtocolType)).orElse(false) &&
Expand Down Expand Up @@ -895,16 +906,19 @@ public void updateMember(
*
* @param member the member.
* @param response the join response to complete the future with.
* @return true if a join future actually completes.
*/
public void completeJoinFuture(
public boolean completeJoinFuture(
GenericGroupMember member,
JoinGroupResponseData response
) {
if (member.isAwaitingJoin()) {
member.awaitingJoinFuture().complete(response);
member.setAwaitingJoinFuture(null);
numMembersAwaitingJoinResponse--;
return true;
}
return false;
}

/**
Expand Down Expand Up @@ -950,7 +964,7 @@ public void initNextGeneration() {
* @return the members.
*/
public List<JoinGroupResponseMember> currentGenericGroupMembers() {
if (isState(DEAD) || isState(PREPARING_REBALANCE)) {
if (isInState(DEAD) || isInState(PREPARING_REBALANCE)) {
throw new IllegalStateException("Cannot obtain generic member metadata for group " +
groupId + " in state " + state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
Expand All @@ -55,7 +57,6 @@ public class GenericGroupTest {
private final int rebalanceTimeoutMs = 60000;
private final int sessionTimeoutMs = 10000;

dajac marked this conversation as resolved.
Show resolved Hide resolved

private GenericGroup group = null;

@BeforeEach
Expand Down Expand Up @@ -324,27 +325,14 @@ public void testSupportsProtocols() {
);

// by default, the group supports everything
Set<String> expectedProtocols = new HashSet<>();
member1Protocols.forEach(protocol -> expectedProtocols.add(protocol.name()));
assertTrue(group.supportsProtocols(protocolType, expectedProtocols));
assertTrue(group.supportsProtocols(protocolType, mkSet("range", "roundrobin")));

group.add(member1);
group.transitionTo(PREPARING_REBALANCE);

expectedProtocols.clear();
expectedProtocols.add("roundrobin");
expectedProtocols.add("foo");
assertTrue(group.supportsProtocols(protocolType, expectedProtocols));

expectedProtocols.clear();
expectedProtocols.add("range");
expectedProtocols.add("bar");
assertTrue(group.supportsProtocols(protocolType, expectedProtocols));

expectedProtocols.clear();
expectedProtocols.add("foo");
expectedProtocols.add("bar");
assertFalse(group.supportsProtocols(protocolType, expectedProtocols));
assertTrue(group.supportsProtocols(protocolType, mkSet("roundrobin", "foo")));
assertTrue(group.supportsProtocols(protocolType, mkSet("range", "bar")));
assertFalse(group.supportsProtocols(protocolType, mkSet("foo", "bar")));
}

@Test
Expand Down Expand Up @@ -530,7 +518,7 @@ public void testReplaceGroupInstanceWithNonExistingMember() {
}

@Test
public void testReplaceGroupInstance() {
public void testReplaceGroupInstance() throws Exception {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.of(groupInstanceId),
Expand All @@ -547,33 +535,28 @@ public void testReplaceGroupInstance() {
)
);

AtomicBoolean joinAwaitingMemberFenced = new AtomicBoolean(false);
CompletableFuture<JoinGroupResponseData> joinGroupFuture = new CompletableFuture<>();
joinGroupFuture.whenComplete((joinGroupResult, __) ->
joinAwaitingMemberFenced.set(joinGroupResult.errorCode() == Errors.FENCED_INSTANCE_ID.code()));
group.add(member, joinGroupFuture);

AtomicBoolean syncAwaitingMemberFenced = new AtomicBoolean(false);
CompletableFuture<SyncGroupResponseData> syncGroupFuture = new CompletableFuture<>();
syncGroupFuture.whenComplete((syncGroupResult, __) ->
syncAwaitingMemberFenced.set(syncGroupResult.errorCode() == Errors.FENCED_INSTANCE_ID.code()));
member.setAwaitingSyncFuture(syncGroupFuture);

assertTrue(group.isLeader(memberId));
assertEquals(memberId, group.staticMemberId(groupInstanceId));

String newMemberId = "newMemberId";
group.replaceStaticMember(groupInstanceId, memberId, newMemberId);

assertTrue(group.isLeader(newMemberId));
assertEquals(newMemberId, group.staticMemberId(groupInstanceId));
assertTrue(joinAwaitingMemberFenced.get());
assertTrue(syncAwaitingMemberFenced.get());
assertEquals(Errors.FENCED_INSTANCE_ID.code(), joinGroupFuture.get().errorCode());
assertEquals(Errors.FENCED_INSTANCE_ID.code(), syncGroupFuture.get().errorCode());
assertFalse(member.isAwaitingJoin());
assertFalse(member.isAwaitingSync());
}

@Test
public void testCompleteJoinFuture() {
public void testCompleteJoinFuture() throws Exception {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
Expand All @@ -590,18 +573,18 @@ public void testCompleteJoinFuture() {
)
);

AtomicBoolean invoked = new AtomicBoolean(false);
CompletableFuture<JoinGroupResponseData> joinGroupFuture = new CompletableFuture<>();
joinGroupFuture.whenComplete((__, ___) ->
invoked.set(true));
group.add(member, joinGroupFuture);

assertTrue(group.hasAllMembersJoined());
group.completeJoinFuture(member, new JoinGroupResponseData()
.setMemberId(member.memberId())
.setErrorCode(Errors.NONE.code()));
assertTrue(
group.completeJoinFuture(member, new JoinGroupResponseData()
.setMemberId(member.memberId())
.setErrorCode(Errors.NONE.code()))
);

assertTrue(invoked.get());
assertEquals(Errors.NONE.code(), joinGroupFuture.get().errorCode());
assertEquals(memberId, joinGroupFuture.get().memberId());
assertFalse(member.isAwaitingJoin());
}

Expand All @@ -626,15 +609,17 @@ public void testNotCompleteJoinFuture() {
group.add(member);

assertFalse(member.isAwaitingJoin());
group.completeJoinFuture(member, new JoinGroupResponseData()
.setMemberId(member.memberId())
.setErrorCode(Errors.NONE.code()));
assertFalse(
group.completeJoinFuture(member, new JoinGroupResponseData()
.setMemberId(member.memberId())
.setErrorCode(Errors.NONE.code()))
);

assertFalse(member.isAwaitingJoin());
}

@Test
public void testCompleteSyncFuture() {
public void testCompleteSyncFuture() throws Exception {
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.empty(),
Expand All @@ -659,6 +644,7 @@ public void testCompleteSyncFuture() {
.setErrorCode(Errors.NONE.code())));

assertFalse(member.isAwaitingSync());
jeffkbkim marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(Errors.NONE.code(), syncGroupFuture.get().errorCode());
}

@Test
Expand All @@ -680,6 +666,7 @@ public void testNotCompleteSyncFuture() {
);

group.add(member);
assertFalse(member.isAwaitingSync());

assertFalse(group.completeSyncFuture(member, new SyncGroupResponseData()
.setErrorCode(Errors.NONE.code())));
Expand Down Expand Up @@ -921,6 +908,49 @@ public void testElectNewJoinedLeader() {
assertTrue(group.isLeader("new-leader"));
}

@Test
public void testMaybeElectNewJoinedLeaderChooseExisting() {
GenericGroupMember leader = new GenericGroupMember(
memberId,
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);

group.add(leader, new CompletableFuture<>());
assertTrue(group.isLeader(memberId));
assertTrue(leader.isAwaitingJoin());

GenericGroupMember newMember = new GenericGroupMember(
"new-member",
Optional.empty(),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
Collections.singletonList(
new Protocol(
"roundrobin",
new byte[0]
)
)
);
group.add(newMember);

assertTrue(group.maybeElectNewJoinedLeader());
assertTrue(group.isLeader(memberId));
}

private void assertState(GenericGroup group, GenericGroupState targetState) {
Set<GenericGroupState> otherStates = new HashSet<>();
otherStates.add(STABLE);
Expand All @@ -929,7 +959,7 @@ private void assertState(GenericGroup group, GenericGroupState targetState) {
otherStates.add(DEAD);
otherStates.remove(targetState);

otherStates.forEach(otherState -> assertFalse(group.isState(otherState)));
assertTrue(group.isState(targetState));
otherStates.forEach(otherState -> assertFalse(group.isInState(otherState)));
assertTrue(group.isInState(targetState));
}
}