Skip to content

Commit

Permalink
Add explicit checks for assumptions in robustness test validation
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Apr 9, 2024
1 parent 65ac859 commit a2ecd1b
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 1 deletion.
6 changes: 5 additions & 1 deletion tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
t.Fatal(err)
}
defer cc.Close()
_, err = cc.Put(ctx, "start", "true")
if err != nil {
t.Fatal(err)
}
wg := sync.WaitGroup{}
nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency)
for i := 0; i < profile.ClientCount; i++ {
Expand All @@ -89,7 +93,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
time.Sleep(time.Second)
_, err = cc.Put(ctx, "tombstone", "true")
if err != nil {
t.Error(err)
t.Fatal(err)
}
reports = append(reports, cc.Report())

Expand Down
145 changes: 145 additions & 0 deletions tests/robustness/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import (

// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, timeout time.Duration) (visualize func(basepath string) error) {
err := checkValidationAssumptions(reports)
if err != nil {
t.Fatalf("Broken validation assumptions: %s", err)
}
patchedOperations := patchedOperationHistory(reports)
linearizable, visualize := validateLinearizableOperationsAndVisualize(lg, patchedOperations, timeout)
if linearizable != porcupine.Ok {
Expand All @@ -52,6 +56,147 @@ type Config struct {
ExpectRevisionUnique bool
}

// checkValidationAssumptions runs every precondition the validation pipeline
// depends on (unique put values, empty initial database, a successful final
// tombstone write observed in watch, gapless watch revisions, and
// non-overlapping per-client requests). Returns the first violation found.
func checkValidationAssumptions(reports []report.ClientReport) error {
	assumptions := []func([]report.ClientReport) error{
		putValueUnique,
		emptyDatabaseAtStart,
		lastOperationSucceededAndObservedInWatch,
		observedAllRevisionsInWatch,
		nonConcurrentClientRequests,
	}
	for _, assumption := range assumptions {
		if err := assumption(reports); err != nil {
			return err
		}
	}
	return nil
}

// putValueUnique verifies that every value written by a put operation is
// globally unique across all clients and txn branches. History patching
// relies on this to unambiguously match puts to their effects.
func putValueUnique(reports []report.ClientReport) error {
	putValue := map[model.ValueOrHash]struct{}{}
	// recordPuts folds the put values from ops into putValue, failing on
	// the first duplicate.
	recordPuts := func(ops []model.EtcdOperation) error {
		for _, etcdOp := range ops {
			if etcdOp.Type != model.PutOperation {
				continue
			}
			if _, ok := putValue[etcdOp.Put.Value]; ok {
				return fmt.Errorf("non unique put value %v, required to patch operation history", etcdOp.Put.Value)
			}
			putValue[etcdOp.Put.Value] = struct{}{}
		}
		return nil
	}
	for _, r := range reports {
		for _, op := range r.KeyValue {
			request := op.Input.(model.EtcdRequest)
			if request.Type != model.Txn {
				continue
			}
			// Walk both txn branches separately instead of
			// append(OnSuccess, OnFailure...): append could write into
			// OnSuccess's backing array if it has spare capacity,
			// corrupting the request shared with the rest of validation.
			if err := recordPuts(request.Txn.OperationsOnSuccess); err != nil {
				return err
			}
			if err := recordPuts(request.Txn.OperationsOnFailure); err != nil {
				return err
			}
		}
	}
	return nil
}

// emptyDatabaseAtStart verifies that some response was observed at revision 2,
// i.e. the very first write landed on a fresh database. The model
// implementation assumes history starts from an empty store.
func emptyDatabaseAtStart(reports []report.ClientReport) error {
	revisionTwoObserved := false
	for _, clientReport := range reports {
		for _, operation := range clientReport.KeyValue {
			if operation.Output.(model.MaybeEtcdResponse).Revision == 2 {
				revisionTwoObserved = true
			}
		}
	}
	if revisionTwoObserved {
		return nil
	}
	return fmt.Errorf("non empty database at start or first write didn't succeed, required by model implementation")
}

// lastOperationSucceededAndObservedInWatch verifies that the operation with
// the latest call time is a successful, unconditional txn putting the
// "tombstone" key, and that no client observed a watch event past it.
// Watch validation relies on history ending with this sentinel write.
func lastOperationSucceededAndObservedInWatch(reports []report.ClientReport) error {
	var lastOperation porcupine.Operation
	found := false
	for _, r := range reports {
		for _, op := range r.KeyValue {
			// Track the operation with the latest call time across all clients.
			if !found || op.Call > lastOperation.Call {
				lastOperation = op
				found = true
			}
		}
	}
	// Guard against empty reports: the type assertions below would panic
	// on a zero-valued porcupine.Operation.
	if !found {
		return fmt.Errorf("no key value operations found, required to validate watch")
	}
	response := lastOperation.Output.(model.MaybeEtcdResponse)
	if response.PartialResponse || response.Error != "" {
		return fmt.Errorf("last operation %v failed, required to validate watch", lastOperation)
	}
	request := lastOperation.Input.(model.EtcdRequest)
	if request.Type != model.Txn || len(request.Txn.Conditions) != 0 || len(request.Txn.OperationsOnSuccess) != 1 || request.Txn.OperationsOnSuccess[0].Type != model.PutOperation || request.Txn.OperationsOnSuccess[0].Put.Key != "tombstone" {
		// The expected sentinel key is "tombstone" (written by traffic at the
		// end of the run); the previous message incorrectly said `end`.
		return fmt.Errorf("last operation %+v didn't write `tombstone` key, required to validate watch", request)
	}
	for _, r := range reports {
		for _, watch := range r.Watch {
			for _, watchResp := range watch.Responses {
				for _, e := range watchResp.Events {
					if e.Revision > response.Revision {
						return fmt.Errorf("observed watch event %v after last operation, required to validate watch", e)
					}
					if e.Revision == response.Revision && e.Key != "tombstone" {
						return fmt.Errorf("observed watch event %v different than last operation, required to validate watch", e)
					}
				}
			}
		}
	}
	return nil
}

// observedAllRevisionsInWatch verifies that watches collectively observed
// every revision from 2 up to the highest revision seen. Gapless watch
// coverage is required to patch operations and validate serializable requests.
func observedAllRevisionsInWatch(reports []report.ClientReport) error {
	observed := map[int64]struct{}{}
	var maxRevision int64
	// Single pass: collect all observed revisions and track the maximum.
	for _, clientReport := range reports {
		for _, watchReport := range clientReport.Watch {
			for _, resp := range watchReport.Responses {
				for _, event := range resp.Events {
					observed[event.Revision] = struct{}{}
					if event.Revision > maxRevision {
						maxRevision = event.Revision
					}
				}
			}
		}
	}
	// Revision 1 belongs to the empty store, so coverage starts at 2.
	for revision := int64(2); revision <= maxRevision; revision++ {
		if _, ok := observed[revision]; !ok {
			return fmt.Errorf("didn't observe revision %d in watch, required to patch operation and validate serializable requests", revision)
		}
	}
	return nil
}

// nonConcurrentClientRequests verifies that each client's key-value requests
// are sequential: every call starts strictly after the client's previous
// request returned, and every request returns strictly after it was called.
// Operation linearization assumes per-client requests do not overlap.
func nonConcurrentClientRequests(reports []report.ClientReport) error {
	latestReturnByClient := map[int]int64{}
	for _, clientReport := range reports {
		for _, operation := range clientReport.KeyValue {
			if operation.Call <= latestReturnByClient[operation.ClientId] {
				return fmt.Errorf("client %d has concurrent request, required for operation linearization", operation.ClientId)
			}
			if operation.Return <= operation.Call {
				return fmt.Errorf("operation %v ends before it starts, required for operation linearization", operation)
			}
			latestReturnByClient[operation.ClientId] = operation.Return
		}
	}
	return nil
}

func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEvent, error) {
type revisionEvents struct {
events []model.PersistedEvent
Expand Down

0 comments on commit a2ecd1b

Please sign in to comment.