Skip to content

Commit

Permalink
use one spill
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Dec 7, 2023
1 parent fe8fb32 commit 12a7f3e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 55 deletions.
12 changes: 5 additions & 7 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,11 @@ void OrderBy::reclaim(
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
VELOX_CHECK(!nonReclaimableSection_);
if (noMoreInput_) {
sortBuffer_->spillOutput();
} else {
// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
sortBuffer_->spill();
}

// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
sortBuffer_->spill();

// Release the minimum reserved memory.
pool()->release();
}
Expand Down
87 changes: 43 additions & 44 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "SortBuffer.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/vector/BaseVector.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -166,8 +165,36 @@ void SortBuffer::spill() {
}
updateEstimatedOutputRowSize();

if (spiller_ == nullptr) {
spiller_ = std::make_unique<Spiller>(
if (sortedRows_.empty()) {
// Input stage spill.
if (spiller_ == nullptr) {
spiller_ = makeSpiller(Spiller::Type::kOrderByInput);
}
spiller_->spill();
data_->clear();
} else {
// Output stage spill.
if (spiller_ != nullptr) {
// Already spilled.
return;
}

spiller_ = makeSpiller(Spiller::Type::kOrderByOutput);
auto spillRows = std::vector<char*>(
sortedRows_.begin() + numOutputRows_, sortedRows_.end());
spiller_->spill(spillRows);
data_->clear();
sortedRows_.clear();
// Finish right after spilling, since the output spiller spills at most
// once.
finishSpill();
}
}

std::unique_ptr<Spiller> SortBuffer::makeSpiller(Spiller::Type type) {
std::unique_ptr<Spiller> spiller;
if (type == Spiller::Type::kOrderByInput) {
spiller = std::make_unique<Spiller>(
Spiller::Type::kOrderByInput,
data_.get(),
spillerStoreType_,
Expand All @@ -179,48 +206,20 @@ void SortBuffer::spill() {
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1);
}

spiller_->spill();
data_->clear();
}

void SortBuffer::spillOutput() {
VELOX_CHECK_NOT_NULL(
spillConfig_, "spill config is null when SortBuffer spill is called");

VELOX_CHECK(noMoreInput_);

if (spiller_ != nullptr) {
// Already spilled.
return;
}

// Check if sort buffer is empty or not, and skip spill if it is empty.
if (data_->numRows() == 0) {
return;
} else {
spiller = std::make_unique<Spiller>(
Spiller::Type::kOrderByOutput,
data_.get(),
spillerStoreType_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
}
updateEstimatedOutputRowSize();

spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderByOutput,
data_.get(),
spillerStoreType_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1);

auto spillRows = std::vector<char*>(
sortedRows_.begin() + numOutputRows_, sortedRows_.end());
spiller_->spill(spillRows);
data_->clear();
sortedRows_.clear();
finishSpill();
VELOX_CHECK_EQ(spiller->state().maxPartitions(), 1);
return spiller;
}

std::optional<uint64_t> SortBuffer::estimateOutputRowSize() const {
Expand Down
6 changes: 2 additions & 4 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ class SortBuffer {
/// Invoked to spill rows from 'data_'. While 'sortedRows_' is empty, spills
/// all the rows from 'data_' and clears it. Once 'sortedRows_' is populated,
/// spills the rows of 'sortedRows_' from 'numOutputRows_' onward, then clears
/// 'data_' and 'sortedRows_' and finishes the spill; in that case it returns
/// without spilling if a spiller already exists.
void spill();

/// Invoked to spill the rest of the rows from 'data_' during output
/// processing. It should spill at most once.
void spillOutput();

memory::MemoryPool* pool() const {
return pool_;
}
Expand Down Expand Up @@ -89,6 +85,8 @@ class SortBuffer {
// there is only one hash partition for SortBuffer.
void finishSpill();

// Creates a Spiller over 'data_' configured from 'spillConfig_'. Builds a
// kOrderByInput spiller when 'type' is kOrderByInput and a kOrderByOutput
// spiller otherwise, and checks that the spiller has exactly one partition.
std::unique_ptr<Spiller> makeSpiller(Spiller::Type type);

const RowTypePtr input_;
const std::vector<CompareFlags> sortCompareFlags_;
velox::memory::MemoryPool* const pool_;
Expand Down

0 comments on commit 12a7f3e

Please sign in to comment.