Skip to content

Commit

Permalink
Merge pull request #1684 from danielmarbach/disposable
Browse files Browse the repository at this point in the history
AsyncDisposable
  • Loading branch information
lukebakken authored Sep 19, 2024
2 parents 13fa6b2 + 68cc34e commit be1d1e3
Show file tree
Hide file tree
Showing 40 changed files with 1,383 additions and 1,513 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
https://learn.microsoft.com/en-us/answers/questions/1371494/for-net-standard-2-0-library-why-add-net-core-3-1
https://devblogs.microsoft.com/dotnet/embracing-nullable-reference-types/#what-should-library-authors-do
-->
<LangVersion>8.0</LangVersion>
<LangVersion>9.0</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>../../packages</PackageOutputPath>
<PackageReadmeFile>README.md</PackageReadmeFile>
<LangVersion>7.3</LangVersion>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Release' And '$(CI)' == 'true'">
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ namespace RabbitMQ.Client
/// functionality offered by versions 0-8, 0-8qpid, 0-9 and 0-9-1 of AMQP.
/// </summary>
/// <remarks>
/// Extends the <see cref="IDisposable"/> interface, so that the "using"
/// Extends the <see cref="IDisposable"/> interface and the <see cref="IAsyncDisposable"/> interface, so that the "using"
/// statement can be used to scope the lifetime of a channel when appropriate.
/// </remarks>
public interface IChannel : IDisposable
public interface IChannel : IAsyncDisposable, IDisposable
{
/// <summary>
/// Channel number, unique per connections.
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ namespace RabbitMQ.Client
/// Alternatively, an API tutorial can be found in the User Guide.
/// </para>
/// <para>
/// Extends the <see cref="IDisposable"/> interface, so that the "using"
/// Extends the <see cref="IDisposable"/> and the <see cref="IAsyncDisposable"/> interface, so that the "using"
/// statement can be used to scope the lifetime of a connection when
/// appropriate.
/// </para>
/// </remarks>
public interface IConnection : INetworkConnection, IDisposable
public interface IConnection : INetworkConnection, IAsyncDisposable, IDisposable
{
/// <summary>
/// The maximum channel number this connection supports (0 if unlimited).
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ await _connection.DeleteRecordedChannelAsync(this,
public override string ToString()
=> InnerChannel.ToString();

public void Dispose()
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
{
Expand All @@ -266,7 +268,8 @@ public void Dispose()

if (IsOpen)
{
this.AbortAsync().GetAwaiter().GetResult();
await this.AbortAsync()
.ConfigureAwait(false);
}

_recordedConsumerTags.Clear();
Expand Down
21 changes: 12 additions & 9 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,12 @@ private async ValueTask RecoverExchangesAsync(IConnection connection,
{
try
{
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
await using (channel.ConfigureAwait(false))
{
await recordedExchange.RecoverAsync(ch, cancellationToken)
await recordedExchange.RecoverAsync(channel, cancellationToken)
.ConfigureAwait(false);
await ch.CloseAsync(cancellationToken)
await channel.CloseAsync(cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -351,11 +352,12 @@ private async Task RecoverQueuesAsync(IConnection connection,
try
{
string newName = string.Empty;
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
await using (channel.ConfigureAwait(false))
{
newName = await recordedQueue.RecoverAsync(ch, cancellationToken)
newName = await recordedQueue.RecoverAsync(channel, cancellationToken)
.ConfigureAwait(false);
await ch.CloseAsync(cancellationToken)
await channel.CloseAsync(cancellationToken)
.ConfigureAwait(false);
}
string oldName = recordedQueue.Name;
Expand Down Expand Up @@ -463,11 +465,12 @@ private async ValueTask RecoverBindingsAsync(IConnection connection,
{
try
{
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
await using (channel.ConfigureAwait(false))
{
await binding.RecoverAsync(ch, cancellationToken)
await binding.RecoverAsync(channel, cancellationToken)
.ConfigureAwait(false);
await ch.CloseAsync(cancellationToken)
await channel.CloseAsync(cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToke
return channel;
}

public void Dispose()
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
{
Expand All @@ -273,7 +275,8 @@ public void Dispose()

try
{
_innerConnection.Dispose();
await _innerConnection.DisposeAsync()
.ConfigureAwait(false);
}
catch (OperationInterruptedException)
{
Expand Down
20 changes: 20 additions & 0 deletions projects/RabbitMQ.Client/Impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,26 @@ protected virtual void Dispose(bool disposing)
}
}

public async ValueTask DisposeAsync()
{
await DisposeAsyncCore()
.ConfigureAwait(false);

Dispose(false);
}

protected virtual async ValueTask DisposeAsyncCore()
{
if (IsOpen)
{
await this.AbortAsync().ConfigureAwait(false);
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore?.Dispose();
}

public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
{
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio
return _frameHandler.WriteAsync(frames, cancellationToken);
}

public void Dispose()
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
{
Expand All @@ -496,7 +498,8 @@ public void Dispose()
{
if (IsOpen)
{
this.AbortAsync().GetAwaiter().GetResult();
await this.AbortAsync()
.ConfigureAwait(false);
}

_session0.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
3 changes: 1 addition & 2 deletions projects/Test/Applications/CreateChannel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static async Task Main()
doneEvent = new AutoResetEvent(false);

var connectionFactory = new ConnectionFactory { };
IConnection connection = await connectionFactory.CreateConnectionAsync();
await using IConnection connection = await connectionFactory.CreateConnectionAsync();

var watch = Stopwatch.StartNew();
_ = Task.Run(async () =>
Expand Down Expand Up @@ -55,7 +55,6 @@ public static async Task Main()
Console.WriteLine();
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");

connection.Dispose();
Console.ReadLine();
}
}
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Applications/GH-1647/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@

var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
using var connection = await connectionFactory.CreateConnectionAsync();
await using var connection = await connectionFactory.CreateConnectionAsync();
for (int i = 0; i < 300; i++)
{
try
{
using var channel = await connection.CreateChannelAsync(); // New channel for each message
await using var channel = await connection.CreateChannelAsync(); // New channel for each message
await Task.Delay(1000);
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: msg);
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ static Program()

static async Task Main()
{
using IConnection consumeConnection = await s_consumeConnectionFactory.CreateConnectionAsync();
await using IConnection consumeConnection = await s_consumeConnectionFactory.CreateConnectionAsync();
consumeConnection.ConnectionShutdownAsync += ConnectionShutdownAsync;

using IChannel consumeChannel = await consumeConnection.CreateChannelAsync();
await using IChannel consumeChannel = await consumeConnection.CreateChannelAsync();
consumeChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
await consumeChannel.BasicQosAsync(prefetchSize: 0, prefetchCount: 128, global: false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ static async Task PublishMessagesIndividuallyAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once");

using IConnection connection = await CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();
await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync();

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand All @@ -51,8 +51,8 @@ static async Task PublishMessagesInBatchAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");

using IConnection connection = await CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();
await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync();

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand Down Expand Up @@ -97,8 +97,8 @@ async Task HandlePublishConfirmsAsynchronously()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");

using IConnection connection = await CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();
await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync();

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/Common/Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<AssemblyOriginatorKeyFile>../../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<IsTestProject>false</IsTestProject>
<LangVersion>7.3</LangVersion>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public override async Task DisposeAsync()
{
ConnectionFactory cf = CreateConnectionFactory();
cf.ClientProvidedName += "-TearDown";
using (IConnection conn = await cf.CreateConnectionAsync())
await using (IConnection conn = await cf.CreateConnectionAsync())
{
using (IChannel ch = await conn.CreateChannelAsync())
await using (IChannel ch = await conn.CreateChannelAsync())
{
await ch.QueueDeleteAsync(_queueName);
await ch.CloseAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
consumer.ReceivedAsync += MessageReceived;
await _channel.BasicConsumeAsync(queueName, true, consumer);

using (IChannel pubCh = await _conn.CreateChannelAsync())
await using (IChannel pubCh = await _conn.CreateChannelAsync())
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
await pubCh.CloseAsync();
Expand All @@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,

await CloseAndWaitForRecoveryAsync();

using (IChannel pubCh = await _conn.CreateChannelAsync())
await using (IChannel pubCh = await _conn.CreateChannelAsync())
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body);
await pubCh.CloseAsync();
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/Integration/Integration.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<AssemblyOriginatorKeyFile>../../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<IsTestProject>true</IsTestProject>
<LangVersion>8.0</LangVersion>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit be1d1e3

Please sign in to comment.