Skip to content

Commit

Permalink
ddl: make reorgCtx safer to use (#41421)
Browse files Browse the repository at this point in the history
close #41418
  • Loading branch information
zimulala authored Feb 15, 2023
1 parent 44e4381 commit a56fa8e
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 24 deletions.
19 changes: 7 additions & 12 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,13 @@ func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
}

func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
existedRC, ok := dc.reorgCtx.reorgCtxMap[jobID]
if ok {
existedRC.references.Add(1)
return existedRC
}
rc := &reorgCtx{}
rc.doneCh = make(chan error, 1)
// initial reorgCtx
Expand All @@ -535,22 +542,10 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.El
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
rc.mu.warningsCount = make(map[errors.ErrorID]int64)
rc.references.Add(1)
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
dc.reorgCtx.reorgCtxMap[jobID] = rc
return rc
}

func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) {
rc := dc.getReorgCtx(bfJob.JobID)
if rc == nil {
ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}
dc.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, ele, bfJob.Meta.RowCount)
} else {
rc.references.Add(1)
}
}

func (dc *ddlCtx) removeReorgCtx(jobID int64) {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
Expand Down
6 changes: 0 additions & 6 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type DDLForTest interface {
// SetInterceptor sets the interceptor.
SetInterceptor(h Interceptor)
NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx
SetReorgCtxForBackfill(bfJob *BackfillJob)
GetReorgCtx(jobID int64) *reorgCtx
RemoveReorgCtx(id int64)
}
Expand All @@ -67,11 +66,6 @@ func (d *ddl) NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Elemen
return d.newReorgCtx(jobID, startKey, currElement, rowCount)
}

// SetReorgCtxForBackfill exports for testing.
func (d *ddl) SetReorgCtxForBackfill(bfJob *BackfillJob) {
d.setReorgCtxForBackfill(bfJob)
}

// GetReorgCtx exports for testing.
func (d *ddl) GetReorgCtx(jobID int64) *reorgCtx {
return d.getReorgCtx(jobID)
Expand Down
10 changes: 5 additions & 5 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,10 @@ func TestUsingReorgCtx(t *testing.T) {
wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
m := &model.BackfillMeta{StartKey: []byte("skey"), RowCount: 1}
bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil, Meta: m}
for i := 0; i < 100; i++ {
d.(ddl.DDLForTest).SetReorgCtxForBackfill(bfJob)
startKey := []byte("skey")
ele := &meta.Element{ID: 1, TypeKey: nil}
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
Expand All @@ -329,7 +329,7 @@ func TestUsingReorgCtx(t *testing.T) {
jobID := int64(1)
startKey := []byte("skey")
ele := &meta.Element{ID: 1, TypeKey: nil}
for i := 0; i < 100; i++ {
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
Expand Down
2 changes: 1 addition & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (d *ddl) loadBackfillJobAndRun() {
return
}
// TODO: Adjust how the non-owner uses ReorgCtx.
d.setReorgCtxForBackfill(bfJob)
d.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}, bfJob.Meta.RowCount)
d.wg.Run(func() {
defer func() {
tidbutil.Recover(metrics.LabelDistReorg, "runBackfillJobs", nil, false)
Expand Down

0 comments on commit a56fa8e

Please sign in to comment.