Skip to content

Commit

Permalink
br/streamhelper: added timeout for ticking (#39625)
Browse files Browse the repository at this point in the history
close #39620
  • Loading branch information
YuJuncen authored Dec 7, 2022
1 parent 26bbb71 commit d7d059c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 7 deletions.
1 change: 1 addition & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_test(
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper/config",
"//br/pkg/streamhelper/spans",
"//br/pkg/utils",
"//kv",
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,15 +382,17 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()

threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(ctx); err != nil {
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
threshold = c.Config().GetSubscriberErrorStartPollThreshold()
}

err := c.advanceCheckpointBy(ctx, func(ctx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(ctx, threshold)
err := c.advanceCheckpointBy(cx, func(cx context.Context) (uint64, error) {
return c.CalculateGlobalCheckpointLight(cx, threshold)
})
if err != nil {
return err
Expand Down
36 changes: 36 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -219,3 +222,36 @@ func TestTaskRangesWithSplit(t *testing.T) {
shouldFinishInTime(t, 10*time.Second, "second advancing", func() { require.NoError(t, adv.OnTick(ctx)) })
require.Greater(t, env.getCheckpoint(), fstCheckpoint)
}

// TestBlocked checks that a tick does not hang forever when every store blocks
// while serving the region checkpoint request: the tick must return within the
// allowed time and its error must be caused by context.DeadlineExceeded.
func TestBlocked(t *testing.T) {
	log.SetLevel(zapcore.DebugLevel)
	req := require.New(t)
	ctx := context.Background()

	cluster := createFakeCluster(t, 4, true)
	cluster.splitAndScatter("0012", "0034", "0048")

	// Receiving from a nil channel never completes, so this hook blocks its
	// caller forever. It simulates a TiKV that is down or too busy to answer.
	blockForever := func(*logbackup.GetLastFlushTSOfRegionRequest) error {
		var never chan struct{}
		<-never
		return nil
	}

	hooked := 0
	for _, store := range cluster.stores {
		store.clientMu.Lock()
		store.onGetRegionCheckpoint = blockForever
		store.clientMu.Unlock()
		hooked++
	}
	// Guard against a vacuous pass: at least one store must carry the hook.
	req.True(hooked > 0, "failed to mark the cluster: ")

	env := &testEnv{fakeCluster: cluster, testCtx: t}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.StartTaskListener(ctx)
	adv.UpdateConfigWith(func(cfg *config.Config) {
		// With a 10ms tick interval the tick timeout becomes 100ms.
		cfg.TickDuration = 10 * time.Millisecond
	})

	var tickErr error
	shouldFinishInTime(t, time.Second, "ticking", func() {
		tickErr = adv.OnTick(ctx)
	})
	req.ErrorIs(errors.Cause(tickErr), context.DeadlineExceeded)
}
15 changes: 11 additions & 4 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ type fakeStore struct {
id uint64
regions map[uint64]*region

clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
clientMu sync.Mutex
supportsSub bool
bootstrapAt uint64
fsub func(logbackup.SubscribeFlushEventResponse)
onGetRegionCheckpoint func(*logbackup.GetLastFlushTSOfRegionRequest) error
}

type fakeCluster struct {
Expand Down Expand Up @@ -184,6 +185,12 @@ func (f *fakeStore) SetSupportFlushSub(b bool) {
}

func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) {
if f.onGetRegionCheckpoint != nil {
err := f.onGetRegionCheckpoint(in)
if err != nil {
return nil, err
}
}
resp := &logbackup.GetLastFlushTSOfRegionResponse{
Checkpoints: []*logbackup.RegionCheckpoint{},
}
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,9 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
	// The threshold is one fifth of the configured TryAdvanceThreshold.
	const divisor = 5
	return conf.TryAdvanceThreshold / divisor
}

// TickTimeout returns the maximum duration a single tick is allowed to run.
// The value is ten times the configured TickDuration.
func (conf Config) TickTimeout() time.Duration {
	// A tick that stays blocked for ten ticking intervals is considered stuck;
	// bounding it lets the caller break out of it and retry.
	const intervalsPerTimeout = 10
	return intervalsPerTimeout * conf.TickDuration
}

0 comments on commit d7d059c

Please sign in to comment.