Skip to content

Commit

Permalink
ddl: limit the number of ddl job retries
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscoxll committed Aug 30, 2018
1 parent 67d7544 commit 71acda4
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 4 deletions.
12 changes: 10 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -522,13 +523,20 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
if err != nil {
// If job is not cancelled, we should log this error.
if job.State != model.JobStateCancelled {
log.Errorf("[ddl-%s] run DDL job err %v", w, errors.ErrorStack(err))
log.Errorf("[ddl-%s] run DDL job err %v, job query %s ", w, errors.ErrorStack(err), job.Query)
} else {
log.Infof("[ddl-%s] the DDL job is normal to cancel because %v", w, errors.ErrorStack(err))
log.Infof("[ddl-%s] the DDL job is normal to cancel because %v, job query %s", w, errors.ErrorStack(err), job.Query)
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
}

job.Error = toTError(err)
job.ErrorCount++
if job.ErrorCount > int64(variable.GetDDLErrorRetryLimit()) && job.Type != model.ActionAddIndex {
log.Infof("[ddl-%s] DDL job over maximum retry count is canceled because %v, job query %s", w, errors.ErrorStack(err), job.Query)
job.State = model.JobStateCancelled
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
return
}
}
return
}
Expand Down
50 changes: 50 additions & 0 deletions ddl/fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (
gofail "github.com/etcd-io/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -63,3 +66,50 @@ func (s *testColumnChangeSuite) TestFailBeforeDecodeArgs(c *C) {
c.Assert(stateCnt, Equals, 1)
testCheckJobDone(c, d, job, true)
}

// TestErrorCountlimit verifies that a DDL job which keeps failing is cancelled
// once its error count exceeds the tidb_ddl_error_retry_limit setting, and that
// the cancelled state is recorded in the DDL job history.
func (s *testColumnChangeSuite) TestErrorCountlimit(c *C) {
	const failpoint = "github.com/pingcap/tidb/ddl/errorBeforeDecodeArgs"

	d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
	defer d.Stop()

	// The retry limit is process-wide state: put the previous value back so
	// the lowered limit does not leak into other tests of this package.
	originLimit := variable.GetDDLErrorRetryLimit()
	defer variable.SetDDLErrorRetryLimit(originLimit)
	// Make sure the failpoint never outlives this test, even when an
	// assertion aborts it early. Disabling an already disabled failpoint is harmless here.
	defer gofail.Disable(failpoint)

	tblInfo := testTableInfo(c, d, "t_error_count_limit", 2)
	ctx := testNewContext(d)
	err := ctx.NewTxn()
	c.Assert(err, IsNil)
	err = variable.SetSessionSystemVar(ctx.GetSessionVars(), variable.TiDBDDLErrorRetryLimit, types.NewIntDatum(5))
	c.Assert(err, IsNil)
	c.Assert(variable.GetDDLErrorRetryLimit(), Equals, int32(5))

	testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
	originTable := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
	row := types.MakeDatums(1, 2)
	_, err = originTable.AddRecord(ctx, row, false)
	c.Assert(err, IsNil)
	err = ctx.Txn().Commit(context.Background())
	c.Assert(err, IsNil)

	tc := &TestDDLCallback{}
	tc.onJobRunBefore = func(job *model.Job) {
		if job.SchemaState != model.StateWriteReorganization {
			return
		}
		// Keep injecting the error until the job has failed more often than
		// the retry limit allows, then let the worker proceed normally.
		if job.ErrorCount < int64(variable.GetDDLErrorRetryLimit())+1 {
			gofail.Enable(failpoint, `return(true)`)
		} else {
			gofail.Disable(failpoint)
		}
	}
	d.SetHook(tc)

	job := buildCreateColumnJob(s.dbInfo, tblInfo, "c3", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, int64(3))
	err = d.doDDLJob(ctx, job)
	c.Assert(errCancelledDDLJob.Equal(err), Equals, true)

	err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
		t := meta.NewMeta(txn)
		historyJob, err1 := t.GetHistoryDDLJob(job.ID)
		c.Assert(err1, IsNil)
		c.Assert(historyJob, NotNil)
		// Verify that job is canceled.
		c.Assert(historyJob.State, Equals, model.JobStateCancelled)
		return nil
	})
	c.Assert(err, IsNil)
}
1 change: 1 addition & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
tbInfo.UpdateTS = t.StartTS
err = t.CreateTable(schemaID, tbInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if EnableSplitTableRegion {
Expand Down
6 changes: 6 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,4 +432,10 @@ func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) {
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100")
res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))

res = tk.MustQuery("select @@global.tidb_ddl_error_retry_limit")
res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLErrorRetryLimit)))
tk.MustExec("set @@global.tidb_ddl_error_retry_limit = 10000")
res = tk.MustQuery("select @@global.tidb_ddl_error_retry_limit")
res.Check(testkit.Rows("10000"))
}
1 change: 1 addition & 0 deletions metrics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ var (
WorkerAddDDLJob = "add_job"
WorkerRunDDLJob = "run_job"
WorkerFinishDDLJob = "finish_job"
WorkerCancelDDLJob = "cancel_job"
WorkerWaitSchemaChanged = "wait_schema_changed"
DDLWorkerHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
variable.TiDBRetryLimit + quoteCommaQuote +
variable.TiDBDDLErrorRetryLimit + quoteCommaQuote +
variable.TiDBDisableTxnAutoRetry + "')"

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgPriority:
s.setDDLReorgPriority(val)
case TiDBDDLErrorRetryLimit:
SetDDLErrorRetryLimit(int32(tidbOptPositiveInt32(val, DefTiDBDDLErrorRetryLimit)))
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeGlobal | ScopeSession, TiDBDDLErrorRetryLimit, strconv.Itoa(DefTiDBDDLErrorRetryLimit)},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
8 changes: 7 additions & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ const (
// tidb_ddl_reorg_priority defines the operations priority of adding indices.
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

// tidb_ddl_error_retry_limit is the maximum number of retries when an error occurs in ddl job.
TiDBDDLErrorRetryLimit = "tidb_ddl_error_retry_limit"
)

// Default TiDB system variable values.
Expand Down Expand Up @@ -214,6 +217,7 @@ const (
DefTiDBDDLReorgWorkerCount = 16
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBDDLErrorRetryLimit = 10000
)

// Process global variables.
Expand All @@ -222,5 +226,7 @@ var (
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
// DDLSlowOprThreshold is the threshold for ddl slow operations, unit is millisecond.
DDLSlowOprThreshold uint32 = 300
DDLSlowOprThreshold uint32 = 300
ddlErrorRetryLimit int32 = DefTiDBDDLErrorRetryLimit
minDDLErrorRetryLimit int32 = 1
)
15 changes: 14 additions & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ func GetDDLReorgWorkerCounter() int32 {
return atomic.LoadInt32(&ddlReorgWorkerCounter)
}

// SetDDLErrorRetryLimit atomically stores cnt as the DDL error retry limit.
// A cnt below minDDLErrorRetryLimit is raised to that minimum before storing.
func SetDDLErrorRetryLimit(cnt int32) {
	if cnt >= minDDLErrorRetryLimit {
		atomic.StoreInt32(&ddlErrorRetryLimit, cnt)
		return
	}
	atomic.StoreInt32(&ddlErrorRetryLimit, minDDLErrorRetryLimit)
}

// GetDDLErrorRetryLimit atomically reads and returns the current DDL error
// retry limit.
func GetDDLErrorRetryLimit() int32 {
	limit := atomic.LoadInt32(&ddlErrorRetryLimit)
	return limit
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
Expand Down Expand Up @@ -291,7 +304,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBHashAggPartialConcurrency,
TiDBHashAggFinalConcurrency,
TiDBDistSQLScanConcurrency,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount, TiDBDDLErrorRetryLimit,
TiDBBackoffLockFast, TiDBMaxChunkSize,
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
v, err := strconv.Atoi(value)
Expand Down
12 changes: 12 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,16 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(err, IsNil)
c.Assert(val, Equals, "1")
c.Assert(v.EnableTablePartition, IsTrue)

c.Assert(GetDDLErrorRetryLimit(), Equals, int32(DefTiDBDDLErrorRetryLimit))
SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(DefTiDBDDLErrorRetryLimit-1))
c.Assert(GetDDLErrorRetryLimit(), Equals, int32(DefTiDBDDLErrorRetryLimit-1))

SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(1))
c.Assert(GetDDLErrorRetryLimit(), Equals, int32(minDDLErrorRetryLimit))

SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(DefTiDBDDLErrorRetryLimit))
c.Assert(GetDDLErrorRetryLimit(), Equals, int32(10000))
SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(10))
c.Assert(GetDDLErrorRetryLimit(), Equals, int32(10))
}

0 comments on commit 71acda4

Please sign in to comment.