From 631499f21109ddaf4887991a20166338ceed7299 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 13 Feb 2023 19:30:00 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #6796 Signed-off-by: ti-chi-bot --- ...neratedColumnPlaceholderBlockInputStream.h | 97 ++++++++++ .../Coprocessor/DAGQueryBlockInterpreter.cpp | 2 + .../Coprocessor/DAGStorageInterpreter.cpp | 12 ++ .../Flash/Coprocessor/DAGStorageInterpreter.h | 2 + .../Flash/Coprocessor/InterpreterUtils.cpp | 74 ++++++++ dbms/src/Flash/Coprocessor/InterpreterUtils.h | 20 +++ .../Planner/Plans/PhysicalMockTableScan.cpp | 166 ++++++++++++++++++ dbms/src/Storages/Transaction/TiDB.h | 3 +- .../fullstack-test/expr/generated_index.test | 147 ++++++++++++++++ 9 files changed, 522 insertions(+), 1 deletion(-) create mode 100644 dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h create mode 100644 dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp create mode 100644 tests/fullstack-test/expr/generated_index.test diff --git a/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h b/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h new file mode 100644 index 00000000000..f220f1377d4 --- /dev/null +++ b/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h @@ -0,0 +1,97 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputStream +{ +public: + GeneratedColumnPlaceholderBlockInputStream( + const BlockInputStreamPtr & input, + const std::vector> & generated_column_infos_, + const String & req_id_) + : generated_column_infos(generated_column_infos_) + , log(Logger::get(req_id_)) + { + children.push_back(input); + } + + String getName() const override { return NAME; } + Block getHeader() const override + { + Block block = children.back()->getHeader(); + insertColumns(block, /*insert_data=*/false); + return block; + } + + static String getColumnName(UInt64 col_index) + { + return "generated_column_" + std::to_string(col_index); + } + +protected: + void readPrefix() override + { + RUNTIME_CHECK(!generated_column_infos.empty()); + // Validation check. + for (size_t i = 1; i < generated_column_infos.size(); ++i) + { + RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1])); + } + } + + Block readImpl() override + { + Block block = children.back()->read(); + insertColumns(block, /*insert_data=*/true); + return block; + } + +private: + void insertColumns(Block & block, bool insert_data) const + { + if (!block) + return; + + for (const auto & ele : generated_column_infos) + { + const auto & col_index = std::get<0>(ele); + const auto & col_name = std::get<1>(ele); + const auto & data_type = std::get<2>(ele); + ColumnPtr column = nullptr; + if (insert_data) + column = data_type->createColumnConstWithDefaultValue(block.rows()); + else + column = data_type->createColumn(); + block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name}); + } + } + + static constexpr auto NAME = "GeneratedColumnPlaceholder"; + const std::vector> generated_column_infos; + const LoggerPtr log; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index a7d2aae92b7..d956fca4a84 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -178,6 +178,8 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s analyzer = std::make_unique(std::move(names_and_types), context); pipeline.streams.insert(pipeline.streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end()); } + + // Ignore handling GeneratedColumnPlaceholderBlockInputStream for now, because we don't support generated column in test framework. } diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 4c8adefebc3..9750e85a0a6 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -375,6 +376,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) /// handle timezone/duration cast for local and remote table scan. executeCastAfterTableScan(remote_read_streams_start_index, pipeline); + /// handle generated column if necessary. + executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline); recordProfileStreams(pipeline, table_scan.getTableScanExecutorID()); /// handle pushed down filter for local and remote table scan. @@ -1043,6 +1046,15 @@ std::tuple> DAGStorageIn auto const & ci = table_scan.getColumns()[i]; ColumnID cid = ci.column_id(); + if (ci.hasGeneratedColumnFlag()) + { + LOG_DEBUG(log, "got column({}) with generated column flag", i); + const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci); + const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i); + generated_column_infos.push_back(std::make_tuple(i, col_name, data_type)); + source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type}); + continue; + } // Column ID -1 return the handle column String name; if (cid == TiDBPkColumnID) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index efe78c25918..f97ca2cce6f 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -141,6 +141,8 @@ class DAGStorageInterpreter ManageableStoragePtr storage_for_logical_table; Names required_columns; NamesAndTypes source_columns; + // For generated column, just need a placeholder, and TiDB will fill this column. + std::vector> generated_column_infos; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index d71f8b073d1..68d0f5d076e 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -14,6 +14,11 @@ #include #include +<<<<<<< HEAD +======= +#include +#include +>>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796)) #include #include #include @@ -194,4 +199,73 @@ void executeCreatingSets( log->identifier()); } } +<<<<<<< HEAD +======= + +std::tuple buildPushDownFilter( + const FilterConditions & filter_conditions, + DAGExpressionAnalyzer & analyzer) +{ + assert(filter_conditions.hasValue()); + + ExpressionActionsChain chain; + analyzer.initChain(chain); + String filter_column_name = analyzer.appendWhere(chain, filter_conditions.conditions); + ExpressionActionsPtr before_where = chain.getLastActions(); + chain.addStep(); + + // remove useless tmp column and keep the schema of local streams and remote streams the same. + NamesWithAliases project_cols; + for (const auto & col : analyzer.getCurrentInputColumns()) + { + chain.getLastStep().required_output.push_back(col.name); + project_cols.emplace_back(col.name, col.name); + } + chain.getLastActions()->add(ExpressionAction::project(project_cols)); + ExpressionActionsPtr project_after_where = chain.getLastActions(); + chain.finalize(); + chain.clear(); + + return {before_where, filter_column_name, project_after_where}; +} + +void executePushedDownFilter( + size_t remote_read_streams_start_index, + const FilterConditions & filter_conditions, + DAGExpressionAnalyzer & analyzer, + LoggerPtr log, + DAGPipeline & pipeline) +{ + auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions, analyzer); + + assert(remote_read_streams_start_index <= pipeline.streams.size()); + // for remote read, filter had been pushed down, don't need to execute again. + for (size_t i = 0; i < remote_read_streams_start_index; ++i) + { + auto & stream = pipeline.streams[i]; + stream = std::make_shared(stream, before_where, filter_column_name, log->identifier()); + stream->setExtraInfo("push down filter"); + // after filter, do project action to keep the schema of local streams and remote streams the same. + stream = std::make_shared(stream, project_after_where, log->identifier()); + stream->setExtraInfo("projection after push down filter"); + } +} + +void executeGeneratedColumnPlaceholder( + size_t remote_read_streams_start_index, + const std::vector> & generated_column_infos, + LoggerPtr log, + DAGPipeline & pipeline) +{ + if (generated_column_infos.empty()) + return; + assert(remote_read_streams_start_index <= pipeline.streams.size()); + for (size_t i = 0; i < remote_read_streams_start_index; ++i) + { + auto & stream = pipeline.streams[i]; + stream = std::make_shared(stream, generated_column_infos, log->identifier()); + stream->setExtraInfo("generated column placeholder above table scan"); + } +} +>>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796)) } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 87672e81dfa..1aca1d7d0dc 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -66,4 +66,24 @@ void executeCreatingSets( const Context & context, size_t max_streams, const LoggerPtr & log); +<<<<<<< HEAD +======= + +std::tuple buildPushDownFilter( + const FilterConditions & filter_conditions, + DAGExpressionAnalyzer & analyzer); + +void executePushedDownFilter( + size_t remote_read_streams_start_index, + const FilterConditions & filter_conditions, + DAGExpressionAnalyzer & analyzer, + LoggerPtr log, + DAGPipeline & pipeline); + +void executeGeneratedColumnPlaceholder( + size_t remote_read_streams_start_index, + const std::vector> & generated_column_infos, + LoggerPtr log, + DAGPipeline & pipeline); +>>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796)) } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp new file mode 100644 index 00000000000..1411dce0f70 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -0,0 +1,166 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace +{ +std::pair mockSchemaAndStreams( + Context & context, + const String & executor_id, + const LoggerPtr & log, + const TiDBTableScan & table_scan) +{ + NamesAndTypes schema; + BlockInputStreams mock_streams; + auto & dag_context = *context.getDAGContext(); + size_t max_streams = getMockSourceStreamConcurrency(dag_context.initialize_concurrency, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID())); + assert(max_streams > 0); + + if (context.mockStorage()->useDeltaMerge()) + { + assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID())); + schema = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID()); + mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_scan.getLogicalTableID())); + } + else + { + /// build from user input blocks. + assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID())); + NamesAndTypes names_and_types; + std::vector> mock_table_scan_streams; + if (context.isMPPTest()) + { + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan); + } + else + { + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream(context, max_streams, log, executor_id, table_scan.getLogicalTableID(), table_scan.getColumns()); + } + schema = std::move(names_and_types); + mock_streams.insert(mock_streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end()); + } + + assert(!schema.empty()); + assert(!mock_streams.empty()); + + // Ignore handling GeneratedColumnPlaceholderBlockInputStream for now, because we don't support generated column in test framework. + return {std::move(schema), std::move(mock_streams)}; +} +} // namespace + +PhysicalMockTableScan::PhysicalMockTableScan( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const Block & sample_block_, + const BlockInputStreams & mock_streams_, + Int64 table_id_) + : PhysicalLeaf(executor_id_, PlanType::MockTableScan, schema_, req_id) + , sample_block(sample_block_) + , mock_streams(mock_streams_) + , table_id(table_id_) +{} + +PhysicalPlanNodePtr PhysicalMockTableScan::build( + Context & context, + const String & executor_id, + const LoggerPtr & log, + const TiDBTableScan & table_scan) +{ + assert(context.isTest()); + auto [schema, mock_streams] = mockSchemaAndStreams(context, executor_id, log, table_scan); + + auto physical_mock_table_scan = std::make_shared( + executor_id, + schema, + log->identifier(), + Block(schema), + mock_streams, + table_scan.getLogicalTableID()); + return physical_mock_table_scan; +} + +void PhysicalMockTableScan::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) +{ + assert(pipeline.streams.empty()); + pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end()); +} + +void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +{ + group_builder.init(mock_streams.size()); + size_t i = 0; + group_builder.transform([&](auto & builder) { + builder.setSourceOp(std::make_unique(group_builder.exec_status, mock_streams[i++])); + }); +} + +void PhysicalMockTableScan::finalize(const Names & parent_require) +{ + FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); +} + +const Block & PhysicalMockTableScan::getSampleBlock() const +{ + return sample_block; +} + +void PhysicalMockTableScan::updateStreams(Context & context) +{ + mock_streams.clear(); + assert(context.mockStorage()->tableExistsForDeltaMerge(table_id)); + mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_id, &filter_conditions)); +} + +bool PhysicalMockTableScan::setFilterConditions(Context & context, const String & filter_executor_id, const tipb::Selection & selection) +{ + if (unlikely(hasFilterConditions())) + { + return false; + } + filter_conditions = FilterConditions::filterConditionsFrom(filter_executor_id, selection); + updateStreams(context); + return true; +} + +bool PhysicalMockTableScan::hasFilterConditions() const +{ + return filter_conditions.hasValue(); +} + +const String & PhysicalMockTableScan::getFilterConditionsId() const +{ + assert(hasFilterConditions()); + return filter_conditions.executor_id; +} + +Int64 PhysicalMockTableScan::getLogicalTableID() const +{ + return table_id; +} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 9bd78abeed3..8ee49a5c015 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -117,7 +117,8 @@ enum TP M(NoDefaultValue, (1 << 12)) \ M(OnUpdateNow, (1 << 13)) \ M(PartKey, (1 << 14)) \ - M(Num, (1 << 15)) + M(Num, (1 << 15)) \ + M(GeneratedColumn, (1 << 23)) enum ColumnFlag { diff --git a/tests/fullstack-test/expr/generated_index.test b/tests/fullstack-test/expr/generated_index.test new file mode 100644 index 00000000000..f09f2c3fcde --- /dev/null +++ b/tests/fullstack-test/expr/generated_index.test @@ -0,0 +1,147 @@ +# Copyright 2023 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table test.t(c1 varchar(100), c2 varchar(100)); +mysql> insert into test.t values('ABC', 'DEF'); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> alter table test.t add index idx2((lower(c2))); + +mysql> select /*+ nth_plan(1) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(2) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(3) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(4) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(5) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(6) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(7) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(8) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(9) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(10) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ + +mysql> drop table if exists test.t; +mysql> create table test.t(id int, value int); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> create unique index uk on test.t((tidb_shard(id)), id); +mysql> select /*+ nth_paln(1) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(2) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(3) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(4) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(5) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(6) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(7) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(8) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(9) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(10) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ From 7afdc53ff49015ff957aa692fed00c30b8fb3c6c Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 4 Apr 2023 10:15:31 +0800 Subject: [PATCH 2/3] fix conflicts Signed-off-by: guo-shaoge --- .../Flash/Coprocessor/InterpreterUtils.cpp | 56 ------ dbms/src/Flash/Coprocessor/InterpreterUtils.h | 14 -- .../Planner/Plans/PhysicalMockTableScan.cpp | 166 ------------------ 3 files changed, 236 deletions(-) delete mode 100644 dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 68d0f5d076e..5ac1c5dc669 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -14,11 +14,7 @@ #include #include -<<<<<<< HEAD -======= -#include #include ->>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796)) #include #include #include @@ -199,57 +195,6 @@ void executeCreatingSets( log->identifier()); } } -<<<<<<< HEAD -======= - -std::tuple buildPushDownFilter( - const FilterConditions & filter_conditions, - DAGExpressionAnalyzer & analyzer) -{ - assert(filter_conditions.hasValue()); - - ExpressionActionsChain chain; - analyzer.initChain(chain); - String filter_column_name = analyzer.appendWhere(chain, filter_conditions.conditions); - ExpressionActionsPtr before_where = chain.getLastActions(); - chain.addStep(); - - // remove useless tmp column and keep the schema of local streams and remote streams the same. - NamesWithAliases project_cols; - for (const auto & col : analyzer.getCurrentInputColumns()) - { - chain.getLastStep().required_output.push_back(col.name); - project_cols.emplace_back(col.name, col.name); - } - chain.getLastActions()->add(ExpressionAction::project(project_cols)); - ExpressionActionsPtr project_after_where = chain.getLastActions(); - chain.finalize(); - chain.clear(); - - return {before_where, filter_column_name, project_after_where}; -} - -void executePushedDownFilter( - size_t remote_read_streams_start_index, - const FilterConditions & filter_conditions, - DAGExpressionAnalyzer & analyzer, - LoggerPtr log, - DAGPipeline & pipeline) -{ - auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions, analyzer); - - assert(remote_read_streams_start_index <= pipeline.streams.size()); - // for remote read, filter had been pushed down, don't need to execute again. - for (size_t i = 0; i < remote_read_streams_start_index; ++i) - { - auto & stream = pipeline.streams[i]; - stream = std::make_shared(stream, before_where, filter_column_name, log->identifier()); - stream->setExtraInfo("push down filter"); - // after filter, do project action to keep the schema of local streams and remote streams the same. - stream = std::make_shared(stream, project_after_where, log->identifier()); - stream->setExtraInfo("projection after push down filter"); - } -} void executeGeneratedColumnPlaceholder( size_t remote_read_streams_start_index, @@ -267,5 +212,4 @@ void executeGeneratedColumnPlaceholder( stream->setExtraInfo("generated column placeholder above table scan"); } } ->>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796)) } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 1aca1d7d0dc..0a3b392d95b 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -66,24 +66,10 @@ void executeCreatingSets( const Context & context, size_t max_streams, const LoggerPtr & log); -<<<<<<< HEAD -======= - -std::tuple buildPushDownFilter( - const FilterConditions & filter_conditions, - DAGExpressionAnalyzer & analyzer); - -void executePushedDownFilter( - size_t remote_read_streams_start_index, - const FilterConditions & filter_conditions, - DAGExpressionAnalyzer & analyzer, - LoggerPtr log, - DAGPipeline & pipeline); void executeGeneratedColumnPlaceholder( size_t remote_read_streams_start_index, const std::vector> & generated_column_infos, LoggerPtr log, DAGPipeline & pipeline); ->>>>>>> e84ed489e6 (add GeneratedColumnPlaceholderInputStream (#6796)) } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp deleted file mode 100644 index 1411dce0f70..00000000000 --- a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright 2023 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ -namespace -{ -std::pair mockSchemaAndStreams( - Context & context, - const String & executor_id, - const LoggerPtr & log, - const TiDBTableScan & table_scan) -{ - NamesAndTypes schema; - BlockInputStreams mock_streams; - auto & dag_context = *context.getDAGContext(); - size_t max_streams = getMockSourceStreamConcurrency(dag_context.initialize_concurrency, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID())); - assert(max_streams > 0); - - if (context.mockStorage()->useDeltaMerge()) - { - assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID())); - schema = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID()); - mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_scan.getLogicalTableID())); - } - else - { - /// build from user input blocks. - assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID())); - NamesAndTypes names_and_types; - std::vector> mock_table_scan_streams; - if (context.isMPPTest()) - { - std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan); - } - else - { - std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream(context, max_streams, log, executor_id, table_scan.getLogicalTableID(), table_scan.getColumns()); - } - schema = std::move(names_and_types); - mock_streams.insert(mock_streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end()); - } - - assert(!schema.empty()); - assert(!mock_streams.empty()); - - // Ignore handling GeneratedColumnPlaceholderBlockInputStream for now, because we don't support generated column in test framework. - return {std::move(schema), std::move(mock_streams)}; -} -} // namespace - -PhysicalMockTableScan::PhysicalMockTableScan( - const String & executor_id_, - const NamesAndTypes & schema_, - const String & req_id, - const Block & sample_block_, - const BlockInputStreams & mock_streams_, - Int64 table_id_) - : PhysicalLeaf(executor_id_, PlanType::MockTableScan, schema_, req_id) - , sample_block(sample_block_) - , mock_streams(mock_streams_) - , table_id(table_id_) -{} - -PhysicalPlanNodePtr PhysicalMockTableScan::build( - Context & context, - const String & executor_id, - const LoggerPtr & log, - const TiDBTableScan & table_scan) -{ - assert(context.isTest()); - auto [schema, mock_streams] = mockSchemaAndStreams(context, executor_id, log, table_scan); - - auto physical_mock_table_scan = std::make_shared( - executor_id, - schema, - log->identifier(), - Block(schema), - mock_streams, - table_scan.getLogicalTableID()); - return physical_mock_table_scan; -} - -void PhysicalMockTableScan::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) -{ - assert(pipeline.streams.empty()); - pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end()); -} - -void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) -{ - group_builder.init(mock_streams.size()); - size_t i = 0; - group_builder.transform([&](auto & builder) { - builder.setSourceOp(std::make_unique(group_builder.exec_status, mock_streams[i++])); - }); -} - -void PhysicalMockTableScan::finalize(const Names & parent_require) -{ - FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); -} - -const Block & PhysicalMockTableScan::getSampleBlock() const -{ - return sample_block; -} - -void PhysicalMockTableScan::updateStreams(Context & context) -{ - mock_streams.clear(); - assert(context.mockStorage()->tableExistsForDeltaMerge(table_id)); - mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_id, &filter_conditions)); -} - -bool PhysicalMockTableScan::setFilterConditions(Context & context, const String & filter_executor_id, const tipb::Selection & selection) -{ - if (unlikely(hasFilterConditions())) - { - return false; - } - filter_conditions = FilterConditions::filterConditionsFrom(filter_executor_id, selection); - updateStreams(context); - return true; -} - -bool PhysicalMockTableScan::hasFilterConditions() const -{ - return filter_conditions.hasValue(); -} - -const String & PhysicalMockTableScan::getFilterConditionsId() const -{ - assert(hasFilterConditions()); - return filter_conditions.executor_id; -} - -Int64 PhysicalMockTableScan::getLogicalTableID() const -{ - return table_id; -} -} // namespace DB From e92ed2aa756a82a374b7bd375f845c52d24fb0b5 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 4 Apr 2023 10:39:54 +0800 Subject: [PATCH 3/3] fix build Signed-off-by: guo-shaoge --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 9750e85a0a6..d803bf8ddd1 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -1044,12 +1044,13 @@ std::tuple> DAGStorageIn for (Int32 i = 0; i < table_scan.getColumnSize(); ++i) { auto const & ci = table_scan.getColumns()[i]; + auto tidb_ci = TiDB::toTiDBColumnInfo(ci); ColumnID cid = ci.column_id(); - if (ci.hasGeneratedColumnFlag()) + if (tidb_ci.hasGeneratedColumnFlag()) { LOG_DEBUG(log, "got column({}) with generated column flag", i); - const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci); + const auto & data_type = getDataTypeByColumnInfoForComputingLayer(tidb_ci); const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i); generated_column_infos.push_back(std::make_tuple(i, col_name, data_type)); source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});