Skip to content

Commit

Permalink
[cluster] Fix flaky watchmanager tests (#3091)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Jan 15, 2021
1 parent 6746e4f commit 146cad6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 28 deletions.
16 changes: 10 additions & 6 deletions src/cluster/etcd/watchmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
)

func TestWatchChan(t *testing.T) {
t.Parallel()
wh, ecluster, _, _, _, closer := testCluster(t) //nolint:dogsled
defer closer()

ec := ecluster.RandClient()
integration.WaitClientV3(t, ec)

wc, _, err := wh.watchChanWithTimeout("foo", 0)
require.NoError(t, err)
Expand All @@ -67,9 +67,9 @@ func TestWatchChan(t *testing.T) {
}

func TestWatchSimple(t *testing.T) {
t.Parallel()
wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
defer closer()
integration.WaitClientV3(t, ec)
require.Equal(t, int32(0), atomic.LoadInt32(updateCalled))

go wh.Watch("foo")
Expand Down Expand Up @@ -115,11 +115,11 @@ func TestWatchSimple(t *testing.T) {
}

func TestWatchRecreate(t *testing.T) {
t.Parallel()
wh, ecluster, updateCalled, shouldStop, doneCh, closer := testCluster(t)
defer closer()

ec := ecluster.RandClient()
integration.WaitClientV3(t, ec)

failTotal := 1
wh.opts = wh.opts.
Expand Down Expand Up @@ -165,7 +165,6 @@ func TestWatchRecreate(t *testing.T) {
}

func TestWatchNoLeader(t *testing.T) {
t.Parallel()
const (
watchInitAndRetryDelay = 200 * time.Millisecond
watchCheckInterval = 50 * time.Millisecond
Expand Down Expand Up @@ -210,6 +209,8 @@ func TestWatchNoLeader(t *testing.T) {
SetWatchChanResetInterval(watchInitAndRetryDelay).
SetWatchChanCheckInterval(watchCheckInterval)

integration.WaitClientV3(t, ec)

wh, err := NewWatchManager(opts)
require.NoError(t, err)

Expand All @@ -234,19 +235,21 @@ func TestWatchNoLeader(t *testing.T) {

require.NoError(t, ecluster.Members[1].Restart(t))
require.NoError(t, ecluster.Members[2].Restart(t))

// wait for leader + election delay just in case
time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration)

leaderIdx = ecluster.WaitLeader(t)
require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader")
integration.WaitClientV3(t, ec) // wait for client to be ready again

_, err = ec.Put(context.Background(), "foo", "baz")
require.NoError(t, err)

// give some time for watch to be updated
require.True(t, clock.WaitUntil(func() bool {
return atomic.LoadInt32(&updateCalled) >= 2
}, 30*time.Second))
}, 10*time.Second))

updates := atomic.LoadInt32(&updateCalled)
if updates < 2 {
Expand All @@ -269,10 +272,11 @@ func TestWatchNoLeader(t *testing.T) {
}

func TestWatchCompactedRevision(t *testing.T) {
t.Parallel()
wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
defer closer()

integration.WaitClientV3(t, ec)

ts := tally.NewTestScope("", nil)
errC := ts.Counter("errors")
wh.m.etcdWatchError = errC
Expand Down
22 changes: 0 additions & 22 deletions src/cluster/kv/etcd/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ func TestGetAndSet(t *testing.T) {
}

func TestNoCache(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)

store, err := NewStore(ec, opts)
Expand Down Expand Up @@ -152,8 +150,6 @@ func TestCacheDirCreation(t *testing.T) {
}

func TestCache(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)

f, err := ioutil.TempFile("", "")
Expand Down Expand Up @@ -206,8 +202,6 @@ func TestCache(t *testing.T) {
}

func TestSetIfNotExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand All @@ -227,8 +221,6 @@ func TestSetIfNotExist(t *testing.T) {
}

func TestCheckAndSet(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand All @@ -255,8 +247,6 @@ func TestCheckAndSet(t *testing.T) {
}

func TestWatchClose(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -306,8 +296,6 @@ func TestWatchClose(t *testing.T) {
}

func TestWatchLastVersion(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -350,8 +338,6 @@ func TestWatchLastVersion(t *testing.T) {
}

func TestWatchFromExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -389,8 +375,6 @@ func TestWatchFromExist(t *testing.T) {
}

func TestWatchFromNotExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -434,8 +418,6 @@ func TestGetFromKvNotFound(t *testing.T) {
}

func TestMultipleWatchesFromExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -486,8 +468,6 @@ func TestMultipleWatchesFromExist(t *testing.T) {
}

func TestMultipleWatchesFromNotExist(t *testing.T) {
t.Parallel()

ec, opts, closeFn := testStore(t)
defer closeFn()

Expand Down Expand Up @@ -530,8 +510,6 @@ func TestMultipleWatchesFromNotExist(t *testing.T) {
}

func TestWatchNonBlocking(t *testing.T) {
t.Parallel()

ecluster, opts, closeFn := testCluster(t)
defer closeFn()

Expand Down

0 comments on commit 146cad6

Please sign in to comment.