Skip to content

Commit

Permalink
Change receivePump to start using Executor provided to EventHubClient (
Browse files Browse the repository at this point in the history
…#317)

* move whenCompleteAsync's usage to handleAsync
* logs first, error handlers next!
* optimize extra Runnable objects created for `ReceivePump`
* fix scheduling issue - reschedule should happen inside partitionReceiver.receive().handle{...}
  • Loading branch information
SreeramGarlapati committed May 18, 2018
1 parent 6a275d8 commit a521873
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.slf4j.LoggerFactory;

public class BaseLinkHandler extends BaseHandler {
protected static final Logger TRACE_LOGGER = LoggerFactory.getLogger(BaseHandler.class);
protected static final Logger TRACE_LOGGER = LoggerFactory.getLogger(BaseLinkHandler.class);

private final AmqpLink underlyingEntity;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,19 @@ public void setReceiveTimeout(final Duration value) {
public CompletableFuture<Collection<Message>> receive(final int maxMessageCount) {
this.throwIfClosed();

final CompletableFuture<Collection<Message>> onReceive = new CompletableFuture<>();
if (maxMessageCount <= 0 || maxMessageCount > this.prefetchCount) {
throw new IllegalArgumentException(String.format(Locale.US, "parameter 'maxMessageCount' should be a positive number and should be less than prefetchCount(%s)", this.prefetchCount));
onReceive.completeExceptionally(new IllegalArgumentException(String.format(
Locale.US,
"parameter 'maxMessageCount' should be a positive number and should be less than prefetchCount(%s)",
this.prefetchCount)));
return onReceive;
}

if (this.pendingReceives.isEmpty()) {
timer.schedule(this.onOperationTimedout, this.receiveTimeout);
}

final CompletableFuture<Collection<Message>> onReceive = new CompletableFuture<>();
pendingReceives.offer(new ReceiveWorkItem(onReceive, receiveTimeout, maxMessageCount));

try {
Expand Down Expand Up @@ -558,13 +562,15 @@ public void run() {
}
, timeout.remaining());

this.openTimer.whenCompleteAsync(
this.openTimer.handleAsync(
(unUsed, exception) -> {
if (exception != null
&& exception instanceof Exception
&& !(exception instanceof CancellationException)) {
ExceptionUtil.completeExceptionally(linkOpen.getWork(), (Exception) exception, MessageReceiver.this);
}

return null;
}, this.executor);
}

Expand Down Expand Up @@ -593,11 +599,13 @@ public void run() {
}
, timeout.remaining());

this.closeTimer.whenCompleteAsync(
this.closeTimer.handleAsync(
(unUsed, exception) -> {
if (exception != null && exception instanceof Exception && !(exception instanceof CancellationException)) {
ExceptionUtil.completeExceptionally(linkClose, (Exception) exception, MessageReceiver.this);
}

return null;
}, this.executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,14 @@ private CompletableFuture<Void> sendCore(

// if the timeoutTask completed with scheduling error - notify sender
if (timeoutTimerTask.isCompletedExceptionally()) {
timeoutTimerTask.whenCompleteAsync(
timeoutTimerTask.handleAsync(
(unUsed, exception) -> {
if (exception != null && !(exception instanceof CancellationException))
onSendFuture.completeExceptionally(
new OperationCancelledException("Send failed while dispatching to Reactor, see cause for more details.",
exception));

return null;
}, this.executor);

return onSendFuture;
Expand Down Expand Up @@ -642,13 +644,15 @@ public void run() {
}
, timeout.remaining());

this.openTimer.whenCompleteAsync(
this.openTimer.handleAsync(
(unUsed, exception) -> {
if (exception != null
&& exception instanceof Exception
&& !(exception instanceof CancellationException)) {
ExceptionUtil.completeExceptionally(linkFirstOpen, (Exception) exception, this);
}

return null;
}, this.executor);
}

Expand Down Expand Up @@ -830,11 +834,13 @@ public void run() {
}
, timeout.remaining());

this.closeTimer.whenCompleteAsync(
this.closeTimer.handleAsync(
(unUsed, exception) -> {
if (exception != null && exception instanceof Exception && !(exception instanceof CancellationException)) {
ExceptionUtil.completeExceptionally(linkClose, (Exception) exception, MessageSender.this);
}

return null;
}, this.executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,14 @@ public void run() {
messagingFactory.getOperationTimeout());

// if scheduling messagingfactory openTimer fails - notify user and stop
messagingFactory.openTimer.whenCompleteAsync(
messagingFactory.openTimer.handleAsync(
(unUsed, exception) -> {
if (exception != null && !(exception instanceof CancellationException)) {
messagingFactory.open.completeExceptionally(exception);
messagingFactory.getReactor().stop();
}

return null;
}, messagingFactory.executor);

return messagingFactory.open;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ public CompletableFuture<Void> setReceiveHandler(final PartitionReceiveHandler r
this.receivePump = new ReceivePump(
new ReceivePump.IPartitionReceiver() {
@Override
public Iterable<EventData> receive(int maxBatchSize) throws EventHubException {
return PartitionReceiverImpl.this.receiveSync(maxBatchSize);
public CompletableFuture<Iterable<EventData>> receive(int maxBatchSize) {
return PartitionReceiverImpl.this.receive(maxBatchSize);
}

@Override
Expand All @@ -195,16 +195,10 @@ public String getPartitionId() {
}
},
receiveHandler,
invokeWhenNoEvents);
invokeWhenNoEvents,
this.executor);

final Thread onReceivePumpThread = new Thread(new Runnable() {
@Override
public void run() {
receivePump.run();
}
});

onReceivePumpThread.start();
this.executor.execute(this.receivePump);
}

return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,74 +5,74 @@
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

public class ReceivePump {
public class ReceivePump implements Runnable {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReceivePump.class);

private final IPartitionReceiver receiver;
private final PartitionReceiveHandler onReceiveHandler;
private final boolean invokeOnTimeout;
private final CompletableFuture<Void> stopPump;
private final Executor executor;
private final ProcessAndReschedule processAndReschedule;

private AtomicBoolean stopPumpRaised;
private volatile boolean isPumpHealthy = true;

public ReceivePump(
final IPartitionReceiver receiver,
final PartitionReceiveHandler receiveHandler,
final boolean invokeOnReceiveWithNoEvents) {
final boolean invokeOnReceiveWithNoEvents,
final Executor executor) {
this.receiver = receiver;
this.onReceiveHandler = receiveHandler;
this.invokeOnTimeout = invokeOnReceiveWithNoEvents;
this.stopPump = new CompletableFuture<Void>();
this.stopPump = new CompletableFuture<>();
this.executor = executor;
this.processAndReschedule = new ProcessAndReschedule();

this.stopPumpRaised = new AtomicBoolean(false);
}

// entry-point - for runnable
public void run() {
boolean isPumpHealthy = true;
while (isPumpHealthy && !this.stopPumpRaised.get()) {
Iterable<EventData> receivedEvents = null;
try {
ReceivePump.this.receiveAndProcess();
} catch (final Exception exception) {
if (TRACE_LOGGER.isErrorEnabled()) {
TRACE_LOGGER.error(
String.format("Receive pump for partition (%s) encountered unrecoverable error and exited with exception %s.",
ReceivePump.this.receiver.getPartitionId(), exception.toString()));
}

try {
receivedEvents = this.receiver.receive(this.onReceiveHandler.getMaxEventCount());
} catch (Throwable clientException) {
isPumpHealthy = false;
this.onReceiveHandler.onError(clientException);
throw exception;
}
}

if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format("Receive pump for partition (%s) exiting after receive exception %s", this.receiver.getPartitionId(), clientException.toString()));
}
// receives and invokes user-callback if success or stops pump if fails
public void receiveAndProcess() {
if (this.shouldContinue()) {
this.receiver.receive(this.onReceiveHandler.getMaxEventCount())
.handleAsync(this.processAndReschedule, this.executor);
} else {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("Stopping receive pump for partition (%s) as %s",
ReceivePump.this.receiver.getPartitionId(),
this.stopPumpRaised.get() ? "per the request." : "pump ran into errors."));
}

try {
if (receivedEvents != null || (receivedEvents == null && this.invokeOnTimeout && isPumpHealthy)) {
this.onReceiveHandler.onReceive(receivedEvents);
}
} catch (Throwable userCodeError) {
isPumpHealthy = false;
this.onReceiveHandler.onError(userCodeError);

if (userCodeError instanceof InterruptedException) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("Interrupting receive pump for partition (%s)", this.receiver.getPartitionId()));
}

Thread.currentThread().interrupt();
} else {
TRACE_LOGGER.error(
String.format("Receive pump for partition (%s) exiting after user exception %s", this.receiver.getPartitionId(), userCodeError.toString()));
}
}
this.stopPump.complete(null);
}

this.stopPump.complete(null);
}

public CompletableFuture<Void> stop() {
Expand All @@ -85,9 +85,87 @@ public boolean isRunning() {
}

// partition receiver contract against which this pump works
public static interface IPartitionReceiver {
public String getPartitionId();
public interface IPartitionReceiver {
String getPartitionId();

CompletableFuture<Iterable<EventData>> receive(final int maxBatchSize);
}

private boolean shouldContinue() {
return this.isPumpHealthy && !this.stopPumpRaised.get();
}

private void handleClientExceptions(final Throwable clientException) {
if (clientException != null) {
this.isPumpHealthy = false;

if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(
"Receive pump for partition (%s) exiting after receive exception %s",
this.receiver.getPartitionId(), clientException.toString()));
}

this.onReceiveHandler.onError(clientException);
}
}

private void handleUserCodeExceptions(final Throwable userCodeException) {
this.isPumpHealthy = false;
if (TRACE_LOGGER.isErrorEnabled()) {
TRACE_LOGGER.error(
String.format("Receive pump for partition (%s) exiting after user-code exception %s",
this.receiver.getPartitionId(), userCodeException.toString()));
}

this.onReceiveHandler.onError(userCodeException);

if (userCodeException instanceof InterruptedException) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("Interrupting receive pump for partition (%s)",
this.receiver.getPartitionId()));
}

Thread.currentThread().interrupt();
}
}

private void schedulePump() {
try {
this.executor.execute(this);
} catch (final RejectedExecutionException rejectedException) {
this.isPumpHealthy = false;

if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(
"Receive pump for partition (%s) exiting with error: %s",
ReceivePump.this.receiver.getPartitionId(), rejectedException.toString()));
}

this.onReceiveHandler.onError(rejectedException);
}
}

private final class ProcessAndReschedule implements BiFunction<Iterable<EventData>, Throwable, Void> {

public Iterable<EventData> receive(final int maxBatchSize) throws EventHubException;
@Override
public Void apply(final Iterable<EventData> receivedEvents, final Throwable clientException) {

ReceivePump.this.handleClientExceptions(clientException);

try {
// don't invoke user call back - if stop is already raised / pump is unhealthy
if (ReceivePump.this.shouldContinue() &&
(receivedEvents != null
|| (receivedEvents == null && ReceivePump.this.invokeOnTimeout))) {
ReceivePump.this.onReceiveHandler.onReceive(receivedEvents);
}
} catch (final Throwable userCodeError) {
ReceivePump.this.handleUserCodeExceptions(userCodeError);
}

ReceivePump.this.schedulePump();

return null;
}
}
}
Loading

0 comments on commit a521873

Please sign in to comment.