Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add auto_random id cache for statement retrying and table recover #14711

Merged
merged 13 commits into from
Feb 19, 2020
Merged
12 changes: 11 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ type DDL interface {
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
columnNames []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error
Expand Down Expand Up @@ -665,3 +665,13 @@ func (d *ddl) GetHook() Callback {

return d.mu.hook
}

// RecoverInfo contains information needed by DDL.RecoverTable.
type RecoverInfo struct {
SchemaID int64
TableInfo *model.TableInfo
DropJobID int64
SnapshotTS uint64
CurAutoIncID int64
CurAutoRandID int64
}
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 4 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,8 +1540,9 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return errors.Trace(err)
}

func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) {
func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
// Check schema exist.
schema, ok := is.SchemaByID(schemaID)
if !ok {
Expand All @@ -1561,7 +1562,8 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, tbInfo *model.TableInfo, sche
SchemaName: schema.Name.L,
Type: model.ActionRecoverTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, recoverTableCheckFlagNone},
Args: []interface{}{tbInfo, recoverInfo.CurAutoIncID, recoverInfo.DropJobID,
recoverInfo.SnapshotTS, recoverTableCheckFlagNone, recoverInfo.CurAutoRandID},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,9 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {

func finishRecoverTable(w *worker, t *meta.Meta, job *model.Job) error {
tbInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoID, autoRandID, dropJobID, recoverTableCheckFlag int64
tangenta marked this conversation as resolved.
Show resolved Hide resolved
var snapshotTS uint64
err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag)
err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID)
if err != nil {
return errors.Trace(err)
}
Expand Down
11 changes: 6 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ const (
func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo := &model.TableInfo{}
var autoID, dropJobID, recoverTableCheckFlag int64
var autoIncID, autoRandID, dropJobID, recoverTableCheckFlag int64
var snapshotTS uint64
if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &recoverTableCheckFlag); err != nil {
const argsCheckFlagIndex = 4
tangenta marked this conversation as resolved.
Show resolved Hide resolved
if err = job.DecodeArgs(tblInfo, &autoIncID, &dropJobID, &snapshotTS, &recoverTableCheckFlag, &autoRandID); err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -251,9 +252,9 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
// none -> write only
// check GC enable and update flag.
if gcEnable {
job.Args[len(job.Args)-1] = recoverTableCheckFlagEnableGC
job.Args[argsCheckFlagIndex] = recoverTableCheckFlagEnableGC
} else {
job.Args[len(job.Args)-1] = recoverTableCheckFlagDisableGC
job.Args[argsCheckFlagIndex] = recoverTableCheckFlagDisableGC
}

job.SchemaState = model.StateWriteOnly
Expand Down Expand Up @@ -292,7 +293,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in

tblInfo.State = model.StatePublic
tblInfo.UpdateTS = t.StartTS
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoID)
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoIncID, autoRandID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
40 changes: 31 additions & 9 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -372,27 +373,40 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error {
if err != nil {
return err
}
autoID, err := e.getTableAutoIDFromSnapshot(job)
autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}

recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
}
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

func (e *DDLExec) getTableAutoIDFromSnapshot(job *model.Job) (int64, error) {
func (e *DDLExec) getTableAutoIDsFromSnapshot(job *model.Job) (autoIncID, autoRandID int64, err error) {
// Get table original autoID before table drop.
reafans marked this conversation as resolved.
Show resolved Hide resolved
dom := domain.GetDomain(e.ctx)
m, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return 0, err
return 0, 0, err
}
autoID, err := m.GetAutoTableID(job.SchemaID, job.TableID)
autoIncID, err = m.GetAutoTableID(job.SchemaID, job.TableID)
if err != nil {
return 0, errors.Errorf("recover table_id: %d, get original autoID from snapshot meta err: %s", job.TableID, err.Error())
return 0, 0, errors.Errorf("recover table_id: %d, get original autoIncID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoID, nil
autoRandID, err = m.GetAutoRandomID(job.SchemaID, job.TableID)
if err != nil {
return 0, 0, errors.Errorf("recover table_id: %d, get original autoRandID from snapshot meta err: %s", job.TableID, err.Error())
}
return autoIncID, autoRandID, nil
}

func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
Expand Down Expand Up @@ -511,12 +525,20 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error {
return infoschema.ErrTableExists.GenWithStackByArgs("tableID:" + strconv.FormatInt(tblInfo.ID, 10))
}

autoID, err := e.getTableAutoIDFromSnapshot(job)
autoIncID, autoRandID, err := e.getTableAutoIDsFromSnapshot(job)
if err != nil {
return err
}
recoverInfo := &ddl.RecoverInfo{
SchemaID: job.SchemaID,
TableInfo: tblInfo,
DropJobID: job.ID,
SnapshotTS: job.StartTS,
CurAutoIncID: autoIncID,
CurAutoRandID: autoRandID,
}
// Call DDL RecoverTable.
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
err = domain.GetDomain(e.ctx).DDL().RecoverTable(e.ctx, recoverInfo)
return err
}

Expand Down
12 changes: 12 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,16 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
}

func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, err := retryInfo.GetCurrAutoRandomID()
if err != nil {
return types.Datum{}, err
}
d.SetAutoID(autoRandomID, c.Flag)
return d, nil
}

if !hasValue || d.IsNull() {
_, err := e.ctx.Txn(true)
if err != nil {
Expand All @@ -848,6 +858,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
return types.Datum{}, err
}
d.SetAutoID(autoRandomID, c.Flag)
retryInfo.AddAutoRandomID(autoRandomID)
} else {
recordID, err := getAutoRecordID(d, &c.FieldType, true)
if err != nil {
Expand All @@ -858,6 +869,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
return types.Datum{}, err
}
d.SetAutoID(recordID, c.Flag)
retryInfo.AddAutoRandomID(recordID)
}

casted, err := table.CastValue(e.ctx, d, c.ToInfo())
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200207090844-d65f5147dd9f
github.com/pingcap/parser v0.0.0-20200211032504-77fd7e3e8fa0
github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20200207090844-d65f5147dd9f h1:uUrZ94J2/tsmCXHjF7pItG2tMqwP4P4vMojAbI8NMRY=
github.com/pingcap/parser v0.0.0-20200207090844-d65f5147dd9f/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
github.com/pingcap/parser v0.0.0-20200211032504-77fd7e3e8fa0 h1:pdoULzj07DdUWodgP2ioywqX/onl1HLCJmYiD+D97h4=
github.com/pingcap/parser v0.0.0-20200211032504-77fd7e3e8fa0/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497 h1:FzLErYtcXnSxtC469OuVDlgBbh0trJZzNxw0mNKzyls=
github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497/go.mod h1:cfT/xu4Zz+Tkq95QrLgEBZ9ikRcgzy4alHqqoaTftqI=
github.com/pingcap/sysutil v0.0.0-20191216090214-5f9620d22b3b h1:EEyo/SCRswLGuSk+7SB86Ak1p8bS6HL1Mi4Dhyuv6zg=
Expand Down
6 changes: 3 additions & 3 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,17 +330,17 @@ func (m *Meta) CreateTableOrView(dbID int64, tableInfo *model.TableInfo) error {

// CreateTableAndSetAutoID creates a table with tableInfo in database,
// and rebases the table autoID.
func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoID int64) error {
func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoIncID, autoRandID int64) error {
err := m.CreateTableOrView(dbID, tableInfo)
if err != nil {
return errors.Trace(err)
}
_, err = m.txn.HInc(m.dbKey(dbID), m.autoTableIDKey(tableInfo.ID), autoID)
_, err = m.txn.HInc(m.dbKey(dbID), m.autoTableIDKey(tableInfo.ID), autoIncID)
if err != nil {
return errors.Trace(err)
}
if tableInfo.AutoRandomBits > 0 {
_, err = m.txn.HInc(m.dbKey(dbID), m.autoRandomTableIDKey(tableInfo.ID), 0)
_, err = m.txn.HInc(m.dbKey(dbID), m.autoRandomTableIDKey(tableInfo.ID), autoRandID)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *testSuite) TestMeta(c *C) {
ID: 3,
Name: model.NewCIStr("tbl3"),
}
err = t.CreateTableAndSetAutoID(1, tbInfo3, 123)
err = t.CreateTableAndSetAutoID(1, tbInfo3, 123, 0)
c.Assert(err, IsNil)
id, err := t.GetAutoTableID(1, tbInfo3.ID)
c.Assert(err, IsNil)
Expand Down
59 changes: 42 additions & 17 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,39 +60,64 @@ var (
type RetryInfo struct {
Retrying bool
DroppedPreparedStmtIDs []uint32
currRetryOff int
autoIncrementIDs []int64
autoIncrementIDs retryInfoAutoIDs
autoRandomIDs retryInfoAutoIDs
}

// Clean does some clean work.
func (r *RetryInfo) Clean() {
r.currRetryOff = 0
if len(r.autoIncrementIDs) > 0 {
r.autoIncrementIDs = r.autoIncrementIDs[:0]
}
r.autoIncrementIDs.clean()
r.autoRandomIDs.clean()

if len(r.DroppedPreparedStmtIDs) > 0 {
r.DroppedPreparedStmtIDs = r.DroppedPreparedStmtIDs[:0]
}
}

// AddAutoIncrementID adds id to AutoIncrementIDs.
func (r *RetryInfo) AddAutoIncrementID(id int64) {
r.autoIncrementIDs = append(r.autoIncrementIDs, id)
}

// ResetOffset resets the current retry offset.
func (r *RetryInfo) ResetOffset() {
r.currRetryOff = 0
r.autoIncrementIDs.currentOffset = 0
r.autoRandomIDs.currentOffset = 0
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}

// GetCurrAutoIncrementID gets current AutoIncrementID.
// AddAutoIncrementID adds id to autoIncrementIDs.
func (r *RetryInfo) AddAutoIncrementID(id int64) {
r.autoIncrementIDs.autoIDs = append(r.autoIncrementIDs.autoIDs, id)
}

// GetCurrAutoIncrementID gets current autoIncrementID.
func (r *RetryInfo) GetCurrAutoIncrementID() (int64, error) {
if r.currRetryOff >= len(r.autoIncrementIDs) {
return 0, errCantGetValidID
return r.autoIncrementIDs.getCurrent()
}

// AddAutoRandomID adds id to autoRandomIDs.
func (r *RetryInfo) AddAutoRandomID(id int64) {
r.autoRandomIDs.autoIDs = append(r.autoRandomIDs.autoIDs, id)
}

// GetCurrAutoRandomID gets current AutoRandomID.
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
func (r *RetryInfo) GetCurrAutoRandomID() (int64, error) {
return r.autoRandomIDs.getCurrent()
}

type retryInfoAutoIDs struct {
currentOffset int
autoIDs []int64
}

func (r *retryInfoAutoIDs) clean() {
r.currentOffset = 0
if len(r.autoIDs) > 0 {
r.autoIDs = r.autoIDs[:0]
}
id := r.autoIncrementIDs[r.currRetryOff]
r.currRetryOff++
}

func (r *retryInfoAutoIDs) getCurrent() (int64, error) {
if r.currentOffset >= len(r.autoIDs) {
return 0, errCantGetValidID
Copy link
Member

@bb7133 bb7133 Feb 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message of errCantGetValidID is "cannot get valid auto-increment id in retry". I suggest to modify it to "Cannot get a valid auto-ID when retrying the statement", what do you think?

}
id := r.autoIDs[r.currentOffset]
r.currentOffset++
return id, nil
}

Expand Down