Skip to content

Commit

Permalink
restore: rewrite auto increment id after pitr (#46521)
Browse files Browse the repository at this point in the history
close #46520
  • Loading branch information
3pointer authored Sep 4, 2023
1 parent 08024e7 commit e82519e
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 43 deletions.
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//br/pkg/storage",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/version",
"//ddl",
"//distsql",
"//kv",
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -537,6 +538,15 @@ func BuildBackupRangeAndInitSchema(

hasTable := false
err = m.IterTables(dbInfo.ID, func(tableInfo *model.TableInfo) error {
if tableInfo.Version > version.CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION {
// normally this shouldn't happen in a production env.
// because we had a unit test to avoid table info version update silencly.
// and had version check before run backup.
return errors.Errorf("backup doesn't not support table %s with version %d, maybe try a new version of br",
tableInfo.Name.String(),
tableInfo.Version,
)
}
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
return nil
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,20 @@ func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entr
return &kv.Entry{Key: newKey, Value: result.NewValue}, nil
}

func (sr *SchemasReplace) rewriteEntryForAutoIncrementIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
newKey, err := sr.rewriteKeyForTable(
e.Key,
cf,
meta.ParseAutoIncrementIDKey,
meta.AutoIncrementIDKey,
)
if err != nil {
return nil, errors.Trace(err)
}

return &kv.Entry{Key: newKey, Value: e.Value}, nil
}

func (sr *SchemasReplace) rewriteEntryForAutoTableIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
newKey, err := sr.rewriteKeyForTable(
e.Key,
Expand Down Expand Up @@ -652,6 +666,8 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
}
if meta.IsTableKey(rawKey.Field) {
return sr.rewriteEntryForTable(e, cf)
} else if meta.IsAutoIncrementIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoIncrementIDKey(e, cf)
} else if meta.IsAutoTableIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoTableIDKey(e, cf)
} else if meta.IsSequenceKey(rawKey.Field) {
Expand Down
111 changes: 69 additions & 42 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,49 +208,76 @@ func TestRewriteKeyForTable(t *testing.T) {
tableID int64 = 57
ts uint64 = 400036290571534337
)
encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), meta.TableKey(tableID), ts)

// create schemasReplace.
sr := MockEmptySchemasReplace(nil)

// set preConstruct status and construct map information.
sr.SetPreConstructMapStatus()
newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, meta.ParseTableKey, meta.TableKey)
require.Nil(t, err)
require.Nil(t, newKey)
require.Equal(t, len(sr.DbMap), 1)
require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
downStreamDbID := sr.DbMap[dbID].DbID
downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID

// set restoreKV status and rewrite it.
sr.SetRestoreKVStatus()
newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, meta.ParseTableKey, meta.TableKey)
require.Nil(t, err)
decodedKey, err := ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, ts)

newDbID, err := meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err := meta.ParseTableKey(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)

// rewrite it again, and get the same result.
newKey, err = sr.rewriteKeyForTable(encodedKey, WriteCF, meta.ParseTableKey, meta.TableKey)
require.Nil(t, err)
decodedKey, err = ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, sr.RewriteTS)
cases := []struct {
encodeTableFn func(int64) []byte
decodeTableFn func([]byte) (int64, error)
}{
{
meta.TableKey,
meta.ParseTableKey,
},
{
meta.AutoIncrementIDKey,
meta.ParseAutoIncrementIDKey,
},
{
meta.AutoTableIDKey,
meta.ParseAutoTableIDKey,
},
{
meta.AutoRandomTableIDKey,
meta.ParseAutoRandomTableIDKey,
},
{
meta.SequenceKey,
meta.ParseSequenceKey,
},
}

newDbID, err = meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err = meta.ParseTableKey(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)
for _, ca := range cases {
encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts)
// create schemasReplace.
sr := MockEmptySchemasReplace(nil)

// set preConstruct status and construct map information.
sr.SetPreConstructMapStatus()
newKey, err := sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
require.Nil(t, newKey)
require.Equal(t, len(sr.DbMap), 1)
require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
downStreamDbID := sr.DbMap[dbID].DbID
downStreamTblID := sr.DbMap[dbID].TableMap[tableID].TableID

// set restoreKV status and rewrite it.
sr.SetRestoreKVStatus()
newKey, err = sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
decodedKey, err := ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, ts)

newDbID, err := meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err := ca.decodeTableFn(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)

// rewrite it again, and get the same result.
newKey, err = sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
decodedKey, err = ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, sr.RewriteTS)

newDbID, err = meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err = ca.decodeTableFn(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)
}
}

func TestRewriteTableInfo(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -103,6 +104,12 @@ const (
flagFullBackupType = "type"
)

const (
// Once TableInfoVersion updated. BR need to check compatibility with
// new TableInfoVersion. both snapshot restore and pitr need to be checked.
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
)

// FullBackupType type when doing full backup or restore
type FullBackupType string

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/version/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/utils",
"//br/pkg/version/build",
"//parser/model",
"//util/engine",
"@com_github_coreos_go_semver//semver",
"@com_github_pingcap_errors//:errors",
Expand All @@ -26,9 +27,10 @@ go_test(
srcs = ["version_test.go"],
embed = [":version"],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
"//br/pkg/version/build",
"//parser/model",
"@com_github_coreos_go_semver//semver",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/engine"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand All @@ -35,6 +36,10 @@ var (
checkpointSupportError error = nil
// pitrSupportBatchKVFiles specifies whether TiKV-server supports batch PITR.
pitrSupportBatchKVFiles bool = false

// Once TableInfoVersion updated. BR need to check compatibility with
// new TableInfoVersion. both snapshot restore and pitr need to be checked.
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
)

// NextMajorVersion returns the next major version.
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)
Expand Down Expand Up @@ -622,3 +623,11 @@ Check Table Before Drop: false`
require.NoError(t, err)
require.Equal(t, "5.7.25", versionStr)
}

func TestEnsureSupportVersion(t *testing.T) {
// Once this test failed. please check the compatibility carefully.
// *** Don't change this test simply. ***
require.Equal(t,
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION,
model.CurrLatestTableInfoVersion)
}
21 changes: 21 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,30 @@ func ParseAutoTableIDKey(key []byte) (int64, error) {
}

func (*Meta) autoIncrementIDKey(tableID int64) []byte {
return AutoIncrementIDKey(tableID)
}

// AutoIncrementIDKey decodes the auto inc table key.
func AutoIncrementIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mIncIDPrefix, tableID))
}

// IsAutoIncrementIDKey checks whether the key is auto increment key.
func IsAutoIncrementIDKey(key []byte) bool {
return strings.HasPrefix(string(key), mIncIDPrefix+":")
}

// ParseAutoIncrementIDKey decodes the tableID from the auto tableID key.
func ParseAutoIncrementIDKey(key []byte) (int64, error) {
if !IsAutoIncrementIDKey(key) {
return 0, ErrInvalidString.GenWithStack("fail to parse autoIncrementKey")
}

tableID := strings.TrimPrefix(string(key), mIncIDPrefix+":")
id, err := strconv.Atoi(tableID)
return int64(id), err
}

func (*Meta) autoRandomTableIDKey(tableID int64) []byte {
return AutoRandomTableIDKey(tableID)
}
Expand Down

0 comments on commit e82519e

Please sign in to comment.