diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index 0536a70ab74..bfce7f02038 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -830,6 +830,7 @@ bool DataReaderHistory::update_instance_nts( assert(vit != instances_.end()); assert(false == change->isRead); + auto previous_owner = vit->second->current_owner.first; ++counters_.samples_unread; bool ret = vit->second->update_state(counters_, change->kind, change->writerGUID, @@ -837,6 +838,31 @@ bool DataReaderHistory::update_instance_nts( change->reader_info.disposed_generation_count = vit->second->disposed_generation_count; change->reader_info.no_writers_generation_count = vit->second->no_writers_generation_count; + auto current_owner = vit->second->current_owner.first; + if (current_owner != previous_owner) + { + assert(current_owner == change->writerGUID); + + // Remove all changes from different owners after the change. + DataReaderInstance::ChangeCollection& changes = vit->second->cache_changes; + auto it = std::lower_bound(changes.begin(), changes.end(), change, rtps::history_order_cmp); + assert(it != changes.end()); + assert(*it == change); + ++it; + while (it != changes.end()) + { + if ((*it)->writerGUID != current_owner) + { + // Remove from history + remove_change_sub(*it, it); + + // Current iterator will point to change next to the one removed. Avoid incrementing. + continue; + } + ++it; + } + } + return ret; } diff --git a/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp b/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp index cbe1676f95c..49b00a2f2de 100644 --- a/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp +++ b/test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp @@ -2161,6 +2161,142 @@ TEST_P(OwnershipQos, exclusive_kind_keyed_besteffort_disposing_instance) exclusive_kind_keyed_disposing_instance(false); } +/*! + * This is a regression test for redmine issue 20866. + * + * This test checks that a reader keeping a long number of samples and with an exclusive ownership policy only + * returns the data from the writer with the highest strength. + * + * @param use_keep_all_history Whether to use KEEP_ALL history or KEEP_LAST(20). + * @param mixed_data Whether to send data from both writers in an interleaved way. + */ +static void test_exclusive_kind_big_history( + bool use_keep_all_history, + bool mixed_data) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter low_strength_writer(TEST_TOPIC_NAME); + PubSubWriter high_strength_writer(TEST_TOPIC_NAME); + + // Configure history QoS. + if (use_keep_all_history) + { + reader.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS); + low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS); + high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS); + } + else + { + reader.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20); + low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20); + high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20); + } + + // Prepare data. + std::list generated_data = default_keyedhelloworld_data_generator(20); + auto middle = std::next(generated_data.begin(), 10); + std::list low_strength_data(generated_data.begin(), middle); + std::list high_strength_data(middle, generated_data.end()); + auto expected_data = high_strength_data; + + if (mixed_data) + { + // Expect reception of the first two samples from the low strength writer (one per instance). + auto it = low_strength_data.begin(); + expected_data.push_front(*it++); + expected_data.push_front(*it); + } + + // Initialize writers. + low_strength_writer.ownership_strength(3) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(low_strength_writer.isInitialized()); + + // High strength writer will use a custom transport to ensure its data is received after the low strength data. + auto test_transport = std::make_shared(); + std::atomic drop_messages(false); + test_transport->messages_filter_ = [&drop_messages](eprosima::fastrtps::rtps::CDRMessage_t&) + { + return drop_messages.load(); + }; + high_strength_writer.ownership_strength(4) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) + .init(); + ASSERT_TRUE(high_strength_writer.isInitialized()); + + // Initialize reader. + reader.ownership_exclusive() + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .init(); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery. + low_strength_writer.wait_discovery(); + high_strength_writer.wait_discovery(); + reader.wait_discovery(std::chrono::seconds::zero(), 2); + + // Drop the messages from the high strength writer, so they arrive later to the reader. + drop_messages.store(true); + + if (mixed_data) + { + // Send one sample from each writer, with low strength data first. + while (!low_strength_data.empty() && !high_strength_data.empty()) + { + EXPECT_TRUE(low_strength_writer.send_sample(low_strength_data.front())); + EXPECT_TRUE(high_strength_writer.send_sample(high_strength_data.front())); + low_strength_data.pop_front(); + high_strength_data.pop_front(); + } + } + else + { + // Send high strength data first, so it has the lowest source timestamps, but drop the messages, so they arrive + // later to the reader. + high_strength_writer.send(high_strength_data); + EXPECT_TRUE(high_strength_data.empty()); + + // Send low strength data, so it has the highest source timestamps. + low_strength_writer.send(low_strength_data); + EXPECT_TRUE(low_strength_data.empty()); + } + + // Wait for the reader to receive the low strength data. + EXPECT_TRUE(low_strength_writer.waitForAllAcked(std::chrono::seconds(1))); + + // Let high strength writer send the data, and wait for the reader to receive it. + drop_messages.store(false); + EXPECT_TRUE(high_strength_writer.waitForAllAcked(std::chrono::seconds(1))); + + // Make the reader process the data, expecting only the required data. + // The issue was reproduced by the reader complaining about reception of unexpected data. + reader.startReception(expected_data); + reader.block_for_all(); +} + +TEST(OwnershipQos, exclusive_kind_keep_all_reliable) +{ + test_exclusive_kind_big_history(true, false); +} + +TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed) +{ + test_exclusive_kind_big_history(true, true); +} + +TEST(OwnershipQos, exclusive_kind_keep_last_reliable) +{ + test_exclusive_kind_big_history(false, false); +} + +TEST(OwnershipQos, exclusive_kind_keep_last_reliable_mixed) +{ + test_exclusive_kind_big_history(false, true); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/unittest/dds/subscriber/DataReaderHistoryTests.cpp b/test/unittest/dds/subscriber/DataReaderHistoryTests.cpp index b80e24c6847..ef738e054c4 100644 --- a/test/unittest/dds/subscriber/DataReaderHistoryTests.cpp +++ b/test/unittest/dds/subscriber/DataReaderHistoryTests.cpp @@ -39,6 +39,23 @@ class TestType : public TopicDataType bool)); }; +bool add_test_change( + eprosima::fastdds::dds::detail::DataReaderHistory& history, + eprosima::fastrtps::rtps::CacheChange_t& change, + std::vector>& test_changes) +{ + ++change.sequenceNumber; + eprosima::fastrtps::rtps::Time_t::now(change.sourceTimestamp); + eprosima::fastrtps::rtps::CacheChange_t* new_change = new eprosima::fastrtps::rtps::CacheChange_t(); + new_change->copy(&change); + new_change->reader_info.writer_ownership_strength = change.reader_info.writer_ownership_strength; + + EXPECT_TRUE(history.received_change(new_change, 0)); + bool ret = history.update_instance_nts(new_change); + test_changes.push_back(std::unique_ptr(new_change)); + return ret; +} + /*! * \test DDS-OWN-HIST-01 Tests `DataReaderInstance` handles successfully the reception of Non-Keyed samples with * different Ownership's strength. @@ -53,6 +70,7 @@ TEST(DataReaderHistory, exclusive_ownership_non_keyed_sample_reception) DataReaderHistory history(type, topic, qos); eprosima::fastrtps::RecursiveTimedMutex mutex; eprosima::fastrtps::rtps::StatelessReader reader(&history, &mutex); + std::vector> changes; eprosima::fastrtps::rtps::CacheChange_t dw1_change; dw1_change.writerGUID = {{}, 1}; @@ -65,50 +83,32 @@ TEST(DataReaderHistory, exclusive_ownership_non_keyed_sample_reception) dw3_change.reader_info.writer_ownership_strength = 3; // Receives a sample with seq 1 from DW1 and update instance with strength 1. - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw1_change)); + ASSERT_TRUE(add_test_change(history, dw1_change, changes)); // Receives a sample with seq 1 from DW2 and update instance with strength 2. - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives a sample with seq 2 from DW1 and update instance with strength 1. - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives a sample with seq 2 from DW2 and update instance with strength 2. - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives a sample with seq 1 from DW3 and update instance with strength 3. - ++dw3_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives a sample with seq 3 from DW1 and update instance with strength 1. - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives a sample with seq 3 from DW2 and update instance with strength 2. - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw2_change)); + ASSERT_FALSE(add_test_change(history, dw2_change, changes)); // Receives a sample with seq 2 from DW3 and update instance with strength 1. - ++dw3_change.sequenceNumber; dw3_change.reader_info.writer_ownership_strength = 1; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives a sample with seq 4 from DW2 and update instance with strength 2. - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); ASSERT_EQ(9u, history.getHistorySize()); } @@ -133,6 +133,7 @@ TEST(DataReaderHistory, exclusive_ownership_keyed_sample_reception) DataReaderHistory history(type, topic, qos); eprosima::fastrtps::RecursiveTimedMutex mutex; eprosima::fastrtps::rtps::StatelessReader reader(&history, &mutex); + std::vector> changes; const InstanceHandle_t instance_1 = eprosima::fastrtps::rtps::GUID_t{{}, 1}; const InstanceHandle_t instance_2 = eprosima::fastrtps::rtps::GUID_t{{}, 2}; @@ -149,112 +150,76 @@ TEST(DataReaderHistory, exclusive_ownership_keyed_sample_reception) // Receives instance 1 with seq 1 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_1; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw1_change)); + ASSERT_TRUE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 2 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_2; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw1_change)); + ASSERT_TRUE(add_test_change(history, dw1_change, changes)); // Receives instance 1 with seq 1 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_1; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives instance 1 with seq 3 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_1; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 4 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_2; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw1_change)); + ASSERT_TRUE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 1 from DW3 and update instance with strength 3. dw3_change.instanceHandle = instance_2; - ++dw3_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives instance 1 with seq 5 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_1; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 6 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_2; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 3 with seq 2 from DW3 and update instance with strength 3. dw3_change.instanceHandle = instance_3; - ++dw3_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives instance 3 with seq 2 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_3; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw2_change)); + ASSERT_FALSE(add_test_change(history, dw2_change, changes)); // Receives instance 3 with seq 3 from DW3 and update instance with strength 1. dw3_change.instanceHandle = instance_3; dw3_change.reader_info.writer_ownership_strength = 1; - ++dw3_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw3_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw3_change)); + ASSERT_TRUE(add_test_change(history, dw3_change, changes)); // Receives instance 3 with seq 3 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_3; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives instance 1 with seq 7 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_1; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 2 with seq 8 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_2; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 3 with seq 9 from DW1 and update instance with strength 1. dw1_change.instanceHandle = instance_3; - ++dw1_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw1_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw1_change)); + ASSERT_FALSE(add_test_change(history, dw1_change, changes)); // Receives instance 1 with seq 4 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_1; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); // Receives instance 2 with seq 5 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_2; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_FALSE(history.update_instance_nts(&dw2_change)); + ASSERT_FALSE(add_test_change(history, dw2_change, changes)); // Receives instance 3 with seq 6 from DW2 and update instance with strength 2. dw2_change.instanceHandle = instance_3; - ++dw2_change.sequenceNumber; - ASSERT_TRUE(history.received_change(&dw2_change, 0)); - ASSERT_TRUE(history.update_instance_nts(&dw2_change)); + ASSERT_TRUE(add_test_change(history, dw2_change, changes)); ASSERT_EQ(18u, history.getHistorySize()); }