Skip to content

Commit

Permalink
*: auto promote nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
maxenglander committed Oct 2, 2020
1 parent ab4cc3c commit baf146a
Show file tree
Hide file tree
Showing 23 changed files with 2,304 additions and 411 deletions.
30 changes: 23 additions & 7 deletions clientv3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type (
Member pb.Member
MemberListResponse pb.MemberListResponse
MemberAddResponse pb.MemberAddResponse
MemberPromoteRule pb.MemberPromoteRule
MemberRemoveResponse pb.MemberRemoveResponse
MemberUpdateResponse pb.MemberUpdateResponse
MemberPromoteResponse pb.MemberPromoteResponse
Expand All @@ -36,19 +37,24 @@ type Cluster interface {
// MemberList lists the current cluster membership.
MemberList(ctx context.Context) (*MemberListResponse, error)

// MemberAdd adds a new member into the cluster.
// MemberAdd adds a new member as a node into the cluster.
MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)

// MemberAddAsLearner adds a new learner member into the cluster.
MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)

// MemberAddAsLearnerWithPromoteRules adds a new member as a learner
// with promotion rules that govern whether the member may be
// manually or automatically promoted.
MemberAddAsLearnerWithPromoteRules(ctx context.Context, peerAddrs []string, promoteRules []MemberPromoteRule) (*MemberAddResponse, error)

// MemberRemove removes an existing member from the cluster.
MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)

// MemberUpdate updates the peer addresses of the member.
MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)

// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
// MemberPromote promotes a member from raft non-voting member to raft voting member.
MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error)
}

Expand All @@ -74,22 +80,32 @@ func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
}

func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
return c.memberAdd(ctx, peerAddrs, false)
return c.memberAdd(ctx, peerAddrs, false, []MemberPromoteRule{})
}

// MemberAddAsLearnerWithPromoteRules adds a new member as a learner and
// passes promoteRules along with the add request. It delegates to memberAdd
// with the learner flag set.
func (c *cluster) MemberAddAsLearnerWithPromoteRules(ctx context.Context, peerAddrs []string, promoteRules []MemberPromoteRule) (*MemberAddResponse, error) {
	const asLearner = true
	return c.memberAdd(ctx, peerAddrs, asLearner, promoteRules)
}

func (c *cluster) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
return c.memberAdd(ctx, peerAddrs, true)
return c.memberAdd(ctx, peerAddrs, true, []MemberPromoteRule{})
}

func (c *cluster) memberAdd(ctx context.Context, peerAddrs []string, isLearner bool) (*MemberAddResponse, error) {
func (c *cluster) memberAdd(ctx context.Context, peerAddrs []string, isLearner bool, promoteRules []MemberPromoteRule) (*MemberAddResponse, error) {
// fail-fast before panic in rafthttp
if _, err := types.NewURLs(peerAddrs); err != nil {
return nil, err
}

// Convert the client-side rules to their protobuf form for the request.
pbRules := make([]*pb.MemberPromoteRule, len(promoteRules))
for i, rule := range promoteRules {
	// Take a per-iteration copy before taking its address: before Go 1.22
	// the range variable is a single variable shared by all iterations, so
	// &rule would make every entry of pbRules alias the last rule.
	rule := rule
	pbRules[i] = (*pb.MemberPromoteRule)(&rule)
}

r := &pb.MemberAddRequest{
PeerURLs: peerAddrs,
IsLearner: isLearner,
PeerURLs: peerAddrs,
IsLearner: isLearner,
PromoteRules: pbRules,
}
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions clientv3/example_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,28 @@ func ExampleCluster_memberAddAsLearner() {
// added member.IsLearner: true
}

// ExampleCluster_memberAddAsLearnerWithPromoteRules adds the third endpoint
// as a learner through the first two endpoints, passing an empty set of
// promote rules, and prints the peer URLs and learner flag of the new member.
func ExampleCluster_memberAddAsLearnerWithPromoteRules() {
	cfg := clientv3.Config{
		Endpoints:   endpoints[:2],
		DialTimeout: dialTimeout,
	}
	cli, err := clientv3.New(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// No promote rules are attached in this example.
	mresp, err := cli.MemberAddAsLearnerWithPromoteRules(
		context.Background(),
		endpoints[2:],
		[]clientv3.MemberPromoteRule{},
	)
	if err != nil {
		log.Fatal(err)
	}

	added := mresp.Member
	fmt.Println("added member.PeerURLs:", added.PeerURLs)
	fmt.Println("added member.IsLearner:", added.IsLearner)
	// added member.PeerURLs: [http://localhost:32380]
	// added member.IsLearner: true
}

func ExampleCluster_memberRemove() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints[1:],
Expand Down
151 changes: 151 additions & 0 deletions clientv3/integration/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"go.etcd.io/etcd/v3/clientv3"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/integration"
"go.etcd.io/etcd/v3/pkg/testutil"
"go.etcd.io/etcd/v3/pkg/types"
Expand Down Expand Up @@ -216,6 +218,48 @@ func TestMemberAddForLearner(t *testing.T) {
}
}

// TestMemberAddForAutoPromotee checks that a member added with an
// auto-promote rule is reported as a learner in the add response, and that it
// is the only learner among the members listed in that response.
func TestMemberAddForAutoPromotee(t *testing.T) {
	defer testutil.AfterTest(t)

	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer cluster.Terminate(t)

	client := cluster.RandClient()

	peerURLs := []string{"http://127.0.0.1:1234"}
	progressMonitor := &pb.MemberMonitor{
		Op:        pb.MemberMonitor_GREATER_EQUAL,
		Type:      pb.MemberMonitor_PROGRESS,
		Threshold: 50,
		Delay:     0,
	}
	promoteRules := []clientv3.MemberPromoteRule{
		{Auto: true, Monitors: []*pb.MemberMonitor{progressMonitor}},
	}

	addResp, err := client.MemberAddAsLearnerWithPromoteRules(context.Background(), peerURLs, promoteRules)
	if err != nil {
		t.Fatalf("failed to add member %v", err)
	}

	// The new member itself must come back flagged as a learner.
	if isLearner := addResp.Member.IsLearner; !isLearner {
		t.Errorf("Added a member as auto-promoting node, got resp.Member.IsLearner = %v", isLearner)
	}

	// Exactly one learner is expected across the whole membership list.
	learnerCount := 0
	for i := range addResp.Members {
		if addResp.Members[i].IsLearner {
			learnerCount++
		}
	}
	if learnerCount != 1 {
		t.Errorf("Added 1 auto-promoting learner to cluster, got %d", learnerCount)
	}
}

func TestMemberPromote(t *testing.T) {
defer testutil.AfterTest(t)

Expand Down Expand Up @@ -377,6 +421,113 @@ func TestMemberPromoteMemberNotExist(t *testing.T) {
}
}

// TestMemberAutoPromotion tests that auto-promoting learners are
// automatically promoted to voters upon catching up with the leader.
//
// The test adds a learner with an automatic promote rule through a follower,
// verifies the member list records it as a learner carrying promote rules,
// launches the member, and then polls the member list until the member is no
// longer a learner or a 5 second deadline expires.
func TestMemberAutoPromotion(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	// The add request for an auto-promoting member can be sent to any server
	// in the cluster; it is forwarded to the leader on the server side. This
	// test deliberately exercises that forwarding by sending the request to a
	// follower.
	leaderIdx := clus.WaitLeader(t)
	followerIdx := (leaderIdx + 1) % 3
	capi := clus.Client(followerIdx)

	urls := []string{"http://127.0.0.1:1234"}
	rules := []clientv3.MemberPromoteRule{
		{
			Auto: true,
			Monitors: []*pb.MemberMonitor{
				{
					Op:        pb.MemberMonitor_GREATER_EQUAL,
					Type:      pb.MemberMonitor_PROGRESS,
					Threshold: 50,
					Delay:     0,
				},
			},
		},
	}
	memberAddResp, err := capi.MemberAddAsLearnerWithPromoteRules(context.Background(), urls, rules)
	if err != nil {
		t.Fatalf("failed to add auto-promoting member %v", err)
	}

	if !memberAddResp.Member.IsLearner {
		t.Fatalf("Added a member as auto-promoting learner, got resp.Member.IsLearner = %v", memberAddResp.Member.IsLearner)
	}
	learnerID := memberAddResp.Member.ID

	numberOfLearners := 0
	for _, m := range memberAddResp.Members {
		if m.IsLearner {
			numberOfLearners++
		}
	}
	if numberOfLearners != 1 {
		t.Fatalf("Added 1 (auto-promoting) learner node to cluster, got %d learners", numberOfLearners)
	}

	// Verify that the cluster stored the new member as a learner.
	foundMember := false
	if memberList, err := capi.MemberList(context.Background()); err != nil {
		t.Errorf("before launching member, failed to get member list: %v", err)
	} else {
		for _, member := range memberList.Members {
			if member.ID != learnerID {
				continue
			}
			foundMember = true
			if !member.IsLearner {
				t.Errorf("expected new member %v to be stored as learner, but is stored as voter", learnerID)
			}
			if len(member.PromoteRules) == 0 {
				t.Errorf("expected new member %v to have 1 promote rules, but had 0", learnerID)
			}
		}
	}
	if !foundMember {
		t.Errorf("expected to find new member %v in member list, but new member was not found", learnerID)
	}

	// Create and launch the auto-promoting learner based on the response of
	// the V3 Member Add API (the response carries the peer URLs of the
	// existing cluster members).
	learnerMember := clus.MustNewMember(t, memberAddResp)
	clus.Members = append(clus.Members, learnerMember)
	if err := learnerMember.Launch(); err != nil {
		t.Fatal(err)
	}

	// Poll until the member has been auto-promoted or the deadline passes.
	timeout := time.After(5 * time.Second)
	var lastErr error // most recent MemberList failure, reported on timeout
	for {
		select {
		case <-time.After(500 * time.Millisecond):
		case <-timeout:
			// Abort the test here. A bare break would only leave the select,
			// and because the timeout channel fires just once the loop would
			// then keep polling forever.
			t.Fatalf("auto-promoting learner member was not promoted, last error: %v", lastErr)
		}

		memberList, listErr := capi.MemberList(context.Background())
		if listErr != nil {
			lastErr = listErr
			t.Errorf("while waiting for member auto-promote, failed to get member list: %v", listErr)
			continue
		}
		for _, member := range memberList.Members {
			if member.ID == learnerID && !member.IsLearner {
				// Promoted to a voting member: the test has succeeded.
				return
			}
		}
	}
}

// TestMaxLearnerInCluster verifies that the maximum number of learners allowed in a cluster is 1
func TestMaxLearnerInCluster(t *testing.T) {
defer testutil.AfterTest(t)
Expand Down
25 changes: 24 additions & 1 deletion etcdctl/ctlv3/command/member_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ import (

"github.com/spf13/cobra"
"go.etcd.io/etcd/v3/clientv3"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
)

var (
memberPeerURLs string
isLearner bool
autoPromote bool
minProgress uint64
)

// NewMemberCommand returns the cobra command for "member".
Expand Down Expand Up @@ -56,6 +59,8 @@ func NewMemberAddCommand() *cobra.Command {

cc.Flags().StringVar(&memberPeerURLs, "peer-urls", "", "comma separated peer URLs for the new member.")
cc.Flags().BoolVar(&isLearner, "learner", false, "indicates if the new member is raft learner")
cc.Flags().BoolVar(&autoPromote, "auto-promote", false, "whether or not to automatically promote the member")
cc.Flags().Uint64Var(&minProgress, "min-progress", 90, "the minimum progress (0-100) this member must make before being automatically promoted")

return cc
}
Expand Down Expand Up @@ -143,7 +148,25 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
err error
)
if isLearner {
resp, err = cli.MemberAddAsLearner(ctx, urls)
if autoPromote {
rule := clientv3.MemberPromoteRule{
Auto: autoPromote,
}
// minProgress is a uint64, so it can never be negative; only the upper
// bound of the documented 0-100 range needs checking.
if minProgress > 100 {
	ev := "--min-progress must be between 0 and 100"
	ExitWithError(ExitBadArgs, errors.New(ev))
}
rule.Monitors = []*pb.MemberMonitor{
{
Op: pb.MemberMonitor_GREATER_EQUAL,
Threshold: minProgress,
Type: pb.MemberMonitor_PROGRESS,
},
}
resp, err = cli.MemberAddAsLearnerWithPromoteRules(ctx, urls, []clientv3.MemberPromoteRule{rule})
} else {
resp, err = cli.MemberAddAsLearner(ctx, urls)
}
} else {
resp, err = cli.MemberAdd(ctx, urls)
}
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ
http.Error(w, err.Error(), http.StatusPreconditionFailed)
case etcdserver.ErrLearnerNotReady:
http.Error(w, err.Error(), http.StatusPreconditionFailed)
case etcdserver.ErrCannotPromote:
http.Error(w, err.Error(), http.StatusPreconditionFailed)
default:
WriteError(h.lg, w, r, err)
}
Expand Down
26 changes: 24 additions & 2 deletions etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type RaftCluster struct {
// ConfigChangeContext represents a context for confChange.
type ConfigChangeContext struct {
Member
// IsPromote indicates if the config change is for promoting a learner member.
// This flag is needed because both adding a new member and promoting a learner member
// IsPromote indicates if the config change is for promoting a member to another role.
// This flag is needed because both adding a new member and promoting a member
// use the same config change type 'ConfChangeAddNode'.
IsPromote bool `json:"isPromote"`
}
Expand Down Expand Up @@ -373,6 +373,7 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) AddMember(m *Member) {
c.Lock()
defer c.Unlock()

if c.v2store != nil {
mustSaveMemberToStore(c.lg, c.v2store, m)
}
Expand Down Expand Up @@ -464,6 +465,7 @@ func (c *RaftCluster) PromoteMember(id types.ID) {
defer c.Unlock()

c.members[id].RaftAttributes.IsLearner = false

if c.v2store != nil {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
Expand Down Expand Up @@ -499,6 +501,26 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes)
)
}

// UpdatePromoteRules copies the monitor state (status, statusChangedAt and
// value) from rules onto the promote rules already held by the member with
// the given id. Rules and monitors are matched by position; positions present
// on only one side are left untouched. It is a no-op if the member is unknown.
//
// NOTE(review): unlike AddMember and PromoteMember, this method does not take
// the cluster lock before reading c.members — confirm that callers hold it.
func (c *RaftCluster) UpdatePromoteRules(id types.ID, rules []PromoteRule) {
	member, known := c.members[id]
	if !known {
		return
	}
	for r := 0; r < len(member.PromoteRules) && r < len(rules); r++ {
		for k := 0; k < len(member.PromoteRules[r].Monitors) && k < len(rules[r].Monitors); k++ {
			member.PromoteRules[r].Monitors[k].status = rules[r].Monitors[k].status
			member.PromoteRules[r].Monitors[k].statusChangedAt = rules[r].Monitors[k].statusChangedAt
			member.PromoteRules[r].Monitors[k].value = rules[r].Monitors[k].value
		}
	}
}

func (c *RaftCluster) Version() *semver.Version {
c.Lock()
defer c.Unlock()
Expand Down
Loading

0 comments on commit baf146a

Please sign in to comment.