Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel exceptions and consumer cancellations cause the transport to stop processing messages #901

Merged
merged 1 commit into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ jobs:
Write-Output "No response, retrying..."
Start-Sleep -m 5000
}
} until (($response.status) -or ($tries -ge 5))
} until (($response.status) -or ($tries -ge 10))

if($response.status -ne "ok")
{
Write-Output "Failed to connect after 5 attempts";
Write-Output "Failed to connect after 10 attempts";

Write-Output
exit 1
Expand Down
4 changes: 2 additions & 2 deletions src/NServiceBus.Transport.RabbitMQ.Tests/RabbitMqContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void SetUp()

var purger = new QueuePurger(connectionFactory);

messagePump = new MessagePump(connectionFactory, new MessageConverter(), "Unit test", channelProvider, purger, TimeSpan.FromMinutes(2), 3, 0);
messagePump = new MessagePump(connectionFactory, new MessageConverter(), "Unit test", channelProvider, purger, TimeSpan.FromMinutes(2), 3, 0, config.RetryDelay);

routingTopology.Reset(connectionFactory, new[] { ReceiverQueue }.Concat(AdditionalReceiverQueues), new[] { ErrorQueue });

Expand Down Expand Up @@ -90,6 +90,6 @@ bool TryReceiveMessage(out IncomingMessage message, TimeSpan timeout) =>
BlockingCollection<IncomingMessage> receivedMessages;
ConventionalRoutingTopology routingTopology;

static readonly TimeSpan incomingMessageTimeout = TimeSpan.FromSeconds(1);
static readonly TimeSpan incomingMessageTimeout = TimeSpan.FromSeconds(5);
}
}
12 changes: 7 additions & 5 deletions src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
if (e.Initiator != ShutdownInitiator.Application)
{
_ = Task.Run(Reconnect);
var connection = (IConnection)sender;

_ = Task.Run(() => Reconnect(connection.ClientProvidedName));
}
}

async Task Reconnect()
async Task Reconnect(string connectionName)
{
var reconnected = false;

while (!reconnected)
{
Logger.InfoFormat("Attempting to reconnect in {0} seconds.", retryDelay.TotalSeconds);
Logger.InfoFormat("'{0}': Attempting to reconnect in {1} seconds.", connectionName, retryDelay.TotalSeconds);

await Task.Delay(retryDelay).ConfigureAwait(false);

Expand All @@ -47,11 +49,11 @@ async Task Reconnect()
CreateConnection();
reconnected = true;

Logger.Info("Connection to the broker reestablished successfully.");
Logger.InfoFormat("'{0}': Connection to the broker reestablished successfully.", connectionName);
}
catch (Exception e)
{
Logger.Info("Reconnecting to the broker failed.", e);
Logger.InfoFormat("'{0}': Reconnecting to the broker failed: {1}", connectionName, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ sealed class RabbitMQTransportInfrastructure : TransportInfrastructure
const string coreHostInformationDisplayNameKey = "NServiceBus.HostInformation.DisplayName";

readonly SettingsHolder settings;
readonly TimeSpan networkRetryDelay;
readonly ConnectionFactory connectionFactory;
readonly IRoutingTopology routingTopology;
readonly ChannelProvider channelProvider;
IRoutingTopology routingTopology;

public RabbitMQTransportInfrastructure(SettingsHolder settings, string connectionString)
{
Expand All @@ -32,12 +33,13 @@ public RabbitMQTransportInfrastructure(SettingsHolder settings, string connectio
settings.TryGet(SettingsKeys.UseExternalAuthMechanism, out bool useExternalAuthMechanism);
settings.TryGet(SettingsKeys.HeartbeatInterval, out TimeSpan? heartbeatInterval);
settings.TryGet(SettingsKeys.NetworkRecoveryInterval, out TimeSpan? networkRecoveryInterval);
networkRetryDelay = networkRecoveryInterval ?? connectionConfiguration.RetryDelay;

connectionFactory = new ConnectionFactory(endpointName, connectionConfiguration, clientCertificateCollection, disableRemoteCertificateValidation, useExternalAuthMechanism, heartbeatInterval, networkRecoveryInterval);

routingTopology = CreateRoutingTopology();

channelProvider = new ChannelProvider(connectionFactory, connectionConfiguration.RetryDelay, routingTopology);
channelProvider = new ChannelProvider(connectionFactory, networkRetryDelay, routingTopology);
}

public override IEnumerable<Type> DeliveryConstraints => new List<Type> { typeof(DiscardIfNotReceivedBefore), typeof(NonDurableDelivery), typeof(DoNotDeliverBefore), typeof(DelayDeliveryWith) };
Expand Down Expand Up @@ -149,7 +151,7 @@ IPushMessages CreateMessagePump()
prefetchCount = 0;
}

return new MessagePump(connectionFactory, messageConverter, consumerTag, channelProvider, queuePurger, timeToWaitBeforeTriggeringCircuitBreaker, prefetchMultiplier, prefetchCount);
return new MessagePump(connectionFactory, messageConverter, consumerTag, channelProvider, queuePurger, timeToWaitBeforeTriggeringCircuitBreaker, prefetchMultiplier, prefetchCount, networkRetryDelay);
}
}
}
133 changes: 118 additions & 15 deletions src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ sealed class MessagePump : IPushMessages, IDisposable
readonly TimeSpan timeToWaitBeforeTriggeringCircuitBreaker;
readonly int prefetchMultiplier;
readonly ushort overriddenPrefetchCount;
readonly TimeSpan retryDelay;

// Init
Func<MessageContext, Task> onMessage;
Func<ErrorContext, Task<ErrorHandleResult>> onError;
PushSettings settings;
CriticalError criticalError;
string name;
MessagePumpConnectionFailedCircuitBreaker circuitBreaker;
TaskScheduler exclusiveScheduler;

Expand All @@ -37,12 +39,11 @@ sealed class MessagePump : IPushMessages, IDisposable
SemaphoreSlim semaphore;
CancellationTokenSource messageProcessing;
IConnection connection;
EventingBasicConsumer consumer;

// Stop
TaskCompletionSource<bool> connectionShutdownCompleted;

public MessagePump(ConnectionFactory connectionFactory, MessageConverter messageConverter, string consumerTag, ChannelProvider channelProvider, QueuePurger queuePurger, TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, int prefetchMultiplier, ushort overriddenPrefetchCount)
public MessagePump(ConnectionFactory connectionFactory, MessageConverter messageConverter, string consumerTag, ChannelProvider channelProvider, QueuePurger queuePurger, TimeSpan timeToWaitBeforeTriggeringCircuitBreaker, int prefetchMultiplier, ushort overriddenPrefetchCount, TimeSpan retryDelay)
{
this.connectionFactory = connectionFactory;
this.messageConverter = messageConverter;
Expand All @@ -52,6 +53,7 @@ public MessagePump(ConnectionFactory connectionFactory, MessageConverter message
this.timeToWaitBeforeTriggeringCircuitBreaker = timeToWaitBeforeTriggeringCircuitBreaker;
this.prefetchMultiplier = prefetchMultiplier;
this.overriddenPrefetchCount = overriddenPrefetchCount;
this.retryDelay = retryDelay;
}

public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<ErrorHandleResult>> onError, CriticalError criticalError, PushSettings settings)
Expand All @@ -61,7 +63,9 @@ public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<E
this.settings = settings;
this.criticalError = criticalError;

circuitBreaker = new MessagePumpConnectionFailedCircuitBreaker($"'{settings.InputQueue} MessagePump'", timeToWaitBeforeTriggeringCircuitBreaker, criticalError);
name = $"{settings.InputQueue} MessagePump";

circuitBreaker = new MessagePumpConnectionFailedCircuitBreaker(name, timeToWaitBeforeTriggeringCircuitBreaker, criticalError);

exclusiveScheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

Expand All @@ -79,9 +83,13 @@ public void Start(PushRuntimeSettings limitations)
semaphore = new SemaphoreSlim(limitations.MaxConcurrency, limitations.MaxConcurrency);
messageProcessing = new CancellationTokenSource();

connection = connectionFactory.CreateConnection($"{settings.InputQueue} MessagePump");
ConnectToBroker();
}

var channel = connection.CreateModel();
void ConnectToBroker()
{
connection = connectionFactory.CreateConnection($"{settings.InputQueue} MessagePump");
connection.ConnectionShutdown += Connection_ConnectionShutdown;

long prefetchCount;

Expand All @@ -100,21 +108,20 @@ public void Start(PushRuntimeSettings limitations)
prefetchCount = (long)maxConcurrency * prefetchMultiplier;
}

var channel = connection.CreateModel();
channel.ModelShutdown += Channel_ModelShutdown;
channel.BasicQos(0, (ushort)Math.Min(prefetchCount, ushort.MaxValue), false);

consumer = new EventingBasicConsumer(channel);

var consumer = new EventingBasicConsumer(channel);
consumer.ConsumerCancelled += Consumer_ConsumerCancelled;
consumer.Registered += Consumer_Registered;
connection.ConnectionShutdown += Connection_ConnectionShutdown;

consumer.Received += Consumer_Received;

channel.BasicConsume(settings.InputQueue, false, consumerTag, consumer);
}

public async Task Stop()
{
consumer.Received -= Consumer_Received;
messageProcessing.Cancel();

while (semaphore.CurrentCount != maxConcurrency)
Expand Down Expand Up @@ -147,14 +154,110 @@ void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
connectionShutdownCompleted?.TrySetResult(true);
}
else if (circuitBreaker.Disarmed)
{
//log entry handled by event handler registered in ConnectionFactory
circuitBreaker.Failure(new Exception(e.ToString()));
_ = Task.Run(() => Reconnect());
}
else
{
Logger.WarnFormat("'{0}' connection shutdown while reconnect already in progress: {1}", name, e);
}
}

// Handles the channel's ModelShutdown event and schedules a reconnect when the
// shutdown was not expected.
//   sender: the object raising the event (not used here).
//   e:      shutdown details; its Initiator and ReplyCode decide whether to react.
void Channel_ModelShutdown(object sender, ShutdownEventArgs e)
{
    // Shutdowns requested by the application itself need no recovery.
    var initiatedByApplication = e.Initiator == ShutdownInitiator.Application;

    // Peer-initiated shutdowns with reply code 404 are ignored as well.
    // NOTE(review): presumably 404 is the broker's "not found" reply (e.g. a missing queue) — confirm.
    if (initiatedByApplication || (e.Initiator == ShutdownInitiator.Peer && e.ReplyCode == 404))
    {
        return;
    }

    // An armed circuit breaker is treated as "a reconnect is already running",
    // so only log and avoid starting a second reconnect loop.
    if (!circuitBreaker.Disarmed)
    {
        Logger.WarnFormat("'{0}' channel shutdown while reconnect already in progress: {1}", name, e);
        return;
    }

    Logger.WarnFormat("'{0}' channel shutdown: {1}", name, e);
    circuitBreaker.Failure(new Exception(e.ToString()));

    // Fire-and-forget: Reconnect logs its own failures.
    _ = Task.Run(() => Reconnect());
}

// Handles the consumer's ConsumerCancelled event. When the cancellation arrives
// while both the consumer's channel and the pump's connection are still open,
// it is reported as a broker-side cancellation and a reconnect is scheduled.
//   sender: the EventingBasicConsumer that was cancelled.
//   e:      event arguments (not used here).
void Consumer_ConsumerCancelled(object sender, ConsumerEventArgs e)
{
    var cancelledConsumer = (EventingBasicConsumer)sender;

    // Nothing to do here unless both the channel and the connection are open.
    if (!cancelledConsumer.Model.IsOpen || !connection.IsOpen)
    {
        return;
    }

    // An armed circuit breaker is treated as "a reconnect is already running".
    if (!circuitBreaker.Disarmed)
    {
        Logger.WarnFormat("'{0}' consumer canceled by broker while reconnect already in progress", name);
        return;
    }

    Logger.WarnFormat("'{0}' consumer canceled by broker", name);
    circuitBreaker.Failure(new Exception($"'{name}' consumer canceled by broker"));

    // Fire-and-forget: Reconnect logs its own failures.
    _ = Task.Run(() => Reconnect());
}

// Re-establishes the pump's broker connection, retrying every retryDelay until
// ConnectToBroker succeeds or message processing is cancelled. After a
// successful reconnect the previous connection is closed and disposed if it is
// still open. The method logs all failures instead of throwing.
async Task Reconnect()
{
    try
    {
        // Capture the current connection first: ConnectToBroker assigns a new one to the field.
        var previousConnection = connection;
        var connected = false;

        do
        {
            Logger.InfoFormat("'{0}': Attempting to reconnect in {1} seconds.", name, retryDelay.TotalSeconds);

            // Cancelling the token aborts the wait; the outer handler logs that case.
            await Task.Delay(retryDelay, messageProcessing.Token).ConfigureAwait(false);

            try
            {
                ConnectToBroker();
                connected = true;
            }
            catch (Exception ex)
            {
                // A failed attempt is not fatal; loop around and try again after the delay.
                Logger.InfoFormat("'{0}': Reconnecting to the broker failed: {1}", name, ex);
            }
        }
        while (!connected);

        Logger.InfoFormat("'{0}': Connection to the broker reestablished successfully.", name);

        if (previousConnection.IsOpen)
        {
            previousConnection.Close();
            previousConnection.Dispose();
        }
    }
    catch (OperationCanceledException ex) when (messageProcessing.Token.IsCancellationRequested)
    {
        Logger.DebugFormat("'{0}': Reconnection canceled since the transport is being stopped: {1}", name, ex);
    }
    catch (Exception ex)
    {
        Logger.WarnFormat("'{0}': Unexpected error while reconnecting: '{1}'", name, ex);
    }
}

async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
if (messageProcessing.IsCancellationRequested)
{
return;
}

var consumer = (EventingBasicConsumer)sender;

var eventRaisingThreadId = Thread.CurrentThread.ManagedThreadId;

var messageBody = eventArgs.Body.ToArray();
Expand Down Expand Up @@ -204,7 +307,7 @@ async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
await Task.Yield();
}

await Process(eventArgsCopy, messageBody).ConfigureAwait(false);
await Process(consumer, eventArgsCopy, messageBody).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand All @@ -217,7 +320,7 @@ async void Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
}
}

async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
async Task Process(EventingBasicConsumer consumer, BasicDeliverEventArgs message, byte[] messageBody)
{
Dictionary<string, string> headers;

Expand All @@ -228,7 +331,7 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
catch (Exception ex)
{
Logger.Error($"Failed to retrieve headers from poison message. Moving message to queue '{settings.ErrorQueue}'...", ex);
await MovePoisonMessage(message, settings.ErrorQueue).ConfigureAwait(false);
await MovePoisonMessage(consumer, message, settings.ErrorQueue).ConfigureAwait(false);

return;
}
Expand All @@ -242,7 +345,7 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
catch (Exception ex)
{
Logger.Error($"Failed to retrieve ID from poison message. Moving message to queue '{settings.ErrorQueue}'...", ex);
await MovePoisonMessage(message, settings.ErrorQueue).ConfigureAwait(false);
await MovePoisonMessage(consumer, message, settings.ErrorQueue).ConfigureAwait(false);

return;
}
Expand Down Expand Up @@ -311,7 +414,7 @@ async Task Process(BasicDeliverEventArgs message, byte[] messageBody)
}
}

async Task MovePoisonMessage(BasicDeliverEventArgs message, string queue)
async Task MovePoisonMessage(EventingBasicConsumer consumer, BasicDeliverEventArgs message, string queue)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public MessagePumpConnectionFailedCircuitBreaker(string name, TimeSpan timeToWai
timer = new Timer(CircuitBreakerTriggered);
}

// True while no failures are recorded, i.e. failureCount reads as zero.
public bool Disarmed
{
    get { return Interlocked.Read(ref failureCount) == 0; }
}

public void Success()
{
var oldValue = Interlocked.Exchange(ref failureCount, 0);
Expand All @@ -25,7 +27,7 @@ public void Success()
}

timer.Change(Timeout.Infinite, Timeout.Infinite);
Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name);
Logger.InfoFormat("The circuit breaker for '{0}' is now disarmed", name);
}

public void Failure(Exception exception)
Expand All @@ -36,7 +38,7 @@ public void Failure(Exception exception)
if (newValue == 1)
{
timer.Change(timeToWaitBeforeTriggering, NoPeriodicTriggering);
Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name);
Logger.WarnFormat("The circuit breaker for '{0}' is now in the armed state", name);
}
}

Expand All @@ -49,8 +51,8 @@ void CircuitBreakerTriggered(object state)
{
if (Interlocked.Read(ref failureCount) > 0)
{
Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name);
criticalError.Raise($"{name} connection to the broker has failed.", lastException);
Logger.WarnFormat("The circuit breaker for '{0}' will now be triggered", name);
criticalError.Raise($"'{name}' connection to the broker has failed.", lastException);
}
}

Expand Down