Add OrderBy output stage spill (#7759)
Summary:
Add a new spiller type named `kOrderByOutput`. It is used to spill
data during the output processing stage of the `OrderBy` operator.
The spiller uses a newly added API that spills data by row pointers.
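
As a rough sketch (condensed from the `SortBuffer` changes in this diff;
stats, memory-reservation handling, and error paths are omitted, so treat
it as illustrative rather than the full implementation), the output-stage
spill path looks like this:

```cpp
// Condensed sketch of SortBuffer::spillOutput() as added in this commit.
void SortBuffer::spillOutput() {
  if (spiller_ != nullptr) {
    // The output spiller spills at most once.
    return;
  }
  // No sort keys or compare flags are passed: the rows handed to a
  // kOrderByOutput spiller are already sorted.
  spiller_ = std::make_unique<Spiller>(
      Spiller::Type::kOrderByOutput,
      data_.get(),
      spillerStoreType_,
      spillConfig_->getSpillDirPathCb,
      spillConfig_->fileNamePrefix,
      spillConfig_->writeBufferSize,
      spillConfig_->compressionKind,
      memory::spillMemoryPool(),
      spillConfig_->executor);
  // Spill only the rows that haven't been returned to the consumer yet,
  // using the new spill-by-row-pointers API.
  std::vector<char*> spillRows(
      sortedRows_.begin() + numOutputRows_, sortedRows_.end());
  spiller_->spill(spillRows);
  data_->clear();
  sortedRows_.clear();
  // Finish right away and set up the merge reader used by
  // getOutputWithSpill().
  finishSpill();
}
```

In contrast, `kOrderByInput` spills are taken before sorting, so they still
go through `fillSpillRuns()` with the sort compare flags.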

Pull Request resolved: #7759

Reviewed By: tanjialiang

Differential Revision: D51947152

Pulled By: xiaoxmeng

fbshipit-source-id: 10be66d8d4fa570b8a377d0ee7dac7bb18982841
duanmeng authored and facebook-github-bot committed Dec 7, 2023
1 parent 61ab60d commit ddc3471
Showing 9 changed files with 251 additions and 81 deletions.
19 changes: 3 additions & 16 deletions velox/exec/OrderBy.cpp
@@ -78,24 +78,11 @@ void OrderBy::reclaim(
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
VELOX_CHECK(!nonReclaimableSection_);
auto* driver = operatorCtx_->driver();

// NOTE: an order by operator is reclaimable if it hasn't started output
// processing and is not under non-reclaimable execution section.
if (noMoreInput_) {
// TODO: reduce the log frequency if it is too verbose.
++stats.numNonReclaimableAttempts;
LOG(WARNING)
<< "Can't reclaim from order by operator which has started producing output: "
<< pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
return;
}

// TODO: support fine-grain disk spilling based on 'targetBytes' after having
// row container memory compaction support later.
// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
sortBuffer_->spill();

// Release the minimum reserved memory.
pool()->release();
}
80 changes: 57 additions & 23 deletions velox/exec/SortBuffer.cpp
@@ -16,7 +16,6 @@

#include "SortBuffer.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/vector/BaseVector.h"

namespace facebook::velox::exec {

@@ -133,11 +132,7 @@ void SortBuffer::noMoreInput() {
// for now.
spill();

// Finish spill, and we shouldn't get any rows from non-spilled partition as
// there is only one hash partition for SortBuffer.
VELOX_CHECK_NULL(spillMerger_);
auto spillPartition = spiller_->finishSpill();
spillMerger_ = spillPartition.createOrderedReader(pool());
finishSpill();
}

// Releases the unused memory reservation after processing input.
@@ -170,24 +165,11 @@ void SortBuffer::spill() {
}
updateEstimatedOutputRowSize();

if (spiller_ == nullptr) {
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderBy,
data_.get(),
spillerStoreType_,
data_->keyTypes().size(),
sortCompareFlags_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1);
if (sortedRows_.empty()) {
spillInput();
} else {
spillOutput();
}

spiller_->spill();
data_->clear();
}

std::optional<uint64_t> SortBuffer::estimateOutputRowSize() const {
@@ -278,6 +260,52 @@ void SortBuffer::updateEstimatedOutputRowSize() {
}
}

void SortBuffer::spillInput() {
if (spiller_ == nullptr) {
VELOX_CHECK(!noMoreInput_);
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderByInput,
data_.get(),
spillerStoreType_,
data_->keyTypes().size(),
sortCompareFlags_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
}
spiller_->spill();
data_->clear();
}

void SortBuffer::spillOutput() {
if (spiller_ != nullptr) {
// Already spilled.
return;
}

spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderByOutput,
data_.get(),
spillerStoreType_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
auto spillRows = std::vector<char*>(
sortedRows_.begin() + numOutputRows_, sortedRows_.end());
spiller_->spill(spillRows);
data_->clear();
sortedRows_.clear();
// Finish right after spilling as the output spiller only spills at most
// once.
finishSpill();
}

void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
VELOX_CHECK_GT(maxOutputRows, 0);
VELOX_CHECK_GT(numInputRows_, numOutputRows_);
@@ -364,4 +392,10 @@ void SortBuffer::getOutputWithSpill() {
numOutputRows_ += output_->size();
}

void SortBuffer::finishSpill() {
VELOX_CHECK_NULL(spillMerger_);
auto spillPartition = spiller_->finishSpill();
spillMerger_ = spillPartition.createOrderedReader(pool());
}

} // namespace facebook::velox::exec
7 changes: 7 additions & 0 deletions velox/exec/SortBuffer.h
@@ -80,6 +80,13 @@ class SortBuffer {
void prepareOutput(uint32_t maxOutputRows);
void getOutputWithoutSpill();
void getOutputWithSpill();
// Spill during input stage.
void spillInput();
// Spill during output stage.
void spillOutput();
// Finish spill; we shouldn't get any rows from the non-spilled partition as
// there is only one hash partition for SortBuffer.
void finishSpill();

const RowTypePtr input_;
const std::vector<CompareFlags> sortCompareFlags_;
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.cpp
@@ -140,7 +140,7 @@ void SortWindowBuild::setupSpiller() {

spiller_ = std::make_unique<Spiller>(
// TODO Replace Spiller::Type::kOrderBy.
Spiller::Type::kOrderBy,
Spiller::Type::kOrderByInput,
data_.get(),
inputType_,
spillCompareFlags_.size(),
68 changes: 51 additions & 17 deletions velox/exec/Spiller.cpp
@@ -61,7 +61,7 @@ Spiller::Spiller(
executor,
writeFileOptions) {
VELOX_CHECK(
type_ == Type::kOrderBy || type_ == Type::kAggregateInput,
type_ == Type::kOrderByInput || type_ == Type::kAggregateInput,
"Unexpected spiller type: {}",
typeName(type_));
VELOX_CHECK_EQ(state_.maxPartitions(), 1);
@@ -94,9 +94,8 @@ Spiller::Spiller(
pool,
executor,
writeFileOptions) {
VELOX_CHECK_EQ(
type,
Type::kAggregateOutput,
VELOX_CHECK(
type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput,
"Unexpected spiller type: {}",
typeName(type_));
VELOX_CHECK_EQ(state_.maxPartitions(), 1);
@@ -447,9 +446,11 @@ void Spiller::runSpill() {
VELOX_CHECK_EQ(numWritten, run.rows.size());
run.clear();
// When a sorted run ends, we start with a new file next time. For
// aggregation output spiller, we expect only one spill call to spill all
// the rows starting from the specified row offset.
if (needSort() || (type_ == Spiller::Type::kAggregateOutput)) {
// aggregation output / orderby output spiller, we expect only one spill
// call to spill all the rows starting from the specified row offset.
if (needSort() ||
(type_ == Spiller::Type::kAggregateOutput ||
type_ == Spiller::Type::kOrderByOutput)) {
state_.finishFile(partition);
}
}
@@ -467,7 +468,7 @@ void Spiller::updateSpillSortTime(uint64_t timeUs) {

bool Spiller::needSort() const {
return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild &&
type_ != Type::kAggregateOutput;
type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput;
}

void Spiller::spill() {
@@ -483,25 +484,41 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_NE(type_, Type::kHashJoinProbe);

// Marks all the partitions have been spilled as we don't support fine-grained
// spilling as for now.
for (auto partition = 0; partition < state_.maxPartitions(); ++partition) {
if (!state_.isPartitionSpilled(partition)) {
state_.setPartitionSpilled(partition);
}
}
markAllPartitionsSpilled();

fillSpillRuns(startRowIter);
runSpill();
checkEmptySpillRuns();
}

void Spiller::spill(std::vector<char*>& rows) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_EQ(type_, Type::kOrderByOutput);
if (rows.empty()) {
return;
}

markAllPartitionsSpilled();

fillSpillRun(rows);
runSpill();
checkEmptySpillRuns();
}

void Spiller::checkEmptySpillRuns() const {
for (const auto& spillRun : spillRuns_) {
VELOX_CHECK(spillRun.rows.empty());
}
}

void Spiller::markAllPartitionsSpilled() {
for (auto partition = 0; partition < state_.maxPartitions(); ++partition) {
if (!state_.isPartitionSpilled(partition)) {
state_.setPartitionSpilled(partition);
}
}
}

void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) {
CHECK_NOT_FINALIZED();
VELOX_CHECK(
Expand Down Expand Up @@ -598,6 +615,21 @@ void Spiller::fillSpillRuns(const RowContainerIterator* startRowIter) {
updateSpillFillTime(execTimeUs);
}

void Spiller::fillSpillRun(std::vector<char*>& rows) {
VELOX_CHECK_EQ(bits_.numPartitions(), 1);
checkEmptySpillRuns();
uint64_t execTimeUs{0};
{
MicrosecondTimer timer(&execTimeUs);
spillRuns_[0].rows =
SpillRows(rows.begin(), rows.end(), spillRuns_[0].rows.get_allocator());
for (const auto* row : rows) {
spillRuns_[0].numBytes += container_->rowSize(row);
}
}
updateSpillFillTime(execTimeUs);
}

std::string Spiller::toString() const {
return fmt::format(
"{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}",
@@ -610,8 +642,10 @@ std::string Spiller::toString() const {
// static
std::string Spiller::typeName(Type type) {
switch (type) {
case Type::kOrderBy:
return "ORDER_BY";
case Type::kOrderByInput:
return "ORDER_BY_INPUT";
case Type::kOrderByOutput:
return "ORDER_BY_OUTPUT";
case Type::kHashJoinBuild:
return "HASH_JOIN_BUILD";
case Type::kHashJoinProbe:
23 changes: 20 additions & 3 deletions velox/exec/Spiller.h
@@ -35,11 +35,14 @@ class Spiller {
kHashJoinBuild = 2,
// Used for hash join probe.
kHashJoinProbe = 3,
// Used for order by.
kOrderBy = 4,
// Used for order by input processing stage.
kOrderByInput = 4,
// Used for order by output processing stage.
kOrderByOutput = 5,
// Number of spiller types.
kNumTypes = 5,
kNumTypes = 6,
};

static std::string typeName(Type);

using SpillRows = std::vector<char*, memory::StlAllocator<char*>>;
@@ -118,6 +121,12 @@ class Spiller {
/// The caller needs to erase them from the row container.
void spill(const RowContainerIterator& startRowIter);

/// Invoked to spill all the rows pointed to by 'rows'. This is used by the
/// 'kOrderByOutput' spiller type to spill during the order by output
/// processing stage. As above, the spilled rows still stay in the row
/// container, and the caller needs to erase them from the row container.
void spill(std::vector<char*>& rows);

/// Append 'spillVector' into the spill file of given 'partition'. It is now
/// only used by the spilling operator which doesn't need data sort, such as
/// hash join build and hash join probe.
@@ -274,11 +283,19 @@ class Spiller {

void checkEmptySpillRuns() const;

// Marks all the partitions as spilled since we don't support fine-grained
// spilling for now.
void markAllPartitionsSpilled();

// Prepares spill runs for the spillable data from all the hash partitions.
// If 'startRowIter' is not null, we prepare runs starting from the offset
// pointed by 'startRowIter'.
void fillSpillRuns(const RowContainerIterator* startRowIter = nullptr);

// Prepares the spill run of a single partition for the spillable data from
// 'rows'.
void fillSpillRun(std::vector<char*>& rows);

// Writes out all the rows collected in spillRuns_.
void runSpill();

2 changes: 1 addition & 1 deletion velox/exec/TopNRowNumber.cpp
@@ -737,7 +737,7 @@ void TopNRowNumber::setupSpiller() {

spiller_ = std::make_unique<Spiller>(
// TODO Replace Spiller::Type::kOrderBy.
Spiller::Type::kOrderBy,
Spiller::Type::kOrderByInput,
data_.get(),
inputType_,
spillCompareFlags_.size(),
13 changes: 4 additions & 9 deletions velox/exec/tests/OrderByTest.cpp
@@ -1082,15 +1082,10 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {

if (enableSpilling) {
ASSERT_GT(reclaimableBytes, 0);
const auto usedMemoryBytes = op->pool()->currentBytes();
reclaimAndRestoreCapacity(
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
reclaimAndRestoreCapacity(op, reclaimableBytes, reclaimerStats_);
ASSERT_EQ(reclaimerStats_.reclaimedBytes, reclaimableBytes);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
// No reclaim as the operator has started output processing.
ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes());
} else {
ASSERT_EQ(reclaimableBytes, 0);
VELOX_ASSERT_THROW(
Expand All @@ -1108,7 +1103,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {
ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 1);
ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0);
}

DEBUG_ONLY_TEST_F(OrderByTest, abortDuringOutputProcessing) {