Skip to content

Commit

Permalink
sessionctx/variable, executor: introduce a limit on "thread" config (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo authored Oct 22, 2021
1 parent 6766833 commit 42b0bec
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 56 deletions.
6 changes: 3 additions & 3 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,9 +1239,9 @@ func (s *testSuite6) TestSetDDLReorgWorkerCnt(c *C) {
res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))

tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 129")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_worker_cnt value: '129'"))
tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt").Check(testkit.Rows("128"))
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 257")
tk.MustQuery("SHOW WARNINGS").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_worker_cnt value: '257'"))
tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt").Check(testkit.Rows("256"))
}

func (s *testSuite6) TestSetDDLReorgBatchSize(c *C) {
Expand Down
30 changes: 15 additions & 15 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,11 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime},
{Scope: ScopeSession, Name: TiDBChecksumTableConcurrency, skipInit: true, Value: strconv.Itoa(DefChecksumTableConcurrency)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBExecutorConcurrency, Value: strconv.Itoa(DefExecutorConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBExecutorConcurrency, Value: strconv.Itoa(DefExecutorConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error {
s.ExecutorConcurrency = tidbOptPositiveInt32(val, DefExecutorConcurrency)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBDistSQLScanConcurrency, Value: strconv.Itoa(DefDistSQLScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBDistSQLScanConcurrency, Value: strconv.Itoa(DefDistSQLScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error {
s.distSQLScanConcurrency = tidbOptPositiveInt32(val, DefDistSQLScanConcurrency)
return nil
}},
Expand Down Expand Up @@ -972,21 +972,21 @@ var defaultSysVars = []*SysVar{
s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexLookupConcurrency, Value: strconv.Itoa(DefIndexLookupConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexLookupConcurrency, Value: strconv.Itoa(DefIndexLookupConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
s.indexLookupConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
appendDeprecationWarning(vars, TiDBIndexLookupConcurrency, TiDBExecutorConcurrency)
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexLookupJoinConcurrency, Value: strconv.Itoa(DefIndexLookupJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexLookupJoinConcurrency, Value: strconv.Itoa(DefIndexLookupJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
s.indexLookupJoinConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
appendDeprecationWarning(vars, TiDBIndexLookupJoinConcurrency, TiDBExecutorConcurrency)
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexSerialScanConcurrency, Value: strconv.Itoa(DefIndexSerialScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBIndexSerialScanConcurrency, Value: strconv.Itoa(DefIndexSerialScanConcurrency), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error {
s.indexSerialScanConcurrency = tidbOptPositiveInt32(val, DefIndexSerialScanConcurrency)
return nil
}},
Expand Down Expand Up @@ -1122,50 +1122,50 @@ var defaultSysVars = []*SysVar{
s.EnableListTablePartition = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashJoinConcurrency, Value: strconv.Itoa(DefTiDBHashJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashJoinConcurrency, Value: strconv.Itoa(DefTiDBHashJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
s.hashJoinConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
appendDeprecationWarning(vars, TiDBHashJoinConcurrency, TiDBExecutorConcurrency)
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBProjectionConcurrency, Value: strconv.Itoa(DefTiDBProjectionConcurrency), Type: TypeInt, MinValue: -1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBProjectionConcurrency, Value: strconv.Itoa(DefTiDBProjectionConcurrency), Type: TypeInt, MinValue: -1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error {
s.projectionConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
appendDeprecationWarning(vars, TiDBProjectionConcurrency, TiDBExecutorConcurrency)
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggPartialConcurrency, Value: strconv.Itoa(DefTiDBHashAggPartialConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggPartialConcurrency, Value: strconv.Itoa(DefTiDBHashAggPartialConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
s.hashAggPartialConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
appendDeprecationWarning(vars, TiDBHashAggPartialConcurrency, TiDBExecutorConcurrency)
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggFinalConcurrency, Value: strconv.Itoa(DefTiDBHashAggFinalConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggFinalConcurrency, Value: strconv.Itoa(DefTiDBHashAggFinalConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
s.hashAggFinalConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
appendDeprecationWarning(vars, TiDBHashAggFinalConcurrency, TiDBExecutorConcurrency)
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBWindowConcurrency, Value: strconv.Itoa(DefTiDBWindowConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBWindowConcurrency, Value: strconv.Itoa(DefTiDBWindowConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
s.windowConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
appendDeprecationWarning(vars, TiDBWindowConcurrency, TiDBExecutorConcurrency)
return normalizedValue, nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMergeJoinConcurrency, Value: strconv.Itoa(DefTiDBMergeJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMergeJoinConcurrency, Value: strconv.Itoa(DefTiDBMergeJoinConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
s.mergeJoinConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
appendDeprecationWarning(vars, TiDBMergeJoinConcurrency, TiDBExecutorConcurrency)
return normalizedValue, nil
}},

{Scope: ScopeGlobal | ScopeSession, Name: TiDBStreamAggConcurrency, Value: strconv.Itoa(DefTiDBStreamAggConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt32, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBStreamAggConcurrency, Value: strconv.Itoa(DefTiDBStreamAggConcurrency), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, SetSession: func(s *SessionVars, val string) error {
s.streamAggConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
return nil
}, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
Expand Down Expand Up @@ -1307,7 +1307,7 @@ var defaultSysVars = []*SysVar{
}
return config.HideConfig(string(j)), nil
}},
{Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: uint64(maxDDLReorgWorkerCount), SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal, Name: TiDBDDLReorgWorkerCount, Value: strconv.Itoa(DefTiDBDDLReorgWorkerCount), Type: TypeUnsigned, MinValue: 1, MaxValue: MaxConfigurableConcurrency, SetSession: func(s *SessionVars, val string) error {
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
return nil
}},
Expand Down Expand Up @@ -1707,7 +1707,7 @@ var defaultSysVars = []*SysVar{
}, SetGlobal: func(s *SessionVars, val string) error {
return setTiDBTableValue(s, "tikv_gc_life_time", val, "All versions within life time will not be collected by GC, at least 10m, in Go format.")
}},
{Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true, GetGlobal: func(s *SessionVars) (string, error) {
{Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency, AllowAutoValue: true, GetGlobal: func(s *SessionVars) (string, error) {
autoConcurrencyVal, err := getTiDBTableValue(s, "tikv_gc_auto_concurrency", On)
if err == nil && autoConcurrencyVal == On {
return "-1", nil // convention for "AUTO"
Expand All @@ -1722,7 +1722,7 @@ var defaultSysVars = []*SysVar{
if err := setTiDBTableValue(s, "tikv_gc_auto_concurrency", autoConcurrency, "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used"); err != nil {
return err
}
return setTiDBTableValue(s, "tikv_gc_concurrency", val, "How many goroutines used to do GC parallel, [1, 128], default 2")
return setTiDBTableValue(s, "tikv_gc_concurrency", val, "How many goroutines used to do GC parallel, [1, 256], default 2")
}},
{Scope: ScopeGlobal, Name: TiDBGCScanLockMode, Value: "LEGACY", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}, GetGlobal: func(s *SessionVars) (string, error) {
return getTiDBTableValue(s, "tikv_gc_scan_lock_mode", "LEGACY")
Expand Down
28 changes: 27 additions & 1 deletion sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestDeprecation(t *testing.T) {

vars := NewSessionVars()

_, err := sysVar.Validate(vars, "1234", ScopeSession)
_, err := sysVar.Validate(vars, "123", ScopeSession)
require.NoError(t, err)

// There was no error but there is a deprecation warning.
Expand Down Expand Up @@ -758,3 +758,29 @@ func TestIdentity(t *testing.T) {
require.NoError(t, err)
require.Equal(t, val, "21")
}

func TestDDLWorkers(t *testing.T) {
svWorkerCount, svBatchSize := GetSysVar(TiDBDDLReorgWorkerCount), GetSysVar(TiDBDDLReorgBatchSize)
vars := NewSessionVars()
vars.GlobalVarsAccessor = NewMockGlobalAccessor4Tests()

val, err := svWorkerCount.Validate(vars, "-100", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, val, "1") // converts it to min value
val, err = svWorkerCount.Validate(vars, "1234", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, val, "256") // converts it to max value
val, err = svWorkerCount.Validate(vars, "100", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, val, "100") // unchanged

val, err = svBatchSize.Validate(vars, "10", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, val, fmt.Sprint(MinDDLReorgBatchSize)) // converts it to min value
val, err = svBatchSize.Validate(vars, "999999", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, val, fmt.Sprint(MaxDDLReorgBatchSize)) // converts it to max value
val, err = svBatchSize.Validate(vars, "100", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, val, "100") // unchanged
}
24 changes: 16 additions & 8 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,15 @@ const (
TiDBEnableEnhancedSecurity = "tidb_enable_enhanced_security"
)

// TiDB intentional limits
// Can be raised in future.

const (
// MaxConfigurableConcurrency is the maximum number of "threads" (goroutines) that can be specified
// for any type of configuration item that has concurrent workers.
MaxConfigurableConcurrency = 256
)

// Default TiDB system variable values.
const (
DefHostname = "localhost"
Expand Down Expand Up @@ -754,14 +763,13 @@ const (

// Process global variables.
var (
ProcessGeneralLog = atomic.NewBool(false)
EnablePProfSQLCPU = atomic.NewBool(false)
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit
ddlReorgRowFormat int64 = DefTiDBRowFormatV2
maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount
ProcessGeneralLog = atomic.NewBool(false)
EnablePProfSQLCPU = atomic.NewBool(false)
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit
ddlReorgRowFormat int64 = DefTiDBRowFormatV2
maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount
// Export for testing.
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
Expand Down
13 changes: 2 additions & 11 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ import (
const secondsPerYear = 60 * 60 * 24 * 365

// SetDDLReorgWorkerCounter sets ddlReorgWorkerCounter count.
// Max worker count is maxDDLReorgWorkerCount.
// Sysvar validation enforces the range to already be correct.
func SetDDLReorgWorkerCounter(cnt int32) {
if cnt > maxDDLReorgWorkerCount {
cnt = maxDDLReorgWorkerCount
}
atomic.StoreInt32(&ddlReorgWorkerCounter, cnt)
}

Expand All @@ -49,14 +46,8 @@ func GetDDLReorgWorkerCounter() int32 {
}

// SetDDLReorgBatchSize sets ddlReorgBatchSize size.
// Max batch size is MaxDDLReorgBatchSize.
// Sysvar validation enforces the range to already be correct.
func SetDDLReorgBatchSize(cnt int32) {
if cnt > MaxDDLReorgBatchSize {
cnt = MaxDDLReorgBatchSize
}
if cnt < MinDDLReorgBatchSize {
cnt = MinDDLReorgBatchSize
}
atomic.StoreInt32(&ddlReorgBatchSize, cnt)
}

Expand Down
18 changes: 0 additions & 18 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,24 +496,6 @@ func TestVarsutil(t *testing.T) {
require.Regexp(t, ".*Truncated incorrect tidb_analyze_version value", warn.Err.Error())
}

func TestSetOverflowBehave(t *testing.T) {
ddRegWorker := maxDDLReorgWorkerCount + 1
SetDDLReorgWorkerCounter(ddRegWorker)
require.Equal(t, GetDDLReorgWorkerCounter(), maxDDLReorgWorkerCount)

ddlReorgBatchSize := MaxDDLReorgBatchSize + 1
SetDDLReorgBatchSize(ddlReorgBatchSize)
require.Equal(t, GetDDLReorgBatchSize(), MaxDDLReorgBatchSize)
ddlReorgBatchSize = MinDDLReorgBatchSize - 1
SetDDLReorgBatchSize(ddlReorgBatchSize)
require.Equal(t, GetDDLReorgBatchSize(), MinDDLReorgBatchSize)

val := tidbOptInt64("a", 1)
require.Equal(t, int64(1), val)
val2 := tidbOptFloat64("b", 1.2)
require.Equal(t, 1.2, val2)
}

func TestValidate(t *testing.T) {
v := NewSessionVars()
v.GlobalVarsAccessor = NewMockGlobalAccessor4Tests()
Expand Down

0 comments on commit 42b0bec

Please sign in to comment.