ddl: limit the number of DDL job retries #7474

Closed · wants to merge 2 commits
12 changes: 10 additions & 2 deletions ddl/ddl_worker.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
log "github.com/sirupsen/logrus"
@@ -522,13 +523,20 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
if err != nil {
// If job is not cancelled, we should log this error.
if job.State != model.JobStateCancelled {
log.Errorf("[ddl-%s] run DDL job err %v", w, errors.ErrorStack(err))
log.Errorf("[ddl-%s] run DDL job err %v, job query %s ", w, errors.ErrorStack(err), job.Query)
Contributor (zimulala): Please add metrics (WorkerCancelDDLJob) here.

Author (ciscoxll): @zimulala Done.

} else {
log.Infof("[ddl-%s] the DDL job is normal to cancel because %v", w, errors.ErrorStack(err))
log.Infof("[ddl-%s] the DDL job is normal to cancel because %v, job query %s", w, errors.ErrorStack(err), job.Query)
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
}

job.Error = toTError(err)
job.ErrorCount++
if job.ErrorCount > int64(variable.GetDDLErrorRetryLimit()) && job.Type != model.ActionAddIndex {
log.Infof("[ddl-%s] DDL job over maximum retry count is canceled because %v, job query %s", w, errors.ErrorStack(err), job.Query)
job.State = model.JobStateCancelling
Contributor (zimulala): Is there a problem with the following scenario? An "add column" operation updates to "delete only", and then we cancel this job.

Author (ciscoxll): @zimulala I wrote a test and cancelled this job, and no error occurred.

Contributor (zimulala): @ciscoxll If a "drop column" job has updated the column to "delete only" but then hits enough errors that the job is cancelled successfully, we return an error to the client while the column's state is still "delete only". I think this will be a problem.

Author (ciscoxll): @zimulala I will write the rollback in another PR.

Contributor (zimulala, Feb 13, 2019): Handle it in #9295.
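
To make the concern in this thread concrete, here is a hypothetical sketch (not code from this PR; the states and function are simplified stand-ins) of why cancelling a half-done job needs the rollback deferred to #9295: a "drop column" job stuck at "delete only" has to be walked back to public before the cancellation is reported to the client, otherwise the column stays half-dropped.

```go
package main

import "fmt"

// Hypothetical, simplified schema states; the real ones live in the model package.
type schemaState int

const (
	statePublic schemaState = iota
	stateWriteOnly
	stateDeleteOnly
)

// rollbackDropColumn walks a half-dropped column back toward public, one
// state per step, the way a real rollback would have to before the DDL job
// can safely be reported as cancelled to the client.
func rollbackDropColumn(s schemaState) schemaState {
	switch s {
	case stateDeleteOnly:
		return stateWriteOnly
	case stateWriteOnly:
		return statePublic
	default:
		return s
	}
}

func main() {
	s := stateDeleteOnly // where the job was stuck when it hit the retry limit
	for s != statePublic {
		s = rollbackDropColumn(s)
	}
	fmt.Println("column visible again:", s == statePublic)
}
```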

metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
Contributor: Is this a repeat of line #529?

return
}
}
return
}
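
Condensed from the hunk above, a self-contained sketch of the new control flow, with local stand-ins for the model and variable packages (the stand-in types and the fixed limit are illustrative, not code from this PR): once a job's error count exceeds the configured retry limit, the worker moves it to the cancelling state instead of retrying forever, and add-index jobs are exempt because long reorganizations are expected to hit transient errors.

```go
package main

import "fmt"

// Simplified stand-ins for model.Job, model.JobState and model.ActionType.
type jobState int

const (
	jobStateRunning jobState = iota
	jobStateCancelling
)

type actionType int

const (
	actionAddColumn actionType = iota
	actionAddIndex
)

type job struct {
	tp         actionType
	state      jobState
	errorCount int64
}

// retryLimit plays the role of variable.GetDDLErrorRetryLimit().
const retryLimit int64 = 3

// onJobError mirrors the error branch added to runDDLJob: count the failure,
// then cancel any non-add-index job that has failed more than retryLimit times.
func onJobError(j *job) {
	j.errorCount++
	if j.errorCount > retryLimit && j.tp != actionAddIndex {
		j.state = jobStateCancelling
	}
}

func main() {
	j := &job{tp: actionAddColumn}
	for i := 0; i < 5; i++ {
		onJobError(j)
	}
	fmt.Println("cancelled:", j.state == jobStateCancelling) // true: 5 > 3
}
```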
50 changes: 50 additions & 0 deletions ddl/fail_test.go
@@ -17,7 +17,10 @@ import (
gofail "github.com/etcd-io/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"golang.org/x/net/context"
)
@@ -63,3 +66,50 @@ func (s *testColumnChangeSuite) TestFailBeforeDecodeArgs(c *C) {
c.Assert(stateCnt, Equals, 1)
testCheckJobDone(c, d, job, true)
}

func (s *testColumnChangeSuite) TestErrorCountlimit(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
defer d.Stop()

tblInfo := testTableInfo(c, d, "t_error_count_limit", 2)
ctx := testNewContext(d)
err := ctx.NewTxn()
c.Assert(err, IsNil)
sessionVars := ctx.GetSessionVars()
variable.SetSessionSystemVar(sessionVars, variable.TiDBDDLErrorRetryLimit, types.NewIntDatum(5))
c.Assert(variable.GetDDLErrorRetryLimit(), Equals, int32(5))

testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
originTable := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
row := types.MakeDatums(1, 2)
_, err = originTable.AddRecord(ctx, row, false)
c.Assert(err, IsNil)
err = ctx.Txn().Commit(context.Background())
c.Assert(err, IsNil)
tc := &TestDDLCallback{}

tc.onJobRunBefore = func(job *model.Job) {
if job.SchemaState == model.StateWriteReorganization {
// Limit the number of retries.
if job.ErrorCount < int64(variable.GetDDLErrorRetryLimit())+1 {
gofail.Enable("github.com/pingcap/tidb/ddl/errorBeforeDecodeArgs", `return(true)`)
} else {
gofail.Disable("github.com/pingcap/tidb/ddl/errorBeforeDecodeArgs")
}
}
}
d.SetHook(tc)

job := buildCreateColumnJob(s.dbInfo, tblInfo, "c3", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, int64(3))
err = d.doDDLJob(ctx, job)
c.Assert(errCancelledDDLJob.Equal(err), Equals, true)

kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
historyJob, err := t.GetHistoryDDLJob(job.ID)
c.Assert(err, IsNil)
// Verify that job is canceled.
c.Assert(historyJob.State, Equals, model.JobStateCancelled)
return nil
})
}
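
The errorBeforeDecodeArgs name that the test enables and disables refers to a gofail failpoint declared with gofail's comment convention inside the ddl package; gofail.Enable with `return(true)` switches the generated hook on so the guarded branch fails, driving job.ErrorCount up until the limit is hit. As a purely illustrative aid (names and the error message are assumptions, and the real hook is code-generated by gofail rather than hand-written), a hand-rolled equivalent of what such a failpoint does looks roughly like this:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errorBeforeDecodeArgs emulates an enabled gofail failpoint: a flag the test
// flips on to make the worker fail before it decodes the job arguments.
var errorBeforeDecodeArgs atomic.Value // stores bool

func enableFailpoint()  { errorBeforeDecodeArgs.Store(true) }
func disableFailpoint() { errorBeforeDecodeArgs.Store(false) }

// decodeArgs stands in for the guarded code path in the ddl worker.
func decodeArgs() error {
	if v, _ := errorBeforeDecodeArgs.Load().(bool); v {
		return errors.New("mock error before decoding job args")
	}
	return nil
}

func main() {
	enableFailpoint()
	fmt.Println(decodeArgs()) // mock error before decoding job args
	disableFailpoint()
	fmt.Println(decodeArgs()) // <nil>
}
```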
1 change: 1 addition & 0 deletions ddl/table.go
@@ -65,6 +65,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
tbInfo.UpdateTS = t.StartTS
err = t.CreateTable(schemaID, tbInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if EnableSplitTableRegion {
6 changes: 6 additions & 0 deletions executor/ddl_test.go
@@ -432,4 +432,10 @@ func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) {
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100")
res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))

res = tk.MustQuery("select @@global.tidb_ddl_error_retry_limit")
res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLErrorRetryLimit)))
tk.MustExec("set @@global.tidb_ddl_error_retry_limit = 10000")
res = tk.MustQuery("select @@global.tidb_ddl_error_retry_limit")
res.Check(testkit.Rows("10000"))
}
1 change: 1 addition & 0 deletions metrics/ddl.go
@@ -81,6 +81,7 @@ var (
WorkerAddDDLJob = "add_job"
WorkerRunDDLJob = "run_job"
WorkerFinishDDLJob = "finish_job"
WorkerCancelDDLJob = "cancel_job"
WorkerWaitSchemaChanged = "wait_schema_changed"
DDLWorkerHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
1 change: 1 addition & 0 deletions session/session.go
@@ -1267,6 +1267,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
variable.TiDBRetryLimit + quoteCommaQuote +
variable.TiDBDDLErrorRetryLimit + quoteCommaQuote +
variable.TiDBDisableTxnAutoRetry + "')"

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
@@ -575,6 +575,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgPriority:
s.setDDLReorgPriority(val)
case TiDBDDLErrorRetryLimit:
SetDDLErrorRetryLimit(int32(tidbOptPositiveInt32(val, DefTiDBDDLErrorRetryLimit)))
}
s.systems[name] = val
return nil
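
Worth noting for the new case above: tidbOptPositiveInt32 is what keeps a malformed value from ever reaching SetDDLErrorRetryLimit. A sketch of the fallback behaviour that helper is assumed to have, where anything that does not parse as a positive integer falls back to the supplied default:

```go
package main

import (
	"fmt"
	"strconv"
)

// Assumed shape of the helper used by SetSystemVar above: values that do not
// parse as a positive integer fall back to the supplied default, so the
// atomic retry limit is never stored from garbage input.
func tidbOptPositiveInt32(opt string, defaultVal int) int {
	val, err := strconv.Atoi(opt)
	if err != nil || val <= 0 {
		return defaultVal
	}
	return val
}

func main() {
	fmt.Println(tidbOptPositiveInt32("42", 10000))  // 42
	fmt.Println(tidbOptPositiveInt32("-1", 10000))  // 10000 (falls back)
	fmt.Println(tidbOptPositiveInt32("abc", 10000)) // 10000 (falls back)
}
```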
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
@@ -661,6 +661,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeGlobal | ScopeSession, TiDBDDLErrorRetryLimit, strconv.Itoa(DefTiDBDDLErrorRetryLimit)},
}

// SynonymsSysVariables is synonyms of system variables.
8 changes: 7 additions & 1 deletion sessionctx/variable/tidb_vars.go
@@ -177,6 +177,9 @@ const (
// tidb_ddl_reorg_priority defines the operations priority of adding indices.
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

// tidb_ddl_error_retry_limit is the maximum number of retries when an error occurs in a DDL job.
TiDBDDLErrorRetryLimit = "tidb_ddl_error_retry_limit"
)

// Default TiDB system variable values.
@@ -214,6 +217,7 @@ const (
DefTiDBDDLReorgWorkerCount = 16
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBDDLErrorRetryLimit = 10000
)

// Process global variables.
@@ -222,5 +226,7 @@ var (
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
// DDLSlowOprThreshold is the threshold for ddl slow operations, unit is millisecond.
DDLSlowOprThreshold uint32 = 300
DDLSlowOprThreshold uint32 = 300
ddlErrorRetryLimit int32 = DefTiDBDDLErrorRetryLimit
minDDLErrorRetryLimit int32 = 1
)
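
The new ddlErrorRetryLimit above is package-level state read by DDL worker goroutines while any session may update it, which is why the accessors in varsutil.go go through sync/atomic rather than plain assignment. A minimal standalone illustration of that pattern, with simplified names:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// A simplified stand-in for ddlErrorRetryLimit: one goroutine plays the DDL
// worker reading the limit while others play sessions updating it; sync/atomic
// keeps the reads and writes safe without a mutex.
var retryLimit int32 = 10000

func setRetryLimit(v int32) { atomic.StoreInt32(&retryLimit, v) }
func getRetryLimit() int32  { return atomic.LoadInt32(&retryLimit) }

func main() {
	var wg sync.WaitGroup
	for i := int32(1); i <= 4; i++ {
		wg.Add(1)
		go func(v int32) { // concurrent "SET @@global.tidb_ddl_error_retry_limit"
			defer wg.Done()
			setRetryLimit(v * 100)
		}(i)
	}
	wg.Wait()
	fmt.Println("worker sees limit:", getRetryLimit()) // one of 100, 200, 300, 400
}
```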
15 changes: 14 additions & 1 deletion sessionctx/variable/varsutil.go
@@ -45,6 +45,19 @@ func GetDDLReorgWorkerCounter() int32 {
return atomic.LoadInt32(&ddlReorgWorkerCounter)
}

// SetDDLErrorRetryLimit sets ddlErrorRetryLimit count.
func SetDDLErrorRetryLimit(cnt int32) {
if cnt < minDDLErrorRetryLimit {
cnt = minDDLErrorRetryLimit
}
atomic.StoreInt32(&ddlErrorRetryLimit, cnt)
}

// GetDDLErrorRetryLimit gets ddlErrorRetryLimit.
func GetDDLErrorRetryLimit() int32 {
return atomic.LoadInt32(&ddlErrorRetryLimit)
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
@@ -291,7 +304,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBHashAggPartialConcurrency,
TiDBHashAggFinalConcurrency,
TiDBDistSQLScanConcurrency,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount, TiDBDDLErrorRetryLimit,
TiDBBackoffLockFast, TiDBMaxChunkSize,
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
v, err := strconv.Atoi(value)
12 changes: 12 additions & 0 deletions sessionctx/variable/varsutil_test.go
@@ -223,4 +223,16 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(err, IsNil)
c.Assert(val, Equals, "1")
c.Assert(v.EnableTablePartition, IsTrue)

c.Assert(GetDDLErrorRetryLimit(), Equals, int32(DefTiDBDDLErrorRetryLimit))
Contributor (winkyao): Add more test cases, such as setting -1 for DefTiDBDDLErrorRetryLimit. Please make your unit tests cover more cases.

Author (ciscoxll): @winkyao Done.

SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(DefTiDBDDLErrorRetryLimit-1))
c.Assert(GetDDLErrorRetryLimit(), Equals, int32(DefTiDBDDLErrorRetryLimit-1))

SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(1))
c.Assert(GetDDLErrorRetryLimit(), Equals, int32(minDDLErrorRetryLimit))

SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(DefTiDBDDLErrorRetryLimit))
c.Assert(GetDDLErrorRetryLimit(), Equals, int32(10000))
SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(10))
c.Assert(GetDDLErrorRetryLimit(), Equals, int32(10))
}
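
A sketch of the extra coverage requested in the thread above, exercising the clamp in SetDDLErrorRetryLimit directly with non-positive values. The test name is illustrative, and it assumes it lives in the same variable package as TestVarsutil so the unexported setter and minimum are reachable:

```go
func (s *testVarsutilSuite) TestDDLErrorRetryLimitClamp(c *C) {
	// Values below the minimum (including negative ones) are clamped by the
	// setter itself, independent of SQL-level validation.
	SetDDLErrorRetryLimit(-1)
	c.Assert(GetDDLErrorRetryLimit(), Equals, minDDLErrorRetryLimit)
	SetDDLErrorRetryLimit(0)
	c.Assert(GetDDLErrorRetryLimit(), Equals, minDDLErrorRetryLimit)

	// Restore the default so other tests in the package are unaffected.
	SetDDLErrorRetryLimit(DefTiDBDDLErrorRetryLimit)
	c.Assert(GetDDLErrorRetryLimit(), Equals, int32(DefTiDBDDLErrorRetryLimit))
}
```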