diff --git a/executor/analyzetest/BUILD.bazel b/executor/analyzetest/BUILD.bazel index 53126213363a5..3112abe57c00f 100644 --- a/executor/analyzetest/BUILD.bazel +++ b/executor/analyzetest/BUILD.bazel @@ -8,7 +8,6 @@ go_test( "main_test.go", ], flaky = True, - race = "on", shard_count = 50, deps = [ "//domain", @@ -30,6 +29,7 @@ go_test( "//tablecodec", "//testkit", "//types", + "//util", "//util/codec", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/executor/analyzetest/analyze_test.go b/executor/analyzetest/analyze_test.go index e3bf9d51d9260..55f3ad9397be9 100644 --- a/executor/analyzetest/analyze_test.go +++ b/executor/analyzetest/analyze_test.go @@ -17,6 +17,7 @@ package analyzetest import ( "context" "fmt" + "runtime" "strconv" "strings" "testing" @@ -43,6 +44,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" @@ -3060,3 +3062,115 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseCount")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseModifyCount")) } + +func TestGlobalMemoryControlForAnalyze(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk0 := testkit.NewTestKit(t, store) + tk0.MustExec("set global tidb_mem_oom_action = 'cancel'") + tk0.MustExec("set global tidb_server_memory_limit = 512MB") + tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") + + sm := &testkit.MockSessionManager{ + PS: []*util.ProcessInfo{tk0.Session().ShowProcess()}, + } + dom.ServerMemoryLimitHandle().SetSessionManager(sm) + go dom.ServerMemoryLimitHandle().Run() + + tk0.MustExec("use test") + tk0.MustExec("create table t(a int)") + tk0.MustExec("insert into t select 1") + for i := 1; i <= 8; i++ { + tk0.MustExec("insert into t select * from t") // 256 Lines + } + sql := "analyze table t with 1.0 samplerate;" // Need about 100MB + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) + }() + _, err := tk0.Exec(sql) + require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) + runtime.GC() +} + +func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + originalVal1 := tk.MustQuery("select @@global.tidb_mem_oom_action").Rows()[0][0].(string) + tk.MustExec("set global tidb_mem_oom_action = 'cancel'") + //originalVal2 := tk.MustQuery("select @@global.tidb_server_memory_limit").Rows()[0][0].(string) + tk.MustExec("set global tidb_server_memory_limit = 512MB") + originalVal3 := tk.MustQuery("select @@global.tidb_server_memory_limit_sess_min_size").Rows()[0][0].(string) + tk.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") + defer func() { + tk.MustExec(fmt.Sprintf("set global tidb_mem_oom_action = %v", originalVal1)) + //tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit = %v", originalVal2)) + tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit_sess_min_size = %v", originalVal3)) + }() + + // clean child trackers + oldChildTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + for _, tracker := range oldChildTrackers { + tracker.Detach() + } + defer func() { + for _, tracker := range oldChildTrackers { + tracker.AttachTo(executor.GlobalAnalyzeMemoryTracker) + } + }() + childTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) + + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t select 1") + for i := 1; i <= 8; i++ { + tk.MustExec("insert into t select * from t") // 256 Lines + } + _, err0 := tk.Exec("analyze table t with 1.0 samplerate;") + require.NoError(t, err0) + rs0 := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") + require.Len(t, rs0.Rows(), 0) + + h := dom.StatsHandle() + originalVal4 := handle.AutoAnalyzeMinCnt + originalVal5 := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string) + handle.AutoAnalyzeMinCnt = 0 + tk.MustExec("set global tidb_auto_analyze_ratio = 0.001") + defer func() { + handle.AutoAnalyzeMinCnt = originalVal4 + tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_ratio = %v", originalVal5)) + }() + + sm := &testkit.MockSessionManager{ + Dom: dom, + PS: []*util.ProcessInfo{tk.Session().ShowProcess()}, + } + dom.ServerMemoryLimitHandle().SetSessionManager(sm) + go dom.ServerMemoryLimitHandle().Run() + + tk.MustExec("insert into t values(4),(5),(6)") + require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) + err := h.Update(dom.InfoSchema()) + require.NoError(t, err) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) + }() + tk.MustQuery("select 1") + childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) + + h.HandleAutoAnalyze(dom.InfoSchema()) + rs := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") + failReason := rs.Rows()[0][0].(string) + require.True(t, strings.Contains(failReason, "Out Of Memory Quota!")) + + childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) +} diff --git a/executor/executor.go b/executor/executor.go index 90622ce52e527..2e5b5c4a0280f 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1965,6 +1965,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow + vars.MemTracker.Detach() vars.MemTracker.UnbindActions() vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) vars.MemTracker.ResetMaxConsumed() diff --git a/executor/executor_test.go b/executor/executor_test.go index bd64c39e5a134..122fbdbe7dd2f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6199,38 +6199,6 @@ func TestGlobalMemoryControl2(t *testing.T) { runtime.GC() } -func TestGlobalMemoryControlForAnalyze(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - - tk0 := testkit.NewTestKit(t, store) - tk0.MustExec("set global tidb_mem_oom_action = 'cancel'") - tk0.MustExec("set global tidb_server_memory_limit = 512MB") - tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") - - sm := &testkit.MockSessionManager{ - PS: []*util.ProcessInfo{tk0.Session().ShowProcess()}, - } - dom.ServerMemoryLimitHandle().SetSessionManager(sm) - go dom.ServerMemoryLimitHandle().Run() - - tk0.MustExec("use test") - tk0.MustExec("create table t(a int)") - tk0.MustExec("insert into t select 1") - for i := 1; i <= 8; i++ { - tk0.MustExec("insert into t select * from t") // 256 Lines - } - sql := "analyze table t with 1.0 samplerate;" // Need about 100MB - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) - }() - _, err := tk0.Exec(sql) - require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) - runtime.GC() -} - func TestCompileOutOfMemoryQuota(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/server/server.go b/server/server.go index 09a20c8cb39c2..ba915c64f23cb 100644 --- a/server/server.go +++ b/server/server.go @@ -733,6 +733,11 @@ func (s *Server) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) { conn, ok := s.clients[id] s.rwlock.RUnlock() if !ok { + if s.dom != nil { + if pinfo, ok2 := s.dom.SysProcTracker().GetSysProcessList()[id]; ok2 { + return pinfo, true + } + } return &util.ProcessInfo{}, false } return conn.ctx.ShowProcess(), ok diff --git a/session/session.go b/session/session.go index 2c6aa0567fa66..63bc1c970fe08 100644 --- a/session/session.go +++ b/session/session.go @@ -1988,6 +1988,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu s.sessionVars.StmtCtx.OriginalSQL = prevSQL s.sessionVars.StmtCtx.StmtType = prevStmtType s.sessionVars.StmtCtx.Tables = prevTables + s.sessionVars.MemTracker.Detach() }, nil } @@ -2049,6 +2050,7 @@ func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, f se.sessionVars.PartitionPruneMode.Store(prePruneMode) se.sessionVars.OptimizerUseInvisibleIndexes = false se.sessionVars.InspectionTableCache = nil + se.sessionVars.MemTracker.Detach() s.sysSessionPool().Put(tmp) }, nil } diff --git a/testkit/mocksessionmanager.go b/testkit/mocksessionmanager.go index a9e4d085dc34d..550ff69132d91 100644 --- a/testkit/mocksessionmanager.go +++ b/testkit/mocksessionmanager.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "sync" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" @@ -31,6 +32,7 @@ type MockSessionManager struct { PSMu sync.RWMutex SerID uint64 TxnInfo []*txninfo.TxnInfo + Dom *domain.Domain conn map[uint64]session.Session mu sync.Mutex } @@ -68,6 +70,11 @@ func (msm *MockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { ret[connID] = pi.ShowProcess() } msm.mu.Unlock() + if msm.Dom != nil { + for connID, pi := range msm.Dom.SysProcTracker().GetSysProcessList() { + ret[connID] = pi + } + } return ret } @@ -85,6 +92,11 @@ func (msm *MockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, boo if sess := msm.conn[id]; sess != nil { return sess.ShowProcess(), true } + if msm.Dom != nil { + if pinfo, ok := msm.Dom.SysProcTracker().GetSysProcessList()[id]; ok { + return pinfo, true + } + } return &util.ProcessInfo{}, false } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 9c2adf31ace14..39261a45355a1 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -762,6 +762,17 @@ func (t *Tracker) CountAllChildrenMemUse() map[string]int64 { return trackerMemUseMap } +// GetChildrenForTest returns children trackers +func (t *Tracker) GetChildrenForTest() []*Tracker { + t.mu.Lock() + defer t.mu.Unlock() + trackers := make([]*Tracker, 0) + for _, list := range t.mu.children { + trackers = append(trackers, list...) + } + return trackers +} + func countChildMem(t *Tracker, familyTreeName string, trackerMemUseMap map[string]int64) { if len(familyTreeName) > 0 { familyTreeName += " <- "