Skip to content

Commit

Permalink
* Add ConfigureAwait where it was missing
Browse files Browse the repository at this point in the history
* Always create cancellation token source for recovery, and dispose it
* Modify WaitAsync extension to see if task has already completed
  • Loading branch information
lukebakken committed Jan 25, 2024
1 parent 9c9f5f6 commit 19cc98b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 43 deletions.
50 changes: 35 additions & 15 deletions projects/RabbitMQ.Client/client/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,33 @@ public static bool IsCompletedSuccessfully(this Task task)
private static void IgnoreTaskContinuation(Task t, object s) => t.Exception.Handle(e => true);

// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
public static async Task WaitAsync(this Task task, TimeSpan timeout, CancellationToken cancellationToken)
public static Task WaitAsync(this Task task, TimeSpan timeout, CancellationToken cancellationToken)
{
if (task.IsCompletedSuccessfully())
{
return task;
}
else
{
return DoWaitWithTimeoutAsync(task, timeout, cancellationToken);
}
}

private static async Task DoWaitWithTimeoutAsync(this Task task, TimeSpan timeout, CancellationToken cancellationToken)
{
using var timeoutTokenCts = new CancellationTokenSource(timeout);
CancellationToken timeoutToken = timeoutTokenCts.Token;
var timeoutTokenTcs = new TaskCompletionSource<bool>();
using CancellationTokenRegistration timeoutTokenRegistration =
timeoutToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
state: timeoutTokenTcs, useSynchronizationContext: false);

var cancellationTokenTcs = new TaskCompletionSource<bool>();
var linkedTokenTcs = new TaskCompletionSource<bool>();
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(timeoutToken, cancellationToken);
using CancellationTokenRegistration cancellationTokenRegistration =
cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
state: cancellationTokenTcs, useSynchronizationContext: false);
linkedCts.Token.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
state: linkedTokenTcs, useSynchronizationContext: false);

if (task != await Task.WhenAny(task, timeoutTokenTcs.Task, cancellationTokenTcs.Task)
.ConfigureAwait(false))
if (task != await Task.WhenAny(task, linkedTokenTcs.Task).ConfigureAwait(false))
{
task.Ignore();
if (task == timeoutTokenTcs.Task)
if (timeoutToken.IsCancellationRequested)
{
throw new OperationCanceledException($"Operation timed out after {timeout}");
}
Expand All @@ -86,14 +94,26 @@ public static async Task WaitAsync(this Task task, TimeSpan timeout, Cancellatio
}

// https://devblogs.microsoft.com/pfxteam/how-do-i-cancel-non-cancelable-async-operations/
public static async Task WaitAsync(this Task task, CancellationToken cancellationToken)
public static Task WaitAsync(this Task task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();
if (task.IsCompletedSuccessfully())
{
return task;
}
else
{
return DoWaitAsync(task, cancellationToken);
}
}

private static async Task DoWaitAsync(this Task task, CancellationToken cancellationToken)
{
var cancellationTokenTcs = new TaskCompletionSource<bool>();

using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
state: tcs, useSynchronizationContext: false))
state: cancellationTokenTcs, useSynchronizationContext: false))
{
if (task != await Task.WhenAny(task, tcs.Task))
if (task != await Task.WhenAny(task, cancellationTokenTcs.Task).ConfigureAwait(false))
{
task.Ignore();
throw new OperationCanceledException(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public override void HandleBasicConsumeOk(string consumerTag)
public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)
.ConfigureAwait(false);
Received?.Invoke(
this,
new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ namespace RabbitMQ.Client.Framing.Impl
internal sealed partial class AutorecoveringConnection
{
private Task? _recoveryTask;
private CancellationTokenSource? _recoveryCancellationTokenSource;

// TODO dispose the CTS
private CancellationTokenSource RecoveryCancellationTokenSource => _recoveryCancellationTokenSource ??= new CancellationTokenSource();
private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource();

private void HandleConnectionShutdown(object _, ShutdownEventArgs args)
{
Expand All @@ -71,7 +68,7 @@ private async Task RecoverConnectionAsync()
{
try
{
CancellationToken token = RecoveryCancellationTokenSource.Token;
CancellationToken token = _recoveryCancellationTokenSource.Token;
bool success;
do
{
Expand All @@ -96,26 +93,6 @@ await Task.Delay(_config.NetworkRecoveryInterval, token)
}
}

/// <summary>
/// Cancels the main recovery loop and will block until the loop finishes, or the timeout
/// expires, to prevent Close operations overlapping with recovery operations.
/// </summary>
private void StopRecoveryLoop()
{
Task? task = _recoveryTask;
if (task is null)
{
return;
}
RecoveryCancellationTokenSource.Cancel();

Task timeout = Task.Delay(_config.RequestedConnectionTimeout);
if (Task.WhenAny(task, timeout).Result == timeout)
{
ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop.");
}
}

/// <summary>
/// Async cancels the main recovery loop and will block until the loop finishes, or the timeout
/// expires, to prevent Close operations overlapping with recovery operations.
Expand All @@ -125,7 +102,7 @@ private async ValueTask StopRecoveryLoopAsync(CancellationToken cancellationToke
Task? task = _recoveryTask;
if (task != null)
{
RecoveryCancellationTokenSource.Cancel();
_recoveryCancellationTokenSource.Cancel();
using var timeoutTokenSource = new CancellationTokenSource(_config.RequestedConnectionTimeout);
using var lts = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken);
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,10 @@ public void Dispose()
{
_channels.Clear();
_innerConnection = null;
_disposed = true;
_recordedEntitiesSemaphore.Dispose();
_channelsSemaphore.Dispose();
_recoveryCancellationTokenSource.Dispose();
_disposed = true;
}
}

Expand Down

0 comments on commit 19cc98b

Please sign in to comment.