Skip to content

Commit

Permalink
Merge pull request #1616 from rabbitmq/rabbitmq-users-hk5pJ4cKF0c
Browse files Browse the repository at this point in the history
Ensure that arguments passed to recorded entities are copied.
  • Loading branch information
lukebakken authored Jun 27, 2024
2 parents 6673bd5 + 870a1f4 commit 89d472d
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 9 deletions.
9 changes: 9 additions & 0 deletions projects/RabbitMQ.Client/client/impl/RecordedBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ public RecordedBinding(bool isQueueBinding, string destination, string source, s
_source = source;
_routingKey = routingKey;
_arguments = arguments;

if (arguments is null)
{
_arguments = null;
}
else
{
_arguments = new Dictionary<string, object>(arguments);
}
}

public RecordedBinding(string destination, in RecordedBinding old)
Expand Down
10 changes: 9 additions & 1 deletion projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,15 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer,

_autoAck = autoAck;
_exclusive = exclusive;
_arguments = arguments;

if (arguments is null)
{
_arguments = null;
}
else
{
_arguments = new Dictionary<string, object>(arguments);
}
}

public AutorecoveringChannel Channel => _channel;
Expand Down
10 changes: 9 additions & 1 deletion projects/RabbitMQ.Client/client/impl/RecordedExchange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ public RecordedExchange(string name, string type, bool durable, bool autoDelete,
_type = type;
_durable = durable;
_autoDelete = autoDelete;
_arguments = arguments;

if (arguments is null)
{
_arguments = null;
}
else
{
_arguments = new Dictionary<string, object>(arguments);
}
}

public Task RecoverAsync(IChannel channel, CancellationToken cancellationToken)
Expand Down
10 changes: 9 additions & 1 deletion projects/RabbitMQ.Client/client/impl/RecordedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ public RecordedQueue(string name, bool isServerNamed, bool durable, bool exclusi
_durable = durable;
_exclusive = exclusive;
_autoDelete = autoDelete;
_arguments = arguments;

if (arguments is null)
{
_arguments = null;
}
else
{
_arguments = new Dictionary<string, object>(arguments);
}
}

public RecordedQueue(string newName, in RecordedQueue old)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,17 @@ public async Task TestRecoveringConsumerEventHandlers_Called(int iterations)
[Fact]
public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown()
{
var myArgs = new Dictionary<string, object> { { "first-argument", "some-value" } };
const string key = "first-argument";
const string value = "some-value";

IDictionary<string, object> arguments = new Dictionary<string, object>
{
{ key, value }
};

RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false);
var cons = new EventingBasicConsumer(_channel);
string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: myArgs);
string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: arguments);

bool ctagMatches = false;
bool consumerArgumentMatches = false;
Expand All @@ -82,15 +89,14 @@ public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown
// passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick
// and assert in the test function.
ctagMatches = args.ConsumerTag == expectedCTag;
consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value";
args.ConsumerArguments["first-argument"] = "event-handler-set-this-value";
consumerArgumentMatches = (string)args.ConsumerArguments[key] == value;
};

await CloseAndWaitForRecoveryAsync();
Assert.True(ctagMatches, "expected consumer tag to match");
Assert.True(consumerArgumentMatches, "expected consumer arguments to match");
string actualVal = (string)Assert.Contains("first-argument", myArgs as IDictionary<string, object>);
Assert.Equal("event-handler-set-this-value", actualVal);
string actualVal = (string)Assert.Contains(key, arguments);
Assert.Equal(value, actualVal);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
using Xunit.Abstractions;

namespace Test.Integration.ConnectionRecovery
{
public class TestQueueRecoveryWithArguments : TestConnectionRecoveryBase
{
public TestQueueRecoveryWithArguments(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task TestQueueRecoveryWithDlxArgument_RabbitMQUsers_hk5pJ4cKF0c()
{
string tdiWaitExchangeName = GenerateExchangeName();
string tdiRetryExchangeName = GenerateExchangeName();
string testRetryQueueName = GenerateQueueName();
string testQueueName = GenerateQueueName();

await _channel.ExchangeDeclareAsync(exchange: tdiWaitExchangeName,
type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
await _channel.ExchangeDeclareAsync(exchange: tdiRetryExchangeName,
type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);

var arguments = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "tdi.retry.exchange" },
{ "x-dead-letter-routing-key", "QueueTest" }
};

await _channel.QueueDeclareAsync(testRetryQueueName, durable: false, exclusive: false, autoDelete: false, arguments);

arguments["x-dead-letter-exchange"] = "tdi.wait.exchange";
arguments["x-dead-letter-routing-key"] = "QueueTest";

await _channel.QueueDeclareAsync(testQueueName, durable: false, exclusive: false, autoDelete: false, arguments);

arguments.Remove("x-dead-letter-exchange");
arguments.Remove("x-dead-letter-routing-key");

await _channel.QueueBindAsync(testRetryQueueName, tdiWaitExchangeName, testQueueName);

await _channel.QueueBindAsync(testQueueName, tdiRetryExchangeName, testQueueName);

var consumerAsync = new EventingBasicConsumer(_channel);
await _channel.BasicConsumeAsync(queue: testQueueName, autoAck: false, consumer: consumerAsync);

await CloseAndWaitForRecoveryAsync();

QueueDeclareOk q0 = await _channel.QueueDeclarePassiveAsync(testRetryQueueName);
Assert.Equal(testRetryQueueName, q0.QueueName);

QueueDeclareOk q1 = await _channel.QueueDeclarePassiveAsync(testQueueName);
Assert.Equal(testQueueName, q1.QueueName);
}
}
}

0 comments on commit 89d472d

Please sign in to comment.