*: add foreign key check/cascade runtime stats information in explain analyze result (#39203)

close #39202
crazycs520 authored Nov 30, 2022
1 parent f06fb5a commit 63ce793
Showing 13 changed files with 518 additions and 42 deletions.
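
Before the per-file diffs, here is a minimal testkit-style sketch (not part of this commit; the table names, session settings, and the exact operator/stat names are assumptions) of how the new foreign key runtime statistics can be observed through EXPLAIN ANALYZE:

package fktest

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

func TestForeignKeyRuntimeStatsSketch(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	// Depending on the build's defaults, enabling the foreign key feature may be required.
	tk.MustExec("set @@global.tidb_enable_foreign_key = 1")
	tk.MustExec("set @@foreign_key_checks = 1")
	tk.MustExec("create table parent (id int primary key)")
	tk.MustExec("create table child (id int primary key, pid int, " +
		"foreign key fk (pid) references parent (id) on delete cascade)")
	tk.MustExec("insert into parent values (1)")
	tk.MustExec("insert into child values (1, 1)")
	// After this commit, the foreign key check/cascade operators and their
	// execution info (for example "total:..., foreign_keys:1") should show up
	// in the explain analyze result rows.
	rows := tk.MustQuery("explain analyze delete from parent where id = 1").Rows()
	for _, row := range rows {
		t.Log(row)
	}
}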
2 changes: 1 addition & 1 deletion distsql/distsql_test.go
@@ -135,7 +135,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
stmtStats.RegisterStats(2, s1)
stats = stmtStats.GetRootStats(2)
expect = "time:0s, loops:0, cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
18 changes: 14 additions & 4 deletions executor/adapter.go
@@ -605,10 +605,6 @@ func (a *ExecStmt) handleForeignKeyTrigger(ctx context.Context, e Executor, dept
if !ok {
return nil
}
a.Ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger = true
defer func() {
a.Ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger = false
}()
fkChecks := exec.GetFKChecks()
for _, fkCheck := range fkChecks {
err := fkCheck.doCheck(ctx)
@@ -638,12 +634,26 @@ func (a *ExecStmt) handleForeignKeyTrigger(ctx context.Context, e Executor, dept
// 4. `StmtCommit` to commit the kv change to the transaction mem-buffer.
// 5. If the foreign key cascade behaviour has more FK values that need to be cascaded, go to step 1.
func (a *ExecStmt) handleForeignKeyCascade(ctx context.Context, fkc *FKCascadeExec, depth int) error {
if a.Ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
fkc.stats = &FKCascadeRuntimeStats{}
defer a.Ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(fkc.plan.ID(), fkc.stats)
}
if len(fkc.fkValues) == 0 && len(fkc.fkUpdatedValuesMap) == 0 {
return nil
}
if depth > maxForeignKeyCascadeDepth {
return ErrForeignKeyCascadeDepthExceeded.GenWithStackByArgs(maxForeignKeyCascadeDepth)
}
a.Ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger = true
defer func() {
a.Ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger = false
}()
if fkc.stats != nil {
start := time.Now()
defer func() {
fkc.stats.Total += time.Since(start)
}()
}
for {
e, err := fkc.buildExecutor(ctx)
if err != nil || e == nil {
344 changes: 333 additions & 11 deletions executor/fktest/foreign_key_test.go

Large diffs are not rendered by default.

134 changes: 133 additions & 1 deletion executor/foreign_key.go
@@ -15,8 +15,11 @@
package executor

import (
"bytes"
"context"
"strconv"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
@@ -31,6 +34,7 @@ import (
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/set"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)
@@ -60,12 +64,16 @@ type FKCheckExec struct {

// FKCheckRuntimeStats contains the FKCheckExec runtime stats.
type FKCheckRuntimeStats struct {
Keys int
Total time.Duration
Check time.Duration
Lock time.Duration
Keys int
}

// FKCascadeExec uses to execute foreign key cascade behaviour.
type FKCascadeExec struct {
*fkValueHelper
plan *plannercore.FKCascade
b *executorBuilder
tp plannercore.FKCascadeType
referredFK *model.ReferredFKInfo
@@ -78,6 +86,8 @@ type FKCascadeExec struct {
fkValues [][]types.Datum
// new-value-key => UpdatedValuesCouple
fkUpdatedValuesMap map[string]*UpdatedValuesCouple

stats *FKCascadeRuntimeStats
}

// UpdatedValuesCouple contains the updated new row and the old rows, exported for test.
@@ -86,6 +96,12 @@ type UpdatedValuesCouple struct {
OldValuesList [][]types.Datum
}

// FKCascadeRuntimeStats contains the FKCascadeExec runtime stats.
type FKCascadeRuntimeStats struct {
Total time.Duration
Keys int
}

func buildTblID2FKCheckExecs(sctx sessionctx.Context, tblID2Table map[int64]table.Table, tblID2FKChecks map[int64][]*plannercore.FKCheck) (map[int64][]*FKCheckExec, error) {
fkChecksMap := make(map[int64][]*FKCheckExec)
for tid, tbl := range tblID2Table {
@@ -175,6 +191,20 @@ func (fkc *FKCheckExec) addRowNeedToCheck(sc *stmtctx.StatementContext, row []ty
}

func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
if fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
fkc.stats = &FKCheckRuntimeStats{}
defer fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(fkc.ID(), fkc.stats)
}
if len(fkc.toBeCheckedKeys) == 0 && len(fkc.toBeCheckedPrefixKeys) == 0 {
return nil
}
start := time.Now()
if fkc.stats != nil {
defer func() {
fkc.stats.Keys = len(fkc.toBeCheckedKeys) + len(fkc.toBeCheckedPrefixKeys)
fkc.stats.Total = time.Since(start)
}()
}
txn, err := fkc.ctx.Txn(false)
if err != nil {
return err
@@ -187,6 +217,9 @@ func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
if err != nil {
return err
}
if fkc.stats != nil {
fkc.stats.Check = time.Since(start)
}
if len(fkc.toBeLockedKeys) == 0 {
return nil
}
@@ -202,6 +235,9 @@ func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
// doLockKeys may set TxnCtx.ForUpdate to 1, and if the lock then meets a write conflict, TiDB can't retry the for-update statement.
// So reset TxnCtx.ForUpdate to 0 so that the statement can be retried when it meets a write conflict.
atomic.StoreUint32(&sessVars.TxnCtx.ForUpdate, forUpdate)
if fkc.stats != nil {
fkc.stats.Lock = time.Since(start) - fkc.stats.Check
}
return err
}

@@ -477,6 +513,10 @@ type fkCheckKey struct {
}

func (fkc FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementContext, txn kv.Transaction, rows []toBeCheckedRow) error {
if fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
fkc.stats = &FKCheckRuntimeStats{}
defer fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(fkc.ID(), fkc.stats)
}
if len(rows) == 0 {
return nil
}
@@ -589,6 +629,7 @@ func (b *executorBuilder) buildFKCascadeExec(tbl table.Table, fkCascade *planner
return &FKCascadeExec{
b: b,
fkValueHelper: helper,
plan: fkCascade,
tp: fkCascade.Tp,
referredFK: fkCascade.ReferredFK,
childTable: fkCascade.ChildTable.Meta(),
@@ -641,6 +682,7 @@ func (fkc *FKCascadeExec) buildExecutor(ctx context.Context) (Executor, error) {
if err != nil || p == nil {
return nil, err
}
fkc.plan.CascadePlans = append(fkc.plan.CascadePlans, p)
e := fkc.b.build(p)
return e, fkc.b.err
}
@@ -672,6 +714,9 @@ func (fkc *FKCascadeExec) buildFKCascadePlan(ctx context.Context) (plannercore.P
case model.ReferOptionCascade:
couple := fkc.fetchUpdatedValuesCouple()
if couple != nil && len(couple.NewValues) != 0 {
if fkc.stats != nil {
fkc.stats.Keys += len(couple.OldValuesList)
}
stmtNode = GenCascadeUpdateAST(fkc.referredFK.ChildSchema, fkc.childTable.Name, indexName, fkc.fkCols, couple)
}
case model.ReferOptionSetNull:
@@ -703,6 +748,9 @@ func (fkc *FKCascadeExec) fetchOnDeleteOrUpdateFKValues() [][]types.Datum {
fkValues = fkc.fkValues[:maxHandleFKValueInOneCascade]
fkc.fkValues = fkc.fkValues[maxHandleFKValueInOneCascade:]
}
if fkc.stats != nil {
fkc.stats.Keys += len(fkValues)
}
return fkValues
}

@@ -815,3 +863,87 @@ func genWhereConditionAstForMultiColumn(cols []*model.ColumnInfo, fkValues [][]t
List: valueList,
}
}

// String implements the RuntimeStats interface.
func (s *FKCheckRuntimeStats) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 32))
buf.WriteString("total:")
buf.WriteString(execdetails.FormatDuration(s.Total))
if s.Check > 0 {
buf.WriteString(", check:")
buf.WriteString(execdetails.FormatDuration(s.Check))
}
if s.Lock > 0 {
buf.WriteString(", lock:")
buf.WriteString(execdetails.FormatDuration(s.Lock))
}
if s.Keys > 0 {
buf.WriteString(", foreign_keys:")
buf.WriteString(strconv.Itoa(s.Keys))
}
return buf.String()
}

// Clone implements the RuntimeStats interface.
func (s *FKCheckRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := &FKCheckRuntimeStats{
Total: s.Total,
Check: s.Check,
Lock: s.Lock,
Keys: s.Keys,
}
return newRs
}

// Merge implements the RuntimeStats interface.
func (s *FKCheckRuntimeStats) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*FKCheckRuntimeStats)
if !ok {
return
}
s.Total += tmp.Total
s.Check += tmp.Check
s.Lock += tmp.Lock
s.Keys += tmp.Keys
}

// Tp implements the RuntimeStats interface.
func (s *FKCheckRuntimeStats) Tp() int {
return execdetails.TpFKCheckRuntimeStats
}

// String implements the RuntimeStats interface.
func (s *FKCascadeRuntimeStats) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 32))
buf.WriteString("total:")
buf.WriteString(execdetails.FormatDuration(s.Total))
if s.Keys > 0 {
buf.WriteString(", foreign_keys:")
buf.WriteString(strconv.Itoa(s.Keys))
}
return buf.String()
}

// Clone implements the RuntimeStats interface.
func (s *FKCascadeRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := &FKCascadeRuntimeStats{
Total: s.Total,
Keys: s.Keys,
}
return newRs
}

// Merge implements the RuntimeStats interface.
func (s *FKCascadeRuntimeStats) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*FKCascadeRuntimeStats)
if !ok {
return
}
s.Total += tmp.Total
s.Keys += tmp.Keys
}

// Tp implements the RuntimeStats interface.
func (s *FKCascadeRuntimeStats) Tp() int {
return execdetails.TpFKCascadeRuntimeStats
}
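
A small standalone sketch (an illustration only, not part of this commit; the duration values are made up and the printed strings are approximations) of the string formats produced by the String implementations above:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/executor"
)

func main() {
	check := &executor.FKCheckRuntimeStats{
		Total: 2 * time.Millisecond,
		Check: time.Millisecond,
		Lock:  time.Millisecond,
		Keys:  20,
	}
	// Prints something like: total:2ms, check:1ms, lock:1ms, foreign_keys:20
	fmt.Println(check.String())

	cascade := &executor.FKCascadeRuntimeStats{Total: time.Second, Keys: 4}
	// Prints something like: total:1s, foreign_keys:4
	fmt.Println(cascade.String())
}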
15 changes: 2 additions & 13 deletions executor/insert_common.go
@@ -1092,10 +1092,6 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
AllocatorRuntimeStats: autoid.NewAllocatorRuntimeStats(),
FKCheckStats: &FKCheckRuntimeStats{},
}
for _, fkc := range e.fkChecks {
fkc.stats = e.stats.FKCheckStats
}
}
return true
@@ -1299,7 +1295,6 @@ type InsertRuntimeStat struct {
CheckInsertTime time.Duration
Prefetch time.Duration
FKCheckTime time.Duration
FKCheckStats *FKCheckRuntimeStats
}

func (e *InsertRuntimeStat) String() string {
@@ -1341,10 +1336,8 @@ func (e *InsertRuntimeStat) String() string {
execdetails.FormatDuration(e.CheckInsertTime),
execdetails.FormatDuration(e.CheckInsertTime-e.Prefetch),
execdetails.FormatDuration(e.Prefetch)))
if e.FKCheckStats != nil && e.FKCheckStats.Keys > 0 {
buf.WriteString(fmt.Sprintf(", fk_check: %v, fk_num: %v",
execdetails.FormatDuration(e.FKCheckTime),
e.FKCheckStats.Keys))
if e.FKCheckTime > 0 {
buf.WriteString(fmt.Sprintf(", fk_check: %v", execdetails.FormatDuration(e.FKCheckTime)))
}
if e.SnapshotRuntimeStats != nil {
if rpc := e.SnapshotRuntimeStats.String(); len(rpc) > 0 {
@@ -1376,10 +1369,6 @@ func (e *InsertRuntimeStat) Clone() execdetails.RuntimeStats {
if e.AllocatorRuntimeStats != nil {
newRs.AllocatorRuntimeStats = e.AllocatorRuntimeStats.Clone()
}
if e.FKCheckStats != nil {
fkCheckStats := *e.FKCheckStats
newRs.FKCheckStats = &fkCheckStats
}
return newRs
}

3 changes: 1 addition & 2 deletions executor/insert_test.go
@@ -1056,8 +1056,7 @@ func TestInsertRuntimeStat(t *testing.T) {
stats.Merge(stats.Clone())
require.Equal(t, "prepare: 6s, check_insert: {total_time: 4s, mem_insert_time: 2s, prefetch: 2s}", stats.String())
stats.FKCheckTime = time.Second
stats.FKCheckStats = &executor.FKCheckRuntimeStats{Keys: 20}
require.Equal(t, "prepare: 6s, check_insert: {total_time: 4s, mem_insert_time: 2s, prefetch: 2s, fk_check: 1s, fk_num: 20}", stats.String())
require.Equal(t, "prepare: 6s, check_insert: {total_time: 4s, mem_insert_time: 2s, prefetch: 2s, fk_check: 1s}", stats.String())
}

func TestDuplicateEntryMessage(t *testing.T) {
4 changes: 3 additions & 1 deletion planner/core/common_plans.go
@@ -1064,7 +1064,9 @@ func binaryOpFromFlatOp(explainCtx sessionctx.Context, op *FlatOperator, out *ti
rootStats, copStats, memTracker, diskTracker := getRuntimeInfo(explainCtx, op.Origin, nil)
if rootStats != nil {
basic, groups := rootStats.MergeStats()
out.RootBasicExecInfo = basic.String()
if basic != nil {
out.RootBasicExecInfo = basic.String()
}
for _, group := range groups {
str := group.String()
if len(str) > 0 {
3 changes: 2 additions & 1 deletion planner/core/encode.go
@@ -275,11 +275,12 @@ func NormalizeFlatPlan(flat *FlatPhysicalPlan) (normalized string, digest *parse
// assume an operator costs around 30 bytes, preallocate space for them
d.buf.Grow(30 * len(selectPlan))
depthOffset := len(flat.Main) - len(selectPlan)
loop1:
for _, op := range selectPlan {
switch op.Origin.(type) {
case *FKCheck, *FKCascade:
// The generated plan digest doesn't need to contain the foreign key check/cascade plans, so just break out of the loop.
continue
break loop1
}
taskTypeInfo := plancodec.EncodeTaskTypeForNormalize(op.IsRoot, op.StoreType)
p := op.Origin.(PhysicalPlan)
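
The switch from `continue` to a labeled `break` above matters because, inside a `switch` nested in a `for` loop, a plain `break` only exits the `switch`, and `continue` would keep scanning later operators. A standalone sketch (with hypothetical operator names) of the labeled-break pattern used here:

package main

import "fmt"

func main() {
	ops := []string{"Projection", "TableReader", "FKCheck", "FKCascade"}
loop1:
	for _, op := range ops {
		switch op {
		case "FKCheck", "FKCascade":
			// Foreign key check/cascade plans come after the main select plan,
			// so once one is seen the rest can be skipped entirely.
			break loop1
		}
		fmt.Println("encode:", op)
	}
	// Output stops after "encode: TableReader"; the FK plans never reach the digest.
}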
7 changes: 7 additions & 0 deletions planner/core/flat_plan.go
@@ -398,6 +398,13 @@ func (f *FlatPhysicalPlan) flattenRecursively(p Plan, info *operatorCtx, target
target, childIdx = f.flattenRecursively(plan.TargetPlan, initInfo, target)
childIdxs = append(childIdxs, childIdx)
}
case *FKCascade:
for i, child := range plan.CascadePlans {
childCtx.label = Empty
childCtx.isLastChild = i == len(plan.CascadePlans)-1
target, childIdx = f.flattenRecursively(child, childCtx, target)
childIdxs = append(childIdxs, childIdx)
}
}
if flat != nil {
flat.ChildrenIdx = childIdxs
4 changes: 4 additions & 0 deletions planner/core/foreign_key.go
@@ -51,6 +51,10 @@ type FKCascade struct {
FK *model.FKInfo
FKCols []*model.ColumnInfo
FKIdx *model.IndexInfo
// CascadePlans contains the child cascade plans.
// CascadePlans is filled during execution, so only the `explain analyze` statement result contains the cascade plans;
// the `explain` statement result doesn't contain them.
CascadePlans []Plan
}

// FKCascadeType indicates in which (delete/update) statements.
Expand Down
6 changes: 4 additions & 2 deletions planner/core/point_get_plan.go
@@ -533,8 +533,10 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
return nil
}

ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanColumnID = 0
if !ctx.GetSessionVars().StmtCtx.InHandleForeignKeyTrigger {
ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanColumnID = 0
}
switch x := node.(type) {
case *ast.SelectStmt:
defer func() {