From edc3ec44c049bbb590b2eb49721bdd63d9064d08 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Mon, 14 Dec 2020 19:21:37 -0500 Subject: [PATCH] [cluster] Watch follow-ups (#3007) --- src/cluster/etcd/watchmanager/manager.go | 53 +++++++++++++----- src/cluster/etcd/watchmanager/manager_test.go | 55 ++++++++----------- src/cluster/kv/etcd/store_test.go | 43 +++++++++------ 3 files changed, 86 insertions(+), 65 deletions(-) diff --git a/src/cluster/etcd/watchmanager/manager.go b/src/cluster/etcd/watchmanager/manager.go index 02fefd320e..ccdcf31066 100644 --- a/src/cluster/etcd/watchmanager/manager.go +++ b/src/cluster/etcd/watchmanager/manager.go @@ -23,6 +23,7 @@ package watchmanager import ( "context" "fmt" + "math/rand" "time" "github.com/uber-go/tally" @@ -80,6 +81,7 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha if rev > 0 { wOpts = append(wOpts, clientv3.WithRev(rev)) } + watchChan = watcher.Watch( ctx, key, @@ -91,8 +93,14 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha var ( timeout = w.opts.WatchChanInitTimeout() cancelWatchFn = func() { + // we *must* both cancel the context and call .Close() on watch to + // properly free resources, and not end up with weird issues due to stale + // grpc streams or bad internal etcd watch state. cancelFn() if err := watcher.Close(); err != nil { + // however, there's nothing we can do about an error on watch close, + // and it shouldn't happen in practice - unless we end up + // closing an already closed grpc stream or something similar.
w.logger.Info("error closing watcher", zap.Error(err)) } } @@ -102,6 +110,7 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha case <-doneCh: return watchChan, cancelWatchFn, nil case <-time.After(timeout): + cancelWatchFn() err := fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key) return nil, cancelWatchFn, err } @@ -111,11 +120,13 @@ func (w *manager) Watch(key string) { var ( ticker = time.NewTicker(w.opts.WatchChanCheckInterval()) logger = w.logger.With(zap.String("watch_key", key)) + rnd = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec - revOverride int64 - watchChan clientv3.WatchChan - cancelFn context.CancelFunc - err error + revOverride int64 + firstUpdateSucceeded bool + watchChan clientv3.WatchChan + cancelFn context.CancelFunc + err error ) defer ticker.Stop() @@ -127,7 +138,9 @@ func (w *manager) Watch(key string) { // set it to nil so it will be recreated watchChan = nil // avoid recreating watch channel too frequently - time.Sleep(w.opts.WatchChanResetInterval()) + dur := w.opts.WatchChanResetInterval() + dur += time.Duration(rnd.Int63n(int64(dur))) + time.Sleep(dur) } for { @@ -140,8 +153,14 @@ func (w *manager) Watch(key string) { // NB(cw) when we failed to create a etcd watch channel // we do a get for now and will try to recreate the watch chan later - if err = w.updateFn(key, nil); err != nil { - logger.Error("failed to get value for key", zap.Error(err)) + if !firstUpdateSucceeded { + if err = w.updateFn(key, nil); err != nil { + logger.Error("failed to get value for key", zap.Error(err)) + } else { + // NB(vytenis): only try initializing once, otherwise there's + // get request amplification, especially for non-existent keys. 
+ firstUpdateSucceeded = true + } } resetWatchWithSleep() continue @@ -166,20 +185,26 @@ func (w *manager) Watch(key string) { zap.Error(err), ) w.m.etcdWatchError.Inc(1) - // do not stop here, even though the update contains an error - // we still take this chance to attempt a Get() for the latest value - - // If the current revision has been compacted, set watchChan to - // nil so the watch is recreated with a valid start revision if err == rpctypes.ErrCompacted { - logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision)) revOverride = r.CompactRevision + logger.Warn("compacted; recreating watch at revision", + zap.Int64("revision", revOverride)) } else { - logger.Warn("recreating watch due to an error") + logger.Warn("recreating watch due to an error", zap.Error(err)) } resetWatchWithSleep() + continue } else if r.IsProgressNotify() { + if r.CompactRevision > revOverride { + // we only care about last event as this watchmanager implementation does not support + // watching key ranges, only single keys. + // set revOverride to minimum non-compacted revision if watch was + // initialized with an older rev., since we really don't care about history. + // this may help recover faster (one less retry) on connection loss/leader change + // around compaction, if we were watching on a revision that's already compacted. + revOverride = r.CompactRevision + } // Do not call updateFn on ProgressNotify as it happens periodically with no update events continue } diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index ad1eda0d33..d3720132eb 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -119,11 +119,11 @@ func TestWatchRecreate(t *testing.T) { ec := ecluster.RandClient() - failTotal := 2 + failTotal := 1 wh.opts = wh.opts. SetClient(ec). - SetWatchChanInitTimeout(200 * time.Millisecond). 
- SetWatchChanResetInterval(100 * time.Millisecond) + SetWatchChanInitTimeout(50 * time.Millisecond). + SetWatchChanResetInterval(50 * time.Millisecond) go func() { ecluster.Members[0].DropConnections() @@ -131,10 +131,10 @@ func TestWatchRecreate(t *testing.T) { wh.Watch("foo") }() - time.Sleep(2 * wh.opts.WatchChanInitTimeout()) + time.Sleep(4 * wh.opts.WatchChanInitTimeout()) // watch will error out but updateFn will be tried - for { + for i := 0; i < 100; i++ { if atomic.LoadInt32(updateCalled) >= int32(failTotal) { break } @@ -150,7 +150,7 @@ func TestWatchRecreate(t *testing.T) { _, err := ec.Put(context.Background(), "foo", "v") require.NoError(t, err) - for { + for i := 0; i < 100; i++ { if atomic.LoadInt32(updateCalled) > updatesBefore { break } @@ -223,13 +223,6 @@ func TestWatchNoLeader(t *testing.T) { leaderIdx := ecluster.WaitLeader(t) require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - for i := 0; i < 10; i++ { - if atomic.LoadInt32(&updateCalled) == int32(3) { - break - } - time.Sleep(watchInitAndRetryDelay) - } - // simulate quorum loss ecluster.Members[1].Stop(t) ecluster.Members[2].Stop(t) @@ -237,23 +230,6 @@ func TestWatchNoLeader(t *testing.T) { // wait for election timeout, then member[0] will not have a leader. time.Sleep(electionTimeout) - for i := 0; i < 100; i++ { - // test that leader loss is retried - even on error, we should attempt update. 
- // 5 is an arbitraty number greater than amount of actual updates - if atomic.LoadInt32(&updateCalled) >= 10 { - break - } - time.Sleep(watchInitAndRetryDelay) - } - - updates := atomic.LoadInt32(&updateCalled) - if updates < 10 { - require.Fail(t, - "insufficient update calls", - "expected at least 10 update attempts, got %d during a partition", - updates) - } - require.NoError(t, ecluster.Members[1].Restart(t)) require.NoError(t, ecluster.Members[2].Restart(t)) // wait for leader + election delay just in case @@ -266,8 +242,21 @@ func TestWatchNoLeader(t *testing.T) { require.NoError(t, err) // give some time for watch to be updated - runtime.Gosched() - time.Sleep(watchInitAndRetryDelay) + for i := 0; i < 10; i++ { + if atomic.LoadInt32(&updateCalled) == int32(2) { + break + } + time.Sleep(watchInitAndRetryDelay) + runtime.Gosched() + } + + updates := atomic.LoadInt32(&updateCalled) + if updates < 2 { + require.Fail(t, + "insufficient update calls", + "expected at least 2 update attempts, got %d during a partition", + updates) + } atomic.AddInt32(&shouldStop, 1) <-doneCh @@ -308,7 +297,7 @@ func TestWatchCompactedRevision(t *testing.T) { go wh.Watch("foo") time.Sleep(3 * wh.opts.WatchChanInitTimeout()) - assert.Equal(t, int32(4), atomic.LoadInt32(updateCalled)) + assert.Equal(t, int32(3), atomic.LoadInt32(updateCalled)) lastRead := atomic.LoadInt32(updateCalled) ec.Put(context.Background(), "foo", "bar-11") diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index 8924095d3f..bbba9f9966 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -27,7 +27,6 @@ import ( "io/ioutil" "os" "path" - "sync/atomic" "testing" "time" @@ -37,6 +36,7 @@ import ( "github.com/m3db/m3/src/x/retry" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/integration" @@ -318,27 +318,35 @@ func TestWatchLastVersion(t 
*testing.T) { require.NoError(t, err) require.Nil(t, w.Get()) - var errs int32 - lastVersion := 50 + var ( + doneCh = make(chan struct{}) + lastVersion = 50 + ) + go func() { for i := 1; i <= lastVersion; i++ { _, err := store.Set("foo", genProto(fmt.Sprintf("bar%d", i))) - if err != nil { - atomic.AddInt32(&errs, 1) - } + assert.NoError(t, err) } }() - for { - <-w.C() - value := w.Get() - if value.Version() == lastVersion-int(atomic.LoadInt32(&errs)) { - break + go func() { + defer close(doneCh) + for { + <-w.C() + value := w.Get() + if value.Version() == lastVersion { + return + } } + }() + + select { + case <-time.After(5 * time.Second): + t.Fatal("test timed out") + case <-doneCh: } verifyValue(t, w.Get(), fmt.Sprintf("bar%d", lastVersion), lastVersion) - - w.Close() } func TestWatchFromExist(t *testing.T) { @@ -877,7 +885,6 @@ func TestStaleDelete__FromWatch(t *testing.T) { // in this test we ensure clients who did not receive a delete for a key in // their caches, evict the value in their cache the next time they communicate // with an etcd which is unaware of the key (e.g. it's been compacted). - // first, we find the bytes required to be created in the cache file serverCachePath, err := ioutil.TempDir("", "server-cache-dir") require.NoError(t, err) @@ -1156,10 +1163,10 @@ func testCluster(t *testing.T) (*integration.ClusterV3, Options, func()) { } opts := NewOptions(). - SetWatchChanCheckInterval(50 * time.Millisecond). - SetWatchChanResetInterval(150 * time.Millisecond). - SetWatchChanInitTimeout(150 * time.Millisecond). - SetRequestTimeout(100 * time.Millisecond). + SetWatchChanCheckInterval(100 * time.Millisecond). + SetWatchChanResetInterval(200 * time.Millisecond). + SetWatchChanInitTimeout(200 * time.Millisecond). + SetRequestTimeout(200 * time.Millisecond). SetRetryOptions(retry.NewOptions().SetMaxRetries(1).SetMaxBackoff(0)). SetPrefix("test")