Skip to content

Commit

Permalink
Improve spill related logging (#11200)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11200

Improve spill related logging by always printing warning log if a spillable operator is under non-reclaimable section.
Add runtime stats to record if a spillable operator doesn't support spill in certain config like window operator doesn't
support to spill without partitioning keys to ease debug query OOM.

Reviewed By: tanjialiang

Differential Revision: D64075306

fbshipit-source-id: 88d6bd0b02798e9bf78ce9e0ae5ed0396a4a48ae
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Oct 9, 2024
1 parent 530b446 commit 6a7c844
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 47 deletions.
5 changes: 5 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ These stats are reported by operators that support spilling.
* - Stats
- Unit
- Description
* - spillNotSupported
- nanos
- The number of a spillable operators that don't support spill because of
spill limitation. For instance, a window operator do not support spill
if there is no partitioning.
* - spillFillWallNanos
- nanos
- The time spent on filling rows for spilling.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class GroupingSet {
/// is full or reclaims memory from distinct aggregation after it has received
/// all the inputs. If 'freeTable' is false, then hash table itself is not
/// freed but only table content.
void resetTable(bool freeTable = false);
void resetTable(bool freeTable);

/// Returns true if 'this' should start producing partial
/// aggregation results. Checks the memory consumption against
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ void HashAggregation::resetPartialOutputIfNeed() {
lockedStats->addRuntimeStat(
"partialAggregationPct", RuntimeCounter(aggregationPct));
}
groupingSet_->resetTable();
groupingSet_->resetTable(/*freeTable=*/false);
partialFull_ = false;
if (!finished_) {
maybeIncreasePartialAggregationMemoryUsage(aggregationPct);
Expand Down
33 changes: 16 additions & 17 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
// out of memory if the restored partition still can't fit in memory.
if (config->exceedSpillLevelLimit(startPartitionBit)) {
RECORD_METRIC_VALUE(kMetricMaxSpillLevelExceededCount);
FB_LOG_EVERY_MS(WARNING, 1'000)
<< "Exceeded spill level limit: " << config->maxSpillLevel
<< ", and disable spilling for memory pool: " << pool()->name();
LOG(WARNING) << "Exceeded spill level limit: " << config->maxSpillLevel
<< ", and disable spilling for memory pool: "
<< pool()->name();
++spillStats_.wlock()->spillMaxLevelExceededCount;
exceededMaxSpillLevelLimit_ = true;
return;
Expand Down Expand Up @@ -1076,15 +1076,14 @@ void HashBuild::reclaim(
// TODO: reduce the log frequency if it is too verbose.
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
FB_LOG_EVERY_MS(WARNING, 1'000)
<< "Can't reclaim from hash build operator, state_["
<< stateName(state_) << "], nonReclaimableSection_["
<< nonReclaimableSection_ << "], spiller_["
<< (stateCleared_
? "cleared"
: (spiller_->finalized() ? "finalized" : "non-finalized"))
<< "] " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes());
LOG(WARNING) << "Can't reclaim from hash build operator, state_["
<< stateName(state_) << "], nonReclaimableSection_["
<< nonReclaimableSection_ << "], spiller_["
<< (stateCleared_ ? "cleared"
: (spiller_->finalized() ? "finalized"
: "non-finalized"))
<< "] " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes());
return;
}

Expand All @@ -1100,11 +1099,11 @@ void HashBuild::reclaim(
// TODO: reduce the log frequency if it is too verbose.
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
FB_LOG_EVERY_MS(WARNING, 1'000)
<< "Can't reclaim from hash build operator, state_["
<< stateName(buildOp->state_) << "], nonReclaimableSection_["
<< buildOp->nonReclaimableSection_ << "], " << buildOp->pool()->name()
<< ", usage: " << succinctBytes(buildOp->pool()->usedBytes());
LOG(WARNING) << "Can't reclaim from hash build operator, state_["
<< stateName(buildOp->state_) << "], nonReclaimableSection_["
<< buildOp->nonReclaimableSection_ << "], "
<< buildOp->pool()->name() << ", usage: "
<< succinctBytes(buildOp->pool()->usedBytes());
return;
}
}
Expand Down
57 changes: 29 additions & 28 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1696,19 +1696,19 @@ void HashProbe::reclaim(
if (nonReclaimableState()) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
FB_LOG_EVERY_MS(WARNING, 1'000)
<< "Can't reclaim from hash probe operator, state_["
<< ProbeOperatorState(state_) << "], nonReclaimableSection_["
<< nonReclaimableSection_ << "], inputSpiller_["
<< (inputSpiller_ == nullptr ? "nullptr" : "initialized")
<< "], table_[" << (table_ == nullptr ? "nullptr" : "initialized")
<< "], table_ numDistinct["
<< (table_ == nullptr ? "nullptr"
: std::to_string(table_->numDistinct()))
<< "], " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", node pool reservation: "
<< succinctBytes(pool()->parent()->reservedBytes());
LOG(WARNING) << "Can't reclaim from hash probe operator, state_["
<< ProbeOperatorState(state_) << "], nonReclaimableSection_["
<< nonReclaimableSection_ << "], inputSpiller_["
<< (inputSpiller_ == nullptr ? "nullptr" : "initialized")
<< "], table_["
<< (table_ == nullptr ? "nullptr" : "initialized")
<< "], table_ numDistinct["
<< (table_ == nullptr ? "nullptr"
: std::to_string(table_->numDistinct()))
<< "], " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", node pool reservation: "
<< succinctBytes(pool()->parent()->reservedBytes());
return;
}

Expand All @@ -1723,21 +1723,22 @@ void HashProbe::reclaim(
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
++stats.numNonReclaimableAttempts;
const auto* peerPool = probeOp->pool();
FB_LOG_EVERY_MS(WARNING, 1'000)
<< "Can't reclaim from hash probe operator, state_["
<< ProbeOperatorState(probeOp->state_) << "], nonReclaimableSection_["
<< probeOp->nonReclaimableSection_ << "], inputSpiller_["
<< (probeOp->inputSpiller_ == nullptr ? "nullptr" : "initialized")
<< "], table_["
<< (probeOp->table_ == nullptr ? "nullptr" : "initialized")
<< "], table_ numDistinct["
<< (probeOp->table_ == nullptr
? "nullptr"
: std::to_string(probeOp->table_->numDistinct()))
<< "], " << peerPool->name()
<< ", usage: " << succinctBytes(peerPool->usedBytes())
<< ", node pool reservation: "
<< succinctBytes(peerPool->parent()->reservedBytes());
LOG(WARNING) << "Can't reclaim from hash probe operator, state_["
<< ProbeOperatorState(probeOp->state_)
<< "], nonReclaimableSection_["
<< probeOp->nonReclaimableSection_ << "], inputSpiller_["
<< (probeOp->inputSpiller_ == nullptr ? "nullptr"
: "initialized")
<< "], table_["
<< (probeOp->table_ == nullptr ? "nullptr" : "initialized")
<< "], table_ numDistinct["
<< (probeOp->table_ == nullptr
? "nullptr"
: std::to_string(probeOp->table_->numDistinct()))
<< "], " << peerPool->name()
<< ", usage: " << succinctBytes(peerPool->usedBytes())
<< ", node pool reservation: "
<< succinctBytes(peerPool->parent()->reservedBytes());
return;
}
hasMoreProbeInput |= !probeOp->noMoreSpillInput_;
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ class Operator : public BaseRuntimeStatWriter {

/// The name of the runtime spill stats collected and reported by operators
/// that support spilling.

/// This indicates the spill not supported for a spillable operator when the
/// spill config is enabled. This is due to the spill limitation in certain
/// plan node config such as unpartition window operator.
static inline const std::string kSpillNotSupported{"spillNotSupported"};
/// The spill write stats.
static inline const std::string kSpillFillTime{"spillFillWallNanos"};
static inline const std::string kSpillSortTime{"spillSortWallNanos"};
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ Window::Window(
stringAllocator_(pool()) {
auto* spillConfig =
spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
if (spillConfig == nullptr &&
operatorCtx_->driverCtx()->queryConfig().windowSpillEnabled()) {
auto lockedStats = stats_.wlock();
lockedStats->runtimeStats.emplace(kSpillNotSupported, RuntimeMetric(1));
}
if (windowNode->inputsSorted()) {
if (supportRowsStreaming()) {
windowBuild_ = std::make_unique<RowsStreamingWindowBuild>(
Expand Down
44 changes: 44 additions & 0 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,50 @@ TEST_F(WindowTest, spill) {
ASSERT_GT(stats.spilledPartitions, 0);
}

TEST_F(WindowTest, spillUnsupported) {
const vector_size_t size = 1'000;
auto data = makeRowVector(
{"d", "p", "s"},
{
// Payload.
makeFlatVector<int64_t>(size, [](auto row) { return row; }),
// Partition key.
makeFlatVector<int16_t>(size, [](auto row) { return row % 11; }),
// Sorting key.
makeFlatVector<int32_t>(size, [](auto row) { return row; }),
});

createDuckDbTable({data});

core::PlanNodeId windowId;
auto plan = PlanBuilder()
.values(split(data, 10))
.window({"row_number() over (order by s)"})
.capturePlanNodeId(windowId)
.planNode();

auto spillDirectory = TempDirectoryPath::create();
TestScopedSpillInjection scopedSpillInjection(100);
auto task =
AssertQueryBuilder(plan, duckDbQueryRunner_)
.config(core::QueryConfig::kPreferredOutputBatchBytes, "1024")
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kWindowSpillEnabled, "true")
.spillDirectory(spillDirectory->getPath())
.assertResults("SELECT *, row_number() over (order by s) FROM tmp");

auto taskStats = exec::toPlanStats(task->taskStats());
const auto& stats = taskStats.at(windowId);

ASSERT_EQ(stats.spilledBytes, 0);
ASSERT_EQ(stats.spilledRows, 0);
ASSERT_EQ(stats.spilledFiles, 0);
ASSERT_EQ(stats.spilledPartitions, 0);
auto opStats = toOperatorStats(task->taskStats());
ASSERT_GT(
opStats.at("Window").runtimeStats[Operator::kSpillNotSupported].sum, 1);
}

TEST_F(WindowTest, rowBasedStreamingWindowOOM) {
const vector_size_t size = 1'000'000;
auto data = makeRowVector(
Expand Down

0 comments on commit 6a7c844

Please sign in to comment.