Skip to content

Commit

Permalink
ddl: cherrypick some PRs to fix MDL bug (pingcap#47079)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 committed Nov 23, 2023
1 parent 71bcc44 commit a948ec8
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 26 deletions.
28 changes: 24 additions & 4 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ const (
OnExistReplace

jobRecordCapacity = 16
jobOnceCapacity = 1000
)

var (
Expand Down Expand Up @@ -286,14 +287,14 @@ type waitSchemaSyncedController struct {
mu sync.RWMutex
job map[int64]struct{}

// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
once *atomicutil.Bool
// Use to check if the DDL job is the first run on this owner.
onceMap map[int64]struct{}
}

func newWaitSchemaSyncedController() *waitSchemaSyncedController {
return &waitSchemaSyncedController{
job: make(map[int64]struct{}, jobRecordCapacity),
once: atomicutil.NewBool(true),
job: make(map[int64]struct{}, jobRecordCapacity),
onceMap: make(map[int64]struct{}, jobOnceCapacity),
}
}

Expand All @@ -316,6 +317,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) {
delete(w.job, job.ID)
}

// maybeAlreadyRunOnce returns true means that the job may be the first run on this owner.
// Returns false means that the job must not be the first run on this owner.
func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool {
w.mu.Lock()
defer w.mu.Unlock()
_, ok := w.onceMap[id]
return ok
}

func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) {
w.mu.Lock()
defer w.mu.Unlock()
if len(w.onceMap) > jobOnceCapacity {
// If the map is too large, we reset it. These jobs may need to check schema synced again, but it's ok.
w.onceMap = make(map[int64]struct{}, jobRecordCapacity)
}
w.onceMap[id] = struct{}{}
}

// ddlCtx is the context when we use worker to handle DDL jobs.
type ddlCtx struct {
ctx context.Context
Expand Down
30 changes: 16 additions & 14 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
waitSchemaChanged(context.Background(), d, waitTime, schemaVer, job)

_ = waitSchemaChanged(d, waitTime, schemaVer, job)
if RunInGoTest {
// d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously.
d.mu.RLock()
Expand Down Expand Up @@ -1355,14 +1354,14 @@ func toTError(err error) *terror.Error {
return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error())
}

// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens,
// waitSchemaChanged waits for the completion of updating all servers' schema or MDL synced. In order to make sure that happens,
// we wait at most 2 * lease time(sessionTTL, 90 seconds).
func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) {
func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return
return nil
}
if waitTime == 0 {
return
return nil
}

timeStart := time.Now()
Expand All @@ -1373,29 +1372,33 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l

if latestSchemaVersion == 0 {
logutil.Logger(d.ctx).Info("[ddl] schema version doesn't change")
return
return nil
}

err = d.schemaSyncer.OwnerUpdateGlobalVersion(ctx, latestSchemaVersion)
err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
if variable.EnableMDL.Load() {
return err
}
if terror.ErrorEqual(err, context.DeadlineExceeded) {
// If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker.
// There is no need to use etcd to sync. The function returns directly.
return
return nil
}
}

// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err = d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return
return err
}
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
zap.Int64("ver", latestSchemaVersion),
zap.Duration("take time", time.Since(timeStart)),
zap.String("job", job.String()))
return nil
}

// waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL.
Expand All @@ -1412,7 +1415,7 @@ func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64

timeStart := time.Now()
// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
err := d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return err
Expand Down Expand Up @@ -1454,8 +1457,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
}
})

waitSchemaChanged(context.Background(), d, waitTime, latestSchemaVersion, job)
return nil
return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
Expand Down
17 changes: 11 additions & 6 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (d *ddl) startDispatchLoop() {
return
}
if !variable.EnableConcurrentDDL.Load() || !d.isOwner() || d.waiting.Load() {
d.once.Store(true)
d.onceMap = make(map[int64]struct{}, jobOnceCapacity)
time.Sleep(time.Second)
continue
}
Expand Down Expand Up @@ -234,7 +234,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
}()
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
if variable.EnableMDL.Load() {
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
Expand All @@ -249,7 +249,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
if err != nil {
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
// Don't have a worker now.
return
Expand All @@ -263,7 +263,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
pool.put(wk)
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
}
}

Expand All @@ -282,9 +282,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
})

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// If the job is done or still running or rolling back, we will wait 2 * lease time or util MDL synced to guarantee other servers to update
// the newest schema.
waitSchemaChanged(context.Background(), d.ddlCtx, d.lease*2, schemaVer, job)
err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
if err != nil {
// May be caused by server closing, shouldn't clean the MDL info.
logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err))
return
}
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
d.synced(job)

Expand Down
6 changes: 4 additions & 2 deletions ddl/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,15 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
if variable.EnableMDL.Load() {
for _, kv := range resp.Kvs {
key := string(kv.Key)
tidbIDInResp := key[strings.LastIndex(key, "/")+1:]
ver, err := strconv.Atoi(string(kv.Value))
if err != nil {
logutil.BgLogger().Info("[ddl] syncer check all versions, convert value to int failed, continue checking.", zap.String("ddl", string(kv.Key)), zap.String("value", string(kv.Value)), zap.Error(err))
succ = false
break
}
if int64(ver) < latestVer {
// We need to check if the tidb ID is in the updatedMap, in case that deleting etcd is failed, and tidb server is down.
if int64(ver) < latestVer && updatedMap[tidbIDInResp] != "" {
if notMatchVerCnt%intervalCnt == 0 {
logutil.BgLogger().Info("[ddl] syncer check all versions, someone is not synced, continue checking",
zap.String("ddl", string(kv.Key)), zap.Int("currentVer", ver), zap.Int64("latestVer", latestVer))
Expand All @@ -325,7 +327,7 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
notMatchVerCnt++
break
}
delete(updatedMap, key[strings.LastIndex(key, "/")+1:])
delete(updatedMap, tidbIDInResp)
}
if len(updatedMap) > 0 {
succ = false
Expand Down

0 comments on commit a948ec8

Please sign in to comment.