diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index c17c9a648790f..ac73f4dd9f73f 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1341,7 +1341,7 @@ func TestAlterTableDropPartitionByList(t *testing.T) { );`) tk.MustExec(`insert into t values (1),(3),(5),(null)`) tk.MustExec(`alter table t drop partition p1`) - tk.MustQuery("select * from t").Check(testkit.Rows("1", "5", "")) + tk.MustQuery("select * from t order by id").Check(testkit.Rows("", "1", "5")) ctx := tk.Session() is := domain.GetDomain(ctx).InfoSchema() tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) @@ -1867,7 +1867,7 @@ func TestAlterTableExchangePartition(t *testing.T) { // test disable exchange partition tk.MustExec("ALTER TABLE e EXCHANGE PARTITION p0 WITH TABLE e2") tk.MustQuery("show warnings").Check(testkit.Rows("Warning 8200 Exchange Partition is disabled, please set 'tidb_enable_exchange_partition' if you need to need to enable it")) - tk.MustQuery("select * from e").Check(testkit.Rows("16", "1669", "337", "2005")) + tk.MustQuery("select * from e order by id").Check(testkit.Rows("16", "337", "1669", "2005")) tk.MustQuery("select * from e2").Check(testkit.Rows()) // enable exchange partition diff --git a/distsql/request_builder.go b/distsql/request_builder.go index c8d7f0ebb3c0c..62355a075825d 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -187,6 +187,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui return builder } +// SetPartitionKeyRanges sets the "KeyRangesWithPartition" for "kv.Request". +func (builder *RequestBuilder) SetPartitionKeyRanges(keyRanges [][]kv.KeyRange) *RequestBuilder { + builder.Request.KeyRangesWithPartition = keyRanges + return builder +} + // SetStartTS sets "StartTS" for "kv.Request". 
func (builder *RequestBuilder) SetStartTS(startTS uint64) *RequestBuilder { builder.Request.StartTs = startTS @@ -332,6 +338,16 @@ func (builder *RequestBuilder) verifyTxnScope() error { return errors.New("requestBuilder can't decode tableID from keyRange") } } + for _, partKeyRanges := range builder.Request.KeyRangesWithPartition { + for _, keyRange := range partKeyRanges { + tableID := tablecodec.DecodeTableID(keyRange.StartKey) + if tableID > 0 { + visitPhysicalTableID[tableID] = struct{}{} + } else { + return errors.New("requestBuilder can't decode tableID from keyRange") + } + } + } for phyTableID := range visitPhysicalTableID { valid := VerifyTxnScope(builder.ReadReplicaScope, phyTableID, builder.is) diff --git a/executor/adapter.go b/executor/adapter.go index e68711d200ef2..8e182cf9517ef 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "runtime/trace" + "strconv" "strings" "sync/atomic" "time" @@ -394,8 +395,18 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { oriScan := sctx.GetSessionVars().DistSQLScanConcurrency() oriIndex := sctx.GetSessionVars().IndexSerialScanConcurrency() oriIso, _ := sctx.GetSessionVars().GetSystemVar(variable.TxnIsolation) - terror.Log(sctx.GetSessionVars().SetSystemVar(variable.TiDBBuildStatsConcurrency, "1")) - sctx.GetSessionVars().SetDistSQLScanConcurrency(1) + autoConcurrency, err1 := variable.GetSessionOrGlobalSystemVar(sctx.GetSessionVars(), variable.TiDBAutoBuildStatsConcurrency) + terror.Log(err1) + if err1 == nil { + terror.Log(sctx.GetSessionVars().SetSystemVar(variable.TiDBBuildStatsConcurrency, autoConcurrency)) + } + sVal, err2 := variable.GetSessionOrGlobalSystemVar(sctx.GetSessionVars(), variable.TiDBSysProcScanConcurrency) + terror.Log(err2) + if err2 == nil { + concurrency, err3 := strconv.ParseInt(sVal, 10, 64) + terror.Log(err3) + sctx.GetSessionVars().SetDistSQLScanConcurrency(int(concurrency)) + } sctx.GetSessionVars().SetIndexSerialScanConcurrency(1) terror.Log(sctx.GetSessionVars().SetSystemVar(variable.TxnIsolation, ast.ReadCommitted)) defer func() { diff --git a/executor/builder.go b/executor/builder.go index a225a7e4a95ec..22fa2e72c74ed 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -4118,16 +4118,16 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRangeSeparately(ranges []*r return pids, ret, nil } -func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Range) ([]kv.KeyRange, error) { - var ret []kv.KeyRange - for _, p := range h.partitions { +func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Range) ([][]kv.KeyRange, error) { + ret := make([][]kv.KeyRange, len(h.partitions)) + for i, p := range h.partitions { pid := p.GetPhysicalID() meta := p.Meta() kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, ranges, nil) if err != nil { return nil, err } - ret = append(ret, kvRange...) + ret[i] = append(ret[i], kvRange...) 
} return ret, nil } diff --git a/executor/distsql.go b/executor/distsql.go index 2ec1dca348d0f..63a9d2aeb5014 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -449,9 +449,6 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { func (e *IndexLookUpExecutor) buildTableKeyRanges() (err error) { sc := e.ctx.GetSessionVars().StmtCtx if e.partitionTableMode { - if e.keepOrder { // this case should be prevented by the optimizer - return errors.New("invalid execution plan: cannot keep order when accessing a partition table by IndexLookUpReader") - } e.feedback.Invalidate() // feedback for partition tables is not ready e.partitionKVRanges = make([][]kv.KeyRange, 0, len(e.prunedPartitions)) for _, p := range e.prunedPartitions { diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index e2d098c622f8f..6a8aa85641b10 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -52,7 +52,7 @@ partition p2 values less than (10))`) // Table reader: one partition tk.MustQuery("select * from pt where c > 8").Check(testkit.Rows("9 9")) // Table reader: more than one partition - tk.MustQuery("select * from pt where c < 2 or c >= 9").Check(testkit.Rows("0 0", "9 9")) + tk.MustQuery("select * from pt where c < 2 or c >= 9").Sort().Check(testkit.Rows("0 0", "9 9")) // Index reader tk.MustQuery("select c from pt").Sort().Check(testkit.Rows("0", "2", "4", "6", "7", "9", "")) @@ -64,7 +64,7 @@ partition p2 values less than (10))`) tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt").Sort().Check(testkit.Rows("0 0", "2 2", "4 4", "6 6", "7 7", "9 9", " ")) tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 4 and c > 10").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c > 8").Check(testkit.Rows("9 9")) - tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c < 2 or c >= 9").Check(testkit.Rows("0 0", "9 9")) + tk.MustQuery("select /*+ use_index(pt, i_id) */ * from pt where id < 10 and c < 2 or c >= 9").Sort().Check(testkit.Rows("0 0", "9 9")) // Index Merge tk.MustExec("set @@tidb_enable_index_merge = 1") @@ -352,14 +352,67 @@ func TestOrderByandLimit(t *testing.T) { // regular table tk.MustExec("create table tregular(a int, b int, index idx_a(a))") + // range partition table with int pk + tk.MustExec(`create table trange_intpk(a int primary key, b int) partition by range(a) ( + partition p0 values less than(300), + partition p1 values less than (500), + partition p2 values less than(1100));`) + + // hash partition table with int pk + tk.MustExec("create table thash_intpk(a int primary key, b int) partition by hash(a) partitions 4;") + + // regular table with int pk + tk.MustExec("create table tregular_intpk(a int primary key, b int)") + + // range partition table with clustered index + tk.MustExec(`create table trange_clustered(a int, b int, primary key(a, b) clustered) partition by range(a) ( + partition p0 values less than(300), + partition p1 values less than (500), + partition p2 values less than(1100));`) + + // hash partition table with clustered index + tk.MustExec("create table thash_clustered(a int, b int, primary key(a, b) clustered) partition by hash(a) partitions 4;") + + // regular table with clustered index + tk.MustExec("create table tregular_clustered(a int, b int, primary key(a, b) clustered)") + // generate some random data to be inserted vals := make([]string, 0, 2000) for i := 0; i < 2000; i++ { vals = append(vals, 
fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000))) } + + dedupValsA := make([]string, 0, 2000) + dedupMapA := make(map[int]struct{}, 2000) + for i := 0; i < 2000; i++ { + valA := rand.Intn(1100) + if _, ok := dedupMapA[valA]; ok { + continue + } + dedupValsA = append(dedupValsA, fmt.Sprintf("(%v, %v)", valA, rand.Intn(2000))) + dedupMapA[valA] = struct{}{} + } + + dedupValsAB := make([]string, 0, 2000) + dedupMapAB := make(map[string]struct{}, 2000) + for i := 0; i < 2000; i++ { + val := fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000)) + if _, ok := dedupMapAB[val]; ok { + continue + } + dedupValsAB = append(dedupValsAB, val) + dedupMapAB[val] = struct{}{} + } + tk.MustExec("insert into trange values " + strings.Join(vals, ",")) tk.MustExec("insert into thash values " + strings.Join(vals, ",")) tk.MustExec("insert into tregular values " + strings.Join(vals, ",")) + tk.MustExec("insert into trange_intpk values " + strings.Join(dedupValsA, ",")) + tk.MustExec("insert into thash_intpk values " + strings.Join(dedupValsA, ",")) + tk.MustExec("insert into tregular_intpk values " + strings.Join(dedupValsA, ",")) + tk.MustExec("insert into trange_clustered values " + strings.Join(dedupValsAB, ",")) + tk.MustExec("insert into thash_clustered values " + strings.Join(dedupValsAB, ",")) + tk.MustExec("insert into tregular_clustered values " + strings.Join(dedupValsAB, ",")) // test indexLookUp for i := 0; i < 100; i++ { @@ -373,6 +426,29 @@ func TestOrderByandLimit(t *testing.T) { tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) } + // test indexLookUp with order property pushed down. + for i := 0; i < 100; i++ { + // explain select * from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used + // select * from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + // Since we only use order by a not order by a, b, the result is not stable when we read both a and b. + // We cut the max element so that the result can be stable. 
+ maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(a), 1100) from (select * from tregular use index(idx_a) where a > %v order by a limit %v) t", x, y)).Rows()[0][0] + queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y) + queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y) + queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v;", x, x+1, maxEle, y) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp")) + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "Limit")) + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp")) + require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // but not fully pushed + require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN")) + regularResult := tk.MustQuery(queryRegular).Sort().Rows() + tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult) + tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult) + } + // test tableReader for i := 0; i < 100; i++ { // explain select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // check if IndexLookUp is used // select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // it can return the correct result @@ -385,6 +461,51 @@ func TestOrderByandLimit(t *testing.T) { tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) } + // test tableReader with order property pushed down. + for i := 0; i < 100; i++ { + // explain select * from t where a > {y} ignore index(idx_a) order by a, b limit {x}; // check if TableReader is used + // select * from t where a > {y} ignore index(idx_a) order by a, b limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange ignore index(idx_a) where a > %v order by a, b limit %v;", x, y) + queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash ignore index(idx_a) where a > %v order by a, b limit %v;", x, y) + queryRegular := fmt.Sprintf("select * from tregular ignore index(idx_a) where a > %v order by a, b limit %v;", x, y) + require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used + require.True(t, tk.HasPlan(queryHashPartition, "TableReader")) + require.False(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed + require.False(t, tk.HasPlan(queryHashPartition, "Limit")) + regularResult := tk.MustQuery(queryRegular).Sort().Rows() + tk.MustQuery(queryRangePartition).Sort().Check(regularResult) + tk.MustQuery(queryHashPartition).Sort().Check(regularResult) + + // test int pk + // To keep it simple, we only read column a.
+ queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange_intpk use index(primary) where a > %v order by a limit %v", x, y) + queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from thash_intpk use index(primary) where a > %v order by a limit %v", x, y) + queryRegular = fmt.Sprintf("select a from tregular_intpk where a > %v order by a limit %v", x, y) + require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) + require.True(t, tk.HasPlan(queryHashPartition, "TableReader")) + require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed + require.True(t, tk.HasPlan(queryHashPartition, "Limit")) + regularResult = tk.MustQuery(queryRegular).Rows() + tk.MustQuery(queryRangePartition).Check(regularResult) + tk.MustQuery(queryHashPartition).Check(regularResult) + + // test clustered index + queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange_clustered use index(primary) where a > %v order by a, b limit %v;", x, y) + queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash_clustered use index(primary) where a > %v order by a, b limit %v;", x, y) + queryRegular = fmt.Sprintf("select * from tregular_clustered where a > %v order by a, b limit %v;", x, y) + require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used + require.True(t, tk.HasPlan(queryHashPartition, "TableReader")) + require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed + require.True(t, tk.HasPlan(queryHashPartition, "Limit")) + require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed + require.True(t, tk.HasPlan(queryHashPartition, "TopN")) + regularResult = tk.MustQuery(queryRegular).Rows() + tk.MustQuery(queryRangePartition).Check(regularResult) + tk.MustQuery(queryHashPartition).Check(regularResult) + } + // test indexReader for i := 0; i < 100; i++ { // explain select a from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used // select a from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result @@ -397,6 +518,24 @@ func TestOrderByandLimit(t *testing.T) { tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) } + // test indexReader with order property pushed down.
+ for i := 0; i < 100; i++ { + // explain select a from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexReader is used + // select a from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange use index(idx_a) where a > %v order by a limit %v;", x, y) + queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from thash use index(idx_a) where a > %v order by a limit %v;", x, y) + queryRegular := fmt.Sprintf("select a from tregular use index(idx_a) where a > %v order by a limit %v;", x, y) + require.True(t, tk.HasPlan(queryRangePartition, "IndexReader")) // check if indexReader is used + require.True(t, tk.HasPlan(queryHashPartition, "IndexReader")) + require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed + require.True(t, tk.HasPlan(queryHashPartition, "Limit")) + regularResult := tk.MustQuery(queryRegular).Sort().Rows() + tk.MustQuery(queryRangePartition).Sort().Check(regularResult) + tk.MustQuery(queryHashPartition).Sort().Check(regularResult) + } + // test indexMerge for i := 0; i < 100; i++ { // explain select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // check if IndexMerge is used diff --git a/executor/table_reader.go b/executor/table_reader.go index a9decdd55b1a7..22e83a4231e13 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -56,7 +56,7 @@ func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Con } type kvRangeBuilder interface { - buildKeyRange(ranges []*ranger.Range) ([]kv.KeyRange, error) + buildKeyRange(ranges []*ranger.Range) ([][]kv.KeyRange, error) buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error) } @@ -196,13 +196,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { if err != nil { return err } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + if len(kvReq.KeyRanges) > 0 { + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + } else { + for _, kr := range kvReq.KeyRangesWithPartition { + e.kvRanges = append(e.kvRanges, kr...) + } + } if len(secondPartRanges) != 0 { kvReq, err = e.buildKVReq(ctx, secondPartRanges) if err != nil { return err } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + if len(kvReq.KeyRanges) > 0 { + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + } else { + for _, kr := range kvReq.KeyRangesWithPartition { + e.kvRanges = append(e.kvRanges, kr...) + } + } } return nil } @@ -305,7 +317,14 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra if err != nil { return nil, err } - e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + + if len(kvReq.KeyRanges) > 0 { + e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...) + } else { + for _, kr := range kvReq.KeyRangesWithPartition { + e.kvRanges = append(e.kvRanges, kr...)
+ } + } result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id) if err != nil { return nil, err } @@ -391,7 +410,7 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R if err != nil { return nil, err } - reqBuilder = builder.SetKeyRanges(kvRange) + reqBuilder = builder.SetPartitionKeyRanges(kvRange) } else { reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback) } diff --git a/kv/kv.go b/kv/kv.go index 58aecb5195891..24544e852f415 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -313,6 +313,10 @@ type Request struct { Data []byte KeyRanges []KeyRange + // KeyRangesWithPartition makes sure that the request is split first by partition and then by region. + // When the table is small, it's possible that multiple partitions are in the same region. + KeyRangesWithPartition [][]KeyRange + // For PartitionTableScan used by tiflash. PartitionIDAndRanges []PartitionIDAndRanges diff --git a/planner/cascades/testdata/integration_suite_in.json b/planner/cascades/testdata/integration_suite_in.json index b4f563265813c..57272922a71f1 100644 --- a/planner/cascades/testdata/integration_suite_in.json +++ b/planner/cascades/testdata/integration_suite_in.json @@ -142,7 +142,7 @@ { "name": "TestCascadePlannerHashedPartTable", "cases": [ - "select * from pt1" + "select * from pt1 order by a" ] }, { diff --git a/planner/cascades/testdata/integration_suite_out.json b/planner/cascades/testdata/integration_suite_out.json index 21b601bfc9198..08c57d0cb7094 100644 --- a/planner/cascades/testdata/integration_suite_out.json +++ b/planner/cascades/testdata/integration_suite_out.json @@ -1197,17 +1197,18 @@ "Name": "TestCascadePlannerHashedPartTable", "Cases": [ { - "SQL": "select * from pt1", + "SQL": "select * from pt1 order by a", "Plan": [ - "TableReader_5 10000.00 root partition:all data:TableFullScan_6", - "└─TableFullScan_6 10000.00 cop[tikv] table:pt1 keep order:false, stats:pseudo" + "Sort_11 10000.00 root test.pt1.a", + "└─TableReader_9 10000.00 root partition:all data:TableFullScan_10", + " └─TableFullScan_10 10000.00 cop[tikv] table:pt1 keep order:false, stats:pseudo" ], "Result": [ - "4 40", "1 10", - "5 50", "2 20", - "3 30" + "3 30", + "4 40", + "5 50" ] } ] diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 13c559d9c66c5..86d163366afdb 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -1292,8 +1292,9 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, // `remaining`: exprs that can NOT be pushed to TiKV but can be pushed to other storage engines. // Why do we need this func? // IndexMerge only works on TiKV, so we need to find all exprs that cannot be pushed to TiKV, and add a new Selection above IndexMergeReader. -// But the new Selection should exclude the exprs that can NOT be pushed to ALL the storage engines. -// Because these exprs have already been put in another Selection(check rule_predicate_push_down). +// +// But the new Selection should exclude the exprs that can NOT be pushed to ALL the storage engines. +// Because these exprs have already been put in another Selection(check rule_predicate_push_down).
func extractFiltersForIndexMerge(sc *stmtctx.StatementContext, client kv.Client, filters []expression.Expression) (pushed []expression.Expression, remaining []expression.Expression) { for _, expr := range filters { if expression.CanExprsPushDown(sc, []expression.Expression{expr}, client, kv.TiKV) { @@ -1474,11 +1475,12 @@ func (is *PhysicalIndexScan) getScanRowSize() float64 { // initSchema is used to set the schema of PhysicalIndexScan. Before calling this, // make sure the following field of PhysicalIndexScan are initialized: -// PhysicalIndexScan.Table *model.TableInfo -// PhysicalIndexScan.Index *model.IndexInfo -// PhysicalIndexScan.Index.Columns []*IndexColumn -// PhysicalIndexScan.IdxCols []*expression.Column -// PhysicalIndexScan.Columns []*model.ColumnInfo +// +// PhysicalIndexScan.Table *model.TableInfo +// PhysicalIndexScan.Index *model.IndexInfo +// PhysicalIndexScan.Index.Columns []*IndexColumn +// PhysicalIndexScan.IdxCols []*expression.Column +// PhysicalIndexScan.Columns []*model.ColumnInfo func (is *PhysicalIndexScan) initSchema(idxExprCols []*expression.Column, isDoubleRead bool) { indexCols := make([]*expression.Column, len(is.IdxCols), len(is.Index.Columns)+1) copy(indexCols, is.IdxCols) @@ -2281,6 +2283,8 @@ func (ds *DataSource) getOriginalPhysicalIndexScan(prop *property.PhysicalProper physicalTableID: ds.physicalTableID, tblColHists: ds.TblColHists, pkIsHandleCol: ds.getPKIsHandleCol(), + constColsByCond: path.ConstCols, + prop: prop, }.Init(ds.ctx, ds.blockOffset) statsTbl := ds.statisticTable if statsTbl.Indices[idx.ID] != nil { diff --git a/planner/core/partition_pruner_test.go b/planner/core/partition_pruner_test.go index 45f9e1f8f5529..1c3e811cb9b2a 100644 --- a/planner/core/partition_pruner_test.go +++ b/planner/core/partition_pruner_test.go @@ -307,17 +307,17 @@ func TestListPartitionPruner(t *testing.T) { partitionPrunerData.GetTestCases(t, &input, &output) valid := false for i, tt := range input { testdata.OnRecord(func() { output[i].SQL = tt - output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Sort().Rows()) output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + tt).Rows()) }) tk.MustQuery("explain format = 'brief' " + tt).Check(testkit.Rows(output[i].Plan...)) - result := tk.MustQuery(tt) + result := tk.MustQuery(tt).Sort() result.Check(testkit.Rows(output[i].Result...)) // If the query doesn't specified the partition, compare the result with normal table if !strings.Contains(tt, "partition(") { - result.Check(tk2.MustQuery(tt).Rows()) + result.Check(tk2.MustQuery(tt).Sort().Rows()) valid = true } require.True(t, valid) @@ -383,7 +383,7 @@ func TestListColumnsPartitionPruner(t *testing.T) { indexPlanTree := testdata.ConvertRowsToStrings(indexPlan.Rows()) testdata.OnRecord(func() { output[i].SQL = tt.SQL - output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(tt.SQL).Rows()) + output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(tt.SQL).Sort().Rows()) // Test for table without index. output[i].Plan = planTree // Test for table with index. @@ -398,14 +398,14 @@ func TestListColumnsPartitionPruner(t *testing.T) { checkPrunePartitionInfo(t, tt.SQL, tt.Pruner, indexPlanTree) // compare the result.
- result := tk.MustQuery(tt.SQL) + result := tk.MustQuery(tt.SQL).Sort() idxResult := tk1.MustQuery(tt.SQL) - result.Check(idxResult.Rows()) + result.Check(idxResult.Sort().Rows()) result.Check(testkit.Rows(output[i].Result...)) // If the query doesn't specified the partition, compare the result with normal table if !strings.Contains(tt.SQL, "partition(") { - result.Check(tk2.MustQuery(tt.SQL).Rows()) + result.Check(tk2.MustQuery(tt.SQL).Sort().Rows()) valid = true } } diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 9f5721049c166..71b8a60985f39 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -442,6 +442,12 @@ type PhysicalIndexScan struct { // tblColHists contains all columns before pruning, which are used to calculate row-size tblColHists *statistics.HistColl pkIsHandleCol *expression.Column + + // constColsByCond records which index columns are constant as a result of the access conditions. + // e.g. if the index is (a, b, c) and there are filters a = 1 and b = 2, then columns a and b are the constant part. + constColsByCond []bool + + prop *property.PhysicalProperty } // Clone implements PhysicalPlan interface. diff --git a/planner/core/plan_cost_test.go b/planner/core/plan_cost_test.go index 4646a78534690..3bd693fb5f54e 100644 --- a/planner/core/plan_cost_test.go +++ b/planner/core/plan_cost_test.go @@ -976,6 +976,7 @@ func TestScanOnSmallTable(t *testing.T) { tk.MustExec("insert into t values (1), (2), (3), (4), (5)") tk.MustExec("analyze table t") tk.MustExec(`set @@tidb_cost_model_version=2`) + tk.MustExec(`set @@tidb_enable_new_cost_interface=ON`) // Create virtual tiflash replica info. dom := domain.GetDomain(tk.Session()) diff --git a/planner/core/task.go b/planner/core/task.go index 0dbfa17c3904a..a6cb16011c421 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/collate" @@ -1011,6 +1012,10 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { } needPushDown := len(cols) > 0 if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 { + newTask, changed := p.pushTopNDownToDynamicPartition(copTask) + if changed { + return newTask + } // If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and // push it to table plan. var pushedDownTopN *PhysicalTopN @@ -1035,6 +1040,142 @@ return attachPlan2Task(p, rootTask) } +// pushTopNDownToDynamicPartition is a temp solution for partition tables. It actually does the same thing as DataSource's isMatchProp. +// We need to support a more enhanced read strategy in the execution phase, so that we can achieve Limit(TiDB)->Reader(TiDB)->Limit(TiKV/TiFlash)->Scan(TiKV/TiFlash). +// Before that is done, we use this logic to provide a way to keep the order property when reading from TiKV, so that we can use the orderliness of the index to speed up the query. +// Here we can change the execution plan to TopN(TiDB)->Reader(TiDB)->Limit(TiKV)->Scan(TiKV). (TiFlash is not supported.)
+func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bool) { + var err error + copTsk = copTsk.copy().(*copTask) + if len(copTsk.rootTaskConds) > 0 { + return nil, false + } + colsProp, ok := GetPropByOrderByItems(p.ByItems) + if !ok { + return nil, false + } + allSameOrder, isDesc := colsProp.AllSameOrder() + if !allSameOrder { + return nil, false + } + checkIndexMatchProp := func(idxCols []*expression.Column, idxColLens []int, constColsByCond []bool, colsProp *property.PhysicalProperty) bool { + // If there are more by-items than index columns, we cannot push down, since the index cannot keep the order. + if len(idxCols) < len(colsProp.SortItems) { + return false + } + idxPos := 0 + for _, byItem := range colsProp.SortItems { + found := false + for ; idxPos < len(idxCols); idxPos++ { + if idxColLens[idxPos] == types.UnspecifiedLength && idxCols[idxPos].Equal(p.SCtx(), byItem.Col) { + found = true + idxPos++ + break + } + if len(constColsByCond) == 0 || idxPos >= len(constColsByCond) || !constColsByCond[idxPos] { + found = false + break + } + } + if !found { + return false + } + } + return true + } + var ( + idxScan *PhysicalIndexScan + tblScan *PhysicalTableScan + tblInfo *model.TableInfo + ) + if copTsk.indexPlan != nil { + copTsk.indexPlan, err = copTsk.indexPlan.Clone() + if err != nil { + return nil, false + } + finalIdxScanPlan := copTsk.indexPlan + for len(finalIdxScanPlan.Children()) > 0 && finalIdxScanPlan.Children()[0] != nil { + finalIdxScanPlan = finalIdxScanPlan.Children()[0] + } + idxScan = finalIdxScanPlan.(*PhysicalIndexScan) + tblInfo = idxScan.Table + } + if copTsk.tablePlan != nil { + copTsk.tablePlan, err = copTsk.tablePlan.Clone() + if err != nil { + return nil, false + } + finalTblScanPlan := copTsk.tablePlan + if len(finalTblScanPlan.Children()) > 0 { + finalTblScanPlan = finalTblScanPlan.Children()[0] + } + tblScan = finalTblScanPlan.(*PhysicalTableScan) + tblInfo = tblScan.Table + } + + pi := tblInfo.GetPartitionInfo() + if pi == nil { + return nil, false + } + if pi.Type == model.PartitionTypeList { + return nil, false + } + + if !copTsk.indexPlanFinished { + // If the indexPlan side isn't finished, there's no selection on the table side.
+ + propMatched := checkIndexMatchProp(idxScan.IdxCols, idxScan.IdxColLens, idxScan.constColsByCond, colsProp) + if !propMatched { + return nil, false + } + + idxScan.Desc = isDesc + childProfile := copTsk.plan().statsInfo() + newCount := p.Offset + p.Count + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedLimit := PhysicalLimit{ + Count: newCount, + }.Init(p.SCtx(), stats, p.SelectBlockOffset()) + pushedLimit.SetSchema(copTsk.indexPlan.Schema()) + copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask) + pushedLimit.cost = copTsk.cost() + } else if copTsk.indexPlan == nil { + if tblScan.HandleCols == nil { + return nil, false + } + + if tblScan.HandleCols.IsInt() { + pk := tblScan.HandleCols.GetCol(0) + if len(colsProp.SortItems) != 1 || !colsProp.SortItems[0].Col.Equal(p.SCtx(), pk) { + return nil, false + } + } else { + idxCols, idxColLens := expression.IndexInfo2PrefixCols(tblScan.Columns, tblScan.Schema().Columns, tables.FindPrimaryIndex(tblScan.Table)) + matched := checkIndexMatchProp(idxCols, idxColLens, nil, colsProp) + if !matched { + return nil, false + } + } + tblScan.Desc = isDesc + childProfile := copTsk.plan().statsInfo() + newCount := p.Offset + p.Count + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedLimit := PhysicalLimit{ + Count: newCount, + }.Init(p.SCtx(), stats, p.SelectBlockOffset()) + pushedLimit.SetSchema(copTsk.tablePlan.Schema()) + copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask) + pushedLimit.cost = copTsk.cost() + } else { + return nil, false + } + + rootTask := copTsk.convertToRootTask(p.ctx) + rootTask.addCost(p.GetCost(rootTask.count(), true)) + p.cost = rootTask.cost() + return attachPlan2Task(p, rootTask), true +} + func (p *PhysicalProjection) attach2Task(tasks ...task) task { t := tasks[0].copy() if cop, ok := t.(*copTask); ok { diff --git a/planner/core/testdata/partition_pruner_out.json b/planner/core/testdata/partition_pruner_out.json index 39f8a3c00ef76..32be5a1d6d609 100644 --- a/planner/core/testdata/partition_pruner_out.json +++ b/planner/core/testdata/partition_pruner_out.json @@ -847,9 +847,9 @@ { "SQL": "select * from t7 where a is null or a > 0 order by a;", "Result": [ - "", "1", - "2" + "2", + "" ], "Plan": [ "Sort 3343.33 root test_partition.t7.a", @@ -866,8 +866,8 @@ { "SQL": "select * from t1 order by id,a", "Result": [ - " 10 ", "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -876,7 +876,7 @@ "7 7 7", "8 8 8", "9 9 9", - "10 10 10" + " 10 " ], "Plan": [ "Sort 10000.00 root test_partition.t1.id, test_partition.t1.a", @@ -1341,8 +1341,8 @@ { "SQL": "select * from t1 where a = 1 or true order by id,a", "Result": [ - " 10 ", "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -1351,7 +1351,7 @@ "7 7 7", "8 8 8", "9 9 9", - "10 10 10" + " 10 " ], "Plan": [ "Sort 10000.00 root test_partition.t1.id, test_partition.t1.a", @@ -1973,13 +1973,13 @@ "SQL": "select * from t1 where a < 3 or b > 4", "Result": [ "1 1 1", + "10 10 10", "2 2 2", "5 5 5", "6 6 6", "7 7 7", "8 8 8", - "9 9 9", - "10 10 10" + "9 9 9" ], "Plan": [ "TableReader 5548.89 root partition:p0,p1 data:Selection", @@ -2059,11 +2059,11 @@ "SQL": "select * from t1 where (a<=1 and b<=1) or (a >=6 and b>=6)", "Result": [ "1 1 1", + "10 10 10", "6 6 6", "7 7 7", "8 8 8", - "9 9 9", - "10 10 10" + "9 9 9" ], "Plan": [ "TableReader 2092.85 root partition:p0,p1 data:Selection", @@ -2080,6 +2080,7 @@ "SQL": "select * from t1 where a <= 100 and b <= 100", "Result": [ "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -2087,8 +2088,7 @@ "6 
6 6", "7 7 7", "8 8 8", - "9 9 9", - "10 10 10" + "9 9 9" ], "Plan": [ "TableReader 1104.45 root partition:p0,p1 data:Selection", @@ -2126,10 +2126,10 @@ { "SQL": "select * from t1 left join t2 on true where (t1.a <=1 or t1.a <= 3 and (t1.b >=3 and t1.b <= 5)) and (t2.a >= 6 and t2.a <= 8) and t2.b>=7 and t2.id>=7 order by t1.id,t1.a", "Result": [ - "1 1 1 8 8 8", "1 1 1 7 7 7", - "3 3 3 8 8 8", - "3 3 3 7 7 7" + "1 1 1 8 8 8", + "3 3 3 7 7 7", + "3 3 3 8 8 8" ], "Plan": [ "Sort 93855.70 root test_partition.t1.id, test_partition.t1.a", @@ -2326,8 +2326,8 @@ { "SQL": "select * from t1 where a = 3 or true order by id,a", "Result": [ - " 10 ", "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -2336,7 +2336,7 @@ "7 7 7", "8 8 8", "9 9 9", - "10 10 10" + " 10 " ], "Plan": [ "Sort 10000.00 root test_partition.t1.id, test_partition.t1.a", @@ -2463,6 +2463,7 @@ "SQL": "select * from t1 where (a >= 1 and a <= 6) or (a>=3 and b >=3)", "Result": [ "1 1 1", + "10 10 10", "2 2 2", "3 3 3", "4 4 4", @@ -2470,8 +2471,7 @@ "6 6 6", "7 7 7", "8 8 8", - "9 9 9", - "10 10 10" + "9 9 9" ], "Plan": [ "TableReader 1333.33 root partition:p0,p1 data:Selection", diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 3a838510ecd61..c81bb01c6de80 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -45,7 +45,9 @@ import ( ) // All system variables declared here are ordered by their scopes, which follow the order of scopes below: -// [NONE, SESSION, INSTANCE, GLOBAL, GLOBAL & SESSION] +// +// [NONE, SESSION, INSTANCE, GLOBAL, GLOBAL & SESSION] +// // If you are adding a new system variable, please put it in the corresponding area. var defaultSysVars = []*SysVar{ /* The system variables below have NONE scope */ @@ -802,6 +804,8 @@ var defaultSysVars = []*SysVar{ return err }, }, + {Scope: ScopeGlobal, Name: TiDBAutoBuildStatsConcurrency, Value: strconv.Itoa(DefTiDBAutoBuildStatsConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency}, + {Scope: ScopeGlobal, Name: TiDBSysProcScanConcurrency, Value: strconv.Itoa(DefTiDBSysProcScanConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency}, /* The system variables below have GLOBAL and SESSION scope */ {Scope: ScopeGlobal | ScopeSession, Name: SQLSelectLimit, Value: "18446744073709551615", Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 29e6ec89a350e..f1c5f88fe4e95 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -731,6 +731,10 @@ const ( // TiDBMaxAutoAnalyzeTime is the max time that auto analyze can run. If auto analyze runs longer than the value, it // will be killed. 0 indicates that there is no time limit. TiDBMaxAutoAnalyzeTime = "tidb_max_auto_analyze_time" + // TiDBAutoBuildStatsConcurrency is used to set the build concurrency of auto-analyze. + TiDBAutoBuildStatsConcurrency = "tidb_auto_build_stats_concurrency" + // TiDBSysProcScanConcurrency is used to set the scan concurrency of for backend system processes, like auto-analyze. + TiDBSysProcScanConcurrency = "tidb_sysproc_scan_concurrency" ) // TiDB intentional limits @@ -930,6 +934,8 @@ const ( DefTiDBEnablePrepPlanCache = true DefTiDBPrepPlanCacheSize = 100 DefTiDBPrepPlanCacheMemoryGuardRatio = 0.1 + DefTiDBAutoBuildStatsConcurrency = 1 + DefTiDBSysProcScanConcurrency = 1 ) // Process global variables. 
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 09b48bfbe2679..ece9d4cb524aa 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -87,8 +87,27 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa } ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs) bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) - ranges := NewKeyRanges(req.KeyRanges) - tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req, eventCb) + var ( + tasks []*copTask + err error + ) + if len(req.KeyRanges) > 0 { + ranges := NewKeyRanges(req.KeyRanges) + tasks, err = buildCopTasks(bo, c.store.GetRegionCache(), ranges, req, eventCb) + } else { + // Here we build the tasks by partition, not directly by region. + // This is because TiDB may merge multiple small partitions into one region, which breaks some assumptions. + // Keeping the ranges split by partition is safer. + for _, kvRanges := range req.KeyRangesWithPartition { + ranges := NewKeyRanges(kvRanges) + var tasksInPartition []*copTask + tasksInPartition, err = buildCopTasks(bo, c.store.GetRegionCache(), ranges, req, eventCb) + if err != nil { + break + } + tasks = append(tasks, tasksInPartition...) + } + } if err != nil { return copErrorResponse{err} } diff --git a/testkit/result.go b/testkit/result.go index 30fa7a53d2a2f..9b7e0ef027a95 100644 --- a/testkit/result.go +++ b/testkit/result.go @@ -49,6 +49,21 @@ func (res *Result) Check(expected [][]interface{}) { res.require.Equal(needBuff.String(), resBuff.String(), res.comment) } +// AddComment adds an extra comment to the Result's output. +func (res *Result) AddComment(c string) { + res.comment += "\n" + c +} + +// CheckWithFunc asserts that the result matches the expected results in the way `f` specifies. +func (res *Result) CheckWithFunc(expected [][]interface{}, f func([]string, []interface{}) bool) { + res.require.Equal(len(res.rows), len(expected), res.comment+"\nResult length mismatch") + + for i, resRow := range res.rows { + expectedRow := expected[i] + res.require.Truef(f(resRow, expectedRow), res.comment+"\nCheck with function failed\nactual: %s\nexpected: %s", resRow, expectedRow) + } +} + // Rows is similar to RowsWithSep, use white space as separator string. func Rows(args ...string) [][]interface{} { return RowsWithSep(" ", args...)
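A usage sketch for the new Result.CheckWithFunc helper (the query and predicate below are illustrative, not part of this patch): unlike Check, which compares the formatted rows for exact equality, each actual row is validated against the expected row by a caller-supplied predicate:

    // Accept a row as long as its first column matches. testkit.Rows splits each
    // string on spaces, so expected[0] is the first column as a string.
    tk.MustQuery("select a, b from t order by a").CheckWithFunc(
        testkit.Rows("1 x", "2 y"),
        func(actual []string, expected []interface{}) bool {
            return actual[0] == expected[0].(string)
        },
    )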