Skip to content

Commit

Permalink
Merge pull request #1667 from rabbitmq/lukebakken/async-consumer-requ…
Browse files Browse the repository at this point in the history
…ire-channel

Require `IChannel` for `AsyncDefaultBasicConsumer`
  • Loading branch information
lukebakken authored Sep 10, 2024
2 parents d444230 + 6e39df0 commit b2282c7
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public static async Task Publish_Hello_World(IConnection connection, uint messag
using (IChannel channel = await connection.CreateChannelAsync())
{
QueueDeclareOk queue = await channel.QueueDeclareAsync();
var consumer = new CountingConsumer(messageCount);
var consumer = new CountingConsumer(channel, messageCount);
await channel.BasicConsumeAsync(queue.QueueName, true, consumer);

for (int i = 0; i < messageCount; i++)
Expand All @@ -35,7 +35,7 @@ internal sealed class CountingConsumer : AsyncDefaultBasicConsumer

public Task CompletedTask => _tcs.Task;

public CountingConsumer(uint messageCount)
public CountingConsumer(IChannel channel, uint messageCount) : base(channel)
{
_remainingCount = (int)messageCount;
_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down
4 changes: 0 additions & 4 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,11 @@ RabbitMQ.Client.AmqpTimestamp.AmqpTimestamp() -> void
RabbitMQ.Client.AmqpTimestamp.AmqpTimestamp(long unixTime) -> void
RabbitMQ.Client.AmqpTimestamp.Equals(RabbitMQ.Client.AmqpTimestamp other) -> bool
RabbitMQ.Client.AsyncDefaultBasicConsumer
RabbitMQ.Client.AsyncDefaultBasicConsumer.AsyncDefaultBasicConsumer() -> void
RabbitMQ.Client.AsyncDefaultBasicConsumer.AsyncDefaultBasicConsumer(RabbitMQ.Client.IChannel channel) -> void
RabbitMQ.Client.AsyncDefaultBasicConsumer.Channel.get -> RabbitMQ.Client.IChannel
RabbitMQ.Client.AsyncDefaultBasicConsumer.Channel.set -> void
RabbitMQ.Client.AsyncDefaultBasicConsumer.ConsumerTags.get -> string[]
RabbitMQ.Client.AsyncDefaultBasicConsumer.IsRunning.get -> bool
RabbitMQ.Client.AsyncDefaultBasicConsumer.IsRunning.set -> void
RabbitMQ.Client.AsyncDefaultBasicConsumer.ShutdownReason.get -> RabbitMQ.Client.ShutdownEventArgs
RabbitMQ.Client.AsyncDefaultBasicConsumer.ShutdownReason.set -> void
RabbitMQ.Client.BasicGetResult
RabbitMQ.Client.BasicGetResult.BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey, uint messageCount, RabbitMQ.Client.IReadOnlyBasicProperties basicProperties, System.ReadOnlyMemory<byte> body) -> void
RabbitMQ.Client.BasicProperties
Expand Down
13 changes: 3 additions & 10 deletions projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ public class AsyncDefaultBasicConsumer : IAsyncBasicConsumer
{
private readonly HashSet<string> _consumerTags = new HashSet<string>();

/// <summary>
/// Creates a new instance of an <see cref="AsyncDefaultBasicConsumer"/>.
/// </summary>
public AsyncDefaultBasicConsumer()
{
}

/// <summary>
/// Constructor which sets the Channel property to the given value.
/// </summary>
Expand All @@ -40,19 +33,19 @@ public string[] ConsumerTags
/// <summary>
/// Returns true while the consumer is registered and expecting deliveries from the broker.
/// </summary>
public bool IsRunning { get; protected set; }
public bool IsRunning { get; private set; }

/// <summary>
/// If our <see cref="IChannel"/> shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
/// </summary>
public ShutdownEventArgs? ShutdownReason { get; protected set; }
public ShutdownEventArgs? ShutdownReason { get; private set; }

/// <summary>
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IChannel? Channel { get; set; }
public IChannel Channel { get; private set; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
Expand Down
13 changes: 8 additions & 5 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public TestAsyncConsumer(ITestOutputHelper output)
public async Task TestBasicRoundtripConcurrent()
{
AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);

QueueDeclareOk q = await _channel.QueueDeclareAsync();

Expand Down Expand Up @@ -147,7 +147,7 @@ public async Task TestBasicRoundtripConcurrent()
public async Task TestBasicRoundtripConcurrentManyMessages()
{
AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);

const int publish_total = 4096;
const int length = 512;
Expand Down Expand Up @@ -205,7 +205,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
using (IChannel publishChannel = await publishConn.CreateChannelAsync())
{
AddCallbackExceptionHandlers(publishConn, publishChannel);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer("publishChannel,", _output);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel,
"publishChannel,", _output);
publishChannel.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(publishChannel, ea, (args) =>
Expand Down Expand Up @@ -247,7 +248,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
using (IChannel consumeChannel = await consumeConn.CreateChannelAsync())
{
AddCallbackExceptionHandlers(consumeConn, consumeChannel);
consumeChannel.DefaultConsumer = new DefaultAsyncConsumer("consumeChannel,", _output);
consumeChannel.DefaultConsumer = new DefaultAsyncConsumer(consumeChannel,
"consumeChannel,", _output);
consumeChannel.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(consumeChannel, ea, (args) =>
Expand Down Expand Up @@ -722,7 +724,8 @@ private class DefaultAsyncConsumer : AsyncDefaultBasicConsumer
private readonly string _logPrefix;
private readonly ITestOutputHelper _output;

public DefaultAsyncConsumer(string logPrefix, ITestOutputHelper output)
public DefaultAsyncConsumer(IChannel channel, string logPrefix, ITestOutputHelper output)
: base(channel)
{
_logPrefix = logPrefix;
_output = output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public async Task TestChannelShutdownDoesNotShutDownDispatcher()

private class ShutdownLatchConsumer : AsyncDefaultBasicConsumer
{
public ShutdownLatchConsumer()
public ShutdownLatchConsumer(IChannel channel) : base(channel)
{
Latch = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
DuplicateLatch = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -211,7 +211,7 @@ public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArg
public async Task TestChannelShutdownHandler()
{
string q = await _channel.QueueDeclareAsync();
var consumer = new ShutdownLatchConsumer();
var consumer = new ShutdownLatchConsumer(_channel);

await _channel.BasicConsumeAsync(q, true, consumer);
await _channel.CloseAsync();
Expand Down

0 comments on commit b2282c7

Please sign in to comment.