Skip to content

Commit

Permalink
ddl: refactor unlock seq and make ddl worker closeable (#35422)
Browse files Browse the repository at this point in the history
ref #32031
  • Loading branch information
xiongjiwei authored Jun 16, 2022
1 parent 7541899 commit a5d6db2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (d *ddl) close() {
d.schemaSyncer.Close()

for _, worker := range d.workers {
worker.close()
worker.Close()
}
// d.delRangeMgr uses sessions from d.sessPool.
// Put it before d.sessPool.close to reduce the time spent by d.sessPool.close.
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestNotifyDDLJob(t *testing.T) {
// Ensure that the notification is not handled in workers `start` function.
d.cancel()
for _, worker := range d.workers {
worker.close()
worker.Close()
}

job := &model.Job{
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestNotifyDDLJob(t *testing.T) {
// Ensure that the notification is not handled by worker's "start".
d1.cancel()
for _, worker := range d1.workers {
worker.close()
worker.Close()
}
d1.ownerManager.RetireOwner()
d1.asyncNotifyWorker(job)
Expand Down
30 changes: 17 additions & 13 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ import (
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

var (
// RunWorker indicates if this TiDB server starts DDL worker and can run DDL job.
RunWorker = true
// ddlWorkerID is used for generating the next DDL worker ID.
ddlWorkerID = int32(0)
ddlWorkerID = atomicutil.NewInt32(0)
// WaitTimeWhenErrorOccurred is the interval to wait when an error occurs while processing DDL jobs.
WaitTimeWhenErrorOccurred = int64(1 * time.Second)
)
Expand Down Expand Up @@ -118,7 +119,7 @@ func NewJobContext() *JobContext {

func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager, dCtx *ddlCtx) *worker {
worker := &worker{
id: atomic.AddInt32(&ddlWorkerID, 1),
id: ddlWorkerID.Add(1),
tp: tp,
ddlJobCh: make(chan struct{}, 1),
ctx: ctx,
Expand Down Expand Up @@ -149,7 +150,7 @@ func (w *worker) String() string {
return fmt.Sprintf("worker %d, tp %s", w.id, w.typeStr())
}

func (w *worker) close() {
func (w *worker) Close() {
startTime := time.Now()
w.wg.Wait()
logutil.Logger(w.logCtx).Info("[ddl] DDL worker closed", zap.Duration("take time", time.Since(startTime)))
Expand Down Expand Up @@ -540,6 +541,17 @@ func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) {
}
}

// unlockSeqNum releases w.ddlSeqNumMu if this worker has it marked as held
// (w.lockSeqNum is true); otherwise it does nothing.
//
// When err is non-nil, seqNum is decremented before the mutex is released so
// that the sequence number is reset after a failure. In every held case the
// lockSeqNum flag is cleared before unlocking.
func (w *worker) unlockSeqNum(err error) {
	// Nothing to release when the flag says the lock is not held.
	if !w.lockSeqNum {
		return
	}

	failed := err != nil
	if failed {
		// The operation failed, so reset seqNum by stepping it back.
		w.ddlSeqNumMu.seqNum--
	}

	w.lockSeqNum = false
	w.ddlSeqNumMu.Unlock()
}

func (w *JobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
if !topsqlstate.TopSQLEnabled() || w.cacheDigest == nil {
return nil
Expand Down Expand Up @@ -650,21 +662,13 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
}

if err != nil {
if w.lockSeqNum {
// txn commit failed, we should reset seqNum.
w.ddlSeqNumMu.seqNum--
w.lockSeqNum = false
w.ddlSeqNumMu.Unlock()
}
w.unlockSeqNum(err)
return errors.Trace(err)
} else if job == nil {
// No job now, return and retry getting later.
return nil
}
if w.lockSeqNum {
w.lockSeqNum = false
d.ddlSeqNumMu.Unlock()
}
w.unlockSeqNum(err)
w.waitDependencyJobFinished(job, &waitDependencyJobCnt)

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
Expand Down

0 comments on commit a5d6db2

Please sign in to comment.