Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
* upstream/master:
  Do not discard session on connection drop
  Fix the server reported in the connecting event
  Restore watches on authentication success
  Better logging of connection state transitions
  Derive network timeouts from negotiated session timeout
  Fix typo
  • Loading branch information
Dallas Marlow committed Oct 8, 2015
2 parents f2fd088 + 913027e commit c5f8432
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 74 deletions.
200 changes: 164 additions & 36 deletions zk/cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package zk

import (
"fmt"
"strings"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -46,65 +45,125 @@ func TestBasicCluster(t *testing.T) {
}
}

// If the current leader dies, then the session is reestablished with the new one.
func TestClientClusterFailover(t *testing.T) {
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, evCh, err := ts.ConnectAll()
defer tc.Stop()
zk, evCh, err := tc.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

hasSession := make(chan string, 1)
go func() {
for ev := range evCh {
if ev.Type == EventSession && ev.State == StateHasSession {
select {
case hasSession <- ev.Server:
default:
}
}
}
}()
sl := NewStateLogger(evCh)

waitSession := func() string {
select {
case srv := <-hasSession:
return srv
case <-time.After(time.Second * 8):
t.Fatal("Failed to connect and get a session")
}
return ""
hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second)
if hasSessionEvent1 == nil {
t.Fatalf("Failed to connect and get session")
}

srv := waitSession()
if _, err := zk.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}

stopped := false
for _, s := range ts.Servers {
if strings.HasSuffix(srv, fmt.Sprintf(":%d", s.Port)) {
s.Srv.Stop()
stopped = true
break
}
}
if !stopped {
t.Fatal("Failed to stop server")
hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession))

// Kill the current leader
tc.StopServer(hasSessionEvent1.Server)

// Wait for the session to be reconnected with the new leader.
hasSessionWatcher2.Wait(8 * time.Second)
if hasSessionWatcher2 == nil {
t.Fatalf("Failover failed")
}

waitSession()
if by, _, err := zk.Get("/gozk-test"); err != nil {
t.Fatalf("Get failed on node 2: %+v", err)
} else if string(by) != "foo-cluster" {
t.Fatal("Wrong data for node 2")
}
}

// If a ZooKeeper cluster looses quorum then a session is reconnected as soon
// as the quorum is restored.
func TestNoQuorum(t *testing.T) {
tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer tc.Stop()
zk, evCh, err := tc.ConnectAllTimeout(4 * time.Second)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
sl := NewStateLogger(evCh)

// Wait for initial session to be established
hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second)
if hasSessionEvent1 == nil {
t.Fatalf("Failed to connect and get session")
}
initialSessionID := zk.sessionID
DefaultLogger.Printf(" Session established: id=%d, timeout=%d", zk.sessionID, zk.sessionTimeoutMs)

// Kill the ZooKeeper leader and wait for the session to reconnect.
DefaultLogger.Printf(" Kill the leader")
hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession))
tc.StopServer(hasSessionEvent1.Server)
hasSessionEvent2 := hasSessionWatcher2.Wait(8 * time.Second)
if hasSessionEvent2 == nil {
t.Fatalf("Failover failed")
}

// Kill the ZooKeeper leader leaving the cluster without quorum.
DefaultLogger.Printf(" Kill the leader")
tc.StopServer(hasSessionEvent2.Server)

// Make sure that we keep retrying connecting to the only remaining
// ZooKeeper server, but the attempts are being dropped because there is
// no quorum.
DefaultLogger.Printf(" Retrying no luck...")
var firstDisconnect *Event
begin := time.Now()
for time.Now().Sub(begin) < 6*time.Second {
disconnectedEvent := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(4 * time.Second)
if disconnectedEvent == nil {
t.Fatalf("Disconnected event expected")
}
if firstDisconnect == nil {
firstDisconnect = disconnectedEvent
continue
}
if disconnectedEvent.Server != firstDisconnect.Server {
t.Fatalf("Disconnect from wrong server: expected=%s, actual=%s",
firstDisconnect.Server, disconnectedEvent.Server)
}
}

// Start a ZooKeeper node to restore quorum.
hasSessionWatcher3 := sl.NewWatcher(sessionStateMatcher(StateHasSession))
tc.StartServer(hasSessionEvent1.Server)

// Make sure that session is reconnected with the same ID.
hasSessionEvent3 := hasSessionWatcher3.Wait(8 * time.Second)
if hasSessionEvent3 == nil {
t.Fatalf("Session has not been reconnected")
}
if zk.sessionID != initialSessionID {
t.Fatalf("Wrong session ID: expected=%d, actual=%d", initialSessionID, zk.sessionID)
}

// Make sure that the session is not dropped soon after reconnect
e := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(6 * time.Second)
if e != nil {
t.Fatalf("Unexpected disconnect")
}
}

func TestWaitForClose(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand Down Expand Up @@ -164,3 +223,72 @@ func TestBadSession(t *testing.T) {
t.Fatalf("Delete returned error: %+v", err)
}
}

type EventLogger struct {
events []Event
watchers []*EventWatcher
lock sync.Mutex
wg sync.WaitGroup
}

func NewStateLogger(eventCh <-chan Event) *EventLogger {
el := &EventLogger{}
el.wg.Add(1)
go func() {
defer el.wg.Done()
for event := range eventCh {
el.lock.Lock()
for _, sw := range el.watchers {
if !sw.triggered && sw.matcher(event) {
sw.triggered = true
sw.matchCh <- event
}
}
DefaultLogger.Printf(" event received: %v\n", event)
el.events = append(el.events, event)
el.lock.Unlock()
}
}()
return el
}

func (el *EventLogger) NewWatcher(matcher func(Event) bool) *EventWatcher {
ew := &EventWatcher{matcher: matcher, matchCh: make(chan Event, 1)}
el.lock.Lock()
el.watchers = append(el.watchers, ew)
el.lock.Unlock()
return ew
}

func (el *EventLogger) Events() []Event {
el.lock.Lock()
transitions := make([]Event, len(el.events))
copy(transitions, el.events)
el.lock.Unlock()
return transitions
}

func (el *EventLogger) Wait4Stop() {
el.wg.Wait()
}

type EventWatcher struct {
matcher func(Event) bool
matchCh chan Event
triggered bool
}

func (ew *EventWatcher) Wait(timeout time.Duration) *Event {
select {
case event := <-ew.matchCh:
return &event
case <-time.After(timeout):
return nil
}
}

func sessionStateMatcher(s State) func(Event) bool {
return func(e Event) bool {
return e.Type == EventSession && e.State == s
}
}
Loading

0 comments on commit c5f8432

Please sign in to comment.