diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 6fda7832eb474..20cd07619d6b3 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" ) @@ -42,7 +43,8 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str SetDesc(false). SetKeepOrder(false). SetFromSessionVars(variable.NewSessionVars()). - SetMemTracker(s.sctx, stringutil.StringerStr("testSuite.createSelectNormal")). + SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"), + s.sctx.GetSessionVars().MemQuotaDistSQL)). Build() c.Assert(err, IsNil) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 92532de0b7d1d..6818579c5a2a5 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -14,12 +14,10 @@ package distsql import ( - "fmt" "math" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" @@ -44,10 +42,8 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) { } // SetMemTracker sets a memTracker for this request. -func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label fmt.Stringer) *RequestBuilder { - t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL) - t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker) - builder.Request.MemTracker = t +func (builder *RequestBuilder) SetMemTracker(tracker *memory.Tracker) *RequestBuilder { + builder.Request.MemTracker = tracker return builder } diff --git a/executor/distsql.go b/executor/distsql.go index c4f20312d92a5..f24658531d43d 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -19,6 +19,7 @@ import ( "math" "runtime" "sort" + "strconv" "sync" "sync/atomic" "time" @@ -252,6 +253,8 @@ type IndexReaderExecutor struct { colLens []int plans []plannercore.PhysicalPlan + memTracker *memory.Tracker + selectResultHook // for testing } @@ -301,8 +304,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error { return e.open(ctx, kvRanges) } -var indexReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("IndexReaderDistSQLTracker") - func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { var err error if e.corColInFilter { @@ -317,6 +318,8 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.dagPB.CollectExecutionSummaries = &collExec } + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -324,7 +327,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) SetKeepOrder(e.keepOrder). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, indexReaderDistSQLTrackerLabel). + SetMemTracker(e.memTracker). Build() if err != nil { e.feedback.Invalidate() @@ -455,6 +458,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k e.dagPB.CollectExecutionSummaries = &collExec } + tracker := memory.NewTracker(stringutil.StringerStr("IndexWorker"), e.ctx.GetSessionVars().MemQuotaIndexLookupReader) + tracker.AttachTo(e.memTracker) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -462,7 +467,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k SetKeepOrder(e.keepOrder). SetStreaming(e.indexStreaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, indexLookupDistSQLTrackerLabel). + SetMemTracker(tracker). Build() if err != nil { return err @@ -511,8 +516,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k return nil } -var tableWorkerLabel fmt.Stringer = stringutil.StringerStr("tableWorker") - // startTableWorker launchs some background goroutines which pick tasks from workCh and execute the task. func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) { lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency @@ -526,7 +529,8 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha keepOrder: e.keepOrder, handleIdx: e.handleIdx, isCheckOp: e.isCheckOp, - memTracker: memory.NewTracker(tableWorkerLabel, -1), + memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }), + e.ctx.GetSessionVars().MemQuotaIndexLookupReader), } worker.memTracker.AttachTo(e.memTracker) ctx1, cancel := context.WithCancel(ctx) @@ -571,7 +575,6 @@ func (e *IndexLookUpExecutor) Close() error { e.tblWorkerWg.Wait() e.finished = nil e.workerStarted = false - e.memTracker.Detach() e.memTracker = nil if e.runtimeStats != nil { copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String()) diff --git a/executor/explain_test.go b/executor/explain_test.go index f1596be8e2b0c..a9722e54793ff 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -14,6 +14,8 @@ package executor_test import ( + "strings" + . "github.com/pingcap/check" "github.com/pingcap/parser/auth" plannercore "github.com/pingcap/tidb/planner/core" @@ -60,3 +62,50 @@ func (s *testSuite1) TestExplainPriviliges(c *C) { err = tk1.ExecToErr("explain select * from v") c.Assert(err.Error(), Equals, plannercore.ErrTableaccessDenied.GenWithStackByArgs("SELECT", "explain", "%", "v").Error()) } + +func (s *testSuite1) TestExplainAnalyzeMemory(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (v int, k int, key(k))") + tk.MustExec("insert into t values (1, 1), (1, 1), (1, 1), (1, 1), (1, 1)") + + s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v") + s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v limit 5") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_HJ(t1, t2) */ t1.k from t t1, t t2 where t1.v = t2.v+1") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_SMJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k+1") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_INLJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k and t1.v=1") + s.checkMemoryInfo(c, tk, "explain analyze select sum(k) from t group by v") + s.checkMemoryInfo(c, tk, "explain analyze select sum(v) from t group by k") + s.checkMemoryInfo(c, tk, "explain analyze select * from t") + s.checkMemoryInfo(c, tk, "explain analyze select k from t use index(k)") + s.checkMemoryInfo(c, tk, "explain analyze select * from t use index(k)") +} + +func (s *testSuite1) checkMemoryInfo(c *C, tk *testkit.TestKit, sql string) { + memCol := 5 + ops := []string{"Join", "Reader", "Top", "Sort", "LookUp"} + rows := tk.MustQuery(sql).Rows() + for _, row := range rows { + strs := make([]string, len(row)) + for i, c := range row { + strs[i] = c.(string) + } + if strings.Contains(strs[2], "cop") { + continue + } + + shouldHasMem := false + for _, op := range ops { + if strings.Contains(strs[0], op) { + shouldHasMem = true + break + } + } + + if shouldHasMem { + c.Assert(strs[memCol], Not(Equals), "N/A") + } else { + c.Assert(strs[memCol], Equals, "N/A") + } + } +} diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 0c7acfa1dee28..556bf0d84b12a 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -300,9 +300,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, nil } - if e.task != nil { - e.task.memTracker.Detach() - } e.task = task return task, nil } @@ -655,7 +652,6 @@ func (e *IndexLookUpJoin) Close() error { e.cancelFunc() } e.workerWg.Wait() - e.memTracker.Detach() e.memTracker = nil return e.children[0].Close() } diff --git a/executor/join.go b/executor/join.go index dc5387e965492..83c2619932c1b 100644 --- a/executor/join.go +++ b/executor/join.go @@ -135,7 +135,6 @@ func (e *HashJoinExec) Close() error { e.outerChkResourceCh = nil e.joinChkResourceCh = nil } - e.memTracker.Detach() e.memTracker = nil err := e.baseExecutor.Close() @@ -638,7 +637,6 @@ type NestedLoopApplyExec struct { func (e *NestedLoopApplyExec) Close() error { e.innerRows = nil - e.memTracker.Detach() e.memTracker = nil return e.outerExec.Close() } diff --git a/executor/merge_join.go b/executor/merge_join.go index 31374ef39eb32..4a0521740bc7c 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -201,7 +201,6 @@ func (t *mergeJoinInnerTable) reallocReaderResult() { // Close implements the Executor Close interface. func (e *MergeJoinExec) Close() error { - e.memTracker.Detach() e.childrenResults = nil e.memTracker = nil diff --git a/executor/sort.go b/executor/sort.go index 980a1cb6c1ac5..fefac4cda4c1a 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -56,7 +56,6 @@ type SortExec struct { // Close implements the Executor Close interface. func (e *SortExec) Close() error { - e.memTracker.Detach() e.memTracker = nil return e.children[0].Close() } diff --git a/executor/table_reader.go b/executor/table_reader.go index 6295c0c4e04ad..72d04a9968e8d 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -28,8 +28,8 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" - "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" ) @@ -74,11 +74,16 @@ type TableReaderExecutor struct { corColInAccess bool plans []plannercore.PhysicalPlan + memTracker *memory.Tracker + selectResultHook // for testing } // Open initialzes necessary variables for using this executor. func (e *TableReaderExecutor) Open(ctx context.Context) error { + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + var err error if e.corColInFilter { e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) @@ -158,8 +163,6 @@ func (e *TableReaderExecutor) Close() error { return err } -var tableReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("TableReaderDistSQLTracker") - // buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee // to fetch all results. func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { @@ -170,7 +173,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra SetKeepOrder(e.keepOrder). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, tableReaderDistSQLTrackerLabel). + SetMemTracker(e.memTracker). Build() if err != nil { return nil, err diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index c212258eaf1a3..438e6bc294678 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -80,7 +80,7 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) { rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1") c.Assert(len(rs.Rows()), Equals, 10) for _, row := range rs.Rows() { - c.Assert(len(row), Equals, 5) + c.Assert(len(row), Equals, 6) execInfo := row[4].(string) c.Assert(strings.Contains(execInfo, "time"), Equals, true) c.Assert(strings.Contains(execInfo, "loops"), Equals, true) @@ -977,7 +977,7 @@ func (s *testAnalyzeSuite) TestIssue9805(c *C) { c.Assert(rs.Rows(), HasLen, 10) hasIndexLookUp12 := false for _, row := range rs.Rows() { - c.Assert(row, HasLen, 5) + c.Assert(row, HasLen, 6) if strings.HasSuffix(row[0].(string), "IndexLookUp_12") { hasIndexLookUp12 = true c.Assert(row[4], Equals, "time:0ns, loops:0, rows:0") diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index d52b436324cbb..e737701646d13 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -548,7 +548,7 @@ func (e *Explain) prepareSchema() error { case ast.ExplainFormatROW: retFields := []string{"id", "count", "task", "operator info"} if e.Analyze { - retFields = append(retFields, "execution info") + retFields = append(retFields, "execution info", "memory") } schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) for _, fieldName := range retFields { @@ -628,6 +628,13 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st } else { row = append(row, "time:0ns, loops:0, rows:0") } + + tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String()) + if tracker != nil { + row = append(row, tracker.BytesToString(tracker.MaxConsumed())) + } else { + row = append(row, "N/A") + } } e.Rows = append(e.Rows, row) } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index befbab5ce3e76..3b935360f0fce 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -87,11 +87,6 @@ func (t *Tracker) AttachTo(parent *Tracker) { t.parent.Consume(t.BytesConsumed()) } -// Detach detaches this Tracker from its parent. -func (t *Tracker) Detach() { - t.parent.remove(t) -} - func (t *Tracker) remove(oldChild *Tracker) { t.mu.Lock() defer t.mu.Unlock() @@ -144,17 +139,13 @@ func (t *Tracker) Consume(bytes int64) { rootExceed = tracker } - if tracker.parent == nil { - // since we only need a total memory usage during execution, - // we only record max consumed bytes in root(statement-level) for performance. - for { - maxNow := atomic.LoadInt64(&tracker.maxConsumed) - consumed := atomic.LoadInt64(&tracker.bytesConsumed) - if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) { - continue - } - break + for { + maxNow := atomic.LoadInt64(&tracker.maxConsumed) + consumed := atomic.LoadInt64(&tracker.bytesConsumed) + if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) { + continue } + break } } if rootExceed != nil { @@ -172,6 +163,21 @@ func (t *Tracker) MaxConsumed() int64 { return atomic.LoadInt64(&t.maxConsumed) } +// SearchTracker searches the specific tracker under this tracker. +func (t *Tracker) SearchTracker(label string) *Tracker { + if t.label.String() == label { + return t + } + t.mu.Lock() + defer t.mu.Unlock() + for _, child := range t.mu.children { + if result := child.SearchTracker(label); result != nil { + return result + } + } + return nil +} + // String returns the string representation of this Tracker tree. func (t *Tracker) String() string { buffer := bytes.NewBufferString("\n")