Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

functional-tester: add liveness mode #9528

Merged
merged 12 commits into from
Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions test
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ function cov_pass {
mkdir -p "$COVERDIR"

# run code coverage for unit and integration tests
GOCOVFLAGS="-covermode=set -coverpkg ${PKGS_COMMA} -v -timeout 15m"
GOCOVFLAGS="-covermode=set -coverpkg ${PKGS_COMMA} -v -timeout 20m"
# shellcheck disable=SC2206
GOCOVFLAGS=($GOCOVFLAGS)
failed=""
Expand Down Expand Up @@ -292,7 +292,7 @@ function e2e_pass {
# expectation could be different
USERTIMEOUT=""
if [ -z "${TIMEOUT}" ]; then
USERTIMEOUT="15m"
USERTIMEOUT="20m"
else
USERTIMEOUT="${TIMEOUT}"
fi
Expand All @@ -303,9 +303,9 @@ function e2e_pass {
function integration_e2e_pass {
echo "Running integration and e2e tests..."

go test -timeout 15m -v -cpu 1,2,4 "$@" "${REPO_PATH}/e2e" &
go test -timeout 20m -v -cpu 1,2,4 "$@" "${REPO_PATH}/e2e" &
e2epid="$!"
go test -timeout 15m -v -cpu 1,2,4 "$@" "${REPO_PATH}/integration" &
go test -timeout 20m -v -cpu 1,2,4 "$@" "${REPO_PATH}/integration" &
intpid="$!"
wait $e2epid
wait $intpid
Expand All @@ -315,7 +315,7 @@ function integration_e2e_pass {
function grpcproxy_pass {
go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 "$@" "${REPO_PATH}/integration"
go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 "$@" "${REPO_PATH}/clientv3/integration"
go test -timeout 15m -v -tags cluster_proxy "$@" "${REPO_PATH}/e2e"
go test -timeout 20m -v -tags cluster_proxy "$@" "${REPO_PATH}/e2e"
}

function release_pass {
Expand Down
364 changes: 217 additions & 147 deletions tools/functional-tester/rpcpb/rpc.pb.go

Large diffs are not rendered by default.

37 changes: 22 additions & 15 deletions tools/functional-tester/rpcpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,24 @@ enum FailureCase {
DELAY_PEER_PORT_TX_RX_LEADER = 10;
DELAY_PEER_PORT_TX_RX_ALL = 11;

FAILPOINTS = 100;

NO_FAIL = 200;
// TODO: support no-op of liveness duration
// NO_FAIL_LIVENESS = 201;

// NO_FAIL_WITH_STRESS runs no-op failure injection for specified period
// while stressers are still sending requests.
NO_FAIL_WITH_STRESS = 100;
// NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS runs no-op failure injection
// with all stressers stopped.
NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS = 101;

FAILPOINTS = 200;
EXTERNAL = 300;
}

enum StressType {
KV = 0;
LEASE = 1;
NO_STRESS = 2;
ELECTION_RUNNER = 3;
WATCH_RUNNER = 4;
LOCK_RACER_RUNNER = 5;
LEASE_RUNNER = 6;
ELECTION_RUNNER = 2;
WATCH_RUNNER = 3;
LOCK_RACER_RUNNER = 4;
LEASE_RUNNER = 5;
}

message Tester {
Expand All @@ -140,12 +141,14 @@ message Tester {

// FailureCases is the selected test cases to schedule.
// If empty, run all failure cases.
// TODO: support no-op
repeated string FailureCases = 31 [(gogoproto.moretags) = "yaml:\"failure-cases\""];
// FailureDelayMs is the delay duration after failure is injected.
// Useful when triggering snapshot or no-op failure cases.
uint32 FailureDelayMs = 32 [(gogoproto.moretags) = "yaml:\"failure-delay-ms\""];
// FailureShuffle is true to randomize failure injecting order.
bool FailureShuffle = 32 [(gogoproto.moretags) = "yaml:\"failure-shuffle\""];
bool FailureShuffle = 33 [(gogoproto.moretags) = "yaml:\"failure-shuffle\""];
// FailpointCommands is the list of "gofail" commands (e.g. panic("etcd-tester"),1*sleep(1000)).
repeated string FailpointCommands = 33 [(gogoproto.moretags) = "yaml:\"failpoint-commands\""];
repeated string FailpointCommands = 34 [(gogoproto.moretags) = "yaml:\"failpoint-commands\""];

// RunnerExecPath is a path of etcd-runner binary.
string RunnerExecPath = 41 [(gogoproto.moretags) = "yaml:\"runner-exec-path\""];
Expand All @@ -167,8 +170,12 @@ message Tester {
int32 StressKeySuffixRangeTxn = 105 [(gogoproto.moretags) = "yaml:\"stress-key-suffix-range-txn\""];
// StressKeyTxnOps is the number of operations per transaction (max 64).
int32 StressKeyTxnOps = 106 [(gogoproto.moretags) = "yaml:\"stress-key-txn-ops\""];

// StressClients is the number of concurrent stressing clients
// with "one" shared TCP connection.
int32 StressClients = 201 [(gogoproto.moretags) = "yaml:\"stress-clients\""];
// StressQPS is the maximum number of stresser requests per second.
int32 StressQPS = 107 [(gogoproto.moretags) = "yaml:\"stress-qps\""];
int32 StressQPS = 202 [(gogoproto.moretags) = "yaml:\"stress-qps\""];
}

message Request {
Expand Down
74 changes: 20 additions & 54 deletions tools/functional-tester/tester/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,27 +276,29 @@ func (clus *Cluster) updateFailures() {
case "KILL_ALL":
clus.failures = append(clus.failures, newFailureKillAll())
case "BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower())
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxOneFollower(clus))
case "BLACKHOLE_PEER_PORT_TX_RX_LEADER":
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader())
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxLeader(clus))
case "BLACKHOLE_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll())
clus.failures = append(clus.failures, newFailureBlackholePeerPortTxRxAll(clus))
case "DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxOneFollower(clus))
case "DELAY_PEER_PORT_TX_RX_LEADER":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxLeader(clus))
case "DELAY_PEER_PORT_TX_RX_ALL":
clus.failures = append(clus.failures, newFailureDelayPeerPortTxRxAll(clus))
case "NO_FAIL_WITH_STRESS":
clus.failures = append(clus.failures, newFailureNoFailWithStress(clus))
case "NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS":
clus.failures = append(clus.failures, newFailureNoFailWithNoStressForLiveness(clus))
case "EXTERNAL":
clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
case "FAILPOINTS":
fpFailures, fperr := failpointFailures(clus)
if len(fpFailures) == 0 {
clus.lg.Info("no failpoints found!", zap.Error(fperr))
}
clus.failures = append(clus.failures, fpFailures...)
case "NO_FAIL":
clus.failures = append(clus.failures, newFailureNoOp())
case "EXTERNAL":
clus.failures = append(clus.failures, newFailureExternal(clus.Tester.ExternalExecPath))
}
}
}
Expand Down Expand Up @@ -360,8 +362,8 @@ func (clus *Cluster) updateStresserChecker() {
)

cs := &compositeStresser{}
for idx := range clus.Members {
cs.stressers = append(cs.stressers, newStresser(clus, idx))
for _, m := range clus.Members {
cs.stressers = append(cs.stressers, newStresser(clus, m))
}
clus.stresser = cs

Expand All @@ -381,49 +383,6 @@ func (clus *Cluster) updateStresserChecker() {
)
}

func (clus *Cluster) startStresser() (err error) {
clus.lg.Info(
"starting stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
err = clus.stresser.Stress()
clus.lg.Info(
"started stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
return err
}

func (clus *Cluster) closeStresser() {
clus.lg.Info(
"closing stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
clus.stresser.Close()
clus.lg.Info(
"closed stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
}

func (clus *Cluster) pauseStresser() {
clus.lg.Info(
"pausing stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
clus.stresser.Pause()
clus.lg.Info(
"paused stressers",
zap.Int("round", clus.rd),
zap.Int("case", clus.cs),
)
}

func (clus *Cluster) checkConsistency() (err error) {
defer func() {
if err != nil {
Expand All @@ -436,7 +395,6 @@ func (clus *Cluster) checkConsistency() (err error) {
)
return
}
err = clus.startStresser()
}()

clus.lg.Info(
Expand Down Expand Up @@ -759,4 +717,12 @@ func (clus *Cluster) defrag() error {
return nil
}

func (clus *Cluster) Report() int64 { return clus.stresser.ModifiedKeys() }
// GetFailureDelayDuration computes failure delay duration.
func (clus *Cluster) GetFailureDelayDuration() time.Duration {
return time.Duration(clus.Tester.FailureDelayMs) * time.Millisecond
}

// Report reports the number of modified keys.
func (clus *Cluster) Report() int64 {
return clus.stresser.ModifiedKeys()
}
4 changes: 4 additions & 0 deletions tools/functional-tester/tester/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func Test_newCluster(t *testing.T) {
"DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER",
"DELAY_PEER_PORT_TX_RX_LEADER",
"DELAY_PEER_PORT_TX_RX_ALL",
"NO_FAIL_WITH_STRESS",
"NO_FAIL_WITH_NO_STRESS_FOR_LIVENESS",
},
FailureDelayMs: 7000,
FailureShuffle: true,
FailpointCommands: []string{`panic("etcd-tester")`},
RunnerExecPath: "/etcd-runner",
Expand All @@ -142,6 +145,7 @@ func Test_newCluster(t *testing.T) {
StressKeySuffixRange: 250000,
StressKeySuffixRangeTxn: 100,
StressKeyTxnOps: 10,
StressClients: 100,
StressQPS: 1000,
},
}
Expand Down
Loading