From 7701fbdff047f5dd083fd3e959210466549a1ae1 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 16:04:33 +0200 Subject: [PATCH 1/9] Make channel implement IAsyncDisposable --- projects/RabbitMQ.Client/IChannel.cs | 4 ++-- .../Impl/AutorecoveringChannel.cs | 17 ++++++++++++++ projects/RabbitMQ.Client/Impl/ChannelBase.cs | 22 ++++++++++++++++++- 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs index bfe4224f6..63fc72756 100644 --- a/projects/RabbitMQ.Client/IChannel.cs +++ b/projects/RabbitMQ.Client/IChannel.cs @@ -42,10 +42,10 @@ namespace RabbitMQ.Client /// functionality offered by versions 0-8, 0-8qpid, 0-9 and 0-9-1 of AMQP. /// /// - /// Extends the interface, so that the "using" + /// Extends the interface and the interface, so that the "using" /// statement can be used to scope the lifetime of a channel when appropriate. /// - public interface IChannel : IDisposable + public interface IChannel : IAsyncDisposable, IDisposable { /// /// Channel number, unique per connections. diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 9dabda5c1..26c6cd8bc 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -273,6 +273,23 @@ public void Dispose() _disposed = true; } + public async ValueTask DisposeAsync() + { + if (_disposed) + { + return; + } + + if (IsOpen) + { + await this.AbortAsync() + .ConfigureAwait(false); + } + + _recordedConsumerTags.Clear(); + _disposed = true; + } + public ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken); public ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs index b3f38ef03..0a666875d 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs @@ -563,11 +563,31 @@ protected virtual void Dispose(bool disposing) } ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); + _rpcSemaphore?.Dispose(); _confirmSemaphore?.Dispose(); } } + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore() + .ConfigureAwait(false); + + Dispose(false); + } + + protected virtual async ValueTask DisposeAsyncCore() + { + if (IsOpen) + { + await this.AbortAsync().ConfigureAwait(false); + } + + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore?.Dispose(); + } + public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) { var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat); From 770c7ee180031ce4efe62648cadb1d2459d47f21 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 16:14:08 +0200 Subject: [PATCH 2/9] Make connection implement IAsyncDisposable --- projects/RabbitMQ.Client/IConnection.cs | 4 ++-- projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs | 7 +++++-- projects/RabbitMQ.Client/Impl/Connection.cs | 7 +++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/projects/RabbitMQ.Client/IConnection.cs b/projects/RabbitMQ.Client/IConnection.cs index 0fc6a4c11..c562307a7 100644 --- a/projects/RabbitMQ.Client/IConnection.cs +++ b/projects/RabbitMQ.Client/IConnection.cs @@ -50,12 +50,12 @@ namespace RabbitMQ.Client /// Alternatively, an API tutorial can be found in the User Guide. /// /// - /// Extends the interface, so that the "using" + /// Extends the and the interface, so that the "using" /// statement can be used to scope the lifetime of a connection when /// appropriate. /// /// - public interface IConnection : INetworkConnection, IDisposable + public interface IConnection : INetworkConnection, IAsyncDisposable, IDisposable { /// /// The maximum channel number this connection supports (0 if unlimited). diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index e766d10e7..3e05b650c 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -264,7 +264,9 @@ await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToke return channel; } - public void Dispose() + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + + public async ValueTask DisposeAsync() { if (_disposed) { @@ -273,7 +275,8 @@ public void Dispose() try { - _innerConnection.Dispose(); + await _innerConnection.DisposeAsync() + .ConfigureAwait(false); } catch (OperationInterruptedException) { diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 7bb14ab2c..1885e0bdf 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -485,7 +485,9 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio return _frameHandler.WriteAsync(frames, cancellationToken); } - public void Dispose() + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + + public async ValueTask DisposeAsync() { if (_disposed) { @@ -496,7 +498,8 @@ public void Dispose() { if (IsOpen) { - this.AbortAsync().GetAwaiter().GetResult(); + await this.AbortAsync() + .ConfigureAwait(false); } _session0.Dispose(); From 8ef7b11ed5a929111878939ee2a6b7ad586c145e Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 16:15:14 +0200 Subject: [PATCH 3/9] Switch AutorecoveringChannel to a simpler implementation --- .../Impl/AutorecoveringChannel.cs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 26c6cd8bc..6e88ee907 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -257,21 +257,7 @@ await _connection.DeleteRecordedChannelAsync(this, public override string ToString() => InnerChannel.ToString(); - public void Dispose() - { - if (_disposed) - { - return; - } - - if (IsOpen) - { - this.AbortAsync().GetAwaiter().GetResult(); - } - - _recordedConsumerTags.Clear(); - _disposed = true; - } + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() { From 3063ec0fd2a1e665fda53d7d5d0b55e8eb7d8185 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 16:27:24 +0200 Subject: [PATCH 4/9] Use the async disposable channel in the auto recover connection --- .../Impl/AutorecoveringConnection.Recovery.cs | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs index 644f19e55..35ff0c350 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs @@ -299,13 +299,12 @@ private async ValueTask RecoverExchangesAsync(IConnection connection, { try { - using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false)) - { - await recordedExchange.RecoverAsync(ch, cancellationToken) - .ConfigureAwait(false); - await ch.CloseAsync(cancellationToken) - .ConfigureAwait(false); - } + var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await using var _ = channel.ConfigureAwait(false); + await recordedExchange.RecoverAsync(channel, cancellationToken) + .ConfigureAwait(false); + await channel.CloseAsync(cancellationToken) + .ConfigureAwait(false); } catch (Exception ex) { @@ -351,11 +350,12 @@ private async Task RecoverQueuesAsync(IConnection connection, try { string newName = string.Empty; - using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false)) + var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await using (channel.ConfigureAwait(false)) { - newName = await recordedQueue.RecoverAsync(ch, cancellationToken) + newName = await recordedQueue.RecoverAsync(channel, cancellationToken) .ConfigureAwait(false); - await ch.CloseAsync(cancellationToken) + await channel.CloseAsync(cancellationToken) .ConfigureAwait(false); } string oldName = recordedQueue.Name; @@ -463,13 +463,12 @@ private async ValueTask RecoverBindingsAsync(IConnection connection, { try { - using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false)) - { - await binding.RecoverAsync(ch, cancellationToken) - .ConfigureAwait(false); - await ch.CloseAsync(cancellationToken) - .ConfigureAwait(false); - } + var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await using var _ = channel.ConfigureAwait(false); + await binding.RecoverAsync(channel, cancellationToken) + .ConfigureAwait(false); + await channel.CloseAsync(cancellationToken) + .ConfigureAwait(false); } catch (Exception ex) { From eb9c8b622218d4a72ae2036c59f808ffe8b2cfa3 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 16:28:04 +0200 Subject: [PATCH 5/9] Formatting --- projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs | 2 +- projects/RabbitMQ.Client/Impl/Connection.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 3e05b650c..dc99e33fc 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -265,7 +265,7 @@ await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToke } public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); - + public async ValueTask DisposeAsync() { if (_disposed) diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 1885e0bdf..4a733ac8c 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -486,7 +486,7 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio } public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); - + public async ValueTask DisposeAsync() { if (_disposed) From adff41e2121fb27aa794b2e28e4bf45e10a42fa7 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 16:28:24 +0200 Subject: [PATCH 6/9] Start using it in tests --- .../CreateChannel/CreateChannel.csproj | 1 + .../Applications/CreateChannel/Program.cs | 3 +- projects/Test/Applications/GH-1647/Program.cs | 4 +- .../Test/Applications/MassPublish/Program.cs | 4 +- .../PublisherConfirms/PublisherConfirms.cs | 12 +- .../TestBasicAckAndBasicNack.cs | 4 +- .../TestConnectionRecovery.cs | 4 +- .../Test/Integration/TestAsyncConsumer.cs | 200 ++++---- .../TestAsyncEventingBasicConsumer.cs | 2 +- projects/Test/Integration/TestBasicPublish.cs | 270 +++++----- ...estConcurrentAccessWithSharedConnection.cs | 66 ++- .../Test/Integration/TestConnectionFactory.cs | 194 +++---- ...estConnectionFactoryContinuationTimeout.cs | 28 +- .../TestConnectionRecoveryWithoutSetup.cs | 472 +++++++++--------- .../TestConnectionTopologyRecovery.cs | 38 +- .../Test/Integration/TestFloodPublishing.cs | 58 ++- .../Test/Integration/TestInitialConnection.cs | 16 +- .../TestPublishSharedChannelAsync.cs | 42 +- .../Test/Integration/TestPublisherConfirms.cs | 126 +++-- projects/Test/Integration/TestSsl.cs | 32 +- projects/Test/Integration/TestToxiproxy.cs | 312 ++++++------ .../TestActivitySource.cs | 452 ++++++++--------- .../TestConnectionBlockedChannelLeak.cs | 10 +- .../TestConnectionRecovery.cs | 4 +- .../SequentialIntegration/TestHeartbeats.cs | 38 +- .../TestOpenTelemetry.cs | 420 ++++++++-------- 26 files changed, 1325 insertions(+), 1487 deletions(-) diff --git a/projects/Test/Applications/CreateChannel/CreateChannel.csproj b/projects/Test/Applications/CreateChannel/CreateChannel.csproj index ddaac453b..2ff780b2a 100644 --- a/projects/Test/Applications/CreateChannel/CreateChannel.csproj +++ b/projects/Test/Applications/CreateChannel/CreateChannel.csproj @@ -14,6 +14,7 @@ Exe + 9.0 diff --git a/projects/Test/Applications/CreateChannel/Program.cs b/projects/Test/Applications/CreateChannel/Program.cs index f93ae61bf..378eb4f3c 100644 --- a/projects/Test/Applications/CreateChannel/Program.cs +++ b/projects/Test/Applications/CreateChannel/Program.cs @@ -20,7 +20,7 @@ public static async Task Main() doneEvent = new AutoResetEvent(false); var connectionFactory = new ConnectionFactory { }; - IConnection connection = await connectionFactory.CreateConnectionAsync(); + await using IConnection connection = await connectionFactory.CreateConnectionAsync(); var watch = Stopwatch.StartNew(); _ = Task.Run(async () => @@ -55,7 +55,6 @@ public static async Task Main() Console.WriteLine(); Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms"); - connection.Dispose(); Console.ReadLine(); } } diff --git a/projects/Test/Applications/GH-1647/Program.cs b/projects/Test/Applications/GH-1647/Program.cs index 2ed2c9f64..c110641de 100644 --- a/projects/Test/Applications/GH-1647/Program.cs +++ b/projects/Test/Applications/GH-1647/Program.cs @@ -11,12 +11,12 @@ var props = new BasicProperties(); byte[] msg = Encoding.UTF8.GetBytes("test"); -using var connection = await connectionFactory.CreateConnectionAsync(); +await using var connection = await connectionFactory.CreateConnectionAsync(); for (int i = 0; i < 300; i++) { try { - using var channel = await connection.CreateChannelAsync(); // New channel for each message + await using var channel = await connection.CreateChannelAsync(); // New channel for each message await Task.Delay(1000); await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty, mandatory: false, basicProperties: props, body: msg); diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index c01fa8c94..2d6ef5e63 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -98,10 +98,10 @@ static Program() static async Task Main() { - using IConnection consumeConnection = await s_consumeConnectionFactory.CreateConnectionAsync(); + await using IConnection consumeConnection = await s_consumeConnectionFactory.CreateConnectionAsync(); consumeConnection.ConnectionShutdownAsync += ConnectionShutdownAsync; - using IChannel consumeChannel = await consumeConnection.CreateChannelAsync(); + await using IChannel consumeChannel = await consumeConnection.CreateChannelAsync(); consumeChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; await consumeChannel.BasicQosAsync(prefetchSize: 0, prefetchCount: 128, global: false); diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 68d7549fd..16b597459 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -23,8 +23,8 @@ static async Task PublishMessagesIndividuallyAsync() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once"); - using IConnection connection = await CreateConnectionAsync(); - using IChannel channel = await connection.CreateChannelAsync(); + await using IConnection connection = await CreateConnectionAsync(); + await using IChannel channel = await connection.CreateChannelAsync(); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); @@ -51,8 +51,8 @@ static async Task PublishMessagesInBatchAsync() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches"); - using IConnection connection = await CreateConnectionAsync(); - using IChannel channel = await connection.CreateChannelAsync(); + await using IConnection connection = await CreateConnectionAsync(); + await using IChannel channel = await connection.CreateChannelAsync(); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); @@ -97,8 +97,8 @@ async Task HandlePublishConfirmsAsynchronously() { Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously"); - using IConnection connection = await CreateConnectionAsync(); - using IChannel channel = await connection.CreateChannelAsync(); + await using IConnection connection = await CreateConnectionAsync(); + await using IChannel channel = await connection.CreateChannelAsync(); // declare a server-named queue QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs index 7fbb56f1d..c3da0f291 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs @@ -52,9 +52,9 @@ public override async Task DisposeAsync() { ConnectionFactory cf = CreateConnectionFactory(); cf.ClientProvidedName += "-TearDown"; - using (IConnection conn = await cf.CreateConnectionAsync()) + await using (IConnection conn = await cf.CreateConnectionAsync()) { - using (IChannel ch = await conn.CreateChannelAsync()) + await using (IChannel ch = await conn.CreateChannelAsync()) { await ch.QueueDeleteAsync(_queueName); await ch.CloseAsync(); diff --git a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs index bb5bc3323..266144a04 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs @@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, consumer.ReceivedAsync += MessageReceived; await _channel.BasicConsumeAsync(queueName, true, consumer); - using (IChannel pubCh = await _conn.CreateChannelAsync()) + await using (IChannel pubCh = await _conn.CreateChannelAsync()) { await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body); await pubCh.CloseAsync(); @@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, await CloseAndWaitForRecoveryAsync(); - using (IChannel pubCh = await _conn.CreateChannelAsync()) + await using (IChannel pubCh = await _conn.CreateChannelAsync()) { await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body); await pubCh.CloseAsync(); diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index ada76173e..f6b17f3ba 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -203,110 +203,106 @@ public async Task TestBasicRoundtripConcurrentManyMessages() Assert.Equal(queueName, q.QueueName); Task publishTask = Task.Run(async () => + { + await using IConnection publishConn = await _connFactory.CreateConnectionAsync(); + publishConn.ConnectionShutdownAsync += (o, ea) => + { + HandleConnectionShutdown(publishConn, ea, (args) => + { + MaybeSetException(args, publish1SyncSource, publish2SyncSource); + }); + return Task.CompletedTask; + }; + await using (IChannel publishChannel = await publishConn.CreateChannelAsync()) + { + AddCallbackExceptionHandlers(publishConn, publishChannel); + publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel, + "publishChannel,", _output); + publishChannel.ChannelShutdownAsync += (o, ea) => { - using (IConnection publishConn = await _connFactory.CreateConnectionAsync()) + HandleChannelShutdown(publishChannel, ea, (args) => { - publishConn.ConnectionShutdownAsync += (o, ea) => - { - HandleConnectionShutdown(publishConn, ea, (args) => - { - MaybeSetException(args, publish1SyncSource, publish2SyncSource); - }); - return Task.CompletedTask; - }; - using (IChannel publishChannel = await publishConn.CreateChannelAsync()) - { - AddCallbackExceptionHandlers(publishConn, publishChannel); - publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel, - "publishChannel,", _output); - publishChannel.ChannelShutdownAsync += (o, ea) => - { - HandleChannelShutdown(publishChannel, ea, (args) => - { - MaybeSetException(args, publish1SyncSource, publish2SyncSource); - }); - return Task.CompletedTask; - }; - await publishChannel.ConfirmSelectAsync(); - - for (int i = 0; i < publish_total; i++) - { - await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); - await publishChannel.BasicPublishAsync(string.Empty, queueName, body2); - await publishChannel.WaitForConfirmsOrDieAsync(); - } - - await publishChannel.CloseAsync(); - } + MaybeSetException(args, publish1SyncSource, publish2SyncSource); + }); + return Task.CompletedTask; + }; + await publishChannel.ConfirmSelectAsync(); - await publishConn.CloseAsync(); - } - }); + for (int i = 0; i < publish_total; i++) + { + await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); + await publishChannel.BasicPublishAsync(string.Empty, queueName, body2); + await publishChannel.WaitForConfirmsOrDieAsync(); + } + + await publishChannel.CloseAsync(); + } + + await publishConn.CloseAsync(); + }); int publish1_count = 0; int publish2_count = 0; Task consumeTask = Task.Run(async () => + { + await using IConnection consumeConn = await _connFactory.CreateConnectionAsync(); + consumeConn.ConnectionShutdownAsync += (o, ea) => + { + HandleConnectionShutdown(consumeConn, ea, (args) => + { + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); + }); + return Task.CompletedTask; + }; + await using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) + { + AddCallbackExceptionHandlers(consumeConn, consumeChannel); + consumeChannel.DefaultConsumer = new DefaultAsyncConsumer(consumeChannel, + "consumeChannel,", _output); + consumeChannel.ChannelShutdownAsync += (o, ea) => { - using (IConnection consumeConn = await _connFactory.CreateConnectionAsync()) + HandleChannelShutdown(consumeChannel, ea, (args) => { - consumeConn.ConnectionShutdownAsync += (o, ea) => + MaybeSetException(ea, publish1SyncSource, publish2SyncSource); + }); + return Task.CompletedTask; + }; + + var consumer = new AsyncEventingBasicConsumer(consumeChannel); + consumer.ReceivedAsync += (o, a) => + { + if (ByteArraysEqual(a.Body.ToArray(), body1)) + { + if (Interlocked.Increment(ref publish1_count) >= publish_total) { - HandleConnectionShutdown(consumeConn, ea, (args) => - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - }); - return Task.CompletedTask; - }; - using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) + publish1SyncSource.TrySetResult(true); + } + } + else if (ByteArraysEqual(a.Body.ToArray(), body2)) + { + if (Interlocked.Increment(ref publish2_count) >= publish_total) { - AddCallbackExceptionHandlers(consumeConn, consumeChannel); - consumeChannel.DefaultConsumer = new DefaultAsyncConsumer(consumeChannel, - "consumeChannel,", _output); - consumeChannel.ChannelShutdownAsync += (o, ea) => - { - HandleChannelShutdown(consumeChannel, ea, (args) => - { - MaybeSetException(ea, publish1SyncSource, publish2SyncSource); - }); - return Task.CompletedTask; - }; - - var consumer = new AsyncEventingBasicConsumer(consumeChannel); - consumer.ReceivedAsync += (o, a) => - { - if (ByteArraysEqual(a.Body.ToArray(), body1)) - { - if (Interlocked.Increment(ref publish1_count) >= publish_total) - { - publish1SyncSource.TrySetResult(true); - } - } - else if (ByteArraysEqual(a.Body.ToArray(), body2)) - { - if (Interlocked.Increment(ref publish2_count) >= publish_total) - { - publish2SyncSource.TrySetResult(true); - } - } - else - { - var ex = new InvalidOperationException("incorrect message - should never happen!"); - SetException(ex, publish1SyncSource, publish2SyncSource); - } - return Task.CompletedTask; - }; - - await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); - await consumerSyncSource.Task; - - await consumeChannel.CloseAsync(); + publish2SyncSource.TrySetResult(true); } - - await consumeConn.CloseAsync(); } - }); + else + { + var ex = new InvalidOperationException("incorrect message - should never happen!"); + SetException(ex, publish1SyncSource, publish2SyncSource); + } + return Task.CompletedTask; + }; + + await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); + await consumerSyncSource.Task; + + await consumeChannel.CloseAsync(); + } + + await consumeConn.CloseAsync(); + }); try { @@ -653,15 +649,13 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() var consumer1 = new AsyncEventingBasicConsumer(_channel); consumer1.ReceivedAsync += async (sender, args) => { - using (IChannel innerChannel = await _conn.CreateChannelAsync()) - { - await innerChannel.ConfirmSelectAsync(); - await innerChannel.BasicPublishAsync(exchangeName, queue2Name, - mandatory: true, - body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650))); - await innerChannel.WaitForConfirmsOrDieAsync(); - await innerChannel.CloseAsync(); - } + await using IChannel innerChannel = await _conn.CreateChannelAsync(); + await innerChannel.ConfirmSelectAsync(); + await innerChannel.BasicPublishAsync(exchangeName, queue2Name, + mandatory: true, + body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650))); + await innerChannel.WaitForConfirmsOrDieAsync(); + await innerChannel.CloseAsync(); }; await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1); @@ -720,12 +714,10 @@ private async Task ValidateConsumerDispatchConcurrency() AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel; Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency); - using (IChannel ch = await _conn.CreateChannelAsync( - consumerDispatchConcurrency: expectedConsumerDispatchConcurrency)) - { - AutorecoveringChannel ach = (AutorecoveringChannel)ch; - Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency); - } + await using IChannel ch = await _conn.CreateChannelAsync( + consumerDispatchConcurrency: expectedConsumerDispatchConcurrency); + AutorecoveringChannel ach = (AutorecoveringChannel)ch; + Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency); } private static void SetException(Exception ex, params TaskCompletionSource[] tcsAry) diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index 680936bda..5cc864888 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -105,7 +105,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038() await _channel.BasicConsumeAsync(queueName, false, consumer); //publisher - using IChannel publisherChannel = await _conn.CreateChannelAsync(); + await using IChannel publisherChannel = await _conn.CreateChannelAsync(); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); var props = new BasicProperties(); await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 6f70661dc..3cbf98ca1 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -66,23 +66,21 @@ public async Task TestBasicRoundtripArray() byte[] sendBody = _encoding.GetBytes("hi"); byte[] consumeBody = null; var consumer = new AsyncEventingBasicConsumer(_channel); - using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) + using var consumerReceivedSemaphore = new SemaphoreSlim(0, 1); + consumer.ReceivedAsync += (o, a) => { - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedSemaphore.Release(); - return Task.CompletedTask; - }; - string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); + consumeBody = a.Body.ToArray(); + consumerReceivedSemaphore.Release(); + return Task.CompletedTask; + }; + string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody); - bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); - await _channel.BasicCancelAsync(tag); + await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody); + bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); + await _channel.BasicCancelAsync(tag); - Assert.True(waitRes); - Assert.Equal(sendBody, consumeBody); - } + Assert.True(waitRes); + Assert.Equal(sendBody, consumeBody); } [Fact] @@ -96,23 +94,21 @@ public async Task TestBasicRoundtripCachedString() byte[] sendBody = _encoding.GetBytes("hi"); byte[] consumeBody = null; var consumer = new AsyncEventingBasicConsumer(_channel); - using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) + using var consumerReceivedSemaphore = new SemaphoreSlim(0, 1); + consumer.ReceivedAsync += (o, a) => { - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedSemaphore.Release(); - return Task.CompletedTask; - }; - string tag = await _channel.BasicConsumeAsync(queueName.Value, true, consumer); + consumeBody = a.Body.ToArray(); + consumerReceivedSemaphore.Release(); + return Task.CompletedTask; + }; + string tag = await _channel.BasicConsumeAsync(queueName.Value, true, consumer); - await _channel.BasicPublishAsync(exchange: exchangeName, routingKey: queueName, body: sendBody); - bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); - await _channel.BasicCancelAsync(tag); + await _channel.BasicPublishAsync(exchange: exchangeName, routingKey: queueName, body: sendBody); + bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); + await _channel.BasicCancelAsync(tag); - Assert.True(waitResFalse); - Assert.Equal(sendBody, consumeBody); - } + Assert.True(waitResFalse); + Assert.Equal(sendBody, consumeBody); } [Fact] @@ -125,23 +121,21 @@ public async Task TestBasicRoundtripReadOnlyMemory() byte[] sendBody = _encoding.GetBytes("hi"); byte[] consumeBody = null; var consumer = new AsyncEventingBasicConsumer(_channel); - using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) + using var consumerReceivedSemaphore = new SemaphoreSlim(0, 1); + consumer.ReceivedAsync += (o, a) => { - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedSemaphore.Release(); - return Task.CompletedTask; - }; - string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); + consumeBody = a.Body.ToArray(); + consumerReceivedSemaphore.Release(); + return Task.CompletedTask; + }; + string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory(sendBody)); - bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); - await _channel.BasicCancelAsync(tag); + await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory(sendBody)); + bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); + await _channel.BasicCancelAsync(tag); - Assert.True(waitRes); - Assert.Equal(sendBody, consumeBody); - } + Assert.True(waitRes); + Assert.Equal(sendBody, consumeBody); } [Fact] @@ -153,28 +147,26 @@ public async Task CanNotModifyPayloadAfterPublish() QueueDeclareOk q = await _channel.QueueDeclareAsync(); byte[] sendBody = new byte[1000]; var consumer = new AsyncEventingBasicConsumer(_channel); - using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) + using var consumerReceivedSemaphore = new SemaphoreSlim(0, 1); + bool modified = true; + consumer.ReceivedAsync += (o, a) => { - bool modified = true; - consumer.ReceivedAsync += (o, a) => + if (a.Body.Span.IndexOf((byte)1) < 0) { - if (a.Body.Span.IndexOf((byte)1) < 0) - { - modified = false; - } - consumerReceivedSemaphore.Release(); - return Task.CompletedTask; - }; - string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); + modified = false; + } + consumerReceivedSemaphore.Release(); + return Task.CompletedTask; + }; + string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, sendBody); - sendBody.AsSpan().Fill(1); + await _channel.BasicPublishAsync("", q.QueueName, sendBody); + sendBody.AsSpan().Fill(1); - Assert.True(await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5))); - Assert.False(modified, "Payload was modified after the return of BasicPublish"); + Assert.True(await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5))); + Assert.False(modified, "Payload was modified after the return of BasicPublish"); - await _channel.BasicCancelAsync(tag); - } + await _channel.BasicCancelAsync(tag); } [Fact] @@ -200,96 +192,94 @@ public async Task TestMaxInboundMessageBodySize() bool sawConsumerRegistered = false; bool sawConsumerUnregistered = false; - using (IConnection conn = await cf.CreateConnectionAsync()) + await using IConnection conn = await cf.CreateConnectionAsync(); + conn.ConnectionShutdownAsync += (o, a) => + { + sawConnectionShutdown = true; + return Task.CompletedTask; + }; + + Assert.Equal(maxMsgSize, cf.MaxInboundMessageBodySize); + Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize); + Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize); + + await using (IChannel channel = await conn.CreateChannelAsync()) { - conn.ConnectionShutdownAsync += (o, a) => + channel.ChannelShutdownAsync += (o, a) => { - sawConnectionShutdown = true; + sawChannelShutdown = true; return Task.CompletedTask; }; - Assert.Equal(maxMsgSize, cf.MaxInboundMessageBodySize); - Assert.Equal(maxMsgSize, cf.Endpoint.MaxInboundMessageBodySize); - Assert.Equal(maxMsgSize, conn.Endpoint.MaxInboundMessageBodySize); - - using (IChannel channel = await conn.CreateChannelAsync()) + channel.CallbackExceptionAsync += (o, a) => { - channel.ChannelShutdownAsync += (o, a) => - { - sawChannelShutdown = true; - return Task.CompletedTask; - }; - - channel.CallbackExceptionAsync += (o, a) => - { - throw new XunitException("Unexpected channel.CallbackException"); - }; + throw new XunitException("Unexpected channel.CallbackException"); + }; - QueueDeclareOk q = await channel.QueueDeclareAsync(); + QueueDeclareOk q = await channel.QueueDeclareAsync(); - var consumer = new AsyncEventingBasicConsumer(channel); + var consumer = new AsyncEventingBasicConsumer(channel); - consumer.ShutdownAsync += (o, a) => - { - tcs.SetResult(true); - return Task.CompletedTask; - }; + consumer.ShutdownAsync += (o, a) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; - consumer.RegisteredAsync += (o, a) => - { - sawConsumerRegistered = true; - return Task.CompletedTask; - }; + consumer.RegisteredAsync += (o, a) => + { + sawConsumerRegistered = true; + return Task.CompletedTask; + }; - consumer.UnregisteredAsync += (o, a) => - { - sawConsumerUnregistered = true; - return Task.CompletedTask; - }; + consumer.UnregisteredAsync += (o, a) => + { + sawConsumerUnregistered = true; + return Task.CompletedTask; + }; - consumer.ReceivedAsync += (o, a) => - { - Interlocked.Increment(ref count); - return Task.CompletedTask; - }; + consumer.ReceivedAsync += (o, a) => + { + Interlocked.Increment(ref count); + return Task.CompletedTask; + }; - string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer); + string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer); - await channel.BasicPublishAsync("", q.QueueName, msg0); - await channel.BasicPublishAsync("", q.QueueName, msg1); - Assert.True(await tcs.Task); + await channel.BasicPublishAsync("", q.QueueName, msg0); + await channel.BasicPublishAsync("", q.QueueName, msg1); + Assert.True(await tcs.Task); - Assert.Equal(1, count); - Assert.True(sawConnectionShutdown); - Assert.True(sawChannelShutdown); - Assert.True(sawConsumerRegistered); - Assert.True(sawConsumerUnregistered); - - try - { - await channel.CloseAsync(); - } - catch (Exception chex) - { - if (IsVerbose) - { - _output.WriteLine("[INFO] {0} channel exception: {1}", nameof(TestMaxInboundMessageBodySize), chex); - } - } - } + Assert.Equal(1, count); + Assert.True(sawConnectionShutdown); + Assert.True(sawChannelShutdown); + Assert.True(sawConsumerRegistered); + Assert.True(sawConsumerUnregistered); try { - await conn.CloseAsync(); + await channel.CloseAsync(); } - catch (Exception connex) + catch (Exception chex) { if (IsVerbose) { - _output.WriteLine("[INFO] {0} conn exception: {1}", nameof(TestMaxInboundMessageBodySize), connex); + _output.WriteLine("[INFO] {0} channel exception: {1}", nameof(TestMaxInboundMessageBodySize), chex); } } } + + try + { + await conn.CloseAsync(); + } + catch (Exception connex) + { + if (IsVerbose) + { + _output.WriteLine("[INFO] {0} conn exception: {1}", nameof(TestMaxInboundMessageBodySize), connex); + } + } } [Fact] @@ -309,25 +299,23 @@ public async Task TestPropertiesRoundtrip_Headers() byte[] sendBody = _encoding.GetBytes("hi"); byte[] consumeBody = null; var consumer = new AsyncEventingBasicConsumer(_channel); - using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) + using var consumerReceivedSemaphore = new SemaphoreSlim(0, 1); + string response = null; + consumer.ReceivedAsync += (o, a) => { - string response = null; - consumer.ReceivedAsync += (o, a) => - { - response = _encoding.GetString(a.BasicProperties.Headers["Hello"] as byte[]); - consumeBody = a.Body.ToArray(); - consumerReceivedSemaphore.Release(); - return Task.CompletedTask; - }; + response = _encoding.GetString(a.BasicProperties.Headers["Hello"] as byte[]); + consumeBody = a.Body.ToArray(); + consumerReceivedSemaphore.Release(); + return Task.CompletedTask; + }; - string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody); - bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); - await _channel.BasicCancelAsync(tag); - Assert.True(waitResFalse); - Assert.Equal(sendBody, consumeBody); - Assert.Equal("World", response); - } + string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); + await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody); + bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); + await _channel.BasicCancelAsync(tag); + Assert.True(waitResFalse); + Assert.Equal(sendBody, consumeBody); + Assert.Equal("World", response); } } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index daa95cdf8..bea692094 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -60,10 +60,8 @@ public async Task TestConcurrentChannelOpenCloseLoop() { await TestConcurrentOperationsAsync(async () => { - using (IChannel ch = await _conn.CreateChannelAsync()) - { - await ch.CloseAsync(); - } + await using IChannel ch = await _conn.CreateChannelAsync(); + await ch.CloseAsync(); }, 50); } @@ -110,47 +108,45 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in try { - using (IChannel ch = await _conn.CreateChannelAsync()) + await using IChannel ch = await _conn.CreateChannelAsync(); + ch.ChannelShutdownAsync += (o, ea) => { - ch.ChannelShutdownAsync += (o, ea) => - { - HandleChannelShutdown(ch, ea, (args) => - { - if (args.Initiator != ShutdownInitiator.Application) - { - tcs.TrySetException(args.Exception); - } - }); - return Task.CompletedTask; - }; - - await ch.ConfirmSelectAsync(trackConfirmations: false); - - ch.BasicAcksAsync += (object sender, BasicAckEventArgs e) => + HandleChannelShutdown(ch, ea, (args) => { - if (e.DeliveryTag >= _messageCount) + if (args.Initiator != ShutdownInitiator.Application) { - tcs.SetResult(true); + tcs.TrySetException(args.Exception); } - return Task.CompletedTask; - }; + }); + return Task.CompletedTask; + }; - ch.BasicNacksAsync += (object sender, BasicNackEventArgs e) => - { - tcs.SetResult(false); - _output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}"); - return Task.CompletedTask; - }; + await ch.ConfirmSelectAsync(trackConfirmations: false); - QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); - for (ushort j = 0; j < _messageCount; j++) + ch.BasicAcksAsync += (object sender, BasicAckEventArgs e) => + { + if (e.DeliveryTag >= _messageCount) { - await ch.BasicPublishAsync("", q.QueueName, mandatory: true, body: body); + tcs.SetResult(true); } + return Task.CompletedTask; + }; + + ch.BasicNacksAsync += (object sender, BasicNackEventArgs e) => + { + tcs.SetResult(false); + _output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}"); + return Task.CompletedTask; + }; - Assert.True(await tcs.Task); - await ch.CloseAsync(); + QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null); + for (ushort j = 0; j < _messageCount; j++) + { + await ch.BasicPublishAsync("", q.QueueName, mandatory: true, body: body); } + + Assert.True(await tcs.Task); + await ch.CloseAsync(); } finally { diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs index dd21fd85f..46deccdbe 100644 --- a/projects/Test/Integration/TestConnectionFactory.cs +++ b/projects/Test/Integration/TestConnectionFactory.cs @@ -159,12 +159,10 @@ public async Task TestCreateConnectionWithClientProvidedNameUsesDefaultName() cf.AutomaticRecoveryEnabled = false; string expectedName = cf.ClientProvidedName; - using (IConnection conn = await cf.CreateConnectionAsync()) - { - Assert.Equal(expectedName, conn.ClientProvidedName); - Assert.Equal(expectedName, conn.ClientProperties["connection_name"]); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(); + Assert.Equal(expectedName, conn.ClientProvidedName); + Assert.Equal(expectedName, conn.ClientProperties["connection_name"]); + await conn.CloseAsync(); } [Fact] @@ -174,12 +172,10 @@ public async Task TestCreateConnectionWithClientProvidedNameUsesNameArgumentValu cf.AutomaticRecoveryEnabled = false; string expectedName = cf.ClientProvidedName; - using (IConnection conn = await cf.CreateConnectionAsync(expectedName)) - { - Assert.Equal(expectedName, conn.ClientProvidedName); - Assert.Equal(expectedName, conn.ClientProperties["connection_name"]); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(expectedName); + Assert.Equal(expectedName, conn.ClientProvidedName); + Assert.Equal(expectedName, conn.ClientProperties["connection_name"]); + await conn.CloseAsync(); } [Fact] @@ -189,12 +185,10 @@ public async Task TestCreateConnectionWithClientProvidedNameAndAutorecoveryUsesN cf.AutomaticRecoveryEnabled = true; string expectedName = cf.ClientProvidedName; - using (IConnection conn = await cf.CreateConnectionAsync(expectedName)) - { - Assert.Equal(expectedName, conn.ClientProvidedName); - Assert.Equal(expectedName, conn.ClientProperties["connection_name"]); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(expectedName); + Assert.Equal(expectedName, conn.ClientProvidedName); + Assert.Equal(expectedName, conn.ClientProperties["connection_name"]); + await conn.CloseAsync(); } [Fact] @@ -205,12 +199,10 @@ public async Task TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName() string expectedName = cf.ClientProvidedName; var xs = new List { new AmqpTcpEndpoint("localhost") }; - using (IConnection conn = await cf.CreateConnectionAsync(xs, expectedName)) - { - Assert.Equal(expectedName, conn.ClientProvidedName); - Assert.Equal(expectedName, conn.ClientProperties["connection_name"]); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(xs, expectedName); + Assert.Equal(expectedName, conn.ClientProvidedName); + Assert.Equal(expectedName, conn.ClientProperties["connection_name"]); + await conn.CloseAsync(); } [Fact] @@ -220,11 +212,9 @@ public async Task TestCreateConnectionUsesDefaultPort() cf.AutomaticRecoveryEnabled = true; cf.HostName = "localhost"; - using (IConnection conn = await cf.CreateConnectionAsync()) - { - Assert.Equal(5672, conn.Endpoint.Port); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(); + Assert.Equal(5672, conn.Endpoint.Port); + await conn.CloseAsync(); } [Fact] @@ -237,11 +227,9 @@ public async Task TestCreateConnectionUsesDefaultMaxMessageSize() Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, cf.MaxInboundMessageBodySize); Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, cf.Endpoint.MaxInboundMessageBodySize); - using (IConnection conn = await cf.CreateConnectionAsync()) - { - Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(); + Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); + await conn.CloseAsync(); } [Fact] @@ -266,10 +254,8 @@ public async Task TestCreateConnectionWithAutoRecoveryUsesAmqpTcpEndpoint() cf.HostName = "not_localhost"; cf.Port = 1234; var ep = new AmqpTcpEndpoint("localhost"); - using (IConnection conn = await cf.CreateConnectionAsync(new List { ep })) - { - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(new List { ep }); + await conn.CloseAsync(); } [Fact] @@ -291,10 +277,8 @@ public async Task TestCreateConnectionUsesAmqpTcpEndpoint() cf.HostName = "not_localhost"; cf.Port = 1234; var ep = new AmqpTcpEndpoint("localhost"); - using (IConnection conn = await cf.CreateConnectionAsync(new List { ep })) - { - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(new List { ep }); + await conn.CloseAsync(); } [Fact] @@ -307,10 +291,8 @@ public async Task TestCreateConnectionWithForcedAddressFamily() AddressFamily = System.Net.Sockets.AddressFamily.InterNetwork }; cf.Endpoint = ep; - using (IConnection conn = await cf.CreateConnectionAsync()) - { - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(); + await conn.CloseAsync(); } [Fact] @@ -330,10 +312,8 @@ public async Task TestCreateConnectionUsesValidEndpointWhenMultipleSupplied() ConnectionFactory cf = CreateConnectionFactory(); var invalidEp = new AmqpTcpEndpoint("not_localhost"); var ep = new AmqpTcpEndpoint("localhost"); - using (IConnection conn = await cf.CreateConnectionAsync(new List { invalidEp, ep })) - { - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(new List { invalidEp, ep }); + await conn.CloseAsync(); } [Fact] @@ -349,11 +329,9 @@ public async Task TestCreateConnectionUsesConfiguredMaxMessageSize() { ConnectionFactory cf = CreateConnectionFactory(); cf.MaxInboundMessageBodySize = 1500; - using (IConnection conn = await cf.CreateConnectionAsync()) - { - Assert.Equal(cf.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(); + Assert.Equal(cf.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); + await conn.CloseAsync(); } [Fact] public async Task TestCreateConnectionWithAmqpEndpointListUsesAmqpTcpEndpointMaxMessageSize() @@ -362,11 +340,9 @@ public async Task TestCreateConnectionWithAmqpEndpointListUsesAmqpTcpEndpointMax cf.MaxInboundMessageBodySize = 1500; var ep = new AmqpTcpEndpoint("localhost"); Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, ep.MaxInboundMessageBodySize); - using (IConnection conn = await cf.CreateConnectionAsync(new List { ep })) - { - Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(new List { ep }); + Assert.Equal(ConnectionFactory.DefaultMaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); + await conn.CloseAsync(); } [Fact] @@ -375,11 +351,9 @@ public async Task TestCreateConnectionWithAmqpEndpointResolverUsesAmqpTcpEndpoin ConnectionFactory cf = CreateConnectionFactory(); cf.MaxInboundMessageBodySize = 1500; var ep = new AmqpTcpEndpoint("localhost", -1, new SslOption(), 1200); - using (IConnection conn = await cf.CreateConnectionAsync(new List { ep })) - { - Assert.Equal(ep.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(new List { ep }); + Assert.Equal(ep.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); + await conn.CloseAsync(); } [Fact] @@ -387,58 +361,50 @@ public async Task TestCreateConnectionWithHostnameListUsesConnectionFactoryMaxMe { ConnectionFactory cf = CreateConnectionFactory(); cf.MaxInboundMessageBodySize = 1500; - using (IConnection conn = await cf.CreateConnectionAsync(new List { "localhost" })) - { - Assert.Equal(cf.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); - await conn.CloseAsync(); - } + await using IConnection conn = await cf.CreateConnectionAsync(new List { "localhost" }); + Assert.Equal(cf.MaxInboundMessageBodySize, conn.Endpoint.MaxInboundMessageBodySize); + await conn.CloseAsync(); } [Fact] public async Task TestCreateConnectionAsync_WithAlreadyCanceledToken() { - using (var cts = new CancellationTokenSource()) - { - cts.Cancel(); + using var cts = new CancellationTokenSource(); + cts.Cancel(); - ConnectionFactory cf = CreateConnectionFactory(); + ConnectionFactory cf = CreateConnectionFactory(); - bool passed = false; - /* + bool passed = false; + /* * If anyone wonders why TaskCanceledException is explicitly checked, * even though it's a subclass of OperationCanceledException: * https://github.com/rabbitmq/rabbitmq-dotnet-client/commit/383ca5c5f161edb717cf8fae7bf143c13143f634#r135400615 */ - try - { - await cf.CreateConnectionAsync(cts.Token); - } - catch (TaskCanceledException) - { - passed = true; - } - catch (OperationCanceledException) - { - passed = true; - } - - Assert.True(passed, "FAIL did not see TaskCanceledException nor OperationCanceledException"); + try + { + await cf.CreateConnectionAsync(cts.Token); } + catch (TaskCanceledException) + { + passed = true; + } + catch (OperationCanceledException) + { + passed = true; + } + + Assert.True(passed, "FAIL did not see TaskCanceledException nor OperationCanceledException"); } [Fact] public async Task TestCreateConnectionAsync_UsesValidEndpointWhenMultipleSupplied() { - using (var cts = new CancellationTokenSource(WaitSpan)) - { - ConnectionFactory cf = CreateConnectionFactory(); - var invalidEp = new AmqpTcpEndpoint("not_localhost"); - var ep = new AmqpTcpEndpoint("localhost"); - using (IConnection conn = await cf.CreateConnectionAsync(new List { invalidEp, ep }, cts.Token)) - { - await conn.CloseAsync(cts.Token); - } - } + using var cts = new CancellationTokenSource(WaitSpan); + ConnectionFactory cf = CreateConnectionFactory(); + var invalidEp = new AmqpTcpEndpoint("not_localhost"); + var ep = new AmqpTcpEndpoint("localhost"); + await using IConnection conn = await cf.CreateConnectionAsync(new List { invalidEp, ep }, cts.Token); + await conn.CloseAsync(cts.Token); } [Theory] @@ -451,23 +417,21 @@ public async Task TestCreateConnectionAsync_UsesValidEndpointWhenMultipleSupplie public async Task TestCreateConnectionAsync_TruncatesWhenClientNameIsLong_GH980(ushort count) { string cpn = GetUniqueString(count); - using (var cts = new CancellationTokenSource(WaitSpan)) + using var cts = new CancellationTokenSource(WaitSpan); + ConnectionFactory cf0 = new ConnectionFactory { ClientProvidedName = cpn }; + await using (IConnection conn = await cf0.CreateConnectionAsync(cts.Token)) { - ConnectionFactory cf0 = new ConnectionFactory { ClientProvidedName = cpn }; - using (IConnection conn = await cf0.CreateConnectionAsync(cts.Token)) - { - await conn.CloseAsync(cts.Token); - Assert.True(cf0.ClientProvidedName.Length <= InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); - Assert.Contains(cf0.ClientProvidedName, cpn); - } + await conn.CloseAsync(cts.Token); + Assert.True(cf0.ClientProvidedName.Length <= InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); + Assert.Contains(cf0.ClientProvidedName, cpn); + } - ConnectionFactory cf1 = new ConnectionFactory(); - using (IConnection conn = await cf1.CreateConnectionAsync(cpn, cts.Token)) - { - await conn.CloseAsync(cts.Token); - Assert.True(conn.ClientProvidedName.Length <= InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); - Assert.Contains(conn.ClientProvidedName, cpn); - } + ConnectionFactory cf1 = new ConnectionFactory(); + await using (IConnection conn = await cf1.CreateConnectionAsync(cpn, cts.Token)) + { + await conn.CloseAsync(cts.Token); + Assert.True(conn.ClientProvidedName.Length <= InternalConstants.DefaultRabbitMqMaxClientProvideNameLength); + Assert.Contains(conn.ClientProvidedName, cpn); } } } diff --git a/projects/Test/Integration/TestConnectionFactoryContinuationTimeout.cs b/projects/Test/Integration/TestConnectionFactoryContinuationTimeout.cs index dc6892ab2..7930cb996 100644 --- a/projects/Test/Integration/TestConnectionFactoryContinuationTimeout.cs +++ b/projects/Test/Integration/TestConnectionFactoryContinuationTimeout.cs @@ -57,32 +57,28 @@ public override Task InitializeAsync() public async Task TestConnectionFactoryContinuationTimeoutOnRecoveringConnection() { var continuationTimeout = TimeSpan.FromSeconds(777); - using (IConnection c = await CreateConnectionWithContinuationTimeoutAsync(true, continuationTimeout)) + await using IConnection c = await CreateConnectionWithContinuationTimeoutAsync(true, continuationTimeout); + await using (IChannel ch = await c.CreateChannelAsync()) { - using (IChannel ch = await c.CreateChannelAsync()) - { - Assert.Equal(continuationTimeout, ch.ContinuationTimeout); - await ch.CloseAsync(); - } - - await c.CloseAsync(); + Assert.Equal(continuationTimeout, ch.ContinuationTimeout); + await ch.CloseAsync(); } + + await c.CloseAsync(); } [Fact] public async Task TestConnectionFactoryContinuationTimeoutOnNonRecoveringConnection() { var continuationTimeout = TimeSpan.FromSeconds(777); - using (IConnection c = await CreateConnectionWithContinuationTimeoutAsync(false, continuationTimeout)) + await using IConnection c = await CreateConnectionWithContinuationTimeoutAsync(false, continuationTimeout); + await using (IChannel ch = await c.CreateChannelAsync()) { - using (IChannel ch = await c.CreateChannelAsync()) - { - Assert.Equal(continuationTimeout, ch.ContinuationTimeout); - await ch.CloseAsync(); - } - - await c.CloseAsync(); + Assert.Equal(continuationTimeout, ch.ContinuationTimeout); + await ch.CloseAsync(); } + + await c.CloseAsync(); } private Task CreateConnectionWithContinuationTimeoutAsync(bool automaticRecoveryEnabled, TimeSpan continuationTimeout) diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index d13be6e56..8addefa19 100644 --- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -60,94 +60,84 @@ public override Task InitializeAsync() [Fact] public async Task TestBasicConnectionRecoveryWithHostnameList() { - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(new List { "127.0.0.1", "localhost" })) - { - Assert.True(c.IsOpen); - await CloseAndWaitForRecoveryAsync(c); - Assert.True(c.IsOpen); - await c.CloseAsync(); - } + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(new List { "127.0.0.1", "localhost" }); + Assert.True(c.IsOpen); + await CloseAndWaitForRecoveryAsync(c); + Assert.True(c.IsOpen); + await c.CloseAsync(); } [Fact] public async Task TestBasicConnectionRecoveryWithHostnameListAndUnreachableHosts() { - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(new List { "191.72.44.22", "127.0.0.1", "localhost" })) - { - Assert.True(c.IsOpen); - await CloseAndWaitForRecoveryAsync(c); - Assert.True(c.IsOpen); - await c.CloseAsync(); - } + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(new List { "191.72.44.22", "127.0.0.1", "localhost" }); + Assert.True(c.IsOpen); + await CloseAndWaitForRecoveryAsync(c); + Assert.True(c.IsOpen); + await c.CloseAsync(); } [Fact] public async Task TestBasicConnectionRecoveryWithEndpointList() { - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync( - new List - { - new AmqpTcpEndpoint("127.0.0.1"), - new AmqpTcpEndpoint("localhost") - })) - { - Assert.True(c.IsOpen); - await CloseAndWaitForRecoveryAsync(c); - Assert.True(c.IsOpen); - await c.CloseAsync(); - } + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync( + new List + { + new AmqpTcpEndpoint("127.0.0.1"), + new AmqpTcpEndpoint("localhost") + }); + Assert.True(c.IsOpen); + await CloseAndWaitForRecoveryAsync(c); + Assert.True(c.IsOpen); + await c.CloseAsync(); } [Fact] public async Task TestBasicConnectionRecoveryWithEndpointListAndUnreachableHosts() { - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync( - new List - { - new AmqpTcpEndpoint("191.72.44.22"), - new AmqpTcpEndpoint("127.0.0.1"), - new AmqpTcpEndpoint("localhost") - })) - { - Assert.True(c.IsOpen); - await CloseAndWaitForRecoveryAsync(c); - Assert.True(c.IsOpen); - await c.CloseAsync(); - } + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync( + new List + { + new AmqpTcpEndpoint("191.72.44.22"), + new AmqpTcpEndpoint("127.0.0.1"), + new AmqpTcpEndpoint("localhost") + }); + Assert.True(c.IsOpen); + await CloseAndWaitForRecoveryAsync(c); + Assert.True(c.IsOpen); + await c.CloseAsync(); } [Fact] public async Task TestConsumerWorkServiceRecovery() { - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync()) + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(); + await using (IChannel ch = await c.CreateChannelAsync()) { - using (IChannel ch = await c.CreateChannelAsync()) - { - string q = (await ch.QueueDeclareAsync("dotnet-client.recovery.consumer_work_pool1", - false, false, false)).QueueName; - var cons = new AsyncEventingBasicConsumer(ch); - await ch.BasicConsumeAsync(q, true, cons); - await AssertConsumerCountAsync(ch, q, 1); + string q = (await ch.QueueDeclareAsync("dotnet-client.recovery.consumer_work_pool1", + false, false, false)).QueueName; + var cons = new AsyncEventingBasicConsumer(ch); + await ch.BasicConsumeAsync(q, true, cons); + await AssertConsumerCountAsync(ch, q, 1); - await CloseAndWaitForRecoveryAsync(c); - - Assert.True(ch.IsOpen); - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.ReceivedAsync += (s, args) => - { - tcs.SetResult(true); - return Task.CompletedTask; - }; + await CloseAndWaitForRecoveryAsync(c); - await ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")); - await WaitAsync(tcs, "received event"); + Assert.True(ch.IsOpen); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + cons.ReceivedAsync += (s, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; - await ch.QueueDeleteAsync(q); - await ch.CloseAsync(); - } + await ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")); + await WaitAsync(tcs, "received event"); - await c.CloseAsync(); + await ch.QueueDeleteAsync(q); + await ch.CloseAsync(); } + + await c.CloseAsync(); } [Fact] @@ -155,120 +145,114 @@ public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() { const string q0 = "dotnet-client.recovery.queue1"; // connection #1 - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync()) + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(); + await using (IChannel ch = await c.CreateChannelAsync()) { - using (IChannel ch = await c.CreateChannelAsync()) + string q1 = (await ch.QueueDeclareAsync(q0, false, false, false)).QueueName; + Assert.Equal(q0, q1); + + var cons = new AsyncEventingBasicConsumer(ch); + await ch.BasicConsumeAsync(q1, true, cons); + await AssertConsumerCountAsync(ch, q1, 1); + + bool queueNameChangeAfterRecoveryCalled = false; + c.QueueNameChangedAfterRecoveryAsync += (source, ea) => { - string q1 = (await ch.QueueDeclareAsync(q0, false, false, false)).QueueName; - Assert.Equal(q0, q1); + queueNameChangeAfterRecoveryCalled = true; + return Task.CompletedTask; + }; - var cons = new AsyncEventingBasicConsumer(ch); - await ch.BasicConsumeAsync(q1, true, cons); - await AssertConsumerCountAsync(ch, q1, 1); + // connection #2 + await CloseAndWaitForRecoveryAsync(c); + await AssertConsumerCountAsync(ch, q1, 1); + Assert.False(queueNameChangeAfterRecoveryCalled); - bool queueNameChangeAfterRecoveryCalled = false; - c.QueueNameChangedAfterRecoveryAsync += (source, ea) => - { - queueNameChangeAfterRecoveryCalled = true; - return Task.CompletedTask; - }; - - // connection #2 - await CloseAndWaitForRecoveryAsync(c); - await AssertConsumerCountAsync(ch, q1, 1); - Assert.False(queueNameChangeAfterRecoveryCalled); - - // connection #3 - await CloseAndWaitForRecoveryAsync(c); - await AssertConsumerCountAsync(ch, q1, 1); - Assert.False(queueNameChangeAfterRecoveryCalled); - - // connection #4 - await CloseAndWaitForRecoveryAsync(c); - await AssertConsumerCountAsync(ch, q1, 1); - Assert.False(queueNameChangeAfterRecoveryCalled); - - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.ReceivedAsync += (s, args) => - { - tcs.SetResult(true); - return Task.CompletedTask; - }; + // connection #3 + await CloseAndWaitForRecoveryAsync(c); + await AssertConsumerCountAsync(ch, q1, 1); + Assert.False(queueNameChangeAfterRecoveryCalled); - await ch.BasicPublishAsync("", q1, _encoding.GetBytes("msg")); - await WaitAsync(tcs, "received event"); + // connection #4 + await CloseAndWaitForRecoveryAsync(c); + await AssertConsumerCountAsync(ch, q1, 1); + Assert.False(queueNameChangeAfterRecoveryCalled); - await ch.QueueDeleteAsync(q1); - await ch.CloseAsync(); - } + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + cons.ReceivedAsync += (s, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; - await c.CloseAsync(); + await ch.BasicPublishAsync("", q1, _encoding.GetBytes("msg")); + await WaitAsync(tcs, "received event"); + + await ch.QueueDeleteAsync(q1); + await ch.CloseAsync(); } + + await c.CloseAsync(); } [Fact] public async Task TestConsumerRecoveryWithServerNamedQueue() { // https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1238 - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync()) + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(); + await using (IChannel ch = await c.CreateChannelAsync()) { - using (IChannel ch = await c.CreateChannelAsync()) + RabbitMQ.Client.QueueDeclareOk queueDeclareResult = + await ch.QueueDeclareAsync(queue: string.Empty, durable: false, exclusive: true, autoDelete: true, arguments: null); + string qname = queueDeclareResult.QueueName; + Assert.False(string.IsNullOrEmpty(qname)); + + var cons = new AsyncEventingBasicConsumer(ch); + await ch.BasicConsumeAsync(string.Empty, true, cons); + await AssertConsumerCountAsync(ch, qname, 1); + + bool queueNameBeforeIsEqual = false; + bool queueNameChangeAfterRecoveryCalled = false; + string qnameAfterRecovery = null; + c.QueueNameChangedAfterRecoveryAsync += (source, ea) => { - RabbitMQ.Client.QueueDeclareOk queueDeclareResult = - await ch.QueueDeclareAsync(queue: string.Empty, durable: false, exclusive: true, autoDelete: true, arguments: null); - string qname = queueDeclareResult.QueueName; - Assert.False(string.IsNullOrEmpty(qname)); - - var cons = new AsyncEventingBasicConsumer(ch); - await ch.BasicConsumeAsync(string.Empty, true, cons); - await AssertConsumerCountAsync(ch, qname, 1); - - bool queueNameBeforeIsEqual = false; - bool queueNameChangeAfterRecoveryCalled = false; - string qnameAfterRecovery = null; - c.QueueNameChangedAfterRecoveryAsync += (source, ea) => - { - queueNameChangeAfterRecoveryCalled = true; - queueNameBeforeIsEqual = qname.Equals(ea.NameBefore); - qnameAfterRecovery = ea.NameAfter; - return Task.CompletedTask; - }; - - await CloseAndWaitForRecoveryAsync(c); + queueNameChangeAfterRecoveryCalled = true; + queueNameBeforeIsEqual = qname.Equals(ea.NameBefore); + qnameAfterRecovery = ea.NameAfter; + return Task.CompletedTask; + }; - await AssertConsumerCountAsync(ch, qnameAfterRecovery, 1); - Assert.True(queueNameChangeAfterRecoveryCalled); - Assert.True(queueNameBeforeIsEqual); + await CloseAndWaitForRecoveryAsync(c); - await ch.CloseAsync(); - } + await AssertConsumerCountAsync(ch, qnameAfterRecovery, 1); + Assert.True(queueNameChangeAfterRecoveryCalled); + Assert.True(queueNameBeforeIsEqual); - await c.CloseAsync(); + await ch.CloseAsync(); } + + await c.CloseAsync(); } [Fact] public async Task TestCreateChannelOnClosedAutorecoveringConnectionDoesNotHang() { // we don't want this to recover quickly in this test - using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionAsync(TimeSpan.FromSeconds(20))) + await using AutorecoveringConnection conn = await CreateAutorecoveringConnectionAsync(TimeSpan.FromSeconds(20)); + try { - try - { - await conn.CloseAsync(); - await WaitForShutdownAsync(conn); - Assert.False(conn.IsOpen); - await conn.CreateChannelAsync(); - Assert.Fail("Expected an exception"); - } - catch (AlreadyClosedException) - { - // expected - } - finally - { - await conn.CloseAsync(); - } + await conn.CloseAsync(); + await WaitForShutdownAsync(conn); + Assert.False(conn.IsOpen); + await conn.CreateChannelAsync(); + Assert.Fail("Expected an exception"); + } + catch (AlreadyClosedException) + { + // expected + } + finally + { + await conn.CloseAsync(); } } @@ -288,98 +272,96 @@ public async Task TestTopologyRecoveryConsumerFilter() var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter)) + await using AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter); + conn.RecoverySucceededAsync += (source, ea) => { - conn.RecoverySucceededAsync += (source, ea) => - { - connectionRecoveryTcs.SetResult(true); - return Task.CompletedTask; - }; - conn.ConnectionRecoveryErrorAsync += (source, ea) => + connectionRecoveryTcs.SetResult(true); + return Task.CompletedTask; + }; + conn.ConnectionRecoveryErrorAsync += (source, ea) => + { + connectionRecoveryTcs.SetException(ea.Exception); + return Task.CompletedTask; + }; + conn.CallbackExceptionAsync += (source, ea) => + { + connectionRecoveryTcs.SetException(ea.Exception); + return Task.CompletedTask; + }; + + await using (IChannel ch = await conn.CreateChannelAsync()) + { + await ch.ConfirmSelectAsync(); + + await ch.ExchangeDeclareAsync(exchange, "direct"); + await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false); + await ch.QueueDeclareAsync(queueWithIgnoredConsumer, false, false, false); + await ch.QueueBindAsync(queueWithRecoveredConsumer, exchange, binding1); + await ch.QueueBindAsync(queueWithIgnoredConsumer, exchange, binding2); + await ch.QueuePurgeAsync(queueWithRecoveredConsumer); + await ch.QueuePurgeAsync(queueWithIgnoredConsumer); + + var consumerRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var consumerToRecover = new AsyncEventingBasicConsumer(ch); + consumerToRecover.ReceivedAsync += (source, ea) => { - connectionRecoveryTcs.SetException(ea.Exception); + consumerRecoveryTcs.SetResult(true); return Task.CompletedTask; }; - conn.CallbackExceptionAsync += (source, ea) => + await ch.BasicConsumeAsync(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); + + var ignoredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var consumerToIgnore = new AsyncEventingBasicConsumer(ch); + consumerToIgnore.ReceivedAsync += (source, ea) => { - connectionRecoveryTcs.SetException(ea.Exception); + ignoredTcs.SetResult(true); return Task.CompletedTask; }; + await ch.BasicConsumeAsync(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); - using (IChannel ch = await conn.CreateChannelAsync()) + try { - await ch.ConfirmSelectAsync(); - - await ch.ExchangeDeclareAsync(exchange, "direct"); - await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false); - await ch.QueueDeclareAsync(queueWithIgnoredConsumer, false, false, false); - await ch.QueueBindAsync(queueWithRecoveredConsumer, exchange, binding1); - await ch.QueueBindAsync(queueWithIgnoredConsumer, exchange, binding2); - await ch.QueuePurgeAsync(queueWithRecoveredConsumer); - await ch.QueuePurgeAsync(queueWithIgnoredConsumer); - - var consumerRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var consumerToRecover = new AsyncEventingBasicConsumer(ch); - consumerToRecover.ReceivedAsync += (source, ea) => + await CloseAndWaitForRecoveryAsync(conn); + await WaitAsync(connectionRecoveryTcs, "recovery succeeded"); + + Assert.True(ch.IsOpen); + await ch.BasicPublishAsync(exchange, binding1, _encoding.GetBytes("test message")); + await ch.BasicPublishAsync(exchange, binding2, _encoding.GetBytes("test message")); + await WaitForConfirmsWithCancellationAsync(ch); + + await consumerRecoveryTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerRecoveryTcs.Task); + + bool sawTimeout = false; + try { - consumerRecoveryTcs.SetResult(true); - return Task.CompletedTask; - }; - await ch.BasicConsumeAsync(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); - - var ignoredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var consumerToIgnore = new AsyncEventingBasicConsumer(ch); - consumerToIgnore.ReceivedAsync += (source, ea) => + await ignoredTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + } + catch (TimeoutException) { - ignoredTcs.SetResult(true); - return Task.CompletedTask; - }; + sawTimeout = true; + } + Assert.True(sawTimeout); + await ch.BasicConsumeAsync(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); try { - await CloseAndWaitForRecoveryAsync(conn); - await WaitAsync(connectionRecoveryTcs, "recovery succeeded"); - - Assert.True(ch.IsOpen); - await ch.BasicPublishAsync(exchange, binding1, _encoding.GetBytes("test message")); - await ch.BasicPublishAsync(exchange, binding2, _encoding.GetBytes("test message")); - await WaitForConfirmsWithCancellationAsync(ch); - - await consumerRecoveryTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerRecoveryTcs.Task); - - bool sawTimeout = false; - try - { - await ignoredTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - } - catch (TimeoutException) - { - sawTimeout = true; - } - Assert.True(sawTimeout); - - await ch.BasicConsumeAsync(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); - - try - { - await ch.BasicConsumeAsync(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); - Assert.Fail("Expected an exception"); - } - catch (OperationInterruptedException e) - { - AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag - } + await ch.BasicConsumeAsync(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); + Assert.Fail("Expected an exception"); } - finally + catch (OperationInterruptedException e) { - await ch.CloseAsync(); + AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag } } - - await conn.CloseAsync(); + finally + { + await ch.CloseAsync(); + } } + + await conn.CloseAsync(); } [Fact] @@ -387,38 +369,36 @@ public async Task TestRecoveryWithTopologyDisabled() { string queueName = GenerateQueueName() + "-dotnet-client.test.recovery.q2"; - using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryDisabledAsync()) + await using AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryDisabledAsync(); + await using (IChannel ch = await conn.CreateChannelAsync()) { - using (IChannel ch = await conn.CreateChannelAsync()) + try { - try - { - await ch.QueueDeleteAsync(queueName); - await ch.QueueDeclareAsync(queue: queueName, - durable: false, exclusive: true, autoDelete: false, arguments: null); - await ch.QueueDeclareAsync(queue: queueName, - passive: true, durable: false, exclusive: true, autoDelete: false, arguments: null); + await ch.QueueDeleteAsync(queueName); + await ch.QueueDeclareAsync(queue: queueName, + durable: false, exclusive: true, autoDelete: false, arguments: null); + await ch.QueueDeclareAsync(queue: queueName, + passive: true, durable: false, exclusive: true, autoDelete: false, arguments: null); - Assert.True(ch.IsOpen); - await CloseAndWaitForRecoveryAsync(conn); + Assert.True(ch.IsOpen); + await CloseAndWaitForRecoveryAsync(conn); - Assert.True(ch.IsOpen); - await ch.QueueDeclareAsync(queue: queueName, passive: true, durable: false, exclusive: true, autoDelete: false, arguments: null); + Assert.True(ch.IsOpen); + await ch.QueueDeclareAsync(queue: queueName, passive: true, durable: false, exclusive: true, autoDelete: false, arguments: null); - Assert.Fail("Expected an exception"); - } - catch (OperationInterruptedException) - { - // expected - } - finally - { - await ch.CloseAsync(); - } + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException) + { + // expected + } + finally + { + await ch.CloseAsync(); } - - await conn.CloseAsync(); } + + await conn.CloseAsync(); } } } diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index 212da3f8c..ffe7d7847 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -55,7 +55,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() string q = GenerateQueueName(); const string rk = "routing-key"; - using (IChannel ch = await _conn.CreateChannelAsync()) + await using (IChannel ch = await _conn.CreateChannelAsync()) { await ch.ExchangeDeclareAsync(exchange: x, type: "fanout"); await ch.QueueDeclareAsync(q, false, false, false); @@ -354,12 +354,10 @@ public async Task TestTopologyRecoveryQueueExceptionHandler() }, QueueRecoveryExceptionHandlerAsync = async (rq, ex, connection) => { - using (IChannel channel = await connection.CreateChannelAsync()) - { - await channel.QueueDeclareAsync(rq.Name, false, false, false, - noWait: false, arguments: changedQueueArguments); - await channel.CloseAsync(); - } + await using IChannel channel = await connection.CreateChannelAsync(); + await channel.QueueDeclareAsync(rq.Name, false, false, false, + noWait: false, arguments: changedQueueArguments); + await channel.CloseAsync(); } }; @@ -416,11 +414,9 @@ public async Task TestTopologyRecoveryExchangeExceptionHandler() }, ExchangeRecoveryExceptionHandlerAsync = async (re, ex, connection) => { - using (IChannel channel = await connection.CreateChannelAsync()) - { - await channel.ExchangeDeclareAsync(re.Name, "topic", false, false); - await channel.CloseAsync(); - } + await using IChannel channel = await connection.CreateChannelAsync(); + await channel.ExchangeDeclareAsync(re.Name, "topic", false, false); + await channel.CloseAsync(); } }; @@ -478,12 +474,10 @@ public async Task TestTopologyRecoveryBindingExceptionHandler() }, BindingRecoveryExceptionHandlerAsync = async (b, ex, connection) => { - using (IChannel channel = await connection.CreateChannelAsync()) - { - await channel.QueueDeclareAsync(queueWithExceptionBinding, false, false, false); - await channel.QueueBindAsync(queueWithExceptionBinding, exchange, bindingToRecoverWithException); - await channel.CloseAsync(); - } + await using IChannel channel = await connection.CreateChannelAsync(); + await channel.QueueDeclareAsync(queueWithExceptionBinding, false, false, false); + await channel.QueueBindAsync(queueWithExceptionBinding, exchange, bindingToRecoverWithException); + await channel.CloseAsync(); } }; @@ -541,11 +535,9 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler() }, ConsumerRecoveryExceptionHandlerAsync = async (c, ex, connection) => { - using (IChannel channel = await connection.CreateChannelAsync()) - { - await channel.QueueDeclareAsync(queueWithExceptionConsumer, false, false, false); - await channel.CloseAsync(); - } + await using IChannel channel = await connection.CreateChannelAsync(); + await channel.QueueDeclareAsync(queueWithExceptionConsumer, false, false, false); + await channel.CloseAsync(); // So topology recovery runs again. This time he missing queue should exist, making // it possible to recover the consumer successfully. diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 7cef7d900..836aead2f 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -167,49 +167,47 @@ public async Task TestMultithreadFloodPublishing() Task pub = Task.Run(async () => { bool stop = false; - using (IConnection publishConnection = await _connFactory.CreateConnectionAsync()) + await using IConnection publishConnection = await _connFactory.CreateConnectionAsync(); + publishConnection.ConnectionShutdownAsync += (o, ea) => { - publishConnection.ConnectionShutdownAsync += (o, ea) => + HandleConnectionShutdown(_conn, ea, (args) => { - HandleConnectionShutdown(_conn, ea, (args) => + if (args.Initiator != ShutdownInitiator.Application) + { + receivedCount = -1; + allMessagesSeenTcs.TrySetException(args.Exception); + } + }); + return Task.CompletedTask; + }; + + await using (IChannel publishChannel = await publishConnection.CreateChannelAsync()) + { + await publishChannel.ConfirmSelectAsync(); + + publishChannel.ChannelShutdownAsync += (o, ea) => + { + HandleChannelShutdown(publishChannel, ea, (args) => { if (args.Initiator != ShutdownInitiator.Application) { - receivedCount = -1; + stop = true; allMessagesSeenTcs.TrySetException(args.Exception); } }); return Task.CompletedTask; }; - using (IChannel publishChannel = await publishConnection.CreateChannelAsync()) + for (int i = 0; i < publishCount && false == stop; i++) { - await publishChannel.ConfirmSelectAsync(); - - publishChannel.ChannelShutdownAsync += (o, ea) => - { - HandleChannelShutdown(publishChannel, ea, (args) => - { - if (args.Initiator != ShutdownInitiator.Application) - { - stop = true; - allMessagesSeenTcs.TrySetException(args.Exception); - } - }); - return Task.CompletedTask; - }; - - for (int i = 0; i < publishCount && false == stop; i++) - { - await publishChannel.BasicPublishAsync(string.Empty, queueName, true, sendBody); - } - - await publishChannel.WaitForConfirmsOrDieAsync(); - await publishChannel.CloseAsync(); + await publishChannel.BasicPublishAsync(string.Empty, queueName, true, sendBody); } - await publishConnection.CloseAsync(); + await publishChannel.WaitForConfirmsOrDieAsync(); + await publishChannel.CloseAsync(); } + + await publishConnection.CloseAsync(); }); var cts = new CancellationTokenSource(WaitSpan); @@ -220,7 +218,7 @@ public async Task TestMultithreadFloodPublishing() try { - using (IConnection consumeConnection = await _connFactory.CreateConnectionAsync()) + await using (IConnection consumeConnection = await _connFactory.CreateConnectionAsync()) { consumeConnection.ConnectionShutdownAsync += (o, ea) => { @@ -235,7 +233,7 @@ public async Task TestMultithreadFloodPublishing() return Task.CompletedTask; }; - using (IChannel consumeChannel = await consumeConnection.CreateChannelAsync()) + await using (IChannel consumeChannel = await consumeConnection.CreateChannelAsync()) { consumeChannel.ChannelShutdownAsync += (o, ea) => { diff --git a/projects/Test/Integration/TestInitialConnection.cs b/projects/Test/Integration/TestInitialConnection.cs index 37229cff1..d488ce40d 100644 --- a/projects/Test/Integration/TestInitialConnection.cs +++ b/projects/Test/Integration/TestInitialConnection.cs @@ -58,21 +58,17 @@ public override Task InitializeAsync() [Fact] public async Task TestWithHostnameList() { - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(new List() { "127.0.0.1", "localhost" })) - { - Assert.True(c.IsOpen); - await c.CloseAsync(); - } + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(new List() { "127.0.0.1", "localhost" }); + Assert.True(c.IsOpen); + await c.CloseAsync(); } [Fact] public async Task TestWithHostnameListAndUnreachableHosts() { - using (AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(new List() { "191.72.44.22", "127.0.0.1", "localhost" })) - { - Assert.True(c.IsOpen); - await c.CloseAsync(); - } + await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync(new List() { "191.72.44.22", "127.0.0.1", "localhost" }); + Assert.True(c.IsOpen); + await c.CloseAsync(); } [Fact] diff --git a/projects/Test/Integration/TestPublishSharedChannelAsync.cs b/projects/Test/Integration/TestPublishSharedChannelAsync.cs index abfbb0405..1378c7bc0 100644 --- a/projects/Test/Integration/TestPublishSharedChannelAsync.cs +++ b/projects/Test/Integration/TestPublishSharedChannelAsync.cs @@ -73,39 +73,37 @@ public async Task MultiThreadPublishOnSharedChannel() var cf = CreateConnectionFactory(); cf.AutomaticRecoveryEnabled = false; - using (IConnection conn = await cf.CreateConnectionAsync()) + await using (IConnection conn = await cf.CreateConnectionAsync()) { try { Assert.IsNotType(conn); conn.ConnectionShutdownAsync += HandleConnectionShutdownAsync; - using (IChannel channel = await conn.CreateChannelAsync()) + await using IChannel channel = await conn.CreateChannelAsync(); + try { - try - { - channel.ChannelShutdownAsync += HandleChannelShutdownAsync; - await channel.ExchangeDeclareAsync(ExchangeName.Value, ExchangeType.Topic, passive: false, durable: false, autoDelete: true, - noWait: false, arguments: null); - await channel.QueueDeclareAsync(QueueName, exclusive: false, autoDelete: true); - await channel.QueueBindAsync(QueueName, ExchangeName.Value, PublishKey.Value); + channel.ChannelShutdownAsync += HandleChannelShutdownAsync; + await channel.ExchangeDeclareAsync(ExchangeName.Value, ExchangeType.Topic, passive: false, durable: false, autoDelete: true, + noWait: false, arguments: null); + await channel.QueueDeclareAsync(QueueName, exclusive: false, autoDelete: true); + await channel.QueueBindAsync(QueueName, ExchangeName.Value, PublishKey.Value); - for (int i = 0; i < Loops; i++) + for (int i = 0; i < Loops; i++) + { + for (int j = 0; j < Repeats; j++) { - for (int j = 0; j < Repeats; j++) - { - await channel.BasicPublishAsync(ExchangeName, PublishKey, false, _body); - } + await channel.BasicPublishAsync(ExchangeName, PublishKey, false, _body); } } - catch (Exception e) - { - _raisedException = e; - } - finally - { - await channel.CloseAsync(); - } + } + catch (Exception e) + { + _raisedException = e; + } + finally + { + await channel.CloseAsync(); } } finally diff --git a/projects/Test/Integration/TestPublisherConfirms.cs b/projects/Test/Integration/TestPublisherConfirms.cs index 43dfe5cac..80df6ea72 100644 --- a/projects/Test/Integration/TestPublisherConfirms.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -63,10 +63,8 @@ public Task TestWaitForConfirmsWithTimeout() { return TestWaitForConfirmsAsync(200, async (ch) => { - using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4))) - { - Assert.True(await ch.WaitForConfirmsAsync(cts.Token)); - } + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4)); + Assert.True(await ch.WaitForConfirmsAsync(cts.Token)); }); } @@ -78,16 +76,14 @@ public async Task TestWaitForConfirmsWithTimeoutAsync_MightThrowTaskCanceledExce Task t = TestWaitForConfirmsAsync(10000, async (ch) => { - using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(1))) + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(1)); + try + { + waitResult = await ch.WaitForConfirmsAsync(cts.Token); + } + catch { - try - { - waitResult = await ch.WaitForConfirmsAsync(cts.Token); - } - catch - { - sawException = true; - } + sawException = true; } }); @@ -106,10 +102,8 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout { RecoveryAwareChannel actualChannel = ((AutorecoveringChannel)ch).InnerChannel; await actualChannel.HandleAckNack(10UL, false, true); - using (var cts = new CancellationTokenSource(ShortSpan)) - { - Assert.False(await ch.WaitForConfirmsAsync(cts.Token)); - } + using var cts = new CancellationTokenSource(ShortSpan); + Assert.False(await ch.WaitForConfirmsAsync(cts.Token)); }); } @@ -117,71 +111,67 @@ public Task TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout public async Task TestWaitForConfirmsWithEventsAsync() { string queueName = GenerateQueueName(); - using (IChannel ch = await _conn.CreateChannelAsync()) - { - await ch.ConfirmSelectAsync(); - await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, - exclusive: true, autoDelete: false, arguments: null); + await using IChannel ch = await _conn.CreateChannelAsync(); + await ch.ConfirmSelectAsync(); + await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, + exclusive: true, autoDelete: false, arguments: null); - int n = 200; - // number of event handler invocations - int c = 0; + int n = 200; + // number of event handler invocations + int c = 0; - ch.BasicAcksAsync += (_, args) => - { - Interlocked.Increment(ref c); - return Task.CompletedTask; - }; + ch.BasicAcksAsync += (_, args) => + { + Interlocked.Increment(ref c); + return Task.CompletedTask; + }; - try - { - for (int i = 0; i < n; i++) - { - await ch.BasicPublishAsync("", queueName, _encoding.GetBytes("msg")); - } - - await ch.WaitForConfirmsAsync(); - - // Note: number of event invocations is not guaranteed - // to be equal to N because acks can be batched, - // so we primarily care about event handlers being invoked - // in this test - Assert.True(c >= 1); - } - finally + try + { + for (int i = 0; i < n; i++) { - await ch.QueueDeleteAsync(queue: queueName, ifUnused: false, ifEmpty: false); - await ch.CloseAsync(); + await ch.BasicPublishAsync("", queueName, _encoding.GetBytes("msg")); } + + await ch.WaitForConfirmsAsync(); + + // Note: number of event invocations is not guaranteed + // to be equal to N because acks can be batched, + // so we primarily care about event handlers being invoked + // in this test + Assert.True(c >= 1); + } + finally + { + await ch.QueueDeleteAsync(queue: queueName, ifUnused: false, ifEmpty: false); + await ch.CloseAsync(); } } private async Task TestWaitForConfirmsAsync(int numberOfMessagesToPublish, Func fn) { string queueName = GenerateQueueName(); - using (IChannel ch = await _conn.CreateChannelAsync()) - { - var props = new BasicProperties { Persistent = true }; + await using IChannel ch = await _conn.CreateChannelAsync(); + var props = new BasicProperties { Persistent = true }; - await ch.ConfirmSelectAsync(); - await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, - exclusive: true, autoDelete: false, arguments: null); + await ch.ConfirmSelectAsync(); + await ch.QueueDeclareAsync(queue: queueName, passive: false, durable: false, + exclusive: true, autoDelete: false, arguments: null); - for (int i = 0; i < numberOfMessagesToPublish; i++) - { - await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, - body: _messageBody, mandatory: true, basicProperties: props); - } + for (int i = 0; i < numberOfMessagesToPublish; i++) + { + await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, + body: _messageBody, mandatory: true, basicProperties: props); + } - try - { - await fn(ch); - } - finally - { - await ch.QueueDeleteAsync(queue: queueName, ifUnused: false, ifEmpty: false); - await ch.CloseAsync(); - } + try + { + await fn(ch); + } + finally + { + await ch.QueueDeleteAsync(queue: queueName, ifUnused: false, ifEmpty: false); + await ch.CloseAsync(); } } } diff --git a/projects/Test/Integration/TestSsl.cs b/projects/Test/Integration/TestSsl.cs index 48925f6df..676b317af 100644 --- a/projects/Test/Integration/TestSsl.cs +++ b/projects/Test/Integration/TestSsl.cs @@ -125,29 +125,25 @@ public async Task TestNoClientCertificate() private async Task SendReceiveAsync(ConnectionFactory connectionFactory) { - using (IConnection conn = await CreateConnectionAsyncWithRetries(connectionFactory)) - { - using (IChannel ch = await conn.CreateChannelAsync()) - { - await ch.ExchangeDeclareAsync("Exchange_TestSslEndPoint", ExchangeType.Direct); + await using IConnection conn = await CreateConnectionAsyncWithRetries(connectionFactory); + await using IChannel ch = await conn.CreateChannelAsync(); + await ch.ExchangeDeclareAsync("Exchange_TestSslEndPoint", ExchangeType.Direct); - string qName = await ch.QueueDeclareAsync(); - await ch.QueueBindAsync(qName, "Exchange_TestSslEndPoint", "Key_TestSslEndpoint"); + string qName = await ch.QueueDeclareAsync(); + await ch.QueueBindAsync(qName, "Exchange_TestSslEndPoint", "Key_TestSslEndpoint"); - string message = "Hello C# SSL Client World"; - byte[] msgBytes = _encoding.GetBytes(message); - await ch.BasicPublishAsync("Exchange_TestSslEndPoint", "Key_TestSslEndpoint", msgBytes); + string message = "Hello C# SSL Client World"; + byte[] msgBytes = _encoding.GetBytes(message); + await ch.BasicPublishAsync("Exchange_TestSslEndPoint", "Key_TestSslEndpoint", msgBytes); - bool autoAck = false; - BasicGetResult result = await ch.BasicGetAsync(qName, autoAck); - byte[] body = result.Body.ToArray(); - string resultMessage = _encoding.GetString(body); + bool autoAck = false; + BasicGetResult result = await ch.BasicGetAsync(qName, autoAck); + byte[] body = result.Body.ToArray(); + string resultMessage = _encoding.GetString(body); - Assert.Equal(message, resultMessage); + Assert.Equal(message, resultMessage); - await ch.CloseAsync(); - } - } + await ch.CloseAsync(); } } } diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 52656c252..725d7f7a5 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -66,131 +66,125 @@ public async Task TestCloseConnection() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); - using (var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows)) - { - await pm.InitializeAsync(); + using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + await pm.InitializeAsync(); - ConnectionFactory cf = CreateConnectionFactory(); - cf.Port = pm.ProxyPort; - cf.AutomaticRecoveryEnabled = true; - cf.NetworkRecoveryInterval = TimeSpan.FromSeconds(1); - cf.RequestedHeartbeat = TimeSpan.FromSeconds(1); + ConnectionFactory cf = CreateConnectionFactory(); + cf.Port = pm.ProxyPort; + cf.AutomaticRecoveryEnabled = true; + cf.NetworkRecoveryInterval = TimeSpan.FromSeconds(1); + cf.RequestedHeartbeat = TimeSpan.FromSeconds(1); - var messagePublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var connectionShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var recoverySucceededTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var testSucceededTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var messagePublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var connectionShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var recoverySucceededTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var testSucceededTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - Task pubTask = Task.Run(async () => + Task pubTask = Task.Run(async () => + { + await using IConnection conn = await cf.CreateConnectionAsync(); + conn.CallbackExceptionAsync += (s, ea) => { - using (IConnection conn = await cf.CreateConnectionAsync()) - { - conn.CallbackExceptionAsync += (s, ea) => - { - _output.WriteLine($"[ERROR] unexpected callback exception {ea.Detail} {ea.Exception}"); - recoverySucceededTcs.SetResult(false); - return Task.CompletedTask; - }; + _output.WriteLine($"[ERROR] unexpected callback exception {ea.Detail} {ea.Exception}"); + recoverySucceededTcs.SetResult(false); + return Task.CompletedTask; + }; - conn.ConnectionRecoveryErrorAsync += (s, ea) => - { - _output.WriteLine($"[ERROR] connection recovery error {ea.Exception}"); - recoverySucceededTcs.SetResult(false); - return Task.CompletedTask; - }; + conn.ConnectionRecoveryErrorAsync += (s, ea) => + { + _output.WriteLine($"[ERROR] connection recovery error {ea.Exception}"); + recoverySucceededTcs.SetResult(false); + return Task.CompletedTask; + }; - conn.ConnectionShutdownAsync += (s, ea) => - { - if (IsVerbose) - { - _output.WriteLine($"[INFO] connection shutdown"); - } + conn.ConnectionShutdownAsync += (s, ea) => + { + if (IsVerbose) + { + _output.WriteLine($"[INFO] connection shutdown"); + } - /* + /* * Note: using TrySetResult because this callback will be called when the * test exits, and connectionShutdownTcs will have already been set */ - connectionShutdownTcs.TrySetResult(true); - return Task.CompletedTask; - }; + connectionShutdownTcs.TrySetResult(true); + return Task.CompletedTask; + }; - conn.RecoverySucceededAsync += (s, ea) => - { - if (IsVerbose) - { - _output.WriteLine($"[INFO] connection recovery succeeded"); - } + conn.RecoverySucceededAsync += (s, ea) => + { + if (IsVerbose) + { + _output.WriteLine($"[INFO] connection recovery succeeded"); + } - recoverySucceededTcs.SetResult(true); - return Task.CompletedTask; - }; + recoverySucceededTcs.SetResult(true); + return Task.CompletedTask; + }; - async Task PublishLoop() - { - using (IChannel ch = await conn.CreateChannelAsync()) - { - await ch.ConfirmSelectAsync(); - QueueDeclareOk q = await ch.QueueDeclareAsync(); - while (conn.IsOpen) - { - await ch.BasicPublishAsync("", q.QueueName, GetRandomBody()); - messagePublishedTcs.TrySetResult(true); - /* + async Task PublishLoop() + { + await using IChannel ch = await conn.CreateChannelAsync(); + await ch.ConfirmSelectAsync(); + QueueDeclareOk q = await ch.QueueDeclareAsync(); + while (conn.IsOpen) + { + await ch.BasicPublishAsync("", q.QueueName, GetRandomBody()); + messagePublishedTcs.TrySetResult(true); + /* * Note: * In this test, it is possible that the connection * will be closed before the ack is returned, * and this await will throw an exception */ - try - { - await ch.WaitForConfirmsAsync(); - } - catch (AlreadyClosedException ex) - { - if (IsVerbose) - { - _output.WriteLine($"[WARNING] WaitForConfirmsAsync ex: {ex}"); - } - } - } - - await ch.CloseAsync(); - } - } - try { - await PublishLoop(); + await ch.WaitForConfirmsAsync(); } - catch (Exception ex) + catch (AlreadyClosedException ex) { if (IsVerbose) { - _output.WriteLine($"[WARNING] PublishLoop ex: {ex}"); + _output.WriteLine($"[WARNING] WaitForConfirmsAsync ex: {ex}"); } } + } + + await ch.CloseAsync(); + } - Assert.True(await testSucceededTcs.Task); - await conn.CloseAsync(); + try + { + await PublishLoop(); + } + catch (Exception ex) + { + if (IsVerbose) + { + _output.WriteLine($"[WARNING] PublishLoop ex: {ex}"); } - }); + } - Assert.True(await messagePublishedTcs.Task); + Assert.True(await testSucceededTcs.Task); + await conn.CloseAsync(); + }); - Task disableProxyTask = pm.DisableAsync(); + Assert.True(await messagePublishedTcs.Task); - await Task.WhenAll(disableProxyTask, connectionShutdownTcs.Task); + Task disableProxyTask = pm.DisableAsync(); - Task enableProxyTask = pm.EnableAsync(); + await Task.WhenAll(disableProxyTask, connectionShutdownTcs.Task); - Task whenAllTask = Task.WhenAll(enableProxyTask, recoverySucceededTcs.Task); - await whenAllTask.WaitAsync(TimeSpan.FromSeconds(15)); + Task enableProxyTask = pm.EnableAsync(); - Assert.True(await recoverySucceededTcs.Task); + Task whenAllTask = Task.WhenAll(enableProxyTask, recoverySucceededTcs.Task); + await whenAllTask.WaitAsync(TimeSpan.FromSeconds(15)); - testSucceededTcs.SetResult(true); - await pubTask; - } + Assert.True(await recoverySucceededTcs.Task); + + testSucceededTcs.SetResult(true); + await pubTask; } [SkippableFact] @@ -199,52 +193,46 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); - using (var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows)) - { - await pm.InitializeAsync(); + using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + await pm.InitializeAsync(); - ConnectionFactory cf = CreateConnectionFactory(); - cf.Port = pm.ProxyPort; - cf.RequestedHeartbeat = _heartbeatTimeout; - cf.AutomaticRecoveryEnabled = false; + ConnectionFactory cf = CreateConnectionFactory(); + cf.Port = pm.ProxyPort; + cf.RequestedHeartbeat = _heartbeatTimeout; + cf.AutomaticRecoveryEnabled = false; - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - Task pubTask = Task.Run(async () => + Task pubTask = Task.Run(async () => + { + await using IConnection conn = await cf.CreateConnectionAsync(); + await using IChannel ch = await conn.CreateChannelAsync(); + await ch.ConfirmSelectAsync(); + QueueDeclareOk q = await ch.QueueDeclareAsync(); + while (conn.IsOpen) { - using (IConnection conn = await cf.CreateConnectionAsync()) - { - using (IChannel ch = await conn.CreateChannelAsync()) - { - await ch.ConfirmSelectAsync(); - QueueDeclareOk q = await ch.QueueDeclareAsync(); - while (conn.IsOpen) - { - await ch.BasicPublishAsync("", q.QueueName, GetRandomBody()); - await ch.WaitForConfirmsAsync(); - await Task.Delay(TimeSpan.FromSeconds(1)); - tcs.TrySetResult(true); - } + await ch.BasicPublishAsync("", q.QueueName, GetRandomBody()); + await ch.WaitForConfirmsAsync(); + await Task.Delay(TimeSpan.FromSeconds(1)); + tcs.TrySetResult(true); + } - await ch.CloseAsync(); - await conn.CloseAsync(); - } - } - }); + await ch.CloseAsync(); + await conn.CloseAsync(); + }); - Assert.True(await tcs.Task); + Assert.True(await tcs.Task); - var timeoutToxic = new TimeoutToxic(); - timeoutToxic.Attributes.Timeout = 0; - timeoutToxic.Toxicity = 1.0; + var timeoutToxic = new TimeoutToxic(); + timeoutToxic.Attributes.Timeout = 0; + timeoutToxic.Toxicity = 1.0; - Task addToxicTask = pm.AddToxicAsync(timeoutToxic); + Task addToxicTask = pm.AddToxicAsync(timeoutToxic); - await Assert.ThrowsAsync(() => - { - return Task.WhenAll(addToxicTask, pubTask); - }); - } + await Assert.ThrowsAsync(() => + { + return Task.WhenAll(addToxicTask, pubTask); + }); } [SkippableFact] @@ -253,55 +241,51 @@ public async Task TestTcpReset_GH1464() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); - using (var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows)) - { - await pm.InitializeAsync(); + using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + await pm.InitializeAsync(); - ConnectionFactory cf = CreateConnectionFactory(); - cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort); - cf.RequestedHeartbeat = TimeSpan.FromSeconds(5); - cf.AutomaticRecoveryEnabled = true; + ConnectionFactory cf = CreateConnectionFactory(); + cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort); + cf.RequestedHeartbeat = TimeSpan.FromSeconds(5); + cf.AutomaticRecoveryEnabled = true; - var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var connectionShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var connectionShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - Task recoveryTask = Task.Run(async () => + Task recoveryTask = Task.Run(async () => + { + await using IConnection conn = await cf.CreateConnectionAsync(); + conn.ConnectionShutdownAsync += (o, ea) => { - using (IConnection conn = await cf.CreateConnectionAsync()) - { - conn.ConnectionShutdownAsync += (o, ea) => - { - connectionShutdownTcs.SetResult(true); - return Task.CompletedTask; - }; + connectionShutdownTcs.SetResult(true); + return Task.CompletedTask; + }; - using (IChannel ch = await conn.CreateChannelAsync()) - { - channelCreatedTcs.SetResult(true); - await WaitForRecoveryAsync(conn); - await ch.CloseAsync(); - } + await using (IChannel ch = await conn.CreateChannelAsync()) + { + channelCreatedTcs.SetResult(true); + await WaitForRecoveryAsync(conn); + await ch.CloseAsync(); + } - await conn.CloseAsync(); - } - }); + await conn.CloseAsync(); + }); - Assert.True(await channelCreatedTcs.Task); + Assert.True(await channelCreatedTcs.Task); - const string toxicName = "rmq-localhost-reset_peer"; - var resetPeerToxic = new ResetPeerToxic(); - resetPeerToxic.Name = toxicName; - resetPeerToxic.Attributes.Timeout = 500; - resetPeerToxic.Toxicity = 1.0; + const string toxicName = "rmq-localhost-reset_peer"; + var resetPeerToxic = new ResetPeerToxic(); + resetPeerToxic.Name = toxicName; + resetPeerToxic.Attributes.Timeout = 500; + resetPeerToxic.Toxicity = 1.0; - Task addToxicTask = pm.AddToxicAsync(resetPeerToxic); + Task addToxicTask = pm.AddToxicAsync(resetPeerToxic); - await Task.WhenAll(addToxicTask, connectionShutdownTcs.Task); + await Task.WhenAll(addToxicTask, connectionShutdownTcs.Task); - await pm.RemoveToxicAsync(toxicName); + await pm.RemoveToxicAsync(toxicName); - await recoveryTask; - } + await recoveryTask; } private bool AreToxiproxyTestsEnabled diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 7266ae86a..107168f9f 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -84,34 +84,32 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); - using (ActivityListener activityListener = StartActivityListener(_activities)) + using ActivityListener activityListener = StartActivityListener(_activities); + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; + }; - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); + await _channel.WaitForConfirmsOrDieAsync(); - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); - } + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); } [Theory] @@ -123,36 +121,34 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); - using (ActivityListener activityListener = StartActivityListener(_activities)) + using ActivityListener activityListener = StartActivityListener(_activities); + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - CachedString exchange = new CachedString(""); - CachedString routingKey = new CachedString(q.QueueName); - await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); - } + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + CachedString exchange = new CachedString(""); + CachedString routingKey = new CachedString(q.QueueName); + await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); } [Theory] @@ -164,35 +160,33 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(boo RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var _activities = new List(); - using (ActivityListener activityListener = StartActivityListener(_activities)) + using ActivityListener activityListener = StartActivityListener(_activities); + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); - await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); - await _channel.WaitForConfirmsOrDieAsync(); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); - } + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + PublicationAddress publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); + await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); } [Theory] @@ -204,35 +198,33 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); - using (ActivityListener activityListener = StartActivityListener(activities)) + using ActivityListener activityListener = StartActivityListener(activities); + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - await Task.Delay(500); + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; + }; - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); + await _channel.WaitForConfirmsOrDieAsync(); - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); - } + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); } [Theory] @@ -244,37 +236,35 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); - using (ActivityListener activityListener = StartActivityListener(activities)) + using ActivityListener activityListener = StartActivityListener(activities); + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - await Task.Delay(500); - - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - CachedString exchange = new CachedString(""); - CachedString routingKey = new CachedString(q.QueueName); - await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); - } + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + CachedString exchange = new CachedString(""); + CachedString routingKey = new CachedString(q.QueueName); + await _channel.BasicPublishAsync(exchange, routingKey, true, sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); } [Theory] @@ -286,36 +276,34 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); - using (ActivityListener activityListener = StartActivityListener(activities)) + using ActivityListener activityListener = StartActivityListener(activities); + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - await Task.Delay(500); - - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - consumerReceivedTcs.SetResult(true); - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); - await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); - await _channel.WaitForConfirmsOrDieAsync(); - - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); - - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); - } + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + return Task.CompletedTask; + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", q.QueueName); + await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); } [Theory] @@ -326,30 +314,28 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); - using (ActivityListener activityListener = StartActivityListener(activities)) + using ActivityListener activityListener = StartActivityListener(activities); + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try { + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); + await _channel.WaitForConfirmsOrDieAsync(); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); await Task.Delay(500); - string queue = $"queue-{Guid.NewGuid()}"; - const string msg = "for basic.get"; - - try - { - await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); - await _channel.WaitForConfirmsOrDieAsync(); - QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); - Assert.Equal(1u, ok.MessageCount); - BasicGetResult res = await _channel.BasicGetAsync(queue, true); - Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); - ok = await _channel.QueueDeclarePassiveAsync(queue); - Assert.Equal(0u, ok.MessageCount); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); - } - finally - { - await _channel.QueueDeleteAsync(queue); - } + AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); } } @@ -361,32 +347,30 @@ public async Task TestPublisherWithCachedStringsAndBasicGetActivityTags(bool use await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); - using (ActivityListener activityListener = StartActivityListener(activities)) + using ActivityListener activityListener = StartActivityListener(activities); + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try { + CachedString exchange = new CachedString(""); + CachedString routingKey = new CachedString(queue); + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync(exchange, routingKey, true, Encoding.UTF8.GetBytes(msg)); + await _channel.WaitForConfirmsOrDieAsync(); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); await Task.Delay(500); - string queue = $"queue-{Guid.NewGuid()}"; - const string msg = "for basic.get"; - - try - { - CachedString exchange = new CachedString(""); - CachedString routingKey = new CachedString(queue); - await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync(exchange, routingKey, true, Encoding.UTF8.GetBytes(msg)); - await _channel.WaitForConfirmsOrDieAsync(); - QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); - Assert.Equal(1u, ok.MessageCount); - BasicGetResult res = await _channel.BasicGetAsync(queue, true); - Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); - ok = await _channel.QueueDeclarePassiveAsync(queue); - Assert.Equal(0u, ok.MessageCount); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); - } - finally - { - await _channel.QueueDeleteAsync(queue); - } + AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); } } @@ -398,32 +382,30 @@ public async Task TestPublisherWithPublicationAddressAndBasicGetActivityTags(boo await _channel.ConfirmSelectAsync(); RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; var activities = new List(); - using (ActivityListener activityListener = StartActivityListener(activities)) + using ActivityListener activityListener = StartActivityListener(activities); + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try { + var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queue); + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), + Encoding.UTF8.GetBytes(msg)); + await _channel.WaitForConfirmsOrDieAsync(); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); await Task.Delay(500); - string queue = $"queue-{Guid.NewGuid()}"; - const string msg = "for basic.get"; - - try - { - var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queue); - await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), - Encoding.UTF8.GetBytes(msg)); - await _channel.WaitForConfirmsOrDieAsync(); - QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); - Assert.Equal(1u, ok.MessageCount); - BasicGetResult res = await _channel.BasicGetAsync(queue, true); - Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); - ok = await _channel.QueueDeclarePassiveAsync(queue); - Assert.Equal(0u, ok.MessageCount); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); - } - finally - { - await _channel.QueueDeleteAsync(queue); - } + AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); } } diff --git a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs index df27c25da..83641e2ac 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs @@ -96,12 +96,10 @@ public async Task TestConnectionBlockedChannelLeak_GH1573() async Task ExchangeDeclareAndPublish() { - using (IChannel publishChannel = await _conn.CreateChannelAsync()) - { - await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); - await publishChannel.BasicPublishAsync(exchangeName, exchangeName, true, GetRandomBody()); - await publishChannel.CloseAsync(); - } + await using IChannel publishChannel = await _conn.CreateChannelAsync(); + await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true); + await publishChannel.BasicPublishAsync(exchangeName, exchangeName, true, GetRandomBody()); + await publishChannel.CloseAsync(); } await Assert.ThrowsAnyAsync(ExchangeDeclareAndPublish); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index 13ae62e76..abf8e238b 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -64,9 +64,9 @@ public override async Task DisposeAsync() { ConnectionFactory cf = CreateConnectionFactory(); cf.ClientProvidedName += "-TearDown"; - using (IConnection conn = await cf.CreateConnectionAsync()) + await using (IConnection conn = await cf.CreateConnectionAsync()) { - using (IChannel ch = await conn.CreateChannelAsync()) + await using (IChannel ch = await conn.CreateChannelAsync()) { await ch.QueueDeleteAsync(_queueName); await ch.CloseAsync(); diff --git a/projects/Test/SequentialIntegration/TestHeartbeats.cs b/projects/Test/SequentialIntegration/TestHeartbeats.cs index 4304f6ce8..7ef20b865 100644 --- a/projects/Test/SequentialIntegration/TestHeartbeats.cs +++ b/projects/Test/SequentialIntegration/TestHeartbeats.cs @@ -135,35 +135,33 @@ public async Task TestHundredsOfConnectionsWithRandomHeartbeatInterval() private async Task RunSingleConnectionTestAsync(ConnectionFactory cf) { - using (IConnection conn = await cf.CreateConnectionAsync(_testDisplayName)) + await using IConnection conn = await cf.CreateConnectionAsync(_testDisplayName); + await using (IChannel ch = await conn.CreateChannelAsync()) { - using (IChannel ch = await conn.CreateChannelAsync()) - { - bool wasShutdown = false; + bool wasShutdown = false; - conn.ConnectionShutdownAsync += (sender, evt) => + conn.ConnectionShutdownAsync += (sender, evt) => + { + lock (conn) { - lock (conn) + if (InitiatedByPeerOrLibrary(evt)) { - if (InitiatedByPeerOrLibrary(evt)) - { - CheckInitiator(evt); - wasShutdown = true; - } + CheckInitiator(evt); + wasShutdown = true; } - return Task.CompletedTask; - }; + } + return Task.CompletedTask; + }; - await SleepFor(30); + await SleepFor(30); - Assert.False(wasShutdown, "shutdown event should not have been fired"); - Assert.True(conn.IsOpen, "connection should be open"); + Assert.False(wasShutdown, "shutdown event should not have been fired"); + Assert.True(conn.IsOpen, "connection should be open"); - await ch.CloseAsync(); - } - - await conn.CloseAsync(); + await ch.CloseAsync(); } + + await conn.CloseAsync(); } private bool LongRunningTestsEnabled() diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index c4a185601..7a99a00e6 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -88,55 +88,53 @@ void AssertIntTagGreaterThanZero(Activity activity, string name) public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName) { var exportedItems = new List(); - using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation() - .AddInMemoryExporter(exportedItems) - .Build()) + using var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQInstrumentation() + .AddInMemoryExporter(exportedItems) + .Build(); + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - string baggageGuid = Guid.NewGuid().ToString(); - Baggage.SetBaggage("TestItem", baggageGuid); - Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); - - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - await Task.Delay(500); - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => + consumeBody = a.Body.ToArray(); + string baggageItem = Baggage.GetBaggage("TestItem"); + if (baggageItem == baggageGuid) { - consumeBody = a.Body.ToArray(); - string baggageItem = Baggage.GetBaggage("TestItem"); - if (baggageItem == baggageGuid) - { - consumerReceivedTcs.SetResult(true); - } - else - { - consumerReceivedTcs.SetException( - EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); - } - - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); - Baggage.ClearBaggage(); - Assert.Null(Baggage.GetBaggage("TestItem")); + consumerReceivedTcs.SetResult(true); + } + else + { + consumerReceivedTcs.SetException( + EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); + } - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); + return Task.CompletedTask; + }; - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); - } + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); } [Theory] @@ -145,56 +143,54 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { var exportedItems = new List(); - using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation() - .AddInMemoryExporter(exportedItems) - .Build()) + using var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQInstrumentation() + .AddInMemoryExporter(exportedItems) + .Build(); + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - string baggageGuid = Guid.NewGuid().ToString(); - Baggage.SetBaggage("TestItem", baggageGuid); - Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); + consumeBody = a.Body.ToArray(); + string baggageItem = Baggage.GetBaggage("TestItem"); + if (baggageItem == baggageGuid) + { + consumerReceivedTcs.SetResult(true); + } + else + { + consumerReceivedTcs.SetException( + EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); + } - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - await Task.Delay(500); + return Task.CompletedTask; + }; - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - string baggageItem = Baggage.GetBaggage("TestItem"); - if (baggageItem == baggageGuid) - { - consumerReceivedTcs.SetResult(true); - } - else - { - consumerReceivedTcs.SetException( - EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); - } - - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); - Baggage.ClearBaggage(); - Assert.Null(Baggage.GetBaggage("TestItem")); + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); - } + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); } [Theory] @@ -203,57 +199,55 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { var exportedItems = new List(); - using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation() - .AddInMemoryExporter(exportedItems) - .Build()) + using var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQInstrumentation() + .AddInMemoryExporter(exportedItems) + .Build(); + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - string baggageGuid = Guid.NewGuid().ToString(); - Baggage.SetBaggage("TestItem", baggageGuid); - Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); + consumeBody = a.Body.ToArray(); + string baggageItem = Baggage.GetBaggage("TestItem"); + if (baggageItem == baggageGuid) + { + consumerReceivedTcs.SetResult(true); + } + else + { + consumerReceivedTcs.SetException( + EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); + } - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - await Task.Delay(500); + return Task.CompletedTask; + }; - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - string baggageItem = Baggage.GetBaggage("TestItem"); - if (baggageItem == baggageGuid) - { - consumerReceivedTcs.SetResult(true); - } - else - { - consumerReceivedTcs.SetException( - EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); - } - - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queueName); - await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); - await _channel.WaitForConfirmsOrDieAsync(); - Baggage.ClearBaggage(); - Assert.Null(Baggage.GetBaggage("TestItem")); + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + var publicationAddress = new PublicationAddress(ExchangeType.Direct, "", queueName); + await _channel.BasicPublishAsync(publicationAddress, new BasicProperties(), sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); - } + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); } [Theory] @@ -262,58 +256,56 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) { var exportedItems = new List(); - using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation() - .AddInMemoryExporter(exportedItems) - .Build()) + using var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQInstrumentation() + .AddInMemoryExporter(exportedItems) + .Build(); + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new AsyncEventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.ReceivedAsync += (o, a) => { - string baggageGuid = Guid.NewGuid().ToString(); - Baggage.SetBaggage("TestItem", baggageGuid); - Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); + consumeBody = a.Body.ToArray(); + string baggageItem = Baggage.GetBaggage("TestItem"); + if (baggageItem == baggageGuid) + { + consumerReceivedTcs.SetResult(true); + } + else + { + consumerReceivedTcs.SetException( + EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); + } - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; - await Task.Delay(500); + return Task.CompletedTask; + }; - string queueName = $"{Guid.NewGuid()}"; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); - byte[] sendBody = Encoding.UTF8.GetBytes("hi"); - byte[] consumeBody = null; - var consumer = new AsyncEventingBasicConsumer(_channel); - var consumerReceivedTcs = - new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.ReceivedAsync += (o, a) => - { - consumeBody = a.Body.ToArray(); - string baggageItem = Baggage.GetBaggage("TestItem"); - if (baggageItem == baggageGuid) - { - consumerReceivedTcs.SetResult(true); - } - else - { - consumerReceivedTcs.SetException( - EqualException.ForMismatchedStrings(baggageGuid, baggageItem, 0, 0)); - } - - return Task.CompletedTask; - }; - - string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - CachedString exchange = new CachedString(""); - CachedString routingKey = new CachedString(queueName); - await _channel.BasicPublishAsync(exchange, routingKey, sendBody); - await _channel.WaitForConfirmsOrDieAsync(); - Baggage.ClearBaggage(); - Assert.Null(Baggage.GetBaggage("TestItem")); + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + CachedString exchange = new CachedString(""); + CachedString routingKey = new CachedString(queueName); + await _channel.BasicPublishAsync(exchange, routingKey, sendBody); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); - await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.True(await consumerReceivedTcs.Task); + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); - await _channel.BasicCancelAsync(consumerTag); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); - } + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, exportedItems, true); } [Theory] @@ -322,40 +314,38 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) { var exportedItems = new List(); - using (var tracer = Sdk.CreateTracerProviderBuilder() - .AddRabbitMQInstrumentation() - .AddInMemoryExporter(exportedItems) - .Build()) + using var tracer = Sdk.CreateTracerProviderBuilder() + .AddRabbitMQInstrumentation() + .AddInMemoryExporter(exportedItems) + .Build(); + string baggageGuid = Guid.NewGuid().ToString(); + Baggage.SetBaggage("TestItem", baggageGuid); + Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); + await _channel.ConfirmSelectAsync(); + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try { - string baggageGuid = Guid.NewGuid().ToString(); - Baggage.SetBaggage("TestItem", baggageGuid); - Assert.Equal(baggageGuid, Baggage.GetBaggage("TestItem")); - await _channel.ConfirmSelectAsync(); - RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); + await _channel.WaitForConfirmsOrDieAsync(); + Baggage.ClearBaggage(); + Assert.Null(Baggage.GetBaggage("TestItem")); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); await Task.Delay(500); - string queue = $"queue-{Guid.NewGuid()}"; - const string msg = "for basic.get"; - - try - { - await _channel.QueueDeclareAsync(queue, false, false, false, null); - await _channel.BasicPublishAsync("", queue, true, Encoding.UTF8.GetBytes(msg)); - await _channel.WaitForConfirmsOrDieAsync(); - Baggage.ClearBaggage(); - Assert.Null(Baggage.GetBaggage("TestItem")); - QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); - Assert.Equal(1u, ok.MessageCount); - BasicGetResult res = await _channel.BasicGetAsync(queue, true); - Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); - ok = await _channel.QueueDeclarePassiveAsync(queue); - Assert.Equal(0u, ok.MessageCount); - await Task.Delay(500); - AssertActivityData(useRoutingKeyAsOperationName, queue, exportedItems, false); - } - finally - { - await _channel.QueueDeleteAsync(queue); - } + AssertActivityData(useRoutingKeyAsOperationName, queue, exportedItems, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); } } From 192cdcd3edea11ad3d24c8867def3b332a60f282 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 16:35:38 +0200 Subject: [PATCH 7/9] Remove unnecessary change --- projects/RabbitMQ.Client/Impl/ChannelBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs index 0a666875d..4cd9118d3 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs @@ -563,7 +563,7 @@ protected virtual void Dispose(bool disposing) } ConsumerDispatcher.Dispose(); - _rpcSemaphore?.Dispose(); + _rpcSemaphore.Dispose(); _confirmSemaphore?.Dispose(); } } From 0c627c5bc839aa7c2884d4bd5cda9ea26e3dcf37 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 16:44:41 +0200 Subject: [PATCH 8/9] Align the langversion --- projects/RabbitMQ.Client.OAuth2/RabbitMQ.Client.OAuth2.csproj | 2 +- .../RabbitMQ.Client.OpenTelemetry.csproj | 2 +- projects/Test/Common/Common.csproj | 2 +- projects/Test/Integration/Integration.csproj | 2 +- projects/Test/OAuth2/OAuth2.csproj | 2 +- .../Test/SequentialIntegration/SequentialIntegration.csproj | 2 +- projects/Test/Unit/Unit.csproj | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/projects/RabbitMQ.Client.OAuth2/RabbitMQ.Client.OAuth2.csproj b/projects/RabbitMQ.Client.OAuth2/RabbitMQ.Client.OAuth2.csproj index 33a6a67ff..9504ead77 100644 --- a/projects/RabbitMQ.Client.OAuth2/RabbitMQ.Client.OAuth2.csproj +++ b/projects/RabbitMQ.Client.OAuth2/RabbitMQ.Client.OAuth2.csproj @@ -31,7 +31,7 @@ https://learn.microsoft.com/en-us/answers/questions/1371494/for-net-standard-2-0-library-why-add-net-core-3-1 https://devblogs.microsoft.com/dotnet/embracing-nullable-reference-types/#what-should-library-authors-do --> - 8.0 + 9.0 enable diff --git a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj index 8ae21f3aa..06fa5696f 100644 --- a/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj +++ b/projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj @@ -27,7 +27,7 @@ true ../../packages README.md - 7.3 + 9.0 diff --git a/projects/Test/Common/Common.csproj b/projects/Test/Common/Common.csproj index eaf42e045..3c283cef8 100644 --- a/projects/Test/Common/Common.csproj +++ b/projects/Test/Common/Common.csproj @@ -16,7 +16,7 @@ ../../rabbit.snk true false - 7.3 + 9.0 diff --git a/projects/Test/Integration/Integration.csproj b/projects/Test/Integration/Integration.csproj index 40dc8d586..64ff8776b 100644 --- a/projects/Test/Integration/Integration.csproj +++ b/projects/Test/Integration/Integration.csproj @@ -16,7 +16,7 @@ ../../rabbit.snk true true - 8.0 + 9.0 diff --git a/projects/Test/OAuth2/OAuth2.csproj b/projects/Test/OAuth2/OAuth2.csproj index 45699f538..11e1123a7 100644 --- a/projects/Test/OAuth2/OAuth2.csproj +++ b/projects/Test/OAuth2/OAuth2.csproj @@ -16,7 +16,7 @@ ../../rabbit.snk true true - 8.0 + 9.0 enable diff --git a/projects/Test/SequentialIntegration/SequentialIntegration.csproj b/projects/Test/SequentialIntegration/SequentialIntegration.csproj index 0926eae73..9664afe2b 100644 --- a/projects/Test/SequentialIntegration/SequentialIntegration.csproj +++ b/projects/Test/SequentialIntegration/SequentialIntegration.csproj @@ -16,7 +16,7 @@ ../../rabbit.snk true true - 8.0 + 9.0 diff --git a/projects/Test/Unit/Unit.csproj b/projects/Test/Unit/Unit.csproj index 0e9cbc45e..0a0286a4b 100644 --- a/projects/Test/Unit/Unit.csproj +++ b/projects/Test/Unit/Unit.csproj @@ -16,7 +16,7 @@ ../../rabbit.snk true true - 7.3 + 9.0 From 68cc34e6d584d0f5cb64a3646814e6a79cd3a539 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 19 Sep 2024 14:31:31 -0700 Subject: [PATCH 9/9] Use the same `await using` syntax. --- .../Impl/AutorecoveringConnection.Recovery.cs | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs index 35ff0c350..7eaed0479 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs @@ -299,12 +299,14 @@ private async ValueTask RecoverExchangesAsync(IConnection connection, { try { - var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); - await using var _ = channel.ConfigureAwait(false); - await recordedExchange.RecoverAsync(channel, cancellationToken) - .ConfigureAwait(false); - await channel.CloseAsync(cancellationToken) - .ConfigureAwait(false); + IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await using (channel.ConfigureAwait(false)) + { + await recordedExchange.RecoverAsync(channel, cancellationToken) + .ConfigureAwait(false); + await channel.CloseAsync(cancellationToken) + .ConfigureAwait(false); + } } catch (Exception ex) { @@ -350,7 +352,7 @@ private async Task RecoverQueuesAsync(IConnection connection, try { string newName = string.Empty; - var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); await using (channel.ConfigureAwait(false)) { newName = await recordedQueue.RecoverAsync(channel, cancellationToken) @@ -463,12 +465,14 @@ private async ValueTask RecoverBindingsAsync(IConnection connection, { try { - var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); - await using var _ = channel.ConfigureAwait(false); - await binding.RecoverAsync(channel, cancellationToken) - .ConfigureAwait(false); - await channel.CloseAsync(cancellationToken) - .ConfigureAwait(false); + IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await using (channel.ConfigureAwait(false)) + { + await binding.RecoverAsync(channel, cancellationToken) + .ConfigureAwait(false); + await channel.CloseAsync(cancellationToken) + .ConfigureAwait(false); + } } catch (Exception ex) {