diff --git a/br/pkg/streamhelper/subscription_test.go b/br/pkg/streamhelper/subscription_test.go index 2341cb05dc01e..da7aa627eabd0 100644 --- a/br/pkg/streamhelper/subscription_test.go +++ b/br/pkg/streamhelper/subscription_test.go @@ -7,6 +7,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/pingcap/tidb/br/pkg/streamhelper" "github.com/pingcap/tidb/br/pkg/streamhelper/spans" @@ -32,6 +33,16 @@ func installSubscribeSupportForRandomN(c *fakeCluster, n int) { } } +func waitPendingEvents(t *testing.T, sub *streamhelper.FlushSubscriber) { + last := len(sub.Events()) + time.Sleep(100 * time.Microsecond) + require.Eventually(t, func() bool { + noProg := len(sub.Events()) == last + last = len(sub.Events()) + return noProg + }, 3*time.Second, 100*time.Millisecond) +} + func TestSubBasic(t *testing.T) { req := require.New(t) ctx := context.Background() @@ -47,6 +58,7 @@ func TestSubBasic(t *testing.T) { } sub.HandleErrors(ctx) req.NoError(sub.PendingErrors()) + waitPendingEvents(t, sub) sub.Drop() s := spans.Sorted(spans.NewFullWith(spans.Full(), 1)) for k := range sub.Events() { @@ -81,6 +93,7 @@ func TestNormalError(t *testing.T) { cp = c.advanceCheckpoints() c.flushAll() } + waitPendingEvents(t, sub) sub.Drop() s := spans.Sorted(spans.NewFullWith(spans.Full(), 1)) for k := range sub.Events() { @@ -155,6 +168,7 @@ func TestStoreRemoved(t *testing.T) { sub.HandleErrors(ctx) req.NoError(sub.PendingErrors()) + waitPendingEvents(t, sub) sub.Drop() s := spans.Sorted(spans.NewFullWith(spans.Full(), 1)) for k := range sub.Events() { @@ -188,6 +202,8 @@ func TestSomeOfStoreUnsupported(t *testing.T) { } s := spans.Sorted(spans.NewFullWith(spans.Full(), 1)) m := new(sync.Mutex) + + waitPendingEvents(t, sub) sub.Drop() for k := range sub.Events() { s.Merge(k)