From 46d2caae1e87b007b8518ac240461b6ef5e4da77 Mon Sep 17 00:00:00 2001
From: Chao Chen
Date: Mon, 18 Mar 2024 18:41:17 -0700
Subject: [PATCH] [release-3.5] backport fix watch event loss after compaction

Signed-off-by: Chao Chen
---
 server/mvcc/watchable_store.go      |  5 +++
 server/mvcc/watchable_store_test.go | 61 ++++++++++++++++++++++++++++-
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/server/mvcc/watchable_store.go b/server/mvcc/watchable_store.go
index 85429e850a0..cdac20ad5ed 100644
--- a/server/mvcc/watchable_store.go
+++ b/server/mvcc/watchable_store.go
@@ -366,6 +366,11 @@ func (s *watchableStore) syncWatchers() int {
 	var victims watcherBatch
 	wb := newWatcherBatch(wg, evs)
 	for w := range wg.watchers {
+		if w.minRev < compactionRev {
+			// Skip the watcher that failed to send compacted watch response due to w.ch is full.
+			// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
+			continue
+		}
 		w.minRev = curRev + 1
 
 		eb, ok := wb[w]
diff --git a/server/mvcc/watchable_store_test.go b/server/mvcc/watchable_store_test.go
index 34723730f5e..c4e2c2d2537 100644
--- a/server/mvcc/watchable_store_test.go
+++ b/server/mvcc/watchable_store_test.go
@@ -23,11 +23,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest"
+
 	"go.etcd.io/etcd/api/v3/mvccpb"
 	"go.etcd.io/etcd/pkg/v3/traceutil"
 	"go.etcd.io/etcd/server/v3/lease"
 	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
-	"go.uber.org/zap"
 )
 
 func TestWatch(t *testing.T) {
@@ -259,6 +262,62 @@ func TestWatchCompacted(t *testing.T) {
 	}
 }
 
+func TestWatchNoEventLossOnCompact(t *testing.T) {
+	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
+	b, tmpPath := betesting.NewDefaultTmpBackend(t)
+	lg := zaptest.NewLogger(t)
+	s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
+
+	defer func() {
+		cleanup(s, b, tmpPath)
+		chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
+	}()
+
+	chanBufLen, maxWatchersPerSync = 1, 4
+	testKey, testValue := []byte("foo"), []byte("bar")
+
+	maxRev := 10
+	compactRev := int64(5)
+	for i := 0; i < maxRev; i++ {
+		s.Put(testKey, testValue, lease.NoLease)
+	}
+	_, err := s.Compact(traceutil.TODO(), compactRev)
+	require.NoErrorf(t, err, "failed to compact kv (%v)", err)
+
+	w := s.NewWatchStream()
+	defer w.Close()
+
+	watchers := map[WatchID]int64{
+		0: 1,
+		1: 1, // create unsyncd watchers with startRev < compactRev
+		2: 6, // create unsyncd watchers with compactRev < startRev < currentRev
+	}
+	for id, startRev := range watchers {
+		_, err := w.Watch(id, testKey, nil, startRev)
+		require.NoError(t, err)
+	}
+	// fill up w.Chan() with 1 buf via 2 compacted watch response
+	s.syncWatchers()
+
+	for len(watchers) > 0 {
+		resp := <-w.Chan()
+		if resp.CompactRevision != 0 {
+			require.Equal(t, resp.CompactRevision, compactRev)
+			require.Contains(t, watchers, resp.WatchID)
+			delete(watchers, resp.WatchID)
+			continue
+		}
+		nextRev := watchers[resp.WatchID]
+		for _, ev := range resp.Events {
+			require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
+			nextRev++
+		}
+		if nextRev == s.rev()+1 {
+			delete(watchers, resp.WatchID)
+		}
+	}
+}
+
 func TestWatchFutureRev(t *testing.T) {
 	b, tmpPath := betesting.NewDefaultTmpBackend(t)
 	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})