Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Member During Network Partition #63

Merged
merged 7 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 58 additions & 27 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"strings"
"time"

"github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"
)

Expand All @@ -34,39 +35,27 @@ func Join(ctx context.Context, cfg Config) (*Cluster, error) {
}

if cfg.etcdConfig.ClusterState == embed.ClusterStateFlagExisting {
if len(cfg.InitialClusterClientUrls) == 0 {
return nil, fmt.Errorf("joining an existing cluster requires at least one client url from a member from the existing cluster")
}
initialClusterClientCfg := clientv3.Config{
Endpoints: cfg.InitialClusterClientUrls,
DialTimeout: 5 * time.Second,
}
initialClusterClient, err := clientv3.New(initialClusterClientCfg)
if err != nil {
return nil, fmt.Errorf("failed to create etcd client from config: %w", err)
}

cfg.etcdConfig.InitialCluster, err = memberAdd(ctx, initialClusterClient, *cfg.etcdConfig)
initialCluster, err := joinExistingCluster(ctx, cfg)
if err != nil {
return nil, err
}
cfg.etcdConfig.InitialCluster = initialCluster
}

e, err := startEmbeddedEtcd(ctx, cfg)
if err != nil {
return nil, err
}

clientUrls := urlsToString(cfg.etcdConfig.LCUrls)
clientCfg := clientv3.Config{
localClient, err := clientv3.New(clientv3.Config{
Endpoints: clientUrls,
DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(clientCfg)
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client from config: %w", err)
}

e, err := startEmbeddedEtcd(cfg.etcdConfig)
if err != nil {
return nil, err
}

registry, err := newEtcdRegistry(ctx, clientUrls)
if err != nil {
return nil, err
Expand All @@ -82,7 +71,7 @@ func Join(ctx context.Context, cfg Config) (*Cluster, error) {

return &Cluster{
Registry: registry,
client: client,
client: localClient,
etcd: e,
localAddr: addr,
}, nil
Expand All @@ -107,13 +96,28 @@ func (c *Cluster) NewClient(serviceName string, cfg *ConnConfig) (*Client, error
return newClient(c.localAddr, serviceName, c.Registry, cfg)
}

func joinExistingCluster(ctx context.Context, cfg Config) (initalCluster string, err error) {
if len(cfg.InitialClusterClientUrls) == 0 {
return "", fmt.Errorf("joining an existing cluster requires at least one client url from a member from the existing cluster")
}
initialClusterClient, err := clientv3.New(clientv3.Config{
Endpoints: cfg.InitialClusterClientUrls,
DialTimeout: 5 * time.Second,
})
if err != nil {
return "", fmt.Errorf("failed to create etcd client from config: %w", err)
}

return memberAdd(ctx, initialClusterClient, *cfg.etcdConfig)
}

func memberAdd(ctx context.Context, client *clientv3.Client, cfg embed.Config) (string, error) {
peerUrlStrings := make([]string, len(cfg.LPUrls))
for i, url := range cfg.LPUrls {
peerUrlStrings[i] = url.String()
}

mresp, err := client.MemberAdd(ctx, peerUrlStrings)
mresp, err := client.MemberAddAsLearner(ctx, peerUrlStrings)
if err != nil {
return "", fmt.Errorf("failed to add member with peerURLs %v: %w", cfg.LPUrls, err)
}
Expand Down Expand Up @@ -148,14 +152,41 @@ func urlsToString(urls []url.URL) []string {
return urlStrings
}

func startEmbeddedEtcd(cfg *embed.Config) (*embed.Etcd, error) {
e, err := embed.StartEtcd(cfg)
func startEmbeddedEtcd(ctx context.Context, cfg Config) (*embed.Etcd, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

e, err := embed.StartEtcd(cfg.etcdConfig)
if err != nil {
return nil, fmt.Errorf("failed to start etcd: %w", err)
}

<-e.Server.ReadyNotify()
return e, nil
zap.S().Debugw("etcd started", "id", e.Server.ID(), "learner", e.Server.IsLearner())
if !e.Server.IsLearner() {
return e, nil
}

initialClusterClient, err := clientv3.New(clientv3.Config{
Endpoints: cfg.InitialClusterClientUrls,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client from config: %w", err)
}
for {
JackyChiu marked this conversation as resolved.
Show resolved Hide resolved
_, err := initialClusterClient.MemberPromote(ctx, uint64(e.Server.ID()))
if err == nil {
zap.S().Debugw("etcd learner successfully promoted", "id", e.Server.ID())
return e, nil
}
if err.Error() == etcdserver.ErrLearnerNotReady.Error() {
zap.S().Debugw("learner not ready to be promoted", "id", e.Server.ID())
time.Sleep(2 * time.Second)
continue
}
return nil, fmt.Errorf("can't promote member: %w", err)
}
}

func getIP() (string, error) {
Expand Down
70 changes: 42 additions & 28 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/embed"
Expand Down Expand Up @@ -72,12 +70,6 @@ func (suite *ClusterSuite) TestMemberAdd() {
LCUrl, err := url.Parse("http://127.0.0.1:22379")
require.NoError(t, err)

APUrl, err := url.Parse("http://127.0.0.1:22380")
require.NoError(t, err)

ACUrl, err := url.Parse("http://127.0.0.1:22379")
require.NoError(t, err)

memberCfg := Config{
ServiceName: "testservice",
NodeName: "node2",
Expand All @@ -87,10 +79,11 @@ func (suite *ClusterSuite) TestMemberAdd() {
}
memberCfg.etcdConfig.Name = "node2"
memberCfg.etcdConfig.Dir = "tmp2"
memberCfg.etcdConfig.Logger = "zap"
memberCfg.etcdConfig.LPUrls = []url.URL{*LPUrl}
memberCfg.etcdConfig.LCUrls = []url.URL{*LCUrl}
memberCfg.etcdConfig.APUrls = []url.URL{*APUrl}
memberCfg.etcdConfig.ACUrls = []url.URL{*ACUrl}
memberCfg.etcdConfig.APUrls = []url.URL{*LPUrl}
memberCfg.etcdConfig.ACUrls = []url.URL{*LCUrl}
memberCfg.etcdConfig.ClusterState = embed.ClusterStateFlagExisting

c2, err := Join(ctx, memberCfg)
Expand All @@ -101,30 +94,16 @@ func (suite *ClusterSuite) TestMemberAdd() {
require.NoError(t, err)
require.Equal(t, 2, len(members))

time.Sleep(etcdserver.HealthInterval)

LPUrl, err = url.Parse("http://127.0.0.1:32380")
require.NoError(t, err)

LPUrl2, err := url.Parse("http://127.0.0.1:42380")
LPUrl2, err := url.Parse("http://127.0.0.1:32480")
require.NoError(t, err)

LCUrl, err = url.Parse("http://127.0.0.1:32379")
require.NoError(t, err)

LCUrl2, err := url.Parse("http://127.0.0.1:42379")
require.NoError(t, err)

APUrl, err = url.Parse("http://127.0.0.1:32380")
require.NoError(t, err)

APUrl2, err := url.Parse("http://127.0.0.1:42380")
require.NoError(t, err)

ACUrl, err = url.Parse("http://127.0.0.1:32379")
require.NoError(t, err)

ACUrl2, err := url.Parse("http://127.0.0.1:42379")
LCUrl2, err := url.Parse("http://127.0.0.1:32479")
require.NoError(t, err)

memberCfg = Config{
Expand All @@ -136,10 +115,11 @@ func (suite *ClusterSuite) TestMemberAdd() {
}
memberCfg.etcdConfig.Name = "node3"
memberCfg.etcdConfig.Dir = "tmp3"
memberCfg.etcdConfig.Logger = "zap"
memberCfg.etcdConfig.LPUrls = []url.URL{*LPUrl, *LPUrl2}
memberCfg.etcdConfig.LCUrls = []url.URL{*LCUrl, *LCUrl2}
memberCfg.etcdConfig.APUrls = []url.URL{*APUrl, *APUrl2}
memberCfg.etcdConfig.ACUrls = []url.URL{*ACUrl, *ACUrl2}
memberCfg.etcdConfig.APUrls = []url.URL{*LPUrl, *LPUrl2}
memberCfg.etcdConfig.ACUrls = []url.URL{*LCUrl, *LCUrl2}
memberCfg.etcdConfig.ClusterState = embed.ClusterStateFlagExisting

c3, err := Join(ctx, memberCfg)
Expand All @@ -149,6 +129,40 @@ func (suite *ClusterSuite) TestMemberAdd() {
members, err = c.MemberList(ctx)
require.NoError(t, err)
require.Equal(t, 3, len(members))

t.Run("test add node with one faulty node in three node cluster", func(t *testing.T) {
LPUrl, err := url.Parse("http://127.0.0.1:42380")
require.NoError(t, err)

LCUrl, err := url.Parse("http://127.0.0.1:42379")
require.NoError(t, err)

memberCfg := Config{
ServiceName: "testservice3",
NodeName: "node4",
Port: 4040,
InitialClusterClientUrls: []string{cfg.etcdConfig.LCUrls[0].String()},
etcdConfig: embed.NewConfig(),
}
memberCfg.etcdConfig.Name = "node4"
memberCfg.etcdConfig.Dir = "tmp4"
memberCfg.etcdConfig.Logger = "zap"
memberCfg.etcdConfig.LPUrls = []url.URL{*LPUrl}
memberCfg.etcdConfig.LCUrls = []url.URL{*LCUrl}
memberCfg.etcdConfig.APUrls = []url.URL{*LPUrl}
memberCfg.etcdConfig.ACUrls = []url.URL{*LCUrl}
memberCfg.etcdConfig.ClusterState = embed.ClusterStateFlagExisting
JackyChiu marked this conversation as resolved.
Show resolved Hide resolved

c3.Close()

c4, err := Join(ctx, memberCfg)
require.NoError(t, err)
defer c4.Close()

members, err := c.MemberList(ctx)
require.NoError(t, err)
require.Equal(t, 4, len(members))
})
})
}

Expand Down
4 changes: 3 additions & 1 deletion cluster/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,19 @@ func (suite *EtcdDependentSuite) TestEtcdRegistry_nodes() {

func startTestEtcd() ([]string, func()) {
cfg := embed.NewConfig()
cfg.Logger = "zap"

tmp, err := ioutil.TempDir("", "test_etcd")
cfg.Dir = tmp
if err != nil {
log.Fatal(err)
}

e, err := startEmbeddedEtcd(cfg)
e, err := embed.StartEtcd(cfg)
if err != nil {
log.Fatal(err)
}
<-e.Server.ReadyNotify()

addr := urlsToString(cfg.LCUrls)
return addr, func() {
Expand Down
4 changes: 4 additions & 0 deletions cluster/testdata/node1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ initial-cluster-token: 'etcd-cluster'

# Initial cluster state ('new' or 'existing').
initial-cluster-state: 'new'

strict-reconfig-check: false

logger: 'zap'
26 changes: 2 additions & 24 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,9 @@ module github.com/edegens/ptype
go 1.13

require (
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.17+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.12.1 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/json-iterator/go v1.1.8 // indirect
github.com/prometheus/client_golang v1.2.1 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/stretchr/testify v1.4.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.3 // indirect
go.etcd.io/etcd v3.3.17+incompatible
// hack to get version v3.4.4, since upstream go.mod file is broken
go.etcd.io/etcd v0.0.0-20200224211402-c65a9e2dd1fd
go.uber.org/zap v1.12.0
golang.org/x/net v0.0.0-20191105084925-a882066a44e0 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
sigs.k8s.io/yaml v1.1.0
)
Loading