Skip to content

Commit

Permalink
Fixes go-zookeeper#40 Don't Fatal on failure to reset watches.
Browse files Browse the repository at this point in the history
- Add a test for slow servers
  • Loading branch information
samuel committed Nov 19, 2014
1 parent e59e565 commit 8499ec2
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 26 deletions.
34 changes: 23 additions & 11 deletions zk/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,18 @@ import (
"time"
)

type logWriter struct {
t *testing.T
p string
}

func (lw logWriter) Write(b []byte) (int, error) {
lw.t.Logf("%s%s", lw.p, string(b))
return len(b), nil
}

func TestBasicCluster(t *testing.T) {
ts, err := StartTestCluster(3)
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand All @@ -22,6 +32,8 @@ func TestBasicCluster(t *testing.T) {
}
defer zk2.Close()

time.Sleep(time.Second * 5)

if _, err := zk1.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}
Expand All @@ -33,7 +45,7 @@ func TestBasicCluster(t *testing.T) {
}

func TestClientClusterFailover(t *testing.T) {
ts, err := StartTestCluster(3)
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand All @@ -58,7 +70,7 @@ func TestClientClusterFailover(t *testing.T) {
}

func TestWaitForClose(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand All @@ -67,23 +79,23 @@ func TestWaitForClose(t *testing.T) {
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
timeout := time.After(30*time.Second)
timeout := time.After(30 * time.Second)
CONNECTED:
for{
select{
case ev := <-zk.eventChan :
for {
select {
case ev := <-zk.eventChan:
if ev.State == StateConnected {
break CONNECTED;
break CONNECTED
}
case <-timeout:
zk.Close()
t.Fatal("Timeout")
}
}
zk.Close()
for{
select{
case _,ok := <-zk.eventChan :
for {
select {
case _, ok := <-zk.eventChan:
if !ok {
return
}
Expand Down
2 changes: 1 addition & 1 deletion zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (c *Conn) sendSetWatches() {
res := &setWatchesResponse{}
_, err := c.request(opSetWatches, req, res, nil)
if err != nil {
log.Fatal(err)
log.Printf("Failed to set previous watches: %s", err.Error())
}
}()
}
Expand Down
4 changes: 2 additions & 2 deletions zk/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestLock(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestLock(t *testing.T) {
// This tests creating a lock with a path that's more than 1 node deep (e.g. "/test-multi-level/lock"),
// when a part of that path already exists (i.e. "/test-multi-level" node already exists).
func TestMultiLevelLock(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 4 additions & 1 deletion zk/server_help.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zk

import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
Expand All @@ -20,7 +21,7 @@ type TestCluster struct {
Servers []TestServer
}

func StartTestCluster(size int) (*TestCluster, error) {
func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error) {
tmpPath, err := ioutil.TempDir("", "gozk")
if err != nil {
return nil, err
Expand Down Expand Up @@ -74,6 +75,8 @@ func StartTestCluster(size int) (*TestCluster, error) {

srv := &Server{
ConfigPath: cfgPath,
Stdout: stdout,
Stderr: stderr,
}
if err := srv.Start(); err != nil {
return nil, err
Expand Down
9 changes: 5 additions & 4 deletions zk/server_java.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ func findZookeeperFatJar() string {
}

type Server struct {
JarPath string
ConfigPath string
JarPath string
ConfigPath string
Stdout, Stderr io.Writer

cmd *exec.Cmd
}
Expand All @@ -124,8 +125,8 @@ func (srv *Server) Start() error {
}
}
srv.cmd = exec.Command("java", "-jar", srv.JarPath, "server", srv.ConfigPath)
// srv.cmd.Stdout = os.Stdout
// srv.cmd.Stderr = os.Stderr
srv.cmd.Stdout = srv.Stdout
srv.cmd.Stderr = srv.Stderr
return srv.cmd.Start()
}

Expand Down
141 changes: 134 additions & 7 deletions zk/zk_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package zk

import (
"fmt"
"io"
"net"
"strings"
"testing"
"time"

"camlistore.org/pkg/throttle"
)

func TestCreate(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -37,7 +43,7 @@ func TestCreate(t *testing.T) {
}

func TestMulti(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -77,7 +83,7 @@ func TestMulti(t *testing.T) {
}

func TestGetSetACL(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -131,7 +137,7 @@ func TestGetSetACL(t *testing.T) {
}

func TestAuth(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -181,7 +187,7 @@ func TestAuth(t *testing.T) {
}

func TestChildWatch(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -252,7 +258,7 @@ func TestChildWatch(t *testing.T) {
}

func TestSetWatchers(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -332,7 +338,7 @@ func TestSetWatchers(t *testing.T) {
}

func TestExpiringWatch(t *testing.T) {
ts, err := StartTestCluster(1)
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -396,3 +402,124 @@ func TestRequestFail(t *testing.T) {
t.Fatal("Get hung when connection could not be made")
}
}

func TestSlowServer(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

realAddr := fmt.Sprintf("127.0.0.1:%d", ts.Servers[0].Port)
proxyAddr, stopCh, err := startSlowProxy(t,
throttle.Rate{}, throttle.Rate{},
realAddr, func(ln *throttle.Listener) {
if ln.Up.Latency == 0 {
t.Log("Throttling next connection")
ln.Up.Latency = time.Millisecond * 2000
ln.Down.Latency = time.Millisecond * 2000
} else {
t.Log("Not throttling next connection")
ln.Up.Latency = 0
ln.Down.Latency = 0
}
})
if err != nil {
t.Fatal(err)
}
defer close(stopCh)

zk, _, err := Connect([]string{proxyAddr}, time.Millisecond*500)
if err != nil {
t.Fatal(err)
}
defer zk.Close()

_, _, wch, err := zk.ChildrenW("/")
if err != nil {
t.Fatal(err)
}

// Force a reconnect to get a throttled connection
zk.conn.Close()

time.Sleep(time.Millisecond * 100)

if err := zk.Delete("/gozk-test", -1); err == nil {
t.Fatal("Delete should have failed")
}

// Force a reconnect to get a non-throttled connection
zk.conn.Close()

time.Sleep(time.Millisecond * 100)

if _, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil {
t.Fatal(err)
}

// Make sure event is still returned because the session should not have been affected
select {
case ev := <-wch:
t.Logf("Received event: %+v", ev)
case <-time.After(time.Second):
t.Fatal("Expected to receive a watch event")
}
}

func startSlowProxy(t *testing.T, up, down throttle.Rate, upstream string, adj func(ln *throttle.Listener)) (string, chan bool, error) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return "", nil, err
}
tln := &throttle.Listener{
Listener: ln,
Up: up,
Down: down,
}
stopCh := make(chan bool)
go func() {
<-stopCh
tln.Close()
}()
go func() {
for {
cn, err := tln.Accept()
if err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
t.Fatalf("Accept failed: %s", err.Error())
}
return
}
if adj != nil {
adj(tln)
}
go func(cn net.Conn) {
// This will leave hanging goroutines util stopCh is closed
// but it doesn't matter in the context of running tests.
go func() {
<-stopCh
cn.Close()
}()
upcn, err := net.Dial("tcp", upstream)
if err != nil {
t.Fatal(err)
return
}
go func() {
if _, err := io.Copy(upcn, cn); err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
t.Logf("Upstream write failed: %s", err.Error())
}
}
}()
if _, err := io.Copy(cn, upcn); err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") {
t.Logf("Upstream read failed: %s", err.Error())
}
}
}(cn)
}
}()
return ln.Addr().String(), stopCh, nil
}

0 comments on commit 8499ec2

Please sign in to comment.