
*: support to execute CTE on MPP side #42296

Merged 36 commits on May 24, 2023
Changes from 8 commits
Commits
3f91dd3  tmp  (winoros, Feb 15, 2023)
95375d7  fix the mpp prop  (winoros, Feb 22, 2023)
e50927c  Merge branch 'master' into add-sequence-operator  (winoros, Feb 26, 2023)
6285fb0  Merge branch 'master' into add-sequence-operator  (winoros, Feb 27, 2023)
5584c9a  planner: support sending cte mpp task  (winoros, Mar 1, 2023)
6044c08  fix panics  (winoros, Mar 1, 2023)
6c69b8a  some codes updates  (winoros, Mar 8, 2023)
d6c27bf  update the codes  (winoros, Mar 8, 2023)
9133d92  change style and clean  (winoros, Mar 15, 2023)
2dc1de9  Merge branch 'master' into add-sequence-operator  (winoros, Mar 15, 2023)
6c81152  clean the debugging info, make it ready for review  (winoros, Mar 15, 2023)
380b4d1  Merge branch 'master' into add-sequence-operator  (winoros, Mar 28, 2023)
e4010a8  push sequence down  (winoros, Apr 4, 2023)
699e39d  Merge remote-tracking branch 'origin/master' into add-sequence-operator  (winoros, Apr 4, 2023)
521d66d  Apply suggestions from code review  (winoros, Apr 18, 2023)
0b5244c  Merge branch 'master' into add-sequence-operator  (winoros, Apr 18, 2023)
3f2418a  fix tests  (winoros, Apr 18, 2023)
671ef17  address comments && add tests  (winoros, Apr 23, 2023)
863890e  Merge branch 'master' into add-sequence-operator  (winoros, Apr 23, 2023)
7e918fc  fix gofmt  (winoros, Apr 23, 2023)
2341c88  fix check && fix test  (winoros, Apr 23, 2023)
fa4ecaf  fix the cte producer status and add tests  (winoros, Apr 24, 2023)
a924eef  Merge branch 'master' into add-sequence-operator  (winoros, Apr 25, 2023)
f5d4303  address comments  (winoros, Apr 25, 2023)
4657c67  merge the methods  (winoros, Apr 27, 2023)
1c8994f  Merge branch 'master' into add-sequence-operator  (winoros, May 9, 2023)
e5879cd  fix the aggregation's bad case  (winoros, May 10, 2023)
af5c066  Merge branch 'master' into add-sequence-operator  (winoros, May 10, 2023)
98868c0  Merge branch 'master' into add-sequence-operator  (winoros, May 17, 2023)
84e8ac8  address comments  (winoros, May 17, 2023)
9004ef0  fix bazel_prepare  (winoros, May 17, 2023)
9088ad4  remove debug log  (winoros, May 18, 2023)
06f4b3b  address comments  (winoros, May 23, 2023)
f7026cd  Merge branch 'master' into add-sequence-operator  (winoros, May 23, 2023)
51a1178  fix lint  (winoros, May 23, 2023)
34864c8  fix tests  (winoros, May 24, 2023)
6 changes: 6 additions & 0 deletions executor/adapter.go
@@ -1174,6 +1174,12 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
if !a.Ctx.GetSessionVars().InRestrictedSQL {
logutil.BgLogger().Warn("bulding executor",
zap.String("the top one's output cols", a.Plan.Schema().String()),
zap.String("the output names", fmt.Sprintf("%v", a.OutputNames)),
)
}
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
92 changes: 92 additions & 0 deletions executor/mpp_gather.go
@@ -16,10 +16,12 @@ package executor

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
@@ -88,6 +90,16 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
} else {
dagReq.EncodeType = tipb.EncodeType_TypeChunk
}
// for _, mppTask := range pf.ExchangeSender.Tasks {
// logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
// zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
// zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
// zap.String("plan", plannercore.ToStringNewForMPP(pf.ExchangeSender)),
// zap.Int64("mpp-version", mppTask.MppVersion.ToInt64()),
// zap.String("exchange-compression-mode", pf.ExchangeSender.CompressionMode.Name()),
// )
// }
// return nil
for _, mppTask := range pf.ExchangeSender.Tasks {
if mppTask.PartitionTableIDs != nil {
err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
@@ -99,6 +111,10 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
if err != nil {
return errors.Trace(err)
}
err = e.fixTaskForCTEStorageAndReader(dagReq.RootExecutor, mppTask.Meta)
Contributor: Why should we align the address after the pb is generated?

winoros (Member, Author), Apr 23, 2023: It's because each task of the CTE producer/consumer needs a different upstream/downstream compared with the other tasks in the same fragment.

winoros (Member, Author): Consider two producer tasks A and B; their upstream consumer tasks are different. In the current fragment generation we use a single struct to represent each fragment, so before the tasks are sent, A and B share the same upstream task address list (the whole address list of the upstream consumer fragments).
Hence, we need to set them again here.
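
To make this concrete, here is a minimal, self-contained sketch (not code from the PR) of the filtering idea: every producer task initially carries the full consumer task list of the upstream fragment, and each producer should keep only the consumer tasks that live on its own address, since only local transport is set up for the CTE data. The addresses and task IDs below are hypothetical.

package main

import "fmt"

// consumerTask stands in for an encoded mpp.TaskMeta; only the fields the
// filter cares about are shown.
type consumerTask struct {
	taskID  int64
	address string
}

// keepLocal mimics what fixTaskForCTEStorageAndReader does for one producer
// task: keep only the consumer tasks that sit on the producer's own address.
func keepLocal(all []consumerTask, producerAddr string) []consumerTask {
	local := make([]consumerTask, 0, len(all))
	for _, t := range all {
		if t.address != producerAddr {
			continue
		}
		local = append(local, t)
	}
	return local
}

func main() {
	// Before dispatch, producer tasks A and B share the same full consumer list.
	consumers := []consumerTask{
		{taskID: 11, address: "tiflash-0:3930"},
		{taskID: 12, address: "tiflash-1:3930"},
	}
	fmt.Println(keepLocal(consumers, "tiflash-0:3930")) // what producer task A keeps
	fmt.Println(keepLocal(consumers, "tiflash-1:3930")) // what producer task B keeps
}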

if err != nil {
return err
}
pbData, err := dagReq.Marshal()
if err != nil {
return errors.Trace(err)
@@ -127,6 +143,81 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
return nil
}

// fixTaskForCTEStorageAndReader fixes the upstream/downstream tasks for the producers and consumers.
Member: Please write some meaningful comments.

// We only setup local transport for the data.
func (e *MPPGather) fixTaskForCTEStorageAndReader(exec *tipb.Executor, meta kv.MPPTaskMeta) error {
children := make([]*tipb.Executor, 0, 2)
switch exec.Tp {
case tipb.ExecType_TypeTableScan, tipb.ExecType_TypePartitionTableScan, tipb.ExecType_TypeIndexScan:
case tipb.ExecType_TypeSelection:
children = append(children, exec.Selection.Child)
case tipb.ExecType_TypeAggregation, tipb.ExecType_TypeStreamAgg:
children = append(children, exec.Aggregation.Child)
case tipb.ExecType_TypeTopN:
children = append(children, exec.TopN.Child)
case tipb.ExecType_TypeLimit:
children = append(children, exec.Limit.Child)
case tipb.ExecType_TypeExchangeSender:
children = append(children, exec.ExchangeSender.Child)
if len(exec.ExchangeSender.UpstreamCteTaskMeta) == 0 {
break
}
actualUpStreamTasks := make([][]byte, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
actualTIDs := make([]int64, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
Member: This variable is not used.

winoros (Member, Author): It's used for the debug log.

for _, tasksFromOneConsumer := range exec.ExchangeSender.UpstreamCteTaskMeta {
for _, taskBytes := range tasksFromOneConsumer.EncodedTasks {
taskMeta := &mpp.TaskMeta{}
err := taskMeta.Unmarshal(taskBytes)
Member: Really strange and complex code: marshal it in constructDistExecForTiFlash, then unmarshal it here.

winoros (Member, Author):
Yeah, but it's difficult to change since we split the tasks of one fragment just out here.
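
For context, a minimal, self-contained sketch (not code from the PR) of the round trip the comment refers to: the planner marshals each mpp.TaskMeta into bytes when building the TiFlash plan, and this function unmarshals those bytes again just to read the Address and TaskId fields. The concrete values are hypothetical.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/mpp"
)

func main() {
	// Planner side (constructDistExecForTiFlash): the task meta is marshaled
	// into bytes and attached to the pb plan.
	meta := &mpp.TaskMeta{TaskId: 1, Address: "tiflash-0:3930"}
	encoded, err := meta.Marshal()
	if err != nil {
		panic(err)
	}

	// Executor side (fixTaskForCTEStorageAndReader): the bytes are unmarshaled
	// again only to inspect the address and the task ID.
	decoded := &mpp.TaskMeta{}
	if err := decoded.Unmarshal(encoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.TaskId, decoded.Address)
}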

if err != nil {
return err
}
if taskMeta.Address != meta.GetAddress() {
continue
}
actualUpStreamTasks = append(actualUpStreamTasks, taskBytes)
actualTIDs = append(actualTIDs, taskMeta.TaskId)
}
}
logutil.BgLogger().Warn("refine tunnel for cte producer task", zap.String("the final tunnel", fmt.Sprintf("up stream consumer tasks: %v", actualTIDs)))
exec.ExchangeSender.EncodedTaskMeta = actualUpStreamTasks
case tipb.ExecType_TypeExchangeReceiver:
if len(exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta) == 0 {
break
}
exec.ExchangeReceiver.EncodedTaskMeta = [][]byte{}
actualTIDs := make([]int64, 0, 4)
for _, taskBytes := range exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta {
taskMeta := &mpp.TaskMeta{}
err := taskMeta.Unmarshal(taskBytes)
if err != nil {
return err
}
if taskMeta.Address != meta.GetAddress() {
continue
}
exec.ExchangeReceiver.EncodedTaskMeta = append(exec.ExchangeReceiver.EncodedTaskMeta, taskBytes)
actualTIDs = append(actualTIDs, taskMeta.TaskId)
}
logutil.BgLogger().Warn("refine tunnel for cte consumer task", zap.String("the final tunnel", fmt.Sprintf("down stream producer task: %v", actualTIDs)))
case tipb.ExecType_TypeJoin:
children = append(children, exec.Join.Children...)
case tipb.ExecType_TypeProjection:
children = append(children, exec.Projection.Child)
case tipb.ExecType_TypeWindow:
children = append(children, exec.Window.Child)
case tipb.ExecType_TypeSort:
children = append(children, exec.Sort.Child)
case tipb.ExecType_TypeExpand:
children = append(children, exec.Expand.Child)
default:
return errors.Errorf("unknown new tipb protocol %d", exec.Tp)
}
for _, child := range children {
if err := e.fixTaskForCTEStorageAndReader(child, meta); err != nil {
return err
}
}
return nil
}

func collectPlanIDS(plan plannercore.PhysicalPlan, ids []int) []int {
ids = append(ids, plan.ID())
for _, child := range plan.Children() {
@@ -151,6 +242,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
return errors.Trace(err)
}
}
// return errors.Errorf("break for debugging")
failpoint.Inject("checkTotalMPPTasks", func(val failpoint.Value) {
if val.(int) != len(e.mppReqs) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
1 change: 1 addition & 0 deletions go.mod
@@ -277,5 +277,6 @@ replace (
// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible
github.com/pingcap/tidb/parser => ./parser
github.com/pingcap/tipb => github.com/pingcap/tipb v0.0.0-20230305184922-3290e9e4ca8a
go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac
)
4 changes: 2 additions & 2 deletions go.sum
@@ -781,8 +781,8 @@ github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c h1:crhkw6DD+07Bg1wYh
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops=
github.com/pingcap/tipb v0.0.0-20230119054146-c6b7a5a1623b h1:j5sw2YZY7QfgIFZEoUcn1P5cYflms1PCVVS96i+IQiI=
github.com/pingcap/tipb v0.0.0-20230119054146-c6b7a5a1623b/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20230305184922-3290e9e4ca8a h1:UV88ixc+osXKY+qa7fBellKgISQIOGtuqQuEAZ0zWVU=
github.com/pingcap/tipb v0.0.0-20230305184922-3290e9e4ca8a/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
6 changes: 5 additions & 1 deletion planner/core/access_object.go
@@ -396,7 +396,11 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) AccessObject
return DynamicPartitionAccessObjects(nil)
}
if len(p.PartitionInfos) == 0 {
ts := p.TablePlans[0].(*PhysicalTableScan)
ts, ok := p.TablePlans[0].(*PhysicalTableScan)
if !ok {
cte := p.TablePlans[0].(*PhysicalCTE)
return OtherAccessObject(fmt.Sprintf("cte: %v as %v", cte.cteName, cte.cteAsName))
}
Member: This logic seems useless.

winoros (Member, Author): If we don't have this part, some SQL will panic...

Member:
Then the type assertion is enough, I think the OtherAccessObject is unneeded.
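
To illustrate the suggestion, here is a minimal, self-contained sketch (not TiDB code; all type names are simplified stand-ins) of why the direct type assertion panicked for a pushed-down CTE and why the comma-ok form alone is enough to avoid it:

package main

import "fmt"

// Simplified stand-ins for plannercore.PhysicalPlan, PhysicalTableScan and
// PhysicalCTE; none of these names come from the PR.
type physicalPlan interface{ isPhysicalPlan() }

type physicalTableScan struct{}

func (physicalTableScan) isPhysicalPlan() {}

type physicalCTE struct{}

func (physicalCTE) isPhysicalPlan() {}

func accessObject(p physicalPlan) string {
	// A direct assertion p.(physicalTableScan) would panic when p is a CTE
	// reader; the comma-ok form lets the caller return early instead.
	if _, ok := p.(physicalTableScan); ok {
		return "table scan access object"
	}
	return "" // no access object for non-table-scan plans
}

func main() {
	fmt.Println(accessObject(physicalTableScan{}))
	fmt.Println(accessObject(physicalCTE{}))
}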

asName := ""
if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 {
asName = ts.TableAsName.O
5 changes: 4 additions & 1 deletion planner/core/common_plans.go
@@ -1321,7 +1321,10 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p Plan) (bo
indexScan := v.IndexPlans[0].(*PhysicalIndexScan)
return indexScan.IsPointGetByUniqueKey(ctx), nil
case *PhysicalTableReader:
tableScan := v.TablePlans[0].(*PhysicalTableScan)
tableScan, ok := v.TablePlans[0].(*PhysicalTableScan)
if !ok {
return false, nil
}
isPointRange := len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPointNonNullable(ctx)
if !isPointRange {
return false, nil