diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 41feed38853..23ca9e653b2 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -64,6 +64,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 t.Fatal(err) } defer cc.Close() + // Ensure that first operation succeeds + _, err = cc.Put(ctx, "start", "true") + if err != nil { + t.Fatalf("First operation failed, validation requires first operation to succeed, err: %s", err) + } wg := sync.WaitGroup{} nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency) for i := 0; i < profile.ClientCount; i++ { @@ -85,11 +90,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 wg.Wait() endTime := time.Now() - // Ensure that last operation is succeeds time.Sleep(time.Second) + // Ensure that last operation succeeds _, err = cc.Put(ctx, "tombstone", "true") if err != nil { - t.Error(err) + t.Fatalf("Last operation failed, validation requires last operation to succeed, err: %s", err) } reports = append(reports, cc.Report()) diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index 3e27ba49164..d2b875f74b4 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -30,6 +30,10 @@ import ( // ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private. func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, timeout time.Duration) (visualize func(basepath string) error) { + err := checkValidationAssumptions(reports) + if err != nil { + t.Fatalf("Broken validation assumptions: %s", err) + } patchedOperations := patchedOperationHistory(reports) linearizable, visualize := validateLinearizableOperationsAndVisualize(lg, patchedOperations, timeout) if linearizable != porcupine.Ok { @@ -52,6 +56,149 @@ type Config struct { ExpectRevisionUnique bool } +func checkValidationAssumptions(reports []report.ClientReport) error { + err := validatePutOperationUnique(reports) + if err != nil { + return err + } + err = validateEmptyDatabaseAtStart(reports) + if err != nil { + return err + } + err = validateLastOperationAndObservedInWatch(reports) + if err != nil { + return err + } + err = validateObservedAllRevisionsInWatch(reports) + if err != nil { + return err + } + err = validateNonConcurrentClientRequests(reports) + if err != nil { + return err + } + return nil +} + +func validatePutOperationUnique(reports []report.ClientReport) error { + type KV struct { + Key string + Value model.ValueOrHash + } + putValue := map[KV]struct{}{} + for _, r := range reports { + for _, op := range r.KeyValue { + request := op.Input.(model.EtcdRequest) + if request.Type != model.Txn { + continue + } + for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) { + if op.Type != model.PutOperation { + continue + } + kv := KV{ + Key: op.Put.Key, + Value: op.Put.Value, + } + if _, ok := putValue[kv]; ok { + return fmt.Errorf("non unique put %v, required to patch operation history", kv) + } + putValue[kv] = struct{}{} + } + } + } + return nil +} + +func validateEmptyDatabaseAtStart(reports []report.ClientReport) error { + for _, r := range reports { + for _, op := range r.KeyValue { + request := op.Input.(model.EtcdRequest) + response := op.Output.(model.MaybeEtcdResponse) + if response.Revision == 2 && !request.IsRead() { + return nil + } + } + } + return fmt.Errorf("non empty database at start or first write didn't succeed, required by model implementation") +} + +func validateLastOperationAndObservedInWatch(reports []report.ClientReport) error { + var lastOperation porcupine.Operation + + for _, r := range reports { + for _, op := range r.KeyValue { + if op.Call > lastOperation.Call { + lastOperation = op + } + } + } + lastResponse := lastOperation.Output.(model.MaybeEtcdResponse) + if lastResponse.PartialResponse || lastResponse.Error != "" { + return fmt.Errorf("last operation %v failed, its success is required to validate watch", lastOperation) + } + for _, r := range reports { + for _, watch := range r.Watch { + for _, watchResp := range watch.Responses { + for _, e := range watchResp.Events { + if e.Revision == lastResponse.Revision { + return nil + } + } + } + } + } + return fmt.Errorf("revision from the last operation %d was not observed in watch, required to validate watch", lastResponse.Revision) +} + +func validateObservedAllRevisionsInWatch(reports []report.ClientReport) error { + var maxRevision int64 + for _, r := range reports { + for _, watch := range r.Watch { + for _, watchResp := range watch.Responses { + for _, e := range watchResp.Events { + if e.Revision > maxRevision { + maxRevision = e.Revision + } + } + } + } + } + observedRevisions := make([]bool, maxRevision+1) + for _, r := range reports { + for _, watch := range r.Watch { + for _, watchResp := range watch.Responses { + for _, e := range watchResp.Events { + observedRevisions[e.Revision] = true + } + } + } + } + for i := 2; i < len(observedRevisions); i++ { + if !observedRevisions[i] { + return fmt.Errorf("didn't observe revision %d in watch, required to patch operation and validate serializable requests", i) + } + } + return nil +} + +func validateNonConcurrentClientRequests(reports []report.ClientReport) error { + lastClientRequestReturn := map[int]int64{} + for _, r := range reports { + for _, op := range r.KeyValue { + lastRequest := lastClientRequestReturn[op.ClientId] + if op.Call <= lastRequest { + return fmt.Errorf("client %d has concurrent request, required for operation linearization", op.ClientId) + } + if op.Return <= op.Call { + return fmt.Errorf("operation %v ends before it starts, required for operation linearization", op) + } + lastClientRequestReturn[op.ClientId] = op.Return + } + } + return nil +} + func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEvent, error) { type revisionEvents struct { events []model.PersistedEvent