Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure delivery tag is decremented for client-side exception #1453

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,9 @@ RabbitMQ.Client.IChannel.TxRollback() -> void
RabbitMQ.Client.IChannel.TxRollbackAsync() -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.TxSelect() -> void
RabbitMQ.Client.IChannel.TxSelectAsync() -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.WaitForConfirms() -> bool
RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Client.IChannel.WaitForConfirmsOrDie() -> void
RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
RabbitMQ.Client.IChannelExtensions
RabbitMQ.Client.IConnection
Expand Down
24 changes: 24 additions & 0 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,19 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
/// Wait until all published messages on this channel have been confirmed.
/// </summary>
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
/// <remarks>
/// Waits until all messages published on this channel since the last call have
/// been either ack'd or nack'd by the server. Returns whether
/// all the messages were ack'd (and none were nack'd).
/// Throws an exception when called on a channel
/// that does not have publisher confirms enabled.
/// </remarks>
bool WaitForConfirms();

/// <summary>
/// Asynchronously wait until all published messages on this channel have been confirmed.
/// </summary>
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
/// <param name="token">The cancellation token.</param>
/// <remarks>
/// Waits until all messages published on this channel since the last call have
Expand All @@ -608,6 +621,17 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
/// </remarks>
Task<bool> WaitForConfirmsAsync(CancellationToken token = default);

/// <summary>
/// Wait until all published messages on this channel have been confirmed.
/// </summary>
/// <remarks>
/// Waits until all messages published on this channel since the last call have
/// been ack'd by the server. If a nack is received or the timeout
/// elapses, throws an IOException exception immediately and closes
/// the channel.
/// </remarks>
void WaitForConfirmsOrDie();

/// <summary>
/// Wait until all published messages on this channel have been confirmed.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -602,9 +602,15 @@ public ValueTask TxSelectAsync()
return InnerChannel.TxSelectAsync();
}

public bool WaitForConfirms()
=> InnerChannel.WaitForConfirms();

public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
=> InnerChannel.WaitForConfirmsAsync(token);

public void WaitForConfirmsOrDie()
=> InnerChannel.WaitForConfirmsOrDie();

public Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
=> InnerChannel.WaitForConfirmsOrDieAsync(token);

Expand Down
91 changes: 83 additions & 8 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public IBasicConsumer DefaultConsumer

public bool IsOpen => CloseReason is null;

// TODO add private bool for Confirm mode
public ulong NextPublishSeqNo { get; private set; }

public string CurrentQueue { get; private set; }
Expand Down Expand Up @@ -1239,8 +1240,24 @@ public void BasicPublish<TProperties>(string exchange, string routingKey, in TPr
}
}

var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
ChannelSend(in cmd, in basicProperties, body);
try
{
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
ChannelSend(in cmd, in basicProperties, body);
}
catch
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
NextPublishSeqNo--;
_pendingDeliveryTags.RemoveLast();
}
}

throw;
}
}

public void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
Expand All @@ -1254,8 +1271,24 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
}
}

var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
ChannelSend(in cmd, in basicProperties, body);
try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
ChannelSend(in cmd, in basicProperties, body);
}
catch
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
NextPublishSeqNo--;
_pendingDeliveryTags.RemoveLast();
}
}

throw;
}
}

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
Expand All @@ -1269,8 +1302,24 @@ public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingK
}
}

var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
return ModelSendAsync(in cmd, in basicProperties, body);
try
{
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
return ModelSendAsync(in cmd, in basicProperties, body);
}
catch
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
NextPublishSeqNo--;
_pendingDeliveryTags.RemoveLast();
}
}

throw;
}
}

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
Expand All @@ -1284,8 +1333,24 @@ public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedStr
}
}

var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
return ModelSendAsync(in cmd, in basicProperties, body);
try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
return ModelSendAsync(in cmd, in basicProperties, body);
}
catch
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
NextPublishSeqNo--;
_pendingDeliveryTags.RemoveLast();
}
}

throw;
}
}

public void UpdateSecret(string newSecret, string reason)
Expand Down Expand Up @@ -1755,6 +1820,11 @@ await ModelSendAsync(method)

private List<TaskCompletionSource<bool>> _confirmsTaskCompletionSources;

public bool WaitForConfirms()
{
return WaitForConfirmsAsync().EnsureCompleted();
}

public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
{
if (NextPublishSeqNo == 0UL)
Expand Down Expand Up @@ -1812,6 +1882,11 @@ await tokenRegistration.DisposeAsync()
}
}

public void WaitForConfirmsOrDie()
{
WaitForConfirmsOrDieAsync().EnsureCompleted();
}

public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
{
try
Expand Down
3 changes: 0 additions & 3 deletions projects/Test/AsyncIntegration/TestConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
// Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
//---------------------------------------------------------------------------

#if !NET6_0_OR_GREATER
using System;
#endif
using System;
using System.Collections.Generic;
using System.Threading;
Expand Down
42 changes: 40 additions & 2 deletions projects/Test/Integration/TestConfirmSelect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public TestConfirmSelect(ITestOutputHelper output) : base(output)
[Fact]
public void TestConfirmSelectIdempotency()
{
void Publish()
{
_channel.BasicPublish("", "amq.fanout", _encoding.GetBytes("message"));
}

_channel.ConfirmSelect();
Assert.Equal(1ul, _channel.NextPublishSeqNo);
Publish();
Expand All @@ -60,9 +65,42 @@ public void TestConfirmSelectIdempotency()
Assert.Equal(6ul, _channel.NextPublishSeqNo);
}

protected void Publish()
[Theory]
[InlineData(255)]
[InlineData(256)]
public void TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)
{
_channel.BasicPublish("", "amq.fanout", _encoding.GetBytes("message"));
byte[] body = GetRandomBody(16);

_channel.ExchangeDeclare("sample", "fanout", autoDelete: true);
// _channel.BasicAcks += (s, e) => _output.WriteLine("Acked {0}", e.DeliveryTag);
_channel.ConfirmSelect();

var properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
_channel.BasicPublish(exchange: "sample", routingKey: string.Empty, in properties, body);
_channel.WaitForConfirmsOrDie();

try
{
properties = new BasicProperties
{
CorrelationId = new string('o', correlationIdLength)
};
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
_channel.BasicPublish("sample", string.Empty, in properties, body);
_channel.WaitForConfirmsOrDie();
}
catch
{
// _output.WriteLine("Error when trying to publish with long string: {0}", e.Message);
}

properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
_channel.BasicPublish("sample", string.Empty, in properties, body);
_channel.WaitForConfirmsOrDie();
// _output.WriteLine("I'm done...");
}
}
}