diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index c18b015e596ea..3c281563439de 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -71,6 +71,7 @@ go_test( "//br/pkg/logutil", "//br/pkg/redact", "//br/pkg/storage", + "//br/pkg/streamhelper/config", "//br/pkg/streamhelper/spans", "//br/pkg/utils", "//kv", diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 0e28a150cc157..407134bc6027a 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -382,15 +382,17 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error { log.Debug("No tasks yet, skipping advancing.") return nil } + cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout()) + defer cancel() threshold := c.Config().GetDefaultStartPollThreshold() - if err := c.subscribeTick(ctx); err != nil { + if err := c.subscribeTick(cx); err != nil { log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err)) threshold = c.Config().GetSubscriberErrorStartPollThreshold() } - err := c.advanceCheckpointBy(ctx, func(ctx context.Context) (uint64, error) { - return c.CalculateGlobalCheckpointLight(ctx, threshold) + err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) { + return c.CalculateGlobalCheckpointLight(cx, threshold) }) if err != nil { return err diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 0e924a4db8013..7dd4c71d35b9c 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -9,8 +9,11 @@ import ( "testing" "time" + "github.com/pingcap/errors" + logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/streamhelper" + "github.com/pingcap/tidb/br/pkg/streamhelper/config" "github.com/pingcap/tidb/kv" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ 
-219,3 +222,36 @@ func TestTaskRangesWithSplit(t *testing.T) { shouldFinishInTime(t, 10*time.Second, "second advancing", func() { require.NoError(t, adv.OnTick(ctx)) }) require.Greater(t, env.getCheckpoint(), fstCheckpoint) } + +func TestBlocked(t *testing.T) { + log.SetLevel(zapcore.DebugLevel) + c := createFakeCluster(t, 4, true) + ctx := context.Background() + req := require.New(t) + c.splitAndScatter("0012", "0034", "0048") + marked := false + for _, s := range c.stores { + s.clientMu.Lock() + s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error { + // Block the calling goroutine forever: a receive from a nil channel never completes. + // This imitates a TiKV store that has gone down or is too busy to answer. + <-(chan struct{})(nil) + return nil + } + s.clientMu.Unlock() + marked = true + } + req.True(marked, "failed to mark the cluster: ") + env := &testEnv{fakeCluster: c, testCtx: t} + adv := streamhelper.NewCheckpointAdvancer(env) + adv.StartTaskListener(ctx) + adv.UpdateConfigWith(func(c *config.Config) { + // The tick timeout is 10x TickDuration, so with this setting each tick may run for at most 100ms. + c.TickDuration = 10 * time.Millisecond + }) + var err error + shouldFinishInTime(t, time.Second, "ticking", func() { + err = adv.OnTick(ctx) + }) + req.ErrorIs(errors.Cause(err), context.DeadlineExceeded) +} diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 9598b5d376fe8..1dff77dd72864 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -77,10 +77,11 @@ type fakeStore struct { id uint64 regions map[uint64]*region - clientMu sync.Mutex - supportsSub bool - bootstrapAt uint64 - fsub func(logbackup.SubscribeFlushEventResponse) + clientMu sync.Mutex + supportsSub bool + bootstrapAt uint64 + fsub func(logbackup.SubscribeFlushEventResponse) + onGetRegionCheckpoint func(*logbackup.GetLastFlushTSOfRegionRequest) error } type fakeCluster struct { @@ -184,6 +185,12 @@ func (f *fakeStore) SetSupportFlushSub(b bool) { } func (f *fakeStore) GetLastFlushTSOfRegion(ctx 
context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) { + if f.onGetRegionCheckpoint != nil { + err := f.onGetRegionCheckpoint(in) + if err != nil { + return nil, err + } + } resp := &logbackup.GetLastFlushTSOfRegionResponse{ Checkpoints: []*logbackup.RegionCheckpoint{}, } diff --git a/br/pkg/streamhelper/config/advancer_conf.go b/br/pkg/streamhelper/config/advancer_conf.go index 10a645f8721e7..45cdab8a68e46 100644 --- a/br/pkg/streamhelper/config/advancer_conf.go +++ b/br/pkg/streamhelper/config/advancer_conf.go @@ -78,3 +78,9 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration { func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration { return conf.TryAdvanceThreshold / 5 } + +// TickTimeout returns the maximum duration allowed for a single tick, which is 10x TickDuration. +func (conf Config) TickTimeout() time.Duration { + // If a tick blocks for 10x the ticking interval, we may need to abort it and retry. + return 10 * conf.TickDuration +}