*: support to execute CTE on MPP side #42296
Changes from 8 commits

@@ -16,10 +16,12 @@ package executor

import (
	"context"
	"fmt"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/mpp"
	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/kv"

@@ -88,6 +90,16 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
	} else {
		dagReq.EncodeType = tipb.EncodeType_TypeChunk
	}
	// for _, mppTask := range pf.ExchangeSender.Tasks {
	// 	logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
	// 		zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
	// 		zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
	// 		zap.String("plan", plannercore.ToStringNewForMPP(pf.ExchangeSender)),
	// 		zap.Int64("mpp-version", mppTask.MppVersion.ToInt64()),
	// 		zap.String("exchange-compression-mode", pf.ExchangeSender.CompressionMode.Name()),
	// 	)
	// }
	// return nil
	for _, mppTask := range pf.ExchangeSender.Tasks {
		if mppTask.PartitionTableIDs != nil {
			err = updateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)

@@ -99,6 +111,10 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
		if err != nil {
			return errors.Trace(err)
		}
		err = e.fixTaskForCTEStorageAndReader(dagReq.RootExecutor, mppTask.Meta)
		if err != nil {
			return err
		}
		pbData, err := dagReq.Marshal()
		if err != nil {
			return errors.Trace(err)

@@ -127,6 +143,81 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
	return nil
}

// fixTaskForCTEStorageAndReader fixes the upstream/downstream tasks for the producers and consumers.
// We only set up local transport for the data.

Review comment: Please write some meaningful comments.
func (e *MPPGather) fixTaskForCTEStorageAndReader(exec *tipb.Executor, meta kv.MPPTaskMeta) error {
	children := make([]*tipb.Executor, 0, 2)
	switch exec.Tp {
	case tipb.ExecType_TypeTableScan, tipb.ExecType_TypePartitionTableScan, tipb.ExecType_TypeIndexScan:
	case tipb.ExecType_TypeSelection:
		children = append(children, exec.Selection.Child)
	case tipb.ExecType_TypeAggregation, tipb.ExecType_TypeStreamAgg:
		children = append(children, exec.Aggregation.Child)
	case tipb.ExecType_TypeTopN:
		children = append(children, exec.TopN.Child)
	case tipb.ExecType_TypeLimit:
		children = append(children, exec.Limit.Child)
	case tipb.ExecType_TypeExchangeSender:
		children = append(children, exec.ExchangeSender.Child)
		if len(exec.ExchangeSender.UpstreamCteTaskMeta) == 0 {
			break
		}
		actualUpStreamTasks := make([][]byte, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
		actualTIDs := make([]int64, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
Review comment: Unused variable.
Reply: It's used for the debug log.
		for _, tasksFromOneConsumer := range exec.ExchangeSender.UpstreamCteTaskMeta {
			for _, taskBytes := range tasksFromOneConsumer.EncodedTasks {
				taskMeta := &mpp.TaskMeta{}
				err := taskMeta.Unmarshal(taskBytes)
Review comment: Really strange and complex code.
Reply: Yeah, but it's difficult to change, since we split the tasks of one fragment just out here.
				if err != nil {
					return err
				}
				if taskMeta.Address != meta.GetAddress() {
					continue
				}
				actualUpStreamTasks = append(actualUpStreamTasks, taskBytes)
				actualTIDs = append(actualTIDs, taskMeta.TaskId)
			}
		}
		logutil.BgLogger().Warn("refine tunnel for cte producer task", zap.String("the final tunnel", fmt.Sprintf("up stream consumer tasks: %v", actualTIDs)))
		exec.ExchangeSender.EncodedTaskMeta = actualUpStreamTasks
	case tipb.ExecType_TypeExchangeReceiver:
		if len(exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta) == 0 {
			break
		}
		exec.ExchangeReceiver.EncodedTaskMeta = [][]byte{}
		actualTIDs := make([]int64, 0, 4)
		for _, taskBytes := range exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta {
			taskMeta := &mpp.TaskMeta{}
			err := taskMeta.Unmarshal(taskBytes)
			if err != nil {
				return err
			}
			if taskMeta.Address != meta.GetAddress() {
				continue
			}
			exec.ExchangeReceiver.EncodedTaskMeta = append(exec.ExchangeReceiver.EncodedTaskMeta, taskBytes)
			actualTIDs = append(actualTIDs, taskMeta.TaskId)
		}
		logutil.BgLogger().Warn("refine tunnel for cte consumer task", zap.String("the final tunnel", fmt.Sprintf("down stream producer task: %v", actualTIDs)))
	case tipb.ExecType_TypeJoin:
		children = append(children, exec.Join.Children...)
	case tipb.ExecType_TypeProjection:
		children = append(children, exec.Projection.Child)
	case tipb.ExecType_TypeWindow:
		children = append(children, exec.Window.Child)
	case tipb.ExecType_TypeSort:
		children = append(children, exec.Sort.Child)
	case tipb.ExecType_TypeExpand:
		children = append(children, exec.Expand.Child)
	default:
		return errors.Errorf("unknown new tipb protocol %d", exec.Tp)
	}
	for _, child := range children {
		// Propagate errors from the recursive calls instead of dropping them.
		if err := e.fixTaskForCTEStorageAndReader(child, meta); err != nil {
			return err
		}
	}
	return nil
}
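
As a reader's aid (not part of the diff): the function above is a plain top-down walk over the tipb executor tree that rewrites the two CTE exchange nodes it finds along the way. A minimal, self-contained sketch of that traversal pattern, with `Exec` as an illustrative stand-in for `*tipb.Executor`:

```go
package main

import "fmt"

// Exec is a simplified stand-in for *tipb.Executor: a typed node with
// children, mirroring the switch in fixTaskForCTEStorageAndReader.
type Exec struct {
	Tp       string
	Children []*Exec
}

// walk visits every executor top-down and stops at the first error,
// which is also why the recursive call in the PR should propagate err.
func walk(e *Exec, visit func(*Exec) error) error {
	if err := visit(e); err != nil {
		return err
	}
	for _, child := range e.Children {
		if err := walk(child, visit); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	tree := &Exec{Tp: "ExchangeSender", Children: []*Exec{
		{Tp: "Projection", Children: []*Exec{
			{Tp: "ExchangeReceiver"},
		}},
	}}
	_ = walk(tree, func(e *Exec) error {
		fmt.Println("visit", e.Tp) // visits sender, projection, receiver
		return nil
	})
}
```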

func collectPlanIDS(plan plannercore.PhysicalPlan, ids []int) []int {
	ids = append(ids, plan.ID())
	for _, child := range plan.Children() {

@@ -151,6 +242,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
			return errors.Trace(err)
		}
	}
	// return errors.Errorf("break for debugging")
	failpoint.Inject("checkTotalMPPTasks", func(val failpoint.Value) {
		if val.(int) != len(e.mppReqs) {
			failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))

@@ -396,7 +396,11 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) AccessObject {
		return DynamicPartitionAccessObjects(nil)
	}
	if len(p.PartitionInfos) == 0 {
		ts := p.TablePlans[0].(*PhysicalTableScan)
		ts, ok := p.TablePlans[0].(*PhysicalTableScan)
		if !ok {
			cte := p.TablePlans[0].(*PhysicalCTE)
			return OtherAccessObject(fmt.Sprintf("cte: %v as %v", cte.cteName, cte.cteAsName))
		}
Review comment: This logic seems useless.
Reply: If we don't have this part, some SQL will panic...
Reply: Then the type assertion is enough, I think the
		asName := ""
		if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 {
			asName = ts.TableAsName.O
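
The panic the reviewers discuss comes from an unchecked type assertion: once a CTE can run on the MPP side, `p.TablePlans[0]` may be a `*PhysicalCTE` rather than a `*PhysicalTableScan`, and a direct assertion on the wrong type panics at runtime. A minimal standalone sketch of the comma-ok pattern the hunk adopts (the types here are illustrative stand-ins, not TiDB's actual plan nodes):

```go
package main

import "fmt"

// Illustrative stand-ins for plan node types.
type PhysicalPlan interface{ ID() int }

type PhysicalTableScan struct{ id int }

func (p *PhysicalTableScan) ID() int { return p.id }

type PhysicalCTE struct{ id int }

func (p *PhysicalCTE) ID() int { return p.id }

func describe(plan PhysicalPlan) string {
	// plan.(*PhysicalTableScan) without the second return value would
	// panic whenever plan is actually a *PhysicalCTE.
	if ts, ok := plan.(*PhysicalTableScan); ok {
		return fmt.Sprintf("table scan %d", ts.ID())
	}
	if cte, ok := plan.(*PhysicalCTE); ok {
		return fmt.Sprintf("cte %d", cte.ID())
	}
	return "unknown plan"
}

func main() {
	fmt.Println(describe(&PhysicalTableScan{id: 1})) // table scan 1
	fmt.Println(describe(&PhysicalCTE{id: 2}))       // cte 2
}
```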
Review comment: Why should we align the addresses after the pb is generated?
Reply: It's because each task of the CTE producer/consumer needs a different upstream/downstream than the other tasks in the same fragment.
Reply: Consider two producer tasks A and B; A's and B's upstream consumer tasks are different. In the current fragment generation we use one struct to represent a whole fragment, so before the tasks are sent, A and B share the same upstream task addresses (the whole address list of the upstream consumer fragments). Hence, we need to set them again here, per task.
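
To make that per-task realignment concrete, here is a minimal, self-contained sketch of the filtering step `fixTaskForCTEStorageAndReader` performs: starting from the fragment-wide list of peer task metas, each dispatched task keeps only the peers on its own address, so CTE producers and consumers exchange data over local transport. `TaskMeta` and the addresses below are simplified, hypothetical stand-ins for `mpp.TaskMeta` and real TiFlash endpoints.

```go
package main

import "fmt"

// TaskMeta is a simplified stand-in for mpp.TaskMeta.
type TaskMeta struct {
	TaskID  int64
	Address string
}

// filterLocalPeers keeps only the peer tasks that run on localAddr,
// mirroring how the PR rewrites the ExchangeSender/ExchangeReceiver
// EncodedTaskMeta from the fragment-wide candidate list.
func filterLocalPeers(candidates []TaskMeta, localAddr string) []TaskMeta {
	local := make([]TaskMeta, 0, len(candidates))
	for _, m := range candidates {
		if m.Address != localAddr {
			// A peer on another node: skip it, so the CTE producer and
			// consumer only talk through local transport.
			continue
		}
		local = append(local, m)
	}
	return local
}

func main() {
	// Fragment-wide list: every task initially sees all peer addresses.
	peers := []TaskMeta{
		{TaskID: 1, Address: "tiflash-0:3930"},
		{TaskID: 2, Address: "tiflash-1:3930"},
		{TaskID: 3, Address: "tiflash-0:3930"},
	}
	// The task dispatched to tiflash-0 keeps only tasks 1 and 3.
	fmt.Println(filterLocalPeers(peers, "tiflash-0:3930"))
}
```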