Skip to content

Commit

Permalink
Correct check for asigned enhanced consumer on the lease manager
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Apr 29, 2019
1 parent 91c2ebf commit 83e5d1f
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions lib/lease-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class LeaseManager {
logger,
stateStore,
streamName,
useAutoShardAssignment
useAutoShardAssignment,
useEnhancedFanOut
} = options;

Object.assign(internal(this), {
Expand All @@ -130,13 +131,14 @@ class LeaseManager {
isStandalone: !useAutoShardAssignment,
logger,
stateStore,
streamName
streamName,
useEnhancedFanOut
});
}

async start() {
const privateProps = internal(this);
const { consumersManager, logger, stateStore } = privateProps;
const { consumersManager, logger, stateStore, useEnhancedFanOut } = privateProps;

const acquireLeases = async () => {
logger.debug('Trying to acquire leases…');
Expand All @@ -148,11 +150,13 @@ class LeaseManager {
return;
}

const consumerArn = await stateStore.getAssignedEnhancedConsumer();
if (!consumerArn) {
logger.debug("Can't acquire leases now as there's no assigned enhanced consumer.");
privateProps.timeoutId = setTimeout(acquireLeases, ACQUIRE_LEASES_INTERVAL);
return;
if (useEnhancedFanOut) {
const consumerArn = await stateStore.getAssignedEnhancedConsumer();
if (!consumerArn) {
logger.debug("Can't acquire leases now as there's no assigned enhanced consumer.");
privateProps.timeoutId = setTimeout(acquireLeases, ACQUIRE_LEASES_INTERVAL);
return;
}
}

const shards = await getStreamShards(privateProps);
Expand Down

0 comments on commit 83e5d1f

Please sign in to comment.