diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go
index 407134bc6027a..b29cbd6956ae2 100644
--- a/br/pkg/streamhelper/advancer.go
+++ b/br/pkg/streamhelper/advancer.go
@@ -18,6 +18,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/tikv/client-go/v2/oracle"
+	"go.uber.org/multierr"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -285,6 +286,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 		c.task = e.Info
 		c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
 		c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
+		c.lastCheckpoint = e.Info.StartTs
 		log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
 	case EventDel:
 		utils.LogBackupTaskCountDec()
@@ -292,7 +294,10 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 		c.taskRange = nil
 		c.checkpoints = nil
 		// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
-		c.subscriber.Clear()
+		// Do the nil check because some test cases don't equip the advancer with a subscriber.
+		if c.subscriber != nil {
+			c.subscriber.Clear()
+		}
 		if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
 			log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
 		}
@@ -303,6 +308,18 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 	return nil
 }
 
+func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
+	if cp < c.lastCheckpoint {
+		log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
+		return false
+	}
+	if cp <= c.lastCheckpoint {
+		return false
+	}
+	c.lastCheckpoint = cp
+	return true
+}
+
 // advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
 func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpoint func(context.Context) (uint64, error)) error {
 	start := time.Now()
@@ -310,24 +327,15 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpo
 	if err != nil {
 		return err
 	}
-	log.Info("get checkpoint", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
-	if cp < c.lastCheckpoint {
-		log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
-	}
-	if cp <= c.lastCheckpoint {
-		return nil
-	}
-	log.Info("uploading checkpoint for task",
-		zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
-		zap.Uint64("checkpoint", cp),
-		zap.String("task", c.task.Name),
-		zap.Stringer("take", time.Since(start)))
-	if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, cp); err != nil {
-		return errors.Annotate(err, "failed to upload global checkpoint")
+	if c.setCheckpoint(cp) {
+		log.Info("uploading checkpoint for task",
+			zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
+			zap.Uint64("checkpoint", cp),
+			zap.String("task", c.task.Name),
+			zap.Stringer("take", time.Since(start)))
+		metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
 	}
-	c.lastCheckpoint = cp
-	metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
 	return nil
 }
@@ -375,16 +383,17 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
 	return c.subscriber.PendingErrors()
 }
 
-func (c *CheckpointAdvancer) tick(ctx context.Context) error {
-	c.taskMu.Lock()
-	defer c.taskMu.Unlock()
-	if c.task == nil {
-		log.Debug("No tasks yet, skipping advancing.")
-		return nil
+func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
+	c.checkpointsMu.Lock()
+	c.setCheckpoint(c.checkpoints.MinValue())
+	c.checkpointsMu.Unlock()
+	if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
+		return errors.Annotate(err, "failed to upload global checkpoint")
 	}
-	cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
-	defer cancel()
+	return nil
+}
 
+func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
 	threshold := c.Config().GetDefaultStartPollThreshold()
 	if err := c.subscribeTick(cx); err != nil {
 		log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
@@ -397,6 +406,32 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-
 	return nil
 }
+
+func (c *CheckpointAdvancer) tick(ctx context.Context) error {
+	c.taskMu.Lock()
+	defer c.taskMu.Unlock()
+	if c.task == nil {
+		log.Debug("No tasks yet, skipping advancing.")
+		return nil
+	}
+
+	var errs error
+
+	cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
+	defer cancel()
+	err := c.optionalTick(cx)
+	if err != nil {
+		log.Warn("[log backup advancer] optional tick failed.", logutil.ShortError(err))
+		errs = multierr.Append(errs, err)
+	}
+
+	err = c.importantTick(ctx)
+	if err != nil {
+		log.Warn("[log backup advancer] important tick failed.", logutil.ShortError(err))
+		errs = multierr.Append(errs, err)
+	}
+
+	return errs
+}
diff --git a/br/pkg/streamhelper/advancer_cliext.go b/br/pkg/streamhelper/advancer_cliext.go
index 611ad3744dfa8..d83e5a6ce1eb7 100644
--- a/br/pkg/streamhelper/advancer_cliext.go
+++ b/br/pkg/streamhelper/advancer_cliext.go
@@ -5,15 +5,19 @@ package streamhelper
 import (
 	"bytes"
 	"context"
+	"encoding/binary"
 	"fmt"
 	"strings"
"github.com/golang/protobuf/proto" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/kv" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" ) type EventType int @@ -181,11 +185,43 @@ func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error { return nil } +func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) { + key := GlobalCheckpointOf(taskName) + resp, err := t.KV.Get(ctx, key) + if err != nil { + return 0, err + } + + if len(resp.Kvs) == 0 { + return 0, nil + } + + firstKV := resp.Kvs[0] + value := firstKV.Value + if len(value) != 8 { + return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata, + "the global checkpoint isn't 64bits (it is %d bytes, value = %s)", + len(value), + redact.Key(value)) + } + + return binary.BigEndian.Uint64(value), nil +} + func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error { key := GlobalCheckpointOf(taskName) value := string(encodeUint64(checkpoint)) - _, err := t.KV.Put(ctx, key, value) + oldValue, err := t.getGlobalCheckpointForTask(ctx, taskName) + if err != nil { + return err + } + + if checkpoint < oldValue { + log.Warn("[log backup advancer] skipping upload global checkpoint", zap.Uint64("old", oldValue), zap.Uint64("new", checkpoint)) + return nil + } + _, err = t.KV.Put(ctx, key, value) if err != nil { return err } diff --git a/br/pkg/streamhelper/config/advancer_conf.go b/br/pkg/streamhelper/config/advancer_conf.go index 45cdab8a68e46..1440a81f932f9 100644 --- a/br/pkg/streamhelper/config/advancer_conf.go +++ b/br/pkg/streamhelper/config/advancer_conf.go @@ -16,7 +16,7 @@ const ( flagTryAdvanceThreshold = "try-advance-threshold" DefaultConsistencyCheckTick = 5 - DefaultTryAdvanceThreshold = 9 * time.Minute + DefaultTryAdvanceThreshold = 4 * time.Minute DefaultBackOffTime = 5 * time.Second DefaultTickInterval = 12 * time.Second DefaultFullScanTick = 4 @@ -76,11 +76,18 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration { // GetSubscriberErrorStartPollThreshold returns the threshold of begin polling the checkpoint // when the subscriber meets error. func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration { - return conf.TryAdvanceThreshold / 5 + // 0.45x of the origin threshold. + // The origin threshold is 0.8x the target RPO, + // and the default flush interval is about 0.5x the target RPO. + // So the relationship between the RPO and the threshold is: + // When subscription is all available, it is 1.7x of the flush interval (which allow us to save in abnormal condition). + // When some of subscriptions are not available, it is 0.75x of the flush interval. + // NOTE: can we make subscription better and give up the poll model? + return conf.TryAdvanceThreshold * 9 / 20 } // TickTimeout returns the max duration for each tick. func (conf Config) TickTimeout() time.Duration { - // If a tick blocks 10x the interval of ticking, we may need to break it and retry. - return 10 * conf.TickDuration + // If a tick blocks longer than the interval of ticking, we may need to break it and retry. 
+	return conf.TickDuration
 }
diff --git a/br/pkg/streamhelper/integration_test.go b/br/pkg/streamhelper/integration_test.go
index b3baf433c43f6..f00d15c4b1f30 100644
--- a/br/pkg/streamhelper/integration_test.go
+++ b/br/pkg/streamhelper/integration_test.go
@@ -352,10 +352,12 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
 		req.Len(resp.Kvs, 1)
 		return binary.BigEndian.Uint64(resp.Kvs[0].Value)
 	}
-	metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5)
+	req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5))
 	req.EqualValues(5, getCheckpoint())
-	metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18)
+	req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18))
 	req.EqualValues(18, getCheckpoint())
-	metaCli.ClearV3GlobalCheckpointForTask(ctx, task)
+	req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16))
+	req.EqualValues(18, getCheckpoint())
+	req.NoError(metaCli.ClearV3GlobalCheckpointForTask(ctx, task))
 	req.EqualValues(0, getCheckpoint())
 }
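
Note on the new tick flow: the rewritten tick splits the work into optionalTick (subscription/polling) and importantTick (flushing the global checkpoint) and aggregates their failures with go.uber.org/multierr, so a failed optional step no longer prevents the checkpoint upload. A minimal sketch of that aggregation pattern follows; optionalWork and importantWork are hypothetical stand-ins for the two tick halves, not code from this patch.

package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

// optionalWork stands in for optionalTick: it may fail without
// stopping the important step from running.
func optionalWork() error { return errors.New("subscriber error") }

// importantWork stands in for importantTick: it runs on every tick.
func importantWork() error { return nil }

func main() {
	var errs error
	// Run both steps unconditionally and collect whatever failed.
	errs = multierr.Append(errs, optionalWork())
	errs = multierr.Append(errs, importantWork())

	// multierr.Append(nil, nil) stays nil, so a fully successful tick
	// still returns nil; otherwise every collected failure is kept.
	fmt.Println(errs)                       // subscriber error
	fmt.Println(len(multierr.Errors(errs))) // 1
}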
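Note on the checkpoint encoding: the new getGlobalCheckpointForTask expects the etcd value to be exactly 8 big-endian bytes, the same layout that the existing encodeUint64 helper writes and that the integration test decodes. A minimal round-trip sketch of that layout follows; the decodeCheckpoint helper is hypothetical and only mirrors the length check from the diff.

package main

import (
	"encoding/binary"
	"fmt"
)

// decodeCheckpoint mirrors the validation in getGlobalCheckpointForTask:
// anything that is not exactly 8 bytes is treated as malformed metadata.
func decodeCheckpoint(value []byte) (uint64, error) {
	if len(value) != 8 {
		return 0, fmt.Errorf("the global checkpoint isn't 64 bits (it is %d bytes)", len(value))
	}
	return binary.BigEndian.Uint64(value), nil
}

func main() {
	const checkpoint uint64 = 434567890123456789

	// Encode the checkpoint as 8 big-endian bytes, like the store does.
	value := make([]byte, 8)
	binary.BigEndian.PutUint64(value, checkpoint)

	got, err := decodeCheckpoint(value)
	fmt.Println(got == checkpoint, err) // true <nil>

	// A truncated value is rejected instead of being silently misread.
	_, err = decodeCheckpoint(value[:4])
	fmt.Println(err)
}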