diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index fed1d3c2b6dc..72b1882f2685 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -157,6 +157,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options) .coreOnAllocationFailureEnabled = options.coreOnAllocationFailureEnabled})}, spillPool_{addLeafPool("__sys_spilling__")}, + tracePool_{addLeafPool("__sys_tracing__")}, sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) { VELOX_CHECK_NOT_NULL(allocator_); VELOX_CHECK_NOT_NULL(arbitrator_); @@ -412,4 +413,8 @@ memory::MemoryPool* spillMemoryPool() { bool isSpillMemoryPool(memory::MemoryPool* pool) { return pool == spillMemoryPool(); } + +memory::MemoryPool* traceMemoryPool() { + return memory::MemoryManager::getInstance()->tracePool(); +} } // namespace facebook::velox::memory diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index 5ce372e77dc4..d8849d9acce5 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -339,6 +339,11 @@ class MemoryManager { return spillPool_.get(); } + /// Returns the process wide leaf memory pool used for query tracing. + MemoryPool* tracePool() const { + return tracePool_.get(); + } + const std::vector>& testingSharedLeafPools() { return sharedLeafPools_; } @@ -366,6 +371,7 @@ class MemoryManager { const std::shared_ptr sysRoot_; const std::shared_ptr spillPool_; + const std::shared_ptr tracePool_; const std::vector> sharedLeafPools_; mutable folly::SharedMutex mutex_; @@ -412,6 +418,9 @@ memory::MemoryPool* spillMemoryPool(); /// Returns true if the provided 'pool' is the spilling memory pool. bool isSpillMemoryPool(memory::MemoryPool* pool); +/// Returns the system-wide memory pool for tracing memory usage. +memory::MemoryPool* traceMemoryPool(); + FOLLY_ALWAYS_INLINE int32_t alignmentPadding(void* address, int32_t alignment) { auto extra = reinterpret_cast(address) % alignment; return extra == 0 ? 0 : alignment - extra; diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 78f51375fa45..bc1a24976a71 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -346,6 +346,16 @@ class QueryConfig { /// derived using micro-benchmarking. static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows"; + /// Enable query tracing flag. + static constexpr const char* kQueryTraceEnabled = "query_trace_enabled"; + + /// Base dir of a query to store tracing data. + static constexpr const char* kQueryTraceDir = "query_trace_dir"; + + /// A comma-separated list of plan node ids whose input data will be traced. + /// Empty string if only want to trace the query metadata. + static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids"; + uint64_t queryMaxMemoryPerNode() const { return config::toCapacity( get(kQueryMaxMemoryPerNode, "0B"), @@ -611,6 +621,21 @@ class QueryConfig { return get(kSpillableReservationGrowthPct, kDefaultPct); } + /// Returns true if query tracing is enabled. + bool queryTraceEnabled() const { + return get(kQueryTraceEnabled, false); + } + + std::string queryTraceDir() const { + // The default query trace dir, empty by default. + return get(kQueryTraceDir, ""); + } + + std::string queryTraceNodeIds() const { + // The default query trace nodes, empty by default. + return get(kQueryTraceNodeIds, ""); + } + bool prestoArrayAggIgnoreNulls() const { return get(kPrestoArrayAggIgnoreNulls, false); } diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 38006f9baf53..d11e492a2580 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -293,6 +293,7 @@ Task::Task( planFragment_(std::move(planFragment)), destination_(destination), queryCtx_(std::move(queryCtx)), + traceConfig_(maybeMakeTraceConfig()), mode_(mode), consumerSupplier_(std::move(consumerSupplier)), onError_(std::move(onError)), @@ -304,6 +305,8 @@ Task::Task( VELOX_CHECK_NULL( dynamic_cast(queryCtx_->executor())); } + + maybeInitQueryTrace(); } Task::~Task() { @@ -2833,6 +2836,61 @@ std::shared_ptr Task::getExchangeClientLocked( return exchangeClients_[pipelineId]; } +std::optional Task::maybeMakeTraceConfig() const { + const auto& queryConfig = queryCtx_->queryConfig(); + if (!queryConfig.queryTraceEnabled()) { + return std::nullopt; + } + + VELOX_CHECK( + !queryConfig.queryTraceDir().empty(), + "Query trace enabled but the trace dir is not set"); + + const auto queryTraceNodes = queryConfig.queryTraceNodeIds(); + if (queryTraceNodes.empty()) { + LOG(INFO) << "Query trace enabled but no tracing plan node specified"; + return trace::QueryTraceConfig( + std::unordered_set{}, queryConfig.queryTraceDir()); + } + + std::vector nodes; + folly::split(',', queryTraceNodes, nodes); + std::unordered_set nodeSet(nodes.begin(), nodes.end()); + VELOX_CHECK_EQ(nodeSet.size(), nodes.size()); + LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes; + return trace::QueryTraceConfig( + std::move(nodeSet), queryConfig.queryTraceDir()); +} + +std::string Task::createTraceDirectory(const std::string& traceDir) const { + try { + const auto fs = filesystems::getFileSystem(traceDir, nullptr); + if (fs->exists(traceDir)) { + fs->remove(traceDir); + } + fs->mkdir(traceDir); + } catch (const std::exception& e) { + VELOX_FAIL( + "Failed to create trace directory '{}' with error: {}", + traceDir, + e.what()); + } + return traceDir; +} + +void Task::maybeInitQueryTrace() { + if (!traceConfig_) { + return; + } + + const auto traceTaskDir = + fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_); + const auto traceMetaDir = createTraceDirectory(traceTaskDir); + const auto queryMetadatWriter = std::make_unique( + traceMetaDir, memory::traceMemoryPool()); + queryMetadatWriter->write(queryCtx_, planFragment_.planNode); +} + void Task::testingVisitDrivers(const std::function& callback) { std::lock_guard l(mutex_); for (int i = 0; i < drivers_.size(); ++i) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 433df9a0451e..3855ef13d621 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -23,6 +23,8 @@ #include "velox/exec/Split.h" #include "velox/exec/TaskStats.h" #include "velox/exec/TaskStructs.h" +#include "velox/exec/trace/QueryMetadataWriter.h" +#include "velox/exec/trace/QueryTraceConfig.h" #include "velox/vector/ComplexVector.h" namespace facebook::velox::exec { @@ -969,6 +971,16 @@ class Task : public std::enable_shared_from_this { std::shared_ptr getExchangeClientLocked( int32_t pipelineId) const; + // Builds the query trace config. + std::optional maybeMakeTraceConfig() const; + + // Create a directory to store the query trace metdata and data. + std::string createTraceDirectory(const std::string& traceDir) const; + + // Create a 'QueryMetadtaWriter' to trace the query metadata if the query + // trace enabled. + void maybeInitQueryTrace(); + // The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_' // on task construction and destruction. class TaskCounter { @@ -999,6 +1011,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment_; const int destination_; const std::shared_ptr queryCtx_; + const std::optional traceConfig_; // The execution mode of the task. It is enforced that a task can only be // executed in a single mode throughout its lifetime diff --git a/velox/exec/trace/QueryTraceConfig.h b/velox/exec/trace/QueryTraceConfig.h index 8afcbf22fb53..e50cca49e282 100644 --- a/velox/exec/trace/QueryTraceConfig.h +++ b/velox/exec/trace/QueryTraceConfig.h @@ -21,9 +21,9 @@ namespace facebook::velox::exec::trace { struct QueryTraceConfig { - /// Target query trace nodes + /// Target query trace nodes. std::unordered_set queryNodes; - /// Base dir of query trace, normmaly it is $prefix/$taskId. + /// Base dir of query trace. std::string queryTraceDir; QueryTraceConfig( diff --git a/velox/exec/trace/test/QueryTraceTest.cpp b/velox/exec/trace/test/QueryTraceTest.cpp index 765eeccaaf53..3cffbbb0ca9f 100644 --- a/velox/exec/trace/test/QueryTraceTest.cpp +++ b/velox/exec/trace/test/QueryTraceTest.cpp @@ -202,4 +202,83 @@ TEST_F(QueryTracerTest, traceMetadata) { ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key)); } } + +TEST_F(QueryTracerTest, task) { + const auto rowType = + ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, + {BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()}); + std::vector rows; + constexpr auto numBatch = 1; + rows.reserve(numBatch); + for (auto i = 0; i < numBatch; ++i) { + rows.push_back(vectorFuzzer_.fuzzRow(rowType, 2)); + } + + const auto outputDir = TempDirectoryPath::create(); + auto planNodeIdGenerator = std::make_shared(); + const auto planNode = + PlanBuilder(planNodeIdGenerator) + .values(rows, false) + .project({"c0", "c1", "c2"}) + .hashJoin( + {"c0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator) + .values(rows, true) + .singleAggregation({"c0", "c1"}, {"min(c2)"}) + .project({"c0 AS u0", "c1 AS u1", "a0 AS u2"}) + .planNode(), + "c0 < 135", + {"c0", "c1", "c2"}, + core::JoinType::kInner) + .planNode(); + const auto expectedQueryConfigs = + std::unordered_map{ + {core::QueryConfig::kSpillEnabled, "true"}, + {core::QueryConfig::kSpillNumPartitionBits, "17"}, + {core::QueryConfig::kQueryTraceEnabled, "true"}, + {core::QueryConfig::kQueryTraceDir, outputDir->getPath()}, + {"key1", "value1"}, + }; + const auto expectedConnectorProperties = + std::unordered_map>{ + {"test_trace", + std::make_shared( + std::unordered_map{ + {"cKey1", "cVal1"}})}}; + const auto queryCtx = core::QueryCtx::create( + executor_.get(), + core::QueryConfig(expectedQueryConfigs), + expectedConnectorProperties); + + std::shared_ptr task; + AssertQueryBuilder(planNode).queryCtx(queryCtx).maxDrivers(1).copyResults( + pool(), task); + const auto expectedDir = + fmt::format("{}/{}", outputDir->getPath(), task->taskId()); + + std::unordered_map acutalQueryConfigs; + std::unordered_map> + actualConnectorProperties; + core::PlanNodePtr actualQueryPlan; + auto reader = trace::QueryMetadataReader(expectedDir, pool()); + reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan); + + ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode)); + ASSERT_EQ(acutalQueryConfigs.size(), expectedQueryConfigs.size()); + for (const auto& [key, value] : acutalQueryConfigs) { + ASSERT_EQ(acutalQueryConfigs.at(key), expectedQueryConfigs.at(key)); + } + + ASSERT_EQ( + actualConnectorProperties.size(), expectedConnectorProperties.size()); + ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1); + const auto expectedConnectorConfigs = + expectedConnectorProperties.at("test_trace")->rawConfigsCopy(); + const auto actualConnectorConfigs = + actualConnectorProperties.at("test_trace"); + for (const auto& [key, value] : actualConnectorConfigs) { + ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key)); + } +} } // namespace facebook::velox::exec::test