diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 91b90ae61..3794aecb0 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -808,9 +808,6 @@ static RabbitMQ.Client.EndpointResolverExtensions.SelectOneAsync(this RabbitM static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception e, string context) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception e, string context, object consumer) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, T basicProperties, System.ReadOnlyMemory body) -> System.Threading.Tasks.ValueTask static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string @@ -897,25 +894,28 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void ~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IAsyncBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary arguments = null) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary arguments = null, bool noWait = false) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary arguments = null, bool noWait = false) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel channel, string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary arguments = null) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func ~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void ~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action> ~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void +static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.IAsyncBasicConsumer! consumer, string! queue, bool autoAck = false, string! consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, System.Collections.Generic.IDictionary? arguments, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel! channel, ushort replyCode, string! replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool ifUnused = false, bool ifEmpty = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel! channel, string! queue, string! exchange, string! routingKey, System.Collections.Generic.IDictionary? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index f47b7e646..6bf174563 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -48,26 +48,29 @@ public static Task BasicConsumeAsync(this IChannel channel, string consumerTag = "", bool noLocal = false, bool exclusive = false, - IDictionary? arguments = null) + IDictionary? arguments = null, + CancellationToken cancellationToken = default) { - return channel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer); + return channel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer, cancellationToken); } /// Asynchronously start a Basic content-class consumer. public static Task BasicConsumeAsync(this IChannel channel, string queue, bool autoAck, - IAsyncBasicConsumer consumer) + IAsyncBasicConsumer consumer, + CancellationToken cancellationToken = default) { - return channel.BasicConsumeAsync(queue, autoAck, string.Empty, false, false, null, consumer); + return channel.BasicConsumeAsync(queue, autoAck, string.Empty, false, false, null, consumer, cancellationToken); } /// Asynchronously start a Basic content-class consumer. public static Task BasicConsumeAsync(this IChannel channel, string queue, bool autoAck, string consumerTag, - IAsyncBasicConsumer consumer) + IAsyncBasicConsumer consumer, + CancellationToken cancellationToken = default) { - return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, null, consumer); + return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, null, consumer, cancellationToken); } /// Asynchronously start a Basic content-class consumer. @@ -75,9 +78,10 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue bool autoAck, string consumerTag, IDictionary? arguments, - IAsyncBasicConsumer consumer) + IAsyncBasicConsumer consumer, + CancellationToken cancellationToken = default) { - return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer); + return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer, cancellationToken); } /// @@ -87,55 +91,55 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue /// The publication occurs with mandatory=false and immediate=false. /// public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, T basicProperties, - ReadOnlyMemory body) + ReadOnlyMemory body, CancellationToken cancellationToken = default) where T : IReadOnlyBasicProperties, IAmqpHeader { - return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body); + return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body, false, cancellationToken); } public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, - ReadOnlyMemory body = default, bool mandatory = false) => - channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); + ReadOnlyMemory body = default, bool mandatory = false, CancellationToken cancellationToken = default) => + channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory, cancellationToken); public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, - CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) => - channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); + CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false, CancellationToken cancellationToken = default) => + channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory, cancellationToken); /// /// Asynchronously declare a queue. /// public static Task QueueDeclareAsync(this IChannel channel, string queue = "", bool durable = false, bool exclusive = true, - bool autoDelete = true, IDictionary? arguments = null, bool noWait = false) + bool autoDelete = true, IDictionary? arguments = null, bool noWait = false, CancellationToken cancellationToken = default) { return channel.QueueDeclareAsync(queue: queue, passive: false, durable: durable, exclusive: exclusive, autoDelete: autoDelete, - arguments: arguments, noWait: noWait); + arguments: arguments, noWait: noWait, cancellationToken: cancellationToken); } /// /// Asynchronously declare an exchange. /// public static Task ExchangeDeclareAsync(this IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, - IDictionary? arguments = null, bool noWait = false) + IDictionary? arguments = null, bool noWait = false, CancellationToken cancellationToken = default) { return channel.ExchangeDeclareAsync(exchange, type, durable, autoDelete, - arguments: arguments, passive: false, noWait: noWait); + arguments: arguments, passive: false, noWait: noWait, cancellationToken: cancellationToken); } /// /// Asynchronously deletes a queue. /// - public static Task QueueDeleteAsync(this IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) + public static Task QueueDeleteAsync(this IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false, CancellationToken cancellationToken = default) { - return channel.QueueDeleteAsync(queue, ifUnused, ifEmpty); + return channel.QueueDeleteAsync(queue, ifUnused, ifEmpty, false, cancellationToken); } /// /// Asynchronously unbinds a queue. /// - public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary? arguments = null) + public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary? arguments = null, CancellationToken cancellationToken = default) { - return channel.QueueUnbindAsync(queue, exchange, routingKey, arguments); + return channel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken); } /// diff --git a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs index d22f99829..35b027bad 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs @@ -92,9 +92,9 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); - var c = new AsyncEventingBasicConsumer(_channel); - c.Received += MessageReceived; - await _channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: c); + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.Received += MessageReceived; + await _channel.BasicConsumeAsync(queueName, true, consumer); using (IChannel pubCh = await _conn.CreateChannelAsync()) { diff --git a/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs b/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs index f16aefca2..c04b55072 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs @@ -77,8 +77,8 @@ await _channel.ExchangeDeclareAsync(exchange: tdiRetryExchangeName, await _channel.QueueBindAsync(testQueueName, tdiRetryExchangeName, testQueueName); - var consumerAsync = new AsyncEventingBasicConsumer(_channel); - await _channel.BasicConsumeAsync(queue: testQueueName, autoAck: false, consumer: consumerAsync); + var consumer = new AsyncEventingBasicConsumer(_channel); + await _channel.BasicConsumeAsync(testQueueName, false, consumer); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs index bf4db9077..1d5795e57 100644 --- a/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs @@ -118,7 +118,7 @@ public async Task TestDeliveryOrderingWithSingleChannel() _queues.Add(q); var cons = new CollectingConsumer(ch); _consumers.Add(cons); - await ch.BasicConsumeAsync(queue: q, autoAck: false, consumer: cons); + await ch.BasicConsumeAsync(q, false, cons); } for (int i = 0; i < N; i++) @@ -211,20 +211,20 @@ public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArg public async Task TestChannelShutdownHandler() { string q = await _channel.QueueDeclareAsync(); - var c = new ShutdownLatchConsumer(); + var consumer = new ShutdownLatchConsumer(); - await _channel.BasicConsumeAsync(queue: q, autoAck: true, consumer: c); + await _channel.BasicConsumeAsync(q, true, consumer); await _channel.CloseAsync(); - await c.Latch.Task.WaitAsync(ShortSpan); - Assert.True(c.Latch.Task.IsCompletedSuccessfully()); + await consumer.Latch.Task.WaitAsync(ShortSpan); + Assert.True(consumer.Latch.Task.IsCompletedSuccessfully()); await Assert.ThrowsAsync(() => { - return c.DuplicateLatch.Task.WaitAsync(ShortSpan); + return consumer.DuplicateLatch.Task.WaitAsync(ShortSpan); }); - Assert.False(c.DuplicateLatch.Task.IsCompletedSuccessfully()); + Assert.False(consumer.DuplicateLatch.Task.IsCompletedSuccessfully()); } } }