diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index 72d4949de49d..f78b4517e671 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -78,13 +78,11 @@ void OrderBy::reclaim( memory::MemoryReclaimer::Stats& stats) { VELOX_CHECK(canReclaim()); VELOX_CHECK(!nonReclaimableSection_); - if (noMoreInput_) { - sortBuffer_->spillOutput(); - } else { - // TODO: support fine-grain disk spilling based on 'targetBytes' after - // having row container memory compaction support later. - sortBuffer_->spill(); - } + + // TODO: support fine-grain disk spilling based on 'targetBytes' after + // having row container memory compaction support later. + sortBuffer_->spill(); + // Release the minimum reserved memory. pool()->release(); } diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index 73a342a65807..a0263b9a9dab 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -16,7 +16,6 @@ #include "SortBuffer.h" #include "velox/exec/MemoryReclaimer.h" -#include "velox/vector/BaseVector.h" namespace facebook::velox::exec { @@ -166,8 +165,36 @@ void SortBuffer::spill() { } updateEstimatedOutputRowSize(); - if (spiller_ == nullptr) { - spiller_ = std::make_unique( + if (sortedRows_.empty()) { + // Input stage spill. + if (spiller_ == nullptr) { + spiller_ = makeSpiller(Spiller::Type::kOrderByInput); + } + spiller_->spill(); + data_->clear(); + } else { + // Output stage spill. + if (spiller_ != nullptr) { + // Already spilled. + return; + } + + spiller_ = makeSpiller(Spiller::Type::kOrderByOutput); + auto spillRows = std::vector( + sortedRows_.begin() + numOutputRows_, sortedRows_.end()); + spiller_->spill(spillRows); + data_->clear(); + sortedRows_.clear(); + // Finish right after spilling as the output spiller only spills at most + // once. + finishSpill(); + } +} + +std::unique_ptr SortBuffer::makeSpiller(Spiller::Type type) { + std::unique_ptr spiller; + if (type == Spiller::Type::kOrderByInput) { + spiller = std::make_unique( Spiller::Type::kOrderByInput, data_.get(), spillerStoreType_, @@ -179,48 +206,20 @@ void SortBuffer::spill() { spillConfig_->compressionKind, memory::spillMemoryPool(), spillConfig_->executor); - VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1); - } - - spiller_->spill(); - data_->clear(); -} - -void SortBuffer::spillOutput() { - VELOX_CHECK_NOT_NULL( - spillConfig_, "spill config is null when SortBuffer spill is called"); - - VELOX_CHECK(noMoreInput_); - - if (spiller_ != nullptr) { - // Already spilled. - return; - } - - // Check if sort buffer is empty or not, and skip spill if it is empty. - if (data_->numRows() == 0) { - return; + } else { + spiller = std::make_unique( + Spiller::Type::kOrderByOutput, + data_.get(), + spillerStoreType_, + spillConfig_->getSpillDirPathCb, + spillConfig_->fileNamePrefix, + spillConfig_->writeBufferSize, + spillConfig_->compressionKind, + memory::spillMemoryPool(), + spillConfig_->executor); } - updateEstimatedOutputRowSize(); - - spiller_ = std::make_unique( - Spiller::Type::kOrderByOutput, - data_.get(), - spillerStoreType_, - spillConfig_->getSpillDirPathCb, - spillConfig_->fileNamePrefix, - spillConfig_->writeBufferSize, - spillConfig_->compressionKind, - memory::spillMemoryPool(), - spillConfig_->executor); - VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1); - - auto spillRows = std::vector( - sortedRows_.begin() + numOutputRows_, sortedRows_.end()); - spiller_->spill(spillRows); - data_->clear(); - sortedRows_.clear(); - finishSpill(); + VELOX_CHECK_EQ(spiller->state().maxPartitions(), 1); + return spiller; } std::optional SortBuffer::estimateOutputRowSize() const { diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index 386022e1b328..2ed3bfbda48b 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -58,10 +58,6 @@ class SortBuffer { /// Invoked to spill all the rows from 'data_'. void spill(); - /// Invoked to spill the rest of the rows from 'data_' during output - /// processing, it should actually spill at most once. - void spillOutput(); - memory::MemoryPool* pool() const { return pool_; } @@ -89,6 +85,8 @@ class SortBuffer { // there is only one hash partition for SortBuffer. void finishSpill(); + std::unique_ptr makeSpiller(Spiller::Type type); + const RowTypePtr input_; const std::vector sortCompareFlags_; velox::memory::MemoryPool* const pool_;