Skip to content

Commit

Permalink
MINOR: Improve code style in FenceProducersHandler (apache#12208)
Browse files Browse the repository at this point in the history
Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
dajac committed May 28, 2022
1 parent 620ada9 commit 6b93652
Showing 1 changed file with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public FenceProducersHandler(
}

public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> newFuture(
Collection<String> transactionalIds
Collection<String> transactionalIds
) {
return AdminApiFuture.forKeys(buildKeySet(transactionalIds));
}

private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
return transactionalIds.stream()
.map(CoordinatorKey::byTransactionalId)
.collect(Collectors.toSet());
.map(CoordinatorKey::byTransactionalId)
.collect(Collectors.toSet());
}

@Override
Expand All @@ -75,24 +75,24 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke
" when building `InitProducerId` request");
}
InitProducerIdRequestData data = new InitProducerIdRequestData()
// Because we never include a producer epoch or ID in this request, we expect that some errors
// (such as PRODUCER_FENCED) will never be returned in the corresponding broker response.
// If we ever modify this logic to include an epoch or producer ID, we will need to update the
// error handling logic for this handler to accommodate these new errors.
.setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
.setProducerId(ProducerIdAndEpoch.NONE.producerId)
.setTransactionalId(key.idValue)
// Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
// and shouldn't be used for any actual record writes
.setTransactionTimeoutMs(1);
// Because we never include a producer epoch or ID in this request, we expect that some errors
// (such as PRODUCER_FENCED) will never be returned in the corresponding broker response.
// If we ever modify this logic to include an epoch or producer ID, we will need to update the
// error handling logic for this handler to accommodate these new errors.
.setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
.setProducerId(ProducerIdAndEpoch.NONE.producerId)
.setTransactionalId(key.idValue)
// Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
// and shouldn't be used for any actual record writes
.setTransactionTimeoutMs(1);
return new InitProducerIdRequest.Builder(data);
}

@Override
public ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleSingleResponse(
Node broker,
CoordinatorKey key,
AbstractResponse abstractResponse
Node broker,
CoordinatorKey key,
AbstractResponse abstractResponse
) {
InitProducerIdResponse response = (InitProducerIdResponse) abstractResponse;

Expand All @@ -102,14 +102,17 @@ public ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleSingleResponse(
}

Map<CoordinatorKey, ProducerIdAndEpoch> completed = Collections.singletonMap(key, new ProducerIdAndEpoch(
response.data().producerId(),
response.data().producerEpoch()
response.data().producerId(),
response.data().producerEpoch()
));

return new ApiResult<>(completed, Collections.emptyMap(), Collections.emptyList());
}

private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleError(CoordinatorKey transactionalIdKey, Errors error) {
private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleError(
CoordinatorKey transactionalIdKey,
Errors error
) {
switch (error) {
case CLUSTER_AUTHORIZATION_FAILED:
return ApiResult.failed(transactionalIdKey, new ClusterAuthorizationException(
Expand Down

0 comments on commit 6b93652

Please sign in to comment.