Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OrderBy output stage spill #7759

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,11 @@ void OrderBy::reclaim(
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
VELOX_CHECK(!nonReclaimableSection_);
auto* driver = operatorCtx_->driver();

// NOTE: an order by operator is reclaimable if it hasn't started output
// processing and is not under non-reclaimable execution section.
if (noMoreInput_) {
// TODO: reduce the log frequency if it is too verbose.
++stats.numNonReclaimableAttempts;
LOG(WARNING)
<< "Can't reclaim from order by operator which has started producing output: "
<< pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
return;
}

// TODO: support fine-grain disk spilling based on 'targetBytes' after having
// row container memory compaction support later.
// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
sortBuffer_->spill();

// Release the minimum reserved memory.
pool()->release();
}
Expand Down
80 changes: 57 additions & 23 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "SortBuffer.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/vector/BaseVector.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -133,11 +132,7 @@ void SortBuffer::noMoreInput() {
// for now.
spill();

// Finish spill, and we shouldn't get any rows from non-spilled partition as
// there is only one hash partition for SortBuffer.
VELOX_CHECK_NULL(spillMerger_);
auto spillPartition = spiller_->finishSpill();
spillMerger_ = spillPartition.createOrderedReader(pool());
finishSpill();
}

// Releases the unused memory reservation after processing input.
Expand Down Expand Up @@ -170,24 +165,11 @@ void SortBuffer::spill() {
}
updateEstimatedOutputRowSize();

if (spiller_ == nullptr) {
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderBy,
data_.get(),
spillerStoreType_,
data_->keyTypes().size(),
sortCompareFlags_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1);
if (sortedRows_.empty()) {
spillInput();
} else {
spillOutput();
}

spiller_->spill();
data_->clear();
}

std::optional<uint64_t> SortBuffer::estimateOutputRowSize() const {
Expand Down Expand Up @@ -278,6 +260,52 @@ void SortBuffer::updateEstimatedOutputRowSize() {
}
}

// Spill path used while the buffer has not produced sorted rows yet: hands
// the rows held by 'data_' to the input-stage spiller and then clears the row
// container.
void SortBuffer::spillInput() {
  if (!spiller_) {
    // The input-stage spiller is created lazily on the first spill and is
    // reused by every later call. It must never be created once all input has
    // been received.
    VELOX_CHECK(!noMoreInput_);
    const auto numSortKeys = data_->keyTypes().size();
    spiller_ = std::make_unique<Spiller>(
        Spiller::Type::kOrderByInput,
        data_.get(),
        spillerStoreType_,
        numSortKeys,
        sortCompareFlags_,
        spillConfig_->getSpillDirPathCb,
        spillConfig_->fileNamePrefix,
        spillConfig_->writeBufferSize,
        spillConfig_->compressionKind,
        memory::spillMemoryPool(),
        spillConfig_->executor);
  }

  spiller_->spill();
  // The rows have been handed to the spiller; drop them from the container.
  data_->clear();
}

// Spill path used once 'sortedRows_' has been populated: spills the sorted
// rows that have not been returned as output yet, i.e. the rows starting at
// offset 'numOutputRows_' in 'sortedRows_'.
//
// The output-stage spiller spills at most once, so this is a no-op if a
// spiller already exists or if there is no row left to spill. After a
// successful spill the in-memory rows are released and the spill is finished
// right away so that the remaining output is read back through
// 'spillMerger_'.
void SortBuffer::spillOutput() {
  if (spiller_ != nullptr) {
    // Already spilled.
    return;
  }

  const auto firstUnproducedRow = sortedRows_.begin() + numOutputRows_;
  if (firstUnproducedRow == sortedRows_.end()) {
    // All the sorted rows have been produced. Spiller::spill(rows) ignores an
    // empty row set, so creating a spiller here would only end up finishing a
    // spill that never wrote anything.
    return;
  }

  spiller_ = std::make_unique<Spiller>(
      Spiller::Type::kOrderByOutput,
      data_.get(),
      spillerStoreType_,
      spillConfig_->getSpillDirPathCb,
      spillConfig_->fileNamePrefix,
      spillConfig_->writeBufferSize,
      spillConfig_->compressionKind,
      memory::spillMemoryPool(),
      spillConfig_->executor);

  std::vector<char*> spillRows(firstUnproducedRow, sortedRows_.end());
  spiller_->spill(spillRows);

  data_->clear();
  sortedRows_.clear();
  // clear() keeps the allocated capacity; ask the container to give the
  // storage back since the point of spilling is to free memory.
  // NOTE(review): assumes 'sortedRows_' is a std::vector-like container that
  // provides shrink_to_fit() -- confirm against the member declaration.
  sortedRows_.shrink_to_fit();

  // Finish right after spilling as the output spiller only spills at most
  // once.
  finishSpill();
}

void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
VELOX_CHECK_GT(maxOutputRows, 0);
VELOX_CHECK_GT(numInputRows_, numOutputRows_);
Expand Down Expand Up @@ -364,4 +392,10 @@ void SortBuffer::getOutputWithSpill() {
numOutputRows_ += output_->size();
}

// Finishes the spiller and builds 'spillMerger_', the ordered reader over the
// spilled partition. Must be called at most once: the merger is required to
// be unset on entry.
void SortBuffer::finishSpill() {
  VELOX_CHECK_NULL(spillMerger_);

  auto finishedPartition = spiller_->finishSpill();
  spillMerger_ = finishedPartition.createOrderedReader(pool());
}

} // namespace facebook::velox::exec
7 changes: 7 additions & 0 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ class SortBuffer {
void prepareOutput(uint32_t maxOutputRows);
void getOutputWithoutSpill();
void getOutputWithSpill();
// Spill during input stage.
void spillInput();
// Spill during output stage.
void spillOutput();
// Finish spill, and we shouldn't get any rows from non-spilled partition as
// there is only one hash partition for SortBuffer.
void finishSpill();

const RowTypePtr input_;
const std::vector<CompareFlags> sortCompareFlags_;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void SortWindowBuild::setupSpiller() {

spiller_ = std::make_unique<Spiller>(
// TODO Replace Spiller::Type::kOrderBy.
Spiller::Type::kOrderBy,
Spiller::Type::kOrderByInput,
data_.get(),
inputType_,
spillCompareFlags_.size(),
Expand Down
68 changes: 51 additions & 17 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Spiller::Spiller(
executor,
writeFileOptions) {
VELOX_CHECK(
type_ == Type::kOrderBy || type_ == Type::kAggregateInput,
type_ == Type::kOrderByInput || type_ == Type::kAggregateInput,
"Unexpected spiller type: {}",
typeName(type_));
VELOX_CHECK_EQ(state_.maxPartitions(), 1);
Expand Down Expand Up @@ -94,9 +94,8 @@ Spiller::Spiller(
pool,
executor,
writeFileOptions) {
VELOX_CHECK_EQ(
type,
Type::kAggregateOutput,
VELOX_CHECK(
type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput,
"Unexpected spiller type: {}",
typeName(type_));
VELOX_CHECK_EQ(state_.maxPartitions(), 1);
Expand Down Expand Up @@ -447,9 +446,11 @@ void Spiller::runSpill() {
VELOX_CHECK_EQ(numWritten, run.rows.size());
run.clear();
// When a sorted run ends, we start with a new file next time. For
// aggregation output spiller, we expect only one spill call to spill all
// the rows starting from the specified row offset.
if (needSort() || (type_ == Spiller::Type::kAggregateOutput)) {
// aggregation output / orderby output spiller, we expect only one spill
// call to spill all the rows starting from the specified row offset.
if (needSort() ||
(type_ == Spiller::Type::kAggregateOutput ||
type_ == Spiller::Type::kOrderByOutput)) {
state_.finishFile(partition);
}
}
Expand All @@ -467,7 +468,7 @@ void Spiller::updateSpillSortTime(uint64_t timeUs) {

// Returns true if this spiller type needs its rows sorted as part of
// spilling. Hash join build/probe spillers and the aggregation / order by
// output spillers do not: the output spillers are handed rows through a
// single spill call and their spill file is finished right after that call.
bool Spiller::needSort() const {
  return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild &&
      type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput;
}

void Spiller::spill() {
Expand All @@ -483,25 +484,41 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_NE(type_, Type::kHashJoinProbe);

// Marks all the partitions have been spilled as we don't support fine-grained
// spilling as for now.
for (auto partition = 0; partition < state_.maxPartitions(); ++partition) {
if (!state_.isPartitionSpilled(partition)) {
state_.setPartitionSpilled(partition);
}
}
markAllPartitionsSpilled();

fillSpillRuns(startRowIter);
runSpill();
checkEmptySpillRuns();
}

// Spills exactly the rows listed in 'rows'. Only valid for a spiller of type
// 'kOrderByOutput'. An empty 'rows' is ignored: nothing is written and no
// partition gets marked as spilled.
void Spiller::spill(std::vector<char*>& rows) {
  CHECK_NOT_FINALIZED();
  VELOX_CHECK_EQ(type_, Type::kOrderByOutput);

  if (!rows.empty()) {
    markAllPartitionsSpilled();

    // Stage the rows as a spill run, write the run out, and verify that the
    // write consumed everything that was staged.
    fillSpillRun(rows);
    runSpill();
    checkEmptySpillRuns();
  }
}

void Spiller::checkEmptySpillRuns() const {
for (const auto& spillRun : spillRuns_) {
VELOX_CHECK(spillRun.rows.empty());
}
}

// Flags every partition tracked by 'state_' as spilled. Partitions that are
// already flagged are left untouched.
void Spiller::markAllPartitionsSpilled() {
  for (int partition = 0; partition < state_.maxPartitions(); ++partition) {
    if (state_.isPartitionSpilled(partition)) {
      continue;
    }
    state_.setPartitionSpilled(partition);
  }
}

void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) {
CHECK_NOT_FINALIZED();
VELOX_CHECK(
Expand Down Expand Up @@ -598,6 +615,21 @@ void Spiller::fillSpillRuns(const RowContainerIterator* startRowIter) {
updateSpillFillTime(execTimeUs);
}

// Stages 'rows' as the single spill run of a one-partition spiller.
//
// Copies the row pointers into 'spillRuns_[0].rows' (reusing that container's
// allocator) and adds the size of each row, as reported by the row container,
// to the run's byte count. All spill runs must be empty on entry. The time
// measured in 'execTimeUs' is reported through updateSpillFillTime().
void Spiller::fillSpillRun(std::vector<char*>& rows) {
  VELOX_CHECK_EQ(bits_.numPartitions(), 1);
  checkEmptySpillRuns();
  uint64_t execTimeUs{0};
  {
    // The timer is scoped to the fill work only; presumably it stores the
    // elapsed microseconds into 'execTimeUs' when it goes out of scope.
    MicrosecondTimer timer(&execTimeUs);
    spillRuns_[0].rows =
        SpillRows(rows.begin(), rows.end(), spillRuns_[0].rows.get_allocator());
    for (const auto* row : rows) {
      spillRuns_[0].numBytes += container_->rowSize(row);
    }
  }
  updateSpillFillTime(execTimeUs);
}

std::string Spiller::toString() const {
return fmt::format(
"{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}",
Expand All @@ -610,8 +642,10 @@ std::string Spiller::toString() const {
// static
std::string Spiller::typeName(Type type) {
switch (type) {
case Type::kOrderBy:
return "ORDER_BY";
case Type::kOrderByInput:
return "ORDER_BY_INPUT";
case Type::kOrderByOutput:
return "ORDER_BY_OUTPUT";
case Type::kHashJoinBuild:
return "HASH_JOIN_BUILD";
case Type::kHashJoinProbe:
Expand Down
23 changes: 20 additions & 3 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ class Spiller {
kHashJoinBuild = 2,
// Used for hash join probe.
kHashJoinProbe = 3,
// Used for order by.
kOrderBy = 4,
// Used for order by input processing stage.
kOrderByInput = 4,
// Used for order by output processing stage.
kOrderByOutput = 5,
// Number of spiller types.
kNumTypes = 5,
kNumTypes = 6,
};

static std::string typeName(Type);

using SpillRows = std::vector<char*, memory::StlAllocator<char*>>;
Expand Down Expand Up @@ -118,6 +121,12 @@ class Spiller {
/// The caller needs to erase them from the row container.
void spill(const RowContainerIterator& startRowIter);

/// Invoked to spill all the rows pointed to by 'rows'. This is used by the
/// 'kOrderByOutput' spiller type to spill during the order by output
/// processing. Similarly, the spilled rows still stay in the row container.
/// The caller needs to erase them from the row container.
void spill(std::vector<char*>& rows);

/// Append 'spillVector' into the spill file of given 'partition'. It is now
/// only used by the spilling operator which doesn't need data sort, such as
/// hash join build and hash join probe.
Expand Down Expand Up @@ -274,11 +283,19 @@ class Spiller {

void checkEmptySpillRuns() const;

// Marks all the partitions as spilled, since fine-grained (per-partition)
// spilling is not supported for now.
void markAllPartitionsSpilled();

// Prepares spill runs for the spillable data from all the hash partitions.
// If 'startRowIter' is not null, we prepare runs starting from the offset
// pointed by 'startRowIter'.
void fillSpillRuns(const RowContainerIterator* startRowIter = nullptr);

// Prepares spill run of a single partition for the spillable data from the
// rows.
void fillSpillRun(std::vector<char*>& rows);

// Writes out all the rows collected in spillRuns_.
void runSpill();

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ void TopNRowNumber::setupSpiller() {

spiller_ = std::make_unique<Spiller>(
// TODO Replace Spiller::Type::kOrderBy.
Spiller::Type::kOrderBy,
Spiller::Type::kOrderByInput,
data_.get(),
inputType_,
spillCompareFlags_.size(),
Expand Down
13 changes: 4 additions & 9 deletions velox/exec/tests/OrderByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1082,15 +1082,10 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {

if (enableSpilling) {
ASSERT_GT(reclaimableBytes, 0);
const auto usedMemoryBytes = op->pool()->currentBytes();
reclaimAndRestoreCapacity(
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
reclaimAndRestoreCapacity(op, reclaimableBytes, reclaimerStats_);
ASSERT_EQ(reclaimerStats_.reclaimedBytes, reclaimableBytes);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
// No reclaim as the operator has started output processing.
ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes());
} else {
ASSERT_EQ(reclaimableBytes, 0);
VELOX_ASSERT_THROW(
Expand All @@ -1108,7 +1103,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {
ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 1);
ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0);
}

DEBUG_ONLY_TEST_F(OrderByTest, abortDuringOutputProcessing) {
Expand Down
Loading