Skip to content

Commit

Permalink
Merge pull request #14304 from clarkfw/e2e-cluster-waitleader
Browse files Browse the repository at this point in the history
tests: add WaitLeader function to common framework
  • Loading branch information
ahrtr authored Aug 14, 2022
2 parents fff5d00 + a5409c6 commit 012fc51
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 5 deletions.
79 changes: 79 additions & 0 deletions tests/common/wait_leader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"
"testing"
"time"

"go.etcd.io/etcd/tests/v3/framework/config"
)

func TestWaitLeader(t *testing.T) {
testRunner.BeforeTest(t)

for _, tc := range clusterTestCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
clus := testRunner.NewCluster(ctx, t, tc.config)
defer clus.Close()

leader := clus.WaitLeader(t)
if leader < 0 || leader >= len(clus.Members()) {
t.Fatalf("WaitLeader failed for the leader index (%d) is out of range, cluster member count: %d", leader, len(clus.Members()))
}
})
}
}

func TestWaitLeader_MemberStop(t *testing.T) {
testRunner.BeforeTest(t)
tcs := []testCase{
{
name: "PeerTLS",
config: config.ClusterConfig{ClusterSize: 3, PeerTLS: config.ManualTLS},
},
{
name: "PeerAutoTLS",
config: config.ClusterConfig{ClusterSize: 3, PeerTLS: config.AutoTLS},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
clus := testRunner.NewCluster(ctx, t, tc.config)
defer clus.Close()

lead1 := clus.WaitLeader(t)
if lead1 < 0 || lead1 >= len(clus.Members()) {
t.Fatalf("WaitLeader failed for the leader index (%d) is out of range, cluster member count: %d", lead1, len(clus.Members()))
}

clus.Members()[lead1].Stop()
lead2 := clus.WaitLeader(t)
if lead2 < 0 || lead2 >= len(clus.Members()) {
t.Fatalf("WaitLeader failed for the leader index (%d) is out of range, cluster member count: %d", lead2, len(clus.Members()))
}

if lead1 == lead2 {
t.Fatalf("WaitLeader failed for the leader(index=%d) did not change as expected after a member stopped", lead1)
}
})
}
}
70 changes: 70 additions & 0 deletions tests/framework/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ package framework
import (
"context"
"os"
"strings"
"testing"
"time"

"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const TickDuration = 10 * time.Millisecond

type e2eRunner struct{}

func (e e2eRunner) TestMain(m *testing.M) {
Expand Down Expand Up @@ -92,6 +96,72 @@ func (c *e2eCluster) Members() (ms []Member) {
return ms
}

// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (c *e2eCluster) WaitLeader(t testing.TB) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return c.WaitMembersForLeader(ctx, t, c.Members())
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (c *e2eCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []Member) int {
cc := c.Client()

// ensure leader is up via linearizable get
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
_, err := cc.Get("0", config.GetOptions{Timeout: 10*TickDuration + time.Second})
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
}
}

leaders := make(map[uint64]struct{})
members := make(map[uint64]int)
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
for i := range membs {
resp, err := membs[i].Client().Status()
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
continue
} else {
t.Fatal(err)
}
}
members[resp[0].Header.MemberId] = i
leaders[resp[0].Leader] = struct{}{}
}
// members agree on the same leader
if len(leaders) == 1 {
break
}
leaders = make(map[uint64]struct{})
members = make(map[uint64]int)
time.Sleep(10 * TickDuration)
}
for l := range leaders {
if index, ok := members[l]; ok {
t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
return index
}
t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
}
t.Fatal("impossible path of execution")
return -1
}

type e2eClient struct {
*e2e.EtcdctlV3
}
Expand Down
11 changes: 6 additions & 5 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"strings"
"sync"
"sync/atomic"
"testing"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand Down Expand Up @@ -399,21 +400,21 @@ func (c *Cluster) WaitMembersMatch(t testutil.TB, membs []*pb.Member) {
}

// WaitLeader returns index of the member in c.Members that is leader
// or fails the test (if not established in 30min).
func (c *Cluster) WaitLeader(t testutil.TB) int {
// or fails the test (if not established in 30s).
func (c *Cluster) WaitLeader(t testing.TB) int {
return c.WaitMembersForLeader(t, c.Members)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
func (c *Cluster) WaitMembersForLeader(t testing.TB, membs []*Member) int {
t.Logf("WaitMembersForLeader")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
l := 0
for l = c.waitMembersForLeader(ctx, t, membs); l < 0; {
if ctx.Err() != nil {
t.Fatal("WaitLeader FAILED: %v", ctx.Err())
t.Fatalf("WaitLeader FAILED: %v", ctx.Err())
}
}
t.Logf("WaitMembersForLeader succeeded. Cluster leader index: %v", l)
Expand All @@ -432,7 +433,7 @@ func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (c *Cluster) waitMembersForLeader(ctx context.Context, t testutil.TB, membs []*Member) int {
func (c *Cluster) waitMembersForLeader(ctx context.Context, t testing.TB, membs []*Member) int {
possibleLead := make(map[uint64]bool)
var lead uint64
for _, m := range membs {
Expand Down
1 change: 1 addition & 0 deletions tests/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type testRunner interface {
type Cluster interface {
Members() []Member
Client() Client
WaitLeader(t testing.TB) int
Close() error
}

Expand Down

0 comments on commit 012fc51

Please sign in to comment.