diff --git a/DEPS.bzl b/DEPS.bzl index c04883609c952..0f687ecb48f5c 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3450,8 +3450,8 @@ def go_deps(): name = "com_github_pingcap_tipb", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/tipb", - sum = "h1:ltplM2dLXcIAwlleA5v4gke6m6ZeHpvUA3qYX9dCC18=", - version = "v0.0.0-20230427024529-aed92caf20b9", + sum = "h1:ZVehx2Mand1frpxzJud7FUOonbLZeXXQpEsNdjnEAJA=", + version = "v0.0.0-20230516140330-b3e432c40cb3", ) go_repository( name = "com_github_pkg_browser", diff --git a/cmd/explaintest/r/explain_cte.result b/cmd/explaintest/r/explain_cte.result index 556752d0bf4e6..8f0682a178560 100644 --- a/cmd/explaintest/r/explain_cte.result +++ b/cmd/explaintest/r/explain_cte.result @@ -191,7 +191,7 @@ CTE_0 100.00 root Non-Recursive CTE └─IndexFullScan_31 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 0 offset 0) select * from cte1; id estRows task access object operator info -CTEFullScan_18 0.00 root CTE:cte1 data:CTE_0 +CTEFullScan_19 0.00 root CTE:cte1 data:CTE_0 CTE_0 0.00 root Non-Recursive CTE └─TableDual_16(Seed Part) 0.00 root rows:0 CREATE TABLE `customer` ( diff --git a/distsql/select_result.go b/distsql/select_result.go index b83d63b22aaec..acf47f722fee9 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -558,12 +558,11 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr } if hasExecutor { var recorededPlanIDs = make(map[int]int) - for i, detail := range r.selectResp.GetExecutionSummaries() { + for _, detail := range r.selectResp.GetExecutionSummaries() { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != nil { - planID := r.copPlanIDs[i] recorededPlanIDs[r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl. - RecordOneCopTask(planID, r.storeType.Name(), callee, detail)] = 0 + RecordOneCopTask(-1, r.storeType.Name(), callee, detail)] = 0 } } num := uint64(0) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index d1a36a0b937be..42cc724b95aa1 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -229,6 +229,7 @@ go_library( "@com_github_pingcap_kvproto//pkg/encryptionpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/mpp", "@com_github_pingcap_kvproto//pkg/resource_manager", "@com_github_pingcap_kvproto//pkg/tikvpb", "@com_github_pingcap_log//:log", diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index b2ced590dc338..0d4525872e699 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -16,10 +16,12 @@ package executor import ( "context" + "fmt" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -99,6 +101,10 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { if err != nil { return errors.Trace(err) } + err = e.fixTaskForCTEStorageAndReader(dagReq.RootExecutor, mppTask.Meta) + if err != nil { + return err + } pbData, err := dagReq.Marshal() if err != nil { return errors.Trace(err) @@ -127,6 +133,88 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { return nil } +// fixTaskForCTEStorageAndReader fixes the upstream/downstream tasks for the producers and consumers. 
+// After we split the fragments, a CTE producer in a fragment holds the task addresses of all of its consumers.
+// For example, suppose the producer has two tasks, one on node_1 and one on node_2, and each consumer likewise has two tasks on the same nodes (node_1 and node_2).
+// For the producer's task on node_1 we prune the node_2 addresses, since the producer task on node_1 should only send to the consumer tasks on node_1.
+// The same applies to the producer's task on node_2.
+// Consumer tasks are handled the same way: we prune the unnecessary task addresses of their producer tasks (i.e. the downstream tasks).
+func (e *MPPGather) fixTaskForCTEStorageAndReader(exec *tipb.Executor, meta kv.MPPTaskMeta) error {
+	children := make([]*tipb.Executor, 0, 2)
+	switch exec.Tp {
+	case tipb.ExecType_TypeTableScan, tipb.ExecType_TypePartitionTableScan, tipb.ExecType_TypeIndexScan:
+	case tipb.ExecType_TypeSelection:
+		children = append(children, exec.Selection.Child)
+	case tipb.ExecType_TypeAggregation, tipb.ExecType_TypeStreamAgg:
+		children = append(children, exec.Aggregation.Child)
+	case tipb.ExecType_TypeTopN:
+		children = append(children, exec.TopN.Child)
+	case tipb.ExecType_TypeLimit:
+		children = append(children, exec.Limit.Child)
+	case tipb.ExecType_TypeExchangeSender:
+		children = append(children, exec.ExchangeSender.Child)
+		if len(exec.ExchangeSender.UpstreamCteTaskMeta) == 0 {
+			break
+		}
+		actualUpStreamTasks := make([][]byte, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
+		actualTIDs := make([]int64, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
+		for _, tasksFromOneConsumer := range exec.ExchangeSender.UpstreamCteTaskMeta {
+			for _, taskBytes := range tasksFromOneConsumer.EncodedTasks {
+				taskMeta := &mpp.TaskMeta{}
+				err := taskMeta.Unmarshal(taskBytes)
+				if err != nil {
+					return err
+				}
+				if taskMeta.Address != meta.GetAddress() {
+					continue
+				}
+				actualUpStreamTasks = append(actualUpStreamTasks, taskBytes)
+				actualTIDs = append(actualTIDs, taskMeta.TaskId)
+			}
+		}
+		logutil.BgLogger().Warn("refine tunnel for cte producer task", zap.String("the final tunnel", fmt.Sprintf("up stream consumer tasks: %v", actualTIDs)))
+		exec.ExchangeSender.EncodedTaskMeta = actualUpStreamTasks
+	case tipb.ExecType_TypeExchangeReceiver:
+		if len(exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta) == 0 {
+			break
+		}
+		exec.ExchangeReceiver.EncodedTaskMeta = [][]byte{}
+		actualTIDs := make([]int64, 0, 4)
+		for _, taskBytes := range exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta {
+			taskMeta := &mpp.TaskMeta{}
+			err := taskMeta.Unmarshal(taskBytes)
+			if err != nil {
+				return err
+			}
+			if taskMeta.Address != meta.GetAddress() {
+				continue
+			}
+			exec.ExchangeReceiver.EncodedTaskMeta = append(exec.ExchangeReceiver.EncodedTaskMeta, taskBytes)
+			actualTIDs = append(actualTIDs, taskMeta.TaskId)
+		}
+		logutil.BgLogger().Warn("refine tunnel for cte consumer task", zap.String("the final tunnel", fmt.Sprintf("down stream producer task: %v", actualTIDs)))
+	case tipb.ExecType_TypeJoin:
+		children = append(children, exec.Join.Children...)
+ case tipb.ExecType_TypeProjection: + children = append(children, exec.Projection.Child) + case tipb.ExecType_TypeWindow: + children = append(children, exec.Window.Child) + case tipb.ExecType_TypeSort: + children = append(children, exec.Sort.Child) + case tipb.ExecType_TypeExpand: + children = append(children, exec.Expand.Child) + default: + return errors.Errorf("unknown new tipb protocol %d", exec.Tp) + } + for _, child := range children { + err := e.fixTaskForCTEStorageAndReader(child, meta) + if err != nil { + return err + } + } + return nil +} + func collectPlanIDS(plan plannercore.PhysicalPlan, ids []int) []int { ids = append(ids, plan.ID()) for _, child := range plan.Children() { diff --git a/go.mod b/go.mod index 8004206c5a81a..f0bd12b6b592c 100644 --- a/go.mod +++ b/go.mod @@ -78,7 +78,7 @@ require ( github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e - github.com/pingcap/tipb v0.0.0-20230427024529-aed92caf20b9 + github.com/pingcap/tipb v0.0.0-20230516140330-b3e432c40cb3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.15.1 github.com/prometheus/client_model v0.4.0 diff --git a/go.sum b/go.sum index 9985b656008a1..5bcc81e0fd1e6 100644 --- a/go.sum +++ b/go.sum @@ -798,8 +798,8 @@ github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tipb v0.0.0-20230427024529-aed92caf20b9 h1:ltplM2dLXcIAwlleA5v4gke6m6ZeHpvUA3qYX9dCC18= -github.com/pingcap/tipb v0.0.0-20230427024529-aed92caf20b9/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tipb v0.0.0-20230516140330-b3e432c40cb3 h1:ZVehx2Mand1frpxzJud7FUOonbLZeXXQpEsNdjnEAJA= +github.com/pingcap/tipb v0.0.0-20230516140330-b3e432c40cb3/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index ccae974c10f8d..39cbb64814a1b 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -63,6 +63,7 @@ go_library( "rule_partition_processor.go", "rule_predicate_push_down.go", "rule_predicate_simplification.go", + "rule_push_down_sequence.go", "rule_result_reorder.go", "rule_semi_join_rewrite.go", "rule_topn_push_down.go", diff --git a/planner/core/access_object.go b/planner/core/access_object.go index c9994efe3de86..5c36226d7bcb2 100644 --- a/planner/core/access_object.go +++ b/planner/core/access_object.go @@ -230,6 +230,9 @@ func (o OtherAccessObject) SetIntoPB(pb *tipb.ExplainOperator) { if pb == nil { return } + if o == "" { + return + } pb.AccessObjects = []*tipb.AccessObject{ { AccessObject: &tipb.AccessObject_OtherObject{OtherObject: string(o)}, @@ -396,7 +399,10 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) AccessObject return 
DynamicPartitionAccessObjects(nil) } if len(p.PartitionInfos) == 0 { - ts := p.TablePlans[0].(*PhysicalTableScan) + ts, ok := p.TablePlans[0].(*PhysicalTableScan) + if !ok { + return OtherAccessObject("") + } asName := "" if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 { asName = ts.TableAsName.O diff --git a/planner/core/casetest/enforce_mpp_test.go b/planner/core/casetest/enforce_mpp_test.go index 7f6fb0b990e2d..09499a74de87c 100644 --- a/planner/core/casetest/enforce_mpp_test.go +++ b/planner/core/casetest/enforce_mpp_test.go @@ -598,3 +598,51 @@ func TestMPPNullAwareSemiJoinPushDown(t *testing.T) { require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())) } } + +func TestMPPSharedCTEScan(t *testing.T) { + store := testkit.CreateMockStore(t, internal.WithMockTiFlash(2)) + tk := testkit.NewTestKit(t, store) + + // test table + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists s") + tk.MustExec("create table t(a int, b int, c int)") + tk.MustExec("create table s(a int, b int, c int)") + tk.MustExec("alter table t set tiflash replica 1") + tk.MustExec("alter table s set tiflash replica 1") + + tb := external.GetTableByName(t, tk, "test", "t") + err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + + tb = external.GetTableByName(t, tk, "test", "s") + err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + + var input []string + var output []struct { + SQL string + Plan []string + Warn []string + } + + tk.MustExec("set @@tidb_enforce_mpp='on'") + tk.MustExec("set @@tidb_opt_enable_mpp_shared_cte_execution='on'") + + enforceMPPSuiteData := GetEnforceMPPSuiteData() + enforceMPPSuiteData.LoadTestCases(t, &input, &output) + for i, tt := range input { + testdata.OnRecord(func() { + output[i].SQL = tt + }) + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())) + } +} diff --git a/planner/core/casetest/testdata/binary_plan_suite_out.json b/planner/core/casetest/testdata/binary_plan_suite_out.json index a361c5ca0b23f..0ce7f545d22cf 100644 --- a/planner/core/casetest/testdata/binary_plan_suite_out.json +++ b/planner/core/casetest/testdata/binary_plan_suite_out.json @@ -297,6 +297,7 @@ "BinaryPlan": { "main": { "name": "CTEFullScan_17", + "cost": 0.8982000000000001, "est_rows": 1.8, "act_rows": 5, "task_type": 1, @@ -363,6 +364,7 @@ "operator_info": "cast(plus(Column#3, 1), bigint(1) BINARY)->Column#5" } ], + "cost": 0.8982000000000001, "est_rows": 1.8, "act_rows": 5, "task_type": 1, diff --git a/planner/core/casetest/testdata/enforce_mpp_suite_in.json b/planner/core/casetest/testdata/enforce_mpp_suite_in.json index 33f4893d96a56..e018632f21c7e 100644 --- a/planner/core/casetest/testdata/enforce_mpp_suite_in.json +++ b/planner/core/casetest/testdata/enforce_mpp_suite_in.json @@ -172,5 +172,31 @@ "EXPLAIN select *, (t.a, t.b) not in (select s.a, s.b from s) from t; -- 7. 
left anti semi join, two join key",
     "EXPLAIN select *, (t.a, t.b) not in (select s.a, s.b from s where t.c < s.c) from t; -- 8. left anti semi join, two join key + other condition"
     ]
+  },
+  {
+    "name": "TestMPPSharedCTEScan",
+    "cases": [
+      // The simplest case.
+      "explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b ",
+      "explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t) select * from c1, c1 c2 where c1.a=c2.b ",
+      "explain format = 'brief' with c1 as (select * from t) select c1.* from c1, c1 c2 where c1.b=c2.c",
+      // Can work when there's a global limit/topn.
+      "explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b limit 10",
+      "explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b order by c1.a limit 10",
+      // c2 references c1. c1 can use MPP, so c2 can use MPP, and then the main query can use MPP.
+      "explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2 where c1.a=c2.b",
+      // The same SQL, but c1 is forced to read TiKV, so c2 cannot use MPP, and neither can the whole query.
+      "explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2 where c1.a=c2.b",
+      // Both WITH statements can use MPP.
+      "explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a",
+      // The outer one fails to use MPP, but the inner WITH statement can. Since we haven't implemented least-common-ancestor detection to find the best position of the Sequence, the whole SQL cannot use MPP.
+      "explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a",
+      // The inner one fails, so the whole SQL cannot use MPP.
+      "explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select /*+ read_from_storage(tikv[t]) */ * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a",
+      // A small change: the inner WITH statement references the outer c1.
+      "explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from c1) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a",
+      // The outer one fails to use MPP. Since the inner one references the outer one, the whole SQL cannot use MPP.
+ "explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from c1) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a" + ] } ] diff --git a/planner/core/casetest/testdata/enforce_mpp_suite_out.json b/planner/core/casetest/testdata/enforce_mpp_suite_out.json index 0d2af3674e71b..85fc7ca3983b6 100644 --- a/planner/core/casetest/testdata/enforce_mpp_suite_out.json +++ b/planner/core/casetest/testdata/enforce_mpp_suite_out.json @@ -1591,5 +1591,338 @@ "Warn": null } ] + }, + { + "Name": "TestMPPSharedCTEScan", + "Cases": [ + { + "SQL": "explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b ", + "Plan": [ + "TableReader 9999.99 root MppVersion: 1, data:ExchangeSender", + "└─ExchangeSender 9999.99 mpp[tiflash] ExchangeType: PassThrough", + " └─Sequence 9999.99 mpp[tiflash] Sequence Node", + " ├─CTE_0 9999.99 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─Selection 9999.99 mpp[tiflash] or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t pushed down filter:empty, keep order:false, stats:pseudo", + " └─HashJoin 9999.99 mpp[tiflash] inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─ExchangeReceiver(Build) 7999.99 mpp[tiflash] ", + " │ └─ExchangeSender 7999.99 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection 7999.99 mpp[tiflash] not(isnull(test.t.a))", + " │ └─CTEFullScan 9999.99 mpp[tiflash] CTE:c1 data:CTE_0", + " └─Selection(Probe) 7999.99 mpp[tiflash] not(isnull(test.t.b))", + " └─CTEFullScan 9999.99 mpp[tiflash] CTE:c1 AS c2 data:CTE_0" + ], + "Warn": null + }, + { + "SQL": "explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t) select * from c1, c1 c2 where c1.a=c2.b ", + "Plan": [ + "HashJoin 9999.99 root inner join, equal:[eq(test.t.a, test.t.b)]", + "├─Selection(Build) 7999.99 root not(isnull(test.t.b))", + "│ └─CTEFullScan 9999.99 root CTE:c1 AS c2 data:CTE_0", + "└─Selection(Probe) 7999.99 root not(isnull(test.t.a))", + " └─CTEFullScan 9999.99 root CTE:c1 data:CTE_0", + "CTE_0 9999.99 root Non-Recursive CTE", + "└─TableReader(Seed Part) 9999.99 root data:Selection", + " └─Selection 9999.99 cop[tikv] or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because you have set a hint to read table `t` from TiKV." 
+ ] + }, + { + "SQL": "explain format = 'brief' with c1 as (select * from t) select c1.* from c1, c1 c2 where c1.b=c2.c", + "Plan": [ + "TableReader 9999.99 root MppVersion: 1, data:ExchangeSender", + "└─ExchangeSender 9999.99 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 9999.99 mpp[tiflash] test.t.a, test.t.b, test.t.c", + " └─Sequence 9999.99 mpp[tiflash] Sequence Node", + " ├─CTE_0 9999.99 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─Selection 9999.99 mpp[tiflash] or(not(isnull(test.t.b)), not(isnull(test.t.c)))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t pushed down filter:empty, keep order:false, stats:pseudo", + " └─HashJoin 9999.99 mpp[tiflash] inner join, equal:[eq(test.t.b, test.t.c)]", + " ├─ExchangeReceiver(Build) 7999.99 mpp[tiflash] ", + " │ └─ExchangeSender 7999.99 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection 7999.99 mpp[tiflash] not(isnull(test.t.b))", + " │ └─CTEFullScan 9999.99 mpp[tiflash] CTE:c1 data:CTE_0", + " └─Selection(Probe) 7999.99 mpp[tiflash] not(isnull(test.t.c))", + " └─CTEFullScan 9999.99 mpp[tiflash] CTE:c1 AS c2 data:CTE_0" + ], + "Warn": null + }, + { + "SQL": "explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b limit 10", + "Plan": [ + "Limit 10.00 root offset:0, count:10", + "└─TableReader 10.00 root MppVersion: 1, data:ExchangeSender", + " └─ExchangeSender 10.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Limit 10.00 mpp[tiflash] offset:0, count:10", + " └─Sequence 9999.99 mpp[tiflash] Sequence Node", + " ├─CTE_0 9999.99 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─Selection 9999.99 mpp[tiflash] or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t pushed down filter:empty, keep order:false, stats:pseudo", + " └─HashJoin 9999.99 mpp[tiflash] inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─ExchangeReceiver(Build) 7999.99 mpp[tiflash] ", + " │ └─ExchangeSender 7999.99 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection 7999.99 mpp[tiflash] not(isnull(test.t.a))", + " │ └─CTEFullScan 9999.99 mpp[tiflash] CTE:c1 data:CTE_0", + " └─Selection(Probe) 8.00 mpp[tiflash] not(isnull(test.t.b))", + " └─CTEFullScan 9999.99 mpp[tiflash] CTE:c1 AS c2 data:CTE_0" + ], + "Warn": null + }, + { + "SQL": "explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b order by c1.a limit 10", + "Plan": [ + "TopN 10.00 root test.t.a, offset:0, count:10", + "└─TableReader 10.00 root MppVersion: 1, data:ExchangeSender", + " └─ExchangeSender 10.00 mpp[tiflash] ExchangeType: PassThrough", + " └─TopN 10.00 mpp[tiflash] test.t.a, offset:0, count:10", + " └─Sequence 9999.99 mpp[tiflash] Sequence Node", + " ├─CTE_0 9999.99 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─Selection 9999.99 mpp[tiflash] or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t pushed down filter:empty, keep order:false, stats:pseudo", + " └─HashJoin 9999.99 mpp[tiflash] inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─ExchangeReceiver(Build) 7999.99 mpp[tiflash] ", + " │ └─ExchangeSender 7999.99 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection 7999.99 mpp[tiflash] not(isnull(test.t.a))", + " │ └─CTEFullScan 9999.99 mpp[tiflash] CTE:c1 data:CTE_0", + " └─Selection(Probe) 7999.99 mpp[tiflash] not(isnull(test.t.b))", + " └─CTEFullScan 9999.99 mpp[tiflash] CTE:c1 AS c2 data:CTE_0" + ], + "Warn": null + }, + { + "SQL": 
"explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2 where c1.a=c2.b", + "Plan": [ + "TableReader 12500.00 root MppVersion: 1, data:ExchangeSender", + "└─ExchangeSender 12500.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Sequence 12500.00 mpp[tiflash] Sequence Node", + " ├─CTE_0 10000.00 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo", + " ├─CTE_1 10000.00 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─Projection 10000.00 mpp[tiflash] test.t.a, test.t.b, test.t.c", + " │ └─HashJoin 10000.00 mpp[tiflash] inner join, equal:[eq(test.t.b, test.t.c)]", + " │ ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] ", + " │ │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ │ └─Selection 8000.00 mpp[tiflash] not(isnull(test.t.b)), or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " │ │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c1 data:CTE_0", + " │ └─Selection(Probe) 8000.00 mpp[tiflash] not(isnull(test.t.c))", + " │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c1 AS c2 data:CTE_0", + " └─HashJoin 12500.00 mpp[tiflash] inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] ", + " │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection 8000.00 mpp[tiflash] not(isnull(test.t.a))", + " │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c2 AS c1 data:CTE_1", + " └─Selection(Probe) 8000.00 mpp[tiflash] not(isnull(test.t.b))", + " └─CTEFullScan 10000.00 mpp[tiflash] CTE:c2 data:CTE_1" + ], + "Warn": null + }, + { + "SQL": "explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2 where c1.a=c2.b", + "Plan": [ + "HashJoin 12500.00 root inner join, equal:[eq(test.t.a, test.t.b)]", + "├─Selection(Build) 8000.00 root not(isnull(test.t.b))", + "│ └─CTEFullScan 10000.00 root CTE:c2 data:CTE_1", + "└─Selection(Probe) 8000.00 root not(isnull(test.t.a))", + " └─CTEFullScan 10000.00 root CTE:c2 AS c1 data:CTE_1", + "CTE_1 10000.00 root Non-Recursive CTE", + "└─HashJoin(Seed Part) 10000.00 root inner join, equal:[eq(test.t.b, test.t.c)]", + " ├─Selection(Build) 8000.00 root not(isnull(test.t.c))", + " │ └─CTEFullScan 10000.00 root CTE:c1 AS c2 data:CTE_0", + " └─Selection(Probe) 8000.00 root not(isnull(test.t.b)), or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " └─CTEFullScan 10000.00 root CTE:c1 data:CTE_0", + "CTE_0 10000.00 root Non-Recursive CTE", + "└─TableReader(Seed Part) 10000.00 root data:TableFullScan", + " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because you have set a hint to read table `t` from TiKV." 
+ ] + }, + { + "SQL": "explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a", + "Plan": [ + "TableReader 19531.25 root MppVersion: 1, data:ExchangeSender", + "└─ExchangeSender 19531.25 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 19531.25 mpp[tiflash] test.t.a, test.t.b, test.t.c, test.t.a, test.t.b, test.t.c, test.t.a, test.t.b, test.t.c", + " └─Sequence 19531.25 mpp[tiflash] Sequence Node", + " ├─CTE_0 10000.00 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo", + " ├─CTE_1 10000.00 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─Projection 10000.00 mpp[tiflash] test.t.a, test.t.b, test.t.c", + " │ └─HashJoin 10000.00 mpp[tiflash] inner join, equal:[eq(test.t.b, test.t.c)]", + " │ ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] ", + " │ │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ │ └─Selection 8000.00 mpp[tiflash] not(isnull(test.t.b)), or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " │ │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c1 data:CTE_0", + " │ └─Selection(Probe) 8000.00 mpp[tiflash] not(isnull(test.t.c))", + " │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c1 AS c2 data:CTE_0", + " └─HashJoin 19531.25 mpp[tiflash] inner join, equal:[eq(test.t.a, test.t.a)]", + " ├─ExchangeReceiver(Build) 9999.98 mpp[tiflash] ", + " │ └─ExchangeSender 9999.98 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Sequence 9999.98 mpp[tiflash] Sequence Node", + " │ ├─CTE_2 9999.98 mpp[tiflash] Non-Recursive CTE Storage", + " │ │ └─Selection 9999.98 mpp[tiflash] or(and(not(isnull(test.t.c)), not(isnull(test.t.a))), not(isnull(test.t.b)))", + " │ │ └─TableFullScan 10000.00 mpp[tiflash] table:t pushed down filter:empty, keep order:false, stats:pseudo", + " │ └─HashJoin 9999.98 mpp[tiflash] inner join, equal:[eq(test.t.c, test.t.b)]", + " │ ├─ExchangeReceiver(Build) 7999.98 mpp[tiflash] ", + " │ │ └─ExchangeSender 7999.98 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ │ └─Selection 7999.98 mpp[tiflash] not(isnull(test.t.a)), not(isnull(test.t.c))", + " │ │ └─CTEFullScan 9999.98 mpp[tiflash] CTE:c3 data:CTE_2", + " │ └─Selection(Probe) 7999.98 mpp[tiflash] not(isnull(test.t.b))", + " │ └─CTEFullScan 9999.98 mpp[tiflash] CTE:c3 AS c4 data:CTE_2", + " └─HashJoin(Probe) 12500.00 mpp[tiflash] inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] ", + " │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection 8000.00 mpp[tiflash] not(isnull(test.t.a))", + " │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c2 AS c1 data:CTE_1", + " └─Selection(Probe) 8000.00 mpp[tiflash] not(isnull(test.t.b))", + " └─CTEFullScan 10000.00 mpp[tiflash] CTE:c2 data:CTE_1" + ], + "Warn": null + }, + { + "SQL": "explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a", + "Plan": [ + "HashJoin 19531.25 root inner join, equal:[eq(test.t.a, test.t.a)]", + "├─HashJoin(Build) 9999.98 root inner join, equal:[eq(test.t.c, test.t.b)]", + "│ ├─Selection(Build) 7999.98 root not(isnull(test.t.b))", + "│ │ 
└─CTEFullScan 9999.98 root CTE:c3 AS c4 data:CTE_2", + "│ └─Selection(Probe) 7999.98 root not(isnull(test.t.a)), not(isnull(test.t.c))", + "│ └─CTEFullScan 9999.98 root CTE:c3 data:CTE_2", + "└─HashJoin(Probe) 12500.00 root inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─Selection(Build) 8000.00 root not(isnull(test.t.b))", + " │ └─CTEFullScan 10000.00 root CTE:c2 data:CTE_1", + " └─Selection(Probe) 8000.00 root not(isnull(test.t.a))", + " └─CTEFullScan 10000.00 root CTE:c2 AS c1 data:CTE_1", + "CTE_2 9999.98 root Non-Recursive CTE", + "└─TableReader(Seed Part) 9999.98 root MppVersion: 1, data:ExchangeSender", + " └─ExchangeSender 9999.98 mpp[tiflash] ExchangeType: PassThrough", + " └─Selection 9999.98 mpp[tiflash] or(and(not(isnull(test.t.c)), not(isnull(test.t.a))), not(isnull(test.t.b)))", + " └─TableFullScan 10000.00 mpp[tiflash] table:t pushed down filter:empty, keep order:false, stats:pseudo", + "CTE_1 10000.00 root Non-Recursive CTE", + "└─HashJoin(Seed Part) 10000.00 root inner join, equal:[eq(test.t.b, test.t.c)]", + " ├─Selection(Build) 8000.00 root not(isnull(test.t.c))", + " │ └─CTEFullScan 10000.00 root CTE:c1 AS c2 data:CTE_0", + " └─Selection(Probe) 8000.00 root not(isnull(test.t.b)), or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " └─CTEFullScan 10000.00 root CTE:c1 data:CTE_0", + "CTE_0 10000.00 root Non-Recursive CTE", + "└─TableReader(Seed Part) 10000.00 root data:TableFullScan", + " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because you have set a hint to read table `t` from TiKV." + ] + }, + { + "SQL": "explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select /*+ read_from_storage(tikv[t]) */ * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a", + "Plan": [ + "HashJoin 19531.25 root inner join, equal:[eq(test.t.a, test.t.a)]", + "├─HashJoin(Build) 9999.98 root inner join, equal:[eq(test.t.c, test.t.b)]", + "│ ├─Selection(Build) 7999.98 root not(isnull(test.t.b))", + "│ │ └─CTEFullScan 9999.98 root CTE:c3 AS c4 data:CTE_2", + "│ └─Selection(Probe) 7999.98 root not(isnull(test.t.a)), not(isnull(test.t.c))", + "│ └─CTEFullScan 9999.98 root CTE:c3 data:CTE_2", + "└─HashJoin(Probe) 12500.00 root inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─Selection(Build) 8000.00 root not(isnull(test.t.b))", + " │ └─CTEFullScan 10000.00 root CTE:c2 data:CTE_1", + " └─Selection(Probe) 8000.00 root not(isnull(test.t.a))", + " └─CTEFullScan 10000.00 root CTE:c2 AS c1 data:CTE_1", + "CTE_2 9999.98 root Non-Recursive CTE", + "└─TableReader(Seed Part) 9999.98 root data:Selection", + " └─Selection 9999.98 cop[tikv] or(and(not(isnull(test.t.c)), not(isnull(test.t.a))), not(isnull(test.t.b)))", + " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo", + "CTE_1 10000.00 root Non-Recursive CTE", + "└─HashJoin(Seed Part) 10000.00 root inner join, equal:[eq(test.t.b, test.t.c)]", + " ├─Selection(Build) 8000.00 root not(isnull(test.t.c))", + " │ └─CTEFullScan 10000.00 root CTE:c1 AS c2 data:CTE_0", + " └─Selection(Probe) 8000.00 root not(isnull(test.t.b)), or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " └─CTEFullScan 10000.00 root CTE:c1 data:CTE_0", + "CTE_0 10000.00 root Non-Recursive CTE", + "└─TableReader(Seed Part) 10000.00 root MppVersion: 1, data:ExchangeSender", + " └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: PassThrough", + " └─TableFullScan 
10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because you have set a hint to read table `t` from TiKV." + ] + }, + { + "SQL": "explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from c1) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a", + "Plan": [ + "TableReader 19531.25 root MppVersion: 1, data:ExchangeSender", + "└─ExchangeSender 19531.25 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 19531.25 mpp[tiflash] test.t.a, test.t.b, test.t.c, test.t.a, test.t.b, test.t.c, test.t.a, test.t.b, test.t.c", + " └─Sequence 19531.25 mpp[tiflash] Sequence Node", + " ├─CTE_0 10000.00 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t keep order:false, stats:pseudo", + " ├─CTE_1 10000.00 mpp[tiflash] Non-Recursive CTE Storage", + " │ └─Projection 10000.00 mpp[tiflash] test.t.a, test.t.b, test.t.c", + " │ └─HashJoin 10000.00 mpp[tiflash] inner join, equal:[eq(test.t.b, test.t.c)]", + " │ ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] ", + " │ │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ │ └─Selection 8000.00 mpp[tiflash] not(isnull(test.t.b)), or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " │ │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c1 data:CTE_0", + " │ └─Selection(Probe) 8000.00 mpp[tiflash] not(isnull(test.t.c))", + " │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c1 AS c2 data:CTE_0", + " └─HashJoin 19531.25 mpp[tiflash] inner join, equal:[eq(test.t.a, test.t.a)]", + " ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] ", + " │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Sequence 8000.00 mpp[tiflash] Sequence Node", + " │ ├─CTE_2 8000.00 mpp[tiflash] Non-Recursive CTE Storage", + " │ │ └─Selection 8000.00 mpp[tiflash] or(and(not(isnull(test.t.c)), not(isnull(test.t.a))), not(isnull(test.t.b)))", + " │ │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c1 data:CTE_0", + " │ └─HashJoin 8000.00 mpp[tiflash] inner join, equal:[eq(test.t.c, test.t.b)]", + " │ ├─ExchangeReceiver(Build) 6400.00 mpp[tiflash] ", + " │ │ └─ExchangeSender 6400.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ │ └─Selection 6400.00 mpp[tiflash] not(isnull(test.t.a)), not(isnull(test.t.c))", + " │ │ └─CTEFullScan 8000.00 mpp[tiflash] CTE:c3 data:CTE_2", + " │ └─Selection(Probe) 6400.00 mpp[tiflash] not(isnull(test.t.b))", + " │ └─CTEFullScan 8000.00 mpp[tiflash] CTE:c3 AS c4 data:CTE_2", + " └─HashJoin(Probe) 12500.00 mpp[tiflash] inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] ", + " │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST", + " │ └─Selection 8000.00 mpp[tiflash] not(isnull(test.t.a))", + " │ └─CTEFullScan 10000.00 mpp[tiflash] CTE:c2 AS c1 data:CTE_1", + " └─Selection(Probe) 8000.00 mpp[tiflash] not(isnull(test.t.b))", + " └─CTEFullScan 10000.00 mpp[tiflash] CTE:c2 data:CTE_1" + ], + "Warn": null + }, + { + "SQL": "explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from c1) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a", + "Plan": [ + "HashJoin 19531.25 root inner join, equal:[eq(test.t.a, test.t.a)]", + "├─HashJoin(Build) 8000.00 root 
inner join, equal:[eq(test.t.c, test.t.b)]", + "│ ├─Selection(Build) 6400.00 root not(isnull(test.t.b))", + "│ │ └─CTEFullScan 8000.00 root CTE:c3 AS c4 data:CTE_2", + "│ └─Selection(Probe) 6400.00 root not(isnull(test.t.a)), not(isnull(test.t.c))", + "│ └─CTEFullScan 8000.00 root CTE:c3 data:CTE_2", + "└─HashJoin(Probe) 12500.00 root inner join, equal:[eq(test.t.a, test.t.b)]", + " ├─Selection(Build) 8000.00 root not(isnull(test.t.b))", + " │ └─CTEFullScan 10000.00 root CTE:c2 data:CTE_1", + " └─Selection(Probe) 8000.00 root not(isnull(test.t.a))", + " └─CTEFullScan 10000.00 root CTE:c2 AS c1 data:CTE_1", + "CTE_2 8000.00 root Non-Recursive CTE", + "└─Selection(Seed Part) 8000.00 root or(and(not(isnull(test.t.c)), not(isnull(test.t.a))), not(isnull(test.t.b)))", + " └─CTEFullScan 10000.00 root CTE:c1 data:CTE_0", + "CTE_1 10000.00 root Non-Recursive CTE", + "└─HashJoin(Seed Part) 10000.00 root inner join, equal:[eq(test.t.b, test.t.c)]", + " ├─Selection(Build) 8000.00 root not(isnull(test.t.c))", + " │ └─CTEFullScan 10000.00 root CTE:c1 AS c2 data:CTE_0", + " └─Selection(Probe) 8000.00 root not(isnull(test.t.b)), or(not(isnull(test.t.a)), not(isnull(test.t.b)))", + " └─CTEFullScan 10000.00 root CTE:c1 data:CTE_0", + "CTE_0 10000.00 root Non-Recursive CTE", + "└─TableReader(Seed Part) 10000.00 root data:TableFullScan", + " └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ], + "Warn": [ + "MPP mode may be blocked because you have set a hint to read table `t` from TiKV." + ] + } + ] } ] diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 5545deb42f267..bd8b64fdb9ae6 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -1386,7 +1386,10 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p Plan) (bo indexScan := v.IndexPlans[0].(*PhysicalIndexScan) return indexScan.IsPointGetByUniqueKey(ctx), nil case *PhysicalTableReader: - tableScan := v.TablePlans[0].(*PhysicalTableScan) + tableScan, ok := v.TablePlans[0].(*PhysicalTableScan) + if !ok { + return false, nil + } isPointRange := len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPointNonNullable(ctx) if !isPointRange { return false, nil diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 864ed72ad0887..33e254a331345 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/size" + "github.com/pingcap/tipb/go-tipb" "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -117,6 +118,8 @@ func (p *PhysicalMergeJoin) tryToGetChildReqProp(prop *property.PhysicalProperty all, desc := prop.AllSameOrder() lProp := property.NewPhysicalProperty(property.RootTaskType, p.LeftJoinKeys, desc, math.MaxFloat64, false) rProp := property.NewPhysicalProperty(property.RootTaskType, p.RightJoinKeys, desc, math.MaxFloat64, false) + lProp.CTEProducerStatus = prop.CTEProducerStatus + rProp.CTEProducerStatus = prop.CTEProducerStatus if !prop.IsSortItemEmpty() { // sort merge join fits the cases of massive ordered data, so desc scan is always expensive. 
if !all { @@ -439,8 +442,8 @@ func (p *LogicalJoin) getHashJoins(prop *property.PhysicalProperty) (joins []Phy func (p *LogicalJoin) getHashJoin(prop *property.PhysicalProperty, innerIdx int, useOuterToBuild bool) *PhysicalHashJoin { chReqProps := make([]*property.PhysicalProperty, 2) - chReqProps[innerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64} - chReqProps[1-innerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64} + chReqProps[innerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, CTEProducerStatus: prop.CTEProducerStatus} + chReqProps[1-innerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, CTEProducerStatus: prop.CTEProducerStatus} if prop.ExpectedCnt < p.stats.RowCount { expCntScale := prop.ExpectedCnt / p.stats.RowCount chReqProps[1-innerIdx].ExpectedCnt = p.children[1-innerIdx].statsInfo().RowCount * expCntScale @@ -483,7 +486,7 @@ func (p *LogicalJoin) constructIndexJoin( return nil } chReqProps := make([]*property.PhysicalProperty, 2) - chReqProps[outerIdx] = &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: math.MaxFloat64, SortItems: prop.SortItems} + chReqProps[outerIdx] = &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: math.MaxFloat64, SortItems: prop.SortItems, CTEProducerStatus: prop.CTEProducerStatus} if prop.ExpectedCnt < p.stats.RowCount { expCntScale := prop.ExpectedCnt / p.stats.RowCount chReqProps[outerIdx].ExpectedCnt = p.children[outerIdx].statsInfo().RowCount * expCntScale @@ -2445,7 +2448,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC baseJoin.InnerChildIdx = preferredBuildIndex childrenProps := make([]*property.PhysicalProperty, 2) if useBCJ { - childrenProps[preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.BroadcastType, CanAddEnforcer: true, RejectSort: true} + childrenProps[preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.BroadcastType, CanAddEnforcer: true, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} expCnt := math.MaxFloat64 if prop.ExpectedCnt < p.stats.RowCount { expCntScale := prop.ExpectedCnt / p.stats.RowCount @@ -2458,12 +2461,12 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC hashKeys = lPartitionKeys } if matches := prop.IsSubsetOf(hashKeys); len(matches) != 0 { - childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, MPPPartitionTp: property.HashType, MPPPartitionCols: prop.MPPPartitionCols, RejectSort: true} + childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, MPPPartitionTp: property.HashType, MPPPartitionCols: prop.MPPPartitionCols, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} } else { return nil } } else { - childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, MPPPartitionTp: property.AnyType, RejectSort: true} + childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, MPPPartitionTp: property.AnyType, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} } } else { lPartitionKeys, rPartitionKeys := p.GetPotentialPartitionKeys() @@ -2492,8 +2495,8 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop 
*property.PhysicalProperty, useBC lPartitionKeys = choosePartitionKeys(lPartitionKeys, matches) rPartitionKeys = choosePartitionKeys(rPartitionKeys, matches) } - childrenProps[0] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: lPartitionKeys, CanAddEnforcer: true, RejectSort: true} - childrenProps[1] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: rPartitionKeys, CanAddEnforcer: true, RejectSort: true} + childrenProps[0] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: lPartitionKeys, CanAddEnforcer: true, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} + childrenProps[1] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: rPartitionKeys, CanAddEnforcer: true, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} } join := PhysicalHashJoin{ basePhysicalJoin: baseJoin, @@ -2597,7 +2600,7 @@ func pushLimitOrTopNForcibly(p LogicalPlan) bool { return false } -func (lt *LogicalTopN) getPhysTopN(_ *property.PhysicalProperty) []PhysicalPlan { +func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPlan { allTaskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType} if !pushLimitOrTopNForcibly(lt) { allTaskTypes = append(allTaskTypes, property.RootTaskType) @@ -2607,7 +2610,7 @@ func (lt *LogicalTopN) getPhysTopN(_ *property.PhysicalProperty) []PhysicalPlan } ret := make([]PhysicalPlan, 0, len(allTaskTypes)) for _, tp := range allTaskTypes { - resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: math.MaxFloat64} + resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: math.MaxFloat64, CTEProducerStatus: prop.CTEProducerStatus} topN := PhysicalTopN{ ByItems: lt.ByItems, PartitionBy: lt.PartitionBy, @@ -2619,7 +2622,7 @@ func (lt *LogicalTopN) getPhysTopN(_ *property.PhysicalProperty) []PhysicalPlan return ret } -func (lt *LogicalTopN) getPhysLimits(_ *property.PhysicalProperty) []PhysicalPlan { +func (lt *LogicalTopN) getPhysLimits(prop *property.PhysicalProperty) []PhysicalPlan { p, canPass := GetPropByOrderByItems(lt.ByItems) if !canPass { return nil @@ -2631,7 +2634,7 @@ func (lt *LogicalTopN) getPhysLimits(_ *property.PhysicalProperty) []PhysicalPla } ret := make([]PhysicalPlan, 0, len(allTaskTypes)) for _, tp := range allTaskTypes { - resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), SortItems: p.SortItems} + resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), SortItems: p.SortItems, CTEProducerStatus: prop.CTEProducerStatus} limit := PhysicalLimit{ Count: lt.Count, Offset: lt.Offset, @@ -2707,8 +2710,8 @@ func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([ }.Init(la.ctx, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), la.blockOffset, - &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, SortItems: prop.SortItems}, - &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}) + &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, SortItems: prop.SortItems, CTEProducerStatus: prop.CTEProducerStatus}, + &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, CTEProducerStatus: prop.CTEProducerStatus}) 
apply.SetSchema(la.schema) return []PhysicalPlan{apply}, true, nil } @@ -2784,6 +2787,7 @@ func (lw *LogicalWindow) tryToGetMppWindows(prop *property.PhysicalProperty) []P SortItems: byItems, TaskTp: property.MppTaskType, SortItemsForPartition: byItems, + CTEProducerStatus: prop.CTEProducerStatus, } if !prop.IsPrefix(childProperty) { return nil @@ -2838,7 +2842,7 @@ func (lw *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ( var byItems []property.SortItem byItems = append(byItems, lw.PartitionBy...) byItems = append(byItems, lw.OrderBy...) - childProperty := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, SortItems: byItems, CanAddEnforcer: true} + childProperty := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, SortItems: byItems, CanAddEnforcer: true, CTEProducerStatus: prop.CTEProducerStatus} if !prop.IsPrefix(childProperty) { return nil, true, nil } @@ -2924,6 +2928,16 @@ func (p *baseLogicalPlan) canPushToCopImpl(storeTp kv.StoreType, considerDual bo // These operators can be partially push down to TiFlash, so we don't raise warning for them. case *LogicalLimit, *LogicalTopN: return false + case *LogicalSequence: + return storeTp == kv.TiFlash + case *LogicalCTE: + if storeTp != kv.TiFlash { + return false + } + if c.cte.recursivePartLogicalPlan != nil || !c.cte.seedPartLogicalPlan.canPushToCop(storeTp) { + return false + } + return true default: p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced( "MPP mode may be blocked because operator `" + c.TP() + "` is not supported now.") @@ -2967,9 +2981,14 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope *copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field. copiedChildProperty.TaskTp = taskTp + newGbyItems := make([]expression.Expression, len(la.GroupByItems)) + copy(newGbyItems, la.GroupByItems) + newAggFuncs := make([]*aggregation.AggFuncDesc, len(la.AggFuncs)) + copy(newAggFuncs, la.AggFuncs) + agg := basePhysicalAgg{ - GroupByItems: la.GroupByItems, - AggFuncs: la.AggFuncs, + GroupByItems: newGbyItems, + AggFuncs: newAggFuncs, }.initForStream(la.ctx, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), la.blockOffset, copiedChildProperty) agg.SetSchema(la.schema.Clone()) enforcedAggs = append(enforcedAggs, agg) @@ -3046,9 +3065,14 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P *copiedChildProperty = *childProp // It's ok to not deep copy the "cols" field. copiedChildProperty.TaskTp = taskTp + newGbyItems := make([]expression.Expression, len(la.GroupByItems)) + copy(newGbyItems, la.GroupByItems) + newAggFuncs := make([]*aggregation.AggFuncDesc, len(la.AggFuncs)) + copy(newAggFuncs, la.AggFuncs) + agg := basePhysicalAgg{ - GroupByItems: la.GroupByItems, - AggFuncs: la.AggFuncs, + GroupByItems: newGbyItems, + AggFuncs: newAggFuncs, }.initForStream(la.ctx, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), la.blockOffset, copiedChildProperty) agg.SetSchema(la.schema.Clone()) streamAggs = append(streamAggs, agg) @@ -3124,7 +3148,7 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert // To avoid mess, we don't do any one-phase aggregation in this case. 
// If this is a skew distinct group agg, skip generating 1-phase agg, because skew data will cause performance issue if len(partitionCols) != 0 && !la.ctx.GetSessionVars().EnableSkewDistinctAgg { - childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols, CanAddEnforcer: true, RejectSort: true} + childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols, CanAddEnforcer: true, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) agg.SetSchema(la.schema.Clone()) agg.MppRunMode = Mpp1Phase @@ -3138,7 +3162,7 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert // 2-phase agg // no partition property down,record partition cols inside agg itself, enforce shuffler latter. - childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.AnyType, RejectSort: true} + childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.AnyType, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) agg.SetSchema(la.schema.Clone()) agg.MppRunMode = Mpp2Phase @@ -3147,7 +3171,7 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert // agg runs on TiDB with a partial agg on TiFlash if possible if prop.TaskTp == property.RootTaskType { - childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, RejectSort: true} + childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) agg.SetSchema(la.schema.Clone()) agg.MppRunMode = MppTiDB @@ -3155,7 +3179,7 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert } } else if !hasFinalAgg { // TODO: support scalar agg in MPP, merge the final result to one node - childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, RejectSort: true} + childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) agg.SetSchema(la.schema.Clone()) if la.HasDistinct() || la.HasOrderBy() { @@ -3249,7 +3273,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy hashAggs = append(hashAggs, mppAggs...) 
} } else { - agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp}) + agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp, CTEProducerStatus: prop.CTEProducerStatus}) agg.SetSchema(la.schema.Clone()) hashAggs = append(hashAggs, agg) } @@ -3353,7 +3377,7 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] } ret := make([]PhysicalPlan, 0, len(allTaskTypes)) for _, tp := range allTaskTypes { - resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(p.Count + p.Offset)} + resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(p.Count + p.Offset), CTEProducerStatus: prop.CTEProducerStatus} limit := PhysicalLimit{ Offset: p.Offset, Count: p.Count, @@ -3394,12 +3418,13 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) for range p.children { if canUseMpp && prop.TaskTp == property.MppTaskType { chReqProps = append(chReqProps, &property.PhysicalProperty{ - ExpectedCnt: prop.ExpectedCnt, - TaskTp: property.MppTaskType, - RejectSort: true, + ExpectedCnt: prop.ExpectedCnt, + TaskTp: property.MppTaskType, + RejectSort: true, + CTEProducerStatus: prop.CTEProducerStatus, }) } else { - chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt, RejectSort: true}) + chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus}) } } ua := PhysicalUnionAll{ @@ -3410,9 +3435,10 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) chReqProps = make([]*property.PhysicalProperty, 0, len(p.children)) for range p.children { chReqProps = append(chReqProps, &property.PhysicalProperty{ - ExpectedCnt: prop.ExpectedCnt, - TaskTp: property.MppTaskType, - RejectSort: true, + ExpectedCnt: prop.ExpectedCnt, + TaskTp: property.MppTaskType, + RejectSort: true, + CTEProducerStatus: prop.CTEProducerStatus, }) } mppUA := PhysicalUnionAll{mpp: true}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) 
@@ -3434,7 +3460,7 @@ func (p *LogicalPartitionUnionAll) exhaustPhysicalPlans(prop *property.PhysicalP } func (ls *LogicalSort) getPhysicalSort(prop *property.PhysicalProperty) *PhysicalSort { - ps := PhysicalSort{ByItems: ls.ByItems}.Init(ls.ctx, ls.stats.ScaleByExpectCnt(prop.ExpectedCnt), ls.blockOffset, &property.PhysicalProperty{TaskTp: prop.TaskTp, ExpectedCnt: math.MaxFloat64, RejectSort: true}) + ps := PhysicalSort{ByItems: ls.ByItems}.Init(ls.ctx, ls.stats.ScaleByExpectCnt(prop.ExpectedCnt), ls.blockOffset, &property.PhysicalProperty{TaskTp: prop.TaskTp, ExpectedCnt: math.MaxFloat64, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus}) return ps } @@ -3478,6 +3504,51 @@ func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *property.PhysicalProperty) p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because operator `MaxOneRow` is not supported now.") return nil, true, nil } - mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2}) + mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2, CTEProducerStatus: prop.CTEProducerStatus}) return []PhysicalPlan{mor}, true, nil } + +func (p *LogicalCTE) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) { + pcte := PhysicalCTE{CTE: p.cte}.Init(p.ctx, p.stats) + if prop.IsFlashProp() { + pcte.storageSender = PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Broadcast, + }.Init(p.ctx, p.stats) + } + pcte.SetSchema(p.schema) + pcte.childrenReqProps = []*property.PhysicalProperty{prop.CloneEssentialFields()} + return []PhysicalPlan{(*PhysicalCTEStorage)(pcte)}, true, nil +} + +func (p *LogicalSequence) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) { + possibleChildrenProps := make([][]*property.PhysicalProperty, 0, 2) + anyType := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.AnyType, CanAddEnforcer: true, RejectSort: true, CTEProducerStatus: prop.CTEProducerStatus} + if prop.TaskTp == property.MppTaskType { + if prop.CTEProducerStatus == property.SomeCTEFailedMpp { + return nil, true, nil + } + anyType.CTEProducerStatus = property.AllCTECanMpp + possibleChildrenProps = append(possibleChildrenProps, []*property.PhysicalProperty{anyType, prop.CloneEssentialFields()}) + } else { + copied := prop.CloneEssentialFields() + copied.CTEProducerStatus = property.SomeCTEFailedMpp + possibleChildrenProps = append(possibleChildrenProps, []*property.PhysicalProperty{{TaskTp: property.RootTaskType, ExpectedCnt: math.MaxFloat64, CTEProducerStatus: property.SomeCTEFailedMpp}, copied}) + } + + if prop.TaskTp != property.MppTaskType && prop.CTEProducerStatus != property.SomeCTEFailedMpp && + p.SCtx().GetSessionVars().IsMPPAllowed() && prop.IsSortItemEmpty() { + possibleChildrenProps = append(possibleChildrenProps, []*property.PhysicalProperty{anyType, anyType.CloneEssentialFields()}) + } + seqs := make([]PhysicalPlan, 0, 2) + for _, propChoice := range possibleChildrenProps { + childReqs := make([]*property.PhysicalProperty, 0, len(p.children)) + for i := 0; i < len(p.children)-1; i++ { + childReqs = append(childReqs, propChoice[0].CloneEssentialFields()) + } + childReqs = append(childReqs, propChoice[1]) + seq := PhysicalSequence{}.Init(p.ctx, p.stats, p.blockOffset, childReqs...) 
+ seq.SetSchema(p.children[len(p.children)-1].Schema()) + seqs = append(seqs, seq) + } + return seqs, true, nil +} diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index cde5c1b1c98e9..5b157e81c60d9 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -209,30 +209,56 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl prop *property.PhysicalProperty, addEnforcer bool, planCounter *PlanCounterTp, opt *physicalOptimizeOp) (task, int64, error) { var bestTask task = invalidTask var curCntPlan, cntPlan int64 + var err error childTasks := make([]task, 0, len(p.children)) childCnts := make([]int64, len(p.children)) cntPlan = 0 - for _, pp := range physicalPlans { + iteration := func( + selfPhysicalPlan PhysicalPlan, + childTasks []task, + childCnts []int64, + prop *property.PhysicalProperty, + opt *physicalOptimizeOp, + ) ([]task, int64, []int64, error) { // Find best child tasks firstly. childTasks = childTasks[:0] // The curCntPlan records the number of possible plans for pp - curCntPlan = 1 - timeStampNow := p.GetLogicalTS4TaskMap() - savedPlanID := p.ctx.GetSessionVars().PlanID.Load() + curCntPlan := int64(1) for j, child := range p.children { - childProp := pp.GetChildReqProps(j) + childProp := selfPhysicalPlan.GetChildReqProps(j) childTask, cnt, err := child.findBestTask(childProp, &PlanCounterDisabled, opt) childCnts[j] = cnt if err != nil { - return nil, 0, err + return nil, 0, childCnts, err } curCntPlan = curCntPlan * cnt if childTask != nil && childTask.invalid() { - break + return nil, 0, childCnts, nil } childTasks = append(childTasks, childTask) } + // This check makes sure that there is no invalid child task. + if len(childTasks) != len(p.children) { + return nil, 0, childCnts, nil + } + return childTasks, curCntPlan, childCnts, nil + } + + if seq, ok := p.self.(*LogicalSequence); ok { + iteration = seq.iterateChildPlan + } + + for _, pp := range physicalPlans { + timeStampNow := p.GetLogicalTS4TaskMap() + savedPlanID := p.ctx.GetSessionVars().PlanID.Load() + + childTasks, curCntPlan, childCnts, err = iteration(pp, childTasks, childCnts, prop, opt) + + if err != nil { + return nil, 0, err + } + // This check makes sure that there is no invalid child task. if len(childTasks) != len(p.children) { continue @@ -289,6 +315,64 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl return bestTask, cntPlan, nil } +// iterateChildPlan does the special part for sequence. We need to iterate its child one by one to check whether the former child is a valid plan and then go to the nex +func (p *LogicalSequence) iterateChildPlan( + selfPhysicalPlan PhysicalPlan, + childTasks []task, + childCnts []int64, + prop *property.PhysicalProperty, + opt *physicalOptimizeOp, +) ([]task, int64, []int64, error) { + // Find best child tasks firstly. 
+ childTasks = childTasks[:0] + // The curCntPlan records the number of possible plans for pp + curCntPlan := int64(1) + lastIdx := len(p.children) - 1 + for j := 0; j < lastIdx; j++ { + child := p.children[j] + childProp := selfPhysicalPlan.GetChildReqProps(j) + childTask, cnt, err := child.findBestTask(childProp, &PlanCounterDisabled, opt) + childCnts[j] = cnt + if err != nil { + return nil, 0, nil, err + } + curCntPlan = curCntPlan * cnt + if childTask != nil && childTask.invalid() { + return nil, 0, nil, nil + } + _, isMpp := childTask.(*mppTask) + if !isMpp && prop.IsFlashProp() { + break + } + childTasks = append(childTasks, childTask) + } + // This check makes sure that there is no invalid child task. + if len(childTasks) != len(p.children)-1 { + return nil, 0, nil, nil + } + + lastChildProp := selfPhysicalPlan.GetChildReqProps(lastIdx).CloneEssentialFields() + if lastChildProp.IsFlashProp() { + lastChildProp.CTEProducerStatus = property.AllCTECanMpp + } + lastChildTask, cnt, err := p.Children()[lastIdx].findBestTask(lastChildProp, &PlanCounterDisabled, opt) + childCnts[lastIdx] = cnt + if err != nil { + return nil, 0, nil, err + } + curCntPlan = curCntPlan * cnt + if lastChildTask != nil && lastChildTask.invalid() { + return nil, 0, nil, nil + } + + if _, ok := lastChildTask.(*mppTask); !ok && lastChildProp.CTEProducerStatus == property.AllCTECanMpp { + return nil, 0, nil, nil + } + + childTasks = append(childTasks, lastChildTask) + return childTasks, curCntPlan, childCnts, nil +} + // compareTaskCost compares cost of curTask and bestTask and returns whether curTask's cost is smaller than bestTask's. func compareTaskCost(_ sessionctx.Context, curTask, bestTask task, op *physicalOptimizeOp) (curIsBetter bool, err error) { curCost, curInvalid, err := getTaskPlanCost(curTask, op) @@ -2525,14 +2609,30 @@ func (ds *DataSource) getOriginalPhysicalIndexScan(prop *property.PhysicalProper return is } -func (p *LogicalCTE) findBestTask(prop *property.PhysicalProperty, _ *PlanCounterTp, _ *physicalOptimizeOp) (t task, cntPlan int64, err error) { +func (p *LogicalCTE) findBestTask(prop *property.PhysicalProperty, counter *PlanCounterTp, pop *physicalOptimizeOp) (t task, cntPlan int64, err error) { + if len(p.children) > 0 { + return p.baseLogicalPlan.findBestTask(prop, counter, pop) + } if !prop.IsSortItemEmpty() && !prop.CanAddEnforcer { return invalidTask, 1, nil } // The physical plan has been build when derive stats. 
pcte := PhysicalCTE{SeedPlan: p.cte.seedPartPhysicalPlan, RecurPlan: p.cte.recursivePartPhysicalPlan, CTE: p.cte, cteAsName: p.cteAsName, cteName: p.cteName}.Init(p.ctx, p.stats) pcte.SetSchema(p.schema) - t = &rootTask{p: pcte, isEmpty: false} + if prop.IsFlashProp() && prop.CTEProducerStatus == property.AllCTECanMpp { + pcte.readerReceiver = PhysicalExchangeReceiver{IsCTEReader: true}.Init(p.ctx, p.stats) + if prop.MPPPartitionTp != property.AnyType { + return invalidTask, 1, nil + } + t = &mppTask{ + p: pcte, + partTp: prop.MPPPartitionTp, + hashCols: prop.MPPPartitionCols, + tblColHists: p.stats.HistColl, + } + } else { + t = &rootTask{p: pcte, isEmpty: false} + } if prop.CanAddEnforcer { t = enforceProperty(prop, t, p.basePlan.ctx) } diff --git a/planner/core/flat_plan.go b/planner/core/flat_plan.go index 328418f67f663..ff2bc1d7985fe 100644 --- a/planner/core/flat_plan.go +++ b/planner/core/flat_plan.go @@ -356,7 +356,10 @@ func (f *FlatPhysicalPlan) flattenRecursively(p Plan, info *operatorCtx, target // for details) to affect the row count display of the independent CTE plan tree. copiedCTE := *plan copiedCTE.probeParents = nil - f.ctesToFlatten = append(f.ctesToFlatten, &copiedCTE) + if info.isRoot { + // If it's executed in TiDB, we need to record it since we don't have producer and consumer + f.ctesToFlatten = append(f.ctesToFlatten, &copiedCTE) + } case *Insert: if plan.SelectPlan != nil { childCtx.isRoot = true diff --git a/planner/core/fragment.go b/planner/core/fragment.go index a7c9f0de0535e..97d5659728fff 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" @@ -45,6 +46,7 @@ type Fragment struct { // following field are filled during getPlanFragment. TableScan *PhysicalTableScan // result physical table scan ExchangeReceivers []*PhysicalExchangeReceiver // data receivers + CTEReaders []*PhysicalCTE // The receivers for CTE storage/producer. // following fields are filled after scheduling. ExchangeSender *PhysicalExchangeSender // data exporter @@ -54,6 +56,14 @@ type Fragment struct { singleton bool // indicates if this is a task running on a single node. } +type cteGroupInFragment struct { + CTEStorage *PhysicalCTEStorage + CTEReader []*PhysicalCTE + + StorageTasks []*kv.MPPTask + StorageFragments []*Fragment +} + const emptyFragmentSize = int64(unsafe.Sizeof(Fragment{})) // MemoryUsage return the memory usage of Fragment @@ -88,6 +98,8 @@ type mppTaskGenerator struct { is infoschema.InfoSchema frags []*Fragment cache map[int]tasksAndFrags + + CTEGroups map[int]*cteGroupInFragment } // GenerateRootMPPTasks generate all mpp tasks and return root ones. @@ -117,7 +129,6 @@ func AllocMPPQueryID() uint64 { func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*Fragment, error) { mppVersion := e.ctx.GetSessionVars().ChooseMppVersion() - logutil.BgLogger().Info("Mpp will generate tasks", zap.String("plan", ToString(s)), zap.Int64("mpp-version", mppVersion.ToInt64())) tidbTask := &kv.MPPTask{ StartTs: e.startTS, MppQueryID: e.mppQueryID, @@ -147,13 +158,24 @@ func (m *mppAddr) GetAddress() string { // for the task without table scan, we construct tasks according to the children's tasks. // That's for avoiding assigning to the failed node repeatly. 
We assumes that the chilren node must be workable. -func (e *mppTaskGenerator) constructMPPTasksByChildrenTasks(tasks []*kv.MPPTask) []*kv.MPPTask { +func (e *mppTaskGenerator) constructMPPTasksByChildrenTasks(tasks []*kv.MPPTask, cteProducerTasks []*kv.MPPTask) []*kv.MPPTask { addressMap := make(map[string]struct{}) newTasks := make([]*kv.MPPTask, 0, len(tasks)) + cteAddrMap := make(map[string]struct{}) + for _, task := range cteProducerTasks { + addr := task.Meta.GetAddress() + if _, ok := cteAddrMap[addr]; !ok { + cteAddrMap[addr] = struct{}{} + } + } for _, task := range tasks { addr := task.Meta.GetAddress() // for upper fragment, the task num is equal to address num covered by lower tasks _, ok := addressMap[addr] + if _, okk := cteAddrMap[addr]; !okk && len(cteAddrMap) > 0 { + // If we have cte producer, we reject other possible addresses. + continue + } if !ok { mppTask := &kv.MPPTask{ Meta: &mppAddr{addr: addr}, @@ -183,6 +205,8 @@ func (f *Fragment) init(p PhysicalPlan) error { f.ExchangeReceivers = append(f.ExchangeReceivers, x) case *PhysicalUnionAll: return errors.New("unexpected union all detected") + case *PhysicalCTE: + f.CTEReaders = append(f.CTEReaders, x) default: for _, ch := range p.Children() { if err := f.init(ch); err != nil { @@ -201,10 +225,10 @@ func (f *Fragment) init(p PhysicalPlan) error { // after untwist, there will be two plans in `forest` slice: // - ExchangeSender -> Projection (c1) -> TableScan(t) // - ExchangeSender -> Projection (c2) -> TableScan(s) -func untwistPlanAndRemoveUnionAll(stack []PhysicalPlan, forest *[]*PhysicalExchangeSender) error { +func (e *mppTaskGenerator) untwistPlanAndRemoveUnionAll(stack []PhysicalPlan, forest *[]*PhysicalExchangeSender) error { cur := stack[len(stack)-1] switch x := cur.(type) { - case *PhysicalTableScan, *PhysicalExchangeReceiver: // This should be the leave node. + case *PhysicalTableScan, *PhysicalExchangeReceiver, *PhysicalCTE: // This should be the leave node. p, err := stack[0].Clone() if err != nil { return errors.Trace(err) @@ -214,6 +238,9 @@ func untwistPlanAndRemoveUnionAll(stack []PhysicalPlan, forest *[]*PhysicalExcha if _, ok := stack[i].(*PhysicalUnionAll); ok { continue } + if _, ok := stack[i].(*PhysicalSequence); ok { + continue + } ch, err := stack[i].Clone() if err != nil { return errors.Trace(err) @@ -225,36 +252,58 @@ func untwistPlanAndRemoveUnionAll(stack []PhysicalPlan, forest *[]*PhysicalExcha } p = ch } + if cte, ok := p.(*PhysicalCTE); ok { + e.CTEGroups[cte.CTE.IDForStorage].CTEReader = append(e.CTEGroups[cte.CTE.IDForStorage].CTEReader, cte) + } case *PhysicalHashJoin: stack = append(stack, x.children[1-x.InnerChildIdx]) - err := untwistPlanAndRemoveUnionAll(stack, forest) + err := e.untwistPlanAndRemoveUnionAll(stack, forest) stack = stack[:len(stack)-1] return errors.Trace(err) case *PhysicalUnionAll: for _, ch := range x.children { stack = append(stack, ch) - err := untwistPlanAndRemoveUnionAll(stack, forest) + err := e.untwistPlanAndRemoveUnionAll(stack, forest) stack = stack[:len(stack)-1] if err != nil { return errors.Trace(err) } } + case *PhysicalSequence: + lastChildIdx := len(x.children) - 1 + // except the last child, those previous ones are all cte producer. 
+ for i := 0; i < lastChildIdx; i++ { + if e.CTEGroups == nil { + e.CTEGroups = make(map[int]*cteGroupInFragment) + } + cteStorage := x.children[i].(*PhysicalCTEStorage) + e.CTEGroups[cteStorage.CTE.IDForStorage] = &cteGroupInFragment{ + CTEStorage: cteStorage, + CTEReader: make([]*PhysicalCTE, 0, 3), + } + } + stack = append(stack, x.children[lastChildIdx]) + err := e.untwistPlanAndRemoveUnionAll(stack, forest) + stack = stack[:len(stack)-1] + if err != nil { + return err + } default: if len(cur.Children()) != 1 { return errors.Trace(errors.New("unexpected plan " + cur.ExplainID().String())) } ch := cur.Children()[0] stack = append(stack, ch) - err := untwistPlanAndRemoveUnionAll(stack, forest) + err := e.untwistPlanAndRemoveUnionAll(stack, forest) stack = stack[:len(stack)-1] return errors.Trace(err) } return nil } -func buildFragments(s *PhysicalExchangeSender) ([]*Fragment, error) { +func (e *mppTaskGenerator) buildFragments(s *PhysicalExchangeSender) ([]*Fragment, error) { forest := make([]*PhysicalExchangeSender, 0, 1) - err := untwistPlanAndRemoveUnionAll([]PhysicalPlan{s}, &forest) + err := e.untwistPlanAndRemoveUnionAll([]PhysicalPlan{s}, &forest) if err != nil { return nil, errors.Trace(err) } @@ -274,7 +323,7 @@ func (e *mppTaskGenerator) generateMPPTasksForExchangeSender(s *PhysicalExchange if cached, ok := e.cache[s.ID()]; ok { return cached.tasks, cached.frags, nil } - frags, err := buildFragments(s) + frags, err := e.buildFragments(s) if err != nil { return nil, nil, errors.Trace(err) } @@ -293,6 +342,12 @@ func (e *mppTaskGenerator) generateMPPTasksForExchangeSender(s *PhysicalExchange } func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv.MPPTask, err error) { + for _, cteReader := range f.CTEReaders { + err := e.generateTasksForCTEReader(cteReader) + if err != nil { + return nil, err + } + } for _, r := range f.ExchangeReceivers { // chain call: to get lower fragments and tasks r.Tasks, r.frags, err = e.generateMPPTasksForExchangeSender(r.GetExchangeSender()) @@ -312,10 +367,19 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv for _, r := range f.ExchangeReceivers { childrenTasks = append(childrenTasks, r.Tasks...) } + cteProducerTasks := make([]*kv.MPPTask, 0) + for _, cteR := range f.CTEReaders { + child := cteR.children[0] + if _, ok := child.(*PhysicalProjection); ok { + child = child.Children()[0] + } + cteProducerTasks = append(cteProducerTasks, child.(*PhysicalExchangeReceiver).Tasks...) + childrenTasks = append(childrenTasks, child.(*PhysicalExchangeReceiver).Tasks...) + } if f.singleton && len(childrenTasks) > 0 { childrenTasks = childrenTasks[0:1] } - tasks = e.constructMPPTasksByChildrenTasks(childrenTasks) + tasks = e.constructMPPTasksByChildrenTasks(childrenTasks, cteProducerTasks) } if err != nil { return nil, errors.Trace(err) @@ -325,10 +389,77 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv frag.ExchangeSender.TargetTasks = append(frag.ExchangeSender.TargetTasks, tasks...) } } + for _, cteR := range f.CTEReaders { + e.addReaderTasksForCTEStorage(cteR.CTE.IDForStorage, tasks...) + } f.ExchangeSender.Tasks = tasks + f.flipCTEReader(f.ExchangeSender) return tasks, nil } +// flipCTEReader fix the plan tree. In the func generateTasksForCTEReader, we create the plan tree like ParentPlan->CTEConsumer->ExchangeReceiver. +// The CTEConsumer has no real meaning in MPP's execution. We prune it to make the plan become ParentPlan->ExchangeReceiver. 
+// But the Receiver needs a schema since it doesn't hold the schema itself. So the final plan becomes ParentPlan->ExchangeReceiver->CTEConsumer. +func (f *Fragment) flipCTEReader(currentPlan PhysicalPlan) { + newChildren := make([]PhysicalPlan, len(currentPlan.Children())) + for i := 0; i < len(currentPlan.Children()); i++ { + child := currentPlan.Children()[i] + newChildren[i] = child + if cteR, ok := child.(*PhysicalCTE); ok { + receiver := cteR.Children()[0] + newChildren[i] = receiver + } else if _, ok := child.(*PhysicalExchangeReceiver); !ok { + // The receiver is the leaf of the fragment even though it has a child, so we need to break it here. + f.flipCTEReader(child) + } + } + currentPlan.SetChildren(newChildren...) +} + +// generateTasksForCTEReader generates the task leaf for the CTE reader. +// A fragment's leaf must be an Exchange, and we must not lose the information of the CTE. +// So we create the plan like ParentPlan->CTEReader->ExchangeReceiver. +func (e *mppTaskGenerator) generateTasksForCTEReader(cteReader *PhysicalCTE) (err error) { + group := e.CTEGroups[cteReader.CTE.IDForStorage] + if group.StorageFragments == nil { + group.CTEStorage.storageSender.SetChildren(group.CTEStorage.children...) + group.StorageTasks, group.StorageFragments, err = e.generateMPPTasksForExchangeSender(group.CTEStorage.storageSender) + if err != nil { + return err + } + } + receiver := cteReader.readerReceiver + receiver.Tasks = group.StorageTasks + receiver.frags = group.StorageFragments + cteReader.SetChildren(receiver) + receiver.SetChildren(group.CTEStorage.children[0]) + inconsistenceNullable := false + for i, col := range cteReader.schema.Columns { + if mysql.HasNotNullFlag(col.RetType.GetFlag()) != mysql.HasNotNullFlag(group.CTEStorage.children[0].Schema().Columns[i].RetType.GetFlag()) { + inconsistenceNullable = true + break + } + } + if inconsistenceNullable { + cols := group.CTEStorage.children[0].Schema().Clone().Columns + for i, col := range cols { + col.Index = i + } + proj := PhysicalProjection{Exprs: expression.Column2Exprs(cols)}.Init(cteReader.ctx, cteReader.stats, 0, nil) + proj.SetSchema(cteReader.schema.Clone()) + proj.SetChildren(receiver) + cteReader.SetChildren(proj) + } + return nil +} + +func (e *mppTaskGenerator) addReaderTasksForCTEStorage(storageID int, tasks ...*kv.MPPTask) { + group := e.CTEGroups[storageID] + for _, frag := range group.StorageFragments { + frag.ExchangeSender.TargetCTEReaderTasks = append(frag.ExchangeSender.TargetCTEReaderTasks, tasks) + } +} + func partitionPruning(ctx sessionctx.Context, tbl table.PartitionedTable, conds []expression.Expression, partitionNames []model.CIStr, columns []*expression.Column, columnNames types.NameSlice) ([]table.PhysicalTable, error) { idxArr, err := PartitionPruning(ctx, tbl, conds, partitionNames, columns, columnNames) diff --git a/planner/core/hints.go b/planner/core/hints.go index 2d67f6800b77f..694b9673bbaef 100644 --- a/planner/core/hints.go +++ b/planner/core/hints.go @@ -174,7 +174,10 @@ func genHintsFromSingle(p PhysicalPlan, nodeType utilhint.NodeType, res []*ast.T } switch pp := p.(type) { case *PhysicalTableReader: - tbl := pp.TablePlans[0].(*PhysicalTableScan) + tbl, ok := pp.TablePlans[0].(*PhysicalTableScan) + if !ok { + return res + } if tbl.StoreType == kv.TiFlash { res = append(res, &ast.TableOptimizerHint{ QBName: qbName, diff --git a/planner/core/initialize.go b/planner/core/initialize.go index c4c05a4061a70..ea62c7cd86210 100644 --- a/planner/core/initialize.go +++ b/planner/core/initialize.go @@ -620,7 +620,7 @@ func
(p LogicalCTE) Init(ctx sessionctx.Context, offset int) *LogicalCTE { // Init only assigns type and context. func (p PhysicalCTE) Init(ctx sessionctx.Context, stats *property.StatsInfo) *PhysicalCTE { - p.basePlan = newBasePlan(ctx, plancodec.TypeCTE, 0) + p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeCTE, &p, 0) p.stats = stats return &p } @@ -651,3 +651,17 @@ func (p FKCascade) Init(ctx sessionctx.Context) *FKCascade { p.stats = &property.StatsInfo{} return &p } + +// Init initializes LogicalSequence +func (p LogicalSequence) Init(ctx sessionctx.Context, offset int) *LogicalSequence { + p.baseLogicalPlan = newBaseLogicalPlan(ctx, plancodec.TypeSequence, &p, offset) + return &p +} + +// Init initializes PhysicalSequence +func (p PhysicalSequence) Init(ctx sessionctx.Context, stats *property.StatsInfo, blockOffset int, props ...*property.PhysicalProperty) *PhysicalSequence { + p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSequence, &p, blockOffset) + p.stats = stats + p.childrenReqProps = props + return &p +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 62b35605ed8af..e0ed32f71389e 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -1737,7 +1737,7 @@ func (b *PlanBuilder) buildSetOpr(ctx context.Context, setOpr *ast.SetOprStmt) ( defer func() { b.outerCTEs = b.outerCTEs[:l] }() - err := b.buildWith(ctx, setOpr.With) + _, err := b.buildWith(ctx, setOpr.With) if err != nil { return nil, err } @@ -2727,7 +2727,7 @@ func (r *correlatedAggregateResolver) resolveSelect(sel *ast.SelectStmt) (err er defer func() { r.b.outerCTEs = r.b.outerCTEs[:l] }() - err := r.b.buildWith(r.ctx, sel.With) + _, err := r.b.buildWith(r.ctx, sel.With) if err != nil { return err } @@ -4020,12 +4020,13 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L } } + var currentLayerCTEs []*cteInfo if sel.With != nil { l := len(b.outerCTEs) defer func() { b.outerCTEs = b.outerCTEs[:l] }() - err = b.buildWith(ctx, sel.With) + currentLayerCTEs, err = b.buildWith(ctx, sel.With) if err != nil { return nil, err } @@ -4246,10 +4247,44 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L } proj.names = p.OutputNames()[:oldLen] proj.SetSchema(schema) - return proj, nil + return b.tryToBuildSequence(currentLayerCTEs, proj), nil } - return p, nil + return b.tryToBuildSequence(currentLayerCTEs, p), nil +} + +func (b *PlanBuilder) tryToBuildSequence(ctes []*cteInfo, p LogicalPlan) LogicalPlan { + if !b.ctx.GetSessionVars().EnableMPPSharedCTEExecution { + return p + } + for i := len(ctes) - 1; i >= 0; i-- { + if !ctes[i].nonRecursive { + return p + } + if ctes[i].isInline || ctes[i].cteClass == nil { + ctes = append(ctes[:i], ctes[i+1:]...) + } + } + if len(ctes) == 0 { + return p + } + lctes := make([]LogicalPlan, 0, len(ctes)+1) + for _, cte := range ctes { + lcte := LogicalCTE{ + cte: cte.cteClass, + cteAsName: cte.def.Name, + cteName: cte.def.Name, + seedStat: cte.seedStat, + onlyUsedAsStorage: true, + }.Init(b.ctx, b.getSelectOffset()) + lcte.SetSchema(getResultCTESchema(cte.seedLP.Schema(), b.ctx.GetSessionVars())) + lctes = append(lctes, lcte) + } + b.optFlag |= flagPushDownSequence + seq := LogicalSequence{}.Init(b.ctx, b.getSelectOffset()) + seq.SetChildren(append(lctes, p)...) 
+ seq.SetOutputNames(p.OutputNames().Shallow()) + return seq } func (b *PlanBuilder) buildTableDual() *LogicalTableDual { @@ -5508,7 +5543,7 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( defer func() { b.outerCTEs = b.outerCTEs[:l] }() - err := b.buildWith(ctx, update.With) + _, err := b.buildWith(ctx, update.With) if err != nil { return nil, err } @@ -5915,7 +5950,7 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, ds *ast.DeleteStmt) (Plan defer func() { b.outerCTEs = b.outerCTEs[:l] }() - err := b.buildWith(ctx, ds.With) + _, err := b.buildWith(ctx, ds.With) if err != nil { return nil, err } @@ -7150,7 +7185,7 @@ func (b *PlanBuilder) buildRecursiveCTE(ctx context.Context, cte ast.ResultSetNo b.outerCTEs = b.outerCTEs[:l] x.With = sw }() - err := b.buildWith(ctx, x.With) + _, err := b.buildWith(ctx, x.With) if err != nil { return err } @@ -7348,15 +7383,16 @@ func (b *PlanBuilder) genCTETableNameForError() string { return name } -func (b *PlanBuilder) buildWith(ctx context.Context, w *ast.WithClause) error { +func (b *PlanBuilder) buildWith(ctx context.Context, w *ast.WithClause) ([]*cteInfo, error) { // Check CTE name must be unique. nameMap := make(map[string]struct{}) for _, cte := range w.CTEs { if _, ok := nameMap[cte.Name.L]; ok { - return ErrNonUniqTable + return nil, ErrNonUniqTable } nameMap[cte.Name.L] = struct{}{} } + ctes := make([]*cteInfo, 0, len(w.CTEs)) for _, cte := range w.CTEs { b.outerCTEs = append(b.outerCTEs, &cteInfo{def: cte, nonRecursive: !w.IsRecursive, isBuilding: true, storageID: b.allocIDForCTEStorage, seedStat: &property.StatsInfo{}}) b.allocIDForCTEStorage++ @@ -7371,15 +7407,16 @@ func (b *PlanBuilder) buildWith(ctx context.Context, w *ast.WithClause) error { } _, err := b.buildCte(ctx, cte, w.IsRecursive) if err != nil { - return err + return nil, err } b.outerCTEs[len(b.outerCTEs)-1].optFlag = b.optFlag b.outerCTEs[len(b.outerCTEs)-1].isBuilding = false b.optFlag = saveFlag // each cte (select statement) will generate a handle map, pop it out here. b.handleHelper.popMap() + ctes = append(ctes, b.outerCTEs[len(b.outerCTEs)-1]) } - return nil + return ctes, nil } func (b *PlanBuilder) buildProjection4CTEUnion(_ context.Context, seed LogicalPlan, recur LogicalPlan) (LogicalPlan, error) { diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 029de940c57e0..cb7d134f12612 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -1705,6 +1705,7 @@ type LogicalLimit struct { Offset uint64 Count uint64 limitHints limitHintInfo + IsPartial bool } // GetPartitionBy returns partition by fields @@ -2037,6 +2038,8 @@ type LogicalCTE struct { cteName model.CIStr seedStat *property.StatsInfo isOuterMostCTE bool + + onlyUsedAsStorage bool } // LogicalCTETable is for CTE table @@ -2059,3 +2062,22 @@ func (p *LogicalCTE) ExtractCorrelatedCols() []*expression.CorrelatedColumn { } return corCols } + +// LogicalSequence is used to mark the CTE producer in the main query tree. +// Its last child is main query. The previous children are cte producers. +// And there might be dependencies between the CTE producers: +// +// Suppose that the sequence has 4 children, naming c0, c1, c2, c3. +// From the definition, c3 is the main query. c0, c1, c2 are CTE producers. +// It's possible that c1 references c0, c2 references c1 and c2. +// But it's no possible that c0 references c1 or c2. +// +// We use this property to do complex optimizations for CTEs. 
+type LogicalSequence struct { + baseLogicalPlan +} + +// Schema returns its last child(which is the main query plan)'s schema. +func (p *LogicalSequence) Schema() *expression.Schema { + return p.children[len(p.children)-1].Schema() +} diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 2cc066eaaaa8d..caca9c3e81448 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -82,6 +82,7 @@ const ( flagSyncWaitStatsLoadPoint flagJoinReOrder flagPrunColumnsAgain + flagPushDownSequence ) var optRuleList = []logicalOptRule{ @@ -106,6 +107,7 @@ var optRuleList = []logicalOptRule{ &syncWaitStatsLoadPoint{}, &joinReOrderSolver{}, &columnPruner{}, // column pruning again at last, note it will mess up the results of buildKeySolver + &pushDownSequenceSolver{}, } type logicalOptimizeOp struct { @@ -277,22 +279,17 @@ func checkStableResultMode(sctx sessionctx.Context) bool { return s.EnableStableResultMode && (!st.InInsertStmt && !st.InUpdateStmt && !st.InDeleteStmt && !st.InLoadDataStmt) } -// DoOptimize optimizes a logical plan to a physical plan. -func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) { +// DoOptimizeAndLogicAsRet optimizes a logical plan to a physical plan and return the optimized logical plan. +func DoOptimizeAndLogicAsRet(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (LogicalPlan, PhysicalPlan, float64, error) { sessVars := sctx.GetSessionVars() - if sessVars.StmtCtx.EnableOptimizerDebugTrace { - debugtrace.EnterContextCommon(sctx) - defer debugtrace.LeaveContextCommon(sctx) - } - // if there is something after flagPrunColumns, do flagPrunColumnsAgain if flag&flagPrunColumns > 0 && flag-flagPrunColumns > flagPrunColumns { flag |= flagPrunColumnsAgain } - if checkStableResultMode(sctx) { + if checkStableResultMode(logic.SCtx()) { flag |= flagStabilizeResults } - if sessVars.StmtCtx.StraightJoinOrder { + if logic.SCtx().GetSessionVars().StmtCtx.StraightJoinOrder { // When we use the straight Join Order hint, we should disable the join reorder optimization. flag &= ^flagJoinReOrder } @@ -300,10 +297,11 @@ func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic flag |= flagSyncWaitStatsLoadPoint logic, err := logicalOptimize(ctx, flag, logic) if err != nil { - return nil, 0, err + return nil, nil, 0, err } + if !AllowCartesianProduct.Load() && existsCartesianProduct(logic) { - return nil, 0, errors.Trace(ErrCartesianProductUnsupported) + return nil, nil, 0, errors.Trace(ErrCartesianProductUnsupported) } planCounter := PlanCounterTp(sessVars.StmtCtx.StmtHints.ForceNthPlan) if planCounter == 0 { @@ -311,11 +309,11 @@ func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic } physical, cost, err := physicalOptimize(logic, &planCounter) if err != nil { - return nil, 0, err + return nil, nil, 0, err } finalPlan, err := postOptimize(ctx, sctx, physical) if err != nil { - return nil, 0, err + return nil, nil, 0, err } if sessVars.StmtCtx.EnableOptimizerCETrace { @@ -324,7 +322,18 @@ func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic if sessVars.StmtCtx.EnableOptimizeTrace { sessVars.StmtCtx.OptimizeTracer.RecordFinalPlan(finalPlan.buildPlanTrace()) } - return finalPlan, cost, nil + return logic, finalPlan, cost, nil +} + +// DoOptimize optimizes a logical plan to a physical plan. 
+func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) { + sessVars := sctx.GetSessionVars() + if sessVars.StmtCtx.EnableOptimizerDebugTrace { + debugtrace.EnterContextCommon(sctx) + defer debugtrace.LeaveContextCommon(sctx) + } + _, finalPlan, cost, err := DoOptimizeAndLogicAsRet(ctx, sctx, flag, logic) + return finalPlan, cost, err +} // refineCETrace will adjust the content of CETrace. diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 25e947ad3691d..24fef2a0a7a01 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -1508,6 +1508,8 @@ type PhysicalExchangeReceiver struct { Tasks []*kv.MPPTask frags []*Fragment + + IsCTEReader bool } // Clone implment PhysicalPlan interface. @@ -1518,6 +1520,8 @@ func (p *PhysicalExchangeReceiver) Clone() (PhysicalPlan, error) { return nil, errors.Trace(err) } np.basePhysicalPlan = *base + + np.IsCTEReader = p.IsCTEReader return np, nil } @@ -1597,9 +1601,10 @@ func (p *PhysicalExpand) MemoryUsage() (sum int64) { type PhysicalExchangeSender struct { basePhysicalPlan - TargetTasks []*kv.MPPTask - ExchangeType tipb.ExchangeType - HashCols []*property.MPPPartitionColumn + TargetTasks []*kv.MPPTask + TargetCTEReaderTasks [][]*kv.MPPTask + ExchangeType tipb.ExchangeType + HashCols []*property.MPPPartitionColumn // Tasks is the mpp task for current PhysicalExchangeSender. Tasks []*kv.MPPTask CompressionMode kv.ExchangeCompressionMode @@ -1891,9 +1896,19 @@ func (p *PhysicalHashAgg) MemoryUsage() (sum int64) { // NewPhysicalHashAgg creates a new PhysicalHashAgg from a LogicalAggregation. func NewPhysicalHashAgg(la *LogicalAggregation, newStats *property.StatsInfo, prop *property.PhysicalProperty) *PhysicalHashAgg { + newGbyItems := make([]expression.Expression, len(la.GroupByItems)) + copy(newGbyItems, la.GroupByItems) + newAggFuncs := make([]*aggregation.AggFuncDesc, len(la.AggFuncs)) + // Some places rewrite the aggFuncs in-place, so we clone them here first. + // A proper refactor is needed to make sure that the physical optimization + // does not change the contents of the logical plan; + // such in-place rewrites are bad for the cascades planner. + for i, aggFunc := range la.AggFuncs { + newAggFuncs[i] = aggFunc.Clone() + } agg := basePhysicalAgg{ - GroupByItems: la.GroupByItems, - AggFuncs: la.AggFuncs, + GroupByItems: newGbyItems, + AggFuncs: newAggFuncs, }.initForHash(la.ctx, newStats, la.blockOffset, prop) return agg } @@ -2449,6 +2464,9 @@ type PhysicalCTE struct { CTE *CTEClass cteAsName model.CIStr cteName model.CIStr + + readerReceiver *PhysicalExchangeReceiver + storageSender *PhysicalExchangeSender } // PhysicalCTETable is for CTE table. @@ -2487,6 +2505,45 @@ func (p *PhysicalCTE) ExplainID() fmt.Stringer { }) } +// Clone implements PhysicalPlan interface.
+func (p *PhysicalCTE) Clone() (PhysicalPlan, error) { + cloned := new(PhysicalCTE) + base, err := p.physicalSchemaProducer.cloneWithSelf(cloned) + if err != nil { + return nil, err + } + cloned.physicalSchemaProducer = *base + if p.SeedPlan != nil { + cloned.SeedPlan, err = p.SeedPlan.Clone() + if err != nil { + return nil, err + } + } + if p.RecurPlan != nil { + cloned.RecurPlan, err = p.RecurPlan.Clone() + if err != nil { + return nil, err + } + } + cloned.cteAsName, cloned.cteName = p.cteAsName, p.cteName + cloned.CTE = p.CTE + if p.storageSender != nil { + clonedSender, err := p.storageSender.Clone() + if err != nil { + return nil, err + } + cloned.storageSender = clonedSender.(*PhysicalExchangeSender) + } + if p.readerReceiver != nil { + clonedReceiver, err := p.readerReceiver.Clone() + if err != nil { + return nil, err + } + cloned.readerReceiver = clonedReceiver.(*PhysicalExchangeReceiver) + } + return cloned, nil +} + // MemoryUsage return the memory usage of PhysicalCTE func (p *PhysicalCTE) MemoryUsage() (sum int64) { if p == nil { @@ -2563,6 +2620,43 @@ func (p *CTEDefinition) MemoryUsage() (sum int64) { return } +// PhysicalCTEStorage is used for representing CTE storage, or CTE producer in other words. +type PhysicalCTEStorage PhysicalCTE + +// ExplainInfo overrides the ExplainInfo +func (*PhysicalCTEStorage) ExplainInfo() string { + return "Non-Recursive CTE Storage" +} + +// ExplainID overrides the ExplainID. +func (p *PhysicalCTEStorage) ExplainID() fmt.Stringer { + return stringutil.MemoizeStr(func() string { + return "CTE_" + strconv.Itoa(p.CTE.IDForStorage) + }) +} + +// MemoryUsage return the memory usage of CTEDefinition +func (p *PhysicalCTEStorage) MemoryUsage() (sum int64) { + if p == nil { + return + } + + sum = p.physicalSchemaProducer.MemoryUsage() + p.cteAsName.MemoryUsage() + if p.CTE != nil { + sum += p.CTE.MemoryUsage() + } + return +} + +// Clone implements PhysicalPlan interface. +func (p *PhysicalCTEStorage) Clone() (PhysicalPlan, error) { + cloned, err := (*PhysicalCTE)(p).Clone() + if err != nil { + return nil, err + } + return (*PhysicalCTEStorage)(cloned.(*PhysicalCTE)), nil +} + func appendChildCandidate(origin PhysicalPlan, pp PhysicalPlan, op *physicalOptimizeOp) { candidate := &tracing.CandidatePlanTrace{ PlanTrace: &tracing.PlanTrace{ @@ -2576,3 +2670,51 @@ func appendChildCandidate(origin PhysicalPlan, pp PhysicalPlan, op *physicalOpti pp.appendChildCandidate(op) op.tracer.Candidates[origin.ID()].AppendChildrenID(pp.ID()) } + +// PhysicalSequence is the physical representation of LogicalSequence. Used to mark the CTE producers in the plan tree. +type PhysicalSequence struct { + physicalSchemaProducer +} + +// MemoryUsage returns the memory usage of the PhysicalSequence. +func (p *PhysicalSequence) MemoryUsage() (sum int64) { + if p == nil { + return + } + + sum = p.physicalSchemaProducer.MemoryUsage() + + return +} + +// ExplainID overrides the ExplainID. +func (p *PhysicalSequence) ExplainID() fmt.Stringer { + return stringutil.MemoizeStr(func() string { + if p.ctx != nil && p.ctx.GetSessionVars().StmtCtx.IgnoreExplainIDSuffix { + return p.TP() + } + return p.TP() + "_" + strconv.Itoa(p.id) + }) +} + +// ExplainInfo overrides the ExplainInfo. +func (*PhysicalSequence) ExplainInfo() string { + res := "Sequence Node" + return res +} + +// Clone implements PhysicalPlan interface. 
+func (p *PhysicalSequence) Clone() (PhysicalPlan, error) { + cloned := new(PhysicalSequence) + base, err := p.physicalSchemaProducer.cloneWithSelf(cloned) + if err != nil { + return nil, err + } + cloned.physicalSchemaProducer = *base + return cloned, nil +} + +// Schema returns its last child(which is the main query tree)'s schema. +func (p *PhysicalSequence) Schema() *expression.Schema { + return p.Children()[len(p.Children())-1].Schema() +} diff --git a/planner/core/plan_cost_ver2.go b/planner/core/plan_cost_ver2.go index 14dce649ae672..16c429fae133e 100644 --- a/planner/core/plan_cost_ver2.go +++ b/planner/core/plan_cost_ver2.go @@ -751,6 +751,21 @@ func (p *BatchPointGetPlan) getPlanCostVer2(taskType property.TaskType, option * return p.planCostVer2, nil } +func (p *PhysicalCTE) getPlanCostVer2(taskType property.TaskType, option *PlanCostOption) (costVer2, error) { + if p.planCostInit && !hasCostFlag(option.CostFlag, CostFlagRecalculate) { + return p.planCostVer2, nil + } + + inputRows := getCardinality(p, option.CostFlag) + cpuFactor := getTaskCPUFactorVer2(p, taskType) + + projCost := filterCostVer2(option, inputRows, expression.Column2Exprs(p.schema.Columns), cpuFactor) + + p.planCostVer2 = projCost + p.planCostInit = true + return p.planCostVer2, nil +} + func scanCostVer2(option *PlanCostOption, rows, rowSize float64, scanFactor costVer2Factor) costVer2 { if rowSize < 1 { rowSize = 1 diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index 3ebcece46ef25..f9c14410f6c85 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -331,6 +331,21 @@ func (e *PhysicalExchangeSender) ToPB(ctx sessionctx.Context, storeType kv.Store encodedTask = append(encodedTask, encodedStr) } + encodedUpstreamCTETask := make([]*tipb.EncodedBytesSlice, 0, len(e.TargetCTEReaderTasks)) + for _, cteRTasks := range e.TargetCTEReaderTasks { + encodedTasksForOneCTEReader := &tipb.EncodedBytesSlice{ + EncodedTasks: make([][]byte, 0, len(cteRTasks)), + } + for _, task := range cteRTasks { + encodedStr, err := task.ToPB().Marshal() + if err != nil { + return nil, err + } + encodedTasksForOneCTEReader.EncodedTasks = append(encodedTasksForOneCTEReader.EncodedTasks, encodedStr) + } + encodedUpstreamCTETask = append(encodedUpstreamCTETask, encodedTasksForOneCTEReader) + } + hashCols := make([]expression.Expression, 0, len(e.HashCols)) hashColTypes := make([]*tipb.FieldType, 0, len(e.HashCols)) for _, col := range e.HashCols { @@ -355,13 +370,14 @@ func (e *PhysicalExchangeSender) ToPB(ctx sessionctx.Context, storeType kv.Store return nil, errors.Trace(err) } ecExec := &tipb.ExchangeSender{ - Tp: e.ExchangeType, - EncodedTaskMeta: encodedTask, - PartitionKeys: hashColPb, - Child: child, - Types: hashColTypes, - AllFieldTypes: allFieldTypes, - Compression: e.CompressionMode.ToTipbCompressionMode(), + Tp: e.ExchangeType, + EncodedTaskMeta: encodedTask, + PartitionKeys: hashColPb, + Child: child, + Types: hashColTypes, + AllFieldTypes: allFieldTypes, + Compression: e.CompressionMode.ToTipbCompressionMode(), + UpstreamCteTaskMeta: encodedUpstreamCTETask, } executorID := e.ExplainID().String() return &tipb.Executor{ @@ -397,6 +413,11 @@ func (e *PhysicalExchangeReceiver) ToPB(ctx sessionctx.Context, _ kv.StoreType) EncodedTaskMeta: encodedTask, FieldTypes: fieldTypes, } + if e.IsCTEReader { + encodedTaskShallowCopy := make([][]byte, len(e.Tasks)) + copy(encodedTaskShallowCopy, encodedTask) + ecExec.OriginalCtePrdocuerTaskMeta = encodedTaskShallowCopy + } executorID := 
e.ExplainID().String() return &tipb.Executor{ Tp: tipb.ExecType_TypeExchangeReceiver, diff --git a/planner/core/rule_column_pruning.go b/planner/core/rule_column_pruning.go index 34a5259abbd32..74c5b68f7a8df 100644 --- a/planner/core/rule_column_pruning.go +++ b/planner/core/rule_column_pruning.go @@ -666,3 +666,14 @@ func preferKeyColumnFromTable(dataSource *DataSource, originColumns []*expressio } return resultColumn, resultColumnInfo } + +// PruneColumns implements the interface of LogicalPlan. +// LogicalCTE just does an empty function call here, because its logical optimization is an individual phase. +func (*LogicalCTE) PruneColumns(_ []*expression.Column, _ *logicalOptimizeOp) error { + return nil +} + +// PruneColumns implements the interface of LogicalPlan. +func (p *LogicalSequence) PruneColumns(parentUsedCols []*expression.Column, opt *logicalOptimizeOp) error { + return p.children[len(p.children)-1].PruneColumns(parentUsedCols, opt) +} diff --git a/planner/core/rule_decorrelate.go b/planner/core/rule_decorrelate.go index 5d5fcef35c3e2..686fc18edfd11 100644 --- a/planner/core/rule_decorrelate.go +++ b/planner/core/rule_decorrelate.go @@ -371,6 +371,10 @@ func (s *decorrelateSolver) optimize(ctx context.Context, p LogicalPlan, opt *lo } } NoOptimize: + // CTE's logical optimization is independent. + if _, ok := p.(*LogicalCTE); ok { + return p, nil + } newChildren := make([]LogicalPlan, 0, len(p.Children())) for _, child := range p.Children() { np, err := s.optimize(ctx, child, opt) diff --git a/planner/core/rule_eliminate_projection.go b/planner/core/rule_eliminate_projection.go index 83936e7cd9e94..7a9d49bff2496 100644 --- a/planner/core/rule_eliminate_projection.go +++ b/planner/core/rule_eliminate_projection.go @@ -168,6 +168,10 @@ func (pe *projectionEliminator) optimize(_ context.Context, lp LogicalPlan, opt // eliminate eliminates the redundant projection in a logical plan. func (pe *projectionEliminator) eliminate(p LogicalPlan, replace map[string]*expression.Column, canEliminate bool, opt *logicalOptimizeOp) LogicalPlan { + // LogicalCTE's logical optimization is independent. + if _, ok := p.(*LogicalCTE); ok { + return p + } proj, isProj := p.(*LogicalProjection) childFlag := canEliminate if _, isUnion := p.(*LogicalUnionAll); isUnion { diff --git a/planner/core/rule_generate_column_substitute.go b/planner/core/rule_generate_column_substitute.go index 88039392bf1a3..2ddcda033adae 100644 --- a/planner/core/rule_generate_column_substitute.go +++ b/planner/core/rule_generate_column_substitute.go @@ -49,6 +49,9 @@ func (gc *gcSubstituter) optimize(ctx context.Context, lp LogicalPlan, opt *logi // For the sake of simplicity, we don't collect the stored generate column because we can't get their expressions directly. // TODO: support stored generate column. func collectGenerateColumn(lp LogicalPlan, exprToColumn ExprColumnMap) { + if _, ok := lp.(*LogicalCTE); ok { + return + } for _, child := range lp.Children() { collectGenerateColumn(child, exprToColumn) } diff --git a/planner/core/rule_join_elimination.go b/planner/core/rule_join_elimination.go index 330b0ba8b98bb..8e7731b338c74 100644 --- a/planner/core/rule_join_elimination.go +++ b/planner/core/rule_join_elimination.go @@ -184,6 +184,10 @@ func GetDupAgnosticAggCols( } func (o *outerJoinEliminator) doOptimize(p LogicalPlan, aggCols []*expression.Column, parentCols []*expression.Column, opt *logicalOptimizeOp) (LogicalPlan, error) { + // CTE's logical optimization is independent.
+ if _, ok := p.(*LogicalCTE); ok { + return p, nil + } var err error var isEliminated bool for join, isJoin := p.(*LogicalJoin); isJoin; join, isJoin = p.(*LogicalJoin) { diff --git a/planner/core/rule_join_reorder.go b/planner/core/rule_join_reorder.go index d83f7a24e8163..7d0b9ae535ff2 100644 --- a/planner/core/rule_join_reorder.go +++ b/planner/core/rule_join_reorder.go @@ -233,6 +233,10 @@ func (s *joinReOrderSolver) optimize(_ context.Context, p LogicalPlan, opt *logi // optimizeRecursive recursively collects join groups and applies join reorder algorithm for each group. func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalPlan, tracer *joinReorderTrace) (LogicalPlan, error) { + if _, ok := p.(*LogicalCTE); ok { + return p, nil + } + var err error result := extractJoinGroup(p) diff --git a/planner/core/rule_max_min_eliminate.go b/planner/core/rule_max_min_eliminate.go index d9bb1eae19f9b..de5ab2e35bf5e 100644 --- a/planner/core/rule_max_min_eliminate.go +++ b/planner/core/rule_max_min_eliminate.go @@ -205,6 +205,10 @@ func (a *maxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation, opt *l // eliminateMaxMin tries to convert max/min to Limit+Sort operators. func (a *maxMinEliminator) eliminateMaxMin(p LogicalPlan, opt *logicalOptimizeOp) LogicalPlan { + // CTE's logical optimization is indenpent. + if _, ok := p.(*LogicalCTE); ok { + return p + } newChildren := make([]LogicalPlan, 0, len(p.Children())) for _, child := range p.Children() { newChildren = append(newChildren, a.eliminateMaxMin(child, opt)) diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 0521dd7f69b2d..57225e53368af 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -94,6 +94,8 @@ func (s *partitionProcessor) rewriteDataSource(lp LogicalPlan, opt *logicalOptim // Only one partition, no union all. p.SetChildren(ds) return p, nil + case *LogicalCTE: + return lp, nil default: children := lp.Children() for i, child := range children { diff --git a/planner/core/rule_predicate_push_down.go b/planner/core/rule_predicate_push_down.go index 8cbed486b76cc..e3df3c6245f18 100644 --- a/planner/core/rule_predicate_push_down.go +++ b/planner/core/rule_predicate_push_down.go @@ -1013,3 +1013,12 @@ func (p *LogicalCTE) PredicatePushDown(predicates []expression.Expression, _ *lo p.cte.pushDownPredicates = append(p.cte.pushDownPredicates, expression.ComposeCNFCondition(p.ctx, newPred...)) return predicates, p.self } + +// PredicatePushDown implements the LogicalPlan interface. +// Currently, we only maintain the main query tree. +func (p *LogicalSequence) PredicatePushDown(predicates []expression.Expression, op *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) { + lastIdx := len(p.children) - 1 + remained, newLastChild := p.children[lastIdx].PredicatePushDown(predicates, op) + p.SetChild(lastIdx, newLastChild) + return remained, p +} diff --git a/planner/core/rule_push_down_sequence.go b/planner/core/rule_push_down_sequence.go new file mode 100644 index 0000000000000..c1d7ac5f44a42 --- /dev/null +++ b/planner/core/rule_push_down_sequence.go @@ -0,0 +1,66 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import "context" + +type pushDownSequenceSolver struct { +} + +func (*pushDownSequenceSolver) name() string { + return "push_down_sequence" +} + +func (pdss *pushDownSequenceSolver) optimize(_ context.Context, lp LogicalPlan, _ *logicalOptimizeOp) (LogicalPlan, error) { + return pdss.recursiveOptimize(nil, lp), nil +} + +func (pdss *pushDownSequenceSolver) recursiveOptimize(pushedSequence *LogicalSequence, lp LogicalPlan) LogicalPlan { + _, ok := lp.(*LogicalSequence) + if !ok && pushedSequence == nil { + newChildren := make([]LogicalPlan, 0, len(lp.Children())) + for _, child := range lp.Children() { + newChildren = append(newChildren, pdss.recursiveOptimize(nil, child)) + } + lp.SetChildren(newChildren...) + return lp + } + switch x := lp.(type) { + case *LogicalSequence: + if pushedSequence == nil { + pushedSequence = LogicalSequence{}.Init(lp.SCtx(), lp.SelectBlockOffset()) + pushedSequence.SetChildren(lp.Children()...) + return pdss.recursiveOptimize(pushedSequence, lp.Children()[len(lp.Children())-1]) + } + childLen := len(x.children) + mainQuery := x.children[childLen-1] + allCTEs := make([]LogicalPlan, 0, childLen+len(pushedSequence.children)-2) + allCTEs = append(allCTEs, pushedSequence.children[:len(pushedSequence.children)-1]...) + allCTEs = append(allCTEs, x.children[:childLen-1]...) + pushedSequence = LogicalSequence{}.Init(lp.SCtx(), lp.SelectBlockOffset()) + pushedSequence.SetChildren(append(allCTEs, mainQuery)...) + return pdss.recursiveOptimize(pushedSequence, mainQuery) + case *DataSource, *LogicalAggregation, *LogicalCTE: + pushedSequence.SetChild(len(pushedSequence.children)-1, pdss.recursiveOptimize(nil, lp)) + return pushedSequence + default: + if len(lp.Children()) > 1 { + pushedSequence.SetChild(len(pushedSequence.children)-1, lp) + return pushedSequence + } + lp.SetChildren(pdss.recursiveOptimize(pushedSequence, lp.Children()[0])) + return lp + } +} diff --git a/planner/core/rule_semi_join_rewrite.go b/planner/core/rule_semi_join_rewrite.go index e38155d8ef455..25e4f624ddfe5 100644 --- a/planner/core/rule_semi_join_rewrite.go +++ b/planner/core/rule_semi_join_rewrite.go @@ -34,6 +34,9 @@ func (*semiJoinRewriter) name() string { } func (smj *semiJoinRewriter) recursivePlan(p LogicalPlan) (LogicalPlan, error) { + if _, ok := p.(*LogicalCTE); ok { + return p, nil + } newChildren := make([]LogicalPlan, 0, len(p.Children())) for _, child := range p.Children() { newChild, err := smj.recursivePlan(child) diff --git a/planner/core/rule_topn_push_down.go b/planner/core/rule_topn_push_down.go index 1ffd3ce7b8e73..c8eff1f4bc98d 100644 --- a/planner/core/rule_topn_push_down.go +++ b/planner/core/rule_topn_push_down.go @@ -43,6 +43,13 @@ func (s *baseLogicalPlan) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp return p } +func (p *LogicalCTE) pushDownTopN(topN *LogicalTopN, opt *logicalOptimizeOp) LogicalPlan { + if topN != nil { + return topN.setChild(p, opt) + } + return p +} + // setChild set p as topn's child. 
func (lt *LogicalTopN) setChild(p LogicalPlan, opt *logicalOptimizeOp) LogicalPlan { // Remove this TopN if its child is a TableDual. diff --git a/planner/core/stats.go b/planner/core/stats.go index 0c6ac98b2d209..4caa3a2feb293 100644 --- a/planner/core/stats.go +++ b/planner/core/stats.go @@ -1108,12 +1108,16 @@ func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression newSel := LogicalSelection{Conditions: []expression.Expression{newCond}}.Init(p.SCtx(), p.cte.seedPartLogicalPlan.SelectBlockOffset()) newSel.SetChildren(p.cte.seedPartLogicalPlan) p.cte.seedPartLogicalPlan = newSel + p.cte.optFlag |= flagPredicatePushDown } - p.cte.seedPartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.ctx, p.cte.optFlag|flagPredicatePushDown, p.cte.seedPartLogicalPlan) + p.cte.seedPartLogicalPlan, p.cte.seedPartPhysicalPlan, _, err = DoOptimizeAndLogicAsRet(context.TODO(), p.ctx, p.cte.optFlag, p.cte.seedPartLogicalPlan) if err != nil { return nil, err } } + if p.onlyUsedAsStorage { + p.SetChildren(p.cte.seedPartLogicalPlan) + } resStat := p.cte.seedPartPhysicalPlan.Stats() // Changing the pointer so that seedStat in LogicalCTETable can get the new stat. *p.seedStat = *resStat @@ -1152,3 +1156,9 @@ func (p *LogicalCTETable) DeriveStats(_ []*property.StatsInfo, _ *expression.Sch p.stats = p.seedStat return p.stats, nil } + +// DeriveStats implement LogicalPlan DeriveStats interface. +func (p *LogicalSequence) DeriveStats(childStats []*property.StatsInfo, _ *expression.Schema, _ []*expression.Schema, _ [][]*expression.Column) (*property.StatsInfo, error) { + p.stats = childStats[len(childStats)-1] + return p.stats, nil +} diff --git a/planner/core/stringer.go b/planner/core/stringer.go index 3679b92ce777c..de045c5d0b0af 100644 --- a/planner/core/stringer.go +++ b/planner/core/stringer.go @@ -201,6 +201,14 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) { } str = name + "{" + strings.Join(children, "->") + "}" idxs = idxs[:last] + case *LogicalSequence: + last := len(idxs) - 1 + idx := idxs[last] + children := strs[idx:] + strs = strs[:idx] + name := "Sequence" + str = name + "{" + strings.Join(children, ",") + "}" + idxs = idxs[:last] case *DataSource: if x.isPartition { str = fmt.Sprintf("Partition(%d)", x.physicalTableID) @@ -350,6 +358,17 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) { for _, task := range x.TargetTasks { str += fmt.Sprintf("%d, ", task.ID) } + for _, tasks := range x.TargetCTEReaderTasks { + str += "(" + for _, task := range tasks { + str += fmt.Sprintf("%d, ", task.ID) + } + str += ")" + } + str += ")" + case *PhysicalCTE: + str = "CTEReader(" + str += fmt.Sprintf("%v", x.CTE.IDForStorage) str += ")" default: str = fmt.Sprintf("%T", in) diff --git a/planner/core/task.go b/planner/core/task.go index 2925785219591..885cfc9f50fee 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2623,6 +2623,50 @@ func (p *PhysicalWindow) attach2Task(tasks ...task) task { return attachPlan2Task(p.self, t) } +func (p *PhysicalCTEStorage) attach2Task(tasks ...task) task { + t := tasks[0].copy() + if mpp, ok := t.(*mppTask); ok { + p.SetChildren(t.plan()) + return &mppTask{ + p: p, + partTp: mpp.partTp, + hashCols: mpp.hashCols, + tblColHists: mpp.tblColHists, + } + } + t.convertToRootTask(p.ctx) + p.SetChildren(t.plan()) + return &rootTask{ + p: p, + } +} + +func (p *PhysicalSequence) attach2Task(tasks ...task) task { + for _, t := range tasks { + _, isMpp := t.(*mppTask) + if !isMpp { + return tasks[len(tasks)-1] + } 
+ } + + lastTask := tasks[len(tasks)-1].(*mppTask) + + children := make([]PhysicalPlan, 0, len(tasks)) + for _, t := range tasks { + children = append(children, t.plan()) + } + + p.SetChildren(children...) + + mppTask := &mppTask{ + p: p, + partTp: lastTask.partTp, + hashCols: lastTask.hashCols, + tblColHists: lastTask.tblColHists, + } + return mppTask +} + // mppTask can not : // 1. keep order // 2. support double read diff --git a/planner/property/physical_property.go b/planner/property/physical_property.go index 314737174db1d..50e05d6d24f60 100644 --- a/planner/property/physical_property.go +++ b/planner/property/physical_property.go @@ -161,6 +161,16 @@ func GetCollateNameByIDForPartition(collateID int32) string { return collate.CollationID2Name(collateID) } +// cteProducerStatus indicates whether we can let the current CTE consumer/reader be executed on the MPP nodes. +type cteProducerStatus int + +// Constants for CTE status. +const ( + NoCTEOrAllProducerCanMPP cteProducerStatus = iota + SomeCTEFailedMpp + AllCTECanMpp +) + // PhysicalProperty stands for the required physical property by parents. // It contains the orders and the task types. type PhysicalProperty struct { @@ -202,6 +212,8 @@ type PhysicalProperty struct { // RejectSort means rejecting the sort property from its children, but it only works for MPP tasks. // Non-MPP tasks do not care about it. RejectSort bool + + CTEProducerStatus cteProducerStatus } // NewPhysicalProperty builds property from columns. @@ -332,6 +344,7 @@ func (p *PhysicalProperty) HashCode() []byte { p.hashcode = append(p.hashcode, col.hashCode(nil)...) } } + p.hashcode = append(p.hashcode, codec.EncodeInt(nil, int64(p.CTEProducerStatus))...) return p.hashcode } @@ -351,6 +364,7 @@ func (p *PhysicalProperty) CloneEssentialFields() *PhysicalProperty { MPPPartitionTp: p.MPPPartitionTp, MPPPartitionCols: p.MPPPartitionCols, RejectSort: p.RejectSort, + CTEProducerStatus: p.CTEProducerStatus, } return prop } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 61eef429cd346..3a5be1690165e 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1475,6 +1475,9 @@ type SessionVars struct { // use the ExpectedCnt to adjust the estimated row count for index scan. OptOrderingIdxSelThresh float64 + // EnableMPPSharedCTEExecution indicates whether we enable the shared CTE execution strategy on MPP side. + EnableMPPSharedCTEExecution bool + // OptimizerFixControl control some details of the optimizer behavior through the tidb_opt_fix_control variable. 
OptimizerFixControl map[uint64]string diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 4cab8f38af584..148e08e3b0f57 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -2488,6 +2488,10 @@ var defaultSysVars = []*SysVar{ s.OptOrderingIdxSelThresh = tidbOptFloat64(val, DefTiDBOptOrderingIdxSelThresh) return nil }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptEnableMPPSharedCTEExecution, Value: BoolToOnOff(DefTiDBOptEnableMPPSharedCTEExecution), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { + s.EnableMPPSharedCTEExecution = TiDBOptOn(val) + return nil + }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptFixControl, Value: "", Type: TypeStr, IsHintUpdatable: true, SetSession: func(s *SessionVars, val string) error { newMap := make(map[uint64]string) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 0ee5a1df15b33..64b3f86cf9dcb 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -864,6 +864,8 @@ const ( // TiDBOptOrderingIdxSelThresh is the threshold for optimizer to consider the ordering index. TiDBOptOrderingIdxSelThresh = "tidb_opt_ordering_index_selectivity_threshold" + // TiDBOptEnableMPPSharedCTEExecution indicates whether the optimizer tries to build a shared CTE scan during MPP execution. + TiDBOptEnableMPPSharedCTEExecution = "tidb_opt_enable_mpp_shared_cte_execution" // TiDBOptFixControl makes the user able to control some details of the optimizer behavior. TiDBOptFixControl = "tidb_opt_fix_control" ) @@ -1314,6 +1316,7 @@ const ( DefTiDBLoadBasedReplicaReadThreshold = time.Second DefTiDBOptEnableLateMaterialization = true DefTiDBOptOrderingIdxSelThresh = 0.0 + DefTiDBOptEnableMPPSharedCTEExecution = false DefTiDBPlanCacheInvalidationOnFreshStats = true DefTiDBEnableRowLevelChecksum = false DefAuthenticationLDAPSASLAuthMethodName = "SCRAM-SHA-1" diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 5bd374651c122..7960167d26429 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -491,6 +491,7 @@ func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDa zap.Uint64("txnStartTS", req.StartTs), zap.String("storeAddr", req.Meta.GetAddress()), zap.Int64("mpp-version", m.mppVersion.ToInt64()), + zap.Int64("task-id", req.ID), zap.Error(err)) return err } diff --git a/util/plancodec/id.go b/util/plancodec/id.go index 20ad0e4301cfa..c23959072e4f6 100644 --- a/util/plancodec/id.go +++ b/util/plancodec/id.go @@ -133,6 +133,8 @@ const ( TypeForeignKeyCheck = "Foreign_Key_Check" // TypeForeignKeyCascade is the type of FKCascade TypeForeignKeyCascade = "Foreign_Key_Cascade" + // TypeSequence is the type of Sequence + TypeSequence = "Sequence" ) // plan id.