diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index c6bb38297402d..b547937660131 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -109,6 +109,11 @@ type BackfillJob struct {
 	Meta       *model.BackfillMeta
 }
 
+// PrefixKeyString returns the BackfillJob's prefix key.
+func (bj *BackfillJob) PrefixKeyString() string {
+	return fmt.Sprintf("%d_%s_%d_%%", bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID)
+}
+
 // AbbrStr returns the BackfillJob's info without the Meta info.
 func (bj *BackfillJob) AbbrStr() string {
 	return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go
index 43a6dff87bc97..563b7fcd88fe7 100644
--- a/ddl/dist_owner.go
+++ b/ddl/dist_owner.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/metrics"
@@ -323,7 +324,14 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
 	ticker := time.NewTicker(CheckBackfillJobFinishInterval)
 	defer ticker.Stop()
 	for {
+		failpoint.Inject("MockCanceledErr", func() {
+			getReorgCtx(reorgCtxs, ddlJobID).notifyReorgCancel()
+		})
 		if getReorgCtx(reorgCtxs, ddlJobID).isReorgCanceled() {
+			err := cleanupBackfillJobs(sess, fmt.Sprintf("%d_%s_%d_%%", ddlJobID, hex.EncodeToString(currEle.TypeKey), currEle.ID))
+			if err != nil {
+				return err
+			}
 			// Job is cancelled. So it can't be done.
 			return dbterror.ErrCancelledDDLJob
 		}
@@ -366,6 +374,10 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
 			}
 		}
 		case <-ctx.Done():
+			err := cleanupBackfillJobs(sess, fmt.Sprintf("%d_%s_%d_%%", ddlJobID, hex.EncodeToString(currEle.TypeKey), currEle.ID))
+			if err != nil {
+				return err
+			}
 			return ctx.Err()
 		}
 	}
@@ -428,15 +440,20 @@ func checkAndHandleInterruptedBackfillJobs(sess *session, ddlJobID, currEleID in
 		return nil
 	}
 
+	return cleanupBackfillJobs(sess, bJobs[0].PrefixKeyString())
+}
+
+func cleanupBackfillJobs(sess *session, prefixKey string) error {
+	var err error
 	for i := 0; i < retrySQLTimes; i++ {
-		err = MoveBackfillJobsToHistoryTable(sess, bJobs[0])
+		err = MoveBackfillJobsToHistoryTable(sess, prefixKey)
 		if err == nil {
-			return bJobs[0].Meta.Error
+			return nil
 		}
 		logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err))
 		time.Sleep(RetrySQLInterval)
 	}
-	return errors.Trace(err)
+	return err
 }
 
 func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte, pTblID int64) (backfillJobCnt int, err error) {
@@ -496,7 +513,7 @@ func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey
 }
 
 // MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table.
-func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error {
+func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, prefixKey string) error {
 	s, ok := sctx.(*session)
 	if !ok {
 		return errors.Errorf("sess ctx:%#v convert session failed", sctx)
@@ -504,8 +521,7 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob)
 
 	return s.runInTxn(func(se *session) error {
 		// TODO: Consider batch by batch update backfill jobs and insert backfill history jobs.
-		bJobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
-			bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID), "update_backfill_job")
+		bJobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key like '%s'", prefixKey), "update_backfill_job")
 		if err != nil {
 			return errors.Trace(err)
 		}
diff --git a/ddl/failtest/BUILD.bazel b/ddl/failtest/BUILD.bazel
index 4b7980412adc0..5cc48f0752bb4 100644
--- a/ddl/failtest/BUILD.bazel
+++ b/ddl/failtest/BUILD.bazel
@@ -15,6 +15,7 @@ go_test(
         "//ddl/testutil",
         "//ddl/util",
         "//domain",
+        "//errno",
         "//kv",
         "//parser/model",
         "//session",
diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go
index 6f33f6b2107dd..821683ea44417 100644
--- a/ddl/failtest/fail_db_test.go
+++ b/ddl/failtest/fail_db_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tidb/ddl/testutil"
 	ddlutil "github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/domain"
+	"github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/session"
@@ -220,6 +221,43 @@ func TestAddIndexFailed(t *testing.T) {
 	tk.MustExec("admin check table t")
 }
 
+func TestAddIndexCanceledInDistReorg(t *testing.T) {
+	if !variable.DDLEnableDistributeReorg.Load() {
+		// The non-dist-reorg path does not have this failpoint.
+		return
+	}
+	s := createFailDBSuite(t)
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCanceledErr", `1*return`))
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockCanceledErr"))
+	}()
+	tk := testkit.NewTestKit(t, s.store)
+	tk.MustExec("create database if not exists test_add_index_cancel")
+	defer tk.MustExec("drop database test_add_index_cancel")
+	tk.MustExec("use test_add_index_cancel")
+
+	tk.MustExec("create table t(a bigint PRIMARY KEY, b int)")
+	for i := 0; i < 1000; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values(%v, %v)", i, i))
+	}
+
+	// Get table ID for split.
+	dom := domain.GetDomain(tk.Session())
+	is := dom.InfoSchema()
+	tbl, err := is.TableByName(model.NewCIStr("test_add_index_cancel"), model.NewCIStr("t"))
+	require.NoError(t, err)
+	tblID := tbl.Meta().ID
+
+	// Split the table.
+	tableStart := tablecodec.GenTableRecordPrefix(tblID)
+	s.cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 100)
+
+	tk.MustGetErrCode("alter table t add index idx_b(b)", errno.ErrCancelledDDLJob)
+	tk.MustQuery(fmt.Sprintf("select count(1) from mysql.%s", ddl.BackgroundSubtaskTable)).Check(testkit.Rows("0"))
+	tk.MustQuery(fmt.Sprintf("select count(1) from mysql.%s", ddl.BackgroundSubtaskHistoryTable)).Check(testkit.Rows("100"))
+	tk.MustExec("admin check table t")
+}
+
 // TestFailSchemaSyncer test when the schema syncer is done,
 // should prohibit DML executing until the syncer is restartd by loadSchemaInLoop.
 func TestFailSchemaSyncer(t *testing.T) {
diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go
index 2f7a180f7b9ca..ed0c241768fe3 100644
--- a/ddl/job_table_test.go
+++ b/ddl/job_table_test.go
@@ -657,7 +657,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj")
 	require.NoError(t, err)
 	require.Equal(t, allCnt, 2)
-	err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs3[0])
+	err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs3[0].PrefixKeyString())
 	require.NoError(t, err)
 	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj")
 	require.NoError(t, err)
@@ -690,7 +690,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
 	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
 	require.NoError(t, err)
 	require.Equal(t, allCnt, 6)
-	err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs1[0])
+	err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs1[0].PrefixKeyString())
 	require.NoError(t, err)
 	allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj")
 	require.NoError(t, err)