Skip to content

Commit

Permalink
ddl: wrap some functions for ddl job (#34920)
Browse files Browse the repository at this point in the history
close #34919
  • Loading branch information
xiongjiwei authored May 29, 2022
1 parent dfb22c0 commit 7f023bd
Show file tree
Hide file tree
Showing 22 changed files with 145 additions and 193 deletions.
12 changes: 4 additions & 8 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -480,20 +481,15 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
if err != nil {
return errors.Trace(err)
}
allJobs := make([]*model.Job, 0)
defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
allJobs, err := ddl.GetAllDDLJobs(snapMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs)))
allJobs = append(allJobs, defaultJobs...)
addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))
if err != nil {
return errors.Trace(err)
}
log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs)))
allJobs = append(allJobs, addIndexJobs...)
historyJobs, err := snapMeta.GetAllHistoryDDLJobs()
historyJobs, err := ddl.GetAllHistoryDDLJobs(snapMeta)
if err != nil {
return errors.Trace(err)
}
Expand Down
22 changes: 9 additions & 13 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,19 +413,15 @@ func checkHistoryJobArgs(t *testing.T, ctx sessionctx.Context, id int64, args *h
}

func testCheckJobDone(t *testing.T, store kv.Storage, jobID int64, isAdd bool) {
require.NoError(t, kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
historyJob, err := m.GetHistoryDDLJob(jobID)
require.NoError(t, err)
require.Equal(t, historyJob.State, model.JobStateSynced)
if isAdd {
require.Equal(t, historyJob.SchemaState, model.StatePublic)
} else {
require.Equal(t, historyJob.SchemaState, model.StateNone)
}

return nil
}))
sess := testkit.NewTestKit(t, store).Session()
historyJob, err := ddl.GetHistoryJobByID(sess, jobID)
require.NoError(t, err)
require.Equal(t, historyJob.State, model.JobStateSynced)
if isAdd {
require.Equal(t, historyJob.SchemaState, model.StatePublic)
} else {
require.Equal(t, historyJob.SchemaState, model.StateNone)
}
}

func testNewContext(store kv.Storage) sessionctx.Context {
Expand Down
11 changes: 1 addition & 10 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
package ddl_test

import (
"context"
"errors"
"fmt"
"strconv"
"testing"
"time"

errors2 "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
Expand Down Expand Up @@ -1932,13 +1929,7 @@ func TestDDLExitWhenCancelMeetPanic(t *testing.T) {
require.Less(t, int64(0), jobID)

// Verification of the history job state.
var job *model.Job
err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err1 error
job, err1 = m.GetHistoryDDLJob(jobID)
return errors2.Trace(err1)
})
job, err := ddl.GetHistoryJobByID(tk.Session(), jobID)
require.NoError(t, err)
require.Equal(t, int64(4), job.ErrorCount)
require.Equal(t, "[ddl:-1]panic in handling DDL logic and error count beyond the limitation 3, cancelled", job.Error.Error())
Expand Down
14 changes: 2 additions & 12 deletions ddl/db_change_failpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@
package ddl_test

import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -64,15 +62,7 @@ func TestModifyColumnTypeArgs(t *testing.T) {

id, err := strconv.Atoi(jobID)
require.NoError(t, err)
var historyJob *model.Job
err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
historyJob, err = t.GetHistoryDDLJob(int64(id))
if err != nil {
return err
}
return nil
})
historyJob, err := ddl.GetHistoryJobByID(tk.Session(), int64(id))
require.NoError(t, err)
require.NotNil(t, historyJob)

Expand Down
33 changes: 16 additions & 17 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -1363,15 +1365,14 @@ func prepareTestControlParallelExecSQL(t *testing.T, store kv.Storage, dom *doma
}
var qLen int
for {
err := kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
jobs, err1 := ddl.GetDDLJobs(txn)
if err1 != nil {
return err1
}
qLen = len(jobs)
return nil
})
sess := testkit.NewTestKit(t, store).Session()
err := sessiontxn.NewTxn(context.Background(), sess)
require.NoError(t, err)
txn, err := sess.Txn(true)
require.NoError(t, err)
jobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn))
require.NoError(t, err)
qLen = len(jobs)
if qLen == 2 {
break
}
Expand All @@ -1393,17 +1394,15 @@ func prepareTestControlParallelExecSQL(t *testing.T, store kv.Storage, dom *doma
go func() {
var qLen int
for {
err := kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
jobs, err3 := ddl.GetDDLJobs(txn)
if err3 != nil {
return err3
}
qLen = len(jobs)
return nil
})
sess := testkit.NewTestKit(t, store).Session()
err := sessiontxn.NewTxn(context.Background(), sess)
require.NoError(t, err)
txn, err := sess.Txn(true)
require.NoError(t, err)
jobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn))
require.NoError(t, err)
qLen = len(jobs)
if qLen == 1 {
// Make sure sql2 is executed after the sql1.
close(ch)
break
}
Expand Down
13 changes: 0 additions & 13 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,19 +1225,6 @@ func backgroundExec(s kv.Storage, sql string, done chan error) {
done <- errors.Trace(err)
}

func getHistoryDDLJob(store kv.Storage, id int64) (*model.Job, error) {
var job *model.Job

err := kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(id)
return errors.Trace(err1)
})

return job, errors.Trace(err)
}

func TestCreateTableTooLarge(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down
16 changes: 5 additions & 11 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/tidb/errno"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -3022,17 +3021,12 @@ func TestDropSchemaWithPartitionTable(t *testing.T) {
jobID := row.GetInt64(0)

var tableIDs []int64
err = kv.RunInNewTxn(context.Background(), store, false, func(ctx context.Context, txn kv.Transaction) error {
tt := meta.NewMeta(txn)
historyJob, err := tt.GetHistoryDDLJob(jobID)
require.NoError(t, err)
err = historyJob.DecodeArgs(&tableIDs)
require.NoError(t, err)
// There is 2 partitions.
require.Equal(t, 3, len(tableIDs))
return nil
})
historyJob, err := ddl.GetHistoryJobByID(tk.Session(), jobID)
require.NoError(t, err)
err = historyJob.DecodeArgs(&tableIDs)
require.NoError(t, err)
// There is 2 partitions.
require.Equal(t, 3, len(tableIDs))

startTime := time.Now()
done := waitGCDeleteRangeDone(t, tk, tableIDs[2])
Expand Down
2 changes: 1 addition & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ func TestDDLJobErrorCount(t *testing.T) {

tk.MustGetErrCode("rename table ddl_error_table to new_ddl_error_table", errno.ErrEntryTooLarge)

historyJob, err := getHistoryDDLJob(store, jobID)
historyJob, err := ddl.GetHistoryJobByID(tk.Session(), jobID)
require.NoError(t, err)
require.NotNil(t, historyJob)
require.Equal(t, int64(1), historyJob.ErrorCount)
Expand Down
35 changes: 30 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/table"
pumpcli "github.com/pingcap/tidb/tidb-binlog/pump_client"
Expand Down Expand Up @@ -472,7 +473,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
d.wg.Add(1)
go d.limitDDLJobs()

d.sessPool = newSessionPool(ctxPool)
d.sessPool = newSessionPool(ctxPool, d.store)

// If RunWorker is true, we need campaign owner and do DDL job.
// Otherwise, we needn't do that.
Expand Down Expand Up @@ -1064,9 +1065,8 @@ func getDDLJobsInQueue(t *meta.Meta, jobListKey meta.JobListKeyType) ([]*model.J
return jobs, nil
}

// GetDDLJobs get all DDL jobs and sorts jobs by job.ID.
func GetDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
t := meta.NewMeta(txn)
// GetAllDDLJobs get all DDL jobs and sorts jobs by job.ID.
func GetAllDDLJobs(t *meta.Meta) ([]*model.Job, error) {
generalJobs, err := getDDLJobsInQueue(t, meta.DefaultJobListKey)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -1134,7 +1134,7 @@ func IterHistoryDDLJobs(txn kv.Transaction, finishFn func([]*model.Job) (bool, e
// IterAllDDLJobs will iterates running DDL jobs first, return directly if `finishFn` return true or error,
// then iterates history DDL jobs until the `finishFn` return true or error.
func IterAllDDLJobs(txn kv.Transaction, finishFn func([]*model.Job) (bool, error)) error {
jobs, err := GetDDLJobs(txn)
jobs, err := GetAllDDLJobs(meta.NewMeta(txn))
if err != nil {
return err
}
Expand All @@ -1145,3 +1145,28 @@ func IterAllDDLJobs(txn kv.Transaction, finishFn func([]*model.Job) (bool, error
}
return IterHistoryDDLJobs(txn, finishFn)
}

// GetAllHistoryDDLJobs get all the done DDL jobs.
func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
return m.GetAllHistoryDDLJobs()
}

// GetHistoryJobByID return history DDL job by ID.
func GetHistoryJobByID(sess sessionctx.Context, id int64) (*model.Job, error) {
err := sessiontxn.NewTxn(context.Background(), sess)
if err != nil {
return nil, err
}
txn, err := sess.Txn(true)
if err != nil {
return nil, err
}
t := meta.NewMeta(txn)
job, err := t.GetHistoryDDLJob(id)
return job, errors.Trace(err)
}

// AddHistoryDDLJob adds DDL job to history table.
func AddHistoryDDLJob(t *meta.Meta, job *model.Job, updateRawArgs bool) error {
return t.AddHistoryDDLJob(job, updateRawArgs)
}
13 changes: 5 additions & 8 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ func checkEqualTable(t *testing.T, t1, t2 *model.TableInfo) {
}

func checkHistoryJobArgs(t *testing.T, ctx sessionctx.Context, id int64, args *historyJobArgs) {
txn, err := ctx.Txn(true)
require.NoError(t, err)
tran := meta.NewMeta(txn)
historyJob, err := tran.GetHistoryDDLJob(id)
historyJob, err := GetHistoryJobByID(ctx, id)
require.NoError(t, err)
require.Greater(t, historyJob.BinlogInfo.FinishedTS, uint64(0))

Expand Down Expand Up @@ -755,7 +752,7 @@ func TestGetDDLJobs(t *testing.T) {
err = m.EnQueueDDLJob(jobs[i])
require.NoError(t, err)

currJobs, err := GetDDLJobs(txn)
currJobs, err := GetAllDDLJobs(meta.NewMeta(txn))
require.NoError(t, err)
require.Len(t, currJobs, i+1)

Expand All @@ -774,7 +771,7 @@ func TestGetDDLJobs(t *testing.T) {
require.Len(t, currJobs2, i+1)
}

currJobs, err := GetDDLJobs(txn)
currJobs, err := GetAllDDLJobs(meta.NewMeta(txn))
require.NoError(t, err)

for i, job := range jobs {
Expand Down Expand Up @@ -806,7 +803,7 @@ func TestGetDDLJobsIsSort(t *testing.T) {
m = meta.NewMeta(txn, meta.AddIndexJobListKey)
enQueueDDLJobs(t, m, model.ActionAddIndex, 5, 10)

currJobs, err := GetDDLJobs(txn)
currJobs, err := GetAllDDLJobs(meta.NewMeta(txn))
require.NoError(t, err)
require.Len(t, currJobs, 15)

Expand Down Expand Up @@ -946,7 +943,7 @@ func TestGetHistoryDDLJobs(t *testing.T) {
SchemaID: 1,
Type: model.ActionCreateTable,
}
err = m.AddHistoryDDLJob(jobs[i], true)
err = AddHistoryDDLJob(m, jobs[i], true)
require.NoError(t, err)

historyJobs, err := GetHistoryDDLJobs(txn, DefNumHistoryJobs)
Expand Down
14 changes: 6 additions & 8 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,12 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {

// getHistoryDDLJob gets a DDL job with job's ID from history queue.
func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
var job *model.Job

err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(id)
return errors.Trace(err1)
})
se, err := d.sessPool.get()
if err != nil {
return nil, errors.Trace(err)
}
defer d.sessPool.put(se)
job, err := GetHistoryJobByID(se, id)

return job, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 7f023bd

Please sign in to comment.