Commit
adjust dialer check logic
fffw committed May 24, 2016
1 parent 6086c1e commit f1af351
Showing 4 changed files with 140 additions and 69 deletions.
32 changes: 28 additions & 4 deletions src/github.com/getlantern/balancer/balancer.go
@@ -8,6 +8,8 @@ import (
"net"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/getlantern/golog"
)
@@ -17,14 +19,20 @@ const (
)

var (
// When Dial() is called after an idle period longer than
// RecheckAfterIdleFor, the Balancer will recheck all dialers to make sure
// they are alive and have up-to-date metrics.
RecheckAfterIdleFor = 1 * time.Minute

log = golog.LoggerFor("balancer")
)

// Balancer balances connections established by one or more Dialers.
// Balancer balances connections among multiple Dialers.
type Balancer struct {
mu sync.RWMutex
dialers dialerHeap
trusted dialerHeap
mu sync.RWMutex
dialers dialerHeap
trusted dialerHeap
lastDialTime atomic.Value // time.Time
}

// New creates a new Balancer using the supplied Strategy and Dialers.
@@ -45,6 +53,8 @@ func New(st Strategy, dialers ...*Dialer) *Balancer {
bal := &Balancer{dialers: st(dls), trusted: st(tdls)}
heap.Init(&bal.dialers)
heap.Init(&bal.trusted)
// Force checking all dialers on first Dial()
bal.lastDialTime.Store(time.Time{})
return bal
}

@@ -63,6 +73,13 @@ func (b *Balancer) OnRequest(req *http.Request) {
// either manages to connect, or runs out of dialers in which case it returns an
// error.
func (b *Balancer) Dial(network, addr string) (net.Conn, error) {
lastDialTime := b.lastDialTime.Load().(time.Time)
idled := time.Since(lastDialTime)
if idled > RecheckAfterIdleFor {
log.Debugf("Balancer idled for %s, start checking all dialers", idled)
b.checkDialers()
}
defer b.lastDialTime.Store(time.Now())
var dialers dialerHeap

_, port, _ := net.SplitHostPort(addr)
@@ -108,3 +125,10 @@ func (b *Balancer) Close() {
d.Stop()
}
}

// checkDialers checks all dialers in parallel.
func (b *Balancer) checkDialers() {
for _, d := range b.dialers.dialers {
go d.check()
}
}
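
For reference, a minimal standalone sketch (assumed names, not the repo's API) of the idle-recheck pattern Dial now follows: the last dial time lives in an atomic.Value so it can be read and updated without taking the balancer's mutex, and a stale pool is rechecked with fire-and-forget goroutines.

package sketch

import (
    "log"
    "sync/atomic"
    "time"
)

type checker interface {
    check()
}

type pool struct {
    lastDialTime atomic.Value // holds a time.Time
    items        []checker
}

func newPool(items ...checker) *pool {
    p := &pool{items: items}
    p.lastDialTime.Store(time.Time{}) // zero time forces a recheck on the first dial
    return p
}

func (p *pool) dial(recheckAfter time.Duration) {
    idled := time.Since(p.lastDialTime.Load().(time.Time))
    if idled > recheckAfter {
        log.Printf("idled for %s, rechecking %d items", idled, len(p.items))
        for _, it := range p.items {
            go it.check() // parallel and fire-and-forget, like checkDialers above
        }
    }
    defer p.lastDialTime.Store(time.Now())
    // ... pick an item and dial it here ...
}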
69 changes: 50 additions & 19 deletions src/github.com/getlantern/balancer/balancer_test.go
@@ -150,26 +150,57 @@ func TestTrusted(t *testing.T) {
assert.Equal(t, dialCount, 2, "should dial untrusted dialer")
}

func echoServer() (addr string, l net.Listener) {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
log.Fatalf("Unable to listen: %s", err)
}
go func() {
for {
c, err := l.Accept()
if err == nil {
go func() {
_, err = io.Copy(c, c)
if err != nil {
log.Fatalf("Unable to echo: %s", err)
}
}()
func TestCheck(t *testing.T) {
oldRecheckAfterIdleFor := RecheckAfterIdleFor
RecheckAfterIdleFor = 100 * time.Millisecond
defer func() { RecheckAfterIdleFor = oldRecheckAfterIdleFor }()
oldNextCheckFactor := nextCheckFactor
nextCheckFactor = 100 * time.Millisecond
defer func() { nextCheckFactor = oldNextCheckFactor }()

var wg sync.WaitGroup
var failToDial uint32 = 0
d := &Dialer{
DialFN: func(network, addr string) (net.Conn, error) {
if atomic.LoadUint32(&failToDial) == 1 {
return nil, fmt.Errorf("fail intentionally")
} else {
return nil, nil
}
}
}()
addr = l.Addr().String()
return
},
Check: func() bool {
wg.Done()
return atomic.LoadUint32(&failToDial) == 0
},
Trusted: true,
}
bal := New(Sticky, d)

// check on the first dial
wg.Add(1)
_, err := bal.Dial("tcp", "does-not-exist.com:80")
assert.NoError(t, err)
wg.Wait()

// recheck when dialing after being idle for a while
wg.Add(1)
time.Sleep(200 * time.Millisecond)
_, err = bal.Dial("tcp", "does-not-exist.com:80")
assert.NoError(t, err)
wg.Wait()

// no recheck after consecutive successes
_, err = bal.Dial("tcp", "does-not-exist.com:80")
assert.NoError(t, err)

// recheck failed dialer
atomic.StoreUint32(&failToDial, 1)
wg.Add(1)
_, err = bal.Dial("tcp", "does-not-exist.com:80")
assert.Error(t, err)
_, err = bal.Dial("tcp", "does-not-exist.com:80")
assert.Error(t, err)
wg.Wait()
}

func newDialer(id int) *Dialer {
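
The sync.WaitGroup is what makes TestCheck deterministic. A reduced sketch of that pattern (assumed names, outside the repo): the fake Check callback calls wg.Done(), so the test can wg.Add(1) before an action that should trigger a check and then wg.Wait() until the check has actually run, instead of sleeping and hoping.

package sketch

import (
    "fmt"
    "sync"
)

func waitForCheck() {
    var wg sync.WaitGroup
    check := func() bool {
        wg.Done() // signal that the check fired
        return true
    }

    wg.Add(1)
    go check() // stands in for the balancer scheduling a check in a goroutine
    wg.Wait()  // returns only once the check has run, so later assertions are safe
    fmt.Println("check observed")
}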
86 changes: 50 additions & 36 deletions src/github.com/getlantern/balancer/dialer.go
@@ -9,6 +9,15 @@ import (
"time"
)

var (
// A proxy server may blacklist a client IP address that constantly makes
// connections without sending any request. The Balancer tries to avoid
// being blacklisted. The current Lantern server has a threshold of 10.
ServerBlacklistingThreshold int32 = 10

nextCheckFactor = 10 * time.Second
)

// Dialer captures the configuration for dialing arbitrary addresses.
type Dialer struct {
// Label: optional label with which to tag this dialer for debug logging.
@@ -22,9 +31,10 @@ type Dialer struct {

// Check: a function used to test reachability metrics periodically or
// when the dialer fails to connect.
// It should return true for a successful check.
//
// Checks are scheduled at exponentially increasing intervals that are
// capped at MaxCheckTimeout ± ½.
// Checks are scheduled at exponentially increasing intervals while the
// dialer is failing. The Balancer also schedules a check when required.
Check func() bool

// Determines whether a dialer can be trusted with unencrypted traffic.
@@ -34,31 +44,28 @@ type Dialer struct {
OnRequest func(req *http.Request)
}

var (
// MaxCheckTimeout is the average of maximum wait time before checking an idle or
// failed dialer. The real cap is a random duration between MaxCheckTimeout ± ½.
MaxCheckTimeout = 1 * time.Minute
)

type dialer struct {
// See dialer.EMADialTime() for the rationale.
// Keep it at the top to ensure 64-bit alignment; see
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
emaDialTime int64

*Dialer
closeCh chan struct{}
closeCh chan struct{}
// prevent race condition when calling Timer.Reset()
muCheckTimer sync.Mutex
checkTimer *time.Timer

consecSuccesses int32
consecFailures int32
}

const longDuration = 100000 * time.Hour

func (d *dialer) Start() {
d.consecSuccesses = 1 // be optimistic
d.closeCh = make(chan struct{})
d.checkTimer = time.NewTimer(maxCheckTimeout())
d.checkTimer = time.NewTimer(longDuration)
if d.Check == nil {
d.Check = d.defaultCheck
}
@@ -73,23 +80,28 @@
}
return
case <-d.checkTimer.C:
log.Tracef("Start checking dialer %s", d.Label)
t := time.Now()
ok := d.Check()
if ok {
d.markSuccess()
// Check time is generally larger than dial time, but still
// meaningful when comparing latency across multiple
// dialers.
d.updateEMADialTime(time.Since(t))
} else {
d.markFailure()
}
go d.check()
}
}
}()
}

func (d *dialer) check() {
log.Tracef("Start checking dialer %s", d.Label)
t := time.Now()
ok := d.Check()
if ok {
d.markSuccess()
// Check time is generally larger than dial time, but still
// meaningful when comparing latency across multiple
// dialers.
d.updateEMADialTime(time.Since(t))
} else {
log.Tracef("Dialer %s failed check", d.Label)
d.markFailure()
}
}

func (d *dialer) Stop() {
d.closeCh <- struct{}{}
}
@@ -123,30 +135,32 @@ func (d *dialer) updateEMADialTime(t time.Duration) {
// See dialer.EMADialTime() for the rationale.
// The value is large enough to safely ignore decimals.
newEMA := (atomic.LoadInt64(&d.emaDialTime) + t.Nanoseconds()) / 2
log.Tracef("Dialer %s EMA(exponential moving average) dial time: %v", d.Label, time.Duration(newEMA))
log.Tracef("Dialer %s EMA dial time: %v", d.Label, time.Duration(newEMA))
atomic.StoreInt64(&d.emaDialTime, newEMA)
}

func (d *dialer) markSuccess() {
newCS := atomic.AddInt32(&d.consecSuccesses, 1)
log.Tracef("Dialer %s consecutive successes: %d -> %d", d.Label, newCS-1, newCS)
atomic.StoreInt32(&d.consecFailures, 0)
d.muCheckTimer.Lock()
d.checkTimer.Reset(maxCheckTimeout())
d.muCheckTimer.Unlock()
// reset failures only when the state is changing
if newCS <= 2 {
atomic.StoreInt32(&d.consecFailures, 0)
}
}

func (d *dialer) markFailure() {
atomic.StoreInt32(&d.consecSuccesses, 0)
newCF := atomic.AddInt32(&d.consecFailures, 1)
log.Tracef("Dialer %s consecutive failures: %d -> %d", d.Label, newCF-1, newCF)
nextCheck := time.Duration(newCF*newCF) * 100 * time.Millisecond
if nextCheck > MaxCheckTimeout {
nextCheck = maxCheckTimeout()
// Don't bother to recheck if the dialer is constantly failing.
// The Balancer will recheck when traffic resumes after an idle period.
if newCF < ServerBlacklistingThreshold/2 {
atomic.StoreInt32(&d.consecSuccesses, 0)
nextCheck := randomize(time.Duration(newCF*newCF) * nextCheckFactor)
log.Debugf("Will recheck %s %v later because it failed for %d times", d.Label, nextCheck, newCF)
d.muCheckTimer.Lock()
d.checkTimer.Reset(nextCheck)
d.muCheckTimer.Unlock()
}
d.muCheckTimer.Lock()
d.checkTimer.Reset(nextCheck)
d.muCheckTimer.Unlock()
}

func (d *dialer) defaultCheck() bool {
@@ -155,6 +169,6 @@ func (d *dialer) defaultCheck() bool {
}

// randomize adds jitter to make requests less distinguishable on the network.
func maxCheckTimeout() time.Duration {
return time.Duration((MaxCheckTimeout.Nanoseconds() / 2) + rand.Int63n(MaxCheckTimeout.Nanoseconds()))
func randomize(d time.Duration) time.Duration {
return time.Duration((d.Nanoseconds() / 2) + rand.Int63n(d.Nanoseconds()))
}
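
With the defaults in this commit (nextCheckFactor = 10s, ServerBlacklistingThreshold = 10), markFailure schedules rechecks only for the first four consecutive failures, at base delays of 10s, 40s, 90s and 160s, each jittered by randomize() to between half and one-and-a-half times the base; after that it stops and relies on the balancer's idle recheck. A standalone sketch of that schedule (illustration only, not repo code):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// randomize mirrors the helper above: a random duration in [d/2, 3d/2).
func randomize(d time.Duration) time.Duration {
    return time.Duration(d.Nanoseconds()/2 + rand.Int63n(d.Nanoseconds()))
}

func main() {
    nextCheckFactor := 10 * time.Second
    var threshold int32 = 10 // ServerBlacklistingThreshold

    for failures := int32(1); failures <= 6; failures++ {
        if failures >= threshold/2 {
            // markFailure stops scheduling rechecks at this point; the
            // balancer's idle recheck takes over once traffic resumes.
            fmt.Printf("failure %d: no recheck scheduled\n", failures)
            continue
        }
        base := time.Duration(failures*failures) * nextCheckFactor
        fmt.Printf("failure %d: base %v, randomized to %v\n", failures, base, randomize(base))
    }
}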
22 changes: 12 additions & 10 deletions src/github.com/getlantern/balancer/mock_test.go
@@ -51,22 +51,24 @@ func RandomlyFailWithVariedDelay(failPercent int, delay time.Duration, delta tim
}
}

func EchoServer() net.Listener {
func echoServer() (addr string, l net.Listener) {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
log.Fatal(err)
log.Fatalf("Unable to listen: %s", err)
}
go func() {
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
c, err := l.Accept()
if err == nil {
go func() {
_, err = io.Copy(c, c)
if err != nil {
log.Fatalf("Unable to echo: %s", err)
}
}()
}
go func(c net.Conn) {
_, _ = io.Copy(c, c)
_ = c.Close()
}(conn)
}
}()
return l
addr = l.Addr().String()
return
}
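
Since echoServer now also returns the listener's address, a test can dial it directly. A hypothetical usage sketch (not part of the commit; assumes it lives in the same test package, with net, io, and testing imported):

func TestEchoSketch(t *testing.T) {
    addr, l := echoServer()
    defer func() { _ = l.Close() }()

    conn, err := net.Dial("tcp", addr)
    if err != nil {
        t.Fatalf("Unable to dial echo server: %s", err)
    }
    defer func() { _ = conn.Close() }()

    if _, err := conn.Write([]byte("ping")); err != nil {
        t.Fatalf("Unable to write: %s", err)
    }
    buf := make([]byte, 4)
    if _, err := io.ReadFull(conn, buf); err != nil {
        t.Fatalf("Unable to read echo: %s", err)
    }
    if got := string(buf); got != "ping" {
        t.Fatalf("Expected %q, got %q", "ping", got)
    }
}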
