Skip to content

Commit

Permalink
KAFKA-3299: Ensure that reading config log on rebalance doesn't hang …
Browse files Browse the repository at this point in the history
…the herder

Author: Gwen Shapira <cshapi@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#981 from gwenshap/KAFKA-3299
  • Loading branch information
gwenshap authored and ewencp committed Mar 4, 2016
1 parent 6d65ede commit b7d6fae
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// and from other nodes are safe to process
private boolean rebalanceResolved;
private ConnectProtocol.Assignment assignment;
private boolean canReadConfigs;

// To handle most external requests, like creating or destroying a connector, we can use a generic request where
// the caller specifies all the code that should be executed.
Expand Down Expand Up @@ -150,6 +151,7 @@ public DistributedHerder(DistributedConfig config,

rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
needsReconfigRebalance = false;
canReadConfigs = true; // We didn't try yet, but Configs are readable until proven otherwise

forwardRequestExecutor = Executors.newSingleThreadExecutor();
}
Expand Down Expand Up @@ -206,6 +208,11 @@ public void tick() {
// blocking up this thread (especially those in callbacks due to rebalance events).

try {
// if we failed to read to end of log before, we need to make sure the issue was resolved before joining group
// Joining and immediately leaving for failure to read configs is exceedingly impolite
if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us

member.ensureActive();
// Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
if (!handleRebalanceCompleted()) return;
Expand Down Expand Up @@ -574,21 +581,18 @@ private boolean handleRebalanceCompleted() {
// even attempting to. If we can't we should drop out of the group because we will block everyone from making
// progress. We can backoff and try rejoining later.
// 1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately,
// otherwise, we just want to wait indefinitely to catch up and rejoin whenever we're finally ready.
// otherwise, we just want to wait a reasonable amount of time to catch up and rejoin if we are ready.
// 2. Assignment succeeded.
// 2a. We are caught up on configs. Awesome! We can proceed to run our assigned work.
// 2b. We need to try to catch up. We can do this potentially indefinitely because if it takes to long, we'll
// be kicked out of the group anyway due to lack of heartbeats.
// 2b. We need to try to catch up - try reading configs for a reasonable amount of time.

boolean needsReadToEnd = false;
long syncConfigsTimeoutMs = Long.MAX_VALUE;
boolean needsRejoin = false;
if (assignment.failed()) {
needsRejoin = true;
if (isLeader()) {
log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
needsReadToEnd = true;
syncConfigsTimeoutMs = workerSyncTimeoutMs;
} else if (configState.offset() < assignment.offset()) {
log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
needsReadToEnd = true;
Expand All @@ -604,11 +608,13 @@ private boolean handleRebalanceCompleted() {

if (needsReadToEnd) {
// Force exiting this method to avoid creating any connectors/tasks and require immediate rejoining if
// we timed out. This should only happen if we were the leader and didn't finish quickly enough, in which
// case we've waited a long time and should have already left the group OR the timeout should have been
// very long and not having finished also indicates we've waited longer than the session timeout.
if (!readConfigToEnd(syncConfigsTimeoutMs))
// we timed out. This should only happen if we failed to read configuration for long enough,
// in which case giving back control to the main loop will prevent hanging around indefinitely after getting kicked out of the group.
// We also indicate to the main loop that we failed to readConfigs so it will check that the issue was resolved before trying to join the group
if (!readConfigToEnd(workerSyncTimeoutMs)) {
canReadConfigs = false;
needsRejoin = true;
}
}

if (needsRejoin) {
Expand Down Expand Up @@ -646,11 +652,11 @@ private boolean readConfigToEnd(long timeoutMs) {
log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
return true;
} catch (TimeoutException e) {
// in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
// and back off to avoid a tight loop of rejoin-attempt-to-catch-up-leave
log.warn("Didn't reach end of config log quickly enough", e);
// TODO: With explicit leave group support, it would be good to explicitly leave the group *before* this
// backoff since it'll be longer than the session timeout
if (isLeader())
backoff(workerUnsyncBackoffMs);
member.maybeLeaveGroup();
backoff(workerUnsyncBackoffMs);
return false;
} catch (InterruptedException | ExecutionException e) {
throw new ConnectException("Error trying to catch up after assignment", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ public void requestRejoin() {
coordinator.requestRejoin();
}

/**
 * Asks the underlying coordinator to leave the group by delegating to
 * {@code coordinator.maybeLeaveGroup()}.
 *
 * The herder calls this when it times out reading to the end of the config log,
 * before it backs off, so that it does not hold up the rest of the group.
 * NOTE(review): the "maybe" presumably means the coordinator decides whether a
 * leave request is actually needed - confirm against the coordinator implementation.
 */
public void maybeLeaveGroup() {
coordinator.maybeLeaveGroup();
}

private void stop(boolean swallowException) {
log.trace("Stopping the Connect group member.");
AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ public void testJoinLeaderCatchUpFails() throws Exception {
TestFuture<Void> readToEndFuture = new TestFuture<>();
readToEndFuture.resolveOnGet(new TimeoutException());
EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
member.maybeLeaveGroup();
EasyMock.expectLastCall();
PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
member.requestRejoin();

Expand Down

0 comments on commit b7d6fae

Please sign in to comment.