Skip to content

Commit

Permalink
ddl: ignore reorg elem error when the job is cancelling (#41383)
Browse files Browse the repository at this point in the history
close #41381
  • Loading branch information
tangenta authored Feb 15, 2023
1 parent a56fa8e commit c6bd86c
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 2 deletions.
7 changes: 7 additions & 0 deletions ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Callback interface {
OnSchemaStateChanged(schemaVer int64)
// OnJobRunBefore is called before running job.
OnJobRunBefore(job *model.Job)
// OnJobRunAfter is called after running job.
OnJobRunAfter(job *model.Job)
// OnJobUpdated is called after the running job is updated.
OnJobUpdated(job *model.Job)
// OnWatched is called after watching owner is completed.
Expand Down Expand Up @@ -80,6 +82,11 @@ func (*BaseCallback) OnJobRunBefore(_ *model.Job) {
// Nothing to do.
}

// OnJobRunAfter implements Callback.OnJobRunAfter interface.
func (*BaseCallback) OnJobRunAfter(_ *model.Job) {
// Nothing to do.
}

// OnJobUpdated implements Callback.OnJobUpdated interface.
func (*BaseCallback) OnJobUpdated(job *model.Job) {
// Nothing to do.
Expand Down
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
if err != nil || reorgInfo == nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,10 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
// and retry later if the job is not cancelled.
schemaVer, runJobErr = w.runDDLJob(d, t, job)

d.mu.RLock()
d.mu.hook.OnJobRunAfter(job)
d.mu.RUnlock()

if job.IsCancelled() {
defer d.unlockSchemaVersion(job.ID)
w.sess.reset()
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx)
if err != nil || reorgInfo.first {
if err != nil || reorgInfo == nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
Expand Down
39 changes: 39 additions & 0 deletions ddl/indexmergetest/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package indexmergetest

import (
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -858,3 +859,41 @@ func TestAddIndexMultipleDelete(t *testing.T) {
tk.MustQuery("select * from t;").Check(testkit.Rows())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution"))
}

func TestAddIndexDuplicateAndWriteConflict(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, b int);")
tk.MustExec("insert into t values (1, 1);")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &callback.TestDDLCallback{}
var runCancel bool
callback.OnJobRunAfterExported = func(job *model.Job) {
if t.Failed() || runCancel {
return
}
switch job.SchemaState {
case model.StateWriteOnly:
_, err := tk1.Exec("insert into t values (2, 1);")
assert.NoError(t, err)
}
if job.State == model.JobStateRollingback {
_, err := tk1.Exec("admin cancel ddl jobs " + strconv.FormatInt(job.ID, 10))
assert.NoError(t, err)
runCancel = true
}
}
d.SetHook(callback)

tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrCancelledDDLJob)
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 1", "2 1"))
}
12 changes: 12 additions & 0 deletions ddl/internal/callback/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type TestDDLCallback struct {

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
OnJobRunAfterExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
Expand Down Expand Up @@ -103,6 +104,17 @@ func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobRunAfter is used to run the user customized logic of `OnJobRunAfter` first.
func (tc *TestDDLCallback) OnJobRunAfter(job *model.Job) {
logutil.BgLogger().Info("on job run after", zap.String("job", job.String()))
if tc.OnJobRunAfterExported != nil {
tc.OnJobRunAfterExported(job)
return
}

tc.BaseCallback.OnJobRunAfter(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
Expand Down
3 changes: 3 additions & 0 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,9 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
if meta.ErrDDLReorgElementNotExist.Equal(err) {
job.SnapshotVer = 0
logutil.BgLogger().Warn("[ddl] get reorg info, the element does not exist", zap.String("job", job.String()))
if job.IsCancelling() {
return nil, nil
}
}
return &info, errors.Trace(err)
}
Expand Down

0 comments on commit c6bd86c

Please sign in to comment.