Skip to content

Commit

Permalink
add GeneratedColumnPlaceholderInputStream (#6796) (#6812)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Mar 29, 2023
1 parent 1e2fdec commit 9b3e36d
Show file tree
Hide file tree
Showing 10 changed files with 467 additions and 1 deletion.
97 changes: 97 additions & 0 deletions dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Flash/Coprocessor/TiDBTableScan.h>

namespace DB
{
class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputStream
{
public:
GeneratedColumnPlaceholderBlockInputStream(
const BlockInputStreamPtr & input,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos_,
const String & req_id_)
: generated_column_infos(generated_column_infos_)
, log(Logger::get(req_id_))
{
children.push_back(input);
}

String getName() const override { return NAME; }
Block getHeader() const override
{
Block block = children.back()->getHeader();
insertColumns(block, /*insert_data=*/false);
return block;
}

static String getColumnName(UInt64 col_index)
{
return "generated_column_" + std::to_string(col_index);
}

protected:
void readPrefix() override
{
RUNTIME_CHECK(!generated_column_infos.empty(), Exception, "generated_column_infos cannot be empty");
// Validation check.
for (size_t i = 1; i < generated_column_infos.size(); ++i)
{
RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1]), Exception, "generated column index should be ordered");
}
}

Block readImpl() override
{
Block block = children.back()->read();
insertColumns(block, /*insert_data=*/true);
return block;
}

private:
void insertColumns(Block & block, bool insert_data) const
{
if (!block)
return;

for (const auto & ele : generated_column_infos)
{
const auto & col_index = std::get<0>(ele);
const auto & col_name = std::get<1>(ele);
const auto & data_type = std::get<2>(ele);
ColumnPtr column = nullptr;
if (insert_data)
column = data_type->createColumnConstWithDefaultValue(block.rows());
else
column = data_type->createColumn();
block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name});
}
}

static constexpr auto NAME = "GeneratedColumnPlaceholder";
const std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
const LoggerPtr log;
};

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
}

// GeneratedColumnPlaceholderBlockInputStream is intentionally not handled here for now, because the test framework does not support generated columns.
}

void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/TiFlashMetrics.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
Expand Down Expand Up @@ -300,6 +301,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle pushed down filter for local and remote table scan.
Expand Down Expand Up @@ -960,8 +963,18 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
auto const & ci = table_scan.getColumns()[i];
auto tidb_ci = TiDB::toTiDBColumnInfo(ci);
ColumnID cid = ci.column_id();

if (tidb_ci.hasGeneratedColumnFlag())
{
LOG_FMT_DEBUG(log, "got column({}) with generated column flag", i);
const auto & data_type = getDataTypeByColumnInfoForComputingLayer(tidb_ci);
const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});
continue;
}
// Column ID -1 return the handle column
String name;
if (cid == TiDBPkColumnID)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class DAGStorageInterpreter
ManageableStoragePtr storage_for_logical_table;
Names required_columns;
NamesAndTypes source_columns;
// Generated columns only need a placeholder here; TiDB will fill in these columns.
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};

} // namespace DB
17 changes: 17 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <DataStreams/SharedQueryBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
Expand Down Expand Up @@ -93,4 +94,20 @@ ExpressionActionsPtr generateProjectExpressionActions(
project->add(ExpressionAction::project(project_cols));
return project;
}

/// Wraps the first `remote_read_streams_start_index` streams of `pipeline` in a
/// GeneratedColumnPlaceholderBlockInputStream built from `generated_column_infos`.
/// Streams at or after that index are left as they are. Nothing happens when
/// `generated_column_infos` is empty.
void executeGeneratedColumnPlaceholder(
    size_t remote_read_streams_start_index,
    const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos,
    LoggerPtr log,
    DAGPipeline & pipeline)
{
    // No generated columns: there is nothing to wrap.
    if (generated_column_infos.empty())
        return;

    auto & streams = pipeline.streams;
    assert(remote_read_streams_start_index <= streams.size());
    for (size_t idx = 0; idx != remote_read_streams_start_index; ++idx)
    {
        // The wrapper keeps the current stream as its child, then takes its slot.
        auto & slot = streams[idx];
        slot = std::make_shared<GeneratedColumnPlaceholderBlockInputStream>(slot, generated_column_infos, log->identifier());
    }
}
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,10 @@ ExpressionActionsPtr generateProjectExpressionActions(
const BlockInputStreamPtr & stream,
const Context & context,
const NamesWithAliases & project_cols);

/// Wraps the first `remote_read_streams_start_index` streams of `pipeline` in a
/// GeneratedColumnPlaceholderBlockInputStream built from `generated_column_infos`
/// (tuples of column position, column name and data type).
/// Does nothing when `generated_column_infos` is empty.
void executeGeneratedColumnPlaceholder(
size_t remote_read_streams_start_index,
const std::vector<std::tuple<UInt64, String, DataTypePtr>> & generated_column_infos,
LoggerPtr log,
DAGPipeline & pipeline);
} // namespace DB
166 changes: 166 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/MockTableScanBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/MockSourceStream.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/FinalizeHelper.h>
#include <Flash/Planner/PhysicalPlanHelper.h>
#include <Flash/Planner/Plans/PhysicalMockTableScan.h>
#include <Interpreters/Context.h>
#include <Operators/BlockInputStreamSourceOp.h>

namespace DB
{
namespace
{
/// Produces the schema and the source streams for a mock table scan of the logical
/// table referenced by `table_scan`, reading either from the mock DeltaMerge storage
/// or from the blocks registered in the mock storage.
std::pair<NamesAndTypes, BlockInputStreams> mockSchemaAndStreams(
    Context & context,
    const String & executor_id,
    const LoggerPtr & log,
    const TiDBTableScan & table_scan)
{
    const auto logical_table_id = table_scan.getLogicalTableID();
    auto && mock_storage = context.mockStorage();
    auto & dag_context = *context.getDAGContext();

    NamesAndTypes schema;
    BlockInputStreams mock_streams;
    size_t max_streams = getMockSourceStreamConcurrency(dag_context.initialize_concurrency, mock_storage->getScanConcurrencyHint(logical_table_id));
    assert(max_streams > 0);

    if (mock_storage->useDeltaMerge())
    {
        // DeltaMerge-backed mock table: a single stream obtained from the storage.
        assert(mock_storage->tableExistsForDeltaMerge(logical_table_id));
        schema = mock_storage->getNameAndTypesForDeltaMerge(logical_table_id);
        mock_streams.emplace_back(mock_storage->getStreamFromDeltaMerge(context, logical_table_id));
    }
    else
    {
        // Build the streams from the blocks supplied by the user.
        assert(mock_storage->tableExists(logical_table_id));
        NamesAndTypes names_and_types;
        std::vector<std::shared_ptr<DB::MockTableScanBlockInputStream>> mock_table_scan_streams;
        if (!context.isMPPTest())
        {
            std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream<MockTableScanBlockInputStream>(context, max_streams, log, executor_id, logical_table_id, table_scan.getColumns());
        }
        else
        {
            std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan);
        }
        schema = std::move(names_and_types);
        for (const auto & scan_stream : mock_table_scan_streams)
            mock_streams.emplace_back(scan_stream);
    }

    assert(!schema.empty());
    assert(!mock_streams.empty());

    // GeneratedColumnPlaceholderBlockInputStream is not applied here for now: the test framework does not support generated columns.
    return {std::move(schema), std::move(mock_streams)};
}
} // namespace

/// Constructs a physical leaf plan node of type PlanType::MockTableScan and stores
/// copies of the given sample block, mock streams and table id.
PhysicalMockTableScan::PhysicalMockTableScan(
const String & executor_id_,
const NamesAndTypes & schema_,
const String & req_id,
const Block & sample_block_,
const BlockInputStreams & mock_streams_,
Int64 table_id_)
: PhysicalLeaf(executor_id_, PlanType::MockTableScan, schema_, req_id)
, sample_block(sample_block_)
, mock_streams(mock_streams_)
, table_id(table_id_)
{}

/// Creates a PhysicalMockTableScan for `table_scan`. The schema and streams come from
/// mockSchemaAndStreams, and the sample block is built from that schema.
/// Asserts that `context` is a test context.
PhysicalPlanNodePtr PhysicalMockTableScan::build(
    Context & context,
    const String & executor_id,
    const LoggerPtr & log,
    const TiDBTableScan & table_scan)
{
    assert(context.isTest());

    auto [scan_schema, scan_streams] = mockSchemaAndStreams(context, executor_id, log, table_scan);
    return std::make_shared<PhysicalMockTableScan>(
        executor_id,
        scan_schema,
        log->identifier(),
        Block(scan_schema),
        scan_streams,
        table_scan.getLogicalTableID());
}

/// Appends every stored mock stream to `pipeline.streams`, which is asserted to be
/// empty on entry. The context and max_streams arguments are unused.
void PhysicalMockTableScan::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/)
{
    auto & target_streams = pipeline.streams;
    assert(target_streams.empty());
    for (const auto & mock_stream : mock_streams)
        target_streams.push_back(mock_stream);
}

/// Initializes `group_builder` with one builder per mock stream and gives each builder
/// a BlockInputStreamSourceOp that reads from the next mock stream, in order.
void PhysicalMockTableScan::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/)
{
    group_builder.init(mock_streams.size());

    // Index of the mock stream handed to the next builder visited by transform().
    size_t next_stream = 0;
    group_builder.transform([&](auto & builder) {
        auto & source_stream = mock_streams[next_stream];
        ++next_stream;
        builder.setSourceOp(std::make_unique<BlockInputStreamSourceOp>(group_builder.exec_status, source_stream));
    });
}

/// Finalizes this leaf node by delegating to
/// FinalizeHelper::checkSchemaContainsParentRequire with this node's schema and
/// the columns required by the parent.
void PhysicalMockTableScan::finalize(const Names & parent_require)
{
FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require);
}

/// Returns a reference to the sample block stored at construction.
const Block & PhysicalMockTableScan::getSampleBlock() const
{
return sample_block;
}

/// Discards the current mock streams and replaces them with a single stream read from
/// the mock DeltaMerge storage for this table, passing the stored filter conditions.
void PhysicalMockTableScan::updateStreams(Context & context)
{
    mock_streams.clear();
    assert(context.mockStorage()->tableExistsForDeltaMerge(table_id));

    auto filtered_stream = context.mockStorage()->getStreamFromDeltaMerge(context, table_id, &filter_conditions);
    mock_streams.emplace_back(std::move(filtered_stream));
}

/// Attaches filter conditions built from `selection` to this scan and rebuilds the
/// mock streams accordingly.
/// Returns false, changing nothing, if filter conditions are already set; true otherwise.
bool PhysicalMockTableScan::setFilterConditions(Context & context, const String & filter_executor_id, const tipb::Selection & selection)
{
    // Filter conditions may only be set once.
    if (unlikely(hasFilterConditions()))
        return false;

    filter_conditions = FilterConditions::filterConditionsFrom(filter_executor_id, selection);
    updateStreams(context);
    return true;
}

/// Reports whether `filter_conditions` currently holds a value, as determined by
/// FilterConditions::hasValue().
bool PhysicalMockTableScan::hasFilterConditions() const
{
return filter_conditions.hasValue();
}

/// Returns the executor id stored in `filter_conditions`.
/// Asserts that filter conditions have been set.
const String & PhysicalMockTableScan::getFilterConditionsId() const
{
assert(hasFilterConditions());
return filter_conditions.executor_id;
}

/// Returns the table id stored at construction.
Int64 PhysicalMockTableScan::getLogicalTableID() const
{
return table_id;
}
} // namespace DB
13 changes: 13 additions & 0 deletions dbms/src/Storages/Transaction/TiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,4 +1128,17 @@ ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type)
return ret;
}

/// Converts a tipb::ColumnInfo into a TiDB ColumnInfo by copying the type, column id,
/// flag, column length and decimal, and by numbering the enum-like elements from 1.
ColumnInfo toTiDBColumnInfo(const tipb::ColumnInfo & tipb_column_info)
{
    ColumnInfo column_info;
    column_info.tp = static_cast<TiDB::TP>(tipb_column_info.tp());
    column_info.id = tipb_column_info.column_id();
    column_info.flag = tipb_column_info.flag();
    column_info.flen = tipb_column_info.columnlen();
    column_info.decimal = tipb_column_info.decimal();

    // Each element is paired with its 1-based position.
    const int elem_count = tipb_column_info.elems_size();
    for (int pos = 0; pos != elem_count; ++pos)
    {
        column_info.elems.emplace_back(tipb_column_info.elems(pos), pos + 1);
    }
    return column_info;
}

} // namespace TiDB
Loading

0 comments on commit 9b3e36d

Please sign in to comment.