diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index 3cd9e1646c..0a28d232df 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -38,11 +38,11 @@ import ( ) func TestWatchChan(t *testing.T) { - t.Parallel() wh, ecluster, _, _, _, closer := testCluster(t) //nolint:dogsled defer closer() ec := ecluster.RandClient() + integration.WaitClientV3(t, ec) wc, _, err := wh.watchChanWithTimeout("foo", 0) require.NoError(t, err) @@ -67,9 +67,9 @@ func TestWatchChan(t *testing.T) { } func TestWatchSimple(t *testing.T) { - t.Parallel() wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) defer closer() + integration.WaitClientV3(t, ec) require.Equal(t, int32(0), atomic.LoadInt32(updateCalled)) go wh.Watch("foo") @@ -115,11 +115,11 @@ func TestWatchSimple(t *testing.T) { } func TestWatchRecreate(t *testing.T) { - t.Parallel() wh, ecluster, updateCalled, shouldStop, doneCh, closer := testCluster(t) defer closer() ec := ecluster.RandClient() + integration.WaitClientV3(t, ec) failTotal := 1 wh.opts = wh.opts. @@ -165,7 +165,6 @@ func TestWatchRecreate(t *testing.T) { } func TestWatchNoLeader(t *testing.T) { - t.Parallel() const ( watchInitAndRetryDelay = 200 * time.Millisecond watchCheckInterval = 50 * time.Millisecond @@ -210,6 +209,8 @@ func TestWatchNoLeader(t *testing.T) { SetWatchChanResetInterval(watchInitAndRetryDelay). SetWatchChanCheckInterval(watchCheckInterval) + integration.WaitClientV3(t, ec) + wh, err := NewWatchManager(opts) require.NoError(t, err) @@ -234,11 +235,13 @@ func TestWatchNoLeader(t *testing.T) { require.NoError(t, ecluster.Members[1].Restart(t)) require.NoError(t, ecluster.Members[2].Restart(t)) + // wait for leader + election delay just in case time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) leaderIdx = ecluster.WaitLeader(t) require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") + integration.WaitClientV3(t, ec) // wait for client to be ready again _, err = ec.Put(context.Background(), "foo", "baz") require.NoError(t, err) @@ -246,7 +249,7 @@ func TestWatchNoLeader(t *testing.T) { // give some time for watch to be updated require.True(t, clock.WaitUntil(func() bool { return atomic.LoadInt32(&updateCalled) >= 2 - }, 30*time.Second)) + }, 10*time.Second)) updates := atomic.LoadInt32(&updateCalled) if updates < 2 { @@ -269,10 +272,11 @@ func TestWatchNoLeader(t *testing.T) { } func TestWatchCompactedRevision(t *testing.T) { - t.Parallel() wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) defer closer() + integration.WaitClientV3(t, ec) + ts := tally.NewTestScope("", nil) errC := ts.Counter("errors") wh.m.etcdWatchError = errC diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index 70fb5e10a9..de5b36bef7 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -84,8 +84,6 @@ func TestGetAndSet(t *testing.T) { } func TestNoCache(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) store, err := NewStore(ec, opts) @@ -152,8 +150,6 @@ func TestCacheDirCreation(t *testing.T) { } func TestCache(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) f, err := ioutil.TempFile("", "") @@ -206,8 +202,6 @@ func TestCache(t *testing.T) { } func TestSetIfNotExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -227,8 +221,6 @@ func TestSetIfNotExist(t *testing.T) { } func TestCheckAndSet(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -255,8 +247,6 @@ func TestCheckAndSet(t *testing.T) { } func TestWatchClose(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -306,8 +296,6 @@ func TestWatchClose(t *testing.T) { } func TestWatchLastVersion(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -350,8 +338,6 @@ func TestWatchLastVersion(t *testing.T) { } func TestWatchFromExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -389,8 +375,6 @@ func TestWatchFromExist(t *testing.T) { } func TestWatchFromNotExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -434,8 +418,6 @@ func TestGetFromKvNotFound(t *testing.T) { } func TestMultipleWatchesFromExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -486,8 +468,6 @@ func TestMultipleWatchesFromExist(t *testing.T) { } func TestMultipleWatchesFromNotExist(t *testing.T) { - t.Parallel() - ec, opts, closeFn := testStore(t) defer closeFn() @@ -530,8 +510,6 @@ func TestMultipleWatchesFromNotExist(t *testing.T) { } func TestWatchNonBlocking(t *testing.T) { - t.Parallel() - ecluster, opts, closeFn := testCluster(t) defer closeFn()