Skip to content

Commit

Permalink
*: add auto_random id cache for statement retrying and table recover (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Mar 16, 2020
1 parent 340d8e2 commit 63d5774
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 72 deletions.
12 changes: 11 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ type DDL interface {
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, unique bool, indexName model.CIStr,
columnNames []*ast.IndexColName, indexOption *ast.IndexOption) error
Expand Down Expand Up @@ -659,6 +659,16 @@ func (d *ddl) GetHook() Callback {
return d.mu.hook
}

// RecoverInfo contains information needed by DDL.RecoverTable.
type RecoverInfo struct {
SchemaID int64
TableInfo *model.TableInfo
DropJobID int64
SnapshotTS uint64
CurAutoIncID int64
CurAutoRandID int64
}

// DDL error codes.
const (
codeInvalidWorker terror.ErrCode = 1
Expand Down
6 changes: 4 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1519,8 +1519,9 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return errors.Trace(err)
}

func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) {
func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
// Check schema exist.
schema, ok := is.SchemaByID(schemaID)
if !ok {
Expand All @@ -1539,7 +1540,8 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, sche
TableID: tbInfo.ID,
Type: model.ActionRecoverTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, recoverTableCheckFlagNone},
Args: []interface{}{tbInfo, recoverInfo.CurAutoIncID, recoverInfo.DropJobID,
recoverInfo.SnapshotTS, recoverTableCheckFlagNone, recoverInfo.CurAutoRandID},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,9 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {

func finishRecoverTable(w *worker, t *meta.Meta, job *model.Job) error {
tbInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag)
err := job.DecodeArgs(tbInfo, &autoIncID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID)
if err != nil {
return errors.Trace(err)
}
Expand Down
11 changes: 6 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ const (
func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag); err != nil {
const checkFlagIndexInJobArgs = 4 // The index of `recoverTableCheckFlag` in job arg list.
if err = job.DecodeArgs(tblInfo, &autoIncID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -236,9 +237,9 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
// none -> write only
// check GC enable and update flag.
if gcEnable {
job.Args[len(job.Args)-1] = recoverTableCheckFlagEnableGC
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagEnableGC
} else {
job.Args[len(job.Args)-1] = recoverTableCheckFlagDisableGC
job.Args[checkFlagIndexInJobArgs] = recoverTableCheckFlagDisableGC
}

job.SchemaState = model.StateWriteOnly
Expand Down Expand Up @@ -271,7 +272,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

tblInfo.State = model.StatePublic
tblInfo.UpdateTS = t.StartTS
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoID)
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoIncID, autoRandID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
24 changes: 24 additions & 0 deletions ddl/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
)

// SessionExecInGoroutine export for testing.
Expand Down Expand Up @@ -55,3 +59,23 @@ func ExecMultiSQLInGoroutine(c *check.C, s kv.Storage, dbName string, multiSQL [
}
}()
}

// ExtractAllTableHandles extracts all handles of a given table.
func ExtractAllTableHandles(se session.Session, dbName, tbName string) ([]int64, error) {
dom := domain.GetDomain(se)
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tbName))
if err != nil {
return nil, err
}
err = se.NewTxn(context.Background())
if err != nil {
return nil, err
}
var allHandles []int64
err = tbl.IterRecords(se, tbl.FirstKey(), nil,
func(h int64, _ []types.Datum, _ []*table.Column) (more bool, err error) {
allHandles = append(allHandles, h)
return true, nil
})
return allHandles, err
}
37 changes: 30 additions & 7 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -332,20 +333,42 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
if err != nil {
return err
}
// Get table original autoID before table drop.
m, err := dom.GetSnapshotMeta(job.StartTS)
autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}
autoID, err := m.GetAutoTableID(job.SchemaID, job.TableID)
if err != nil {
return errors.Errorf("recover table_id: %d, get original autoID from snapshot meta err: %s", job.TableID, err.Error())

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
}
// Call DDL RecoverTable
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

func (e *DDLExec) getTableAutoIDsFromSnapshot(job *model.Job) (autoIncID, autoRandID int64, err error) {
// Get table original autoIDs before table drop.
dom := domain.GetDomain(e.ctx)
m, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return 0, 0, err
}
autoIncID, err = m.GetAutoTableID(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoIncID from snapshot meta err: %s", job.TableID, err.Error())
}
autoRandID, err = m.GetAutoRandomID(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoRandID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoIncID, autoRandID, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
job, err := t.GetHistoryDDLJob(s.JobID)
if err != nil {
Expand Down
19 changes: 3 additions & 16 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
ddltestutil "github.com/pingcap/tidb/ddl/testutil"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -761,16 +761,7 @@ func (s *testAutoRandomSuite) TestAutoRandomBitsData(c *C) {
for i := 0; i < 100; i++ {
tk.MustExec("insert into t(b) values (?)", i)
}
dom := domain.GetDomain(tk.Se)
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test_auto_random_bits"), model.NewCIStr("t"))
c.Assert(err, IsNil)
c.Assert(tk.Se.NewTxn(context.Background()), IsNil)
var allHandles []int64
// Iterate all the record. The order is not guaranteed.
err = tbl.IterRecords(tk.Se, tbl.FirstKey(), nil, func(h int64, _ []types.Datum, _ []*table.Column) (more bool, err error) {
allHandles = append(allHandles, h)
return true, nil
})
allHandles, err := ddltestutil.ExtractAllTableHandles(tk.Se, "test_auto_random_bits", "t")
c.Assert(err, IsNil)
tk.MustExec("drop table t")

Expand All @@ -783,11 +774,7 @@ func (s *testAutoRandomSuite) TestAutoRandomBitsData(c *C) {
}
c.Assert(allZero, IsFalse)
// Test non-shard-bits part of auto random id is monotonic increasing and continuous.
orderedHandles := make([]int64, len(allHandles))
for i, h := range allHandles {
orderedHandles[i] = h << 16 >> 16
}
sort.Slice(orderedHandles, func(i, j int) bool { return orderedHandles[i] < orderedHandles[j] })
orderedHandles := testutil.ConfigTestUtils.MaskSortHandles(allHandles, 15, mysql.TypeLonglong)
size := int64(len(allHandles))
for i := int64(1); i <= size; i++ {
c.Assert(i, Equals, orderedHandles[i-1])
Expand Down
12 changes: 12 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,16 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
}

func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, err := retryInfo.GetCurrAutoRandomID()
if err != nil {
return types.Datum{}, err
}
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}

var err error
var recordID int64
if !hasValue {
Expand All @@ -736,6 +746,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)
return d, nil
}

Expand All @@ -758,6 +769,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
}

d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)

casted, err := table.CastValue(e.ctx, d, c.ToInfo())
if err != nil {
Expand Down
Loading

0 comments on commit 63d5774

Please sign in to comment.