Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: extract a LogicalMemTable from DataSource to decouple memory/stored tables #13741

Merged
merged 3 commits into from
Nov 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,7 +1245,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
e := &TableScanExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
t: tb,
columns: v.Columns,
columns: v.Table.Columns,
seekHandle: math.MinInt64,
isVirtualTable: tb.Type() == table.VirtualTable,
}
Expand Down
36 changes: 7 additions & 29 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand Down Expand Up @@ -183,33 +182,16 @@ func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTas
return bestTask, nil
}

// tryToGetMemTask will check if this table is a mem table. If it is, it will produce a task.
func (ds *DataSource) tryToGetMemTask(prop *property.PhysicalProperty) (task task, err error) {
func (p *LogicalMemTable) findBestTask(prop *property.PhysicalProperty) (t task, err error) {
if !prop.IsEmpty() {
return nil, nil
}
if !infoschema.IsMemoryDB(ds.DBName.L) {
return nil, nil
return invalidTask, nil
}

memTable := PhysicalMemTable{
DBName: ds.DBName,
Table: ds.tableInfo,
Columns: ds.Columns,
TableAsName: ds.TableAsName,
}.Init(ds.ctx, ds.stats, ds.blockOffset)
memTable.SetSchema(ds.schema)

// Stop to push down these conditions.
var retPlan PhysicalPlan = memTable
if len(ds.pushedDownConds) > 0 {
sel := PhysicalSelection{
Conditions: ds.pushedDownConds,
}.Init(ds.ctx, ds.stats, ds.blockOffset)
sel.SetChildren(memTable)
retPlan = sel
}
return &rootTask{p: retPlan}, nil
DBName: p.dbName,
Table: p.tableInfo,
}.Init(p.ctx, p.stats, p.blockOffset)
memTable.SetSchema(p.schema)
return &rootTask{p: memTable}, nil
}

// tryToGetDualTask will check if the push down predicate has false constant. If so, it will return table dual.
Expand Down Expand Up @@ -423,10 +405,6 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty) (t task, err
if err != nil || t != nil {
return t, err
}
t, err = ds.tryToGetMemTask(prop)
if err != nil || t != nil {
return t, err
}

t = invalidTask
candidates := ds.skylinePruning(prop)
Expand Down
6 changes: 6 additions & 0 deletions planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ func (p PhysicalIndexScan) Init(ctx sessionctx.Context, offset int) *PhysicalInd
return &p
}

// Init initializes LogicalMemTable.
func (p LogicalMemTable) Init(ctx sessionctx.Context, offset int) *LogicalMemTable {
p.baseLogicalPlan = newBaseLogicalPlan(ctx, plancodec.TypeMemTableScan, &p, offset)
return &p
}

// Init initializes PhysicalMemTable.
func (p PhysicalMemTable) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int) *PhysicalMemTable {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeMemTableScan, &p, offset)
Expand Down
56 changes: 51 additions & 5 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2477,6 +2477,10 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, dbName.L, tableInfo.Name.L, "", authErr)

if tbl.Type() == table.VirtualTable {
return b.buildMemTable(ctx, dbName, tableInfo)
}

if tableInfo.IsView() {
return b.BuildDataSourceFromView(ctx, dbName, tableInfo)
}
Expand Down Expand Up @@ -2562,11 +2566,9 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
schema.Append(newCol)
ds.TblCols = append(ds.TblCols, newCol)
}
// We append an extra handle column to the schema when "ds" is not a memory
// table e.g. table in the "INFORMATION_SCHEMA" database, and the handle
// We append an extra handle column to the schema when the handle
// column is not the primary key of "ds".
isMemDB := infoschema.IsMemoryDB(ds.DBName.L)
if !isMemDB && handleCol == nil {
if handleCol == nil {
ds.Columns = append(ds.Columns, model.NewExtraHandleColInfo())
handleCol = ds.newExtraHandleSchemaCol()
schema.Append(handleCol)
Expand Down Expand Up @@ -2606,7 +2608,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if err != nil {
return nil, err
}
if txn.Valid() && !txn.IsReadOnly() && !isMemDB {
if txn.Valid() && !txn.IsReadOnly() {
us := LogicalUnionScan{handleCol: handleCol}.Init(b.ctx, b.getSelectOffset())
us.SetChildren(ds)
result = us
Expand All @@ -2629,6 +2631,50 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
return result, nil
}

func (b *PlanBuilder) buildMemTable(ctx context.Context, dbName model.CIStr, tableInfo *model.TableInfo) (LogicalPlan, error) {
// We can use the `tableInfo.Columns` directly because the memory table has
// a stable schema and there is no online DDL on the memory table.
schema := expression.NewSchema(make([]*expression.Column, 0, len(tableInfo.Columns))...)
names := make([]*types.FieldName, 0, len(tableInfo.Columns))
var handleCol *expression.Column
for _, col := range tableInfo.Columns {
names = append(names, &types.FieldName{
DBName: dbName,
TblName: tableInfo.Name,
ColName: col.Name,
OrigTblName: tableInfo.Name,
OrigColName: col.Name,
})
// NOTE: Rewrite the expression if memory table supports generated columns in the future
newCol := &expression.Column{
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
ID: col.ID,
RetType: &col.FieldType,
}
if tableInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag) {
handleCol = newCol
}
schema.Append(newCol)
}

if handleCol != nil {
handleMap := make(map[int64][]*expression.Column)
handleMap[tableInfo.ID] = []*expression.Column{handleCol}
b.handleHelper.pushMap(handleMap)
} else {
b.handleHelper.pushMap(nil)
}

// NOTE: Add a `LogicalUnionScan` if we support update memory table in the future
p := LogicalMemTable{
dbName: dbName,
tableInfo: tableInfo,
}.Init(b.ctx, b.getSelectOffset())
p.SetSchema(schema)
p.names = names
return p, nil
}

// BuildDataSourceFromView is used to build LogicalPlan from view
func (b *PlanBuilder) BuildDataSourceFromView(ctx context.Context, dbName model.CIStr, tableInfo *model.TableInfo) (LogicalPlan, error) {
charset, collation := b.ctx.GetSessionVars().GetCharsetInfo()
Expand Down
16 changes: 16 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,22 @@ type LogicalTableDual struct {
RowCount int
}

// LogicalMemTable represents a memory table or virtual table
// Some memory tables wants to take the ownership of some predications
// e.g
// SELECT * FROM tidb_cluster_log WHERE type='tikv' AND address='192.16.5.32'
// Assume that the table `tidb_cluster_log` is a memory table, which is used
// to retrieve logs from remote components. In the above situation we should
// send log search request to the target TiKV (192.16.5.32) directly instead of
// requesting all cluster components log search gRPC interface to retrieve
// log message and filtering them in TiDB node.
type LogicalMemTable struct {
logicalSchemaProducer

dbName model.CIStr
tableInfo *model.TableInfo
}

// LogicalUnionScan is only used in non read-only txn.
type LogicalUnionScan struct {
baseLogicalPlan
Expand Down
6 changes: 2 additions & 4 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,8 @@ type PhysicalIndexScan struct {
type PhysicalMemTable struct {
physicalSchemaProducer

DBName model.CIStr
Table *model.TableInfo
Columns []*model.ColumnInfo
TableAsName *model.CIStr
DBName model.CIStr
Table *model.TableInfo
}

// PhysicalTableScan represents a table scan plan.
Expand Down
3 changes: 1 addition & 2 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/types"
)

Expand Down Expand Up @@ -218,7 +217,7 @@ func (ds *DataSource) PruneColumns(parentUsedCols []*expression.Column) error {
}
// For SQL like `select 1 from t`, tikv's response will be empty if no column is in schema.
// So we'll force to push one if schema doesn't have any column.
if ds.schema.Len() == 0 && !infoschema.IsMemoryDB(ds.DBName.L) {
if ds.schema.Len() == 0 {
if handleCol == nil {
handleCol = ds.newExtraHandleSchemaCol()
handleColInfo = model.NewExtraHandleColInfo()
Expand Down
16 changes: 16 additions & 0 deletions planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ func (p *LogicalTableDual) DeriveStats(childStats []*property.StatsInfo, selfSch
return p.stats, nil
}

// DeriveStats implement LogicalPlan DeriveStats interface.
func (p *LogicalMemTable) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema) (*property.StatsInfo, error) {
statsTable := statistics.PseudoTable(p.tableInfo)
stats := &property.StatsInfo{
RowCount: float64(statsTable.Count),
Cardinality: make([]float64, len(p.tableInfo.Columns)),
HistColl: statsTable.GenerateHistCollFromColumnInfo(p.tableInfo.Columns, p.schema.Columns),
StatsVersion: statistics.PseudoVersion,
}
for i := range p.tableInfo.Columns {
stats.Cardinality[i] = float64(statsTable.Count)
}
p.stats = stats
return p.stats, nil
}

// DeriveStats implement LogicalPlan DeriveStats interface.
func (p *LogicalShow) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema) (*property.StatsInfo, error) {
// A fake count, just to avoid panic now.
Expand Down
2 changes: 0 additions & 2 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const (
NormalTable Type = iota
// VirtualTable , store no data, just extract data from the memory struct.
VirtualTable
// MemoryTable , store data only in local memory.
MemoryTable
)

const (
Expand Down