From 2ef3492c6fd08d24422c9c1d638c670401ef40c8 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 15 May 2023 17:48:43 +0200 Subject: [PATCH] tests/robustness: Validate all etcd watches opened to etcd Signed-off-by: Marek Siarkowicz --- tests/robustness/README.md | 32 ++-- tests/robustness/identity/id.go | 7 + tests/robustness/linearizability_test.go | 37 ++-- tests/robustness/model/history.go | 15 ++ tests/robustness/report.go | 48 ++--- tests/robustness/traffic/client.go | 64 ++++++- tests/robustness/traffic/etcd.go | 14 +- tests/robustness/traffic/kubernetes.go | 4 +- tests/robustness/traffic/traffic.go | 30 ++-- tests/robustness/watch.go | 215 ++++++++++++----------- 10 files changed, 279 insertions(+), 187 deletions(-) diff --git a/tests/robustness/README.md b/tests/robustness/README.md index 1aa5c745b8bc..ebea3b45098c 100644 --- a/tests/robustness/README.md +++ b/tests/robustness/README.md @@ -1,7 +1,7 @@ # etcd Robustness Testing Purpose of etcd robustness tests is to validate that etcd upholds -[API guarantees] and [watch guarantees] under any condition or failure. +[KV API guarantees] and [watch API guarantees] under any condition or failure. Robustness tests achieve that comparing etcd cluster behavior against a simplified model. Multiple test encompass different etcd cluster setups, client traffic types and failures experienced by cluster. @@ -9,8 +9,8 @@ During a single test we create a cluster and inject failures while sending and r Correctness is validated by running collected history of client operations against the etcd model and a set of validators. Upon failure tests generate a report that can be used to attribute whether failure was caused by bug in etcd or test framework. -[API guarantees]: https://etcd.io/docs/latest/learning/api_guarantees/ -[watch guarantees]: https://etcd.io/docs/latest/learning/api/#watch-streams +[KV API guarantees]: https://etcd.io/docs/v3.6/learning/api_guarantees/#kv-apis +[watch API guarantees]: https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis ## Running locally @@ -41,15 +41,15 @@ is included in test logs. One of log lines should look like: logger.go:130: 2023-03-18T12:18:03.244+0100 INFO Saving member data dir {"member": "TestRobustnessIssue14370-test-0", "path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0"} logger.go:130: 2023-03-18T12:18:03.244+0100 INFO Saving watch responses {"path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0/responses.json"} logger.go:130: 2023-03-18T12:18:03.247+0100 INFO Saving watch events {"path": "/tmp/TestRobustness_Issue14370/TestRobustnessIssue14370-test-0/events.json"} - logger.go:130: 2023-03-18T12:18:03.248+0100 INFO Saving operation history {"path": "/tmp/TestRobustness_Issue14370/full-history.json"} - logger.go:130: 2023-03-18T12:18:03.252+0100 INFO Saving operation history {"path": "/tmp/TestRobustness_Issue14370/patched-history.json"} + logger.go:130: 2023-05-15T17:42:37.792+0200 INFO Saving operation history {"path": "/tmp/TestRobustness_ClusterOfSize3_Kubernetes/client-1/operations.json"} + logger.go:130: 2023-05-15T17:42:37.793+0200 INFO Saving watch responses {"path": "/tmp/TestRobustness_ClusterOfSize3_Kubernetes/client-2/watch.json"} logger.go:130: 2023-03-18T12:18:03.256+0100 INFO Saving visualization {"path": "/tmp/TestRobustness_Issue14370/history.html"} ``` Report includes multiple types of files: * Member db files, can be used to verify disk/memory corruption. 
-* Watch responses saved as json, can be used to validate [watch guarantees]. -* Operation history saved as both html visualization and a json, can be used to validate [API guarantees]. +* Watch responses saved as json, can be used to validate [watch API guarantees]. +* Operation history saved as both html visualization and a json, can be used to validate [KV API guarantees]. ### Example analysis of linearization issue @@ -78,22 +78,22 @@ To reproduce the issue by yourself run `make test-robustness-issue15271`. After a couple of tries robustness tests should fail with a logs `Broke watch guarantee` and save report locally. Watch issues are easiest to analyse by reading the recorded watch history. -Watch history is recorded for each member separated in different subdirectory under `/tmp/TestRobustness_Issue15271/` -Open `responses.json` for member mentioned in log `Broke watch guarantee`. -For example for member `TestRobustnessIssue15271-test-1` open `/tmp/TestRobustness_Issue15271/TestRobustnessIssue15271-test-1/responses.json`. +Watch history is recorded for each client separated in different subdirectory under `/tmp/TestRobustness_Issue15271/` +Open `watch.json` for client mentioned in log `Broke watch guarantee`. +For example for client `14` open `/tmp/TestRobustness_Issue15271/client-14/watch.json`. Each line consists of json blob corresponding to single watch response observed by client. Look for lines with `mod_revision` equal to revision mentioned in the first log with `Broke watch guarantee` You should see two lines where the `mod_revision` decreases like ones below: ``` -{"Header":{"cluster_id":12951239930360520062,"member_id":16914881897345358027,"revision":2574,"raft_term":2},"Events":[{"kv":{"key":"Ng==","create_revision":2303,"mod_revision":2574,"version":46,"value":"Mjg5OA=="}}],"CompactRevision":0,"Canceled":false,"Created":false} -{"Header":{"cluster_id":12951239930360520062,"member_id":16914881897345358027,"revision":7708,"raft_term":2},"Events":[{"kv":{"key":"NQ==","create_revision":5,"mod_revision":91,"version":10,"value":"MTAy"}}, ... } +{"Events":[{"Op":{"Type":"put","Key":"5","WithPrefix":false,"Limit":0,"Value":{"Value":"357","Hash":0},"LeaseID":0},"Revision":335}],"IsProgressNotify":false,"Revision":335,"Time":1050415777} +{"Events":[{"Op":{"Type":"put","Key":"1","WithPrefix":false,"Limit":0,"Value":{"Value":"24","Hash":0},"LeaseID":0},"Revision":24}, ... ``` -Up to the first line the `mod_revision` of events within responses only increased up to a value of `2574`. -However, the following line includes an event with `mod_revision` equal `91`. -If you follow the `mod_revision` throughout the file you should notice that watch replayed revisions second time. -This is incorrect and breaks `Ordered` and `Unique` [watch guarantees]. +Up to the first line the `revision` of events within responses only increased up to a value of `335`. +However, the following line includes an event with `revision` equal `24`. +If you follow the `revision` throughout the file you should notice that watch replayed revisions second time. +This is incorrect and breaks `Ordered` and `Unique` [watch API guarantees]. This is consistent with the root cause of [#14370] where member reconnecting to cluster will incorrectly resend revisions. 
[#15271]: https://github.com/etcd-io/etcd/issues/15271 diff --git a/tests/robustness/identity/id.go b/tests/robustness/identity/id.go index 810a60e1bdb7..3dd22433a836 100644 --- a/tests/robustness/identity/id.go +++ b/tests/robustness/identity/id.go @@ -21,6 +21,8 @@ type Provider interface { NewStreamId() int // NewRequestId returns unique identification used to make write requests unique. NewRequestId() int + // NewClientId returns unique identification for client and their reports. + NewClientId() int } func NewIdProvider() Provider { @@ -30,6 +32,7 @@ func NewIdProvider() Provider { type atomicProvider struct { streamId atomic.Int64 requestId atomic.Int64 + clientId atomic.Int64 } func (id *atomicProvider) NewStreamId() int { @@ -39,3 +42,7 @@ func (id *atomicProvider) NewStreamId() int { func (id *atomicProvider) NewRequestId() int { return int(id.requestId.Add(1)) } + +func (id *atomicProvider) NewClientId() int { + return int(id.clientId.Add(1)) +} diff --git a/tests/robustness/linearizability_test.go b/tests/robustness/linearizability_test.go index a658d524c44a..e49545f7d825 100644 --- a/tests/robustness/linearizability_test.go +++ b/tests/robustness/linearizability_test.go @@ -19,13 +19,13 @@ import ( "testing" "time" - "github.com/anishathalye/porcupine" "go.uber.org/zap" "go.uber.org/zap/zaptest" "golang.org/x/sync/errgroup" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/traffic" ) @@ -152,27 +152,24 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce defer func() { r.Report(t, panicked) }() - r.operations, r.responses = s.run(ctx, t, lg, r.clus) + r.clientReports = s.run(ctx, t, lg, r.clus) forcestopCluster(r.clus) watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0 - validateWatchResponses(t, r.clus, r.responses, s.watch.requestProgress || watchProgressNotifyEnabled) - - r.events = watchEvents(r.responses) - validateEventsMatch(t, r.events) - - r.patchedOperations = patchOperationBasedOnWatchEvents(r.operations, longestHistory(r.events)) - r.visualizeHistory = model.ValidateOperationHistoryAndReturnVisualize(t, lg, r.patchedOperations) + validateGotAtLeastOneProgressNotify(t, r.clientReports, s.watch.requestProgress || watchProgressNotifyEnabled) + r.visualizeHistory = validateCorrectness(t, lg, r.clientReports) panicked = false } -func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]traffic.WatchResponse) { +func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (reports []traffic.ClientReport) { g := errgroup.Group{} + var operationReport, watchReport []traffic.ClientReport finishTraffic := make(chan struct{}) // baseTime is used to get monotonic clock reading when recording operations/watch events baseTime := time.Now() + ids := identity.NewIdProvider() g.Go(func() error { defer close(finishTraffic) injectFailpoints(ctx, t, lg, clus, s.failpoint) @@ -182,22 +179,22 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu maxRevisionChan := make(chan int64, 1) g.Go(func() error { defer close(maxRevisionChan) - operations = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime) - maxRevisionChan <- operationsMaxRevision(operations) + operationReport = 
traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime, ids) + maxRevisionChan <- operationsMaxRevision(operationReport) return nil }) g.Go(func() error { - responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime) + watchReport = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, s.watch, baseTime, ids) return nil }) g.Wait() - return operations, responses + return append(operationReport, watchReport...) } -func operationsMaxRevision(operations []porcupine.Operation) int64 { +func operationsMaxRevision(reports []traffic.ClientReport) int64 { var maxRevision int64 - for _, op := range operations { - revision := op.Output.(model.EtcdNonDeterministicResponse).Revision + for _, r := range reports { + revision := r.OperationHistory.MaxRevision() if revision > maxRevision { maxRevision = revision } @@ -212,3 +209,9 @@ func forcestopCluster(clus *e2e.EtcdProcessCluster) error { } return clus.ConcurrentStop() } + +func validateCorrectness(t *testing.T, lg *zap.Logger, reports []traffic.ClientReport) (visualize func(basepath string)) { + validateWatchCorrectness(t, reports) + operations := operationsFromClientReports(reports) + return model.ValidateOperationHistoryAndReturnVisualize(t, lg, operations) +} diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index 10a09468c34f..4f5d52c98749 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -508,6 +508,10 @@ func (h History) Merge(h2 History) History { return result } +func (h History) Len() int { + return len(h.successful) + len(h.failed) +} + func (h History) Operations() []porcupine.Operation { operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed)) var maxTime int64 @@ -530,3 +534,14 @@ func (h History) Operations() []porcupine.Operation { } return operations } + +func (h History) MaxRevision() int64 { + var maxRevision int64 + for _, op := range h.successful { + revision := op.Output.(EtcdNonDeterministicResponse).Revision + if revision > maxRevision { + maxRevision = revision + } + } + return maxRevision +} diff --git a/tests/robustness/report.go b/tests/robustness/report.go index aeebf8e6e089..4e995acbdb07 100644 --- a/tests/robustness/report.go +++ b/tests/robustness/report.go @@ -16,8 +16,10 @@ package robustness import ( "encoding/json" + "fmt" "os" "path/filepath" + "sort" "strings" "testing" @@ -29,13 +31,10 @@ import ( ) type report struct { - lg *zap.Logger - clus *e2e.EtcdProcessCluster - responses [][]traffic.WatchResponse - events [][]watchEvent - operations []porcupine.Operation - patchedOperations []porcupine.Operation - visualizeHistory func(path string) + lg *zap.Logger + clus *e2e.EtcdProcessCluster + clientReports []traffic.ClientReport + visualizeHistory func(path string) } func testResultsDirectory(t *testing.T) string { @@ -65,21 +64,28 @@ func testResultsDirectory(t *testing.T) string { func (r *report) Report(t *testing.T, force bool) { path := testResultsDirectory(t) if t.Failed() || force { - for i, member := range r.clus.Procs { - memberDataDir := filepath.Join(path, member.Config().Name) + for _, member := range r.clus.Procs { + memberDataDir := filepath.Join(path, fmt.Sprintf("server-%s", member.Config().Name)) persistMemberDataDir(t, r.lg, member, memberDataDir) - if r.responses != nil { - persistWatchResponses(t, r.lg, filepath.Join(memberDataDir, "responses.json"), r.responses[i]) - } - if r.events != nil { - persistWatchEvents(t, r.lg, filepath.Join(memberDataDir, 
"events.json"), r.events[i]) - } } - if r.operations != nil { - persistOperationHistory(t, r.lg, filepath.Join(path, "full-history.json"), r.operations) - } - if r.patchedOperations != nil { - persistOperationHistory(t, r.lg, filepath.Join(path, "patched-history.json"), r.patchedOperations) + if r.clientReports != nil { + sort.Slice(r.clientReports, func(i, j int) bool { + return r.clientReports[i].ClientId < r.clientReports[j].ClientId + }) + for _, report := range r.clientReports { + clientDir := filepath.Join(path, fmt.Sprintf("client-%d", report.ClientId)) + err := os.MkdirAll(clientDir, 0700) + if err != nil { + t.Fatal(err) + } + if len(report.Watch) != 0 { + persistWatchResponses(t, r.lg, filepath.Join(clientDir, "watch.json"), report.Watch) + } + operations := report.OperationHistory.Operations() + if len(operations) != 0 { + persistOperationHistory(t, r.lg, filepath.Join(clientDir, "operations.json"), operations) + } + } } } if r.visualizeHistory != nil { @@ -112,7 +118,7 @@ func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses } } -func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events []watchEvent) { +func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events []traffic.TimedWatchEvent) { lg.Info("Saving watch events", zap.String("path", path)) file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) if err != nil { diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index c36e97c21196..746734c2bb34 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -32,6 +32,7 @@ import ( // clientv3.Client) that records all the requests and responses made. Doesn't // allow for concurrent requests to confirm to model.AppendableHistory requirements. type RecordingClient struct { + id int client clientv3.Client // baseTime is needed to achieve monotonic clock reading. // Only time-measuring operations should be used to record time. 
@@ -46,10 +47,22 @@ type RecordingClient struct { } type WatchResponse struct { - clientv3.WatchResponse + Events []WatchEvent + IsProgressNotify bool + Revision int64 + Time time.Duration +} + +type TimedWatchEvent struct { + WatchEvent Time time.Duration } +type WatchEvent struct { + Op model.EtcdOperation + Revision int64 +} + func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, @@ -61,6 +74,7 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (* return nil, err } return &RecordingClient{ + id: ids.NewClientId(), client: *cc, operations: model.NewAppendableHistory(ids), baseTime: baseTime, @@ -73,14 +87,24 @@ func (c *RecordingClient) Close() error { func (c *RecordingClient) Report() ClientReport { return ClientReport{ - Operations: c.operations.History, - Watch: nil, + ClientId: c.id, + OperationHistory: c.operations.History, + Watch: c.watchResponses, } } type ClientReport struct { - Operations model.History - Watch []WatchResponse + ClientId int + OperationHistory model.History + Watch []WatchResponse +} + +func (r ClientReport) WatchEventCount() int { + count := 0 + for _, resp := range r.Watch { + count += len(resp.Events) + } + return count } func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) { @@ -206,10 +230,38 @@ func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, with defer close(respCh) for r := range c.client.Watch(ctx, key, ops...) { c.watchMux.Lock() - c.watchResponses = append(c.watchResponses, WatchResponse{r, time.Since(c.baseTime)}) + c.watchResponses = append(c.watchResponses, ToWatchResponse(r, c.baseTime)) c.watchMux.Unlock() respCh <- r } }() return respCh } + +func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse { + resp := WatchResponse{Time: time.Since(baseTime)} + for _, event := range r.Events { + resp.Events = append(resp.Events, toWatchEvent(*event)) + } + resp.IsProgressNotify = r.IsProgressNotify() + resp.Revision = r.Header.Revision + return resp +} + +func toWatchEvent(event clientv3.Event) WatchEvent { + var op model.OperationType + switch event.Type { + case mvccpb.PUT: + op = model.Put + case mvccpb.DELETE: + op = model.Delete + } + return WatchEvent{ + Revision: event.Kv.ModRevision, + Op: model.EtcdOperation{ + Type: op, + Key: string(event.Kv.Key), + Value: model.ToValueOrHash(string(event.Kv.Value)), + }, + } +} diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 864472fef3e6..555c574a8f38 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -86,7 +86,7 @@ const ( Defragment etcdRequestType = "defragment" ) -func (t etcdTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { +func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { for { select { @@ -103,7 +103,7 @@ func (t etcdTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, continue } limiter.Wait(ctx) - err = t.Write(ctx, c, limiter, key, ids, lm, clientId, resp) + err = t.Write(ctx, c, limiter, key, ids, lm, resp) if err != nil { continue } @@ -118,7 +118,7 @@ func (t etcdTraffic) Read(ctx context.Context, c *RecordingClient, key string) ( return resp, err 
} -func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, cid int, lastValues *mvccpb.KeyValue) error { +func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, key string, id identity.Provider, lm identity.LeaseIdStorage, lastValues *mvccpb.KeyValue) error { writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) var err error @@ -139,11 +139,11 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat value := fmt.Sprintf("%d", id.NewRequestId()) _, err = c.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, value)}, nil) case PutWithLease: - leaseId := lm.LeaseId(cid) + leaseId := lm.LeaseId(c.id) if leaseId == 0 { leaseId, err = c.LeaseGrant(writeCtx, t.leaseTTL) if err == nil { - lm.AddLeaseId(cid, leaseId) + lm.AddLeaseId(c.id, leaseId) limiter.Wait(ctx) } } @@ -153,12 +153,12 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat putCancel() } case LeaseRevoke: - leaseId := lm.LeaseId(cid) + leaseId := lm.LeaseId(c.id) if leaseId != 0 { err = c.LeaseRevoke(writeCtx, leaseId) //if LeaseRevoke has failed, do not remove the mapping. if err == nil { - lm.RemoveLeaseId(cid) + lm.RemoveLeaseId(c.id) } } case Defragment: diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 5cd9bfb0bb50..7c4b822eba7e 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -56,7 +56,7 @@ type kubernetesTraffic struct { writeChoices []choiceWeight[KubernetesRequestType] } -func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { +func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { kc := &kubernetesClient{client: c} s := newStorage() keyPrefix := "/registry/" + t.resource + "/" @@ -80,7 +80,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl s.Reset(resp) limiter.Wait(ctx) watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout) - for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision, true) { + for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision+1, true) { s.Update(e) } cancel() diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index f476dee20c82..a28dc699256c 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -20,13 +20,11 @@ import ( "testing" "time" - "github.com/anishathalye/porcupine" "go.uber.org/zap" "golang.org/x/time/rate" "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/identity" - "go.etcd.io/etcd/tests/v3/robustness/model" ) var ( @@ -36,13 +34,12 @@ var ( MultiOpTxnOpCount = 4 ) -func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}, baseTime time.Time) []porcupine.Operation { +func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []ClientReport { mux := sync.Mutex{} endpoints := clus.EndpointsGRPC() - ids := 
identity.NewIdProvider() lm := identity.NewLeaseIdStorage() - h := model.History{} + reports := []ClientReport{} limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) startTime := time.Now() @@ -58,15 +55,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 if err != nil { t.Fatal(err) } - go func(c *RecordingClient, clientId int) { + go func(c *RecordingClient) { defer wg.Done() defer c.Close() - config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish) + config.traffic.Run(ctx, c, limiter, ids, lm, finish) mux.Lock() - h = h.Merge(c.operations.History) + reports = append(reports, c.Report()) mux.Unlock() - }(c, i) + }(c) } wg.Wait() endTime := time.Now() @@ -77,17 +74,20 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 if err != nil { t.Error(err) } - h = h.Merge(cc.operations.History) + reports = append(reports, cc.Report()) - operations := h.Operations() - lg.Info("Recorded operations", zap.Int("count", len(operations))) + var operationCount int + for _, r := range reports { + operationCount += r.OperationHistory.Len() + } + lg.Info("Recorded operations", zap.Int("operationCount", operationCount)) - qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second) + qps := float64(operationCount) / float64(endTime.Sub(startTime)) * float64(time.Second) lg.Info("Average traffic", zap.Float64("qps", qps)) if qps < config.minimalQPS { t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps) } - return operations + return reports } type Config struct { @@ -99,5 +99,5 @@ type Config struct { } type Traffic interface { - Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) + Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) } diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 316b42fa63a2..7d175526e6b8 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -22,20 +22,19 @@ import ( "github.com/anishathalye/porcupine" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]traffic.WatchResponse { +func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time, ids identity.Provider) []traffic.ClientReport { mux := sync.Mutex{} var wg sync.WaitGroup - memberResponses := make([][]traffic.WatchResponse, len(clus.Procs)) + reports := make([]traffic.ClientReport, len(clus.Procs)) memberMaxRevisionChans := make([]chan int64, len(clus.Procs)) for i, member := range clus.Procs { c, err := clientv3.New(clientv3.Config{ @@ -55,7 +54,10 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd defer c.Close() responses := watchMember(ctx, t, c, memberChan, cfg, baseTime) mux.Lock() - memberResponses[i] = responses + reports[i] = traffic.ClientReport{ 
+ ClientId: ids.NewClientId(), + Watch: responses, + } mux.Unlock() }(i, c) } @@ -68,7 +70,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd } }() wg.Wait() - return memberResponses + return reports } type watchConfig struct { @@ -76,6 +78,7 @@ type watchConfig struct { } // watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. +// TODO: Use traffic.RecordingClient instead of clientv3.Client func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []traffic.WatchResponse) { var maxRevision int64 = 0 var lastRevision int64 = 0 @@ -112,7 +115,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis if resp.Err() == nil { // using time.Since time-measuring operation to get monotonic clock reading // see https://github.com/golang/go/blob/master/src/time/time.go#L17 - resps = append(resps, traffic.WatchResponse{WatchResponse: resp, Time: time.Since(baseTime)}) + resps = append(resps, traffic.ToWatchResponse(resp, baseTime)) } else if !resp.Canceled { t.Errorf("Watch stream received error, err %v", resp.Err()) } @@ -131,142 +134,139 @@ func watchResponsesMaxRevision(responses []traffic.WatchResponse) int64 { var maxRevision int64 for _, response := range responses { for _, event := range response.Events { - if event.Kv.ModRevision > maxRevision { - maxRevision = event.Kv.ModRevision + if event.Revision > maxRevision { + maxRevision = event.Revision } } } return maxRevision } -func validateWatchResponses(t *testing.T, clus *e2e.EtcdProcessCluster, responses [][]traffic.WatchResponse, expectProgressNotify bool) { - for i, member := range clus.Procs { - validateMemberWatchResponses(t, member.Config().Name, responses[i], expectProgressNotify) +func validateWatchCorrectness(t *testing.T, reports []traffic.ClientReport) { + // Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis + for _, r := range reports { + validateOrdered(t, r) + validateUnique(t, r) + validateAtomic(t, r) + validateBookmarkable(t, r) + // TODO: Validate presumable } + validateEventsMatch(t, reports) + // Expects that longest history encompasses all events. + // TODO: Use combined events from all histories instead of the longest history. + // TODO: Validate that each watch report is reliable, not only the longest one. 
+ validateReliable(t, longestEventHistory(reports)) } -func validateMemberWatchResponses(t *testing.T, memberId string, responses []traffic.WatchResponse, expectProgressNotify bool) { - // Validate watch is correctly configured to ensure proper testing - validateGotAtLeastOneProgressNotify(t, memberId, responses, expectProgressNotify) - - // Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api/#watch-streams - validateOrderedAndReliable(t, memberId, responses) - validateUnique(t, memberId, responses) - validateAtomic(t, memberId, responses) - // Validate kubernetes usage of watch - validateRenewable(t, memberId, responses) -} - -func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, responses []traffic.WatchResponse, expectProgressNotify bool) { +func validateGotAtLeastOneProgressNotify(t *testing.T, reports []traffic.ClientReport, expectProgressNotify bool) { var gotProgressNotify = false - var lastHeadRevision int64 = 1 - for _, resp := range responses { - if resp.IsProgressNotify() && resp.Header.Revision == lastHeadRevision { - gotProgressNotify = true + for _, r := range reports { + var lastHeadRevision int64 = 1 + for _, resp := range r.Watch { + if resp.IsProgressNotify && resp.Revision == lastHeadRevision { + gotProgressNotify = true + break + } + lastHeadRevision = resp.Revision + } + if gotProgressNotify { break } - lastHeadRevision = resp.Header.Revision } if gotProgressNotify != expectProgressNotify { - t.Errorf("Progress notify does not match, expect: %v, got: %v, member: %q", expectProgressNotify, gotProgressNotify, memberId) + t.Errorf("Progress notify does not match, expect: %v, got: %v", expectProgressNotify, gotProgressNotify) } } -func validateRenewable(t *testing.T, memberId string, responses []traffic.WatchResponse) { +func validateBookmarkable(t *testing.T, report traffic.ClientReport) { var lastProgressNotifyRevision int64 = 0 - for _, resp := range responses { + for _, resp := range report.Watch { for _, event := range resp.Events { - if event.Kv.ModRevision <= lastProgressNotifyRevision { - t.Errorf("Broke watch guarantee: Renewable - watch can renewed using revision in last progress notification; Progress notification guarantees that previous events have been already delivered, eventRevision: %d, progressNotifyRevision: %d, member: %q", event.Kv.ModRevision, lastProgressNotifyRevision, memberId) + if event.Revision <= lastProgressNotifyRevision { + t.Errorf("Broke watch guarantee: Renewable - watch can renewed using revision in last progress notification; Progress notification guarantees that previous events have been already delivered, eventRevision: %d, progressNotifyRevision: %d", event.Revision, lastProgressNotifyRevision) } } - if resp.IsProgressNotify() { - lastProgressNotifyRevision = resp.Header.Revision + if resp.IsProgressNotify { + lastProgressNotifyRevision = resp.Revision } } } -func validateOrderedAndReliable(t *testing.T, memberId string, responses []traffic.WatchResponse) { +func validateOrdered(t *testing.T, report traffic.ClientReport) { var lastEventRevision int64 = 1 - for _, resp := range responses { + for _, resp := range report.Watch { for _, event := range resp.Events { - if event.Kv.ModRevision != lastEventRevision && event.Kv.ModRevision != lastEventRevision+1 { - if event.Kv.ModRevision < lastEventRevision { - t.Errorf("Broke watch guarantee: Ordered - events are ordered by revision; an event will never appear on a watch if it precedes an event in time that has already been posted, 
lastRevision: %d, currentRevision: %d, member: %q", lastEventRevision, event.Kv.ModRevision, memberId) - } else { - t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, lastRevision: %d, currentRevision: %d, member: %q", lastEventRevision, event.Kv.ModRevision, memberId) - } + if event.Revision < lastEventRevision { + t.Errorf("Broke watch guarantee: Ordered - events are ordered by revision; an event will never appear on a watch if it precedes an event in time that has already been posted, lastRevision: %d, currentRevision: %d, client: %d", lastEventRevision, event.Revision, report.ClientId) } - lastEventRevision = event.Kv.ModRevision + lastEventRevision = event.Revision } } } -func validateUnique(t *testing.T, memberId string, responses []traffic.WatchResponse) { +func validateUnique(t *testing.T, report traffic.ClientReport) { type revisionKey struct { revision int64 key string } uniqueOperations := map[revisionKey]struct{}{} - for _, resp := range responses { + for _, resp := range report.Watch { for _, event := range resp.Events { - rk := revisionKey{key: string(event.Kv.Key), revision: event.Kv.ModRevision} + rk := revisionKey{key: event.Op.Key, revision: event.Revision} if _, found := uniqueOperations[rk]; found { - t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, member: %q", rk.key, rk.revision, memberId) + t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", rk.key, rk.revision, report.ClientId) } uniqueOperations[rk] = struct{}{} } } } -func validateAtomic(t *testing.T, memberId string, responses []traffic.WatchResponse) { +func validateAtomic(t *testing.T, report traffic.ClientReport) { var lastEventRevision int64 = 1 - for _, resp := range responses { + for _, resp := range report.Watch { if len(resp.Events) > 0 { - if resp.Events[0].Kv.ModRevision == lastEventRevision { - t.Errorf("Broke watch guarantee: Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not be split over several lists of events, previousListEventRevision: %d, currentListEventRevision: %d, member: %q", lastEventRevision, resp.Events[0].Kv.ModRevision, memberId) + if resp.Events[0].Revision == lastEventRevision { + t.Errorf("Broke watch guarantee: Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not be split over several lists of events, previousListEventRevision: %d, currentListEventRevision: %d, client: %d", lastEventRevision, resp.Events[0].Revision, report.ClientId) } - lastEventRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision + lastEventRevision = resp.Events[len(resp.Events)-1].Revision + } + } +} + +func validateReliable(t *testing.T, events []traffic.TimedWatchEvent) { + var lastEventRevision int64 = 1 + for _, event := range events { + if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 { + t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, missing revisions from range: %d-%d", lastEventRevision, event.Revision) } + lastEventRevision = 
event.Revision } } -func toWatchEvents(responses []traffic.WatchResponse) (events []watchEvent) { +func toWatchEvents(responses []traffic.WatchResponse) (events []traffic.TimedWatchEvent) { for _, resp := range responses { for _, event := range resp.Events { - var op model.OperationType - switch event.Type { - case mvccpb.PUT: - op = model.Put - case mvccpb.DELETE: - op = model.Delete - } - events = append(events, watchEvent{ - Time: resp.Time, - Revision: event.Kv.ModRevision, - Op: model.EtcdOperation{ - Type: op, - Key: string(event.Kv.Key), - Value: model.ToValueOrHash(string(event.Kv.Value)), - }, + events = append(events, traffic.TimedWatchEvent{ + Time: resp.Time, + WatchEvent: event, }) } } return events } -type watchEvent struct { - Op model.EtcdOperation - Revision int64 - Time time.Duration -} - -func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation { - newOperations := make([]porcupine.Operation, 0, len(operations)) - persisted := map[model.EtcdOperation]watchEvent{} - for _, op := range watchEvents { - persisted[op.Op] = op +func operationsFromClientReports(reports []traffic.ClientReport) []porcupine.Operation { + operations := []porcupine.Operation{} + persisted := map[model.EtcdOperation]traffic.TimedWatchEvent{} + for _, r := range reports { + operations = append(operations, r.OperationHistory.Operations()...) + for _, resp := range r.Watch { + for _, event := range resp.Events { + persisted[event.Op] = traffic.TimedWatchEvent{Time: resp.Time, WatchEvent: event} + } + } } + newOperations := make([]porcupine.Operation, 0, len(operations)) lastObservedOperation := lastOperationObservedInWatch(operations, persisted) for _, op := range operations { @@ -298,7 +298,7 @@ func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEve return newOperations } -func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]watchEvent) porcupine.Operation { +func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) porcupine.Operation { var maxCallTime int64 var lastOperation porcupine.Operation for _, op := range operations { @@ -315,8 +315,8 @@ func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents return lastOperation } -func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperation]watchEvent) *watchEvent { - for _, etcdOp := range request.OperationsOnSuccess { +func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) *traffic.TimedWatchEvent { + for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) { if etcdOp.Type == model.Put { // Remove LeaseID which is not exposed in watch. 
event, ok := watchEvents[model.EtcdOperation{ @@ -350,31 +350,40 @@ func hasUniqueWriteOperation(request *model.TxnRequest) bool { return false } -func watchEvents(responses [][]traffic.WatchResponse) [][]watchEvent { - ops := make([][]watchEvent, len(responses)) - for i, resps := range responses { - ops[i] = toWatchEvents(resps) +func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) { + type revisionKey struct { + revision int64 + key string } - return ops -} - -func validateEventsMatch(t *testing.T, histories [][]watchEvent) { - longestHistory := longestHistory(histories) - for i := 0; i < len(histories); i++ { - length := len(histories[i]) - // We compare prefix of watch events, as we are not guaranteed to collect all events from each node. - if diff := cmp.Diff(longestHistory[:length], histories[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" { - t.Error("Events in watches do not match") + type eventClientId struct { + traffic.WatchEvent + ClientId int + } + revisionKeyToEvent := map[revisionKey]eventClientId{} + for _, r := range reports { + for _, resp := range r.Watch { + for _, event := range resp.Events { + rk := revisionKey{key: event.Op.Key, revision: event.Revision} + if prev, found := revisionKeyToEvent[rk]; found { + if prev.WatchEvent != event { + t.Errorf("Events between clients %d and %d don't match, key: %q, revision: %d, diff: %s", prev.ClientId, r.ClientId, rk.key, rk.revision, cmp.Diff(prev, event)) + } + } + revisionKeyToEvent[rk] = eventClientId{ClientId: r.ClientId, WatchEvent: event} + } } } } -func longestHistory(histories [][]watchEvent) []watchEvent { +func longestEventHistory(report []traffic.ClientReport) []traffic.TimedWatchEvent { longestIndex := 0 - for i, history := range histories { - if len(history) > len(histories[longestIndex]) { + longestEventCount := 0 + for i, r := range report { + rEventCount := r.WatchEventCount() + if rEventCount > longestEventCount { longestIndex = i + longestEventCount = rEventCount } } - return histories[longestIndex] + return toWatchEvents(report[longestIndex].Watch) }
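
The per-client `watch.json` files written by the new report layout can also be re-checked outside the test run. The sketch below is illustrative only and not part of this patch: it assumes the JSON encoding shown in the README example above (one `WatchResponse` object per line, with `Events`, `Revision` and `Op.Key` fields mirroring the `traffic.WatchResponse` and `traffic.WatchEvent` structs added in `tests/robustness/traffic/client.go`) and flags violations of the `Ordered` and `Unique` watch API guarantees in the same spirit as `validateOrdered` and `validateUnique`.

```
// watchcheck is an illustrative sketch, not part of the test framework:
// it re-reads a client-N/watch.json report and checks the Ordered and
// Unique watch guarantees, assuming one JSON WatchResponse per line.
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
)

type watchEvent struct {
	Op       struct{ Key string }
	Revision int64
}

type watchResponse struct {
	Events           []watchEvent
	IsProgressNotify bool
	Revision         int64
}

func main() {
	// Usage: watchcheck /tmp/TestRobustness_Issue15271/client-14/watch.json
	f, err := os.Open(os.Args[1])
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	type revisionKey struct {
		revision int64
		key      string
	}
	seen := map[revisionKey]struct{}{}
	var lastRevision int64

	dec := json.NewDecoder(f)
	for {
		var resp watchResponse
		if err := dec.Decode(&resp); err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}
		for _, ev := range resp.Events {
			// Ordered: revisions of delivered events never decrease.
			if ev.Revision < lastRevision {
				fmt.Printf("Ordered broken: revision %d delivered after %d\n", ev.Revision, lastRevision)
			}
			lastRevision = ev.Revision
			// Unique: the same (key, revision) pair is never delivered twice.
			rk := revisionKey{revision: ev.Revision, key: ev.Op.Key}
			if _, dup := seen[rk]; dup {
				fmt.Printf("Unique broken: key %q revision %d delivered twice\n", rk.key, rk.revision)
			}
			seen[rk] = struct{}{}
		}
	}
}
```

Run it against the client directory named in the `Broke watch guarantee` log line, for example `/tmp/TestRobustness_Issue15271/client-14/watch.json`.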