Skip to content

Commit

Permalink
*: make auto-analyze killable by global memory limit (#39978)
Browse files Browse the repository at this point in the history
ref #39971, close #39994
  • Loading branch information
chrysan authored Dec 20, 2022
1 parent 806fcbf commit fdf335e
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 33 deletions.
2 changes: 1 addition & 1 deletion executor/analyzetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_test(
"main_test.go",
],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//domain",
Expand All @@ -30,6 +29,7 @@ go_test(
"//tablecodec",
"//testkit",
"//types",
"//util",
"//util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
114 changes: 114 additions & 0 deletions executor/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package analyzetest
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
Expand Down Expand Up @@ -3060,3 +3062,115 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseCount"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseModifyCount"))
}

func TestGlobalMemoryControlForAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_server_memory_limit = 512MB")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk0.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tk0.MustExec("use test")
tk0.MustExec("create table t(a int)")
tk0.MustExec("insert into t select 1")
for i := 1; i <= 8; i++ {
tk0.MustExec("insert into t select * from t") // 256 Lines
}
sql := "analyze table t with 1.0 samplerate;" // Need about 100MB
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume"))
}()
_, err := tk0.Exec(sql)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
runtime.GC()
}

func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
originalVal1 := tk.MustQuery("select @@global.tidb_mem_oom_action").Rows()[0][0].(string)
tk.MustExec("set global tidb_mem_oom_action = 'cancel'")
//originalVal2 := tk.MustQuery("select @@global.tidb_server_memory_limit").Rows()[0][0].(string)
tk.MustExec("set global tidb_server_memory_limit = 512MB")
originalVal3 := tk.MustQuery("select @@global.tidb_server_memory_limit_sess_min_size").Rows()[0][0].(string)
tk.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")
defer func() {
tk.MustExec(fmt.Sprintf("set global tidb_mem_oom_action = %v", originalVal1))
//tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit = %v", originalVal2))
tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit_sess_min_size = %v", originalVal3))
}()

// clean child trackers
oldChildTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
for _, tracker := range oldChildTrackers {
tracker.Detach()
}
defer func() {
for _, tracker := range oldChildTrackers {
tracker.AttachTo(executor.GlobalAnalyzeMemoryTracker)
}
}()
childTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
require.Len(t, childTrackers, 0)

tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t select 1")
for i := 1; i <= 8; i++ {
tk.MustExec("insert into t select * from t") // 256 Lines
}
_, err0 := tk.Exec("analyze table t with 1.0 samplerate;")
require.NoError(t, err0)
rs0 := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed")
require.Len(t, rs0.Rows(), 0)

h := dom.StatsHandle()
originalVal4 := handle.AutoAnalyzeMinCnt
originalVal5 := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string)
handle.AutoAnalyzeMinCnt = 0
tk.MustExec("set global tidb_auto_analyze_ratio = 0.001")
defer func() {
handle.AutoAnalyzeMinCnt = originalVal4
tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_ratio = %v", originalVal5))
}()

sm := &testkit.MockSessionManager{
Dom: dom,
PS: []*util.ProcessInfo{tk.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tk.MustExec("insert into t values(4),(5),(6)")
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
err := h.Update(dom.InfoSchema())
require.NoError(t, err)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume"))
}()
tk.MustQuery("select 1")
childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
require.Len(t, childTrackers, 0)

h.HandleAutoAnalyze(dom.InfoSchema())
rs := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed")
failReason := rs.Rows()[0][0].(string)
require.True(t, strings.Contains(failReason, "Out Of Memory Quota!"))

childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
require.Len(t, childTrackers, 0)
}
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1965,6 +1965,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow

vars.MemTracker.Detach()
vars.MemTracker.UnbindActions()
vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery)
vars.MemTracker.ResetMaxConsumed()
Expand Down
32 changes: 0 additions & 32 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6199,38 +6199,6 @@ func TestGlobalMemoryControl2(t *testing.T) {
runtime.GC()
}

func TestGlobalMemoryControlForAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_server_memory_limit = 512MB")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk0.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tk0.MustExec("use test")
tk0.MustExec("create table t(a int)")
tk0.MustExec("insert into t select 1")
for i := 1; i <= 8; i++ {
tk0.MustExec("insert into t select * from t") // 256 Lines
}
sql := "analyze table t with 1.0 samplerate;" // Need about 100MB
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume"))
}()
_, err := tk0.Exec(sql)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
runtime.GC()
}

func TestCompileOutOfMemoryQuota(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
5 changes: 5 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,11 @@ func (s *Server) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
conn, ok := s.clients[id]
s.rwlock.RUnlock()
if !ok {
if s.dom != nil {
if pinfo, ok2 := s.dom.SysProcTracker().GetSysProcessList()[id]; ok2 {
return pinfo, true
}
}
return &util.ProcessInfo{}, false
}
return conn.ctx.ShowProcess(), ok
Expand Down
2 changes: 2 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1988,6 +1988,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu
s.sessionVars.StmtCtx.OriginalSQL = prevSQL
s.sessionVars.StmtCtx.StmtType = prevStmtType
s.sessionVars.StmtCtx.Tables = prevTables
s.sessionVars.MemTracker.Detach()
}, nil
}

Expand Down Expand Up @@ -2049,6 +2050,7 @@ func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, f
se.sessionVars.PartitionPruneMode.Store(prePruneMode)
se.sessionVars.OptimizerUseInvisibleIndexes = false
se.sessionVars.InspectionTableCache = nil
se.sessionVars.MemTracker.Detach()
s.sysSessionPool().Put(tmp)
}, nil
}
Expand Down
12 changes: 12 additions & 0 deletions testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/tls"
"sync"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
Expand All @@ -31,6 +32,7 @@ type MockSessionManager struct {
PSMu sync.RWMutex
SerID uint64
TxnInfo []*txninfo.TxnInfo
Dom *domain.Domain
conn map[uint64]session.Session
mu sync.Mutex
}
Expand Down Expand Up @@ -68,6 +70,11 @@ func (msm *MockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
ret[connID] = pi.ShowProcess()
}
msm.mu.Unlock()
if msm.Dom != nil {
for connID, pi := range msm.Dom.SysProcTracker().GetSysProcessList() {
ret[connID] = pi
}
}
return ret
}

Expand All @@ -85,6 +92,11 @@ func (msm *MockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, boo
if sess := msm.conn[id]; sess != nil {
return sess.ShowProcess(), true
}
if msm.Dom != nil {
if pinfo, ok := msm.Dom.SysProcTracker().GetSysProcessList()[id]; ok {
return pinfo, true
}
}
return &util.ProcessInfo{}, false
}

Expand Down
11 changes: 11 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,17 @@ func (t *Tracker) CountAllChildrenMemUse() map[string]int64 {
return trackerMemUseMap
}

// GetChildrenForTest returns children trackers
func (t *Tracker) GetChildrenForTest() []*Tracker {
t.mu.Lock()
defer t.mu.Unlock()
trackers := make([]*Tracker, 0)
for _, list := range t.mu.children {
trackers = append(trackers, list...)
}
return trackers
}

func countChildMem(t *Tracker, familyTreeName string, trackerMemUseMap map[string]int64) {
if len(familyTreeName) > 0 {
familyTreeName += " <- "
Expand Down

0 comments on commit fdf335e

Please sign in to comment.