Skip to content

Commit

Permalink
*: add session var 'tidb_ddl_reorg_worker_cnt' to control ddl reorg w…
Browse files Browse the repository at this point in the history
…orkers count (#6441)

* *: add session var 'tidb_ddl_reorg_worker_cnt' to control ddl reorg workers count
  • Loading branch information
winkyao authored and ciscoxll committed Jun 27, 2018
1 parent 9952fe0 commit 6851878
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 5 deletions.
8 changes: 4 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -443,7 +444,6 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {

const (
defaultTaskHandleCnt = 128
defaultIndexWorkers = 16
)

// indexRecord is the record information of an index.
Expand Down Expand Up @@ -873,10 +873,10 @@ func (w *worker) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgI
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
colFieldMap := makeupIndexColFieldMap(t, indexInfo)

// TODO: make workerCnt can be modified by system variable.
workerCnt := defaultIndexWorkers
// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, workerCnt)
for i := 0; i < workerCnt; i++ {
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, colFieldMap)
go idxWorkers[i].run(reorgInfo.d)
Expand Down
29 changes: 29 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -399,3 +400,31 @@ func (s *testSuite) TestMaxHandleAddIndex(c *C) {
tk.MustExec("alter table t1 add index idx_b(b)")
tk.MustExec("admin check table t1")
}

func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(variable.DefTiDBDDLReorgWorkerCount))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = 1")
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(1))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100")
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(100))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = invalid_val")
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(variable.DefTiDBDDLReorgWorkerCount))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100")
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(100))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = -1")
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(variable.DefTiDBDDLReorgWorkerCount))

res := tk.MustQuery("select @@tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("-1"))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100")
res = tk.MustQuery("select @@tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))

res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgWorkerCount)))
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100")
res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
variable.TiDBHashAggPartialConcurrency + quoteCommaQuote +
variable.TiDBHashAggFinalConcurrency + quoteCommaQuote +
variable.TiDBBackoffLockFast + quoteCommaQuote +
variable.TiDBDDLReorgWorkerCount + quoteCommaQuote +
variable.TiDBOptInSubqUnFolding + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.EnableStreaming = TiDBOptOn(val)
case TiDBOptimizerSelectivityLevel:
s.OptimizerSelectivityLevel = tidbOptPositiveInt32(val, DefTiDBOptimizerSelectivityLevel)
case TiDBDDLReorgWorkerCount:
workerCnt := tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)
SetDDLReorgWorkerCounter(int32(workerCnt))
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ var defaultSysVars = []*SysVar{
/* The following variable is defined as session scope but is actually server scope. */
{ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)},
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
8 changes: 7 additions & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ const (

// tidb_backoff_lock_fast is used for tikv backoff base time in milliseconds.
TiDBBackoffLockFast = "tidb_backoff_lock_fast"

// tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers.
TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt"
)

// Default TiDB system variable values.
Expand Down Expand Up @@ -205,11 +208,14 @@ const (
DefTiDBHashJoinConcurrency = 5
DefTiDBProjectionConcurrency = 4
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBDDLReorgWorkerCount = 16
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
)

// Process global variables.
var (
ProcessGeneralLog uint32
ProcessGeneralLog uint32
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
)
14 changes: 14 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ import (
"github.com/pingcap/tidb/types"
)

// SetDDLReorgWorkerCounter sets ddlReorgWorkerCounter count.
// Max worker count is maxDDLReorgWorkerCount.
func SetDDLReorgWorkerCounter(cnt int32) {
if cnt > maxDDLReorgWorkerCount {
cnt = maxDDLReorgWorkerCount
}
atomic.StoreInt32(&ddlReorgWorkerCounter, cnt)
}

// GetDDLReorgWorkerCounter gets ddlReorgWorkerCounter.
func GetDDLReorgWorkerCounter() int32 {
return atomic.LoadInt32(&ddlReorgWorkerCounter)
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
Expand Down
10 changes: 10 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
SetSessionSystemVar(v, TiDBOptimizerSelectivityLevel, types.NewIntDatum(1))
c.Assert(v.OptimizerSelectivityLevel, Equals, 1)

c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(DefTiDBDDLReorgWorkerCount))
SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(1))
c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(1))

SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(-1))
c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(DefTiDBDDLReorgWorkerCount))

SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(int64(maxDDLReorgWorkerCount)+1))
c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(maxDDLReorgWorkerCount))

err = SetSessionSystemVar(v, TiDBRetryLimit, types.NewStringDatum("3"))
c.Assert(err, IsNil)
val, err = GetSessionSystemVar(v, TiDBRetryLimit)
Expand Down

0 comments on commit 6851878

Please sign in to comment.