From 6e6db1fd356e872f71d724467e9ec02f7c6979d7 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 24 Dec 2021 12:55:47 +0800 Subject: [PATCH] binlog: allow multiple ddl targets (#30904) --- ddl/table.go | 7 +++-- ddl/table_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++++ parser/model/ddl.go | 14 ++++++++++ 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/ddl/table.go b/ddl/table.go index 625b4f39df759..a60e5d9a76b20 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -798,21 +798,22 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error return ver, errors.Trace(err) } - tblInfo := &model.TableInfo{} + var tblInfos = make([]*model.TableInfo, 0, len(tableNames)) var err error for i, oldSchemaID := range oldSchemaIDs { job.TableID = tableIDs[i] - ver, tblInfo, err = checkAndRenameTables(t, job, oldSchemaID, newSchemaIDs[i], oldSchemaNames[i], tableNames[i]) + ver, tblInfo, err := checkAndRenameTables(t, job, oldSchemaID, newSchemaIDs[i], oldSchemaNames[i], tableNames[i]) if err != nil { return ver, errors.Trace(err) } + tblInfos = append(tblInfos, tblInfo) } ver, err = updateSchemaVersion(t, job) if err != nil { return ver, errors.Trace(err) } - job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos) return ver, nil } diff --git a/ddl/table_test.go b/ddl/table_test.go index f366c090b1686..78be85a8dbd97 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -16,6 +16,7 @@ package ddl import ( "context" + "fmt" "testing" "github.com/pingcap/errors" @@ -66,6 +67,24 @@ func testRenameTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID, return job } +func testRenameTables( + t *testing.T, ctx sessionctx.Context, d *ddl, + oldSchemaIDs, newSchemaIDs []int64, newTableNames []*model.CIStr, + oldTableIDs []int64, oldSchemaNames []*model.CIStr, +) *model.Job { + job := &model.Job{ + Type: model.ActionRenameTables, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames}, + } + err := d.doDDLJob(ctx, job) + require.NoError(t, err) + + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil}) + return job +} + func testLockTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo, lockTp model.TableLockType) *model.Job { arg := &lockTablesArg{ LockTables: []model.TableLockTpInfo{{SchemaID: newSchemaID, TableID: tblInfo.ID, Tp: lockTp}}, @@ -326,3 +345,51 @@ func testAlterNoCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSche checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) return job } + +func TestRenameTables(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + ddl, err := testNewDDLAndStart( + context.Background(), + WithStore(store), + WithLease(testLease), + ) + require.NoError(t, err) + + dbInfo, err := testSchemaInfo(ddl, "test_table") + require.NoError(t, err) + testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo) + + ctx := testNewContext(ddl) + var tblInfos = make([]*model.TableInfo, 0, 2) + var newTblInfos = make([]*model.TableInfo, 0, 2) + for i := 1; i < 3; i++ { + tableName := fmt.Sprintf("t%d", i) + tblInfo, err := testTableInfo(ddl, tableName, 3) + require.NoError(t, err) + job := testCreateTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + tblInfos = append(tblInfos, tblInfo) + + newTableName := fmt.Sprintf("tt%d", i) + tblInfo, err = testTableInfo(ddl, newTableName, 3) + require.NoError(t, err) + newTblInfos = append(newTblInfos, tblInfo) + } + + job := testRenameTables( + t, ctx, ddl, + []int64{dbInfo.ID, dbInfo.ID}, + []int64{dbInfo.ID, dbInfo.ID}, + []*model.CIStr{&newTblInfos[0].Name, &newTblInfos[1].Name}, + []int64{tblInfos[0].ID, tblInfos[1].ID}, + []*model.CIStr{&dbInfo.Name, &dbInfo.Name}, + ) + + txn, _ := ctx.Txn(true) + historyJob, _ := meta.NewMeta(txn).GetHistoryDDLJob(job.ID) + wantTblInfos := historyJob.BinlogInfo.MultipleTableInfos + require.Equal(t, wantTblInfos[0].Name.L, "tt1") + require.Equal(t, wantTblInfos[1].Name.L, "tt2") +} diff --git a/parser/model/ddl.go b/parser/model/ddl.go index c61372b55e263..9716cea38cd23 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -175,6 +175,9 @@ type HistoryInfo struct { DBInfo *DBInfo TableInfo *TableInfo FinishedTS uint64 + + // MultipleTableInfos is like TableInfo but only for operations updating multiple tables. + MultipleTableInfos []*TableInfo } // AddDBInfo adds schema version and schema information that are used for binlog. @@ -196,6 +199,7 @@ func (h *HistoryInfo) Clean() { h.SchemaVersion = 0 h.DBInfo = nil h.TableInfo = nil + h.MultipleTableInfos = nil } // DDLReorgMeta is meta info of DDL reorganization. @@ -279,6 +283,16 @@ func (job *Job) FinishTableJob(jobState JobState, schemaState SchemaState, ver i job.BinlogInfo.AddTableInfo(ver, tblInfo) } +// FinishMultipleTableJob is called when a job is finished. +// It updates the job's state information and adds tblInfos to the binlog. +func (job *Job) FinishMultipleTableJob(jobState JobState, schemaState SchemaState, ver int64, tblInfos []*TableInfo) { + job.State = jobState + job.SchemaState = schemaState + job.BinlogInfo.SchemaVersion = ver + job.BinlogInfo.MultipleTableInfos = tblInfos + job.BinlogInfo.TableInfo = tblInfos[len(tblInfos)-1] +} + // FinishDBJob is called when a job is finished. // It updates the job's state information and adds dbInfo the binlog. func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int64, dbInfo *DBInfo) {