From 783c5ad2d24b0fefebd18102cbb988892f813153 Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Mon, 17 May 2021 10:57:29 -0700 Subject: [PATCH 1/2] use v2 api to update cluster version --- server/etcdserver/apply_v2.go | 9 ++++-- server/etcdserver/server.go | 51 ++++++++++++++++++++++++++++---- server/etcdserver/server_test.go | 48 ++++++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 7 deletions(-) diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index f2e3d89da57..6648e3279ee 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -21,7 +21,9 @@ import ( "strconv" "time" + "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/server/v3/etcdserver/api" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" @@ -92,9 +94,12 @@ func (a *applierV2store) Put(r *RequestV2) Response { // return an empty response since there is no consumer. return Response{} } - // remove v2 version set to avoid the conflict between v2 and v3. + // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 if r.Path == membership.StoreClusterVersionKey() { - // return an empty response since there is no consumer. + if a.cluster != nil { + // persist to backend given v2store can be very stale + a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, membership.ApplyBoth) + } return Response{} } return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 83babf95276..2c8855ec33d 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2428,6 +2428,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version { // It updates the cluster version if all members agrees on a higher one. // It prints out log if there is a member with a higher version than the // local version. +// TODO switch to updateClusterVersionV3 in 3.6 func (s *EtcdServer) monitorVersions() { for { select { @@ -2458,27 +2459,67 @@ func (s *EtcdServer) monitorVersions() { if v != nil { verStr = v.String() } - s.GoAttach(func() { s.updateClusterVersion(verStr) }) + s.GoAttach(func() { s.updateClusterVersionV2(verStr) }) continue } if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) { - s.GoAttach(func() { s.updateClusterVersion(v.String()) }) + s.GoAttach(func() { s.updateClusterVersionV2(v.String()) }) } } } -func (s *EtcdServer) updateClusterVersion(ver string) { +func (s *EtcdServer) updateClusterVersionV2(ver string) { lg := s.Logger() if s.cluster.Version() == nil { lg.Info( - "setting up initial cluster version", + "setting up initial cluster version using v2 API", zap.String("cluster-version", version.Cluster(ver)), ) } else { lg.Info( - "updating cluster version", + "updating cluster version using v2 API", + zap.String("from", version.Cluster(s.cluster.Version().String())), + zap.String("to", version.Cluster(ver)), + ) + } + + req := pb.Request{ + Method: "PUT", + Path: membership.StoreClusterVersionKey(), + Val: ver, + } + + ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout()) + _, err := s.Do(ctx, req) + cancel() + + switch err { + case nil: + lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver))) + return + + case ErrStopped: + lg.Warn("aborting cluster version update; server is stopped", zap.Error(err)) + return + + default: + lg.Warn("failed to update cluster version", zap.Error(err)) + } +} + +func (s *EtcdServer) updateClusterVersionV3(ver string) { + lg := s.Logger() + + if s.cluster.Version() == nil { + lg.Info( + "setting up initial cluster version using v3 API", + zap.String("cluster-version", version.Cluster(ver)), + ) + } else { + lg.Info( + "updating cluster version using v3 API", zap.String("from", version.Cluster(s.cluster.Version().String())), zap.String("to", version.Cluster(ver)), ) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 07a22e2b0ea..13ec0dd8901 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -22,6 +22,7 @@ import ( "math" "net/http" "os" + "path" "path/filepath" "reflect" "sync" @@ -1716,6 +1717,53 @@ func TestPublishV3Retry(t *testing.T) { <-ch } +func TestUpdateVersion(t *testing.T) { + n := newNodeRecorder() + ch := make(chan interface{}, 1) + // simulate that request has gone through consensus + ch <- Response{} + w := wait.NewWithResponse(ch) + ctx, cancel := context.WithCancel(context.TODO()) + srv := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + id: 1, + Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), + attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, + cluster: &membership.RaftCluster{}, + w: w, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + + ctx: ctx, + cancel: cancel, + } + srv.updateClusterVersionV2("2.0.0") + + action := n.Action() + if len(action) != 1 { + t.Fatalf("len(action) = %d, want 1", len(action)) + } + if action[0].Name != "Propose" { + t.Fatalf("action = %s, want Propose", action[0].Name) + } + data := action[0].Params[0].([]byte) + var r pb.Request + if err := r.Unmarshal(data); err != nil { + t.Fatalf("unmarshal request error: %v", err) + } + if r.Method != "PUT" { + t.Errorf("method = %s, want PUT", r.Method) + } + if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath { + t.Errorf("path = %s, want %s", r.Path, wpath) + } + if r.Val != "2.0.0" { + t.Errorf("val = %s, want %s", r.Val, "2.0.0") + } +} + func TestStopNotify(t *testing.T) { s := &EtcdServer{ lgMu: new(sync.RWMutex), From 8e9b77a2c7bc7cbeec732e3c3489af9c16c1655f Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Mon, 17 May 2021 11:46:14 -0700 Subject: [PATCH 2/2] Update CHANGELOG-3.5.md --- CHANGELOG-3.5.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md index b60a08e5c79..e40a847d091 100644 --- a/CHANGELOG-3.5.md +++ b/CHANGELOG-3.5.md @@ -117,6 +117,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change. - `etcd --experimental-backend-bbolt-freelist-type` has been deprecated. - Support [downgrade API](https://github.com/etcd-io/etcd/pull/11715). - Deprecate v2 apply on cluster version. [Use v3 request to set cluster version and recover cluster version from v3 backend](https://github.com/etcd-io/etcd/pull/11427). +- [Use v2 api to update cluster version to support mixed version cluster during upgrade](https://github.com/etcd-io/etcd/pull/12988). - [Fix corruption bug in defrag](https://github.com/etcd-io/etcd/pull/11613). - Fix [quorum protection logic when promoting a learner](https://github.com/etcd-io/etcd/pull/11640). - Improve [peer corruption checker](https://github.com/etcd-io/etcd/pull/11621) to work when peer mTLS is enabled.