From 9b3e36db03d455d42255948b2ff95602529bf2ad Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 29 Mar 2023 14:58:54 +0800 Subject: [PATCH] add GeneratedColumnPlaceholderInputStream (#6796) (#6812) close pingcap/tiflash#6801, ref pingcap/tidb#40663 --- ...neratedColumnPlaceholderBlockInputStream.h | 97 ++++++++++ .../Coprocessor/DAGQueryBlockInterpreter.cpp | 2 + .../Coprocessor/DAGStorageInterpreter.cpp | 13 ++ .../Flash/Coprocessor/DAGStorageInterpreter.h | 2 + .../Flash/Coprocessor/InterpreterUtils.cpp | 17 ++ dbms/src/Flash/Coprocessor/InterpreterUtils.h | 6 + .../Planner/Plans/PhysicalMockTableScan.cpp | 166 ++++++++++++++++++ dbms/src/Storages/Transaction/TiDB.cpp | 13 ++ dbms/src/Storages/Transaction/TiDB.h | 5 +- .../fullstack-test/expr/generated_index.test | 147 ++++++++++++++++ 10 files changed, 467 insertions(+), 1 deletion(-) create mode 100644 dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h create mode 100644 dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp create mode 100644 tests/fullstack-test/expr/generated_index.test diff --git a/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h b/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h new file mode 100644 index 00000000000..1adff01a328 --- /dev/null +++ b/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h @@ -0,0 +1,97 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputStream +{ +public: + GeneratedColumnPlaceholderBlockInputStream( + const BlockInputStreamPtr & input, + const std::vector> & generated_column_infos_, + const String & req_id_) + : generated_column_infos(generated_column_infos_) + , log(Logger::get(req_id_)) + { + children.push_back(input); + } + + String getName() const override { return NAME; } + Block getHeader() const override + { + Block block = children.back()->getHeader(); + insertColumns(block, /*insert_data=*/false); + return block; + } + + static String getColumnName(UInt64 col_index) + { + return "generated_column_" + std::to_string(col_index); + } + +protected: + void readPrefix() override + { + RUNTIME_CHECK(!generated_column_infos.empty(), Exception, "generated_column_infos cannot be empty"); + // Validation check. + for (size_t i = 1; i < generated_column_infos.size(); ++i) + { + RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]), Exception, "generated column index should be ordered"); + } + } + + Block readImpl() override + { + Block block = children.back()->read(); + insertColumns(block, /*insert_data=*/true); + return block; + } + +private: + void insertColumns(Block & block, bool insert_data) const + { + if (!block) + return; + + for (const auto & ele : generated_column_infos) + { + const auto & col_index = std::get<0>(ele); + const auto & col_name = std::get<1>(ele); + const auto & data_type = std::get<2>(ele); + ColumnPtr column = nullptr; + if (insert_data) + column = data_type->createColumnConstWithDefaultValue(block.rows()); + else + column = data_type->createColumn(); + block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name}); + } + } + + static constexpr auto NAME = "GeneratedColumnPlaceholder"; + const std::vector> 
generated_column_infos; + const LoggerPtr log; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 8d91b8b23e9..0c386347e27 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -172,6 +172,8 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s auto mock_table_scan_stream = std::make_shared(columns_with_type_and_name, context.getSettingsRef().max_block_size); pipeline.streams.emplace_back(mock_table_scan_stream); } + + // Ignore handling GeneratedColumnPlaceholderBlockInputStream for now, because we don't support generated column in test framework. } void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index f5354994b44..24d4edc48b6 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -300,6 +301,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) /// handle timezone/duration cast for local and remote table scan. executeCastAfterTableScan(remote_read_streams_start_index, pipeline); + /// handle generated column if necessary. + executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline); recordProfileStreams(pipeline, table_scan.getTableScanExecutorID()); /// handle pushed down filter for local and remote table scan. 
@@ -960,8 +963,18 @@ std::tuple> DAGStorageIn for (Int32 i = 0; i < table_scan.getColumnSize(); ++i) { auto const & ci = table_scan.getColumns()[i]; + auto tidb_ci = TiDB::toTiDBColumnInfo(ci); ColumnID cid = ci.column_id(); + if (tidb_ci.hasGeneratedColumnFlag()) + { + LOG_FMT_DEBUG(log, "got column({}) with generated column flag", i); + const auto & data_type = getDataTypeByColumnInfoForComputingLayer(tidb_ci); + const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i); + generated_column_infos.push_back(std::make_tuple(i, col_name, data_type)); + source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type}); + continue; + } // Column ID -1 return the handle column String name; if (cid == TiDBPkColumnID) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index d86274a1e22..782797accb7 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -127,6 +127,8 @@ class DAGStorageInterpreter ManageableStoragePtr storage_for_logical_table; Names required_columns; NamesAndTypes source_columns; + // For generated column, just need a placeholder, and TiDB will fill this column. + std::vector> generated_column_infos; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 69060071997..a5b0739c6b2 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include #include @@ -93,4 +94,20 @@ ExpressionActionsPtr generateProjectExpressionActions( project->add(ExpressionAction::project(project_cols)); return project; } + +void executeGeneratedColumnPlaceholder( + size_t remote_read_streams_start_index, + const std::vector> & generated_column_infos, + LoggerPtr log, + DAGPipeline & pipeline) +{ + if (generated_column_infos.empty()) + return; + assert(remote_read_streams_start_index <= pipeline.streams.size()); + for (size_t i = 0; i < remote_read_streams_start_index; ++i) + { + auto & stream = pipeline.streams[i]; + stream = std::make_shared(stream, generated_column_infos, log->identifier()); + } +} } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 91e6d483220..3f30288eee5 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -43,4 +43,10 @@ ExpressionActionsPtr generateProjectExpressionActions( const BlockInputStreamPtr & stream, const Context & context, const NamesWithAliases & project_cols); + +void executeGeneratedColumnPlaceholder( + size_t remote_read_streams_start_index, + const std::vector> & generated_column_infos, + LoggerPtr log, + DAGPipeline & pipeline); } // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp new file mode 100644 index 00000000000..1411dce0f70 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp @@ -0,0 +1,166 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace +{ +std::pair mockSchemaAndStreams( + Context & context, + const String & executor_id, + const LoggerPtr & log, + const TiDBTableScan & table_scan) +{ + NamesAndTypes schema; + BlockInputStreams mock_streams; + auto & dag_context = *context.getDAGContext(); + size_t max_streams = getMockSourceStreamConcurrency(dag_context.initialize_concurrency, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID())); + assert(max_streams > 0); + + if (context.mockStorage()->useDeltaMerge()) + { + assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID())); + schema = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID()); + mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_scan.getLogicalTableID())); + } + else + { + /// build from user input blocks. 
+ assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID())); + NamesAndTypes names_and_types; + std::vector> mock_table_scan_streams; + if (context.isMPPTest()) + { + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan); + } + else + { + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream(context, max_streams, log, executor_id, table_scan.getLogicalTableID(), table_scan.getColumns()); + } + schema = std::move(names_and_types); + mock_streams.insert(mock_streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end()); + } + + assert(!schema.empty()); + assert(!mock_streams.empty()); + + // Ignore handling GeneratedColumnPlaceholderBlockInputStream for now, because we don't support generated column in test framework. + return {std::move(schema), std::move(mock_streams)}; +} +} // namespace + +PhysicalMockTableScan::PhysicalMockTableScan( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const Block & sample_block_, + const BlockInputStreams & mock_streams_, + Int64 table_id_) + : PhysicalLeaf(executor_id_, PlanType::MockTableScan, schema_, req_id) + , sample_block(sample_block_) + , mock_streams(mock_streams_) + , table_id(table_id_) +{} + +PhysicalPlanNodePtr PhysicalMockTableScan::build( + Context & context, + const String & executor_id, + const LoggerPtr & log, + const TiDBTableScan & table_scan) +{ + assert(context.isTest()); + auto [schema, mock_streams] = mockSchemaAndStreams(context, executor_id, log, table_scan); + + auto physical_mock_table_scan = std::make_shared( + executor_id, + schema, + log->identifier(), + Block(schema), + mock_streams, + table_scan.getLogicalTableID()); + return physical_mock_table_scan; +} + +void PhysicalMockTableScan::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) +{ + assert(pipeline.streams.empty()); + 
pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end()); +} + +void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) +{ + group_builder.init(mock_streams.size()); + size_t i = 0; + group_builder.transform([&](auto & builder) { + builder.setSourceOp(std::make_unique(group_builder.exec_status, mock_streams[i++])); + }); +} + +void PhysicalMockTableScan::finalize(const Names & parent_require) +{ + FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); +} + +const Block & PhysicalMockTableScan::getSampleBlock() const +{ + return sample_block; +} + +void PhysicalMockTableScan::updateStreams(Context & context) +{ + mock_streams.clear(); + assert(context.mockStorage()->tableExistsForDeltaMerge(table_id)); + mock_streams.emplace_back(context.mockStorage()->getStreamFromDeltaMerge(context, table_id, &filter_conditions)); +} + +bool PhysicalMockTableScan::setFilterConditions(Context & context, const String & filter_executor_id, const tipb::Selection & selection) +{ + if (unlikely(hasFilterConditions())) + { + return false; + } + filter_conditions = FilterConditions::filterConditionsFrom(filter_executor_id, selection); + updateStreams(context); + return true; +} + +bool PhysicalMockTableScan::hasFilterConditions() const +{ + return filter_conditions.hasValue(); +} + +const String & PhysicalMockTableScan::getFilterConditionsId() const +{ + assert(hasFilterConditions()); + return filter_conditions.executor_id; +} + +Int64 PhysicalMockTableScan::getLogicalTableID() const +{ + return table_id; +} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index 732c72aa143..d08dd274553 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -1128,4 +1128,17 @@ ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type) return ret; } +ColumnInfo 
toTiDBColumnInfo(const tipb::ColumnInfo & tipb_column_info) +{ + ColumnInfo tidb_column_info; + tidb_column_info.tp = static_cast(tipb_column_info.tp()); + tidb_column_info.id = tipb_column_info.column_id(); + tidb_column_info.flag = tipb_column_info.flag(); + tidb_column_info.flen = tipb_column_info.columnlen(); + tidb_column_info.decimal = tipb_column_info.decimal(); + for (int i = 0; i < tipb_column_info.elems_size(); ++i) + tidb_column_info.elems.emplace_back(tipb_column_info.elems(i), i + 1); + return tidb_column_info; +} + } // namespace TiDB diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 4c28a614857..25839b2b051 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -20,6 +20,7 @@ #include #include #include +#include #include @@ -116,7 +117,8 @@ enum TP M(NoDefaultValue, (1 << 12)) \ M(OnUpdateNow, (1 << 13)) \ M(PartKey, (1 << 14)) \ - M(Num, (1 << 15)) + M(Num, (1 << 15)) \ + M(GeneratedColumn, (1 << 23)) enum ColumnFlag { @@ -412,5 +414,6 @@ String genJsonNull(); tipb::FieldType columnInfoToFieldType(const ColumnInfo & ci); ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type); +ColumnInfo toTiDBColumnInfo(const tipb::ColumnInfo & tipb_column_info); } // namespace TiDB diff --git a/tests/fullstack-test/expr/generated_index.test b/tests/fullstack-test/expr/generated_index.test new file mode 100644 index 00000000000..f09f2c3fcde --- /dev/null +++ b/tests/fullstack-test/expr/generated_index.test @@ -0,0 +1,147 @@ +# Copyright 2023 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table test.t(c1 varchar(100), c2 varchar(100)); +mysql> insert into test.t values('ABC', 'DEF'); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> alter table test.t add index idx2((lower(c2))); + +mysql> select /*+ nth_plan(1) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(2) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(3) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(4) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(5) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(6) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(7) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(8) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ 
nth_plan(9) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(10) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ + +mysql> drop table if exists test.t; +mysql> create table test.t(id int, value int); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> create unique index uk on test.t((tidb_shard(id)), id); +mysql> select /*+ nth_plan(1) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(2) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(3) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(4) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(5) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(6) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(7) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(8) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(9) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_plan(10) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+