Skip to content

Commit

Permalink
trace metadata in task creation
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Aug 25, 2024
1 parent 3325e96 commit 97d582b
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 2 deletions.
5 changes: 5 additions & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
.coreOnAllocationFailureEnabled =
options.coreOnAllocationFailureEnabled})},
spillPool_{addLeafPool("__sys_spilling__")},
tracePool_{addLeafPool("__sys_tracing__")},
sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) {
VELOX_CHECK_NOT_NULL(allocator_);
VELOX_CHECK_NOT_NULL(arbitrator_);
Expand Down Expand Up @@ -412,4 +413,8 @@ memory::MemoryPool* spillMemoryPool() {
bool isSpillMemoryPool(memory::MemoryPool* pool) {
return pool == spillMemoryPool();
}

memory::MemoryPool* traceMemoryPool() {
return memory::MemoryManager::getInstance()->tracePool();
}
} // namespace facebook::velox::memory
9 changes: 9 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,11 @@ class MemoryManager {
return spillPool_.get();
}

/// Returns the process wide leaf memory pool used for query tracing.
MemoryPool* tracePool() const {
return tracePool_.get();
}

const std::vector<std::shared_ptr<MemoryPool>>& testingSharedLeafPools() {
return sharedLeafPools_;
}
Expand Down Expand Up @@ -366,6 +371,7 @@ class MemoryManager {

const std::shared_ptr<MemoryPool> sysRoot_;
const std::shared_ptr<MemoryPool> spillPool_;
const std::shared_ptr<MemoryPool> tracePool_;
const std::vector<std::shared_ptr<MemoryPool>> sharedLeafPools_;

mutable folly::SharedMutex mutex_;
Expand Down Expand Up @@ -412,6 +418,9 @@ memory::MemoryPool* spillMemoryPool();
/// Returns true if the provided 'pool' is the spilling memory pool.
bool isSpillMemoryPool(memory::MemoryPool* pool);

/// Returns the system-wide memory pool for tracing memory usage.
memory::MemoryPool* traceMemoryPool();

FOLLY_ALWAYS_INLINE int32_t alignmentPadding(void* address, int32_t alignment) {
auto extra = reinterpret_cast<uintptr_t>(address) % alignment;
return extra == 0 ? 0 : alignment - extra;
Expand Down
25 changes: 25 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ class QueryConfig {
/// derived using micro-benchmarking.
static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows";

/// Enable query tracing flag.
static constexpr const char* kQueryTraceEnabled = "query_trace_enabled";

/// Base dir of a query to store tracing data.
static constexpr const char* kQueryTraceDir = "query_trace_dir";

/// A comma-separated list of plan node ids whose input data will be traced.
/// Empty string if only want to trace the query metadata.
static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids";

uint64_t queryMaxMemoryPerNode() const {
return config::toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"),
Expand Down Expand Up @@ -611,6 +621,21 @@ class QueryConfig {
return get<int32_t>(kSpillableReservationGrowthPct, kDefaultPct);
}

/// Returns true if query tracing is enabled.
bool queryTraceEnabled() const {
return get<bool>(kQueryTraceEnabled, false);
}

std::string queryTraceDir() const {
// The default query trace dir, empty by default.
return get<std::string>(kQueryTraceDir, "");
}

std::string queryTraceNodeIds() const {
// The default query trace nodes, empty by default.
return get<std::string>(kQueryTraceNodeIds, "");
}

bool prestoArrayAggIgnoreNulls() const {
return get<bool>(kPrestoArrayAggIgnoreNulls, false);
}
Expand Down
58 changes: 58 additions & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ Task::Task(
planFragment_(std::move(planFragment)),
destination_(destination),
queryCtx_(std::move(queryCtx)),
traceConfig_(maybeMakeTraceConfig()),
mode_(mode),
consumerSupplier_(std::move(consumerSupplier)),
onError_(std::move(onError)),
Expand All @@ -304,6 +305,8 @@ Task::Task(
VELOX_CHECK_NULL(
dynamic_cast<const folly::InlineLikeExecutor*>(queryCtx_->executor()));
}

maybeInitQueryTrace();
}

Task::~Task() {
Expand Down Expand Up @@ -2833,6 +2836,61 @@ std::shared_ptr<ExchangeClient> Task::getExchangeClientLocked(
return exchangeClients_[pipelineId];
}

std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
const auto& queryConfig = queryCtx_->queryConfig();
if (!queryConfig.queryTraceEnabled()) {
return std::nullopt;
}

VELOX_CHECK(
!queryConfig.queryTraceDir().empty(),
"Query trace enabled but the trace dir is not set");

const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
LOG(INFO) << "Query trace enabled but no tracing plan node specified";
return trace::QueryTraceConfig(
std::unordered_set<std::string>{}, queryConfig.queryTraceDir());
}

std::vector<std::string> nodes;
folly::split(',', queryTraceNodes, nodes);
std::unordered_set<std::string> nodeSet(nodes.begin(), nodes.end());
VELOX_CHECK_EQ(nodeSet.size(), nodes.size());
LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes;
return trace::QueryTraceConfig(
std::move(nodeSet), queryConfig.queryTraceDir());
}

std::string Task::createTraceDirectory(const std::string& traceDir) const {
try {
const auto fs = filesystems::getFileSystem(traceDir, nullptr);
if (fs->exists(traceDir)) {
fs->remove(traceDir);
}
fs->mkdir(traceDir);
} catch (const std::exception& e) {
VELOX_FAIL(
"Failed to create trace directory '{}' with error: {}",
traceDir,
e.what());
}
return traceDir;
}

void Task::maybeInitQueryTrace() {
if (!traceConfig_) {
return;
}

const auto traceTaskDir =
fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_);
const auto traceMetaDir = createTraceDirectory(traceTaskDir);
const auto queryMetadatWriter = std::make_unique<trace::QueryMetadataWriter>(
traceMetaDir, memory::traceMemoryPool());
queryMetadatWriter->write(queryCtx_, planFragment_.planNode);
}

void Task::testingVisitDrivers(const std::function<void(Driver*)>& callback) {
std::lock_guard<std::timed_mutex> l(mutex_);
for (int i = 0; i < drivers_.size(); ++i) {
Expand Down
13 changes: 13 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "velox/exec/Split.h"
#include "velox/exec/TaskStats.h"
#include "velox/exec/TaskStructs.h"
#include "velox/exec/trace/QueryMetadataWriter.h"
#include "velox/exec/trace/QueryTraceConfig.h"
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -969,6 +971,16 @@ class Task : public std::enable_shared_from_this<Task> {
std::shared_ptr<ExchangeClient> getExchangeClientLocked(
int32_t pipelineId) const;

// Builds the query trace config.
std::optional<trace::QueryTraceConfig> maybeMakeTraceConfig() const;

// Create a directory to store the query trace metdata and data.
std::string createTraceDirectory(const std::string& traceDir) const;

// Create a 'QueryMetadtaWriter' to trace the query metadata if the query
// trace enabled.
void maybeInitQueryTrace();

// The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_'
// on task construction and destruction.
class TaskCounter {
Expand Down Expand Up @@ -999,6 +1011,7 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment_;
const int destination_;
const std::shared_ptr<core::QueryCtx> queryCtx_;
const std::optional<trace::QueryTraceConfig> traceConfig_;

// The execution mode of the task. It is enforced that a task can only be
// executed in a single mode throughout its lifetime
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/trace/QueryTraceConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

namespace facebook::velox::exec::trace {
struct QueryTraceConfig {
/// Target query trace nodes
/// Target query trace nodes.
std::unordered_set<std::string> queryNodes;
/// Base dir of query trace, normmaly it is $prefix/$taskId.
/// Base dir of query trace.
std::string queryTraceDir;

QueryTraceConfig(
Expand Down
79 changes: 79 additions & 0 deletions velox/exec/trace/test/QueryTraceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,83 @@ TEST_F(QueryTracerTest, traceMetadata) {
ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key));
}
}

TEST_F(QueryTracerTest, task) {
const auto rowType =
ROW({"c0", "c1", "c2", "c3", "c4", "c5"},
{BIGINT(), SMALLINT(), TINYINT(), VARCHAR(), VARCHAR(), VARCHAR()});
std::vector<RowVectorPtr> rows;
constexpr auto numBatch = 1;
rows.reserve(numBatch);
for (auto i = 0; i < numBatch; ++i) {
rows.push_back(vectorFuzzer_.fuzzRow(rowType, 2));
}

const auto outputDir = TempDirectoryPath::create();
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
const auto planNode =
PlanBuilder(planNodeIdGenerator)
.values(rows, false)
.project({"c0", "c1", "c2"})
.hashJoin(
{"c0"},
{"u0"},
PlanBuilder(planNodeIdGenerator)
.values(rows, true)
.singleAggregation({"c0", "c1"}, {"min(c2)"})
.project({"c0 AS u0", "c1 AS u1", "a0 AS u2"})
.planNode(),
"c0 < 135",
{"c0", "c1", "c2"},
core::JoinType::kInner)
.planNode();
const auto expectedQueryConfigs =
std::unordered_map<std::string, std::string>{
{core::QueryConfig::kSpillEnabled, "true"},
{core::QueryConfig::kSpillNumPartitionBits, "17"},
{core::QueryConfig::kQueryTraceEnabled, "true"},
{core::QueryConfig::kQueryTraceDir, outputDir->getPath()},
{"key1", "value1"},
};
const auto expectedConnectorProperties =
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>{
{"test_trace",
std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>{
{"cKey1", "cVal1"}})}};
const auto queryCtx = core::QueryCtx::create(
executor_.get(),
core::QueryConfig(expectedQueryConfigs),
expectedConnectorProperties);

std::shared_ptr<Task> task;
AssertQueryBuilder(planNode).queryCtx(queryCtx).maxDrivers(1).copyResults(
pool(), task);
const auto expectedDir =
fmt::format("{}/{}", outputDir->getPath(), task->taskId());

std::unordered_map<std::string, std::string> acutalQueryConfigs;
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
actualConnectorProperties;
core::PlanNodePtr actualQueryPlan;
auto reader = trace::QueryMetadataReader(expectedDir, pool());
reader.read(acutalQueryConfigs, actualConnectorProperties, actualQueryPlan);

ASSERT_TRUE(isSamePlan(actualQueryPlan, planNode));
ASSERT_EQ(acutalQueryConfigs.size(), expectedQueryConfigs.size());
for (const auto& [key, value] : acutalQueryConfigs) {
ASSERT_EQ(acutalQueryConfigs.at(key), expectedQueryConfigs.at(key));
}

ASSERT_EQ(
actualConnectorProperties.size(), expectedConnectorProperties.size());
ASSERT_EQ(actualConnectorProperties.count("test_trace"), 1);
const auto expectedConnectorConfigs =
expectedConnectorProperties.at("test_trace")->rawConfigsCopy();
const auto actualConnectorConfigs =
actualConnectorProperties.at("test_trace");
for (const auto& [key, value] : actualConnectorConfigs) {
ASSERT_EQ(actualConnectorConfigs.at(key), expectedConnectorConfigs.at(key));
}
}
} // namespace facebook::velox::exec::test

0 comments on commit 97d582b

Please sign in to comment.