diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 2874e9a8178e..deaac32e2757 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -54,6 +54,7 @@ #include #include #include +#include #include #include #include @@ -1268,13 +1269,21 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

supportParallelJoin()) + { + joined_plan->addStep(std::make_unique(joined_plan->getCurrentDataStream(), max_streams)); + + } + // If optimize_read_in_order = true, do not change the left pipeline's stream size. + // otherwise will make the result wrong for order by. + if (!analysis_result.optimize_read_in_order) + query_plan.addStep(std::make_unique(query_plan.getCurrentDataStream(), max_streams)); QueryPlanStepPtr join_step = std::make_unique( query_plan.getCurrentDataStream(), joined_plan->getCurrentDataStream(), expressions.join, - settings.max_block_size, - max_streams); + settings.max_block_size); join_step->setStepDescription("JOIN"); std::vector plans; diff --git a/src/Processors/QueryPlan/JoinStep.cpp b/src/Processors/QueryPlan/JoinStep.cpp index 0170c3564590..494a2a6aa0e4 100644 --- a/src/Processors/QueryPlan/JoinStep.cpp +++ b/src/Processors/QueryPlan/JoinStep.cpp @@ -15,11 +15,9 @@ JoinStep::JoinStep( const DataStream & left_stream_, const DataStream & right_stream_, JoinPtr join_, - size_t max_block_size_, - size_t max_streams_) + size_t max_block_size_) : join(std::move(join_)) , max_block_size(max_block_size_) - , max_streams(max_streams_) { input_streams = {left_stream_, right_stream_}; output_stream = DataStream @@ -33,7 +31,7 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines if (pipelines.size() != 2) throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps"); - return QueryPipelineBuilder::joinPipelines(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, &processors, max_streams); + return QueryPipelineBuilder::joinPipelines(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, &processors); } void JoinStep::describePipeline(FormatSettings & settings) const diff --git a/src/Processors/QueryPlan/JoinStep.h b/src/Processors/QueryPlan/JoinStep.h index 0ae1f78594bb..71537f29a8ee 100644 --- a/src/Processors/QueryPlan/JoinStep.h +++ b/src/Processors/QueryPlan/JoinStep.h @@ -16,8 +16,7 @@ class JoinStep : public IQueryPlanStep const DataStream & left_stream_, const DataStream & right_stream_, JoinPtr join_, - size_t max_block_size_, - size_t max_streams_ = 0); + size_t max_block_size_); String getName() const override { return "Join"; } @@ -30,7 +29,6 @@ class JoinStep : public IQueryPlanStep private: JoinPtr join; size_t max_block_size; - size_t max_streams; Processors processors; }; diff --git a/src/Processors/QueryPlan/ResizeStreamsStep.cpp b/src/Processors/QueryPlan/ResizeStreamsStep.cpp new file mode 100644 index 000000000000..580d11fa5f81 --- /dev/null +++ b/src/Processors/QueryPlan/ResizeStreamsStep.cpp @@ -0,0 +1,31 @@ +#include +#include +namespace DB +{ +static ITransformingStep::Traits getTraits() +{ + return ITransformingStep::Traits + { + { + .preserves_distinct_columns = false, + .returns_single_stream = false, + .preserves_number_of_streams = true, + .preserves_sorting = false, + }, + { + .preserves_number_of_rows = false, + } + }; +} + +ResizeStreamsStep::ResizeStreamsStep(const DataStream & input_stream_, size_t pipeline_streams_) + : ITransformingStep(input_stream_, input_stream_.header, getTraits()) + , pipeline_streams(pipeline_streams_) +{ +} + +void ResizeStreamsStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) +{ + pipeline.resize(pipeline_streams); +} +} diff --git a/src/Processors/QueryPlan/ResizeStreamsStep.h b/src/Processors/QueryPlan/ResizeStreamsStep.h new file mode 100644 index 000000000000..f26dab366717 --- /dev/null +++ b/src/Processors/QueryPlan/ResizeStreamsStep.h @@ -0,0 +1,15 @@ +#include +#include + +namespace DB +{ +class ResizeStreamsStep : public ITransformingStep +{ +public: + explicit ResizeStreamsStep(const DataStream & input_stream_, size_t pipeline_streams_); + String getName() const override { return "ResizeStreamsStep"; } + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; +private: + size_t pipeline_streams; +}; +} diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index 7241548d0131..5feed7c55ea0 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -307,8 +307,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelines( std::unique_ptr right, JoinPtr join, size_t max_block_size, - Processors * collected_processors, - size_t max_streams) + Processors * collected_processors) { left->checkInitializedAndNotCompleted(); right->checkInitializedAndNotCompleted(); @@ -346,14 +345,10 @@ std::unique_ptr QueryPipelineBuilder::joinPipelines( /// ╞> FillingJoin ─> Resize ╣ ╞> Joining ─> (totals) /// (totals) ─────────┘ ╙─────┘ - // In some cases, left's streams is too smaller then max_streams. Keep it same as max_streams - // to make full use of cpu. - auto & num_streams = max_streams; - left->resize(num_streams); + auto num_streams = left->getNumStreams(); if (join->supportParallelJoin() && !right->hasTotals()) { - right->resize(num_streams); auto concurrent_right_filling_transform = [&](OutputPortRawPtrs outports) { Processors processors; diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h index 5f483b86c1cb..ac84191cf346 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.h +++ b/src/QueryPipeline/QueryPipelineBuilder.h @@ -101,8 +101,7 @@ class QueryPipelineBuilder std::unique_ptr right, JoinPtr join, size_t max_block_size, - Processors * collected_processors = nullptr, - size_t max_streams = 0); + Processors * collected_processors = nullptr); /// Add other pipeline and execute it before current one. /// Pipeline must have empty header, it should not generate any chunk.