Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancellation token overload to channel extensions #1641

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -808,9 +808,6 @@ static RabbitMQ.Client.EndpointResolverExtensions.SelectOneAsync<T>(this RabbitM
static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception e, string context) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs
static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception e, string context, object consumer) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs
static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection<string>
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, T basicProperties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress
static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool
static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string
Expand Down Expand Up @@ -897,25 +894,28 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedQueue, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IAsyncBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IAsyncBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
~static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) -> System.Threading.Tasks.Task<uint>
~static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel channel, string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action<System.Diagnostics.Activity, System.Collections.Generic.IDictionary<string, object>>
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.IAsyncBasicConsumer! consumer, string! queue, bool autoAck = false, string! consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool autoAck, string! consumerTag, System.Collections.Generic.IDictionary<string!, object?>? arguments, RabbitMQ.Client.IAsyncBasicConsumer! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<string!>!
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel! channel, ushort replyCode, string! replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk!>!
static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool ifUnused = false, bool ifEmpty = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>!
static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel! channel, string! queue, string! exchange, string! routingKey, System.Collections.Generic.IDictionary<string!, object?>? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task!
static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
48 changes: 26 additions & 22 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,40 @@ public static Task<string> BasicConsumeAsync(this IChannel channel,
string consumerTag = "",
bool noLocal = false,
bool exclusive = false,
IDictionary<string, object?>? arguments = null)
IDictionary<string, object?>? arguments = null,
CancellationToken cancellationToken = default)
{
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer);
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer, cancellationToken);
}

/// <summary>Asynchronously start a Basic content-class consumer.</summary>
public static Task<string> BasicConsumeAsync(this IChannel channel, string queue,
bool autoAck,
IAsyncBasicConsumer consumer)
IAsyncBasicConsumer consumer,
CancellationToken cancellationToken = default)
{
return channel.BasicConsumeAsync(queue, autoAck, string.Empty, false, false, null, consumer);
return channel.BasicConsumeAsync(queue, autoAck, string.Empty, false, false, null, consumer, cancellationToken);
}

/// <summary>Asynchronously start a Basic content-class consumer.</summary>
public static Task<string> BasicConsumeAsync(this IChannel channel, string queue,
bool autoAck,
string consumerTag,
IAsyncBasicConsumer consumer)
IAsyncBasicConsumer consumer,
CancellationToken cancellationToken = default)
{
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, null, consumer);
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, null, consumer, cancellationToken);
}

/// <summary>Asynchronously start a Basic content-class consumer.</summary>
public static Task<string> BasicConsumeAsync(this IChannel channel, string queue,
bool autoAck,
string consumerTag,
IDictionary<string, object?>? arguments,
IAsyncBasicConsumer consumer)
IAsyncBasicConsumer consumer,
CancellationToken cancellationToken = default)
{
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer);
return channel.BasicConsumeAsync(queue, autoAck, consumerTag, false, false, arguments, consumer, cancellationToken);
}

/// <summary>
Expand All @@ -87,55 +91,55 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
/// The publication occurs with mandatory=false and immediate=false.
/// </remarks>
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, T basicProperties,
ReadOnlyMemory<byte> body)
ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body, false, cancellationToken);
}

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey,
ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
ReadOnlyMemory<byte> body = default, bool mandatory = false, CancellationToken cancellationToken = default) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory, cancellationToken);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false, CancellationToken cancellationToken = default) =>
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory, cancellationToken);

/// <summary>
/// Asynchronously declare a queue.
/// </summary>
public static Task<QueueDeclareOk> QueueDeclareAsync(this IChannel channel, string queue = "", bool durable = false, bool exclusive = true,
bool autoDelete = true, IDictionary<string, object?>? arguments = null, bool noWait = false)
bool autoDelete = true, IDictionary<string, object?>? arguments = null, bool noWait = false, CancellationToken cancellationToken = default)
{
return channel.QueueDeclareAsync(queue: queue, passive: false,
durable: durable, exclusive: exclusive, autoDelete: autoDelete,
arguments: arguments, noWait: noWait);
arguments: arguments, noWait: noWait, cancellationToken: cancellationToken);
}

/// <summary>
/// Asynchronously declare an exchange.
/// </summary>
public static Task ExchangeDeclareAsync(this IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false,
IDictionary<string, object?>? arguments = null, bool noWait = false)
IDictionary<string, object?>? arguments = null, bool noWait = false, CancellationToken cancellationToken = default)
{
return channel.ExchangeDeclareAsync(exchange, type, durable, autoDelete,
arguments: arguments, passive: false, noWait: noWait);
arguments: arguments, passive: false, noWait: noWait, cancellationToken: cancellationToken);
}

/// <summary>
/// Asynchronously deletes a queue.
/// </summary>
public static Task<uint> QueueDeleteAsync(this IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false)
public static Task<uint> QueueDeleteAsync(this IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false, CancellationToken cancellationToken = default)
{
return channel.QueueDeleteAsync(queue, ifUnused, ifEmpty);
return channel.QueueDeleteAsync(queue, ifUnused, ifEmpty, false, cancellationToken);
}

/// <summary>
/// Asynchronously unbinds a queue.
/// </summary>
public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary<string, object?>? arguments = null)
public static Task QueueUnbindAsync(this IChannel channel, string queue, string exchange, string routingKey, IDictionary<string, object?>? arguments = null, CancellationToken cancellationToken = default)
{
return channel.QueueUnbindAsync(queue, exchange, routingKey, arguments);
return channel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken);
}

/// <summary>
Expand Down
Loading
Loading