Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, executor: executor change for pause/resume on ddl jobs (#43171) #43658

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ go_test(
"mv_index_test.go",
"options_test.go",
"partition_test.go",
"pause_test.go",
"placement_policy_ddl_test.go",
"placement_policy_test.go",
"placement_sql_test.go",
Expand Down
72 changes: 70 additions & 2 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,8 +1106,10 @@ func TestCancelJobWriteConflict(t *testing.T) {
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL", `return(true)`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()
rs, cancelErr = tk2.Session().Execute(context.Background(), stmt)
}
}
Expand All @@ -1131,6 +1133,72 @@ func TestCancelJobWriteConflict(t *testing.T) {
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

func TestPauseJobWriteConflict(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)

tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)

tk1.MustExec("use test")

tk1.MustExec("create table t(id int)")

var jobID int64
var pauseErr error
var pauseRS []sqlexec.RecordSet
hook := &callback.TestDDLCallback{Do: dom}
d := dom.DDL()
originalHook := d.GetHook()
d.SetHook(hook)
defer d.SetHook(originalHook)

// Test when pause cannot be retried and adding index succeeds.
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()

jobID = job.ID
stmt := fmt.Sprintf("admin pause ddl jobs %d", jobID)
pauseRS, pauseErr = tk2.Session().Execute(context.Background(), stmt)
}
}
tk1.MustExec("alter table t add index (id)")
require.EqualError(t, pauseErr, "mock commit error")

var cancelRS []sqlexec.RecordSet
var cancelErr error
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL", `return(false)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFailedCommandOnConcurencyDDL"))
}()

jobID = job.ID
stmt := fmt.Sprintf("admin pause ddl jobs %d", jobID)
pauseRS, pauseErr = tk2.Session().Execute(context.Background(), stmt)

time.Sleep(5 * time.Second)
stmt = fmt.Sprintf("admin cancel ddl jobs %d", jobID)
cancelRS, cancelErr = tk2.Session().Execute(context.Background(), stmt)
}
}
tk1.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob)
require.NoError(t, pauseErr)
require.NoError(t, cancelErr)
result := tk2.ResultSetToResultWithCtx(context.Background(), pauseRS[0], "pause ddl job successfully")
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
result = tk2.ResultSetToResultWithCtx(context.Background(), cancelRS[0], "cancel ddl job successfully")
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

func TestTxnSavepointWithDDL(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
tk := testkit.NewTestKit(t, store)
Expand Down
259 changes: 206 additions & 53 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
continue
}
sessVars.StmtCtx.DDLJobID = 0 // Avoid repeat.
errs, err := CancelJobs(se, []int64{jobID})
errs, err := CancelJobsBySystem(se, []int64{jobID})
d.sessPool.Put(se)
if len(errs) > 0 {
logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
Expand Down Expand Up @@ -1425,85 +1425,238 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) {
return generalJob, reorgJob, nil
}

// CancelJobs cancels the DDL jobs.
func CancelJobs(se sessionctx.Context, ids []int64) (errs []error, err error) {
return cancelConcurrencyJobs(se, ids)
// cancelRunningJob cancel a DDL job that is in the concurrent state.
func cancelRunningJob(sess *sess.Session, job *model.Job,
byWho model.AdminCommandOperator) (err error) {
// These states can't be cancelled.
if job.IsDone() || job.IsSynced() {
return dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID)
}

// If the state is rolling back, it means the work is cleaning the data after cancelling the job.
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
return nil
}

if !job.IsRollbackable() {
return dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
}
job.State = model.JobStateCancelling
job.AdminOperator = byWho

// Make sure RawArgs isn't overwritten.
return json.Unmarshal(job.RawArgs, &job.Args)
}

// cancelConcurrencyJobs cancels the DDL jobs that are in the concurrent state.
func cancelConcurrencyJobs(se sessionctx.Context, ids []int64) ([]error, error) {
failpoint.Inject("mockCancelConcurencyDDL", func(val failpoint.Value) {
// pauseRunningJob check and pause the running Job
func pauseRunningJob(sess *sess.Session, job *model.Job,
byWho model.AdminCommandOperator) (err error) {
// It would be much better doing this filter during `getJobsBySQL`, but not now.
if !job.IsPausable() {
err = dbterror.ErrCannotPauseDDLJob.GenWithStackByArgs(job.ID)
if err != nil {
return err
}
}

job.State = model.JobStatePausing
job.AdminOperator = byWho

return json.Unmarshal(job.RawArgs, &job.Args)
}

// resumePausedJob check and resume the Paused Job
func resumePausedJob(se *sess.Session, job *model.Job,
byWho model.AdminCommandOperator) (err error) {
if !job.IsResumable() ||
// The Paused job should only be resumed by who paused it
job.AdminOperator != byWho {
return dbterror.ErrCannotResumeDDLJob.GenWithStackByArgs(job.ID)
}

job.State = model.JobStateQueueing

return json.Unmarshal(job.RawArgs, &job.Args)
}

// processJobs command on the Job according to the process
func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
sessCtx sessionctx.Context,
ids []int64,
byWho model.AdminCommandOperator) ([]error, error) {
failpoint.Inject("mockFailedCommandOnConcurencyDDL", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mock commit error"))
}
})

if len(ids) == 0 {
return nil, nil
}
var jobMap = make(map[int64]int) // jobID -> error index

sessCtx := sess.NewSession(se)
err := sessCtx.Begin()
if err != nil {
return nil, err
}
ns := sess.NewSession(sessCtx)
var errs []error

idsStr := make([]string, 0, len(ids))
for idx, id := range ids {
jobMap[id] = idx
idsStr = append(idsStr, strconv.FormatInt(id, 10))
}
// We should process (and try) all the jobs in one Transaction.
for tryN := uint(0); tryN < 10; tryN += 1 {
errs = make([]error, len(ids))
// Need to figure out which one could not be paused
jobMap := make(map[int64]int, len(ids))
idsStr := make([]string, 0, len(ids))
for idx, id := range ids {
jobMap[id] = idx
idsStr = append(idsStr, strconv.FormatInt(id, 10))
}

jobs, err := getJobsBySQL(sessCtx, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", ")))
if err != nil {
sessCtx.Rollback()
return nil, err
}
err := ns.Begin()
if err != nil {
return nil, err
}
jobs, err := getJobsBySQL(ns, JobTable, fmt.Sprintf("job_id in (%s) order by job_id", strings.Join(idsStr, ", ")))
if err != nil {
ns.Rollback()
return nil, err
}

errs := make([]error, len(ids))
for _, job := range jobs {
i, ok := jobMap[job.ID]
if !ok {
logutil.BgLogger().Debug("Job ID from meta is not consistent with requested job id,",
zap.Int64("fetched job ID", job.ID))
errs[i] = dbterror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
continue
}
delete(jobMap, job.ID)

for _, job := range jobs {
i, ok := jobMap[job.ID]
if !ok {
logutil.BgLogger().Debug("the job that needs to be canceled isn't equal to current job",
zap.Int64("need to canceled job ID", job.ID),
zap.Int64("current job ID", job.ID))
continue
err = process(ns, job, byWho)
if err != nil {
errs[i] = err
break
}

err = updateDDLJob2Table(ns, job, true)
if err != nil {
break
}
}
delete(jobMap, job.ID)
// These states can't be cancelled.
if job.IsDone() || job.IsSynced() {
errs[i] = dbterror.ErrCancelFinishedDDLJob.GenWithStackByArgs(job.ID)
// We may meet some error on job update, try it again
if err != nil {
ns.Rollback()
continue
}
// If the state is rolling back, it means the work is cleaning the data after cancelling the job.
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {

// There may be some conflict during the update, try it again
if ns.Commit() != nil {
continue
}
if !job.IsRollbackable() {
errs[i] = dbterror.ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
continue

for id, idx := range jobMap {
errs[idx] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id)
}
job.State = model.JobStateCancelling
// Make sure RawArgs isn't overwritten.
err := json.Unmarshal(job.RawArgs, &job.Args)

break
}
return errs, nil
}

// CancelJobs cancels the DDL jobs according to user command.
func CancelJobs(se sessionctx.Context, ids []int64) (errs []error, err error) {
return processJobs(cancelRunningJob, se, ids, model.AdminCommandByEndUser)
}

// PauseJobs pause all the DDL jobs according to user command.
func PauseJobs(se sessionctx.Context, ids []int64) ([]error, error) {
return processJobs(pauseRunningJob, se, ids, model.AdminCommandByEndUser)
}

// ResumeJobs resume all the DDL jobs according to user command.
func ResumeJobs(se sessionctx.Context, ids []int64) ([]error, error) {
return processJobs(resumePausedJob, se, ids, model.AdminCommandByEndUser)
}

// CancelJobsBySystem cancels Jobs because of internal reasons.
func CancelJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) {
return processJobs(cancelRunningJob, se, ids, model.AdminCommandBySystem)
}

// PauseJobsBySystem pauses Jobs because of internal reasons.
func PauseJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) {
return processJobs(pauseRunningJob, se, ids, model.AdminCommandBySystem)
}

// ResumeJobsBySystem resumes Jobs that are paused by TiDB itself.
func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err error) {
return processJobs(resumePausedJob, se, ids, model.AdminCommandBySystem)
}

// pprocessAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage.
func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
se sessionctx.Context, byWho model.AdminCommandOperator) (map[int64]error, error) {
var err error
var jobErrs = make(map[int64]error)

ns := sess.NewSession(se)
err = ns.Begin()
if err != nil {
return nil, err
}

var jobID int64 = 0
var jobIDMax int64 = 0
var limit int = 100
for {
var jobs []*model.Job
jobs, err = getJobsBySQL(ns, JobTable,
fmt.Sprintf("job_id >= %s order by job_id asc limit %s",
strconv.FormatInt(jobID, 10),
strconv.FormatInt(int64(limit), 10)))
if err != nil {
errs[i] = errors.Trace(err)
continue
ns.Rollback()
return nil, err
}
err = updateDDLJob2Table(sessCtx, job, true)
if err != nil {
errs[i] = errors.Trace(err)

for _, job := range jobs {
err = process(ns, job, byWho)
if err != nil {
jobErrs[job.ID] = err
ns.Rollback()
return jobErrs, err
}
err = updateDDLJob2Table(ns, job, true)
if err != nil {
ns.Rollback()
return jobErrs, err
}
}

// Just in case the job ID is not sequential
if jobs[len(jobs)-1].ID > jobIDMax {
jobIDMax = jobs[len(jobs)-1].ID
}

// If rows returned is smaller than $limit, then there is no more records
if len(jobs) < limit {
break
}

jobID = jobIDMax + 1
}
err = sessCtx.Commit()

err = ns.Commit()
if err != nil {
return nil, err
}
for id, idx := range jobMap {
errs[idx] = dbterror.ErrDDLJobNotFound.GenWithStackByArgs(id)
}
return errs, nil
return jobErrs, nil
}

// PauseAllJobsBySystem pauses all running Jobs because of internal reasons.
func PauseAllJobsBySystem(se sessionctx.Context) (map[int64]error, error) {
return processAllJobs(pauseRunningJob, se, model.AdminCommandBySystem)
}

// ResumeAllJobsBySystem resumes all paused Jobs because of internal reasons.
func ResumeAllJobsBySystem(se sessionctx.Context) (map[int64]error, error) {
return processAllJobs(resumePausedJob, se, model.AdminCommandBySystem)
}

// GetAllDDLJobs get all DDL jobs and sorts jobs by job.ID.
Expand Down
Loading