diff --git a/cmd/explaintest/r/explain_easy.result b/cmd/explaintest/r/explain_easy.result
index fb09439cdfaa2..7ee913b2283bc 100644
--- a/cmd/explaintest/r/explain_easy.result
+++ b/cmd/explaintest/r/explain_easy.result
@@ -111,15 +111,14 @@ MemTableScan_4 10000.00 root 
 explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
 id count task operator info
 Projection_12 10000.00 root eq(test.t1.c2, test.t2.c2)
-└─Apply_14 10000.00 root left outer join, inner:Limit_21
+└─Apply_14 10000.00 root left outer join, inner:Projection_44
   ├─TableReader_16 10000.00 root data:TableScan_15
   │ └─TableScan_15 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
-  └─Limit_21 1.00 root offset:0, count:1
-    └─Projection_44 1.00 root test.t2.c1, test.t2.c2
-      └─IndexLookUp_43 1.00 root 
-        ├─Limit_42 1.00 cop offset:0, count:1
-        │ └─IndexScan_40 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
-        └─TableScan_41 1.00 cop table:t2, keep order:false, stats:pseudo
+  └─Projection_44 1.00 root test.t2.c1, test.t2.c2
+    └─IndexLookUp_43 1.00 root limit embedded(offset:0, count:1)
+      ├─Limit_42 1.00 cop offset:0, count:1
+      │ └─IndexScan_40 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true, stats:pseudo
+      └─TableScan_41 1.00 cop table:t2, keep order:false, stats:pseudo
 explain select * from t1 order by c1 desc limit 1;
 id count task operator info
 Limit_10 1.00 root offset:0, count:1
diff --git a/cmd/explaintest/r/explain_easy_stats.result b/cmd/explaintest/r/explain_easy_stats.result
index 5b90c4757e316..35afad18ad705 100644
--- a/cmd/explaintest/r/explain_easy_stats.result
+++ b/cmd/explaintest/r/explain_easy_stats.result
@@ -99,15 +99,14 @@ MemTableScan_4 10000.00 root 
 explain select c2 = (select c2 from t2 where t1.c1 = t2.c1 order by c1 limit 1) from t1;
 id count task operator info
 Projection_12 1999.00 root eq(test.t1.c2, test.t2.c2)
-└─Apply_14 1999.00 root left outer join, inner:Limit_21
+└─Apply_14 1999.00 root left outer join, inner:Projection_44
   ├─TableReader_16 1999.00 root data:TableScan_15
   │ └─TableScan_15 1999.00 cop table:t1, range:[-inf,+inf], keep order:false
-  └─Limit_21 1.00 root offset:0, count:1
-    └─Projection_44 1.00 root test.t2.c1, test.t2.c2
-      └─IndexLookUp_43 1.00 root 
-        ├─Limit_42 1.00 cop offset:0, count:1
-        │ └─IndexScan_40 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
-        └─TableScan_41 1.00 cop table:t2, keep order:false, stats:pseudo
+  └─Projection_44 1.00 root test.t2.c1, test.t2.c2
+    └─IndexLookUp_43 1.00 root limit embedded(offset:0, count:1)
+      ├─Limit_42 1.00 cop offset:0, count:1
+      │ └─IndexScan_40 1.25 cop table:t2, index:c1, range: decided by [eq(test.t1.c1, test.t2.c1)], keep order:true
+      └─TableScan_41 1.00 cop table:t2, keep order:false, stats:pseudo
 explain select * from t1 order by c1 desc limit 1;
 id count task operator info
 Limit_10 1.00 root offset:0, count:1
@@ -160,18 +159,16 @@ id count task operator info
 TableDual_5 0.00 root rows:0
 explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 1;
 id count task operator info
-Limit_9 1.00 root offset:1, count:1
-└─IndexLookUp_15 1.00 root 
-  ├─Limit_14 1.00 cop offset:0, count:2
-  │ └─IndexScan_12 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
-  └─TableScan_13 1.00 cop table:index_prune, keep order:false, stats:pseudo
+IndexLookUp_15 1.00 root limit embedded(offset:1, count:1)
+├─Limit_14 1.00 cop offset:0, count:2
+│ └─IndexScan_12 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
+└─TableScan_13 1.00 cop table:index_prune, keep order:false, stats:pseudo
 explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 1, 0;
 id count task operator info
-Limit_9 0.00 root offset:1, count:0
-└─IndexLookUp_15 0.00 root 
-  ├─Limit_14 0.00 cop offset:0, count:1
-  │ └─IndexScan_12 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
-  └─TableScan_13 0.00 cop table:index_prune, keep order:false, stats:pseudo
+IndexLookUp_15 0.00 root limit embedded(offset:1, count:0)
+├─Limit_14 0.00 cop offset:0, count:1
+│ └─IndexScan_12 1.00 cop table:index_prune, index:a, b, range:[1010010404050976781 26467085526790,1010010404050976781 26467085526790], keep order:false
+└─TableScan_13 0.00 cop table:index_prune, keep order:false, stats:pseudo
 explain select * from index_prune WHERE a = 1010010404050976781 AND b = 26467085526790 LIMIT 0, 1;
 id count task operator info
 Point_Get_1 1.00 root table:index_prune, index:a b
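Before the executor changes below, note what these expected-result updates show: the root-level `Limit_9`/`Limit_21` operators disappear, and `IndexLookUp_*` now carries a `limit embedded(offset:…, count:…)` annotation instead. As a minimal standalone sketch of how MySQL's two-argument `LIMIT x, y` maps onto that offset/count pair: `PushedDownLimit` here mirrors the struct added in `planner/core/physical_plans.go` later in this diff, and the format string is the one emitted by the new `ExplainInfo` branch.

```go
package main

import "fmt"

// PushedDownLimit mirrors the planner struct introduced by this PR.
type PushedDownLimit struct {
	Offset uint64
	Count  uint64
}

func main() {
	// "LIMIT 1, 1" means: skip 1 row, then return at most 1 row,
	// so it becomes Offset=1, Count=1.
	l := PushedDownLimit{Offset: 1, Count: 1}
	// Same format string as PhysicalIndexLookUpReader.ExplainInfo below.
	fmt.Printf("limit embedded(offset:%v, count:%v)\n", l.Offset, l.Count)
}
```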
diff --git a/executor/builder.go b/executor/builder.go
index 902b1d42d7e28..74918bc64dd9d 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1910,6 +1910,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
 		colLens:     is.IdxColLens,
 		idxPlans:    v.IndexPlans,
 		tblPlans:    v.TablePlans,
+		PushedLimit: v.PushedLimit,
 	}
 	if isPartition, physicalTableID := ts.IsPartition(); isPartition {
 		e.physicalTableID = physicalTableID
diff --git a/executor/distsql.go b/executor/distsql.go
index 49d1078f99510..2b725a3a10065 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -366,6 +366,8 @@ type IndexLookUpExecutor struct {
 	corColInAccess bool
 	idxCols        []*expression.Column
 	colLens        []int
+	// PushedLimit is used to skip the preceding and trailing handles when Limit is sunk into IndexLookUpReader.
+	PushedLimit *plannercore.PushedDownLimit
 }
 
 type checkIndexValue struct {
@@ -468,6 +470,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
 		checkIndexValue: e.checkIndexValue,
 		maxBatchSize:    e.ctx.GetSessionVars().IndexLookupSize,
 		maxChunkSize:    e.maxChunkSize,
+		PushedLimit:     e.PushedLimit,
 	}
 	if worker.batchSize > worker.maxBatchSize {
 		worker.batchSize = worker.maxBatchSize
@@ -623,6 +626,8 @@ type indexWorker struct {
 
 	// checkIndexValue is used to check the consistency of the index data.
 	*checkIndexValue
+	// PushedLimit is used to skip the preceding and trailing handles when Limit is sunk into IndexLookUpReader.
+	PushedLimit *plannercore.PushedDownLimit
 }
 
 // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
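The two hunks that follow contain the actual skipping logic, and the contract between them is easy to miss in diff form: `extractTaskHandles` reports how many index keys it scanned (kept or skipped), and `fetchHandles` accumulates that into `count` so the limit window survives across batches. Here is a minimal sketch of that contract with hypothetical names outside TiDB, not the PR's actual functions:

```go
package main

import "fmt"

// extractBatch stands in for extractTaskHandles: given the number of keys
// scanned by earlier calls (already) and one batch of index handles, it
// returns the handles inside the window (offset, offset+limit] plus how
// many keys this call scanned, whether kept or skipped.
func extractBatch(already uint64, batch []int64, offset, limit uint64) (kept []int64, scanned uint64) {
	for _, h := range batch {
		scanned++
		n := already + scanned
		if n <= offset {
			continue // still inside the skipped prefix
		}
		if n > offset+limit {
			return kept, scanned // window exhausted
		}
		kept = append(kept, h)
	}
	return kept, scanned
}

func main() {
	var count uint64 // mirrors the `count` variable added to fetchHandles
	for _, b := range [][]int64{{10, 11}, {12, 13}, {14}} {
		kept, scanned := extractBatch(count, b, 1, 2) // LIMIT 1, 2
		count += scanned
		fmt.Println(kept) // [11], then [12], then [] — keys 2 and 3 overall are kept
	}
}
```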
@@ -652,8 +657,9 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
 	} else {
 		chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize)
 	}
+	var count uint64
 	for {
-		handles, retChunk, err := w.extractTaskHandles(ctx, chk, result)
+		handles, retChunk, scannedKeys, err := w.extractTaskHandles(ctx, chk, result, count)
 		if err != nil {
 			doneCh := make(chan error, 1)
 			doneCh <- errors.Trace(err)
@@ -662,6 +668,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
 			}
 			return err
 		}
+		count += scannedKeys
 		if len(handles) == 0 {
 			return nil
 		}
@@ -677,20 +684,43 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
 	}
 }
 
-func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (
-	handles []int64, retChk *chunk.Chunk, err error) {
+func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, count uint64) (
+	handles []int64, retChk *chunk.Chunk, scannedKeys uint64, err error) {
 	handleOffset := chk.NumCols() - 1
 	handles = make([]int64, 0, w.batchSize)
+	// PushedLimit is always nil for CheckIndex and CheckTable; this check is just for safety.
+	checkLimit := (w.PushedLimit != nil) && (w.checkIndexValue == nil)
 	for len(handles) < w.batchSize {
-		chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
+		requiredRows := w.batchSize - len(handles)
+		if checkLimit {
+			if w.PushedLimit.Offset+w.PushedLimit.Count <= scannedKeys+count {
+				return handles, nil, scannedKeys, nil
+			}
+			leftCnt := w.PushedLimit.Offset + w.PushedLimit.Count - scannedKeys - count
+			if uint64(requiredRows) > leftCnt {
+				requiredRows = int(leftCnt)
+			}
+		}
+		chk.SetRequiredRows(requiredRows, w.maxChunkSize)
 		err = errors.Trace(idxResult.Next(ctx, chk))
 		if err != nil {
-			return handles, nil, err
+			return handles, nil, scannedKeys, err
 		}
 		if chk.NumRows() == 0 {
-			return handles, retChk, nil
+			return handles, retChk, scannedKeys, nil
 		}
 		for i := 0; i < chk.NumRows(); i++ {
+			scannedKeys++
+			if checkLimit {
+				if (count + scannedKeys) <= w.PushedLimit.Offset {
+					// Skip the preceding Offset handles.
+					continue
+				}
+				if (count + scannedKeys) > (w.PushedLimit.Offset + w.PushedLimit.Count) {
+					// Skip the handles after Offset+Count.
+					return handles, nil, scannedKeys, nil
+				}
+			}
 			h := chk.GetRow(i).GetInt64(handleOffset)
 			handles = append(handles, h)
 		}
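Besides skipping rows, the `extractTaskHandles` hunk above also trims each batch request: once `checkLimit` holds, `SetRequiredRows` never asks the index for more keys than the window can still consume, which is what lets the executor stop scanning early instead of filling the whole batch. A distilled sketch of that rule, with illustrative names rather than TiDB APIs:

```go
package main

import "fmt"

// trimRequired returns how many rows to request next, given that `scanned`
// keys were already consumed against a window of offset+count keys.
// Precondition: scanned < offset+count (the caller returns early otherwise,
// as the hunk above does before computing leftCnt).
func trimRequired(batchNeed int, offset, count, scanned uint64) int {
	left := offset + count - scanned // keys the window still allows
	if uint64(batchNeed) > left {
		return int(left)
	}
	return batchNeed
}

func main() {
	// With LIMIT 1, 2 (offset 1, count 2) the window spans 3 keys in total:
	fmt.Println(trimRequired(1024, 1, 2, 0)) // 3 — the first request is capped
	fmt.Println(trimRequired(1024, 1, 2, 2)) // 1 — only one key is still needed
}
```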
@@ -705,7 +735,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk,
 	if w.batchSize > w.maxBatchSize {
 		w.batchSize = w.maxBatchSize
 	}
-	return handles, retChk, nil
+	return handles, retChk, scannedKeys, nil
 }
 
 func (w *indexWorker) buildTableTask(handles []int64, retChk *chunk.Chunk) *lookupTableTask {
diff --git a/executor/distsql_test.go b/executor/distsql_test.go
index 804782c591e0d..4863fcfc15457 100644
--- a/executor/distsql_test.go
+++ b/executor/distsql_test.go
@@ -219,3 +219,18 @@ func (s *testSuite) TestIssue10178(c *C) {
 	tk.MustQuery("select * from t where a > 9223372036854775807").Check(testkit.Rows("18446744073709551615"))
 	tk.MustQuery("select * from t where a < 9223372036854775808").Check(testkit.Rows("9223372036854775807"))
 }
+
+func (s *testSuite) TestPushLimitDownIndexLookUpReader(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists tbl")
+	tk.MustExec("create table tbl(a int, b int, c int, key idx_b_c(b,c))")
+	tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 4 limit 2,1").Check(testkit.Rows())
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 3 limit 2,1").Check(testkit.Rows())
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 2 limit 2,1").Check(testkit.Rows("5 5 5"))
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 1").Check(testkit.Rows("2 2 2"))
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 order by b desc limit 2,1").Check(testkit.Rows("3 3 3"))
+	tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 and c > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
+}
diff --git a/planner/core/explain.go b/planner/core/explain.go
index bbd6cb9c625b9..931e6501f385a 100644
--- a/planner/core/explain.go
+++ b/planner/core/explain.go
@@ -128,6 +128,9 @@ func (p *PhysicalIndexReader) ExplainInfo() string {
 // ExplainInfo implements PhysicalPlan interface.
 func (p *PhysicalIndexLookUpReader) ExplainInfo() string {
 	// The children can be inferred by the relation symbol.
+	if p.PushedLimit != nil {
+		return fmt.Sprintf("limit embedded(offset:%v, count:%v)", p.PushedLimit.Offset, p.PushedLimit.Count)
+	}
 	return ""
 }
 
diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go
index cc32107bbcbdf..7474afdf3a6ab 100644
--- a/planner/core/physical_plan_test.go
+++ b/planner/core/physical_plan_test.go
@@ -98,7 +98,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
 		// Test TopN to Limit in double read.
 		{
 			sql:  "select * from t where t.c = 1 and t.e = 1 order by t.d limit 1",
-			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))->Limit",
+			best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit, Table(t))",
 		},
 		// Test TopN to Limit in index single read.
 		{
@@ -143,7 +143,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
 		// Test Limit push down in double single read.
{ sql: "select c, b from t where c = 1 limit 1", - best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit->Projection", + best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Projection", }, // Test Selection + Limit push down in double single read. { @@ -174,7 +174,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) { // Test PK in index double read. { sql: "select * from t where t.c = 1 and t.a > 1 order by t.d limit 1", - best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))->Limit", + best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit, Table(t))", }, // Test index filter condition push down. { @@ -542,7 +542,7 @@ func (s *testPlanSuite) TestDAGPlanTopN(c *C) { }, { sql: "select * from t where c = 1 order by c limit 1", - best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))->Limit", + best: "IndexLookUp(Index(t.c_d_e)[[1,1]]->Limit, Table(t))", }, { sql: "select * from t order by a limit 1", diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index a345719aac6a9..555cd80fa056d 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -70,6 +70,12 @@ type PhysicalIndexReader struct { OutputColumns []*expression.Column } +// PushedDownLimit is the limit operator pushed down into PhysicalIndexLookUpReader. +type PushedDownLimit struct { + Offset uint64 + Count uint64 +} + // PhysicalIndexLookUpReader is the index look up reader in tidb. It's used in case of double reading. type PhysicalIndexLookUpReader struct { physicalSchemaProducer @@ -80,6 +86,9 @@ type PhysicalIndexLookUpReader struct { TablePlans []PhysicalPlan indexPlan PhysicalPlan tablePlan PhysicalPlan + + // PushedLimit is used to avoid unnecessary table scan tasks of IndexLookUpReader. + PushedLimit *PushedDownLimit } // PhysicalIndexScan represents an index scan plan. diff --git a/planner/core/task.go b/planner/core/task.go index 1bbbff3cfd145..3b9e4e87ae06a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -265,6 +265,7 @@ func (t *rootTask) plan() PhysicalPlan { func (p *PhysicalLimit) attach2Task(tasks ...task) task { t := tasks[0].copy() + sunk := false if cop, ok := t.(*copTask); ok { // If the table/index scans data by order and applies a double read, the limit cannot be pushed to the table side. if !cop.keepOrder || !cop.indexPlanFinished || cop.indexPlan == nil { @@ -273,9 +274,42 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task { cop = attachPlan2Task(pushedDownLimit, cop).(*copTask) } t = finishCopTask(p.ctx, cop) + sunk = p.sinkIntoIndexLookUp(t) } - t = attachPlan2Task(p, t) - return t + if sunk { + return t + } + return attachPlan2Task(p, t) +} + +func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool { + root := t.(*rootTask) + reader, isDoubleRead := root.p.(*PhysicalIndexLookUpReader) + proj, isProj := root.p.(*PhysicalProjection) + if !isDoubleRead && !isProj { + return false + } + if isProj { + reader, isDoubleRead = proj.Children()[0].(*PhysicalIndexLookUpReader) + if !isDoubleRead { + return false + } + } + // We can sink Limit into IndexLookUpReader only if tablePlan contains no Selection. + ts, isTableScan := reader.tablePlan.(*PhysicalTableScan) + if !isTableScan { + return false + } + reader.PushedLimit = &PushedDownLimit{ + Offset: p.Offset, + Count: p.Count, + } + ts.stats = p.stats + reader.stats = p.stats + if isProj { + proj.stats = p.stats + } + return true } func (p *PhysicalSort) getCost(count float64) float64 {