Skip to content

Commit

Permalink
Add publish sequence number as long as `_publisherConfirmationsEnable…
Browse files Browse the repository at this point in the history
…d` is true

Fix the `PublisherConfirms` test program
  • Loading branch information
lukebakken committed Oct 3, 2024
1 parent d8d00e8 commit 5296d44
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 30 deletions.
16 changes: 8 additions & 8 deletions projects/RabbitMQ.Client/Impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ await _basicReturnAsyncWrapper.InvokeAsync(this, e)
.ConfigureAwait(false);
}

if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled)
if (_publisherConfirmationsEnabled)
{
ulong publishSequenceNumber = 0;
IReadOnlyBasicProperties props = e.BasicProperties;
Expand All @@ -666,7 +666,7 @@ await _basicReturnAsyncWrapper.InvokeAsync(this, e)
}
}

if (publishSequenceNumber != 0)
if (publishSequenceNumber != 0 && _publisherConfirmationTrackingEnabled)
{
await HandleAckNack(publishSequenceNumber, false, true, cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -1018,10 +1018,10 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
publishSequenceNumber = _nextPublishSeqNo;

if (_publisherConfirmationTrackingEnabled)
{
publishSequenceNumber = _nextPublishSeqNo;

_pendingDeliveryTags.AddLast(publishSequenceNumber);
publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
Expand Down Expand Up @@ -1115,10 +1115,10 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
publishSequenceNumber = _nextPublishSeqNo;

if (_publisherConfirmationTrackingEnabled)
{
publishSequenceNumber = _nextPublishSeqNo;

_pendingDeliveryTags.AddLast(publishSequenceNumber);
publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
Expand Down Expand Up @@ -1880,7 +1880,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
Activity? sendActivity, ulong publishSequenceNumber)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (sendActivity is null && false == _publisherConfirmationTrackingEnabled)
if (sendActivity is null && false == _publisherConfirmationsEnabled)
{
return null;
}
Expand Down Expand Up @@ -1936,7 +1936,7 @@ void MaybeAddActivityToHeaders(IDictionary<string, object?> headers,

void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers)
{
if (_publisherConfirmationTrackingEnabled)
if (_publisherConfirmationsEnabled)
{
byte[] publishSequenceNumberBytes;
if (BitConverter.IsLittleEndian)
Expand Down
60 changes: 38 additions & 22 deletions projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@
using System.Threading.Tasks;
using RabbitMQ.Client;

// const int MESSAGE_COUNT = 50_000;
const int MESSAGE_COUNT = 5;
const int MESSAGE_COUNT = 50_000;
bool debug = false;

#pragma warning disable CS8321 // Local function is declared but never used

// await PublishMessagesIndividuallyAsync();
// await PublishMessagesInBatchAsync();
await PublishMessagesIndividuallyAsync();
await PublishMessagesInBatchAsync();
await HandlePublishConfirmsAsynchronously();

static Task<IConnection> CreateConnectionAsync()
Expand All @@ -68,10 +67,15 @@ static async Task PublishMessagesIndividuallyAsync()
var sw = new Stopwatch();
sw.Start();

bool ack = false;
for (int i = 0; i < MESSAGE_COUNT; i++)
{
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
ack = await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body);
if (false == ack)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack '{ack}'");
}
}

sw.Stop();
Expand All @@ -90,7 +94,7 @@ static async Task PublishMessagesInBatchAsync()
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

int batchSize = 500;
int batchSize = 1000;
int outstandingMessageCount = 0;

var sw = new Stopwatch();
Expand All @@ -105,9 +109,13 @@ static async Task PublishMessagesInBatchAsync()

if (outstandingMessageCount == batchSize)
{
foreach (ValueTask<bool> vt in publishTasks)
foreach (ValueTask<bool> pt in publishTasks)
{
await vt;
bool ack = await pt;
if (false == ack)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack '{ack}'");
}
}
publishTasks.Clear();
outstandingMessageCount = 0;
Expand All @@ -116,9 +124,13 @@ static async Task PublishMessagesInBatchAsync()

if (publishTasks.Count > 0)
{
foreach (ValueTask<bool> vt in publishTasks)
foreach (ValueTask<bool> pt in publishTasks)
{
await vt;
bool ack = await pt;
if (false == ack)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack '{ack}'");
}
}
publishTasks.Clear();
outstandingMessageCount = 0;
Expand All @@ -134,25 +146,21 @@ async Task HandlePublishConfirmsAsynchronously()

await using IConnection connection = await CreateConnectionAsync();

// NOTE: setting trackConfirmations to false because this program
// is tracking them itself.
var channelOptions = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true
PublisherConfirmationTrackingEnabled = false
};
await using IChannel channel = await connection.CreateChannelAsync(channelOptions);

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

#pragma warning disable CS0219 // Variable is assigned but its value is never used
bool publishingCompleted = false;
#pragma warning restore CS0219 // Variable is assigned but its value is never used
var allMessagesConfirmedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var outstandingConfirms = new LinkedList<ulong>();
var semaphore = new SemaphoreSlim(1, 1);
int confirmedCount = 0;
async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
if (debug)
Expand Down Expand Up @@ -181,10 +189,13 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
break;
}

confirmedCount++;
} while (true);
}
else
{
confirmedCount++;
outstandingConfirms.Remove(deliveryTag);
}
}
Expand All @@ -193,8 +204,7 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
semaphore.Release();
}

// if (publishingCompleted && outstandingConfirms.Count == 0)
if (outstandingConfirms.Count == 0)
if (outstandingConfirms.Count == 0 || confirmedCount == MESSAGE_COUNT)
{
allMessagesConfirmedTcs.SetResult(true);
}
Expand Down Expand Up @@ -248,8 +258,12 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
semaphore.Release();
}

// string rk = queueName;
string rk = Guid.NewGuid().ToString();
string rk = queueName;
if (i % 1000 == 0)
{
// This will cause a basic.return, for fun
rk = Guid.NewGuid().ToString();
}
ValueTask<bool> pt = channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true);
publishTasks.Add(pt);
}
Expand All @@ -259,9 +273,11 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
foreach (ValueTask<bool> pt in publishTasks)
{
bool ack = await pt;
Console.WriteLine($"{DateTime.Now} [INFO] saw ack '{ack}'");
if (false == ack)
{
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack '{ack}'");
}
}
publishingCompleted = true;

try
{
Expand Down

0 comments on commit 5296d44

Please sign in to comment.