Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: improve flashback to timestamp, make it compatible with TiCDC #39442

Merged
merged 9 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2915,8 +2915,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:hnUlIU5nCH6PAO9DC5DhODX1cwqoTcXTNIODyvNI9q4=",
version = "v0.0.0-20221123043343-cdc67325f05f",
sum = "h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=",
version = "v0.0.0-20221129023506-621ec37aac7a",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3519,8 +3519,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:nfdLYzkmYR2b0XurXSXcVFk06eO//P4VtkqoADX2tR4=",
version = "v2.0.3-0.20221128025602-81939ec8b2bb",
sum = "h1:vlgZedcfExiTzB3BB4nt5CpaghDfm9La/0Ofn7weIUA=",
version = "v2.0.3-0.20221129032117-857772dd0907",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor
return totalRegions, errors.Trace(err)
}

if err := recovery.PrepareFlashbackToVersion(ctx); err != nil {
if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we guarantee that there is no other write at restoreTS-1? Otherwise, it may overwrite the old value.

/cc @fengou1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. In EBS restore, the workflow is as follows:

  1. start pd
  2. set pd/tikv recoverymode
  3. start tikv
  4. prepare/flashback version
  5. start tidb

There is no chance to write any data before step 5 (start tidb).

return totalRegions, errors.Trace(err)
}

Expand Down Expand Up @@ -304,12 +304,12 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
}

// PrepareFlashbackToVersion prepares the regions for flashing back the data; the purpose is to stop region service and put the regions in the flashback state.
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context) (err error) {
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
var totalRegions atomic.Uint64
totalRegions.Store(0)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), r)
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}
Expand Down
29 changes: 18 additions & 11 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ import (
)

var pdScheduleKey = []string{
"hot-region-schedule-limit",
"leader-schedule-limit",
"merge-schedule-limit",
"region-schedule-limit",
"replica-schedule-limit",
}

const (
Expand All @@ -68,6 +64,7 @@ const (
autoAnalyzeOffset
readOnlyOffset
totalLockedRegionsOffset
startTSOffset
commitTSOffset
)

Expand Down Expand Up @@ -280,6 +277,7 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) {
func SendPrepareFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
flashbackTS, startTS uint64,
r tikvstore.KeyRange,
) (rangetask.TaskStat, error) {
startKey, rangeEndKey := r.StartKey, r.EndKey
Expand Down Expand Up @@ -314,6 +312,8 @@ func SendPrepareFlashbackToVersionRPC(
req := tikvrpc.NewRequest(tikvrpc.CmdPrepareFlashbackToVersion, &kvrpcpb.PrepareFlashbackToVersionRequest{
StartKey: startKey,
EndKey: endKey,
StartTs: startTS,
Version: flashbackTS,
})

resp, err := s.SendReq(bo, req, loc.Region, flashbackTimeout)
Expand Down Expand Up @@ -481,11 +481,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, errors.Errorf("Not support flashback cluster in non-TiKV env")
}

var flashbackTS, lockedRegions, commitTS uint64
var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]interface{}
var autoAnalyzeValue, readOnlyValue string
var gcEnabledValue bool
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &commitTS); err != nil {
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -533,6 +533,13 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// We should get startTS here to avoid losing startTS if TiDB crashes while sending the prepare flashback RPC.
startTS, err = d.store.GetOracle().GetTimestamp(d.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[startTSOffset] = startTS
job.SchemaState = model.StateWriteOnly
return ver, nil
// Stage 3, get key ranges and get locks.
Expand All @@ -552,7 +559,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r)
stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}, r.StartKey, r.EndKey); err != nil {
Expand Down Expand Up @@ -587,8 +594,8 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Use commitTS - 1 as startTS to make sure it is less than commitTS.
stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, commitTS-1, commitTS, r)
// Use the same startTS as the prepare phase to simulate a 1PC txn.
stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, startTS, commitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
logutil.BgLogger().Info("[ddl] flashback cluster stats",
zap.Uint64("complete regions", completedRegions.Load()),
Expand All @@ -615,12 +622,12 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
return nil
}

var flashbackTS, lockedRegions, commitTS uint64
var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]interface{}
var autoAnalyzeValue, readOnlyValue string
var gcEnabled bool

if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &commitTS); err != nil {
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
return errors.Trace(err)
}
sess, err := w.sessPool.get()
Expand Down
6 changes: 3 additions & 3 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
fmt.Sprintf("return(%v)", injectSafeTS)))

oldValue := map[string]interface{}{
"hot-region-schedule-limit": 1,
"merge-schedule-limit": 1,
}
require.NoError(t, infosync.SetPDScheduleConfig(context.Background(), oldValue))

Expand All @@ -112,7 +112,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
if job.SchemaState == model.StateWriteReorganization {
closeValue, err := infosync.GetPDScheduleConfig(context.Background())
assert.NoError(t, err)
assert.Equal(t, closeValue["hot-region-schedule-limit"], 0)
assert.Equal(t, closeValue["merge-schedule-limit"], 0)
// cancel flashback job
job.State = model.JobStateCancelled
job.Error = dbterror.ErrCancelledDDLJob
Expand All @@ -128,7 +128,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {

finishValue, err := infosync.GetPDScheduleConfig(context.Background())
require.NoError(t, err)
require.EqualValues(t, finishValue["hot-region-schedule-limit"], 1)
require.EqualValues(t, finishValue["merge-schedule-limit"], 1)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
Expand Down
13 changes: 11 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2724,6 +2724,14 @@ func (d *ddl) preSplitAndScatter(ctx sessionctx.Context, tbInfo *model.TableInfo

func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error {
logutil.BgLogger().Info("[ddl] get flashback cluster job", zap.String("flashbackTS", oracle.GetTimeFromTS(flashbackTS).String()))
nowTS, err := ctx.GetStore().GetOracle().GetTimestamp(d.ctx, &oracle.Option{})
if err != nil {
return errors.Trace(err)
}
gap := time.Until(oracle.GetTimeFromTS(nowTS)).Abs()
if gap > 1*time.Second {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("Gap between local time and PD TSO is %s, please check PD/system time", gap))
}
job := &model.Job{
Type: model.ActionFlashbackCluster,
BinlogInfo: &model.HistoryInfo{},
Expand All @@ -2735,9 +2743,10 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
variable.On, /* tidb_enable_auto_analyze */
variable.Off, /* tidb_super_read_only */
0, /* totalRegions */
0 /* newCommitTS */},
0, /* startTS */
0 /* commitTS */},
}
err := d.DoDDLJob(ctx, job)
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand All @@ -86,7 +86,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.3-0.20221128025602-81939ec8b2bb
github.com/tikv/client-go/v2 v2.0.3-0.20221129032117-857772dd0907
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f h1:hnUlIU5nCH6PAO9DC5DhODX1cwqoTcXTNIODyvNI9q4=
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a h1:LzIZsQpXQlj8yF7+yvyOg680OaPq7bmPuDuszgXfHsw=
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -930,8 +930,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3-0.20221128025602-81939ec8b2bb h1:nfdLYzkmYR2b0XurXSXcVFk06eO//P4VtkqoADX2tR4=
github.com/tikv/client-go/v2 v2.0.3-0.20221128025602-81939ec8b2bb/go.mod h1:kqFVxpx40hAgqqLHXLEPJDM/j6ZVfH5CNdJEtkJvO58=
github.com/tikv/client-go/v2 v2.0.3-0.20221129032117-857772dd0907 h1:vlgZedcfExiTzB3BB4nt5CpaghDfm9La/0Ofn7weIUA=
github.com/tikv/client-go/v2 v2.0.3-0.20221129032117-857772dd0907/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down