diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 264aa347e054..9f486592b364 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/netutil" + "go.etcd.io/etcd/pkg/v3/notify" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" @@ -57,7 +58,8 @@ type RaftCluster struct { // removed id cannot be reused. removed map[types.ID]bool - downgradeInfo *DowngradeInfo + downgradeInfo *DowngradeInfo + versionChanged *notify.Notifier } // ConfigChangeContext represents a context for confChange. @@ -247,6 +249,10 @@ func (c *RaftCluster) SetBackend(be MembershipBackend) { c.be.MustCreateBackendBuckets() } +func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) { + c.versionChanged = n +} + func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { c.Lock() defer c.Unlock() @@ -545,6 +551,9 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0) } ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(ver.String())}).Set(1) + if c.versionChanged != nil { + c.versionChanged.Notify() + } onSet(c.lg, ver) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index c491b8225951..4a9d55efa256 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -288,7 +288,8 @@ type EtcdServer struct { leadTimeMu sync.RWMutex leadElectedTime time.Time - firstCommitInTerm *notify.Notifier + firstCommitInTerm *notify.Notifier + clusterVersionChanged *notify.Notifier *AccessController } @@ -312,27 +313,29 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ - readych: make(chan struct{}), - Cfg: cfg, - lgMu: new(sync.RWMutex), - lg: cfg.Logger, - errorc: make(chan error, 1), - v2store: b.st, - snapshotter: b.ss, - r: *b.raft.newRaftNode(b.ss), - id: b.raft.wal.id, - attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: b.raft.cl, - stats: sstats, - lstats: lstats, - SyncTicker: time.NewTicker(500 * time.Millisecond), - peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()), - AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: b.ci, - firstCommitInTerm: notify.NewNotifier(), + readych: make(chan struct{}), + Cfg: cfg, + lgMu: new(sync.RWMutex), + lg: cfg.Logger, + errorc: make(chan error, 1), + v2store: b.st, + snapshotter: b.ss, + r: *b.raft.newRaftNode(b.ss), + id: b.raft.wal.id, + attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, + cluster: b.raft.cl, + stats: sstats, + lstats: lstats, + SyncTicker: time.NewTicker(500 * time.Millisecond), + peerRt: b.prt, + reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()), + AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, + consistIndex: b.ci, + firstCommitInTerm: notify.NewNotifier(), + clusterVersionChanged: notify.NewNotifier(), } serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1) + srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.be @@ -2153,6 +2156,7 @@ func (s *EtcdServer) monitorStorageVersion() { for { select { case <-time.After(monitorVersionInterval): + case <-s.clusterVersionChanged.Receive(): case <-s.stopping: return }