Skip to content

Commit

Permalink
fixed bug: resize on left pipeline cause the order by result wrong
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Apr 26, 2022
1 parent c7841d2 commit 0b0fa84
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 18 deletions.
13 changes: 11 additions & 2 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPlan/ResizeStreamsStep.h>
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
Expand Down Expand Up @@ -1268,13 +1269,21 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P

if (!joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query");
if (expressions.join->supportParallelJoin())
{
joined_plan->addStep(std::make_unique<ResizeStreamsStep>(joined_plan->getCurrentDataStream(), max_streams));

}
// If optimize_read_in_order = true, do not change the left pipeline's stream size.
// otherwise will make the result wrong for order by.
if (!analysis_result.optimize_read_in_order)
query_plan.addStep(std::make_unique<ResizeStreamsStep>(query_plan.getCurrentDataStream(), max_streams));

QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
joined_plan->getCurrentDataStream(),
expressions.join,
settings.max_block_size,
max_streams);
settings.max_block_size);

join_step->setStepDescription("JOIN");
std::vector<QueryPlanPtr> plans;
Expand Down
6 changes: 2 additions & 4 deletions src/Processors/QueryPlan/JoinStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ JoinStep::JoinStep(
const DataStream & left_stream_,
const DataStream & right_stream_,
JoinPtr join_,
size_t max_block_size_,
size_t max_streams_)
size_t max_block_size_)
: join(std::move(join_))
, max_block_size(max_block_size_)
, max_streams(max_streams_)
{
input_streams = {left_stream_, right_stream_};
output_stream = DataStream
Expand All @@ -33,7 +31,7 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
if (pipelines.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");

return QueryPipelineBuilder::joinPipelines(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, &processors, max_streams);
return QueryPipelineBuilder::joinPipelines(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, &processors);
}

void JoinStep::describePipeline(FormatSettings & settings) const
Expand Down
4 changes: 1 addition & 3 deletions src/Processors/QueryPlan/JoinStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ class JoinStep : public IQueryPlanStep
const DataStream & left_stream_,
const DataStream & right_stream_,
JoinPtr join_,
size_t max_block_size_,
size_t max_streams_ = 0);
size_t max_block_size_);

String getName() const override { return "Join"; }

Expand All @@ -30,7 +29,6 @@ class JoinStep : public IQueryPlanStep
private:
JoinPtr join;
size_t max_block_size;
size_t max_streams;
Processors processors;
};

Expand Down
31 changes: 31 additions & 0 deletions src/Processors/QueryPlan/ResizeStreamsStep.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#include <Processors/QueryPlan/ResizeStreamsStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}

ResizeStreamsStep::ResizeStreamsStep(const DataStream & input_stream_, size_t pipeline_streams_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, pipeline_streams(pipeline_streams_)
{
}

void ResizeStreamsStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
pipeline.resize(pipeline_streams);
}
}
15 changes: 15 additions & 0 deletions src/Processors/QueryPlan/ResizeStreamsStep.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>

namespace DB
{
class ResizeStreamsStep : public ITransformingStep
{
public:
explicit ResizeStreamsStep(const DataStream & input_stream_, size_t pipeline_streams_);
String getName() const override { return "ResizeStreamsStep"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
private:
size_t pipeline_streams;
};
}
9 changes: 2 additions & 7 deletions src/QueryPipeline/QueryPipelineBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
std::unique_ptr<QueryPipelineBuilder> right,
JoinPtr join,
size_t max_block_size,
Processors * collected_processors,
size_t max_streams)
Processors * collected_processors)
{
left->checkInitializedAndNotCompleted();
right->checkInitializedAndNotCompleted();
Expand Down Expand Up @@ -346,14 +345,10 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
/// ╞> FillingJoin ─> Resize ╣ ╞> Joining ─> (totals)
/// (totals) ─────────┘ ╙─────┘

// In some cases, left's streams is too smaller then max_streams. Keep it same as max_streams
// to make full use of cpu.
auto & num_streams = max_streams;
left->resize(num_streams);
auto num_streams = left->getNumStreams();

if (join->supportParallelJoin() && !right->hasTotals())
{
right->resize(num_streams);
auto concurrent_right_filling_transform = [&](OutputPortRawPtrs outports)
{
Processors processors;
Expand Down
3 changes: 1 addition & 2 deletions src/QueryPipeline/QueryPipelineBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ class QueryPipelineBuilder
std::unique_ptr<QueryPipelineBuilder> right,
JoinPtr join,
size_t max_block_size,
Processors * collected_processors = nullptr,
size_t max_streams = 0);
Processors * collected_processors = nullptr);

/// Add other pipeline and execute it before current one.
/// Pipeline must have empty header, it should not generate any chunk.
Expand Down

0 comments on commit 0b0fa84

Please sign in to comment.