From ddc34718836f53a4ae1bcdaf7ed212a29ae935ac Mon Sep 17 00:00:00 2001
From: duanmeng
Date: Thu, 7 Dec 2023 12:48:32 -0800
Subject: [PATCH] Add OrderBy output stage spill (#7759)

Summary:
Add a new spiller type named `kOrderByOutput`. It is used to spill data
during the output processing stage of the `OrderBy` operator. The spiller
uses a newly added API that supports spilling data by row pointers.

Pull Request resolved: https://github.com/facebookincubator/velox/pull/7759

Reviewed By: tanjialiang

Differential Revision: D51947152

Pulled By: xiaoxmeng

fbshipit-source-id: 10be66d8d4fa570b8a377d0ee7dac7bb18982841
---
 velox/exec/OrderBy.cpp           |  19 +----
 velox/exec/SortBuffer.cpp        |  80 +++++++++++++------
 velox/exec/SortBuffer.h          |   7 ++
 velox/exec/SortWindowBuild.cpp   |   2 +-
 velox/exec/Spiller.cpp           |  68 +++++++++++++-----
 velox/exec/Spiller.h             |  23 +++++-
 velox/exec/TopNRowNumber.cpp     |   2 +-
 velox/exec/tests/OrderByTest.cpp |  13 ++--
 velox/exec/tests/SpillerTest.cpp | 118 ++++++++++++++++++++++++++++---
 9 files changed, 251 insertions(+), 81 deletions(-)

diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp
index 37c5def34bf0..f78b4517e671 100644
--- a/velox/exec/OrderBy.cpp
+++ b/velox/exec/OrderBy.cpp
@@ -78,24 +78,11 @@ void OrderBy::reclaim(
     memory::MemoryReclaimer::Stats& stats) {
   VELOX_CHECK(canReclaim());
   VELOX_CHECK(!nonReclaimableSection_);
 
-  auto* driver = operatorCtx_->driver();
-
-  // NOTE: an order by operator is reclaimable if it hasn't started output
-  // processing and is not under non-reclaimable execution section.
-  if (noMoreInput_) {
-    // TODO: reduce the log frequency if it is too verbose.
-    ++stats.numNonReclaimableAttempts;
-    LOG(WARNING)
-        << "Can't reclaim from order by operator which has started producing output: "
-        << pool()->name()
-        << ", usage: " << succinctBytes(pool()->currentBytes())
-        << ", reservation: " << succinctBytes(pool()->reservedBytes());
-    return;
-  }
-
-  // TODO: support fine-grain disk spilling based on 'targetBytes' after having
-  // row container memory compaction support later.
+  // TODO: support fine-grain disk spilling based on 'targetBytes' after
+  // having row container memory compaction support later.
   sortBuffer_->spill();
 
+  // Release the minimum reserved memory.
   pool()->release();
 }
 
diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp
index 7e981d1a90e4..aeca6590b1d3 100644
--- a/velox/exec/SortBuffer.cpp
+++ b/velox/exec/SortBuffer.cpp
@@ -16,7 +16,6 @@
 #include "SortBuffer.h"
 
 #include "velox/exec/MemoryReclaimer.h"
-#include "velox/vector/BaseVector.h"
 
 namespace facebook::velox::exec {
 
@@ -133,11 +132,7 @@ void SortBuffer::noMoreInput() {
   // for now.
   spill();
 
-  // Finish spill, and we shouldn't get any rows from non-spilled partition as
-  // there is only one hash partition for SortBuffer.
-  VELOX_CHECK_NULL(spillMerger_);
-  auto spillPartition = spiller_->finishSpill();
-  spillMerger_ = spillPartition.createOrderedReader(pool());
+  finishSpill();
 }
 
 // Releases the unused memory reservation after processing input.
@@ -170,24 +165,11 @@ void SortBuffer::spill() {
   }
 
   updateEstimatedOutputRowSize();
-  if (spiller_ == nullptr) {
-    spiller_ = std::make_unique<Spiller>(
-        Spiller::Type::kOrderBy,
-        data_.get(),
-        spillerStoreType_,
-        data_->keyTypes().size(),
-        sortCompareFlags_,
-        spillConfig_->getSpillDirPathCb,
-        spillConfig_->fileNamePrefix,
-        spillConfig_->writeBufferSize,
-        spillConfig_->compressionKind,
-        memory::spillMemoryPool(),
-        spillConfig_->executor);
-    VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1);
+  if (sortedRows_.empty()) {
+    spillInput();
+  } else {
+    spillOutput();
   }
-
-  spiller_->spill();
-  data_->clear();
 }
 
 std::optional<uint64_t> SortBuffer::estimateOutputRowSize() const {
@@ -278,6 +260,52 @@ void SortBuffer::updateEstimatedOutputRowSize() {
   }
 }
 
+void SortBuffer::spillInput() {
+  if (spiller_ == nullptr) {
+    VELOX_CHECK(!noMoreInput_);
+    spiller_ = std::make_unique<Spiller>(
+        Spiller::Type::kOrderByInput,
+        data_.get(),
+        spillerStoreType_,
+        data_->keyTypes().size(),
+        sortCompareFlags_,
+        spillConfig_->getSpillDirPathCb,
+        spillConfig_->fileNamePrefix,
+        spillConfig_->writeBufferSize,
+        spillConfig_->compressionKind,
+        memory::spillMemoryPool(),
+        spillConfig_->executor);
+  }
+  spiller_->spill();
+  data_->clear();
+}
+
+void SortBuffer::spillOutput() {
+  if (spiller_ != nullptr) {
+    // Already spilled.
+    return;
+  }
+
+  spiller_ = std::make_unique<Spiller>(
+      Spiller::Type::kOrderByOutput,
+      data_.get(),
+      spillerStoreType_,
+      spillConfig_->getSpillDirPathCb,
+      spillConfig_->fileNamePrefix,
+      spillConfig_->writeBufferSize,
+      spillConfig_->compressionKind,
+      memory::spillMemoryPool(),
+      spillConfig_->executor);
+  auto spillRows = std::vector<char*>(
+      sortedRows_.begin() + numOutputRows_, sortedRows_.end());
+  spiller_->spill(spillRows);
+  data_->clear();
+  sortedRows_.clear();
+  // Finish right after spilling as the output spiller only spills at most
+  // once.
+  finishSpill();
+}
+
 void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
   VELOX_CHECK_GT(maxOutputRows, 0);
   VELOX_CHECK_GT(numInputRows_, numOutputRows_);
@@ -364,4 +392,10 @@ void SortBuffer::getOutputWithSpill() {
   numOutputRows_ += output_->size();
 }
 
+void SortBuffer::finishSpill() {
+  VELOX_CHECK_NULL(spillMerger_);
+  auto spillPartition = spiller_->finishSpill();
+  spillMerger_ = spillPartition.createOrderedReader(pool());
+}
+
 } // namespace facebook::velox::exec
diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h
index 0fcd3817d25f..fa62460d203a 100644
--- a/velox/exec/SortBuffer.h
+++ b/velox/exec/SortBuffer.h
@@ -80,6 +80,13 @@ class SortBuffer {
   void prepareOutput(uint32_t maxOutputRows);
   void getOutputWithoutSpill();
   void getOutputWithSpill();
+  // Spill during input stage.
+  void spillInput();
+  // Spill during output stage.
+  void spillOutput();
+  // Finish spill, and we shouldn't get any rows from the non-spilled
+  // partition as there is only one hash partition for SortBuffer.
+  void finishSpill();
 
   const RowTypePtr input_;
   const std::vector<CompareFlags> sortCompareFlags_;
diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp
index dd2fd71d4764..7cb9e8667f7e 100644
--- a/velox/exec/SortWindowBuild.cpp
+++ b/velox/exec/SortWindowBuild.cpp
@@ -140,7 +140,7 @@ void SortWindowBuild::setupSpiller() {
   spiller_ = std::make_unique<Spiller>(
       // TODO Replace Spiller::Type::kOrderBy.
-      Spiller::Type::kOrderBy,
+      Spiller::Type::kOrderByInput,
       data_.get(),
       inputType_,
       spillCompareFlags_.size(),
diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp
index ca0b1582082b..3ffb6fd7fbd9 100644
--- a/velox/exec/Spiller.cpp
+++ b/velox/exec/Spiller.cpp
@@ -61,7 +61,7 @@ Spiller::Spiller(
           executor,
           writeFileOptions) {
   VELOX_CHECK(
-      type_ == Type::kOrderBy || type_ == Type::kAggregateInput,
+      type_ == Type::kOrderByInput || type_ == Type::kAggregateInput,
       "Unexpected spiller type: {}",
       typeName(type_));
   VELOX_CHECK_EQ(state_.maxPartitions(), 1);
@@ -94,9 +94,8 @@ Spiller::Spiller(
           pool,
           executor,
           writeFileOptions) {
-  VELOX_CHECK_EQ(
-      type,
-      Type::kAggregateOutput,
+  VELOX_CHECK(
+      type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput,
       "Unexpected spiller type: {}",
       typeName(type_));
   VELOX_CHECK_EQ(state_.maxPartitions(), 1);
@@ -447,9 +446,11 @@ void Spiller::runSpill() {
     VELOX_CHECK_EQ(numWritten, run.rows.size());
     run.clear();
     // When a sorted run ends, we start with a new file next time. For
-    // aggregation output spiller, we expect only one spill call to spill all
-    // the rows starting from the specified row offset.
-    if (needSort() || (type_ == Spiller::Type::kAggregateOutput)) {
+    // aggregation output / order by output spiller, we expect only one spill
+    // call to spill all the rows starting from the specified row offset.
+    if (needSort() ||
+        (type_ == Spiller::Type::kAggregateOutput ||
+         type_ == Spiller::Type::kOrderByOutput)) {
       state_.finishFile(partition);
     }
   }
@@ -467,7 +468,7 @@ void Spiller::updateSpillSortTime(uint64_t timeUs) {
 
 bool Spiller::needSort() const {
   return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild &&
-      type_ != Type::kAggregateOutput;
+      type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput;
 }
 
 void Spiller::spill() {
@@ -483,25 +484,41 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {
   CHECK_NOT_FINALIZED();
   VELOX_CHECK_NE(type_, Type::kHashJoinProbe);
 
-  // Marks all the partitions have been spilled as we don't support fine-grained
-  // spilling as for now.
-  for (auto partition = 0; partition < state_.maxPartitions(); ++partition) {
-    if (!state_.isPartitionSpilled(partition)) {
-      state_.setPartitionSpilled(partition);
-    }
-  }
+  markAllPartitionsSpilled();
+
   fillSpillRuns(startRowIter);
   runSpill();
   checkEmptySpillRuns();
 }
 
+void Spiller::spill(std::vector<char*>& rows) {
+  CHECK_NOT_FINALIZED();
+  VELOX_CHECK_EQ(type_, Type::kOrderByOutput);
+  if (rows.empty()) {
+    return;
+  }
+
+  markAllPartitionsSpilled();
+
+  fillSpillRun(rows);
+  runSpill();
+  checkEmptySpillRuns();
+}
+
 void Spiller::checkEmptySpillRuns() const {
   for (const auto& spillRun : spillRuns_) {
     VELOX_CHECK(spillRun.rows.empty());
   }
 }
 
+void Spiller::markAllPartitionsSpilled() {
+  for (auto partition = 0; partition < state_.maxPartitions(); ++partition) {
+    if (!state_.isPartitionSpilled(partition)) {
+      state_.setPartitionSpilled(partition);
+    }
+  }
+}
+
 void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) {
   CHECK_NOT_FINALIZED();
   VELOX_CHECK(
@@ -598,6 +615,21 @@ void Spiller::fillSpillRuns(const RowContainerIterator* startRowIter) {
   updateSpillFillTime(execTimeUs);
 }
 
+void Spiller::fillSpillRun(std::vector<char*>& rows) {
+  VELOX_CHECK_EQ(bits_.numPartitions(), 1);
+  checkEmptySpillRuns();
+  uint64_t execTimeUs{0};
+  {
+    MicrosecondTimer timer(&execTimeUs);
+    spillRuns_[0].rows =
+        SpillRows(rows.begin(), rows.end(), spillRuns_[0].rows.get_allocator());
+    for (const auto* row : rows) {
+      spillRuns_[0].numBytes += container_->rowSize(row);
+    }
+  }
+  updateSpillFillTime(execTimeUs);
+}
+
 std::string Spiller::toString() const {
   return fmt::format(
       "{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}",
@@ -610,8 +642,10 @@ std::string Spiller::toString() const {
 // static
 std::string Spiller::typeName(Type type) {
   switch (type) {
-    case Type::kOrderBy:
-      return "ORDER_BY";
+    case Type::kOrderByInput:
+      return "ORDER_BY_INPUT";
+    case Type::kOrderByOutput:
+      return "ORDER_BY_OUTPUT";
     case Type::kHashJoinBuild:
       return "HASH_JOIN_BUILD";
     case Type::kHashJoinProbe:
diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h
index 765950747a46..815361bf8033 100644
--- a/velox/exec/Spiller.h
+++ b/velox/exec/Spiller.h
@@ -35,11 +35,14 @@ class Spiller {
     kHashJoinBuild = 2,
     // Used for hash join probe.
     kHashJoinProbe = 3,
-    // Used for order by.
-    kOrderBy = 4,
+    // Used for order by input processing stage.
+    kOrderByInput = 4,
+    // Used for order by output processing stage.
+    kOrderByOutput = 5,
     // Number of spiller types.
-    kNumTypes = 5,
+    kNumTypes = 6,
   };
+  static std::string typeName(Type);
 
   using SpillRows = std::vector<char*, memory::StlAllocator<char*>>;
@@ -118,6 +121,12 @@ class Spiller {
   /// The caller needs to erase them from the row container.
   void spill(const RowContainerIterator& startRowIter);
 
+  /// Invoked to spill all the rows pointed to by 'rows'. This is used by the
+  /// 'kOrderByOutput' spiller type to spill during the order by output
+  /// processing. Similarly, the spilled rows still stay in the row container.
+  /// The caller needs to erase them from the row container.
+  void spill(std::vector<char*>& rows);
+
   /// Append 'spillVector' into the spill file of given 'partition'. It is now
   /// only used by the spilling operator which doesn't need data sort, such as
   /// hash join build and hash join probe.
@@ -274,11 +283,19 @@ class Spiller {
 
   void checkEmptySpillRuns() const;
 
+  // Marks all the partitions as spilled since we don't support fine-grained
+  // spilling for now.
+  void markAllPartitionsSpilled();
+
   // Prepares spill runs for the spillable data from all the hash partitions.
   // If 'startRowIter' is not null, we prepare runs starting from the offset
   // pointed by 'startRowIter'.
   void fillSpillRuns(const RowContainerIterator* startRowIter = nullptr);
 
+  // Prepares the spill run of a single partition for the spillable data from
+  // 'rows'.
+  void fillSpillRun(std::vector<char*>& rows);
+
   // Writes out all the rows collected in spillRuns_.
   void runSpill();
 
diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp
index 5bb8b4a9fb0d..5bb0791fbd2e 100644
--- a/velox/exec/TopNRowNumber.cpp
+++ b/velox/exec/TopNRowNumber.cpp
@@ -737,7 +737,7 @@ void TopNRowNumber::setupSpiller() {
   spiller_ = std::make_unique<Spiller>(
       // TODO Replace Spiller::Type::kOrderBy.
-      Spiller::Type::kOrderBy,
+      Spiller::Type::kOrderByInput,
       data_.get(),
       inputType_,
       spillCompareFlags_.size(),
diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp
index a3c07e45cfda..205eaa8bd4d2 100644
--- a/velox/exec/tests/OrderByTest.cpp
+++ b/velox/exec/tests/OrderByTest.cpp
@@ -1082,15 +1082,10 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {
 
   if (enableSpilling) {
     ASSERT_GT(reclaimableBytes, 0);
-    const auto usedMemoryBytes = op->pool()->currentBytes();
-    reclaimAndRestoreCapacity(
-        op,
-        folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
-        reclaimerStats_);
-    ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
+    reclaimerStats_.reset();
+    reclaimAndRestoreCapacity(op, reclaimableBytes, reclaimerStats_);
+    ASSERT_EQ(reclaimerStats_.reclaimedBytes, reclaimableBytes);
     ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
-    // No reclaim as the operator has started output processing.
-    ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes());
   } else {
     ASSERT_EQ(reclaimableBytes, 0);
     VELOX_ASSERT_THROW(
@@ -1108,7 +1103,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {
     ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0);
     OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
   }
-  ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 1);
+  ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0);
 }
 
 DEBUG_ONLY_TEST_F(OrderByTest, abortDuringOutputProcessing) {
diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp
index c99e9e6cf7a0..371cc6d3ce4a 100644
--- a/velox/exec/tests/SpillerTest.cpp
+++ b/velox/exec/tests/SpillerTest.cpp
@@ -120,7 +120,8 @@ class SpillerTest : public exec::test::RowContainerTestBase {
         compressionKind_(param.compressionKind),
         hashBits_(
             0,
-            (type_ == Spiller::Type::kOrderBy ||
+            (type_ == Spiller::Type::kOrderByInput ||
+             type_ == Spiller::Type::kOrderByOutput ||
              type_ == Spiller::Type::kAggregateOutput ||
              type_ == Spiller::Type::kAggregateInput) ?
                 0
@@ -384,7 +385,7 @@ class SpillerTest : public exec::test::RowContainerTestBase {
     }
   }
 
-  void setupSpillContainer(RowTypePtr rowType, int32_t numKeys) {
+  void setupSpillContainer(const RowTypePtr& rowType, int32_t numKeys) {
     const auto& childTypes = rowType->children();
     std::vector<TypePtr> keys(childTypes.begin(), childTypes.begin() + numKeys);
     std::vector<TypePtr> dependents;
@@ -396,7 +397,7 @@ class SpillerTest : public exec::test::RowContainerTestBase {
     rowType_ = rowType;
   }
 
-  void writeSpillData(std::vector<RowVectorPtr> batches) {
+  void writeSpillData(const std::vector<RowVectorPtr>& batches) {
     vector_size_t numRows = 0;
     for (const auto& batch : batches) {
       numRows += batch->size();
     }
@@ -450,7 +451,8 @@ class SpillerTest : public exec::test::RowContainerTestBase {
     // NOTE: for aggregation output type, we expect the merge read to produce
     // the output rows in the same order as the row insertion. So no need to
     // sort for testing.
-    if (type_ == Spiller::Type::kAggregateOutput) {
+    if (type_ == Spiller::Type::kAggregateOutput ||
+        type_ == Spiller::Type::kOrderByOutput) {
       return;
     }
     for (auto& partition : partitions_) {
@@ -490,7 +492,7 @@ class SpillerTest : public exec::test::RowContainerTestBase {
           pool_.get(),
           executor());
     } else if (
-        type_ == Spiller::Type::kOrderBy ||
+        type_ == Spiller::Type::kOrderByInput ||
         type_ == Spiller::Type::kAggregateInput) {
       // We spill 'data' in one partition for the order by input type,
       // otherwise in 4 partitions.
@@ -506,7 +508,9 @@ class SpillerTest : public exec::test::RowContainerTestBase {
           compressionKind_,
           pool_.get(),
           executor());
-    } else if (type_ == Spiller::Type::kAggregateOutput) {
+    } else if (
+        type_ == Spiller::Type::kAggregateOutput ||
+        type_ == Spiller::Type::kOrderByOutput) {
       spiller_ = std::make_unique<Spiller>(
           type_,
           rowContainer_.get(),
@@ -1044,7 +1048,9 @@ class NoHashJoin : public SpillerTest,
   static std::vector<TestParam> getTestParams() {
     return TestParamsBuilder{
         .typesToExclude =
-            {Spiller::Type::kHashJoinProbe, Spiller::Type::kHashJoinBuild}}
+            {Spiller::Type::kHashJoinProbe,
+             Spiller::Type::kHashJoinBuild,
+             Spiller::Type::kOrderByOutput}}
         .getTestParams();
   }
 };
@@ -1093,7 +1099,8 @@ TEST_P(NoHashJoin, error) {
 }
 
 TEST_P(AllTypes, nonSortedSpillFunctions) {
-  if (type_ == Spiller::Type::kOrderBy ||
+  if (type_ == Spiller::Type::kOrderByInput ||
+      type_ == Spiller::Type::kOrderByOutput ||
       type_ == Spiller::Type::kAggregateInput ||
       type_ == Spiller::Type::kAggregateOutput) {
     setupSpillData(rowType_, numKeys_, 1'000, 1, nullptr, {});
@@ -1113,6 +1120,7 @@ TEST_P(AllTypes, nonSortedSpillFunctions) {
     verifySortedSpillData(spillPartitionSet.begin()->second.get());
     return;
   }
+
   testNonSortedSpill(1, 1000, 1, 1);
   testNonSortedSpill(1, 1000, 10, 1);
   testNonSortedSpill(1, 1000, 1, 1'000'000'000);
@@ -1136,7 +1144,8 @@ class HashJoinBuildOnly : public SpillerTest,
         {Spiller::Type::kAggregateInput,
          Spiller::Type::kAggregateOutput,
          Spiller::Type::kHashJoinProbe,
-         Spiller::Type::kOrderBy}}
+         Spiller::Type::kOrderByInput,
+         Spiller::Type::kOrderByOutput}}
         .getTestParams();
   }
 };
@@ -1227,7 +1236,8 @@ class AggregationOutputOnly : public SpillerTest,
         {Spiller::Type::kAggregateInput,
          Spiller::Type::kHashJoinBuild,
          Spiller::Type::kHashJoinProbe,
-         Spiller::Type::kOrderBy}}
+         Spiller::Type::kOrderByInput,
+         Spiller::Type::kOrderByOutput}}
        .getTestParams();
   }
 };
@@ -1268,7 +1278,6 @@ TEST_P(AggregationOutputOnly, basic) {
 
     ASSERT_EQ(rowContainer_->numRows(), numRows);
     rowContainer_->clear();
-    rowContainer_->clear();
 
     auto spillPartition = spiller_->finishSpill();
     ASSERT_TRUE(spiller_->finalized());
@@ -1300,6 +1309,88 @@ TEST_P(AggregationOutputOnly, basic) {
   }
 }
 
+class OrderByOutputOnly : public SpillerTest,
+                          public testing::WithParamInterface<TestParam> {
+ public:
+  OrderByOutputOnly() : SpillerTest(GetParam()) {}
+
+  static std::vector<TestParam> getTestParams() {
+    return TestParamsBuilder{
+        .typesToExclude =
+            {Spiller::Type::kAggregateInput,
+             Spiller::Type::kAggregateOutput,
+             Spiller::Type::kHashJoinBuild,
+             Spiller::Type::kHashJoinProbe,
+             Spiller::Type::kOrderByInput}}
+        .getTestParams();
+  }
+};
+
+TEST_P(OrderByOutputOnly, basic) {
+  const int numRows = 5'000;
+  struct {
+    int numSpillRows;
+
+    std::string debugString() const {
+      return fmt::format("numSpillRows {}", numSpillRows);
+    }
+  } testSettings[] = {{0}, {1000}, {5000}, {5000 - 1}, {5000 + 1}, {50000 * 2}};
+
+  for (const auto& testData : testSettings) {
+    SCOPED_TRACE(testData.debugString());
+
+    setupSpillData(rowType_, numKeys_, numRows, 0);
+    sortSpillData();
+    // NOTE: target file size is ignored by the order by output spiller type.
+    setupSpiller(0, 1'000'000, 0, false);
+    RowContainerIterator rowIter;
+    std::vector<char*> rows(numRows);
+    int numListedRows{0};
+    numListedRows =
+        rowContainer_->listRows(&rowIter, testData.numSpillRows, rows.data());
+    ASSERT_LE(numListedRows, numRows);
+    {
+      RowVectorPtr dummy;
+      VELOX_ASSERT_THROW(
+          spiller_->spill(0, dummy),
+          "Unexpected spiller type: ORDER_BY_OUTPUT");
+    }
+    auto spillRows =
+        std::vector<char*>(rows.begin(), rows.begin() + numListedRows);
+    spiller_->spill(spillRows);
+    ASSERT_EQ(rowContainer_->numRows(), numRows);
+    rowContainer_->clear();
+
+    auto spillPartition = spiller_->finishSpill();
+    ASSERT_TRUE(spiller_->finalized());
+
+    const int expectedNumSpilledRows = numListedRows;
+    auto merge = spillPartition.createOrderedReader(pool());
+    if (expectedNumSpilledRows == 0) {
+      ASSERT_TRUE(merge == nullptr);
+    } else {
+      for (auto i = 0; i < expectedNumSpilledRows; ++i) {
+        auto* stream = merge->next();
+        ASSERT_TRUE(stream != nullptr);
+        ASSERT_TRUE(rowVector_->equalValueAt(
+            &stream->current(), partitions_[0][i], stream->currentIndex()));
+        stream->pop();
+      }
+    }
+
+    const auto stats = spiller_->stats();
+    if (expectedNumSpilledRows == 0) {
+      ASSERT_EQ(stats.spilledFiles, 0) << stats.toString();
+      ASSERT_EQ(stats.spilledRows, 0) << stats.toString();
+    } else {
+      ASSERT_EQ(stats.spilledFiles, 1) << stats.toString();
+      ASSERT_EQ(stats.spilledRows, expectedNumSpilledRows) << stats.toString();
+    }
+    ASSERT_EQ(stats.spillSortTimeUs, 0);
+  }
+}
+
 VELOX_INSTANTIATE_TEST_SUITE_P(
     SpillerTest,
     AllTypes,
@@ -1319,3 +1410,8 @@ VELOX_INSTANTIATE_TEST_SUITE_P(
     SpillerTest,
     AggregationOutputOnly,
     testing::ValuesIn(AggregationOutputOnly::getTestParams()));
+
+VELOX_INSTANTIATE_TEST_SUITE_P(
+    SpillerTest,
+    OrderByOutputOnly,
+    testing::ValuesIn(OrderByOutputOnly::getTestParams()));
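
Usage note: the snippet below is a minimal sketch (not part of the patch) of
how the new `kOrderByOutput` spill path fits together, mirroring
`SortBuffer::spillOutput()` above. It assumes an already sorted RowContainer
`data`, caller-side bookkeeping (`sortedRows`, `numOutputRows`), a valid
`spillConfig`, a `spillerStoreType` row type, and a memory pool `pool`; these
names are illustrative stand-ins for the caller's state, not new API.

  // Create a spiller dedicated to the output processing stage. Note that this
  // constructor overload takes no sort keys or compare flags: the rows are
  // already sorted.
  auto spiller = std::make_unique<Spiller>(
      Spiller::Type::kOrderByOutput,
      data.get(),
      spillerStoreType,
      spillConfig->getSpillDirPathCb,
      spillConfig->fileNamePrefix,
      spillConfig->writeBufferSize,
      spillConfig->compressionKind,
      memory::spillMemoryPool(),
      spillConfig->executor);

  // Spill by row pointers: hand over only the sorted rows that have not been
  // returned to the consumer yet.
  std::vector<char*> spillRows(
      sortedRows.begin() + numOutputRows, sortedRows.end());
  spiller->spill(spillRows);

  // The spilled rows still live in the row container; the caller is
  // responsible for erasing them.
  data->clear();

  // The output spiller spills at most once, so finish immediately and read
  // the rows back in sorted order.
  auto spillPartition = spiller->finishSpill();
  auto merger = spillPartition.createOrderedReader(pool);

Because `needSort()` returns false for `kOrderByOutput`, no re-sort happens on
the spill path, which is why the new test asserts `stats.spillSortTimeUs` is
zero.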