From cd579ddc35460fffb7f1cc716ddf48bb1b26040d Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 8 Oct 2019 11:12:01 +0800 Subject: [PATCH] *: adjust delta schema count and add metrics (#11625) (#12502) --- domain/schema_validator.go | 69 ++++++++++++++++++++++++--- domain/schema_validator_test.go | 59 +++++++++++++++++++++++ executor/seqtest/seq_executor_test.go | 29 +++++++++++ metrics/domain.go | 15 ++++++ metrics/metrics.go | 1 + session/session.go | 1 + sessionctx/variable/session.go | 3 ++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 6 +++ sessionctx/variable/varsutil.go | 12 +++++ 10 files changed, 189 insertions(+), 7 deletions(-) diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 53a9bf4977857..17c6763d28f1d 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -18,6 +18,8 @@ import ( "sync" "time" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -73,7 +75,7 @@ func NewSchemaValidator(lease time.Duration) SchemaValidator { return &schemaValidator{ isStarted: true, lease: lease, - deltaSchemaInfos: make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad), + deltaSchemaInfos: make([]deltaSchemaInfo, 0, variable.DefTiDBMaxDeltaSchemaCount), } } @@ -86,26 +88,29 @@ func (s *schemaValidator) IsStarted() bool { func (s *schemaValidator) Stop() { logutil.Logger(context.Background()).Info("the schema validator stops") + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorStop).Inc() s.mux.Lock() defer s.mux.Unlock() s.isStarted = false s.latestSchemaVer = 0 - s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad) + s.deltaSchemaInfos = s.deltaSchemaInfos[:0] } func (s *schemaValidator) Restart() { logutil.Logger(context.Background()).Info("the schema validator restarts") + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorRestart).Inc() s.mux.Lock() defer s.mux.Unlock() s.isStarted = true } func (s *schemaValidator) Reset() { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorReset).Inc() s.mux.Lock() defer s.mux.Unlock() s.isStarted = true s.latestSchemaVer = 0 - s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad) + s.deltaSchemaInfos = s.deltaSchemaInfos[:0] } func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, changedTableIDs []int64) { @@ -147,13 +152,17 @@ func hasRelatedTableID(relatedTableIDs, updateTableIDs []int64) bool { // NOTE, this function should be called under lock! func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) bool { if len(s.deltaSchemaInfos) == 0 { + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheEmpty).Inc() logutil.Logger(context.Background()).Info("schema change history is empty", zap.Int64("currVer", currVer)) return true } newerDeltas := s.findNewerDeltas(currVer) if len(newerDeltas) == len(s.deltaSchemaInfos) { - logutil.Logger(context.Background()).Info("the schema version is much older than the latest version", zap.Int64("currVer", currVer), - zap.Int64("latestSchemaVer", s.latestSchemaVer)) + metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheMiss).Inc() + logutil.Logger(context.Background()).Info("the schema version is much older than the latest version", + zap.Int64("currVer", currVer), + zap.Int64("latestSchemaVer", s.latestSchemaVer), + zap.Reflect("deltas", newerDeltas)) return true } for _, item := range newerDeltas { @@ -209,8 +218,54 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [ } func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) { - s.deltaSchemaInfos = append(s.deltaSchemaInfos, deltaSchemaInfo{schemaVersion, relatedTableIDs}) - if len(s.deltaSchemaInfos) > maxNumberOfDiffsToLoad { + maxCnt := int(variable.GetMaxDeltaSchemaCount()) + if maxCnt <= 0 { + logutil.Logger(context.Background()).Info("the schema validator enqueue", zap.Int("delta max count", maxCnt)) + return + } + + delta := deltaSchemaInfo{schemaVersion, relatedTableIDs} + if len(s.deltaSchemaInfos) == 0 { + s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta) + return + } + + lastOffset := len(s.deltaSchemaInfos) - 1 + // The first item we needn't to merge, because we hope to cover more versions. + if lastOffset != 0 && ids(s.deltaSchemaInfos[lastOffset].relatedTableIDs).containIn(delta.relatedTableIDs) { + s.deltaSchemaInfos[lastOffset] = delta + } else { + s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta) + } + + if len(s.deltaSchemaInfos) > maxCnt { + logutil.Logger(context.Background()).Info("the schema validator enqueue, queue is too long", + zap.Int("delta max count", maxCnt), zap.Int64("remove schema version", s.deltaSchemaInfos[0].schemaVersion)) s.deltaSchemaInfos = s.deltaSchemaInfos[1:] } } + +type ids []int64 + +// containIn is checks if a is included in b. +func (a ids) containIn(b []int64) bool { + if len(a) > len(b) { + return false + } + + var isEqual bool + for _, i := range a { + isEqual = false + for _, j := range b { + if i == j { + isEqual = true + break + } + } + if !isEqual { + return false + } + } + + return true +} diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index d5e2193b25b39..46332b1742cb6 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -19,6 +19,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/testleak" ) @@ -141,3 +142,61 @@ func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh } } } + +func (*testSuite) TestEnqueue(c *C) { + lease := 10 * time.Millisecond + originalCnt := variable.GetMaxDeltaSchemaCount() + defer variable.SetMaxDeltaSchemaCount(originalCnt) + + validator := NewSchemaValidator(lease).(*schemaValidator) + c.Assert(validator.IsStarted(), IsTrue) + // maxCnt is 0. + variable.SetMaxDeltaSchemaCount(0) + validator.enqueue(1, []int64{11}) + c.Assert(validator.deltaSchemaInfos, HasLen, 0) + + // maxCnt is 10. + variable.SetMaxDeltaSchemaCount(10) + ds := []deltaSchemaInfo{ + {0, []int64{1}}, + {1, []int64{1}}, + {2, []int64{1}}, + {3, []int64{2, 2}}, + {4, []int64{2}}, + {5, []int64{1, 4}}, + {6, []int64{1, 4}}, + {7, []int64{3, 1, 3}}, + {8, []int64{1, 2, 3}}, + {9, []int64{1, 2, 3}}, + } + for _, d := range ds { + validator.enqueue(d.schemaVersion, d.relatedTableIDs) + } + validator.enqueue(10, []int64{1}) + ret := []deltaSchemaInfo{ + {0, []int64{1}}, + {2, []int64{1}}, + {3, []int64{2, 2}}, + {4, []int64{2}}, + {6, []int64{1, 4}}, + {9, []int64{1, 2, 3}}, + {10, []int64{1}}, + } + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) + // The Items' relatedTableIDs have different order. + validator.enqueue(11, []int64{1, 2, 3, 4}) + validator.enqueue(12, []int64{4, 1, 2, 3, 1}) + validator.enqueue(13, []int64{4, 1, 3, 2, 5}) + ret[len(ret)-1] = deltaSchemaInfo{13, []int64{4, 1, 3, 2, 5}} + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret) + // The length of deltaSchemaInfos is greater then maxCnt. + validator.enqueue(14, []int64{1}) + validator.enqueue(15, []int64{2}) + validator.enqueue(16, []int64{3}) + validator.enqueue(17, []int64{4}) + ret = append(ret, deltaSchemaInfo{14, []int64{1}}) + ret = append(ret, deltaSchemaInfo{15, []int64{2}}) + ret = append(ret, deltaSchemaInfo{16, []int64{3}}) + ret = append(ret, deltaSchemaInfo{17, []int64{4}}) + c.Assert(validator.deltaSchemaInfos, DeepEquals, ret[1:]) +} diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 2093cd7a5a6f3..631c5ecea3f19 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -36,6 +36,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" @@ -1050,3 +1051,31 @@ func (s *seqTestSuite) TestAutoIDInRetry(c *C) { tk.MustExec("insert into t values ()") tk.MustQuery(`select * from t`).Check(testkit.Rows("1", "2", "3", "4", "5")) } + +func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(variable.DefTiDBMaxDeltaSchemaCount)) + gvc := domain.GetDomain(tk.Se).GetGlobalVarsCache() + gvc.Disable() + + tk.MustExec("set @@global.tidb_max_delta_schema_count= -1") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '-1'")) + // Make sure a new session will load global variables. + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(100)) + tk.MustExec(fmt.Sprintf("set @@global.tidb_max_delta_schema_count= %v", uint64(math.MaxInt64))) + tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '%d'", uint64(math.MaxInt64)))) + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(16384)) + _, err := tk.Exec("set @@global.tidb_max_delta_schema_count= invalid_val") + c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) + + tk.MustExec("set @@global.tidb_max_delta_schema_count= 2048") + tk.Se = nil + tk.MustExec("use test") + c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(2048)) + tk.MustQuery("select @@global.tidb_max_delta_schema_count").Check(testkit.Rows("2048")) +} diff --git a/metrics/domain.go b/metrics/domain.go index 017e007e4fb98..a8ea4e3d4cbbf 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// Metrics for the domain package. var ( // LoadSchemaCounter records the counter of load schema. LoadSchemaCounter = prometheus.NewCounterVec( @@ -45,4 +46,18 @@ var ( Name: "load_privilege_total", Help: "Counter of load privilege", }, []string{LblType}) + + SchemaValidatorStop = "stop" + SchemaValidatorRestart = "restart" + SchemaValidatorReset = "reset" + SchemaValidatorCacheEmpty = "cache_empty" + SchemaValidatorCacheMiss = "cache_miss" + // HandleSchemaValidate records the counter of handling schema validate. + HandleSchemaValidate = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "domain", + Name: "handle_schema_validate", + Help: "Counter of handle schema validate", + }, []string{LblType}) ) diff --git a/metrics/metrics.go b/metrics/metrics.go index 5a43c4168f03c..a09390ab9e33c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -150,5 +150,6 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVBatchClientUnavailable) prometheus.MustRegister(TiKVRangeTaskStats) prometheus.MustRegister(TiKVRangeTaskPushDuration) + prometheus.MustRegister(HandleSchemaValidate) prometheus.MustRegister(GRPCConnTransientFailureCounter) } diff --git a/session/session.go b/session/session.go index 68fd962bb05b8..e2d69e0f0e51c 100644 --- a/session/session.go +++ b/session/session.go @@ -1702,6 +1702,7 @@ var builtinGlobalVariable = []string{ variable.TiDBExpensiveQueryTimeThreshold, variable.TiDBTxnMode, variable.TiDBEnableStmtSummary, + variable.TiDBMaxDeltaSchemaCount, } var ( diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index a81aca3836932..42b40d435a4a8 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -833,6 +833,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.LowResolutionTSO = TiDBOptOn(val) case TiDBAllowRemoveAutoInc: s.AllowRemoveAutoInc = TiDBOptOn(val) + // It's a global variable, but it also wants to be cached in server. + case TiDBMaxDeltaSchemaCount: + SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount)) } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index dbecc46d744fa..fc883f6e14223 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -699,6 +699,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)}, {ScopeGlobal, TiDBDDLErrorCountLimit, strconv.Itoa(DefTiDBDDLErrorCountLimit)}, {ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"}, + {ScopeGlobal, TiDBMaxDeltaSchemaCount, strconv.Itoa(DefTiDBMaxDeltaSchemaCount)}, {ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]}, {ScopeSession, TiDBEnableRadixJoin, BoolToIntStr(DefTiDBUseRadixJoin)}, {ScopeGlobal | ScopeSession, TiDBOptJoinReorderThreshold, strconv.Itoa(DefTiDBOptJoinReorderThreshold)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index eaa34a3e06f05..895adaea446cb 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -248,6 +248,10 @@ const ( // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" + // tidb_max_delta_schema_count defines the max length of deltaSchemaInfos. + // deltaSchemaInfos is a queue that maintains the history of schema changes. + TiDBMaxDeltaSchemaCount = "tidb_max_delta_schema_count" + // tidb_scatter_region will scatter the regions for DDLs when it is ON. TiDBScatterRegion = "tidb_scatter_region" @@ -337,6 +341,7 @@ const ( DefTiDBDDLReorgWorkerCount = 4 DefTiDBDDLReorgBatchSize = 256 DefTiDBDDLErrorCountLimit = 512 + DefTiDBMaxDeltaSchemaCount = 1024 DefTiDBHashAggPartialConcurrency = 4 DefTiDBHashAggFinalConcurrency = 4 DefTiDBForcePriority = mysql.NoPriority @@ -360,6 +365,7 @@ var ( maxDDLReorgWorkerCount int32 = 128 ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit + maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // Export for testing. MaxDDLReorgBatchSize int32 = 10240 MinDDLReorgBatchSize int32 = 32 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index a4fdf5c439a17..bd9ac487b1337 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -74,6 +74,16 @@ func GetDDLErrorCountLimit() int64 { return atomic.LoadInt64(&ddlErrorCountlimit) } +// SetMaxDeltaSchemaCount sets maxDeltaSchemaCount size. +func SetMaxDeltaSchemaCount(cnt int64) { + atomic.StoreInt64(&maxDeltaSchemaCount, cnt) +} + +// GetMaxDeltaSchemaCount gets maxDeltaSchemaCount size. +func GetMaxDeltaSchemaCount() int64 { + return atomic.LoadInt64(&maxDeltaSchemaCount) +} + // GetSessionSystemVar gets a system variable. // If it is a session only variable, use the default value defined in code. // Returns error if there is no such variable. @@ -321,6 +331,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return checkUInt64SystemVar(name, value, 0, 4294967295, vars) case OldPasswords: return checkUInt64SystemVar(name, value, 0, 2, vars) + case TiDBMaxDeltaSchemaCount: + return checkInt64SystemVar(name, value, 100, 16384, vars) case SessionTrackGtids: if strings.EqualFold(value, "OFF") || value == "0" { return "OFF", nil