Skip to content

Commit

Permalink
trace metadata in task creation
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Aug 25, 2024
1 parent 3325e96 commit 3437398
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 31 deletions.
5 changes: 5 additions & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
.coreOnAllocationFailureEnabled =
options.coreOnAllocationFailureEnabled})},
spillPool_{addLeafPool("__sys_spilling__")},
tracePool_{addLeafPool("__sys_tracing__")},
sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) {
VELOX_CHECK_NOT_NULL(allocator_);
VELOX_CHECK_NOT_NULL(arbitrator_);
Expand Down Expand Up @@ -412,4 +413,8 @@ memory::MemoryPool* spillMemoryPool() {
bool isSpillMemoryPool(memory::MemoryPool* pool) {
return pool == spillMemoryPool();
}

memory::MemoryPool* traceMemoryPool() {
return memory::MemoryManager::getInstance()->tracePool();
}
} // namespace facebook::velox::memory
9 changes: 9 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,11 @@ class MemoryManager {
return spillPool_.get();
}

/// Returns the process wide leaf memory pool used for query tracing.
MemoryPool* tracePool() const {
return tracePool_.get();
}

const std::vector<std::shared_ptr<MemoryPool>>& testingSharedLeafPools() {
return sharedLeafPools_;
}
Expand Down Expand Up @@ -366,6 +371,7 @@ class MemoryManager {

const std::shared_ptr<MemoryPool> sysRoot_;
const std::shared_ptr<MemoryPool> spillPool_;
const std::shared_ptr<MemoryPool> tracePool_;
const std::vector<std::shared_ptr<MemoryPool>> sharedLeafPools_;

mutable folly::SharedMutex mutex_;
Expand Down Expand Up @@ -412,6 +418,9 @@ memory::MemoryPool* spillMemoryPool();
/// Returns true if the provided 'pool' is the spilling memory pool.
bool isSpillMemoryPool(memory::MemoryPool* pool);

/// Returns the system-wide memory pool for tracing memory usage.
memory::MemoryPool* traceMemoryPool();

FOLLY_ALWAYS_INLINE int32_t alignmentPadding(void* address, int32_t alignment) {
auto extra = reinterpret_cast<uintptr_t>(address) % alignment;
return extra == 0 ? 0 : alignment - extra;
Expand Down
61 changes: 32 additions & 29 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) {
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools;
{
MemoryManager manager{};
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.capacity(), kMaxMemory);
ASSERT_EQ(0, manager.getTotalBytes());
ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment);
Expand All @@ -69,7 +69,7 @@ TEST_F(MemoryManagerTest, ctor) {
.arbitratorCapacity = kCapacity,
.arbitratorReservedCapacity = 0}};
ASSERT_EQ(kCapacity, manager.capacity());
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment());
}
{
Expand All @@ -84,7 +84,7 @@ TEST_F(MemoryManagerTest, ctor) {
ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment());
// TODO: replace with root pool memory tracker quota check.
ASSERT_EQ(
kSharedPoolCount + 1, manager.testingDefaultRoot().getChildCount());
kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount());
ASSERT_EQ(kCapacity, manager.capacity());
ASSERT_EQ(0, manager.getTotalBytes());
}
Expand All @@ -103,7 +103,7 @@ TEST_F(MemoryManagerTest, ctor) {
ASSERT_EQ(
manager.toString(),
"Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of "
"pools 1\nList of root pools:\n\t__sys_root__\n"
"pools 2\nList of root pools:\n\t__sys_root__\n"
"Memory Allocator[MALLOC capacity 4.00GB allocated bytes 0 "
"allocated pages 0 mapped pages 0]\n"
"ARBITRATOR[SHARED CAPACITY[4.00GB] PENDING[0] "
Expand Down Expand Up @@ -246,10 +246,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) {
TEST_F(MemoryManagerTest, defaultMemoryManager) {
auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager());
auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager());
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
ASSERT_EQ(managerA.numPools(), 1);
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
ASSERT_EQ(managerA.numPools(), 2);
ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount);
ASSERT_EQ(managerB.numPools(), 1);
ASSERT_EQ(managerB.numPools(), 2);
ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount);

auto child1 = managerA.addLeafPool("child_1");
Expand All @@ -260,41 +260,44 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) {
kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount());
EXPECT_EQ(
kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount());
ASSERT_EQ(managerA.numPools(), 3);
ASSERT_EQ(managerB.numPools(), 3);
auto pool = managerB.addRootPool();
ASSERT_EQ(managerA.numPools(), 4);
ASSERT_EQ(managerB.numPools(), 4);
auto pool = managerB.addRootPool();
ASSERT_EQ(managerA.numPools(), 5);
ASSERT_EQ(managerB.numPools(), 5);
ASSERT_EQ(
managerA.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
ASSERT_EQ(
managerB.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
child1.reset();
EXPECT_EQ(
kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount());
child2.reset();
EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount());
ASSERT_EQ(managerA.numPools(), 3);
ASSERT_EQ(managerB.numPools(), 3);
pool.reset();
ASSERT_EQ(managerA.numPools(), 2);
ASSERT_EQ(managerB.numPools(), 2);
pool.reset();
ASSERT_EQ(managerA.numPools(), 1);
ASSERT_EQ(managerB.numPools(), 1);
ASSERT_EQ(
managerA.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
ASSERT_EQ(
managerB.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
const std::string detailedManagerStr = managerA.toString(true);
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr(
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n"));
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n"));
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n"));
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr("__sys_tracing__ usage 0B reserved 0B peak 0B\n"));
for (int i = 0; i < 32; ++i) {
ASSERT_THAT(
managerA.toString(true),
Expand All @@ -306,7 +309,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) {
// TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool.
TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) {
auto& manager = toMemoryManager(deprecatedDefaultMemoryManager());
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount);
{
auto poolA = deprecatedAddDefaultLeafMemoryPool();
Expand Down Expand Up @@ -361,7 +364,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) {
MemoryManagerOptions options;
options.alignment = alignment;
MemoryManager manager{options};
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
const int numPools = 100;
std::vector<std::shared_ptr<MemoryPool>> userRootPools;
std::vector<std::shared_ptr<MemoryPool>> userLeafPools;
Expand All @@ -386,14 +389,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) {
ASSERT_FALSE(rootUnamedPool->name().empty());
ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate);
ASSERT_EQ(rootUnamedPool->parent(), nullptr);
ASSERT_EQ(manager.numPools(), 1 + numPools + 2);
ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1);
userLeafPools.clear();
leafUnamedPool.reset();
ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1);
ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1);
userRootPools.clear();
ASSERT_EQ(manager.numPools(), 1 + 1);
ASSERT_EQ(manager.numPools(), 1 + 2);
rootUnamedPool.reset();
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
}

// TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side
Expand All @@ -410,7 +413,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) {
ASSERT_NE(manager, globalManager);
ASSERT_EQ(manager, memoryManager());
auto* managerII = memoryManager();
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
{
auto& rootI = manager->testingDefaultRoot();
const std::string childIName("some_child");
Expand Down Expand Up @@ -444,9 +447,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) {
ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate);
ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1);
ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1);
ASSERT_EQ(manager->numPools(), 2 + 1);
ASSERT_EQ(manager->numPools(), 2 + 2);
}
ASSERT_EQ(manager->numPools(), 1);
ASSERT_EQ(manager->numPools(), 2);
}

TEST_F(MemoryManagerTest, alignmentOptionCheck) {
Expand Down Expand Up @@ -544,9 +547,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) {
}
stopCheck = true;
checkThread.join();
ASSERT_EQ(manager.numPools(), pools.size() + 1);
ASSERT_EQ(manager.numPools(), pools.size() + 2);
pools.clear();
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
}

TEST_F(MemoryManagerTest, quotaEnforcement) {
Expand Down
25 changes: 25 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ class QueryConfig {
/// derived using micro-benchmarking.
static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows";

/// Enable query tracing flag.
static constexpr const char* kQueryTraceEnabled = "query_trace_enabled";

/// Base dir of a query to store tracing data.
static constexpr const char* kQueryTraceDir = "query_trace_dir";

/// A comma-separated list of plan node ids whose input data will be traced.
/// Empty string if only want to trace the query metadata.
static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids";

uint64_t queryMaxMemoryPerNode() const {
return config::toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"),
Expand Down Expand Up @@ -611,6 +621,21 @@ class QueryConfig {
return get<int32_t>(kSpillableReservationGrowthPct, kDefaultPct);
}

/// Returns true if query tracing is enabled.
bool queryTraceEnabled() const {
return get<bool>(kQueryTraceEnabled, false);
}

std::string queryTraceDir() const {
// The default query trace dir, empty by default.
return get<std::string>(kQueryTraceDir, "");
}

std::string queryTraceNodeIds() const {
// The default query trace nodes, empty by default.
return get<std::string>(kQueryTraceNodeIds, "");
}

bool prestoArrayAggIgnoreNulls() const {
return get<bool>(kPrestoArrayAggIgnoreNulls, false);
}
Expand Down
58 changes: 58 additions & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ Task::Task(
planFragment_(std::move(planFragment)),
destination_(destination),
queryCtx_(std::move(queryCtx)),
traceConfig_(maybeMakeTraceConfig()),
mode_(mode),
consumerSupplier_(std::move(consumerSupplier)),
onError_(std::move(onError)),
Expand All @@ -304,6 +305,8 @@ Task::Task(
VELOX_CHECK_NULL(
dynamic_cast<const folly::InlineLikeExecutor*>(queryCtx_->executor()));
}

maybeInitQueryTrace();
}

Task::~Task() {
Expand Down Expand Up @@ -2833,6 +2836,61 @@ std::shared_ptr<ExchangeClient> Task::getExchangeClientLocked(
return exchangeClients_[pipelineId];
}

std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
const auto& queryConfig = queryCtx_->queryConfig();
if (!queryConfig.queryTraceEnabled()) {
return std::nullopt;
}

VELOX_CHECK(
!queryConfig.queryTraceDir().empty(),
"Query trace enabled but the trace dir is not set");

const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
LOG(INFO) << "Query trace enabled but no tracing plan node specified";
return trace::QueryTraceConfig(
std::unordered_set<std::string>{}, queryConfig.queryTraceDir());
}

std::vector<std::string> nodes;
folly::split(',', queryTraceNodes, nodes);
std::unordered_set<std::string> nodeSet(nodes.begin(), nodes.end());
VELOX_CHECK_EQ(nodeSet.size(), nodes.size());
LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes;
return trace::QueryTraceConfig(
std::move(nodeSet), queryConfig.queryTraceDir());
}

std::string Task::createTraceDirectory(const std::string& traceDir) const {
try {
const auto fs = filesystems::getFileSystem(traceDir, nullptr);
if (fs->exists(traceDir)) {
fs->remove(traceDir);
}
fs->mkdir(traceDir);
} catch (const std::exception& e) {
VELOX_FAIL(
"Failed to create trace directory '{}' with error: {}",
traceDir,
e.what());
}
return traceDir;
}

void Task::maybeInitQueryTrace() {
if (!traceConfig_) {
return;
}

const auto traceTaskDir =
fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_);
const auto traceMetaDir = createTraceDirectory(traceTaskDir);
const auto queryMetadatWriter = std::make_unique<trace::QueryMetadataWriter>(
traceMetaDir, memory::traceMemoryPool());
queryMetadatWriter->write(queryCtx_, planFragment_.planNode);
}

void Task::testingVisitDrivers(const std::function<void(Driver*)>& callback) {
std::lock_guard<std::timed_mutex> l(mutex_);
for (int i = 0; i < drivers_.size(); ++i) {
Expand Down
13 changes: 13 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "velox/exec/Split.h"
#include "velox/exec/TaskStats.h"
#include "velox/exec/TaskStructs.h"
#include "velox/exec/trace/QueryMetadataWriter.h"
#include "velox/exec/trace/QueryTraceConfig.h"
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -969,6 +971,16 @@ class Task : public std::enable_shared_from_this<Task> {
std::shared_ptr<ExchangeClient> getExchangeClientLocked(
int32_t pipelineId) const;

// Builds the query trace config.
std::optional<trace::QueryTraceConfig> maybeMakeTraceConfig() const;

// Create a directory to store the query trace metdata and data.
std::string createTraceDirectory(const std::string& traceDir) const;

// Create a 'QueryMetadtaWriter' to trace the query metadata if the query
// trace enabled.
void maybeInitQueryTrace();

// The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_'
// on task construction and destruction.
class TaskCounter {
Expand Down Expand Up @@ -999,6 +1011,7 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment_;
const int destination_;
const std::shared_ptr<core::QueryCtx> queryCtx_;
const std::optional<trace::QueryTraceConfig> traceConfig_;

// The execution mode of the task. It is enforced that a task can only be
// executed in a single mode throughout its lifetime
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/trace/QueryTraceConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

namespace facebook::velox::exec::trace {
struct QueryTraceConfig {
/// Target query trace nodes
/// Target query trace nodes.
std::unordered_set<std::string> queryNodes;
/// Base dir of query trace, normmaly it is $prefix/$taskId.
/// Base dir of query trace.
std::string queryTraceDir;

QueryTraceConfig(
Expand Down
Loading

0 comments on commit 3437398

Please sign in to comment.