From 71acda4458ab0576c51acc89e525a686dcc51278 Mon Sep 17 00:00:00 2001
From: ciscoxll
Date: Thu, 30 Aug 2018 20:25:20 +0800
Subject: [PATCH 1/2] ddl: limit the number of ddl job retries

---
 ddl/ddl_worker.go                    | 12 +++++--
 ddl/fail_test.go                     | 50 ++++++++++++++++++++++++++++
 ddl/table.go                         |  1 +
 executor/ddl_test.go                 |  6 ++++
 metrics/ddl.go                       |  1 +
 session/session.go                   |  1 +
 sessionctx/variable/session.go       |  2 ++
 sessionctx/variable/sysvar.go        |  1 +
 sessionctx/variable/tidb_vars.go     |  8 ++++-
 sessionctx/variable/varsutil.go      | 15 ++++++++-
 sessionctx/variable/varsutil_test.go | 12 +++++++
 11 files changed, 105 insertions(+), 4 deletions(-)

diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 283e70220e4f9..1b35f681ec07f 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/tidb/model"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/binloginfo"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/terror"
 	"github.com/pingcap/tidb/util"
 	log "github.com/sirupsen/logrus"
@@ -522,13 +523,20 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 	if err != nil {
 		// If job is not cancelled, we should log this error.
 		if job.State != model.JobStateCancelled {
-			log.Errorf("[ddl-%s] run DDL job err %v", w, errors.ErrorStack(err))
+			log.Errorf("[ddl-%s] run DDL job err %v, job query %s ", w, errors.ErrorStack(err), job.Query)
 		} else {
-			log.Infof("[ddl-%s] the DDL job is normal to cancel because %v", w, errors.ErrorStack(err))
+			log.Infof("[ddl-%s] the DDL job is normal to cancel because %v, job query %s", w, errors.ErrorStack(err), job.Query)
+			metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
 		}
 
 		job.Error = toTError(err)
 		job.ErrorCount++
+		if job.ErrorCount > int64(variable.GetDDLErrorRetryLimit()) && job.Type != model.ActionAddIndex {
+			log.Infof("[ddl-%s] DDL job over maximum retry count is canceled because %v, job query %s", w, errors.ErrorStack(err), job.Query)
+			job.State = model.JobStateCancelled
+			metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
+			return
+		}
 	}
 	return
 }
diff --git a/ddl/fail_test.go b/ddl/fail_test.go
index f89c4a9aae2ff..45d47d2af9009 100644
--- a/ddl/fail_test.go
+++ b/ddl/fail_test.go
@@ -17,7 +17,10 @@ import (
 	gofail "github.com/etcd-io/gofail/runtime"
 	. "github.com/pingcap/check"
 	"github.com/pingcap/tidb/ast"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/model"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	"golang.org/x/net/context"
 )
@@ -63,3 +66,50 @@ func (s *testColumnChangeSuite) TestFailBeforeDecodeArgs(c *C) {
 	c.Assert(stateCnt, Equals, 1)
 	testCheckJobDone(c, d, job, true)
 }
+
+func (s *testColumnChangeSuite) TestErrorCountlimit(c *C) {
+	d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
+	defer d.Stop()
+
+	tblInfo := testTableInfo(c, d, "t_error_count_limit", 2)
+	ctx := testNewContext(d)
+	err := ctx.NewTxn()
+	c.Assert(err, IsNil)
+	sessionVars := ctx.GetSessionVars()
+	variable.SetSessionSystemVar(sessionVars, variable.TiDBDDLErrorRetryLimit, types.NewIntDatum(5))
+	c.Assert(variable.GetDDLErrorRetryLimit(), Equals, int32(5))
+
+	testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
+	originTable := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
+	row := types.MakeDatums(1, 2)
+	_, err = originTable.AddRecord(ctx, row, false)
+	c.Assert(err, IsNil)
+	err = ctx.Txn().Commit(context.Background())
+	c.Assert(err, IsNil)
+	tc := &TestDDLCallback{}
+
+	tc.onJobRunBefore = func(job *model.Job) {
+		if job.SchemaState == model.StateWriteReorganization {
+			// Limit the number of retries.
+			if job.ErrorCount < int64(variable.GetDDLErrorRetryLimit())+1 {
+				gofail.Enable("github.com/pingcap/tidb/ddl/errorBeforeDecodeArgs", `return(true)`)
+			} else {
+				gofail.Disable("github.com/pingcap/tidb/ddl/errorBeforeDecodeArgs")
+			}
+		}
+	}
+	d.SetHook(tc)
+
+	job := buildCreateColumnJob(s.dbInfo, tblInfo, "c3", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, int64(3))
+	err = d.doDDLJob(ctx, job)
+	c.Assert(errCancelledDDLJob.Equal(err), Equals, true)
+
+	kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
+		t := meta.NewMeta(txn)
+		historyJob, err := t.GetHistoryDDLJob(job.ID)
+		c.Assert(err, IsNil)
+		// Verify that job is canceled.
+		c.Assert(historyJob.State, Equals, model.JobStateCancelled)
+		return nil
+	})
+}
diff --git a/ddl/table.go b/ddl/table.go
index d17afcadd9f50..89e12032b0cf0 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -65,6 +65,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
 	tbInfo.UpdateTS = t.StartTS
 	err = t.CreateTable(schemaID, tbInfo)
 	if err != nil {
+		job.State = model.JobStateCancelled
 		return ver, errors.Trace(err)
 	}
 	if EnableSplitTableRegion {
diff --git a/executor/ddl_test.go b/executor/ddl_test.go
index 4d102ffa0cc20..87fb0c54f64c8 100644
--- a/executor/ddl_test.go
+++ b/executor/ddl_test.go
@@ -432,4 +432,10 @@ func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) {
 	tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100")
 	res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
 	res.Check(testkit.Rows("100"))
+
+	res = tk.MustQuery("select @@global.tidb_ddl_error_retry_limit")
+	res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLErrorRetryLimit)))
+	tk.MustExec("set @@global.tidb_ddl_error_retry_limit = 10000")
+	res = tk.MustQuery("select @@global.tidb_ddl_error_retry_limit")
+	res.Check(testkit.Rows("10000"))
 }
diff --git a/metrics/ddl.go b/metrics/ddl.go
index d404ab6fd2d7e..969037ac7adf7 100644
--- a/metrics/ddl.go
+++ b/metrics/ddl.go
@@ -81,6 +81,7 @@ var (
 	WorkerAddDDLJob         = "add_job"
 	WorkerRunDDLJob         = "run_job"
 	WorkerFinishDDLJob      = "finish_job"
+	WorkerCancelDDLJob      = "cancel_job"
 	WorkerWaitSchemaChanged = "wait_schema_changed"
 	DDLWorkerHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
diff --git a/session/session.go b/session/session.go
index d76313e581abd..19d8b53ccb791 100644
--- a/session/session.go
+++ b/session/session.go
@@ -1267,6 +1267,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
 	variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
 	variable.TiDBMaxChunkSize + quoteCommaQuote +
 	variable.TiDBRetryLimit + quoteCommaQuote +
+	variable.TiDBDDLErrorRetryLimit + quoteCommaQuote +
 	variable.TiDBDisableTxnAutoRetry + "')"
 
 // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index f15e4cae39c8c..e5501850ebf29 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -575,6 +575,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
 		SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
 	case TiDBDDLReorgPriority:
 		s.setDDLReorgPriority(val)
+	case TiDBDDLErrorRetryLimit:
+		SetDDLErrorRetryLimit(int32(tidbOptPositiveInt32(val, DefTiDBDDLErrorRetryLimit)))
 	}
 	s.systems[name] = val
 	return nil
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index d62b2bea3ebe8..4b77fe803c649 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -661,6 +661,7 @@ var defaultSysVars = []*SysVar{
 	{ScopeSession, TiDBConfig, ""},
 	{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
 	{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
+	{ScopeGlobal | ScopeSession, TiDBDDLErrorRetryLimit, strconv.Itoa(DefTiDBDDLErrorRetryLimit)},
 }
 
 // SynonymsSysVariables is synonyms of system variables.
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 7909cf85c1d1d..3ec764b1d287c 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -177,6 +177,9 @@ const (
 	// tidb_ddl_reorg_priority defines the operations priority of adding indices.
 	// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
 	TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"
+
+	// tidb_ddl_error_retry_limit is the maximum number of retries when an error occurs in ddl job.
+	TiDBDDLErrorRetryLimit = "tidb_ddl_error_retry_limit"
 )
 
 // Default TiDB system variable values.
@@ -214,6 +217,7 @@ const (
 	DefTiDBDDLReorgWorkerCount       = 16
 	DefTiDBHashAggPartialConcurrency = 4
 	DefTiDBHashAggFinalConcurrency   = 4
+	DefTiDBDDLErrorRetryLimit        = 10000
 )
 
 // Process global variables.
@@ -222,5 +226,7 @@ var (
 	ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
 	maxDDLReorgWorkerCount int32 = 128
 	// DDLSlowOprThreshold is the threshold for ddl slow operations, uint is millisecond.
-	DDLSlowOprThreshold uint32 = 300
+	DDLSlowOprThreshold uint32 = 300
+	ddlErrorRetryLimit int32 = DefTiDBDDLErrorRetryLimit
+	minDDLErrorRetryLimit int32 = 1
 )
diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go
index c11f8501db71a..d86c9ddee6ae3 100644
--- a/sessionctx/variable/varsutil.go
+++ b/sessionctx/variable/varsutil.go
@@ -45,6 +45,19 @@ func GetDDLReorgWorkerCounter() int32 {
 	return atomic.LoadInt32(&ddlReorgWorkerCounter)
 }
 
+// SetDDLErrorRetryLimit sets ddlErrorRetryLimit count.
+func SetDDLErrorRetryLimit(cnt int32) {
+	if cnt < minDDLErrorRetryLimit {
+		cnt = minDDLErrorRetryLimit
+	}
+	atomic.StoreInt32(&ddlErrorRetryLimit, cnt)
+}
+
+// GetDDLErrorRetryLimit gets ddlErrorRetryLimit.
+func GetDDLErrorRetryLimit() int32 {
+	return atomic.LoadInt32(&ddlErrorRetryLimit)
+}
+
 // GetSessionSystemVar gets a system variable.
 // If it is a session only variable, use the default value defined in code.
 // Returns error if there is no such variable.
@@ -291,7 +304,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
 		TiDBHashAggPartialConcurrency,
 		TiDBHashAggFinalConcurrency,
 		TiDBDistSQLScanConcurrency,
-		TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
+		TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount, TiDBDDLErrorRetryLimit,
 		TiDBBackoffLockFast, TiDBMaxChunkSize,
 		TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
 		v, err := strconv.Atoi(value)
diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go
index 2c2a96a25ea4c..aab5a7059279e 100644
--- a/sessionctx/variable/varsutil_test.go
+++ b/sessionctx/variable/varsutil_test.go
@@ -223,4 +223,16 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(val, Equals, "1")
 	c.Assert(v.EnableTablePartition, IsTrue)
+
+	c.Assert(GetDDLErrorRetryLimit(), Equals, int32(DefTiDBDDLErrorRetryLimit))
+	SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(DefTiDBDDLErrorRetryLimit-1))
+	c.Assert(GetDDLErrorRetryLimit(), Equals, int32(DefTiDBDDLErrorRetryLimit-1))
+
+	SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(1))
+	c.Assert(GetDDLErrorRetryLimit(), Equals, int32(minDDLErrorRetryLimit))
+
+	SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(DefTiDBDDLErrorRetryLimit))
+	c.Assert(GetDDLErrorRetryLimit(), Equals, int32(10000))
+	SetSessionSystemVar(v, TiDBDDLErrorRetryLimit, types.NewIntDatum(10))
+	c.Assert(GetDDLErrorRetryLimit(), Equals, int32(10))
 }

From aec761090fe567b9ed124bdb4cc5279ea4726269 Mon Sep 17 00:00:00 2001
From: ciscoxll
Date: Mon, 3 Sep 2018 14:42:43 +0800
Subject: [PATCH 2/2] fix ci error

---
 ddl/ddl_worker.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 1b35f681ec07f..56483d44b9ada 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -526,15 +526,15 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 			log.Errorf("[ddl-%s] run DDL job err %v, job query %s ", w, errors.ErrorStack(err), job.Query)
 		} else {
 			log.Infof("[ddl-%s] the DDL job is normal to cancel because %v, job query %s", w, errors.ErrorStack(err), job.Query)
-			metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
+			metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
 		}
 
 		job.Error = toTError(err)
 		job.ErrorCount++
 		if job.ErrorCount > int64(variable.GetDDLErrorRetryLimit()) && job.Type != model.ActionAddIndex {
 			log.Infof("[ddl-%s] DDL job over maximum retry count is canceled because %v, job query %s", w, errors.ErrorStack(err), job.Query)
-			job.State = model.JobStateCancelled
-			metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
+			job.State = model.JobStateCancelling
+			metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerCancelDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(model.TSConvert2Time(job.StartTS)).Seconds())
 			return
 		}
 	}
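
For illustration only (not part of the patch): a minimal Go sketch of how the new limit behaves, using just the SetDDLErrorRetryLimit/GetDDLErrorRetryLimit helpers and the minDDLErrorRetryLimit floor of 1 added in sessionctx/variable above. From SQL the same knob is exposed as tidb_ddl_error_retry_limit (e.g. set @@global.tidb_ddl_error_retry_limit = 10000), and once a job's ErrorCount exceeds the configured limit the worker cancels any job type other than add-index.

	package main

	import (
		"fmt"

		"github.com/pingcap/tidb/sessionctx/variable"
	)

	func main() {
		// Requests below minDDLErrorRetryLimit (1) are clamped up to the minimum.
		variable.SetDDLErrorRetryLimit(0)
		fmt.Println(variable.GetDDLErrorRetryLimit()) // 1

		// Any value >= 1 is stored and read back atomically.
		variable.SetDDLErrorRetryLimit(5)
		fmt.Println(variable.GetDDLErrorRetryLimit()) // 5
	}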