Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: restore meta kv with batch method #37100

Merged
merged 9 commits into from
Aug 16, 2022
226 changes: 182 additions & 44 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1768,13 +1768,6 @@ func (rc *Client) ReadStreamDataFiles(
}
}

// sort files firstly.
slices.SortFunc(mFiles, func(i, j *backuppb.DataFileInfo) bool {
if i.ResolvedTs > 0 && j.ResolvedTs > 0 {
return i.ResolvedTs < j.ResolvedTs
}
return i.MaxTs < j.MaxTs
})
return dFiles, mFiles, nil
}

Expand Down Expand Up @@ -1996,6 +1989,31 @@ func (rc *Client) InitSchemasReplaceForDDL(
return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex), nil
}

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
slices.SortFunc(files, func(i, j *backuppb.DataFileInfo) bool {
if i.GetMinTs() < j.GetMinTs() {
return true
} else if i.GetMinTs() > j.GetMinTs() {
return false
}

if i.GetMaxTs() < j.GetMaxTs() {
return true
} else if i.GetMaxTs() > j.GetMaxTs() {
return false
}

if i.GetResolvedTs() < j.GetResolvedTs() {
return true
} else if i.GetResolvedTs() > j.GetResolvedTs() {
return false
}

return true
})
return files
}

// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
func (rc *Client) RestoreMetaKVFiles(
ctx context.Context,
Expand All @@ -2004,7 +2022,10 @@ func (rc *Client) RestoreMetaKVFiles(
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error {
// sort files firstly.
files = SortMetaKVFiles(files)
filesInWriteCF := make([]*backuppb.DataFileInfo, 0, len(files))
filesInDefaultCF := make([]*backuppb.DataFileInfo, 0, len(files))

// The k-v events in default CF should be restored firstly. The reason is that:
// The error of transactions of meta could happen if restore write CF events successfully,
Expand All @@ -2014,30 +2035,39 @@ func (rc *Client) RestoreMetaKVFiles(
filesInWriteCF = append(filesInWriteCF, f)
continue
}

if f.Type == backuppb.FileType_Delete {
// this should happen abnormally.
// only do some preventive checks here.
log.Warn("detected delete file of meta key, skip it", zap.Any("file", f))
continue
}

kvCount, size, err := rc.RestoreMetaKVFile(ctx, f, schemasReplace)
if err != nil {
return errors.Trace(err)
if f.Cf == stream.DefaultCF {
filesInDefaultCF = append(filesInDefaultCF, f)
}
updateStats(kvCount, size)
progressInc()
}

// Restore files in default CF.
if err := rc.RestoreMetaKVFilesWithBatchMethod(
ctx,
filesInDefaultCF,
schemasReplace,
updateStats,
progressInc,
rc.RestoreBatchMetaKVFiles,
); err != nil {
return errors.Trace(err)
}

// Restore files in write CF.
for _, f := range filesInWriteCF {
kvCount, size, err := rc.RestoreMetaKVFile(ctx, f, schemasReplace)
if err != nil {
return errors.Trace(err)
}
updateStats(kvCount, size)
progressInc()
if err := rc.RestoreMetaKVFilesWithBatchMethod(
ctx,
filesInWriteCF,
schemasReplace,
updateStats,
progressInc,
rc.RestoreBatchMetaKVFiles,
); err != nil {
return errors.Trace(err)
}

// Update global schema version and report all of TiDBs.
Expand All @@ -2047,41 +2077,128 @@ func (rc *Client) RestoreMetaKVFiles(
return nil
}

// RestoreMetaKVFile tries to restore a file about meta kv-event from stream-backup.
func (rc *Client) RestoreMetaKVFile(
func (rc *Client) RestoreMetaKVFilesWithBatchMethod(
ctx context.Context,
file *backuppb.DataFileInfo,
sr *stream.SchemasReplace,
) (uint64, uint64, error) {
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
restoreBatch func(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error,
) error {
var (
kvCount uint64
size uint64
rangeMin uint64
rangeMax uint64
idx int
)
log.Info("restore meta kv events", zap.String("file", file.Path),
zap.String("cf", file.Cf), zap.Int64("kv-count", file.NumberOfEntries),
zap.Uint64("min-ts", file.MinTs), zap.Uint64("max-ts", file.MaxTs))
for i, f := range files {
if i == 0 {
idx = i
rangeMax = f.MaxTs
rangeMin = f.MinTs
} else {
if f.MinTs <= rangeMax {
rangeMin = mathutil.Min(rangeMin, f.MinTs)
rangeMax = mathutil.Max(rangeMax, f.MaxTs)
} else {
err := restoreBatch(ctx, files[idx:i], schemasReplace, updateStats, progressInc)
if err != nil {
return errors.Trace(err)
}
idx = i
rangeMin = f.MinTs
rangeMax = f.MaxTs
}
}

if i == len(files)-1 {
3pointer marked this conversation as resolved.
Show resolved Hide resolved
err := restoreBatch(ctx, files[idx:], schemasReplace, updateStats, progressInc)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
}

// the kv entry with ts, the ts is decoded from entry.
type kvEntryWithTS struct {
e kv.Entry
ts uint64
}

func (rc *Client) RestoreBatchMetaKVFiles(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
) error {
if len(files) == 0 {
return nil
}

// read all of entries from files.
kvEntries := make([]*kvEntryWithTS, 0)
for _, f := range files {
es, err := rc.readAllEntries(ctx, f)
if err != nil {
return errors.Trace(err)
}

kvEntries = append(kvEntries, es...)
}

// sort these entries.
slices.SortFunc(kvEntries, func(i, j *kvEntryWithTS) bool {
return i.ts < j.ts
})

// restore these entries with rawPut() method.
kvCount, size, err := rc.restoreMetaKvEntries(ctx, schemasReplace, kvEntries, files[0].GetCf())
if err != nil {
return errors.Trace(err)
}

updateStats(kvCount, size)
for i := 0; i < len(files); i++ {
progressInc()
}
return nil
}

func (rc *Client) readAllEntries(
ctx context.Context,
file *backuppb.DataFileInfo,
) ([]*kvEntryWithTS, error) {
kvEntries := make([]*kvEntryWithTS, 0)

rc.rawKVClient.SetColumnFamily(file.GetCf())
buff, err := rc.storage.ReadFile(ctx, file.Path)
if err != nil {
return 0, 0, errors.Trace(err)
return nil, errors.Trace(err)
}

if checksum := sha256.Sum256(buff); !bytes.Equal(checksum[:], file.GetSha256()) {
return 0, 0, errors.Annotatef(berrors.ErrInvalidMetaFile,
return nil, errors.Annotatef(berrors.ErrInvalidMetaFile,
"checksum mismatch expect %x, got %x", file.GetSha256(), checksum[:])
}

iter := stream.NewEventIterator(buff)
for iter.Valid() {
iter.Next()
if iter.GetError() != nil {
return 0, 0, errors.Trace(iter.GetError())
return nil, errors.Trace(iter.GetError())
}

txnEntry := kv.Entry{Key: iter.Key(), Value: iter.Value()}
ts, err := GetKeyTS(txnEntry.Key)
if err != nil {
return 0, 0, errors.Trace(err)
return nil, errors.Trace(err)
}

// The commitTs in write CF need be limited on [startTs, restoreTs].
Expand All @@ -2101,20 +2218,41 @@ func (rc *Client) RestoreMetaKVFile(
log.Warn("txn entry is null", zap.Uint64("key-ts", ts), zap.ByteString("tnxKey", txnEntry.Key))
continue
}
log.Debug("txn entry", zap.Uint64("key-ts", ts), zap.Int("txnKey-len", len(txnEntry.Key)),
zap.Int("txnValue-len", len(txnEntry.Value)), zap.ByteString("txnKey", txnEntry.Key))
newEntry, err := sr.RewriteKvEntry(&txnEntry, file.Cf)
kvEntries = append(kvEntries, &kvEntryWithTS{e: txnEntry, ts: ts})
}

return kvEntries, nil
}

func (rc *Client) restoreMetaKvEntries(
ctx context.Context,
sr *stream.SchemasReplace,
entries []*kvEntryWithTS,
columnFamily string,
) (uint64, uint64, error) {
var (
kvCount uint64
size uint64
)

rc.rawKVClient.SetColumnFamily(columnFamily)

for _, entry := range entries {
log.Debug("before rewrte entry", zap.Uint64("key-ts", entry.ts), zap.Int("key-len", len(entry.e.Key)),
zap.Int("value-len", len(entry.e.Value)), zap.ByteString("key", entry.e.Key))

newEntry, err := sr.RewriteKvEntry(&entry.e, columnFamily)
if err != nil {
log.Error("rewrite txn entry failed", zap.Int("klen", len(txnEntry.Key)),
logutil.Key("txn-key", txnEntry.Key))
log.Error("rewrite txn entry failed", zap.Int("klen", len(entry.e.Key)),
joccau marked this conversation as resolved.
Show resolved Hide resolved
logutil.Key("txn-key", entry.e.Key))
return 0, 0, errors.Trace(err)
} else if newEntry == nil {
continue
}
log.Debug("rewrite txn entry", zap.Int("newKey-len", len(newEntry.Key)),
zap.Int("newValue-len", len(txnEntry.Value)), zap.ByteString("newkey", newEntry.Key))
log.Debug("after rewrite entry", zap.Int("new-key-len", len(newEntry.Key)),
zap.Int("new-value-len", len(entry.e.Value)), zap.ByteString("new-key", newEntry.Key))

if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, ts); err != nil {
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
return 0, 0, errors.Trace(err)
}

Expand Down
Loading