diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index fed1d3c2b6dc..72b1882f2685 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -157,6 +157,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options) .coreOnAllocationFailureEnabled = options.coreOnAllocationFailureEnabled})}, spillPool_{addLeafPool("__sys_spilling__")}, + tracePool_{addLeafPool("__sys_tracing__")}, sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) { VELOX_CHECK_NOT_NULL(allocator_); VELOX_CHECK_NOT_NULL(arbitrator_); @@ -412,4 +413,8 @@ memory::MemoryPool* spillMemoryPool() { bool isSpillMemoryPool(memory::MemoryPool* pool) { return pool == spillMemoryPool(); } + +memory::MemoryPool* traceMemoryPool() { + return memory::MemoryManager::getInstance()->tracePool(); +} } // namespace facebook::velox::memory diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index 5ce372e77dc4..d8849d9acce5 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -339,6 +339,11 @@ class MemoryManager { return spillPool_.get(); } + /// Returns the process wide leaf memory pool used for query tracing. + MemoryPool* tracePool() const { + return tracePool_.get(); + } + const std::vector>& testingSharedLeafPools() { return sharedLeafPools_; } @@ -366,6 +371,7 @@ class MemoryManager { const std::shared_ptr sysRoot_; const std::shared_ptr spillPool_; + const std::shared_ptr tracePool_; const std::vector> sharedLeafPools_; mutable folly::SharedMutex mutex_; @@ -412,6 +418,9 @@ memory::MemoryPool* spillMemoryPool(); /// Returns true if the provided 'pool' is the spilling memory pool. bool isSpillMemoryPool(memory::MemoryPool* pool); +/// Returns the system-wide memory pool for tracing memory usage. +memory::MemoryPool* traceMemoryPool(); + FOLLY_ALWAYS_INLINE int32_t alignmentPadding(void* address, int32_t alignment) { auto extra = reinterpret_cast(address) % alignment; return extra == 0 ? 0 : alignment - extra; diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index 348ed8db3377..3b12d6946614 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) { const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools; { MemoryManager manager{}; - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); ASSERT_EQ(manager.capacity(), kMaxMemory); ASSERT_EQ(0, manager.getTotalBytes()); ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment); @@ -69,7 +69,7 @@ TEST_F(MemoryManagerTest, ctor) { .arbitratorCapacity = kCapacity, .arbitratorReservedCapacity = 0}}; ASSERT_EQ(kCapacity, manager.capacity()); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); } { @@ -84,7 +84,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); // TODO: replace with root pool memory tracker quota check. ASSERT_EQ( - kSharedPoolCount + 1, manager.testingDefaultRoot().getChildCount()); + kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount()); ASSERT_EQ(kCapacity, manager.capacity()); ASSERT_EQ(0, manager.getTotalBytes()); } @@ -103,7 +103,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ( manager.toString(), "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of " - "pools 1\nList of root pools:\n\t__sys_root__\n" + "pools 2\nList of root pools:\n\t__sys_root__\n" "Memory Allocator[MALLOC capacity 4.00GB allocated bytes 0 " "allocated pages 0 mapped pages 0]\n" "ARBITRATOR[SHARED CAPACITY[4.00GB] PENDING[0] " @@ -246,10 +246,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) { TEST_F(MemoryManagerTest, defaultMemoryManager) { auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager()); auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; - ASSERT_EQ(managerA.numPools(), 1); + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; + ASSERT_EQ(managerA.numPools(), 2); ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount); - ASSERT_EQ(managerB.numPools(), 1); + ASSERT_EQ(managerB.numPools(), 2); ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount); auto child1 = managerA.addLeafPool("child_1"); @@ -260,41 +260,44 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount()); EXPECT_EQ( kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount()); - ASSERT_EQ(managerA.numPools(), 3); - ASSERT_EQ(managerB.numPools(), 3); - auto pool = managerB.addRootPool(); ASSERT_EQ(managerA.numPools(), 4); ASSERT_EQ(managerB.numPools(), 4); + auto pool = managerB.addRootPool(); + ASSERT_EQ(managerA.numPools(), 5); + ASSERT_EQ(managerB.numPools(), 5); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); child1.reset(); EXPECT_EQ( kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount()); child2.reset(); EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount()); + ASSERT_EQ(managerA.numPools(), 3); + ASSERT_EQ(managerB.numPools(), 3); + pool.reset(); ASSERT_EQ(managerA.numPools(), 2); ASSERT_EQ(managerB.numPools(), 2); - pool.reset(); - ASSERT_EQ(managerA.numPools(), 1); - ASSERT_EQ(managerB.numPools(), 1); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); const std::string detailedManagerStr = managerA.toString(true); ASSERT_THAT( detailedManagerStr, testing::HasSubstr( - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); ASSERT_THAT( detailedManagerStr, testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n")); + ASSERT_THAT( + detailedManagerStr, + testing::HasSubstr("__sys_tracing__ usage 0B reserved 0B peak 0B\n")); for (int i = 0; i < 32; ++i) { ASSERT_THAT( managerA.toString(true), @@ -306,7 +309,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { // TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool. TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) { auto& manager = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount); { auto poolA = deprecatedAddDefaultLeafMemoryPool(); @@ -361,7 +364,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { MemoryManagerOptions options; options.alignment = alignment; MemoryManager manager{options}; - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); const int numPools = 100; std::vector> userRootPools; std::vector> userLeafPools; @@ -386,14 +389,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { ASSERT_FALSE(rootUnamedPool->name().empty()); ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootUnamedPool->parent(), nullptr); - ASSERT_EQ(manager.numPools(), 1 + numPools + 2); + ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1); userLeafPools.clear(); leafUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1); + ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1); userRootPools.clear(); - ASSERT_EQ(manager.numPools(), 1 + 1); + ASSERT_EQ(manager.numPools(), 1 + 2); rootUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); } // TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side @@ -410,7 +413,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_NE(manager, globalManager); ASSERT_EQ(manager, memoryManager()); auto* managerII = memoryManager(); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; { auto& rootI = manager->testingDefaultRoot(); const std::string childIName("some_child"); @@ -444,9 +447,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1); ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1); - ASSERT_EQ(manager->numPools(), 2 + 1); + ASSERT_EQ(manager->numPools(), 2 + 2); } - ASSERT_EQ(manager->numPools(), 1); + ASSERT_EQ(manager->numPools(), 2); } TEST_F(MemoryManagerTest, alignmentOptionCheck) { @@ -544,9 +547,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) { } stopCheck = true; checkThread.join(); - ASSERT_EQ(manager.numPools(), pools.size() + 1); + ASSERT_EQ(manager.numPools(), pools.size() + 2); pools.clear(); - ASSERT_EQ(manager.numPools(), 1); + ASSERT_EQ(manager.numPools(), 2); } TEST_F(MemoryManagerTest, quotaEnforcement) { diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 78f51375fa45..bc1a24976a71 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -346,6 +346,16 @@ class QueryConfig { /// derived using micro-benchmarking. static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows"; + /// Enable query tracing flag. + static constexpr const char* kQueryTraceEnabled = "query_trace_enabled"; + + /// Base dir of a query to store tracing data. + static constexpr const char* kQueryTraceDir = "query_trace_dir"; + + /// A comma-separated list of plan node ids whose input data will be traced. + /// Empty string if only want to trace the query metadata. + static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids"; + uint64_t queryMaxMemoryPerNode() const { return config::toCapacity( get(kQueryMaxMemoryPerNode, "0B"), @@ -611,6 +621,21 @@ class QueryConfig { return get(kSpillableReservationGrowthPct, kDefaultPct); } + /// Returns true if query tracing is enabled. + bool queryTraceEnabled() const { + return get(kQueryTraceEnabled, false); + } + + std::string queryTraceDir() const { + // The default query trace dir, empty by default. + return get(kQueryTraceDir, ""); + } + + std::string queryTraceNodeIds() const { + // The default query trace nodes, empty by default. + return get(kQueryTraceNodeIds, ""); + } + bool prestoArrayAggIgnoreNulls() const { return get(kPrestoArrayAggIgnoreNulls, false); } diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 38006f9baf53..d11e492a2580 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -293,6 +293,7 @@ Task::Task( planFragment_(std::move(planFragment)), destination_(destination), queryCtx_(std::move(queryCtx)), + traceConfig_(maybeMakeTraceConfig()), mode_(mode), consumerSupplier_(std::move(consumerSupplier)), onError_(std::move(onError)), @@ -304,6 +305,8 @@ Task::Task( VELOX_CHECK_NULL( dynamic_cast(queryCtx_->executor())); } + + maybeInitQueryTrace(); } Task::~Task() { @@ -2833,6 +2836,61 @@ std::shared_ptr Task::getExchangeClientLocked( return exchangeClients_[pipelineId]; } +std::optional Task::maybeMakeTraceConfig() const { + const auto& queryConfig = queryCtx_->queryConfig(); + if (!queryConfig.queryTraceEnabled()) { + return std::nullopt; + } + + VELOX_CHECK( + !queryConfig.queryTraceDir().empty(), + "Query trace enabled but the trace dir is not set"); + + const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); + if (queryTraceNodes.empty()) { + LOG(INFO) << "Query trace enabled but no tracing plan node specified"; + return trace::QueryTraceConfig( + std::unordered_set{}, queryConfig.queryTraceDir()); + } + + std::vector nodes; + folly::split(',', queryTraceNodes, nodes); + std::unordered_set nodeSet(nodes.begin(), nodes.end()); + VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); + LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes; + return trace::QueryTraceConfig( + std::move(nodeSet), queryConfig.queryTraceDir()); +} + +std::string Task::createTraceDirectory(const std::string& traceDir) const { + try { + const auto fs = filesystems::getFileSystem(traceDir, nullptr); + if (fs->exists(traceDir)) { + fs->remove(traceDir); + } + fs->mkdir(traceDir); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to create trace directory '{}' with error: {}", + traceDir, + e.what()); + } + return traceDir; +} + +void Task::maybeInitQueryTrace() { + if (!traceConfig_) { + return; + } + + const auto traceTaskDir = + fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_); + const auto traceMetaDir = createTraceDirectory(traceTaskDir); + const auto queryMetadatWriter = std::make_unique( + traceMetaDir, memory::traceMemoryPool()); + queryMetadatWriter->write(queryCtx_, planFragment_.planNode); +} + void Task::testingVisitDrivers(const std::function& callback) { std::lock_guard l(mutex_); for (int i = 0; i < drivers_.size(); ++i) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 433df9a0451e..3855ef13d621 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -23,6 +23,8 @@ #include "velox/exec/Split.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" +#include "velox/exec/trace/QueryMetadataWriter.h" +#include "velox/exec/trace/QueryTraceConfig.h" #include "velox/vector/ComplexVector.h" namespace facebook::velox::exec { @@ -969,6 +971,16 @@ class Task : public std::enable_shared_from_this { std::shared_ptr getExchangeClientLocked( int32_t pipelineId) const; + // Builds the query trace config. + std::optional maybeMakeTraceConfig() const; + + // Create a directory to store the query trace metdata and data. + std::string createTraceDirectory(const std::string& traceDir) const; + + // Create a 'QueryMetadtaWriter' to trace the query metadata if the query + // trace enabled. + void maybeInitQueryTrace(); + // The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_' // on task construction and destruction. class TaskCounter { @@ -999,6 +1011,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment_; const int destination_; const std::shared_ptr queryCtx_; + const std::optional traceConfig_; // The execution mode of the task. It is enforced that a task can only be // executed in a single mode throughout its lifetime diff --git a/velox/exec/trace/QueryTraceConfig.h b/velox/exec/trace/QueryTraceConfig.h index 8afcbf22fb53..e50cca49e282 100644 --- a/velox/exec/trace/QueryTraceConfig.h +++ b/velox/exec/trace/QueryTraceConfig.h @@ -21,9 +21,9 @@ namespace facebook::velox::exec::trace { struct QueryTraceConfig { - /// Target query trace nodes + /// Target query trace nodes. std::unordered_set queryNodes; - /// Base dir of query trace, normmaly it is $prefix/$taskId. + /// Base dir of query trace. std::string queryTraceDir; QueryTraceConfig( diff --git a/velox/exec/trace/test/QueryTraceTest.cpp b/velox/exec/trace/test/QueryTraceTest.cpp index 765eeccaaf53..3cffbbb0ca9f 100644 --- a/velox/exec/trace/test/QueryTraceTest.cpp +++ b/velox/exec/trace/test/QueryTraceTest.cpp @@ -202,4 +202,83 @@ TEST_F(QueryTracerTest, traceMetadata) { ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key)); } } + +TEST_F(QueryTracerTest, task) { + const auto rowType = + ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, + {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); + std::vector rows; + constexpr auto numBatch = 1; + rows.reserve(numBatch); + for (auto i = 0; i < numBatch; ++i) { + rows.push_back(vectorFuzzer_.fuzzRow(rowType, 2)); + } + + const auto outputDir = TempDirectoryPath::create(); + auto planNodeIdGenerator = std::make_shared(); + const auto planNode = + PlanBuilder(planNodeIdGenerator) + .values(rows, false) + .project({"c0", "c1", "c2"}) + .hashJoin( + {"c0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator) + .values(rows, true) + .singleAggregation({"c0", "c1"}, {"min(c2)"}) + .project({"c0 AS u0", "c1 AS u1", "a0 AS u2"}) + .planNode(), + "c0 < 135", + {"c0", "c1", "c2"}, + core::JoinType::kInner) + .planNode(); + const auto expectedQueryConfigs = + std::unordered_map{ + {core::QueryConfig::kSpillEnabled, "true"}, + {core::QueryConfig::kSpillNumPartitionBits, "17"}, + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, outputDir->getPath()}, + {"key1", "value1"}, + }; + const auto expectedConnectorProperties = + std::unordered_map>{ + {"test_trace", + std::make_shared( + std::unordered_map{ + {"cKey1", "cVal1"}})}}; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), + core::QueryConfig(expectedQueryConfigs), + expectedConnectorProperties); + + std::shared_ptr task; + AssertQueryBuilder(planNode).queryCtx(queryCtx).maxDrivers(1).copyResults( + pool(), task); + const auto expectedDir = + fmt::format("{}/{}", outputDir->getPath(), task->taskId()); + + std::unordered_map acutalQueryConfigs; + std::unordered_map> + actualConnectorProperties; + core::PlanNodePtr actualQueryPlan; + auto reader = trace::QueryMetadataReader(expectedDir, pool()); + reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); + + ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); + ASSERT_EQ(acutalQueryConfigs.size(), expectedQueryConfigs.size()); + for (const auto& [key, value] : acutalQueryConfigs) { + ASSERT_EQ(acutalQueryConfigs.at(key), expectedQueryConfigs.at(key)); + } + + ASSERT_EQ( + actualConnectorProperties.size(), expectedConnectorProperties.size()); + ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1); + const auto expectedConnectorConfigs = + expectedConnectorProperties.at("test_trace")->rawConfigsCopy(); + const auto actualConnectorConfigs = + actualConnectorProperties.at("test_trace"); + for (const auto& [key, value] : actualConnectorConfigs) { + ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key)); + } +} } // namespace facebook::velox::exec::test