Merge pull request #15325 from serathius/linearizability-refactor-report
tests: Refactor file structure
serathius authored Feb 16, 2023
2 parents ee6781b + a36951a commit e603d92
Showing 7 changed files with 366 additions and 345 deletions.
33 changes: 33 additions & 0 deletions tests/linearizability/failpoints.go
@@ -81,6 +81,39 @@ var (
}}
)

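// triggerFailpoints repeatedly injects the configured failpoint, waiting
// config.waitBetweenTriggers between attempts, until it has succeeded
// config.count times or accumulated config.retries failures. It fails the
// test if the failpoint is unavailable on any member or the required number
// of successes is not reached.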
func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
var err error
successes := 0
failures := 0
for _, proc := range clus.Procs {
if !config.failpoint.Available(proc) {
t.Errorf("Failpoint %q not available on %s", config.failpoint.Name(), proc.Config().Name)
return
}
}
for successes < config.count && failures < config.retries {
time.Sleep(config.waitBetweenTriggers)
lg.Info("Triggering failpoint\n", zap.String("failpoint", config.failpoint.Name()))
err = config.failpoint.Trigger(ctx, t, lg, clus)
if err != nil {
lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
failures++
continue
}
successes++
}
if successes < config.count || failures >= config.retries {
t.Errorf("failed to trigger failpoints enough times, err: %v", err)
}
}

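// FailpointConfig describes a failpoint injection schedule: which failpoint
// to trigger, how many successful triggers are required, how many failed
// attempts are tolerated, and how long to wait between attempts.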
type FailpointConfig struct {
failpoint Failpoint
count int
retries int
waitBetweenTriggers time.Duration
}

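// Failpoint is a single fault that can be injected into the cluster under test.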
type Failpoint interface {
Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error
Name() string
305 changes: 1 addition & 304 deletions tests/linearizability/linearizability_test.go
@@ -16,24 +16,15 @@ package linearizability

import (
"context"
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/anishathalye/porcupine"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/linearizability/identity"
"go.etcd.io/etcd/tests/v3/linearizability/model"
)

@@ -198,42 +189,7 @@ func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, conf
validateEventsMatch(t, r.events)

r.patchedOperations = patchOperationBasedOnWatchEvents(r.operations, longestHistory(r.events))
r.visualizeHistory = validateOperationHistoryAndReturnVisualize(t, lg, r.patchedOperations)
}

type report struct {
lg *zap.Logger
clus *e2e.EtcdProcessCluster
responses [][]watchResponse
events [][]watchEvent
operations []porcupine.Operation
patchedOperations []porcupine.Operation
visualizeHistory func(path string)
}

func (r *report) Report(t *testing.T) {
path := testResultsDirectory(t)
if t.Failed() {
for i, member := range r.clus.Procs {
memberDataDir := filepath.Join(path, member.Config().Name)
persistMemberDataDir(t, r.lg, member, memberDataDir)
if r.responses != nil {
persistWatchResponses(t, r.lg, filepath.Join(memberDataDir, "responses.json"), r.responses[i])
}
if r.events != nil {
persistWatchEvents(t, r.lg, filepath.Join(memberDataDir, "events.json"), r.events[i])
}
}
if r.operations != nil {
persistOperationHistory(t, r.lg, filepath.Join(path, "full-history.json"), r.operations)
}
if r.patchedOperations != nil {
persistOperationHistory(t, r.lg, filepath.Join(path, "patched-history.json"), r.patchedOperations)
}
}
if r.visualizeHistory != nil {
r.visualizeHistory(filepath.Join(path, "history.html"))
}
r.visualizeHistory = model.ValidateOperationHistoryAndReturnVisualize(t, lg, r.patchedOperations)
}

func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, traffic trafficConfig, failpoint FailpointConfig) (operations []porcupine.Operation, responses [][]watchResponse) {
@@ -261,265 +217,6 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
return operations, responses
}

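// patchOperationBasedOnWatchEvents amends txn operations whose responses were
// lost to client-side errors. If a matching watch event exists, the
// operation's return time and revision are copied from the event and the
// result is marked unknown. Failed unique writes that never appear in the
// watch are dropped as not persisted; non-unique writes and operations issued
// after the last watch-observed operation are kept unchanged.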
func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))
persisted := map[model.EtcdOperation]watchEvent{}
for _, op := range watchEvents {
persisted[op.Op] = op
}
lastObservedOperation := lastOperationObservedInWatch(operations, persisted)

for _, op := range operations {
request := op.Input.(model.EtcdRequest)
resp := op.Output.(model.EtcdResponse)
if resp.Err == nil || op.Call > lastObservedOperation.Call || request.Type != model.Txn {
// Cannot patch those requests.
newOperations = append(newOperations, op)
continue
}
event := matchWatchEvent(request.Txn, persisted)
if event != nil {
// Set revision and time based on watchEvent.
op.Return = event.Time.UnixNano()
op.Output = model.EtcdResponse{
Revision: event.Revision,
ResultUnknown: true,
}
newOperations = append(newOperations, op)
continue
}
if hasNonUniqueWriteOperation(request.Txn) && !hasUniqueWriteOperation(request.Txn) {
// Leave the operation as is, since non-unique operations cannot be matched to watch events.
newOperations = append(newOperations, op)
continue
}
// Remove operations that were never persisted.
}
return newOperations
}

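// lastOperationObservedInWatch returns the txn operation with the latest call
// time whose write was observed in the watch events.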
func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]watchEvent) porcupine.Operation {
var maxCallTime int64
var lastOperation porcupine.Operation
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
if request.Type != model.Txn {
continue
}
event := matchWatchEvent(request.Txn, watchEvents)
if event != nil && op.Call > maxCallTime {
maxCallTime = op.Call
lastOperation = op
}
}
return lastOperation
}

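// matchWatchEvent returns the watch event matching any Put in the txn, or nil
// if none was observed.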
func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperation]watchEvent) *watchEvent {
for _, etcdOp := range request.Ops {
if etcdOp.Type == model.Put {
// Match without LeaseID, which is not exposed in watch events.
event, ok := watchEvents[model.EtcdOperation{
Type: etcdOp.Type,
Key: etcdOp.Key,
Value: etcdOp.Value,
}]
if ok {
return &event
}
}
}
return nil
}

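// hasNonUniqueWriteOperation reports whether the txn contains any write operation (Put or Delete).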
func hasNonUniqueWriteOperation(request *model.TxnRequest) bool {
for _, etcdOp := range request.Ops {
if etcdOp.Type == model.Put || etcdOp.Type == model.Delete {
return true
}
}
return false
}

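// hasUniqueWriteOperation reports whether the txn contains a Put operation.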
func hasUniqueWriteOperation(request *model.TxnRequest) bool {
for _, etcdOp := range request.Ops {
if etcdOp.Type == model.Put {
return true
}
}
return false
}

func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config FailpointConfig) {
var err error
successes := 0
failures := 0
for _, proc := range clus.Procs {
if !config.failpoint.Available(proc) {
t.Errorf("Failpoint %q not available on %s", config.failpoint.Name(), proc.Config().Name)
return
}
}
for successes < config.count && failures < config.retries {
time.Sleep(config.waitBetweenTriggers)
lg.Info("Triggering failpoint\n", zap.String("failpoint", config.failpoint.Name()))
err = config.failpoint.Trigger(ctx, t, lg, clus)
if err != nil {
lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
failures++
continue
}
successes++
}
if successes < config.count || failures >= config.retries {
t.Errorf("failed to trigger failpoints enough times, err: %v", err)
}
}

type FailpointConfig struct {
failpoint Failpoint
count int
retries int
waitBetweenTriggers time.Duration
}

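// simulateTraffic runs config.clientCount clients against the cluster, each
// pinned to one endpoint and rate-limited to config.maximalQPS in aggregate,
// and returns the merged operation history. It fails the test if the observed
// QPS falls below config.minimalQPS.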
func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config trafficConfig) []porcupine.Operation {
mux := sync.Mutex{}
endpoints := clus.EndpointsV3()

ids := identity.NewIdProvider()
lm := identity.NewLeaseIdStorage()
h := model.History{}
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)

startTime := time.Now()
wg := sync.WaitGroup{}
for i := 0; i < config.clientCount; i++ {
wg.Add(1)
endpoints := []string{endpoints[i%len(endpoints)]}
c, err := NewClient(endpoints, ids, startTime)
if err != nil {
t.Fatal(err)
}
go func(c *recordingClient, clientId int) {
defer wg.Done()
defer c.Close()

config.traffic.Run(ctx, clientId, c, limiter, ids, lm)
mux.Lock()
h = h.Merge(c.history.History)
mux.Unlock()
}(c, i)
}
wg.Wait()
endTime := time.Now()
operations := h.Operations()
lg.Info("Recorded operations", zap.Int("count", len(operations)))

qps := float64(len(operations)) / float64(endTime.Sub(startTime)) * float64(time.Second)
lg.Info("Average traffic", zap.Float64("qps", qps))
if qps < config.minimalQPS {
t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps)
}
return operations
}

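// trafficConfig bounds the generated traffic: the minimal QPS required for
// results to be considered reliable, the maximal QPS enforced by the rate
// limiter, and the number of concurrent clients.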
type trafficConfig struct {
name string
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
}

func watchEvents(responses [][]watchResponse) [][]watchEvent {
ops := make([][]watchEvent, len(responses))
for i, resps := range responses {
ops[i] = toWatchEvents(resps)
}
return ops
}

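// validateEventsMatch checks that every member's watch event history is a
// prefix of the longest history observed, ignoring event timestamps.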
func validateEventsMatch(t *testing.T, histories [][]watchEvent) {
longestHistory := longestHistory(histories)
for i := 0; i < len(histories); i++ {
length := len(histories[i])
// We compare a prefix of the watch events, as we are not guaranteed to collect all events from each node.
if diff := cmp.Diff(longestHistory[:length], histories[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" {
t.Error("Events in watches do not match")
}
}
}

func longestHistory(histories [][]watchEvent) []watchEvent {
longestIndex := 0
for i, history := range histories {
if len(history) > len(histories[longestIndex]) {
longestIndex = i
}
}
return histories[longestIndex]
}

// validateOperationHistoryAndReturnVisualize checks the operation history for
// linearizability and returns a visualize closure; a closure is returned
// because porcupine.linearizationInfo, which is needed to generate the
// visualization, is private.
func validateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) {
linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 5*time.Minute)
if linearizable == porcupine.Illegal {
t.Error("Model is not linearizable")
}
if linearizable == porcupine.Unknown {
t.Error("Linearization timed out")
}
return func(path string) {
lg.Info("Saving visualization", zap.String("path", path))
err := porcupine.VisualizePath(model.Etcd, info, path)
if err != nil {
t.Errorf("Failed to visualize, err: %v", err)
}
}
}

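// persistOperationHistory writes the operations to path as newline-delimited JSON.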
func persistOperationHistory(t *testing.T, lg *zap.Logger, path string, operations []porcupine.Operation) {
lg.Info("Saving operation history", zap.String("path", path))
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
t.Errorf("Failed to save operation history: %v", err)
return
}
defer file.Close()
encoder := json.NewEncoder(file)
for _, op := range operations {
err := encoder.Encode(op)
if err != nil {
t.Errorf("Failed to encode operation: %v", err)
}
}
}

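// persistMemberDataDir moves the member's data directory to the given path.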
func persistMemberDataDir(t *testing.T, lg *zap.Logger, member e2e.EtcdProcess, path string) {
lg.Info("Saving member data dir", zap.String("member", member.Config().Name), zap.String("path", path))
err := os.Rename(member.Config().DataDirPath, path)
if err != nil {
t.Fatal(err)
}
}

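// testResultsDirectory returns an empty per-test directory under
// resultsDirectory, derived from the test name, removing any previous contents.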
func testResultsDirectory(t *testing.T) string {
path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_")))
if err != nil {
t.Fatal(err)
}
err = os.RemoveAll(path)
if err != nil {
t.Fatal(err)
}
err = os.MkdirAll(path, 0700)
if err != nil {
t.Fatal(err)
}
return path
}

// forcestopCluster stops the etcd member with signal kill.
func forcestopCluster(clus *e2e.EtcdProcessCluster) error {
for _, member := range clus.Procs {