diff --git a/ddl/ddl.go b/ddl/ddl.go index 39c22c8f3c191..7c2760eb2a6b0 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -154,6 +154,7 @@ const ( OnExistReplace jobRecordCapacity = 16 + jobOnceCapacity = 1000 ) var ( @@ -289,14 +290,14 @@ type waitSchemaSyncedController struct { mu sync.RWMutex job map[int64]struct{} - // true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job. - once *atomicutil.Bool + // Used to check whether the DDL job has already run once on this owner. + onceMap map[int64]struct{} } func newWaitSchemaSyncedController() *waitSchemaSyncedController { return &waitSchemaSyncedController{ - job: make(map[int64]struct{}, jobRecordCapacity), - once: atomicutil.NewBool(true), + job: make(map[int64]struct{}, jobRecordCapacity), + onceMap: make(map[int64]struct{}, jobOnceCapacity), } } @@ -319,6 +320,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) { delete(w.job, job.ID) } +// maybeAlreadyRunOnce returns true if the job has already been recorded as run once on this owner. +// It returns false if the job may be running on this owner for the first time (or the record map was reset). +func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool { + w.mu.Lock() + defer w.mu.Unlock() + _, ok := w.onceMap[id] + return ok +} + +func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) { + w.mu.Lock() + defer w.mu.Unlock() + if len(w.onceMap) > jobOnceCapacity { + // If the map is too large, we reset it. These jobs may need to check schema synced again, but it's ok. + w.onceMap = make(map[int64]struct{}, jobRecordCapacity) + } + w.onceMap[id] = struct{}{} +} + // ddlCtx is the context when we use worker to handle DDL jobs. 
type ddlCtx struct { ctx context.Context diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index e99601a6d81d7..45594132bc776 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -835,17 +835,14 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) { } w.registerSync(job) - if runJobErr != nil { + if runJobErr != nil && !dbterror.ErrPausedDDLJob.Equal(runJobErr) { // Omit the ErrPausedDDLJob - if !dbterror.ErrPausedDDLJob.Equal(runJobErr) { + w.jobLogger(job).Info("run DDL job failed, sleeps a while then retries it.", + zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr)) + // In test and job is cancelling we can ignore the sleep. + if !(intest.InTest && job.IsCancelling()) { // wait a while to retry again. If we don't wait here, DDL will retry this job immediately, // which may act like a deadlock. - w.jobLogger(job).Info("run DDL job failed, sleeps a while then retries it.", - zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr)) - } - - // In test and job is cancelling we can ignore the sleep - if !(intest.InTest && job.IsCancelling()) { time.Sleep(GetWaitTimeWhenErrorOccurred()) } } @@ -1190,14 +1187,14 @@ func toTError(err error) *terror.Error { return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error()) } -// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens, +// waitSchemaChanged waits for the completion of updating all servers' schema or MDL synced. In order to make sure that happens, // we wait at most 2 * lease time(sessionTTL, 90 seconds). 
-func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) { +func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error { if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() { - return + return nil } if waitTime == 0 { - return + return nil } timeStart := time.Now() @@ -1208,16 +1205,19 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in if latestSchemaVersion == 0 { logutil.Logger(d.ctx).Info("schema version doesn't change", zap.String("category", "ddl")) - return + return nil } err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion) if err != nil { logutil.Logger(d.ctx).Info("update latest schema version failed", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion), zap.Error(err)) + if variable.EnableMDL.Load() { + return err + } if terror.ErrorEqual(err, context.DeadlineExceeded) { // If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker. // There is no need to use etcd to sync. The function returns directly. 
- return + return nil } } @@ -1225,12 +1225,13 @@ func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion in err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion) if err != nil { logutil.Logger(d.ctx).Info("wait latest schema version encounter error", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion), zap.Error(err)) - return + return err } logutil.Logger(d.ctx).Info("wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)", zap.String("category", "ddl"), zap.Int64("ver", latestSchemaVersion), zap.Duration("take time", time.Since(timeStart)), zap.String("job", job.String())) + return nil } // waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL. @@ -1289,8 +1290,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error { } }) - waitSchemaChanged(d, waitTime, latestSchemaVersion, job) - return nil + return waitSchemaChanged(d, waitTime, latestSchemaVersion, job) } func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption { diff --git a/ddl/job_table.go b/ddl/job_table.go index 757ebbaa31e22..6b309229df064 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -277,7 +277,7 @@ func (d *ddl) startDispatchLoop() { } if !d.isOwner() { isOnce = true - d.once.Store(true) + d.onceMap = make(map[int64]struct{}, jobOnceCapacity) time.Sleep(dispatchLoopWaitingDuration) continue } @@ -378,7 +378,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() }() // check if this ddl job is synced to all servers. 
- if !d.isSynced(job) || d.once.Load() { + if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) { if variable.EnableMDL.Load() { exist, version, err := checkMDLInfo(job.ID, d.sessPool) if err != nil { @@ -393,7 +393,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { if err != nil { return } - d.once.Store(false) + d.setAlreadyRunOnce(job.ID) cleanMDLInfo(d.sessPool, job.ID, d.etcdCli) // Don't have a worker now. return @@ -407,7 +407,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { pool.put(wk) return } - d.once.Store(false) + d.setAlreadyRunOnce(job.ID) } } @@ -426,9 +426,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { }) // Here means the job enters another state (delete only, write only, public, etc...) or is cancelled. - // If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update + // If the job is done or still running or rolling back, we will wait 2 * lease time or until MDL is synced to guarantee other servers to update // the newest schema. - waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job) + err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job) + if err != nil { + // May be caused by server closing, shouldn't clean the MDL info. + logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err)) + return + } cleanMDLInfo(d.sessPool, job.ID, d.etcdCli) d.synced(job) diff --git a/session/bootstraptest/bootstrap_upgrade_test.go b/session/bootstraptest/bootstrap_upgrade_test.go index 026aca1c16922..f5fa4dbb7df58 100644 --- a/session/bootstraptest/bootstrap_upgrade_test.go +++ b/session/bootstraptest/bootstrap_upgrade_test.go @@ -586,13 +586,6 @@ func TestUpgradeVersionForResumeJob(t *testing.T) { jobID = job.ID times = 1 } - // Make sure we do jobID first, then do jobID+1. 
- if job.ID == jobID && job.SchemaState == model.StateWriteReorganization && job.State == model.JobStateQueueing && times == 1 { - times = 2 - } - if job.ID == jobID+1 && job.SchemaState == model.StateNone && job.State == model.JobStateQueueing && times == 2 { - times = 3 - } if job.ID == jobID && job.State == model.JobStateDone && job.SchemaState == model.StatePublic { wg.Done() } @@ -621,7 +614,6 @@ func TestUpgradeVersionForResumeJob(t *testing.T) { require.Equal(t, session.CurrentBootstrapVersion, ver) wg.Wait() - require.Equal(t, 3, times) // Make sure the second add index operation is successful. sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id=%d or job_id=%d order by job_id", jobID, jobID+1) rows, err := execute(context.Background(), seLatestV, sql)