[cluster] Watch follow-ups (#3007)
vdarulis authored Dec 15, 2020
1 parent ca7f928 commit edc3ec4
Showing 3 changed files with 86 additions and 65 deletions.
53 changes: 39 additions & 14 deletions src/cluster/etcd/watchmanager/manager.go
@@ -23,6 +23,7 @@ package watchmanager
import (
"context"
"fmt"
"math/rand"
"time"

"github.com/uber-go/tally"
@@ -80,6 +81,7 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
if rev > 0 {
wOpts = append(wOpts, clientv3.WithRev(rev))
}

watchChan = watcher.Watch(
ctx,
key,
@@ -91,8 +93,14 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
var (
timeout = w.opts.WatchChanInitTimeout()
cancelWatchFn = func() {
// we *must* both cancel the context and call .Close() on watch to
// properly free resources, and not end up with weird issues due to stale
// grpc streams or bad internal etcd watch state.
cancelFn()
if err := watcher.Close(); err != nil {
// however, there's nothing we can do about an error on watch close,
// and it shouldn't happen in practice - unless we end up
// closing an already closed grpc stream or something similar.
w.logger.Info("error closing watcher", zap.Error(err))
}
}
@@ -102,6 +110,7 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
case <-doneCh:
return watchChan, cancelWatchFn, nil
case <-time.After(timeout):
cancelWatchFn()
err := fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key)
return nil, cancelWatchFn, err
}
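Note: the timeout branch above now calls cancelWatchFn() before returning, so a half-created watch no longer leaks. A minimal sketch of the whole create-with-timeout pattern, assuming an established *clientv3.Client; the function name and structure here are illustrative, not the manager's exact code:

package watchsketch

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// watchWithTimeout creates a watch but fully tears it down if it is not
// established within the timeout.
func watchWithTimeout(cli *clientv3.Client, key string, timeout time.Duration) (clientv3.WatchChan, func(), error) {
	ctx, cancelFn := context.WithCancel(context.Background())
	watcher := clientv3.NewWatcher(cli)
	watchChan := watcher.Watch(ctx, key, clientv3.WithCreatedNotify())

	// Cancel the context *and* close the watcher: both are needed to free
	// the underlying grpc stream and etcd watch state.
	cancelWatchFn := func() {
		cancelFn()
		_ = watcher.Close()
	}

	select {
	case <-watchChan: // first response is the created notification
		return watchChan, cancelWatchFn, nil
	case <-time.After(timeout):
		cancelWatchFn() // clean up before erroring out
		return nil, cancelWatchFn, fmt.Errorf("watch create timed out after %s for key: %s", timeout, key)
	}
}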
@@ -111,11 +120,13 @@ func (w *manager) Watch(key string) {
var (
ticker = time.NewTicker(w.opts.WatchChanCheckInterval())
logger = w.logger.With(zap.String("watch_key", key))
rnd = rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec

revOverride int64
watchChan clientv3.WatchChan
cancelFn context.CancelFunc
err error
revOverride int64
firstUpdateSucceeded bool
watchChan clientv3.WatchChan
cancelFn context.CancelFunc
err error
)

defer ticker.Stop()
@@ -127,7 +138,9 @@ func (w *manager) Watch(key string) {
// set it to nil so it will be recreated
watchChan = nil
// avoid recreating watch channel too frequently
time.Sleep(w.opts.WatchChanResetInterval())
dur := w.opts.WatchChanResetInterval()
dur += time.Duration(rnd.Int63n(int64(dur)))
time.Sleep(dur)
}

for {
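Note: the reset path above now adds up to 100% random jitter on top of WatchChanResetInterval(), so a fleet of watchers that fail at the same moment (e.g. after a leader change) does not retry in lockstep and stampede etcd. A standalone sketch of the arithmetic; the 10s base is an arbitrary stand-in:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	rnd := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec
	base := 10 * time.Second // stands in for WatchChanResetInterval()
	// effective delay is uniform in [base, 2*base)
	dur := base + time.Duration(rnd.Int63n(int64(base)))
	fmt.Println("sleeping for", dur) // between 10s and just under 20s
	time.Sleep(dur)
}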
@@ -140,8 +153,14 @@

// NB(cw) when we fail to create an etcd watch channel
// we do a get for now and will try to recreate the watch chan later
if err = w.updateFn(key, nil); err != nil {
logger.Error("failed to get value for key", zap.Error(err))
if !firstUpdateSucceeded {
if err = w.updateFn(key, nil); err != nil {
logger.Error("failed to get value for key", zap.Error(err))
} else {
// NB(vytenis): only try initializing once, otherwise there's
// get request amplification, especially for non-existent keys.
firstUpdateSucceeded = true
}
}
resetWatchWithSleep()
continue
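Note: the new firstUpdateSucceeded flag makes the Get fallback one-shot: once a direct read has seeded a value, failed watch creation no longer triggers another Get. A sketch of the guard in isolation; getKey is a hypothetical stand-in for the manager's updateFn:

package main

import "log"

// getKey stands in for a direct etcd read used while the watch channel
// cannot be created; it is purely illustrative.
func getKey(key string) error { return nil }

func main() {
	var firstUpdateSucceeded bool
	for attempt := 0; attempt < 5; attempt++ {
		// watch creation failed at this point in the loop; fall back to a
		// direct read, but only until one read succeeds, since retrying the
		// Get on every failed attempt amplifies read traffic, especially
		// for keys that do not exist yet.
		if !firstUpdateSucceeded {
			if err := getKey("foo"); err != nil {
				log.Printf("failed to get value for key: %v", err)
			} else {
				firstUpdateSucceeded = true
			}
		}
		// ... sleep with jitter, then retry creating the watch ...
	}
}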
@@ -166,20 +185,26 @@
zap.Error(err),
)
w.m.etcdWatchError.Inc(1)
// do not stop here, even though the update contains an error
// we still take this chance to attempt a Get() for the latest value

// If the current revision has been compacted, set watchChan to
// nil so the watch is recreated with a valid start revision
if err == rpctypes.ErrCompacted {
logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision))
revOverride = r.CompactRevision
logger.Warn("compacted; recreating watch at revision",
zap.Int64("revision", revOverride))
} else {
logger.Warn("recreating watch due to an error")
logger.Warn("recreating watch due to an error", zap.Error(err))
}

resetWatchWithSleep()
continue
} else if r.IsProgressNotify() {
if r.CompactRevision > revOverride {
// we only care about the last event as this watchmanager implementation does not support
// watching key ranges, only single keys.
// set revOverride to minimum non-compacted revision if watch was
// initialized with an older rev., since we really don't care about history.
// this may help recover faster (one less retry) on connection loss/leader change
// around compaction, if we were watching on a revision that's already compacted.
revOverride = r.CompactRevision
}
// Do not call updateFn on ProgressNotify as it happens periodically with no update events
continue
}
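Note: two compaction paths meet in this hunk - a watch error of rpctypes.ErrCompacted forces a recreate at the compact revision, while progress notifications advance revOverride preemptively so a later recreate starts from a revision etcd can still serve. A sketch of that bookkeeping against the same clientv3 API; the function names are illustrative:

package watchsketch

import (
	"context"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// nextStartRev returns the revision the next Watch call should start from
// (0 means "current revision").
func nextStartRev(r clientv3.WatchResponse, revOverride int64) int64 {
	if err := r.Err(); err != nil {
		if err == rpctypes.ErrCompacted {
			// our start revision was compacted away; restart from the
			// oldest revision etcd still retains
			return r.CompactRevision
		}
		return revOverride // other errors: recreate at the same revision
	}
	if r.IsProgressNotify() && r.CompactRevision > revOverride {
		// advance the override so a future recreate does not begin at an
		// already-compacted revision (saves one failed retry)
		return r.CompactRevision
	}
	return revOverride
}

// recreate shows how the override feeds back into watch creation.
func recreate(ctx context.Context, w clientv3.Watcher, key string, rev int64) clientv3.WatchChan {
	opts := []clientv3.OpOption{clientv3.WithCreatedNotify()}
	if rev > 0 {
		opts = append(opts, clientv3.WithRev(rev))
	}
	return w.Watch(ctx, key, opts...)
}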
55 changes: 22 additions & 33 deletions src/cluster/etcd/watchmanager/manager_test.go
@@ -119,22 +119,22 @@ func TestWatchRecreate(t *testing.T) {

ec := ecluster.RandClient()

failTotal := 2
failTotal := 1
wh.opts = wh.opts.
SetClient(ec).
SetWatchChanInitTimeout(200 * time.Millisecond).
SetWatchChanResetInterval(100 * time.Millisecond)
SetWatchChanInitTimeout(50 * time.Millisecond).
SetWatchChanResetInterval(50 * time.Millisecond)

go func() {
ecluster.Members[0].DropConnections()
ecluster.Members[0].Blackhole()
wh.Watch("foo")
}()

time.Sleep(2 * wh.opts.WatchChanInitTimeout())
time.Sleep(4 * wh.opts.WatchChanInitTimeout())

// watch will error out but updateFn will be tried
for {
for i := 0; i < 100; i++ {
if atomic.LoadInt32(updateCalled) >= int32(failTotal) {
break
}
@@ -150,7 +150,7 @@ func TestWatchRecreate(t *testing.T) {
_, err := ec.Put(context.Background(), "foo", "v")
require.NoError(t, err)

for {
for i := 0; i < 100; i++ {
if atomic.LoadInt32(updateCalled) > updatesBefore {
break
}
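Note: the unbounded for {} polls in this test are now capped at 100 iterations, so a regression fails the test quickly instead of hanging the whole suite. The same idea as a reusable helper; this helper is hypothetical, not part of the commit:

package watchsketch

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// waitForAtLeast polls an atomic counter a bounded number of times.
func waitForAtLeast(t *testing.T, counter *int32, want int32) {
	t.Helper()
	for i := 0; i < 100; i++ {
		if atomic.LoadInt32(counter) >= want {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	require.FailNow(t, "timed out waiting for counter", "never reached %d", want)
}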
@@ -223,37 +223,13 @@ func TestWatchNoLeader(t *testing.T) {
leaderIdx := ecluster.WaitLeader(t)
require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader")

for i := 0; i < 10; i++ {
if atomic.LoadInt32(&updateCalled) == int32(3) {
break
}
time.Sleep(watchInitAndRetryDelay)
}

// simulate quorum loss
ecluster.Members[1].Stop(t)
ecluster.Members[2].Stop(t)

// wait for election timeout, then member[0] will not have a leader.
time.Sleep(electionTimeout)

for i := 0; i < 100; i++ {
// test that leader loss is retried - even on error, we should attempt update.
// 10 is an arbitrary number greater than the number of actual updates
if atomic.LoadInt32(&updateCalled) >= 10 {
break
}
time.Sleep(watchInitAndRetryDelay)
}

updates := atomic.LoadInt32(&updateCalled)
if updates < 10 {
require.Fail(t,
"insufficient update calls",
"expected at least 10 update attempts, got %d during a partition",
updates)
}

require.NoError(t, ecluster.Members[1].Restart(t))
require.NoError(t, ecluster.Members[2].Restart(t))
// wait for leader + election delay just in case
@@ -266,8 +242,21 @@
require.NoError(t, err)

// give some time for watch to be updated
runtime.Gosched()
time.Sleep(watchInitAndRetryDelay)
for i := 0; i < 10; i++ {
if atomic.LoadInt32(&updateCalled) == int32(2) {
break
}
time.Sleep(watchInitAndRetryDelay)
runtime.Gosched()
}

updates := atomic.LoadInt32(&updateCalled)
if updates < 2 {
require.Fail(t,
"insufficient update calls",
"expected at least 2 update attempts, got %d during a partition",
updates)
}

atomic.AddInt32(&shouldStop, 1)
<-doneCh
@@ -308,7 +297,7 @@ func TestWatchCompactedRevision(t *testing.T) {
go wh.Watch("foo")
time.Sleep(3 * wh.opts.WatchChanInitTimeout())

assert.Equal(t, int32(4), atomic.LoadInt32(updateCalled))
assert.Equal(t, int32(3), atomic.LoadInt32(updateCalled))

lastRead := atomic.LoadInt32(updateCalled)
ec.Put(context.Background(), "foo", "bar-11")
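Note: for context, a sketch of how an integration test can produce the compacted state this test exercises - write a few revisions, then compact up to the latest, so a watch started at an older revision fails with ErrCompacted and must be recreated. This is illustrative, not the test's exact setup:

package watchsketch

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	"go.etcd.io/etcd/clientv3"
)

// forceCompaction writes several revisions of key and compacts to the
// latest, returning the compact revision.
func forceCompaction(t *testing.T, ec *clientv3.Client, key string) int64 {
	var rev int64
	for _, v := range []string{"bar-1", "bar-2", "bar-3"} {
		resp, err := ec.Put(context.Background(), key, v)
		require.NoError(t, err)
		rev = resp.Header.Revision
	}
	_, err := ec.Compact(context.Background(), rev)
	require.NoError(t, err)
	return rev
}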
43 changes: 25 additions & 18 deletions src/cluster/kv/etcd/store_test.go
@@ -27,7 +27,6 @@ import (
"io/ioutil"
"os"
"path"
"sync/atomic"
"testing"
"time"

@@ -37,6 +36,7 @@
"github.com/m3db/m3/src/x/retry"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
@@ -318,27 +318,35 @@ func TestWatchLastVersion(t *testing.T) {
require.NoError(t, err)
require.Nil(t, w.Get())

var errs int32
lastVersion := 50
var (
doneCh = make(chan struct{})
lastVersion = 50
)

go func() {
for i := 1; i <= lastVersion; i++ {
_, err := store.Set("foo", genProto(fmt.Sprintf("bar%d", i)))
if err != nil {
atomic.AddInt32(&errs, 1)
}
assert.NoError(t, err)
}
}()

for {
<-w.C()
value := w.Get()
if value.Version() == lastVersion-int(atomic.LoadInt32(&errs)) {
break
go func() {
defer close(doneCh)
for {
<-w.C()
value := w.Get()
if value.Version() == lastVersion {
return
}
}
}()

select {
case <-time.After(5 * time.Second):
t.Fatal("test timed out")
case <-doneCh:
}
verifyValue(t, w.Get(), fmt.Sprintf("bar%d", lastVersion), lastVersion)

w.Close()
}
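Note: the rewritten test races a consumer goroutine against a hard deadline instead of spinning forever on the watch channel. The shape of that pattern as a hypothetical helper (not part of this commit):

package watchsketch

import (
	"testing"
	"time"
)

// eventually runs an open-ended wait in a goroutine and fails the test if it
// does not complete within the timeout.
func eventually(t *testing.T, timeout time.Duration, wait func()) {
	t.Helper()
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		wait() // e.g. drain w.C() until w.Get().Version() == lastVersion
	}()
	select {
	case <-doneCh:
	case <-time.After(timeout):
		t.Fatal("test timed out")
	}
}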

func TestWatchFromExist(t *testing.T) {
@@ -877,7 +885,6 @@ func TestStaleDelete__FromWatch(t *testing.T) {
// in this test we ensure that clients who did not receive a delete for a key
// in their caches evict the value the next time they communicate with an etcd
// that is unaware of the key (e.g. it's been compacted).

// first, we find the bytes required to be created in the cache file
serverCachePath, err := ioutil.TempDir("", "server-cache-dir")
require.NoError(t, err)
@@ -1156,10 +1163,10 @@ func testCluster(t *testing.T) (*integration.ClusterV3, Options, func()) {
}

opts := NewOptions().
SetWatchChanCheckInterval(50 * time.Millisecond).
SetWatchChanResetInterval(150 * time.Millisecond).
SetWatchChanInitTimeout(150 * time.Millisecond).
SetRequestTimeout(100 * time.Millisecond).
SetWatchChanCheckInterval(100 * time.Millisecond).
SetWatchChanResetInterval(200 * time.Millisecond).
SetWatchChanInitTimeout(200 * time.Millisecond).
SetRequestTimeout(200 * time.Millisecond).
SetRetryOptions(retry.NewOptions().SetMaxRetries(1).SetMaxBackoff(0)).
SetPrefix("test")

