Skip to content

Commit

Permalink
Revert "Add OrderBy output stage spill (facebookincubator#7759)"
Browse files Browse the repository at this point in the history
This reverts commit ddc3471.
  • Loading branch information
zhztheplayer committed Dec 11, 2023
1 parent 9052038 commit 0981fb6
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 252 deletions.
19 changes: 16 additions & 3 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,24 @@ void OrderBy::reclaim(
memory::MemoryReclaimer::Stats& stats) {
VELOX_CHECK(canReclaim());
VELOX_CHECK(!nonReclaimableSection_);
auto* driver = operatorCtx_->driver();

// TODO: support fine-grain disk spilling based on 'targetBytes' after
// having row container memory compaction support later.
sortBuffer_->spill();
// NOTE: an order by operator is reclaimable if it hasn't started output
// processing and is not under non-reclaimable execution section.
if (noMoreInput_) {
// TODO: reduce the log frequency if it is too verbose.
++stats.numNonReclaimableAttempts;
LOG(WARNING)
<< "Can't reclaim from order by operator which has started producing output: "
<< pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
return;
}

// TODO: support fine-grain disk spilling based on 'targetBytes' after having
// row container memory compaction support later.
sortBuffer_->spill();
// Release the minimum reserved memory.
pool()->release();
}
Expand Down
82 changes: 23 additions & 59 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "SortBuffer.h"
#include "velox/exec/MemoryReclaimer.h"
#include "velox/vector/BaseVector.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -132,7 +133,11 @@ void SortBuffer::noMoreInput() {
// for now.
spill();

finishSpill();
// Finish spill, and we shouldn't get any rows from non-spilled partition as
// there is only one hash partition for SortBuffer.
VELOX_CHECK_NULL(spillMerger_);
auto spillPartition = spiller_->finishSpill();
spillMerger_ = spillPartition.createOrderedReader(pool());
}

// Releases the unused memory reservation after processing input.
Expand Down Expand Up @@ -165,11 +170,24 @@ void SortBuffer::spill() {
}
updateEstimatedOutputRowSize();

if (sortedRows_.empty()) {
spillInput();
} else {
spillOutput();
if (spiller_ == nullptr) {
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderBy,
data_.get(),
spillerStoreType_,
data_->keyTypes().size(),
sortCompareFlags_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1);
}

spiller_->spill();
data_->clear();
}

std::optional<uint64_t> SortBuffer::estimateOutputRowSize() const {
Expand Down Expand Up @@ -260,54 +278,6 @@ void SortBuffer::updateEstimatedOutputRowSize() {
}
}

void SortBuffer::spillInput() {
if (spiller_ == nullptr) {
VELOX_CHECK(!noMoreInput_);
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderByInput,
data_.get(),
spillerStoreType_,
data_->keyTypes().size(),
sortCompareFlags_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor,
spillConfig_->fileCreateConfig);
}
spiller_->spill();
data_->clear();
}

void SortBuffer::spillOutput() {
if (spiller_ != nullptr) {
// Already spilled.
return;
}

spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderByOutput,
data_.get(),
spillerStoreType_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor,
spillConfig_->fileCreateConfig);
auto spillRows = std::vector<char*>(
sortedRows_.begin() + numOutputRows_, sortedRows_.end());
spiller_->spill(spillRows);
data_->clear();
sortedRows_.clear();
// Finish right after spilling as the output spiller only spills at most
// once.
finishSpill();
}

void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
VELOX_CHECK_GT(maxOutputRows, 0);
VELOX_CHECK_GT(numInputRows_, numOutputRows_);
Expand Down Expand Up @@ -394,10 +364,4 @@ void SortBuffer::getOutputWithSpill() {
numOutputRows_ += output_->size();
}

void SortBuffer::finishSpill() {
VELOX_CHECK_NULL(spillMerger_);
auto spillPartition = spiller_->finishSpill();
spillMerger_ = spillPartition.createOrderedReader(pool());
}

} // namespace facebook::velox::exec
7 changes: 0 additions & 7 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ class SortBuffer {
void prepareOutput(uint32_t maxOutputRows);
void getOutputWithoutSpill();
void getOutputWithSpill();
// Spill during input stage.
void spillInput();
// Spill during output stage.
void spillOutput();
// Finish spill, and we shouldn't get any rows from non-spilled partition as
// there is only one hash partition for SortBuffer.
void finishSpill();

const RowTypePtr input_;
const std::vector<CompareFlags> sortCompareFlags_;
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void SortWindowBuild::setupSpiller() {

spiller_ = std::make_unique<Spiller>(
// TODO Replace Spiller::Type::kOrderBy.
Spiller::Type::kOrderByInput,
Spiller::Type::kOrderBy,
data_.get(),
inputType_,
spillCompareFlags_.size(),
Expand Down
65 changes: 15 additions & 50 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Spiller::Spiller(
executor,
fileCreateConfig) {
VELOX_CHECK(
type_ == Type::kOrderByInput || type_ == Type::kAggregateInput,
type_ == Type::kOrderBy || type_ == Type::kAggregateInput,
"Unexpected spiller type: {}",
typeName(type_));
VELOX_CHECK_EQ(state_.maxPartitions(), 1);
Expand Down Expand Up @@ -95,7 +95,7 @@ Spiller::Spiller(
executor,
fileCreateConfig) {
VELOX_CHECK(
type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput,
type_ == Type::kAggregateOutput || type_ == Type::kOrderBy,
"Unexpected spiller type: {}",
typeName(type_));
VELOX_CHECK_EQ(state_.maxPartitions(), 1);
Expand Down Expand Up @@ -446,11 +446,9 @@ void Spiller::runSpill() {
VELOX_CHECK_EQ(numWritten, run.rows.size());
run.clear();
// When a sorted run ends, we start with a new file next time. For
// aggregation output / orderby output spiller, we expect only one spill
// call to spill all the rows starting from the specified row offset.
if (needSort() ||
(type_ == Spiller::Type::kAggregateOutput ||
type_ == Spiller::Type::kOrderByOutput)) {
// aggregation output spiller, we expect only one spill call to spill all
// the rows starting from the specified row offset.
if (needSort() || (type_ == Spiller::Type::kAggregateOutput)) {
state_.finishFile(partition);
}
}
Expand All @@ -468,7 +466,7 @@ void Spiller::updateSpillSortTime(uint64_t timeUs) {

bool Spiller::needSort() const {
return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild &&
type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput;
type_ != Type::kAggregateOutput;
}

void Spiller::spill() {
Expand All @@ -484,23 +482,15 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_NE(type_, Type::kHashJoinProbe);

markAllPartitionsSpilled();

fillSpillRuns(startRowIter);
runSpill();
checkEmptySpillRuns();
}

void Spiller::spill(std::vector<char*>& rows) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_EQ(type_, Type::kOrderByOutput);
if (rows.empty()) {
return;
// Marks all the partitions as spilled, since we don't support fine-grained
// spilling for now.
for (auto partition = 0; partition < state_.maxPartitions(); ++partition) {
if (!state_.isPartitionSpilled(partition)) {
state_.setPartitionSpilled(partition);
}
}

markAllPartitionsSpilled();

fillSpillRun(rows);
fillSpillRuns(startRowIter);
runSpill();
checkEmptySpillRuns();
}
Expand All @@ -511,14 +501,6 @@ void Spiller::checkEmptySpillRuns() const {
}
}

void Spiller::markAllPartitionsSpilled() {
for (auto partition = 0; partition < state_.maxPartitions(); ++partition) {
if (!state_.isPartitionSpilled(partition)) {
state_.setPartitionSpilled(partition);
}
}
}

void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) {
CHECK_NOT_FINALIZED();
VELOX_CHECK(
Expand Down Expand Up @@ -615,21 +597,6 @@ void Spiller::fillSpillRuns(const RowContainerIterator* startRowIter) {
updateSpillFillTime(execTimeUs);
}

void Spiller::fillSpillRun(std::vector<char*>& rows) {
VELOX_CHECK_EQ(bits_.numPartitions(), 1);
checkEmptySpillRuns();
uint64_t execTimeUs{0};
{
MicrosecondTimer timer(&execTimeUs);
spillRuns_[0].rows =
SpillRows(rows.begin(), rows.end(), spillRuns_[0].rows.get_allocator());
for (const auto* row : rows) {
spillRuns_[0].numBytes += container_->rowSize(row);
}
}
updateSpillFillTime(execTimeUs);
}

std::string Spiller::toString() const {
return fmt::format(
"{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}",
Expand All @@ -642,10 +609,8 @@ std::string Spiller::toString() const {
// static
std::string Spiller::typeName(Type type) {
switch (type) {
case Type::kOrderByInput:
return "ORDER_BY_INPUT";
case Type::kOrderByOutput:
return "ORDER_BY_OUTPUT";
case Type::kOrderBy:
return "ORDER_BY";
case Type::kHashJoinBuild:
return "HASH_JOIN_BUILD";
case Type::kHashJoinProbe:
Expand Down
23 changes: 3 additions & 20 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,11 @@ class Spiller {
kHashJoinBuild = 2,
// Used for hash join probe.
kHashJoinProbe = 3,
// Used for order by input processing stage.
kOrderByInput = 4,
// Used for order by output processing stage.
kOrderByOutput = 5,
// Used for order by.
kOrderBy = 4,
// Number of spiller types.
kNumTypes = 6,
kNumTypes = 5,
};

static std::string typeName(Type);

using SpillRows = std::vector<char*, memory::StlAllocator<char*>>;
Expand Down Expand Up @@ -117,12 +114,6 @@ class Spiller {
/// The caller needs to erase them from the row container.
void spill(const RowContainerIterator& startRowIter);

/// Invoked to spill all the rows pointed by rows. This is used by
/// 'kOrderByOutput' spiller type to spill during the order by
/// output processing. Similarly, the spilled rows still stays in the row
/// container. The caller needs to erase them from the row container.
void spill(std::vector<char*>& rows);

/// Append 'spillVector' into the spill file of given 'partition'. It is now
/// only used by the spilling operator which doesn't need data sort, such as
/// hash join build and hash join probe.
Expand Down Expand Up @@ -279,19 +270,11 @@ class Spiller {

void checkEmptySpillRuns() const;

// Marks all the partitions have been spilled as we don't support
// fine-grained spilling as for now.
void markAllPartitionsSpilled();

// Prepares spill runs for the spillable data from all the hash partitions.
// If 'startRowIter' is not null, we prepare runs starting from the offset
// pointed by 'startRowIter'.
void fillSpillRuns(const RowContainerIterator* startRowIter = nullptr);

// Prepares spill run of a single partition for the spillable data from the
// rows.
void fillSpillRun(std::vector<char*>& rows);

// Writes out all the rows collected in spillRuns_.
void runSpill();

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/TopNRowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ void TopNRowNumber::setupSpiller() {

spiller_ = std::make_unique<Spiller>(
// TODO Replace Spiller::Type::kOrderBy.
Spiller::Type::kOrderByInput,
Spiller::Type::kOrderBy,
data_.get(),
inputType_,
spillCompareFlags_.size(),
Expand Down
13 changes: 9 additions & 4 deletions velox/exec/tests/OrderByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1082,10 +1082,15 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {

if (enableSpilling) {
ASSERT_GT(reclaimableBytes, 0);
reclaimerStats_.reset();
reclaimAndRestoreCapacity(op, reclaimableBytes, reclaimerStats_);
ASSERT_EQ(reclaimerStats_.reclaimedBytes, reclaimableBytes);
const auto usedMemoryBytes = op->pool()->currentBytes();
reclaimAndRestoreCapacity(
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
// No reclaim as the operator has started output processing.
ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes());
} else {
ASSERT_EQ(reclaimableBytes, 0);
VELOX_ASSERT_THROW(
Expand All @@ -1103,7 +1108,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {
ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0);
ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 1);
}

DEBUG_ONLY_TEST_F(OrderByTest, abortDuringOutputProcessing) {
Expand Down
Loading

0 comments on commit 0981fb6

Please sign in to comment.