Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace metadata during task creation #10815

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
.coreOnAllocationFailureEnabled =
options.coreOnAllocationFailureEnabled})},
spillPool_{addLeafPool("__sys_spilling__")},
tracePool_{addLeafPool("__sys_tracing__")},
sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) {
VELOX_CHECK_NOT_NULL(allocator_);
VELOX_CHECK_NOT_NULL(arbitrator_);
Expand Down Expand Up @@ -427,4 +428,8 @@ memory::MemoryPool* spillMemoryPool() {
bool isSpillMemoryPool(memory::MemoryPool* pool) {
  // Pointer-identity comparison against the pool returned by
  // spillMemoryPool().
  memory::MemoryPool* const spillPool = spillMemoryPool();
  return spillPool == pool;
}

memory::MemoryPool* traceMemoryPool() {
  // Look up the tracing pool through the memory manager instance.
  auto* const manager = memory::MemoryManager::getInstance();
  return manager->tracePool();
}
} // namespace facebook::velox::memory
9 changes: 9 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@ class MemoryManager {
return spillPool_.get();
}

/// Returns the process-wide leaf memory pool used for query tracing. The
/// returned pointer is non-owning: the pool itself is held by this manager's
/// 'tracePool_' member.
MemoryPool* tracePool() const {
return tracePool_.get();
}

/// Returns a reference to the shared leaf pools held in 'sharedLeafPools_'.
/// NOTE(review): the 'testing' prefix suggests this accessor is meant for
/// tests only -- confirm before using it elsewhere.
const std::vector<std::shared_ptr<MemoryPool>>& testingSharedLeafPools() {
return sharedLeafPools_;
}
Expand Down Expand Up @@ -374,6 +379,7 @@ class MemoryManager {

const std::shared_ptr<MemoryPool> sysRoot_;
const std::shared_ptr<MemoryPool> spillPool_;
const std::shared_ptr<MemoryPool> tracePool_;
const std::vector<std::shared_ptr<MemoryPool>> sharedLeafPools_;

mutable folly::SharedMutex mutex_;
Expand Down Expand Up @@ -420,6 +426,9 @@ memory::MemoryPool* spillMemoryPool();
/// Returns true if the provided 'pool' is the spilling memory pool.
bool isSpillMemoryPool(memory::MemoryPool* pool);

/// Returns the system-wide leaf memory pool used for query tracing.
memory::MemoryPool* traceMemoryPool();

FOLLY_ALWAYS_INLINE int32_t alignmentPadding(void* address, int32_t alignment) {
auto extra = reinterpret_cast<uintptr_t>(address) % alignment;
return extra == 0 ? 0 : alignment - extra;
Expand Down
63 changes: 33 additions & 30 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) {
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools;
{
MemoryManager manager{};
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.capacity(), kMaxMemory);
ASSERT_EQ(0, manager.getTotalBytes());
ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment);
Expand All @@ -69,7 +69,7 @@ TEST_F(MemoryManagerTest, ctor) {
.arbitratorCapacity = kCapacity,
.arbitratorReservedCapacity = 0}};
ASSERT_EQ(kCapacity, manager.capacity());
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment());
}
{
Expand All @@ -84,7 +84,7 @@ TEST_F(MemoryManagerTest, ctor) {
ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment());
// TODO: replace with root pool memory tracker quota check.
ASSERT_EQ(
kSharedPoolCount + 1, manager.testingDefaultRoot().getChildCount());
kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount());
ASSERT_EQ(kCapacity, manager.capacity());
ASSERT_EQ(0, manager.getTotalBytes());
}
Expand All @@ -103,7 +103,7 @@ TEST_F(MemoryManagerTest, ctor) {
ASSERT_EQ(
manager.toString(),
"Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of "
"pools 1\nList of root pools:\n\t__sys_root__\n"
"pools 2\nList of root pools:\n\t__sys_root__\n"
"Memory Allocator[MALLOC capacity 4.00GB allocated bytes 0 "
"allocated pages 0 mapped pages 0]\n"
"ARBITRATOR[SHARED CAPACITY[4.00GB] PENDING[0] "
Expand Down Expand Up @@ -246,10 +246,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) {
TEST_F(MemoryManagerTest, defaultMemoryManager) {
auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager());
auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager());
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
ASSERT_EQ(managerA.numPools(), 1);
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
ASSERT_EQ(managerA.numPools(), 2);
ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount);
ASSERT_EQ(managerB.numPools(), 1);
ASSERT_EQ(managerB.numPools(), 2);
ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount);

auto child1 = managerA.addLeafPool("child_1");
Expand All @@ -260,41 +260,44 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) {
kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount());
EXPECT_EQ(
kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount());
ASSERT_EQ(managerA.numPools(), 3);
ASSERT_EQ(managerB.numPools(), 3);
auto pool = managerB.addRootPool();
ASSERT_EQ(managerA.numPools(), 4);
ASSERT_EQ(managerB.numPools(), 4);
auto pool = managerB.addRootPool();
ASSERT_EQ(managerA.numPools(), 5);
ASSERT_EQ(managerB.numPools(), 5);
ASSERT_EQ(
managerA.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
ASSERT_EQ(
managerB.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 4\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
child1.reset();
EXPECT_EQ(
kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount());
child2.reset();
EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount());
ASSERT_EQ(managerA.numPools(), 3);
ASSERT_EQ(managerB.numPools(), 3);
pool.reset();
ASSERT_EQ(managerA.numPools(), 2);
ASSERT_EQ(managerB.numPools(), 2);
pool.reset();
ASSERT_EQ(managerA.numPools(), 1);
ASSERT_EQ(managerB.numPools(), 1);
ASSERT_EQ(
managerA.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
ASSERT_EQ(
managerB.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
const std::string detailedManagerStr = managerA.toString(true);
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr(
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 1\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n"));
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n"));
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n"));
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr("__sys_tracing__ usage 0B reserved 0B peak 0B\n"));
for (int i = 0; i < 32; ++i) {
ASSERT_THAT(
managerA.toString(true),
Expand All @@ -306,7 +309,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) {
// TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool.
TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) {
auto& manager = toMemoryManager(deprecatedDefaultMemoryManager());
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount);
{
auto poolA = deprecatedAddDefaultLeafMemoryPool();
Expand Down Expand Up @@ -361,7 +364,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) {
MemoryManagerOptions options;
options.alignment = alignment;
MemoryManager manager{options};
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
const int numPools = 100;
std::vector<std::shared_ptr<MemoryPool>> userRootPools;
std::vector<std::shared_ptr<MemoryPool>> userLeafPools;
Expand All @@ -386,14 +389,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) {
ASSERT_FALSE(rootUnamedPool->name().empty());
ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate);
ASSERT_EQ(rootUnamedPool->parent(), nullptr);
ASSERT_EQ(manager.numPools(), 1 + numPools + 2);
ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1);
userLeafPools.clear();
leafUnamedPool.reset();
ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1);
ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1);
userRootPools.clear();
ASSERT_EQ(manager.numPools(), 1 + 1);
ASSERT_EQ(manager.numPools(), 1 + 2);
rootUnamedPool.reset();
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
}

// TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side
Expand All @@ -410,7 +413,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) {
ASSERT_NE(manager, globalManager);
ASSERT_EQ(manager, memoryManager());
auto* managerII = memoryManager();
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 1;
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
{
auto& rootI = manager->testingDefaultRoot();
const std::string childIName("some_child");
Expand Down Expand Up @@ -444,9 +447,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) {
ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate);
ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1);
ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1);
ASSERT_EQ(manager->numPools(), 2 + 1);
ASSERT_EQ(manager->numPools(), 2 + 2);
}
ASSERT_EQ(manager->numPools(), 1);
ASSERT_EQ(manager->numPools(), 2);
}

TEST_F(MemoryManagerTest, alignmentOptionCheck) {
Expand Down Expand Up @@ -544,9 +547,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) {
}
stopCheck = true;
checkThread.join();
ASSERT_EQ(manager.numPools(), pools.size() + 1);
ASSERT_EQ(manager.numPools(), pools.size() + 2);
pools.clear();
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);
}

TEST_F(MemoryManagerTest, quotaEnforcement) {
Expand Down Expand Up @@ -654,7 +657,7 @@ TEST_F(MemoryManagerTest, disableMemoryPoolTracking) {
ASSERT_EQ(manager.capacity(), 64LL << 20);
ASSERT_EQ(manager.shrinkPools(), 0);
// Default 1 system pool with 2 leaf children (spilling and tracing).
ASSERT_EQ(manager.numPools(), 1);
ASSERT_EQ(manager.numPools(), 2);

VELOX_ASSERT_THROW(
leaf0->allocate(38LL << 20), "Exceeded memory pool capacity");
Expand Down
25 changes: 25 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ class QueryConfig {
/// derived using micro-benchmarking.
static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows";

/// If true, enables query tracing.
static constexpr const char* kQueryTraceEnabled = "query_trace_enabled";
duanmeng marked this conversation as resolved.
Show resolved Hide resolved

/// Base dir of a query to store tracing data.
static constexpr const char* kQueryTraceDir = "query_trace_dir";

/// A comma-separated list of plan node ids whose input data will be traced.
/// Leave it empty to trace only the query metadata.
static constexpr const char* kQueryTraceNodeIds = "query_trace_node_ids";

uint64_t queryMaxMemoryPerNode() const {
return config::toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"),
Expand Down Expand Up @@ -611,6 +621,21 @@ class QueryConfig {
return get<int32_t>(kSpillableReservationGrowthPct, kDefaultPct);
}

/// Returns true if query tracing is enabled. Reads 'kQueryTraceEnabled' and
/// falls back to false when the config is not set.
bool queryTraceEnabled() const {
return get<bool>(kQueryTraceEnabled, false);
}

/// Returns the base directory under which the tracing data of a query is
/// stored. Reads 'kQueryTraceDir'.
std::string queryTraceDir() const {
// Falls back to an empty string when the config is not set.
return get<std::string>(kQueryTraceDir, "");
}

/// Returns the comma-separated list of plan node ids whose input data will be
/// traced. Reads 'kQueryTraceNodeIds'.
std::string queryTraceNodeIds() const {
// Falls back to an empty string when the config is not set.
return get<std::string>(kQueryTraceNodeIds, "");
}

/// Returns the value of 'kPrestoArrayAggIgnoreNulls', or false when the
/// config is not set.
bool prestoArrayAggIgnoreNulls() const {
return get<bool>(kPrestoArrayAggIgnoreNulls, false);
}
Expand Down
25 changes: 25 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -686,4 +686,29 @@ Spark-specific Configuration
the value of this config can not exceed the default value.
* - spark.partition_id
- integer
-
- The current task's Spark partition ID. It's set by the query engine (Spark) prior to task execution.

Tracing
--------
.. list-table::
:widths: 30 10 10 70
:header-rows: 1

* - Property Name
- Type
- Default Value
- Description
* - query_trace_enabled
- bool
- false
- If true, enable query tracing.
* - query_trace_dir
- string
-
- The root directory to store the tracing data and metadata for a query.
* - query_trace_node_ids
- string
-
- A comma-separated list of plan node ids whose input data will be traced. If it is empty, then we only trace the
query metadata, which includes the query plan and configs.
42 changes: 41 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
#include "velox/exec/HashBuild.h"
#include "velox/exec/LocalPlanner.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/exec/Merge.h"
#include "velox/exec/NestedLoopJoinBuild.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/Task.h"
#include "velox/exec/trace/QueryTraceUtil.h"

using facebook::velox::common::testutil::TestValue;

Expand Down Expand Up @@ -293,6 +293,7 @@ Task::Task(
planFragment_(std::move(planFragment)),
destination_(destination),
queryCtx_(std::move(queryCtx)),
traceConfig_(maybeMakeTraceConfig()),
mode_(mode),
consumerSupplier_(std::move(consumerSupplier)),
onError_(std::move(onError)),
Expand All @@ -304,6 +305,8 @@ Task::Task(
VELOX_CHECK_NULL(
dynamic_cast<const folly::InlineLikeExecutor*>(queryCtx_->executor()));
}

maybeInitQueryTrace();
}

Task::~Task() {
Expand Down Expand Up @@ -2833,6 +2836,43 @@ std::shared_ptr<ExchangeClient> Task::getExchangeClientLocked(
return exchangeClients_[pipelineId];
}

std::optional<trace::QueryTraceConfig> Task::maybeMakeTraceConfig() const {
const auto& queryConfig = queryCtx_->queryConfig();
if (!queryConfig.queryTraceEnabled()) {
return std::nullopt;
}

VELOX_USER_CHECK(
!queryConfig.queryTraceDir().empty(),
"Query trace enabled but the trace dir is not set");

const auto queryTraceNodes = queryConfig.queryTraceNodeIds();
if (queryTraceNodes.empty()) {
duanmeng marked this conversation as resolved.
Show resolved Hide resolved
return trace::QueryTraceConfig(queryConfig.queryTraceDir());
}

std::vector<std::string> nodes;
folly::split(',', queryTraceNodes, nodes);
std::unordered_set<std::string> nodeSet(nodes.begin(), nodes.end());
VELOX_CHECK_EQ(nodeSet.size(), nodes.size());
duanmeng marked this conversation as resolved.
Show resolved Hide resolved
LOG(INFO) << "Query trace plan node ids: " << queryTraceNodes;
return trace::QueryTraceConfig(
std::move(nodeSet), queryConfig.queryTraceDir());
}

// Writes the query metadata for this task if a trace config was built for it;
// does nothing otherwise.
void Task::maybeInitQueryTrace() {
  if (!traceConfig_.has_value()) {
    return;
  }

  // Each task gets its own sub-directory, named after the task id, under the
  // configured query trace dir.
  const std::string taskTraceDir =
      fmt::format("{}/{}", traceConfig_->queryTraceDir, taskId_);
  trace::createTraceDirectory(taskTraceDir);

  const auto metadataWriter = std::make_unique<trace::QueryMetadataWriter>(
      taskTraceDir, memory::traceMemoryPool());
  metadataWriter->write(queryCtx_, planFragment_.planNode);
}

void Task::testingVisitDrivers(const std::function<void(Driver*)>& callback) {
std::lock_guard<std::timed_mutex> l(mutex_);
for (int i = 0; i < drivers_.size(); ++i) {
Expand Down
Loading
Loading