diff --git a/executor/ddl_test.go b/executor/ddl_test.go index f5f1a6c3463cf..1e8c4701a814a 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -1239,9 +1239,9 @@ func (s *testSuite6) TestSetDDLReorgWorkerCnt(c *C) { res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") res.Check(testkit.Rows("100")) - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 129") - tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_worker_cnt value: '129'")) - tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt").Check(testkit.Rows("128")) + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 257") + tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_worker_cnt value: '257'")) + tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt").Check(testkit.Rows("256")) } func (s *testSuite6) TestSetDDLReorgBatchSize(c *C) { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a7deae5230c68..2f45e6dfcc852 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -891,11 +891,11 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime}, {Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime}, {Scope: ScopeSession, Name: TiDBChecksumTableConcurrency, skipInit: true, Value: strconv.Itoa(DefChecksumTableConcurrency)}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBExecutorConcurrency, Value: strconv.Itoa(DefExecutorConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBExecutorConcurrency, Value: strconv.Itoa(DefExecutorConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error { s.ExecutorConcurrency = tidbOptPositiveInt32(val, DefExecutorConcurrency) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBDistSQLScanConcurrency, Value: strconv.Itoa(DefDistSQLScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBDistSQLScanConcurrency, Value: strconv.Itoa(DefDistSQLScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error { s.distSQLScanConcurrency = tidbOptPositiveInt32(val, DefDistSQLScanConcurrency) return nil }}, @@ -972,21 +972,21 @@ var defaultSysVars = []*SysVar{ s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexLookupConcurrency, Value: strconv.Itoa(DefIndexLookupConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexLookupConcurrency, Value: strconv.Itoa(DefIndexLookupConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { s.indexLookupConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { appendDeprecationWarning(vars, TiDBIndexLookupConcurrency, TiDBExecutorConcurrency) return normalizedValue, nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexLookupJoinConcurrency, Value: strconv.Itoa(DefIndexLookupJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexLookupJoinConcurrency, Value: strconv.Itoa(DefIndexLookupJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { s.indexLookupJoinConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { appendDeprecationWarning(vars, TiDBIndexLookupJoinConcurrency, TiDBExecutorConcurrency) return normalizedValue, nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexSerialScanConcurrency, Value: strconv.Itoa(DefIndexSerialScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexSerialScanConcurrency, Value: strconv.Itoa(DefIndexSerialScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error { s.indexSerialScanConcurrency = tidbOptPositiveInt32(val, DefIndexSerialScanConcurrency) return nil }}, @@ -1122,42 +1122,42 @@ var defaultSysVars = []*SysVar{ s.EnableListTablePartition = TiDBOptOn(val) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashJoinConcurrency, Value: strconv.Itoa(DefTiDBHashJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashJoinConcurrency, Value: strconv.Itoa(DefTiDBHashJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { s.hashJoinConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { appendDeprecationWarning(vars, TiDBHashJoinConcurrency, TiDBExecutorConcurrency) return normalizedValue, nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBProjectionConcurrency, Value: strconv.Itoa(DefTiDBProjectionConcurrency), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBProjectionConcurrency, Value: strconv.Itoa(DefTiDBProjectionConcurrency), Type: TypeInt, MinValue: -1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error { s.projectionConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { appendDeprecationWarning(vars, TiDBProjectionConcurrency, TiDBExecutorConcurrency) return normalizedValue, nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggPartialConcurrency, Value: strconv.Itoa(DefTiDBHashAggPartialConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggPartialConcurrency, Value: strconv.Itoa(DefTiDBHashAggPartialConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { s.hashAggPartialConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { appendDeprecationWarning(vars, TiDBHashAggPartialConcurrency, TiDBExecutorConcurrency) return normalizedValue, nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggFinalConcurrency, Value: strconv.Itoa(DefTiDBHashAggFinalConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggFinalConcurrency, Value: strconv.Itoa(DefTiDBHashAggFinalConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { s.hashAggFinalConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { appendDeprecationWarning(vars, TiDBHashAggFinalConcurrency, TiDBExecutorConcurrency) return normalizedValue, nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBWindowConcurrency, Value: strconv.Itoa(DefTiDBWindowConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBWindowConcurrency, Value: strconv.Itoa(DefTiDBWindowConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { s.windowConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { appendDeprecationWarning(vars, TiDBWindowConcurrency, TiDBExecutorConcurrency) return normalizedValue, nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBMergeJoinConcurrency, Value: strconv.Itoa(DefTiDBMergeJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBMergeJoinConcurrency, Value: strconv.Itoa(DefTiDBMergeJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { s.mergeJoinConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { @@ -1165,7 +1165,7 @@ var defaultSysVars = []*SysVar{ return normalizedValue, nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBStreamAggConcurrency, Value: strconv.Itoa(DefTiDBStreamAggConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBStreamAggConcurrency, Value: strconv.Itoa(DefTiDBStreamAggConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error { s.streamAggConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) return nil }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) { @@ -1307,7 +1307,7 @@ var defaultSysVars = []*SysVar{ } return config.HideConfig(string(j)), nil }}, - {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: uint64(maxDDLReorgWorkerCount), SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error { SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) return nil }}, @@ -1707,7 +1707,7 @@ var defaultSysVars = []*SysVar{ }, SetGlobal: func(s *SessionVars, val string) error { return setTiDBTableValue(s, "tikv_gc_life_time", val, "All versions within life time will not be collected by GC, at least 10m, in Go format.") }}, - {Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true, GetGlobal: func(s *SessionVars) (string, error) { + {Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, GetGlobal: func(s *SessionVars) (string, error) { autoConcurrencyVal, err := getTiDBTableValue(s, "tikv_gc_auto_concurrency", On) if err == nil && autoConcurrencyVal == On { return "-1", nil // convention for "AUTO" @@ -1722,7 +1722,7 @@ var defaultSysVars = []*SysVar{ if err := setTiDBTableValue(s, "tikv_gc_auto_concurrency", autoConcurrency, "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used"); err != nil { return err } - return setTiDBTableValue(s, "tikv_gc_concurrency", val, "How many goroutines used to do GC parallel, [1, 128], default 2") + return setTiDBTableValue(s, "tikv_gc_concurrency", val, "How many goroutines used to do GC parallel, [1, 256], default 2") }}, {Scope: ScopeGlobal, Name: TiDBGCScanLockMode, Value: "LEGACY", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}, GetGlobal: func(s *SessionVars) (string, error) { return getTiDBTableValue(s, "tikv_gc_scan_lock_mode", "LEGACY") diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 4b2883e3d5a6d..b0c4e94e84490 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -248,7 +248,7 @@ func TestDeprecation(t *testing.T) { vars := NewSessionVars() - _, err := sysVar.Validate(vars, "1234", ScopeSession) + _, err := sysVar.Validate(vars, "123", ScopeSession) require.NoError(t, err) // There was no error but there is a deprecation warning. @@ -758,3 +758,29 @@ func TestIdentity(t *testing.T) { require.NoError(t, err) require.Equal(t, val, "21") } + +func TestDDLWorkers(t *testing.T) { + svWorkerCount, svBatchSize := GetSysVar(TiDBDDLReorgWorkerCount), GetSysVar(TiDBDDLReorgBatchSize) + vars := NewSessionVars() + vars.GlobalVarsAccessor = NewMockGlobalAccessor4Tests() + + val, err := svWorkerCount.Validate(vars, "-100", ScopeGlobal) + require.NoError(t, err) + require.Equal(t, val, "1") // converts it to min value + val, err = svWorkerCount.Validate(vars, "1234", ScopeGlobal) + require.NoError(t, err) + require.Equal(t, val, "256") // converts it to max value + val, err = svWorkerCount.Validate(vars, "100", ScopeGlobal) + require.NoError(t, err) + require.Equal(t, val, "100") // unchanged + + val, err = svBatchSize.Validate(vars, "10", ScopeGlobal) + require.NoError(t, err) + require.Equal(t, val, fmt.Sprint(MinDDLReorgBatchSize)) // converts it to min value + val, err = svBatchSize.Validate(vars, "999999", ScopeGlobal) + require.NoError(t, err) + require.Equal(t, val, fmt.Sprint(MaxDDLReorgBatchSize)) // converts it to max value + val, err = svBatchSize.Validate(vars, "100", ScopeGlobal) + require.NoError(t, err) + require.Equal(t, val, "100") // unchanged +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index fb26e994c5825..d3ab0b104bc36 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -608,6 +608,15 @@ const ( TiDBEnableEnhancedSecurity = "tidb_enable_enhanced_security" ) +// TiDB intentional limits +// Can be raised in future. + +const ( + // MaxConfigurableConcurrency is the maximum number of "threads" (goroutines) that can be specified + // for any type of configuration item that has concurrent workers. + MaxConfigurableConcurrency = 256 +) + // Default TiDB system variable values. const ( DefHostname = "localhost" @@ -754,14 +763,13 @@ const ( // Process global variables. var ( - ProcessGeneralLog = atomic.NewBool(false) - EnablePProfSQLCPU = atomic.NewBool(false) - ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount - maxDDLReorgWorkerCount int32 = 128 - ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize - ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit - ddlReorgRowFormat int64 = DefTiDBRowFormatV2 - maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount + ProcessGeneralLog = atomic.NewBool(false) + EnablePProfSQLCPU = atomic.NewBool(false) + ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount + ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize + ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit + ddlReorgRowFormat int64 = DefTiDBRowFormatV2 + maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // Export for testing. MaxDDLReorgBatchSize int32 = 10240 MinDDLReorgBatchSize int32 = 32 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 3df177b0040c4..c308a6a9320c5 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -35,11 +35,8 @@ import ( const secondsPerYear = 60 * 60 * 24 * 365 // SetDDLReorgWorkerCounter sets ddlReorgWorkerCounter count. -// Max worker count is maxDDLReorgWorkerCount. +// Sysvar validation enforces the range to already be correct. func SetDDLReorgWorkerCounter(cnt int32) { - if cnt > maxDDLReorgWorkerCount { - cnt = maxDDLReorgWorkerCount - } atomic.StoreInt32(&ddlReorgWorkerCounter, cnt) } @@ -49,14 +46,8 @@ func GetDDLReorgWorkerCounter() int32 { } // SetDDLReorgBatchSize sets ddlReorgBatchSize size. -// Max batch size is MaxDDLReorgBatchSize. +// Sysvar validation enforces the range to already be correct. func SetDDLReorgBatchSize(cnt int32) { - if cnt > MaxDDLReorgBatchSize { - cnt = MaxDDLReorgBatchSize - } - if cnt < MinDDLReorgBatchSize { - cnt = MinDDLReorgBatchSize - } atomic.StoreInt32(&ddlReorgBatchSize, cnt) } diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index afe24ac596b10..35bf42cba4a76 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -496,24 +496,6 @@ func TestVarsutil(t *testing.T) { require.Regexp(t, ".*Truncated incorrect tidb_analyze_version value", warn.Err.Error()) } -func TestSetOverflowBehave(t *testing.T) { - ddRegWorker := maxDDLReorgWorkerCount + 1 - SetDDLReorgWorkerCounter(ddRegWorker) - require.Equal(t, GetDDLReorgWorkerCounter(), maxDDLReorgWorkerCount) - - ddlReorgBatchSize := MaxDDLReorgBatchSize + 1 - SetDDLReorgBatchSize(ddlReorgBatchSize) - require.Equal(t, GetDDLReorgBatchSize(), MaxDDLReorgBatchSize) - ddlReorgBatchSize = MinDDLReorgBatchSize - 1 - SetDDLReorgBatchSize(ddlReorgBatchSize) - require.Equal(t, GetDDLReorgBatchSize(), MinDDLReorgBatchSize) - - val := tidbOptInt64("a", 1) - require.Equal(t, int64(1), val) - val2 := tidbOptFloat64("b", 1.2) - require.Equal(t, 1.2, val2) -} - func TestValidate(t *testing.T) { v := NewSessionVars() v.GlobalVarsAccessor = NewMockGlobalAccessor4Tests()