From 166f2c27eb9dc6993dcb116eb16df881c4144c45 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 22 Jul 2019 10:57:35 -0700 Subject: [PATCH] clientv3/balancer: refactor refactor + remove unused Signed-off-by: Gyuho Lee --- clientv3/balancer/balancer.go | 113 +-- clientv3/balancer/config.go | 36 - clientv3/balancer/connectivity.go | 58 -- .../balancer/connectivity/connectivity.go | 89 +++ clientv3/balancer/doc.go | 16 - clientv3/balancer/grpc1.7-health.go | 657 ------------------ clientv3/balancer/grpc1.7-health_test.go | 297 -------- clientv3/balancer/picker/err.go | 11 +- clientv3/balancer/picker/picker.go | 75 ++ clientv3/balancer/picker/picker_policy.go | 49 -- .../balancer/picker/roundrobin_balanced.go | 33 +- clientv3/balancer/utils.go | 4 +- clientv3/client.go | 7 +- clientv3/config.go | 2 + 14 files changed, 269 insertions(+), 1178 deletions(-) delete mode 100644 clientv3/balancer/config.go delete mode 100644 clientv3/balancer/connectivity.go create mode 100644 clientv3/balancer/connectivity/connectivity.go delete mode 100644 clientv3/balancer/doc.go delete mode 100644 clientv3/balancer/grpc1.7-health.go delete mode 100644 clientv3/balancer/grpc1.7-health_test.go delete mode 100644 clientv3/balancer/picker/picker_policy.go diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go index c39702ec47c1..5406a9ff64b3 100644 --- a/clientv3/balancer/balancer.go +++ b/clientv3/balancer/balancer.go @@ -12,24 +12,45 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Package balancer implements client balancer. package balancer import ( - "fmt" "strconv" "sync" "time" + "go.etcd.io/etcd/clientv3/balancer/connectivity" "go.etcd.io/etcd/clientv3/balancer/picker" "go.uber.org/zap" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/connectivity" + grpcconnectivity "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" _ "google.golang.org/grpc/resolver/dns" // register DNS resolver _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver ) +// Config defines balancer configurations. +type Config struct { + // Policy configures balancer policy. + Policy picker.Policy + + // Picker implements gRPC picker. + // Leave empty if "Policy" field is not custom. + // TODO: currently custom policy is not supported. + // Picker picker.Picker + + // Name defines an additional name for balancer. + // Useful for balancer testing to avoid register conflicts. + // If empty, defaults to policy name. + Name string + + // Logger configures balancer logging. + // If nil, logs are discarded. + Logger *zap.Logger +} + // RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it // must be invoked at initialization time. 
func RegisterBuilder(cfg Config) { @@ -59,13 +80,13 @@ func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balan addrToSc: make(map[resolver.Address]balancer.SubConn), scToAddr: make(map[balancer.SubConn]resolver.Address), - scToSt: make(map[balancer.SubConn]connectivity.State), + scToSt: make(map[balancer.SubConn]grpcconnectivity.State), - currentConn: nil, - csEvltr: &connectivityStateEvaluator{}, + currentConn: nil, + connectivityRecorder: connectivity.New(), // initialize picker always returns "ErrNoSubConnAvailable" - Picker: picker.NewErr(balancer.ErrNoSubConnAvailable), + picker: picker.NewErr(balancer.ErrNoSubConnAvailable), } if bb.lg == nil { bb.lg = zap.NewNop() @@ -112,13 +133,12 @@ type baseBalancer struct { addrToSc map[resolver.Address]balancer.SubConn scToAddr map[balancer.SubConn]resolver.Address - scToSt map[balancer.SubConn]connectivity.State + scToSt map[balancer.SubConn]grpcconnectivity.State - currentConn balancer.ClientConn - currentState connectivity.State - csEvltr *connectivityStateEvaluator + currentConn balancer.ClientConn + connectivityRecorder connectivity.Recorder - picker.Picker + picker picker.Picker } // HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. @@ -128,7 +148,11 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) return } - bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs))) + bb.lg.Info("resolved", + zap.String("picker", bb.picker.String()), + zap.String("balancer-id", bb.id), + zap.Strings("addresses", addrsToStrings(addrs)), + ) bb.mu.Lock() defer bb.mu.Unlock() @@ -139,12 +163,13 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) if _, ok := bb.addrToSc[addr]; !ok { sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) if err != nil { - bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) + bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) continue } + bb.lg.Info("created subconn", zap.String("address", addr.Addr)) bb.addrToSc[addr] = sc bb.scToAddr[sc] = addr - bb.scToSt[sc] = connectivity.Idle + bb.scToSt[sc] = grpcconnectivity.Idle sc.Connect() } } @@ -157,6 +182,7 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) bb.lg.Info( "removed subconn", + zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.String("address", addr.Addr), zap.String("subconn", scToString(sc)), @@ -171,7 +197,7 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) } // HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. 
-func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { +func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) { bb.mu.Lock() defer bb.mu.Unlock() @@ -179,8 +205,10 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti if !ok { bb.lg.Warn( "state change for an unknown subconn", + zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.String("subconn", scToString(sc)), + zap.Int("subconn-size", len(bb.scToAddr)), zap.String("state", s.String()), ) return @@ -188,9 +216,11 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti bb.lg.Info( "state changed", + zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), - zap.Bool("connected", s == connectivity.Ready), + zap.Bool("connected", s == grpcconnectivity.Ready), zap.String("subconn", scToString(sc)), + zap.Int("subconn-size", len(bb.scToAddr)), zap.String("address", bb.scToAddr[sc].Addr), zap.String("old-state", old.String()), zap.String("new-state", s.String()), @@ -198,68 +228,63 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connecti bb.scToSt[sc] = s switch s { - case connectivity.Idle: + case grpcconnectivity.Idle: sc.Connect() - case connectivity.Shutdown: + case grpcconnectivity.Shutdown: // When an address was removed by resolver, b called RemoveSubConn but // kept the sc's state in scToSt. Remove state for this sc here. delete(bb.scToAddr, sc) delete(bb.scToSt, sc) } - oldAggrState := bb.currentState - bb.currentState = bb.csEvltr.recordTransition(old, s) + oldAggrState := bb.connectivityRecorder.GetCurrentState() + bb.connectivityRecorder.RecordTransition(old, s) // Regenerate picker when one of the following happens: // - this sc became ready from not-ready // - this sc became not-ready from ready // - the aggregated state of balancer became TransientFailure from non-TransientFailure // - the aggregated state of balancer became non-TransientFailure from TransientFailure - if (s == connectivity.Ready) != (old == connectivity.Ready) || - (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { - bb.regeneratePicker() + if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) || + (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) { + bb.updatePicker() } - bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker) + bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker) } -func (bb *baseBalancer) regeneratePicker() { - if bb.currentState == connectivity.TransientFailure { +func (bb *baseBalancer) updatePicker() { + if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure { + bb.picker = picker.NewErr(balancer.ErrTransientFailure) bb.lg.Info( - "generated transient error picker", + "updated picker to transient error picker", + zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.String("policy", bb.policy.String()), ) - bb.Picker = picker.NewErr(balancer.ErrTransientFailure) return } // only pass ready subconns to picker - scs := make([]balancer.SubConn, 0) - addrToSc := make(map[resolver.Address]balancer.SubConn) scToAddr := make(map[balancer.SubConn]resolver.Address) for addr, sc := range bb.addrToSc { - if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready { - scs = append(scs, sc) - 
addrToSc[addr] = sc + if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready { scToAddr[sc] = addr } } - switch bb.policy { - case picker.RoundrobinBalanced: - bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr) - - default: - panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy)) - } - + bb.picker = picker.New(picker.Config{ + Policy: bb.policy, + Logger: bb.lg, + SubConnToResolverAddress: scToAddr, + }) bb.lg.Info( - "generated picker", + "updated picker", + zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.String("policy", bb.policy.String()), - zap.Strings("subconn-ready", scsToStrings(addrToSc)), - zap.Int("subconn-size", len(addrToSc)), + zap.Strings("subconn-ready", scsToStrings(scToAddr)), + zap.Int("subconn-size", len(scToAddr)), ) } diff --git a/clientv3/balancer/config.go b/clientv3/balancer/config.go deleted file mode 100644 index 0339a84d08f3..000000000000 --- a/clientv3/balancer/config.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "go.etcd.io/etcd/clientv3/balancer/picker" - - "go.uber.org/zap" -) - -// Config defines balancer configurations. -type Config struct { - // Policy configures balancer policy. - Policy picker.Policy - - // Name defines an additional name for balancer. - // Useful for balancer testing to avoid register conflicts. - // If empty, defaults to policy name. - Name string - - // Logger configures balancer logging. - // If nil, logs are discarded. - Logger *zap.Logger -} diff --git a/clientv3/balancer/connectivity.go b/clientv3/balancer/connectivity.go deleted file mode 100644 index 6cdeb3fa3a4b..000000000000 --- a/clientv3/balancer/connectivity.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import "google.golang.org/grpc/connectivity" - -// connectivityStateEvaluator gets updated by addrConns when their -// states transition, based on which it evaluates the state of -// ClientConn. -type connectivityStateEvaluator struct { - numReady uint64 // Number of addrConns in ready state. - numConnecting uint64 // Number of addrConns in connecting state. - numTransientFailure uint64 // Number of addrConns in transientFailure. -} - -// recordTransition records state change happening in every subConn and based on -// that it evaluates what aggregated state should be. 
-// It can only transition between Ready, Connecting and TransientFailure. Other states, -// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection -// before any subConn is created ClientConn is in idle state. In the end when ClientConn -// closes it is in Shutdown state. -// -// recordTransition should only be called synchronously from the same goroutine. -func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State { - // Update counters. - for idx, state := range []connectivity.State{oldState, newState} { - updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. - switch state { - case connectivity.Ready: - cse.numReady += updateVal - case connectivity.Connecting: - cse.numConnecting += updateVal - case connectivity.TransientFailure: - cse.numTransientFailure += updateVal - } - } - - // Evaluate. - if cse.numReady > 0 { - return connectivity.Ready - } - if cse.numConnecting > 0 { - return connectivity.Connecting - } - return connectivity.TransientFailure -} diff --git a/clientv3/balancer/connectivity/connectivity.go b/clientv3/balancer/connectivity/connectivity.go new file mode 100644 index 000000000000..2bb3453ad7ad --- /dev/null +++ b/clientv3/balancer/connectivity/connectivity.go @@ -0,0 +1,89 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package connectivity implements client connectivity operations. +package connectivity + +import ( + "sync" + + "google.golang.org/grpc/connectivity" +) + +// Recorder records gRPC connectivity. +type Recorder interface { + GetCurrentState() connectivity.State + RecordTransition(oldState, newState connectivity.State) +} + +// New returns a new Recorder. +func New() Recorder { + return &recorder{} +} + +// recorder takes the connectivity states of multiple SubConns +// and returns one aggregated connectivity state. +// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go +type recorder struct { + mu sync.RWMutex + + cur connectivity.State + + numReady uint64 // Number of addrConns in ready state. + numConnecting uint64 // Number of addrConns in connecting state. + numTransientFailure uint64 // Number of addrConns in transientFailure. +} + +func (rc *recorder) GetCurrentState() (state connectivity.State) { + rc.mu.RLock() + state = rc.cur + rc.mu.RUnlock() + return state +} + +// RecordTransition records state change happening in subConn and based on that +// it evaluates what aggregated state should be. +// +// - If at least one SubConn in Ready, the aggregated state is Ready; +// - Else if at least one SubConn in Connecting, the aggregated state is Connecting; +// - Else the aggregated state is TransientFailure. +// +// Idle and Shutdown are not considered. +// +// ref. 
https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go +func (rc *recorder) RecordTransition(oldState, newState connectivity.State) { + rc.mu.Lock() + defer rc.mu.Unlock() + + for idx, state := range []connectivity.State{oldState, newState} { + updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. + switch state { + case connectivity.Ready: + rc.numReady += updateVal + case connectivity.Connecting: + rc.numConnecting += updateVal + case connectivity.TransientFailure: + rc.numTransientFailure += updateVal + } + } + + switch { // must be exclusive, no overlap + case rc.numReady > 0: + rc.cur = connectivity.Ready + case rc.numConnecting > 0: + rc.cur = connectivity.Connecting + default: + rc.cur = connectivity.TransientFailure + } +} diff --git a/clientv3/balancer/doc.go b/clientv3/balancer/doc.go deleted file mode 100644 index 45af5e9d1039..000000000000 --- a/clientv3/balancer/doc.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package balancer implements client balancer. -package balancer diff --git a/clientv3/balancer/grpc1.7-health.go b/clientv3/balancer/grpc1.7-health.go deleted file mode 100644 index 2153767354de..000000000000 --- a/clientv3/balancer/grpc1.7-health.go +++ /dev/null @@ -1,657 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "context" - "errors" - "io/ioutil" - "net/url" - "strings" - "sync" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/grpclog" - healthpb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" -) - -// TODO: replace with something better -var lg = grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard) - -const ( - minHealthRetryDuration = 3 * time.Second - unknownService = "unknown service grpc.health.v1.Health" -) - -// ErrNoAddrAvailable is returned by Get() when the balancer does not have -// any active connection to endpoints at the time. -// This error is returned only when opts.BlockingWait is true. 
-var ErrNoAddrAvailable = status.Error(codes.Unavailable, "there is no address available") - -type NotifyMsg int - -const ( - NotifyReset NotifyMsg = iota - NotifyNext -) - -// GRPC17Health does the bare minimum to expose multiple eps -// to the grpc reconnection code path -type GRPC17Health struct { - // addrs are the client's endpoint addresses for grpc - addrs []grpc.Address - - // eps holds the raw endpoints from the client - eps []string - - // notifyCh notifies grpc of the set of addresses for connecting - notifyCh chan []grpc.Address - - // readyc closes once the first connection is up - readyc chan struct{} - readyOnce sync.Once - - // healthCheck checks an endpoint's health. - healthCheck func(ep string) (bool, error) - healthCheckTimeout time.Duration - - unhealthyMu sync.RWMutex - unhealthyHostPorts map[string]time.Time - - // mu protects all fields below. - mu sync.RWMutex - - // upc closes when pinAddr transitions from empty to non-empty or the balancer closes. - upc chan struct{} - - // downc closes when grpc calls down() on pinAddr - downc chan struct{} - - // stopc is closed to signal updateNotifyLoop should stop. - stopc chan struct{} - stopOnce sync.Once - wg sync.WaitGroup - - // donec closes when all goroutines are exited - donec chan struct{} - - // updateAddrsC notifies updateNotifyLoop to update addrs. - updateAddrsC chan NotifyMsg - - // grpc issues TLS cert checks using the string passed into dial so - // that string must be the host. To recover the full scheme://host URL, - // have a map from hosts to the original endpoint. - hostPort2ep map[string]string - - // pinAddr is the currently pinned address; set to the empty string on - // initialization and shutdown. - pinAddr string - - closed bool -} - -// DialFunc defines gRPC dial function. -type DialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) - -// NewGRPC17Health returns a new health balancer with gRPC v1.7. 
-func NewGRPC17Health( - eps []string, - timeout time.Duration, - dialFunc DialFunc, -) *GRPC17Health { - notifyCh := make(chan []grpc.Address) - addrs := eps2addrs(eps) - hb := &GRPC17Health{ - addrs: addrs, - eps: eps, - notifyCh: notifyCh, - readyc: make(chan struct{}), - healthCheck: func(ep string) (bool, error) { return grpcHealthCheck(ep, dialFunc) }, - unhealthyHostPorts: make(map[string]time.Time), - upc: make(chan struct{}), - stopc: make(chan struct{}), - downc: make(chan struct{}), - donec: make(chan struct{}), - updateAddrsC: make(chan NotifyMsg), - hostPort2ep: getHostPort2ep(eps), - } - if timeout < minHealthRetryDuration { - timeout = minHealthRetryDuration - } - hb.healthCheckTimeout = timeout - - close(hb.downc) - go hb.updateNotifyLoop() - hb.wg.Add(1) - go func() { - defer hb.wg.Done() - hb.updateUnhealthy() - }() - return hb -} - -func (b *GRPC17Health) Start(target string, config grpc.BalancerConfig) error { return nil } - -func (b *GRPC17Health) ConnectNotify() <-chan struct{} { - b.mu.Lock() - defer b.mu.Unlock() - return b.upc -} - -func (b *GRPC17Health) UpdateAddrsC() chan NotifyMsg { return b.updateAddrsC } -func (b *GRPC17Health) StopC() chan struct{} { return b.stopc } - -func (b *GRPC17Health) Ready() <-chan struct{} { return b.readyc } - -func (b *GRPC17Health) Endpoint(hostPort string) string { - b.mu.RLock() - defer b.mu.RUnlock() - return b.hostPort2ep[hostPort] -} - -func (b *GRPC17Health) Pinned() string { - b.mu.RLock() - defer b.mu.RUnlock() - return b.pinAddr -} - -func (b *GRPC17Health) HostPortError(hostPort string, err error) { - if b.Endpoint(hostPort) == "" { - lg.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error()) - return - } - - b.unhealthyMu.Lock() - b.unhealthyHostPorts[hostPort] = time.Now() - b.unhealthyMu.Unlock() - lg.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error()) -} - -func (b *GRPC17Health) removeUnhealthy(hostPort, msg string) { - if b.Endpoint(hostPort) == "" { - lg.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg) - return - } - - b.unhealthyMu.Lock() - delete(b.unhealthyHostPorts, hostPort) - b.unhealthyMu.Unlock() - lg.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg) -} - -func (b *GRPC17Health) countUnhealthy() (count int) { - b.unhealthyMu.RLock() - count = len(b.unhealthyHostPorts) - b.unhealthyMu.RUnlock() - return count -} - -func (b *GRPC17Health) isUnhealthy(hostPort string) (unhealthy bool) { - b.unhealthyMu.RLock() - _, unhealthy = b.unhealthyHostPorts[hostPort] - b.unhealthyMu.RUnlock() - return unhealthy -} - -func (b *GRPC17Health) cleanupUnhealthy() { - b.unhealthyMu.Lock() - for k, v := range b.unhealthyHostPorts { - if time.Since(v) > b.healthCheckTimeout { - delete(b.unhealthyHostPorts, k) - lg.Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout) - } - } - b.unhealthyMu.Unlock() -} - -func (b *GRPC17Health) liveAddrs() ([]grpc.Address, map[string]struct{}) { - unhealthyCnt := b.countUnhealthy() - - b.mu.RLock() - defer b.mu.RUnlock() - - hbAddrs := b.addrs - if len(b.addrs) == 1 || unhealthyCnt == 0 || unhealthyCnt == len(b.addrs) { - liveHostPorts := make(map[string]struct{}, len(b.hostPort2ep)) - for k := range b.hostPort2ep { - liveHostPorts[k] = struct{}{} - } - return hbAddrs, liveHostPorts - } - - addrs := make([]grpc.Address, 0, len(b.addrs)-unhealthyCnt) - liveHostPorts := make(map[string]struct{}, len(addrs)) - for _, addr := range b.addrs { - if 
!b.isUnhealthy(addr.Addr) { - addrs = append(addrs, addr) - liveHostPorts[addr.Addr] = struct{}{} - } - } - return addrs, liveHostPorts -} - -func (b *GRPC17Health) updateUnhealthy() { - for { - select { - case <-time.After(b.healthCheckTimeout): - b.cleanupUnhealthy() - pinned := b.Pinned() - if pinned == "" || b.isUnhealthy(pinned) { - select { - case b.updateAddrsC <- NotifyNext: - case <-b.stopc: - return - } - } - case <-b.stopc: - return - } - } -} - -// NeedUpdate returns true if all connections are down or -// addresses do not include current pinned address. -func (b *GRPC17Health) NeedUpdate() bool { - // updating notifyCh can trigger new connections, - // need update addrs if all connections are down - // or addrs does not include pinAddr. - b.mu.RLock() - update := !hasAddr(b.addrs, b.pinAddr) - b.mu.RUnlock() - return update -} - -func (b *GRPC17Health) UpdateAddrs(eps ...string) { - np := getHostPort2ep(eps) - - b.mu.Lock() - defer b.mu.Unlock() - - match := len(np) == len(b.hostPort2ep) - if match { - for k, v := range np { - if b.hostPort2ep[k] != v { - match = false - break - } - } - } - if match { - // same endpoints, so no need to update address - return - } - - b.hostPort2ep = np - b.addrs, b.eps = eps2addrs(eps), eps - - b.unhealthyMu.Lock() - b.unhealthyHostPorts = make(map[string]time.Time) - b.unhealthyMu.Unlock() -} - -func (b *GRPC17Health) Next() { - b.mu.RLock() - downc := b.downc - b.mu.RUnlock() - select { - case b.updateAddrsC <- NotifyNext: - case <-b.stopc: - } - // wait until disconnect so new RPCs are not issued on old connection - select { - case <-downc: - case <-b.stopc: - } -} - -func (b *GRPC17Health) updateNotifyLoop() { - defer close(b.donec) - - for { - b.mu.RLock() - upc, downc, addr := b.upc, b.downc, b.pinAddr - b.mu.RUnlock() - // downc or upc should be closed - select { - case <-downc: - downc = nil - default: - } - select { - case <-upc: - upc = nil - default: - } - switch { - case downc == nil && upc == nil: - // stale - select { - case <-b.stopc: - return - default: - } - case downc == nil: - b.notifyAddrs(NotifyReset) - select { - case <-upc: - case msg := <-b.updateAddrsC: - b.notifyAddrs(msg) - case <-b.stopc: - return - } - case upc == nil: - select { - // close connections that are not the pinned address - case b.notifyCh <- []grpc.Address{{Addr: addr}}: - case <-downc: - case <-b.stopc: - return - } - select { - case <-downc: - b.notifyAddrs(NotifyReset) - case msg := <-b.updateAddrsC: - b.notifyAddrs(msg) - case <-b.stopc: - return - } - } - } -} - -func (b *GRPC17Health) notifyAddrs(msg NotifyMsg) { - if msg == NotifyNext { - select { - case b.notifyCh <- []grpc.Address{}: - case <-b.stopc: - return - } - } - b.mu.RLock() - pinAddr := b.pinAddr - downc := b.downc - b.mu.RUnlock() - addrs, hostPorts := b.liveAddrs() - - var waitDown bool - if pinAddr != "" { - _, ok := hostPorts[pinAddr] - waitDown = !ok - } - - select { - case b.notifyCh <- addrs: - if waitDown { - select { - case <-downc: - case <-b.stopc: - } - } - case <-b.stopc: - } -} - -func (b *GRPC17Health) Up(addr grpc.Address) func(error) { - if !b.mayPin(addr) { - return func(err error) {} - } - - b.mu.Lock() - defer b.mu.Unlock() - - // gRPC might call Up after it called Close. We add this check - // to "fix" it up at application layer. Otherwise, will panic - // if b.upc is already closed. - if b.closed { - return func(err error) {} - } - - // gRPC might call Up on a stale address. - // Prevent updating pinAddr with a stale address. 
- if !hasAddr(b.addrs, addr.Addr) { - return func(err error) {} - } - - if b.pinAddr != "" { - lg.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr) - return func(err error) {} - } - - // notify waiting Get()s and pin first connected address - close(b.upc) - b.downc = make(chan struct{}) - b.pinAddr = addr.Addr - lg.Infof("clientv3/balancer: pin %q", addr.Addr) - - // notify client that a connection is up - b.readyOnce.Do(func() { close(b.readyc) }) - - return func(err error) { - // If connected to a black hole endpoint or a killed server, the gRPC ping - // timeout will induce a network I/O error, and retrying until success; - // finding healthy endpoint on retry could take several timeouts and redials. - // To avoid wasting retries, gray-list unhealthy endpoints. - b.HostPortError(addr.Addr, err) - - b.mu.Lock() - b.upc = make(chan struct{}) - close(b.downc) - b.pinAddr = "" - b.mu.Unlock() - lg.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error()) - } -} - -func (b *GRPC17Health) mayPin(addr grpc.Address) bool { - if b.Endpoint(addr.Addr) == "" { // stale host:port - return false - } - - b.unhealthyMu.RLock() - unhealthyCnt := len(b.unhealthyHostPorts) - failedTime, bad := b.unhealthyHostPorts[addr.Addr] - b.unhealthyMu.RUnlock() - - b.mu.RLock() - skip := len(b.addrs) == 1 || unhealthyCnt == 0 || len(b.addrs) == unhealthyCnt - b.mu.RUnlock() - if skip || !bad { - return true - } - - // prevent isolated member's endpoint from being infinitely retried, as follows: - // 1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm - // 2. balancer 'Up' unpins with grpc: failed with network I/O error - // 3. grpc-healthcheck still SERVING, thus retry to pin - // instead, return before grpc-healthcheck if failed within healthcheck timeout - if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout { - lg.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout) - return false - } - - if ok, _ := b.healthCheck(addr.Addr); ok { - b.removeUnhealthy(addr.Addr, "health check success") - return true - } - - b.HostPortError(addr.Addr, errors.New("health check failed")) - return false -} - -func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { - var ( - addr string - closed bool - ) - - // If opts.BlockingWait is false (for fail-fast RPCs), it should return - // an address it has notified via Notify immediately instead of blocking. - if !opts.BlockingWait { - b.mu.RLock() - closed = b.closed - addr = b.pinAddr - b.mu.RUnlock() - if closed { - return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing - } - if addr == "" { - return grpc.Address{Addr: ""}, nil, ErrNoAddrAvailable - } - return grpc.Address{Addr: addr}, func() {}, nil - } - - for { - b.mu.RLock() - ch := b.upc - b.mu.RUnlock() - select { - case <-ch: - case <-b.donec: - return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing - case <-ctx.Done(): - return grpc.Address{Addr: ""}, nil, ctx.Err() - } - b.mu.RLock() - closed = b.closed - addr = b.pinAddr - b.mu.RUnlock() - // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed. 
- if closed { - return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing - } - if addr != "" { - break - } - } - return grpc.Address{Addr: addr}, func() {}, nil -} - -func (b *GRPC17Health) Notify() <-chan []grpc.Address { return b.notifyCh } - -func (b *GRPC17Health) Close() error { - b.mu.Lock() - // In case gRPC calls close twice. TODO: remove the checking - // when we are sure that gRPC wont call close twice. - if b.closed { - b.mu.Unlock() - <-b.donec - return nil - } - b.closed = true - b.stopOnce.Do(func() { close(b.stopc) }) - b.pinAddr = "" - - // In the case of following scenario: - // 1. upc is not closed; no pinned address - // 2. client issues an RPC, calling invoke(), which calls Get(), enters for loop, blocks - // 3. client.conn.Close() calls balancer.Close(); closed = true - // 4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled - // we must close upc so Get() exits from blocking on upc - select { - case <-b.upc: - default: - // terminate all waiting Get()s - close(b.upc) - } - - b.mu.Unlock() - b.wg.Wait() - - // wait for updateNotifyLoop to finish - <-b.donec - close(b.notifyCh) - - return nil -} - -func grpcHealthCheck(ep string, dialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)) (bool, error) { - conn, err := dialFunc(ep) - if err != nil { - return false, err - } - defer conn.Close() - cli := healthpb.NewHealthClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{}) - cancel() - if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { - if s.Message() == unknownService { // etcd < v3.3.0 - return true, nil - } - } - return false, err - } - return resp.Status == healthpb.HealthCheckResponse_SERVING, nil -} - -func hasAddr(addrs []grpc.Address, targetAddr string) bool { - for _, addr := range addrs { - if targetAddr == addr.Addr { - return true - } - } - return false -} - -func getHost(ep string) string { - url, uerr := url.Parse(ep) - if uerr != nil || !strings.Contains(ep, "://") { - return ep - } - return url.Host -} - -func eps2addrs(eps []string) []grpc.Address { - addrs := make([]grpc.Address, len(eps)) - for i := range eps { - addrs[i].Addr = getHost(eps[i]) - } - return addrs -} - -func getHostPort2ep(eps []string) map[string]string { - hm := make(map[string]string, len(eps)) - for i := range eps { - _, host, _ := parseEndpoint(eps[i]) - hm[host] = eps[i] - } - return hm -} - -func parseEndpoint(endpoint string) (proto string, host string, scheme string) { - proto = "tcp" - host = endpoint - url, uerr := url.Parse(endpoint) - if uerr != nil || !strings.Contains(endpoint, "://") { - return proto, host, scheme - } - scheme = url.Scheme - - // strip scheme:// prefix since grpc dials by host - host = url.Host - switch url.Scheme { - case "http", "https": - case "unix", "unixs": - proto = "unix" - host = url.Host + url.Path - default: - proto, host = "", "" - } - return proto, host, scheme -} diff --git a/clientv3/balancer/grpc1.7-health_test.go b/clientv3/balancer/grpc1.7-health_test.go deleted file mode 100644 index 1ebd205bbd0e..000000000000 --- a/clientv3/balancer/grpc1.7-health_test.go +++ /dev/null @@ -1,297 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "context" - "errors" - "net" - "sync" - "testing" - "time" - - pb "go.etcd.io/etcd/etcdserver/etcdserverpb" - "go.etcd.io/etcd/pkg/testutil" - - "google.golang.org/grpc" -) - -var endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"} - -func TestOldHealthBalancerGetUnblocking(t *testing.T) { - hb := NewGRPC17Health(endpoints, minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) - defer hb.Close() - if addrs := <-hb.Notify(); len(addrs) != len(endpoints) { - t.Errorf("Initialize NewGRPC17Health should have triggered Notify() chan, but it didn't") - } - unblockingOpts := grpc.BalancerGetOptions{BlockingWait: false} - - _, _, err := hb.Get(context.Background(), unblockingOpts) - if err != ErrNoAddrAvailable { - t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err) - } - - down1 := hb.Up(grpc.Address{Addr: endpoints[1]}) - if addrs := <-hb.Notify(); len(addrs) != 1 { - t.Errorf("first Up() should have triggered balancer to send the first connected address via Notify chan so that other connections can be closed") - } - down2 := hb.Up(grpc.Address{Addr: endpoints[2]}) - addrFirst, putFun, err := hb.Get(context.Background(), unblockingOpts) - if err != nil { - t.Errorf("Get() with up endpoints should success, got %v", err) - } - if addrFirst.Addr != endpoints[1] { - t.Errorf("Get() didn't return expected address, got %v", addrFirst) - } - if putFun == nil { - t.Errorf("Get() returned unexpected nil put function") - } - addrSecond, _, _ := hb.Get(context.Background(), unblockingOpts) - if addrFirst.Addr != addrSecond.Addr { - t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond) - } - - down1(errors.New("error")) - if addrs := <-hb.Notify(); len(addrs) != len(endpoints)-1 { // we call down on one endpoint - t.Errorf("closing the only connection should triggered balancer to send the %d endpoints via Notify chan so that we can establish a connection", len(endpoints)-1) - } - down2(errors.New("error")) - _, _, err = hb.Get(context.Background(), unblockingOpts) - if err != ErrNoAddrAvailable { - t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err) - } -} - -func TestOldHealthBalancerGetBlocking(t *testing.T) { - hb := NewGRPC17Health(endpoints, minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) - defer hb.Close() - if addrs := <-hb.Notify(); len(addrs) != len(endpoints) { - t.Errorf("Initialize NewGRPC17Health should have triggered Notify() chan, but it didn't") - } - blockingOpts := grpc.BalancerGetOptions{BlockingWait: true} - - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) - _, _, err := hb.Get(ctx, blockingOpts) - cancel() - if err != context.DeadlineExceeded { - t.Errorf("Get() with no up endpoints should timeout, got %v", err) - } - - downC := make(chan func(error), 1) - - go func() { - // ensure hb.Up() will be called after hb.Get() to see if Up() releases 
blocking Get() - time.Sleep(time.Millisecond * 100) - f := hb.Up(grpc.Address{Addr: endpoints[1]}) - if addrs := <-hb.Notify(); len(addrs) != 1 { - t.Errorf("first Up() should have triggered balancer to send the first connected address via Notify chan so that other connections can be closed") - } - downC <- f - }() - addrFirst, putFun, err := hb.Get(context.Background(), blockingOpts) - if err != nil { - t.Errorf("Get() with up endpoints should success, got %v", err) - } - if addrFirst.Addr != endpoints[1] { - t.Errorf("Get() didn't return expected address, got %v", addrFirst) - } - if putFun == nil { - t.Errorf("Get() returned unexpected nil put function") - } - down1 := <-downC - - down2 := hb.Up(grpc.Address{Addr: endpoints[2]}) - addrSecond, _, _ := hb.Get(context.Background(), blockingOpts) - if addrFirst.Addr != addrSecond.Addr { - t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond) - } - - down1(errors.New("error")) - if addrs := <-hb.Notify(); len(addrs) != len(endpoints)-1 { // we call down on one endpoint - t.Errorf("closing the only connection should triggered balancer to send the %d endpoints via Notify chan so that we can establish a connection", len(endpoints)-1) - } - down2(errors.New("error")) - ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100) - _, _, err = hb.Get(ctx, blockingOpts) - cancel() - if err != context.DeadlineExceeded { - t.Errorf("Get() with no up endpoints should timeout, got %v", err) - } -} - -// TestOldHealthBalancerGraylist checks one endpoint is tried after the other -// due to gray listing. -func TestOldHealthBalancerGraylist(t *testing.T) { - var wg sync.WaitGroup - // Use 3 endpoints so gray list doesn't fallback to all connections - // after failing on 2 endpoints. - lns, eps := make([]net.Listener, 3), make([]string, 3) - wg.Add(3) - connc := make(chan string, 2) - for i := range eps { - ln, err := net.Listen("tcp", ":0") - testutil.AssertNil(t, err) - lns[i], eps[i] = ln, ln.Addr().String() - go func() { - defer wg.Done() - for { - conn, err := ln.Accept() - if err != nil { - return - } - _, err = conn.Read(make([]byte, 512)) - conn.Close() - if err == nil { - select { - case connc <- ln.Addr().String(): - // sleep some so balancer catches up - // before attempted next reconnect. - time.Sleep(50 * time.Millisecond) - default: - } - } - } - }() - } - - hb := NewGRPC17Health(eps, 5*time.Second, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) - - conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb)) - testutil.AssertNil(t, err) - defer conn.Close() - - kvc := pb.NewKVClient(conn) - <-hb.Ready() - - kvc.Range(context.TODO(), &pb.RangeRequest{}) - ep1 := <-connc - kvc.Range(context.TODO(), &pb.RangeRequest{}) - ep2 := <-connc - for _, ln := range lns { - ln.Close() - } - wg.Wait() - - if ep1 == ep2 { - t.Fatalf("expected %q != %q", ep1, ep2) - } -} - -// TestBalancerDoNotBlockOnClose ensures that balancer and grpc don't deadlock each other -// due to rapid open/close conn. The deadlock causes balancer.Close() to block forever. -// See issue: https://github.com/etcd-io/etcd/issues/7283 for more detail. 
-func TestOldHealthBalancerDoNotBlockOnClose(t *testing.T) { - defer testutil.AfterTest(t) - - kcl := newKillConnListener(t, 3) - defer kcl.close() - - for i := 0; i < 5; i++ { - hb := NewGRPC17Health(kcl.endpoints(), minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) - conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb)) - if err != nil { - t.Fatal(err) - } - kvc := pb.NewKVClient(conn) - <-hb.readyc - - var wg sync.WaitGroup - wg.Add(100) - cctx, cancel := context.WithCancel(context.TODO()) - for j := 0; j < 100; j++ { - go func() { - defer wg.Done() - kvc.Range(cctx, &pb.RangeRequest{}, grpc.FailFast(false)) - }() - } - // balancer.Close() might block - // if balancer and grpc deadlock each other. - bclosec, cclosec := make(chan struct{}), make(chan struct{}) - go func() { - defer close(bclosec) - hb.Close() - }() - go func() { - defer close(cclosec) - conn.Close() - }() - select { - case <-bclosec: - case <-time.After(3 * time.Second): - testutil.FatalStack(t, "balancer close timeout") - } - select { - case <-cclosec: - case <-time.After(3 * time.Second): - t.Fatal("grpc conn close timeout") - } - - cancel() - wg.Wait() - } -} - -// killConnListener listens incoming conn and kills it immediately. -type killConnListener struct { - wg sync.WaitGroup - eps []string - stopc chan struct{} - t *testing.T -} - -func newKillConnListener(t *testing.T, size int) *killConnListener { - kcl := &killConnListener{stopc: make(chan struct{}), t: t} - - for i := 0; i < size; i++ { - ln, err := net.Listen("tcp", ":0") - if err != nil { - t.Fatal(err) - } - kcl.eps = append(kcl.eps, ln.Addr().String()) - kcl.wg.Add(1) - go kcl.listen(ln) - } - return kcl -} - -func (kcl *killConnListener) endpoints() []string { - return kcl.eps -} - -func (kcl *killConnListener) listen(l net.Listener) { - go func() { - defer kcl.wg.Done() - for { - conn, err := l.Accept() - select { - case <-kcl.stopc: - return - default: - } - if err != nil { - kcl.t.Error(err) - } - time.Sleep(1 * time.Millisecond) - conn.Close() - } - }() - <-kcl.stopc - l.Close() -} - -func (kcl *killConnListener) close() { - close(kcl.stopc) - kcl.wg.Wait() -} diff --git a/clientv3/balancer/picker/err.go b/clientv3/balancer/picker/err.go index c70ce158b680..9e043789c8df 100644 --- a/clientv3/balancer/picker/err.go +++ b/clientv3/balancer/picker/err.go @@ -22,13 +22,18 @@ import ( // NewErr returns a picker that always returns err on "Pick". func NewErr(err error) Picker { - return &errPicker{err: err} + return &errPicker{p: Error, err: err} } type errPicker struct { + p Policy err error } -func (p *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { - return nil, nil, p.err +func (ep *errPicker) String() string { + return ep.p.String() +} + +func (ep *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { + return nil, nil, ep.err } diff --git a/clientv3/balancer/picker/picker.go b/clientv3/balancer/picker/picker.go index 7ea761bdb57c..07700664fc04 100644 --- a/clientv3/balancer/picker/picker.go +++ b/clientv3/balancer/picker/picker.go @@ -15,10 +15,85 @@ package picker import ( + "fmt" + + "go.uber.org/zap" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" ) // Picker defines balancer Picker methods. type Picker interface { balancer.Picker + String() string +} + +// Config defines picker configuration. 
+type Config struct { + // Policy specifies etcd clientv3's built-in balancer policy. + Policy Policy + + // Logger defines picker logging object. + Logger *zap.Logger + + // SubConnToResolverAddress maps each gRPC sub-connection to an address. + // Basically, it is a list of addresses that the Picker can pick from. + SubConnToResolverAddress map[balancer.SubConn]resolver.Address +} + +// Policy defines balancer picker policy. +type Policy uint8 + +const ( + // Error is error picker policy. + Error Policy = iota + + // RoundrobinBalanced balances loads over multiple endpoints + // and implements failover in roundrobin fashion. + RoundrobinBalanced + + // TODO: only send loads to pinned address "RoundrobinFailover" + // just like how 3.3 client works + // + // TODO: prioritize leader + // TODO: health-check + // TODO: weighted roundrobin + // TODO: power of two random choice + + // Custom defines custom balancer picker. + // TODO: custom picker is not supported yet. + Custom +) + +func (p Policy) String() string { + switch p { + case Error: + return "picker-error" + + case RoundrobinBalanced: + return "picker-roundrobin-balanced" + + case Custom: + panic("'custom' picker policy is not supported yet") + + default: + panic(fmt.Errorf("invalid balancer picker policy (%d)", p)) + } +} + +// New creates a new Picker. +func New(cfg Config) Picker { + switch cfg.Policy { + case Error: + panic("'error' picker policy is not supported here; use 'picker.NewErr'") + + case RoundrobinBalanced: + return newRoundrobinBalanced(cfg) + + case Custom: + panic("'custom' picker policy is not supported yet") + + default: + panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy)) + } } diff --git a/clientv3/balancer/picker/picker_policy.go b/clientv3/balancer/picker/picker_policy.go deleted file mode 100644 index 7bca39cb1e66..000000000000 --- a/clientv3/balancer/picker/picker_policy.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package picker - -import "fmt" - -// Policy defines balancer picker policy. -type Policy uint8 - -const ( - // TODO: custom picker is not supported yet. - // custom defines custom balancer picker. - custom Policy = iota - - // RoundrobinBalanced balance loads over multiple endpoints - // and implements failover in roundrobin fashion. 
- RoundrobinBalanced Policy = iota - - // TODO: only send loads to pinned address "RoundrobinFailover" - // just like how 3.3 client works - // - // TODO: prioritize leader - // TODO: health-check - // TODO: weighted roundrobin - // TODO: power of two random choice -) - -func (p Policy) String() string { - switch p { - case custom: - panic("'custom' picker policy is not supported yet") - case RoundrobinBalanced: - return "etcd-client-roundrobin-balanced" - default: - panic(fmt.Errorf("invalid balancer picker policy (%d)", p)) - } -} diff --git a/clientv3/balancer/picker/roundrobin_balanced.go b/clientv3/balancer/picker/roundrobin_balanced.go index b043d572dd73..1b8b28573782 100644 --- a/clientv3/balancer/picker/roundrobin_balanced.go +++ b/clientv3/balancer/picker/roundrobin_balanced.go @@ -24,32 +24,33 @@ import ( "google.golang.org/grpc/resolver" ) -// NewRoundrobinBalanced returns a new roundrobin balanced picker. -func NewRoundrobinBalanced( - lg *zap.Logger, - scs []balancer.SubConn, - addrToSc map[resolver.Address]balancer.SubConn, - scToAddr map[balancer.SubConn]resolver.Address, -) Picker { +// newRoundrobinBalanced returns a new roundrobin balanced picker. +func newRoundrobinBalanced(cfg Config) Picker { + scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress)) + for sc := range cfg.SubConnToResolverAddress { + scs = append(scs, sc) + } return &rrBalanced{ - lg: lg, + p: RoundrobinBalanced, + lg: cfg.Logger, scs: scs, - addrToSc: addrToSc, - scToAddr: scToAddr, + scToAddr: cfg.SubConnToResolverAddress, } } type rrBalanced struct { - lg *zap.Logger + p Policy - mu sync.RWMutex - next int - scs []balancer.SubConn + lg *zap.Logger - addrToSc map[resolver.Address]balancer.SubConn + mu sync.RWMutex + next int + scs []balancer.SubConn scToAddr map[balancer.SubConn]resolver.Address } +func (rb *rrBalanced) String() string { return rb.p.String() } + // Pick is called for every client request. func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { rb.mu.RLock() @@ -68,6 +69,7 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (bala rb.lg.Debug( "picked", + zap.String("picker", rb.p.String()), zap.String("address", picked), zap.Int("subconn-index", cur), zap.Int("subconn-size", n), @@ -77,6 +79,7 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (bala // TODO: error handling? 
fss := []zapcore.Field{ zap.Error(info.Err), + zap.String("picker", rb.p.String()), zap.String("address", picked), zap.Bool("success", info.Err == nil), zap.Bool("bytes-sent", info.BytesSent), diff --git a/clientv3/balancer/utils.go b/clientv3/balancer/utils.go index a11faeb7e6c7..48eb87507404 100644 --- a/clientv3/balancer/utils.go +++ b/clientv3/balancer/utils.go @@ -29,9 +29,9 @@ func scToString(sc balancer.SubConn) string { return fmt.Sprintf("%p", sc) } -func scsToStrings(scs map[resolver.Address]balancer.SubConn) (ss []string) { +func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) { ss = make([]string, 0, len(scs)) - for a, sc := range scs { + for sc, a := range scs { ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc))) } sort.Strings(ss) diff --git a/clientv3/client.go b/clientv3/client.go index d8cb13808e88..5eca63ff6fdc 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -51,12 +51,17 @@ var ( func init() { lg := zap.NewNop() if os.Getenv("ETCD_CLIENT_DEBUG") != "" { + lcfg := logutil.DefaultZapLoggerConfig + lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + var err error - lg, err = zap.NewProductionConfig().Build() // info level logging + lg, err = lcfg.Build() // debug level logging if err != nil { panic(err) } } + + // TODO: support custom balancer balancer.RegisterBuilder(balancer.Config{ Policy: picker.RoundrobinBalanced, Name: roundRobinBalancerName, diff --git a/clientv3/config.go b/clientv3/config.go index bd0376880ff6..2ef034449e00 100644 --- a/clientv3/config.go +++ b/clientv3/config.go @@ -81,4 +81,6 @@ type Config struct { // PermitWithoutStream when set will allow client to send keepalive pings to server without any active streams(RPCs). PermitWithoutStream bool `json:"permit-without-stream"` + + // TODO: support custom balancer picker }
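
A minimal sketch of how the Config/RegisterBuilder pair above is meant to be used; the balancer name "example-balancer" is hypothetical, chosen to avoid colliding with the client's own default registration (clientv3 itself registers picker.RoundrobinBalanced under roundRobinBalancerName in its init(), as the clientv3/client.go hunk shows):

package main

import (
	"go.etcd.io/etcd/clientv3/balancer"
	"go.etcd.io/etcd/clientv3/balancer/picker"

	"go.uber.org/zap"
)

func init() {
	// RegisterBuilder calls grpc's balancer.Register internally, which is
	// only safe before any ClientConn starts dialing, hence init().
	balancer.RegisterBuilder(balancer.Config{
		Policy: picker.RoundrobinBalanced,
		Name:   "example-balancer", // hypothetical; defaults to the policy name if empty
		Logger: zap.NewExample(),
	})
}

func main() {}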
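The new connectivity.Recorder keeps one counter per state and aggregates with a strict precedence: any Ready subconn makes the aggregate Ready, else any Connecting subconn makes it Connecting, else TransientFailure. The "updateVal := 2*uint64(idx) - 1" line relies on unsigned wrap-around to subtract 1 for the old state and add 1 for the new one. A small sketch of the aggregation with two subconns:

package main

import (
	"fmt"

	"go.etcd.io/etcd/clientv3/balancer/connectivity"
	grpcconnectivity "google.golang.org/grpc/connectivity"
)

func main() {
	rc := connectivity.New()

	// Two subconns leave Idle and start connecting.
	rc.RecordTransition(grpcconnectivity.Idle, grpcconnectivity.Connecting)
	rc.RecordTransition(grpcconnectivity.Idle, grpcconnectivity.Connecting)
	fmt.Println(rc.GetCurrentState()) // CONNECTING

	// One of them becomes Ready; Ready dominates the aggregate.
	rc.RecordTransition(grpcconnectivity.Connecting, grpcconnectivity.Ready)
	fmt.Println(rc.GetCurrentState()) // READY

	// The ready subconn fails; one subconn is still Connecting,
	// so the aggregate falls back to CONNECTING, not TRANSIENT_FAILURE.
	rc.RecordTransition(grpcconnectivity.Ready, grpcconnectivity.TransientFailure)
	fmt.Println(rc.GetCurrentState()) // CONNECTING
}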
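Until a subconn becomes ready, the balancer keeps picker.NewErr(balancer.ErrNoSubConnAvailable) installed, and the new Picker.String() method is what the added "picker" log fields print. A short sketch of that behavior:

package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/clientv3/balancer/picker"

	"google.golang.org/grpc/balancer"
)

func main() {
	p := picker.NewErr(balancer.ErrNoSubConnAvailable)
	fmt.Println(p.String()) // picker-error

	// Every Pick fails with the configured error until the balancer
	// swaps in a real picker via updatePicker().
	_, _, err := p.Pick(context.Background(), balancer.PickOptions{})
	fmt.Println(err == balancer.ErrNoSubConnAvailable) // true
}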
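Finally, a sketch of building the round-robin picker through the new picker.New entry point. fakeSubConn is a hypothetical stub standing in for the subconns gRPC normally supplies (it assumes the two-method balancer.SubConn interface of the gRPC version this tree builds against), and the addresses are illustrative:

package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/clientv3/balancer/picker"

	"go.uber.org/zap"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/resolver"
)

// fakeSubConn stubs balancer.SubConn for illustration only.
type fakeSubConn struct{ id int }

func (s *fakeSubConn) UpdateAddresses([]resolver.Address) {}
func (s *fakeSubConn) Connect()                           {}

func main() {
	scToAddr := map[balancer.SubConn]resolver.Address{
		&fakeSubConn{id: 1}: {Addr: "10.0.0.1:2379"},
		&fakeSubConn{id: 2}: {Addr: "10.0.0.2:2379"},
	}

	p := picker.New(picker.Config{
		Policy:                   picker.RoundrobinBalanced,
		Logger:                   zap.NewExample(),
		SubConnToResolverAddress: scToAddr,
	})
	fmt.Println(p.String()) // picker-roundrobin-balanced

	// Successive Picks rotate through the subconns; the done callback
	// returned with each subconn reports RPC completion back to the picker.
	for i := 0; i < 4; i++ {
		sc, done, err := p.Pick(context.Background(), balancer.PickOptions{})
		if err != nil {
			panic(err)
		}
		fmt.Println(scToAddr[sc].Addr)
		done(balancer.DoneInfo{})
	}
}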