Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track publisher confirmations automatically #1687

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion projects/RabbitMQ.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,15 @@ public static class Constants
/// <summary>
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
/// to set this value for every channel created on a connection,
/// and <see cref="IConnection.CreateChannelAsync(ushort?, System.Threading.CancellationToken)"/>
/// and <see cref="IConnection.CreateChannelAsync(CreateChannelOptions?, System.Threading.CancellationToken)" />
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;

/// <summary>
/// The message header used to track publish sequence numbers, to allow correlation when
/// <c>basic.return</c> is sent via the broker.
/// </summary>
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
}
}
35 changes: 35 additions & 0 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace RabbitMQ.Client
{
/// <summary>
/// Channel creation options.
/// </summary>
public sealed class CreateChannelOptions
{
/// <summary>
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
/// </summary>
public bool PublisherConfirmationsEnabled { get; set; } = false;

/// <summary>
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
/// </summary>
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </summary>
public ushort? ConsumerDispatchConcurrency { get; set; } = null;

/// <summary>
/// The default channel options.
/// </summary>
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
}
}
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
/// <summary>
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default)
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
Exception exception, CancellationToken cancellationToken = default)
: this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken)
{
_exception = exception ?? throw new ArgumentNullException(nameof(exception));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,30 @@ namespace RabbitMQ.Client.Exceptions
public class OperationInterruptedException
: RabbitMQClientException
{
///<summary>Construct an OperationInterruptedException with
///the passed-in explanation, if any.</summary>
public OperationInterruptedException(ShutdownEventArgs? reason)
: base(reason is null ? "The AMQP operation was interrupted" :
$"The AMQP operation was interrupted: {reason}")
///<summary>
///Construct an OperationInterruptedException
///</summary>
public OperationInterruptedException() : base("The AMQP operation was interrupted")
{
ShutdownReason = reason;
}

///<summary>Construct an OperationInterruptedException with
///the passed-in explanation and prefix, if any.</summary>
public OperationInterruptedException(ShutdownEventArgs? reason, string prefix)
: base(reason is null ? $"{prefix}: The AMQP operation was interrupted" :
$"{prefix}: The AMQP operation was interrupted: {reason}")
{
ShutdownReason = reason;
}

protected OperationInterruptedException()
///<summary>
///Construct an OperationInterruptedException with
///the passed-in explanation, if any.
///</summary>
public OperationInterruptedException(ShutdownEventArgs reason)
: base($"The AMQP operation was interrupted: {reason}", reason.Exception)
{
ShutdownReason = reason;
}

protected OperationInterruptedException(string message) : base(message)
{
}

protected OperationInterruptedException(string message, Exception inner)
: base(message, inner)
///<summary>Construct an OperationInterruptedException with
///the passed-in explanation and prefix, if any.</summary>
public OperationInterruptedException(ShutdownEventArgs reason, string prefix)
: base($"{prefix}: The AMQP operation was interrupted: {reason}", reason.Exception)
{
ShutdownReason = reason;
}

///<summary>Retrieves the explanation for the shutdown. May
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ namespace RabbitMQ.Client.Exceptions
[Serializable]
public class ProtocolViolationException : RabbitMQClientException
{
public ProtocolViolationException(string message) : base(message)
public ProtocolViolationException() : base()
{
}
public ProtocolViolationException(string message, Exception inner) : base(message, inner)

public ProtocolViolationException(string message) : base(message)
{
}
public ProtocolViolationException()

public ProtocolViolationException(string message, Exception inner) : base(message, inner)
{
}
}
Expand Down
66 changes: 66 additions & 0 deletions projects/RabbitMQ.Client/Exceptions/PublishException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;

namespace RabbitMQ.Client.Exceptions
{
/// <summary>
/// Class for exceptions related to publisher confirmations
/// or the <c>mandatory</c> flag.
/// </summary>
public class PublishException : RabbitMQClientException
{
private bool _isReturn = false;
private ulong _publishSequenceNumber = ulong.MinValue;

public PublishException(ulong publishSequenceNumber, bool isReturn) : base()
{
if (publishSequenceNumber == ulong.MinValue)
{
throw new ArgumentException($"{nameof(publishSequenceNumber)} must not be 0");
}

_isReturn = isReturn;
_publishSequenceNumber = publishSequenceNumber;
}

/// <summary>
/// <c>true</c> if this exception is due to a <c>basic.return</c>
/// </summary>
public bool IsReturn => _isReturn;

/// <summary>
/// Retrieve the publish sequence number.
/// </summary>
public ulong PublishSequenceNumber => _publishSequenceNumber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,21 @@ namespace RabbitMQ.Client.Exceptions
public abstract class RabbitMQClientException : Exception
{
/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class.</summary>
protected RabbitMQClientException()
protected RabbitMQClientException() : base()
{

}

/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class with a specified error message.</summary>
/// <param name="message">The message that describes the error. </param>
protected RabbitMQClientException(string message) : base(message)
{

}

/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class with a specified error message and a reference to the inner exception that is the cause of this exception.</summary>
/// <param name="message">The error message that explains the reason for the exception. </param>
/// <param name="innerException">The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. </param>
protected RabbitMQClientException(string message, Exception innerException) : base(message, innerException)
protected RabbitMQClientException(string message, Exception? innerException) : base(message, innerException)
{

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
namespace RabbitMQ.Client.Exceptions
{
/// <summary>
/// TODO WHY IS THIS UNREFERENCED?
/// Thrown when the channel receives an RPC reply that it wasn't expecting.
/// </summary>
[Serializable]
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/Framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

namespace RabbitMQ.Client.Framing
{
// TODO merge into ChannelBase
internal class Channel : ChannelBase
{
public Channel(ConnectionConfig config, ISession session, ushort? consumerDispatchConcurrency = null)
Expand Down
36 changes: 2 additions & 34 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
Expand All @@ -221,6 +222,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
/// <param name="cancellationToken">CancellationToken for this operation.</param>
/// <remarks>
/// Routing key must be shorter than 255 bytes.
/// Throws <see cref="Exceptions.PublishException"/> if a nack or basic.return is returned for the message.
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
Expand Down Expand Up @@ -265,14 +267,6 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort,
Task CloseAsync(ShutdownEventArgs reason, bool abort,
CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously enable publisher confirmations.
/// </summary>
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcksAsync"/> and <see cref="BasicNacksAsync"/> yourself.</param>
/// <param name="cancellationToken">CancellationToken for this operation.</param>
Task ConfirmSelectAsync(bool trackConfirmations = true,
CancellationToken cancellationToken = default);

/// <summary>Asynchronously declare an exchange.</summary>
/// <param name="exchange">The name of the exchange.</param>
/// <param name="type">The type of the exchange.</param>
Expand Down Expand Up @@ -451,32 +445,6 @@ Task QueueUnbindAsync(string queue, string exchange, string routingKey,
/// <param name="cancellationToken">The cancellation token.</param>
Task TxSelectAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously wait until all published messages on this channel have been confirmed.
/// </summary>
/// <returns>True if no nacks were received within the timeout, otherwise false.</returns>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// Waits until all messages published on this channel since the last call have
/// been either ack'd or nack'd by the server. Returns whether
/// all the messages were ack'd (and none were nack'd).
/// Throws an exception when called on a channel
/// that does not have publisher confirms enabled.
/// </remarks>
Task<bool> WaitForConfirmsAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Wait until all published messages on this channel have been confirmed.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// Waits until all messages published on this channel since the last call have
/// been ack'd by the server. If a nack is received or the timeout
/// elapses, throws an IOException exception immediately and closes
/// the channel.
/// </remarks>
Task WaitForConfirmsOrDieAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
/// timing out.
Expand Down
13 changes: 3 additions & 10 deletions projects/RabbitMQ.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,11 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
/// <param name="consumerDispatchConcurrency">
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// <param name="options">
/// The channel creation options.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(ushort? consumerDispatchConcurrency = null,
Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
CancellationToken cancellationToken = default);
}
}
Loading