Skip to content

Commit

Permalink
refactored clientv3 to build with grpc 1.30.x
Browse files Browse the repository at this point in the history
  • Loading branch information
corpix committed Jul 10, 2020
1 parent 2f25f53 commit adb7dca
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
35 changes: 24 additions & 11 deletions clientv3/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type Balancer interface {
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
// "grpc/balancer.Balancer.UpdateSubConnState" is called when connectivity state
// changes, thus requires failover logic in this method.
balancer.Balancer

Expand All @@ -138,13 +138,10 @@ type baseBalancer struct {
picker picker.Picker
}

// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// UpdateClientConnState implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
func (bb *baseBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
addrs := ccs.ResolverState.Addresses
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
Expand Down Expand Up @@ -186,18 +183,22 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
)

// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// The entry will be deleted in UpdateSubConnState.
// (DO NOT) delete(bb.scToAddr, sc)
// (DO NOT) delete(bb.scToSt, sc)
}
}

return nil
}

// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
// UpdateSubConnState implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
bb.mu.Lock()
defer bb.mu.Unlock()

s := scs.ConnectivityState

old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
Expand Down Expand Up @@ -247,7 +248,10 @@ func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconn
bb.updatePicker()
}

bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
bb.currentConn.UpdateState(balancer.State{
ConnectivityState: bb.connectivityRecorder.GetCurrentState(),
Picker: bb.picker,
})
}

func (bb *baseBalancer) updatePicker() {
Expand Down Expand Up @@ -291,3 +295,12 @@ func (bb *baseBalancer) updatePicker() {
func (bb *baseBalancer) Close() {
// TODO
}

// ResolverError implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) ResolverError(err error) {
bb.lg.Error(
"got balancer resolver error",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
)
}
6 changes: 2 additions & 4 deletions clientv3/balancer/picker/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package picker

import (
"context"

"google.golang.org/grpc/balancer"
)

Expand All @@ -34,6 +32,6 @@ func (ep *errPicker) String() string {
return ep.p.String()
}

func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, ep.err
func (ep *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{}, ep.err
}
10 changes: 6 additions & 4 deletions clientv3/balancer/picker/roundrobin_balanced.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package picker

import (
"context"
"sync"

"go.uber.org/zap"
Expand Down Expand Up @@ -52,12 +51,13 @@ type rrBalanced struct {
func (rb *rrBalanced) String() string { return rb.p.String() }

// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
func (rb *rrBalanced) Pick(opts balancer.PickInfo) (balancer.PickResult, error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
r := balancer.PickResult{}
if n == 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
return r, balancer.ErrNoSubConnAvailable
}

rb.mu.Lock()
Expand Down Expand Up @@ -91,5 +91,7 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balance
rb.lg.Warn("balancer failed", fss...)
}
}
return sc, doneFunc, nil
r.SubConn = sc
r.Done = doneFunc
return r, nil
}

0 comments on commit adb7dca

Please sign in to comment.