diff --git a/constants.go b/constants.go index b6c6136..75e4180 100644 --- a/constants.go +++ b/constants.go @@ -23,6 +23,7 @@ const ( ) const ( + opError = -1 opCreate = 1 opDelete = 2 opExists = 3 diff --git a/encode.go b/encode.go index 348c5aa..6324b31 100644 --- a/encode.go +++ b/encode.go @@ -329,6 +329,8 @@ func (r *MultiResponse) Encode(buf []byte) (int, error) { n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.String)) case opSetData: n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.Stat)) + case opError: + n, err = encodePacketValue(buf[total:], reflect.ValueOf(&op.Header.Err)) } total += n if err != nil { @@ -371,6 +373,8 @@ func (r *MultiResponse) Decode(buf []byte) (int, error) { res.Stat = new(Stat) w = reflect.ValueOf(res.Stat) case opCheck, opDelete: + case opError: + w = reflect.ValueOf(&res.Header.Err) } if w.IsValid() { n, err := decodePacketValue(buf[total:], w) diff --git a/integration/docker_test.go b/integration/docker_test.go index 52be0c6..2185e39 100644 --- a/integration/docker_test.go +++ b/integration/docker_test.go @@ -53,7 +53,7 @@ func NewContainer(containerName, dockerFile string, ports []string) (*Container, } func newContainerFiles(cfg ContainerConfig) (c *Container, err error) { - dc, err := docker.NewClient("unix://var/run/docker.sock") + dc, err := docker.NewClient("unix:///var/run/docker.sock") if err != nil { return nil, err } diff --git a/integration/drill/Dockerfile b/integration/drill/Dockerfile index 4611246..db25fcb 100644 --- a/integration/drill/Dockerfile +++ b/integration/drill/Dockerfile @@ -1,12 +1,14 @@ FROM java:openjdk-8-jdk # needs jdk to submit sql queries!? +ENV DRILL_VERSION 1.15.0 + RUN mkdir -p /opt/drill && \ - wget -q -O - http://www-us.apache.org/dist/drill/drill-1.10.0/apache-drill-1.10.0.tar.gz | tar -zxvf - -C /opt/drill + wget -q -O - http://www-us.apache.org/dist/drill/drill-${DRILL_VERSION}/apache-drill-${DRILL_VERSION}.tar.gz | tar -zxvf - -C /opt/drill EXPOSE 8047 -ENV DRILL_HOME /opt/drill/apache-drill-1.10.0 +ENV DRILL_HOME /opt/drill/apache-drill-${DRILL_VERSION} ENV DRILL_LOG_DIR ${DRILL_HOME}/log/ ENV DRILL_LOG_PREFIX ${DRILL_LOG_PATH}/drill @@ -23,8 +25,8 @@ ENTRYPOINT ["java",\ "-Ddrill.exec.enable-epoll=false",\ "-XX:+CMSClassUnloadingEnabled",\ "-XX:+UseG1GC",\ - "-Dlog.path=/opt/drill/apache-drill-1.10.0/log/drillbit.log",\ - "-Dlog.query.path=/opt/drill/apache-drill-1.10.0/log/drillbit_queries.json",\ + "-Dlog.path=/opt/drill/apache-drill-${DRILL_VERSION}/log/drillbit.log",\ + "-Dlog.query.path=/opt/drill/apache-drill-${DRILL_VERSION}/log/drillbit_queries.json",\ "-cp",\ - "/opt/drill/apache-drill-1.10.0/conf:/opt/drill/apache-drill-1.10.0/jars/*:/opt/drill/apache-drill-1.10.0/jars/ext/*:/opt/drill/apache-drill-1.10.0/jars/3rdparty/*:/opt/drill/apache-drill-1.10.0/jars/classb/*",\ + "/opt/drill/apache-drill-${DRILL_VERSION}/conf:/opt/drill/apache-drill-${DRILL_VERSION}/jars/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/ext/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/3rdparty/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/classb/*",\ "org.apache.drill.exec.server.Drillbit"] diff --git a/integration/integration_test.go b/integration/integration_test.go index 0e2f686..c2f11da 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -15,7 +15,6 @@ package integration import ( - "net" "testing" "time" @@ -557,26 +556,6 @@ func TestCreateInvalidPath(t *testing.T) { }) } -func TestRUOK(t *testing.T) { - zkclus := newZKCluster(t) - defer zkclus.Close(t) - - conn, err := net.Dial("tcp", zkclus.Addr()) - if err != nil { - t.Fatal(err) - } - if _, err := conn.Write([]byte("ruok")); err != nil { - t.Fatal(err) - } - buf := make([]byte, 4) - if _, err := conn.Read(buf); err != nil { - t.Fatal(err) - } - if string(buf) != "imok" { - t.Fatalf(`expected "imok", got %q`, string(buf)) - } -} - func TestMultiOp(t *testing.T) { runTest(t, testMultiOp) } func testMultiOp(t *testing.T, c *zk.Conn) { @@ -626,8 +605,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) { ops = []interface{}{ &zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl}, } - if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() { - t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err) + if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() { + t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err) } // test create+delete on same key == no key ops = []interface{}{ @@ -653,8 +632,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) { &zk.CheckVersionRequest{Path: "/foo", Version: 2}, } _, err = c.Multi(ops...) - if err == nil || err.Error() != zetcd.ErrAPIError.Error() { - t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err) + if err == nil || err.Error() != zetcd.ErrBadVersion.Error() { + t.Fatalf("expected %v, got %v", zetcd.ErrBadVersion, err) } if _, s1, err = c.Get("/test1"); err == nil || err.Error() != zetcd.ErrNoNode.Error() { t.Fatalf("expected %v, got (%v,%v)", zetcd.ErrNoNode, s1, err) @@ -681,8 +660,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) { ops = []interface{}{ &zk.CheckVersionRequest{Path: "/missing-key", Version: 0}, } - if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() { - t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err) + if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNoNode.Error() { + t.Fatalf("expected %v, got %v", zetcd.ErrNoNode, err) } // test empty operation list if resp, err = c.Multi(); err != nil || len(resp) != 0 { @@ -705,6 +684,23 @@ func testMultiOp(t *testing.T, c *zk.Conn) { if s1.Mzxid != s2.Mzxid { t.Fatalf("expected zxids in %+v to match %+v", *s1, *s2) } + // test partial success + ops = []interface{}{ + &zk.CheckVersionRequest{Path: "/test2", Version: 0}, + &zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl}, + } + if resp, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() { + t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err) + } + if len(resp) != 2 { + t.Fatalf("expected %d results, got %d", 2, len(resp)) + } + if resp[0].Error != nil { + t.Fatalf("expected checkop error to be nil, got %v", resp[0].Error) + } + if resp[1].Error == nil || resp[1].Error.Error() != zetcd.ErrNodeExists.Error() { + t.Fatalf("expected createop error to be %v, got %v", zetcd.ErrNodeExists.Error(), resp[1].Error) + } } func runTest(t *testing.T, f func(*testing.T, *zk.Conn)) { diff --git a/integration/integration_zetcd_test.go b/integration/integration_zetcd_test.go new file mode 100644 index 0000000..2c3b579 --- /dev/null +++ b/integration/integration_zetcd_test.go @@ -0,0 +1,28 @@ +// +build !zkdocker,!xchk + +package integration + +import ( + "net" + "testing" +) + +func TestRUOK(t *testing.T) { + zkclus := newZKCluster(t) + defer zkclus.Close(t) + + conn, err := net.Dial("tcp", zkclus.Addr()) + if err != nil { + t.Fatal(err) + } + if _, err := conn.Write([]byte("ruok")); err != nil { + t.Fatal(err) + } + buf := make([]byte, 4) + if _, err := conn.Read(buf); err != nil { + t.Fatal(err) + } + if string(buf) != "imok" { + t.Fatalf(`expected "imok", got %q`, string(buf)) + } +} diff --git a/integration/kafka/Dockerfile b/integration/kafka/Dockerfile index c8fce83..232c445 100644 --- a/integration/kafka/Dockerfile +++ b/integration/kafka/Dockerfile @@ -8,7 +8,7 @@ RUN apt-get install -y wget supervisor dnsutils RUN rm -rf /var/lib/apt/lists/*; apt-get clean ENV SCALA_VERSION 2.11 -ENV KAFKA_VERSION 0.11.0.0 +ENV KAFKA_VERSION 0.11.0.3 RUN wget -q http://www-us.apache.org/dist/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz RUN tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && mv /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION" /kafka # 9092 is kafka port @@ -16,4 +16,5 @@ EXPOSE 9092 COPY kafka/ /kafka/config/ ADD kafka/run.sh /kafka/run.sh +RUN chmod uga+x /kafka/run.sh ENTRYPOINT [ "/bin/bash", "/kafka/run.sh" ] diff --git a/integration/kafka/kafka.chroot.properties b/integration/kafka/kafka.chroot.properties index 38e923b..c5736c7 100644 --- a/integration/kafka/kafka.chroot.properties +++ b/integration/kafka/kafka.chroot.properties @@ -15,8 +15,7 @@ transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 -# NOTE: assumes port 30001 for cross-checking configuration -zookeeper.connect=172.17.0.1:30001/kafka-chroot +zookeeper.connect=172.17.0.1:2181/kafka-chroot zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true diff --git a/integration/kafka/kafka.server.properties b/integration/kafka/kafka.server.properties index f8c1a0b..8b902f2 100644 --- a/integration/kafka/kafka.server.properties +++ b/integration/kafka/kafka.server.properties @@ -15,8 +15,7 @@ transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 -# NOTE: assumes port 30001 for cross-checking configuration -zookeeper.connect=172.17.0.1:30001 +zookeeper.connect=172.17.0.1:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true diff --git a/server.go b/server.go index e079534..baba6af 100644 --- a/server.go +++ b/server.go @@ -76,7 +76,7 @@ func serveRequest(s Session, zke ZK, zkreq ZKRequest) error { } zkresp := DispatchZK(zke, zkreq.xid, zkreq.req) if zkresp.Err != nil { - glog.V(9).Infof("dispatch error", zkresp.Err) + glog.V(9).Infof("dispatch error %v", zkresp.Err) return zkresp.Err } if zkresp.Hdr.Err == 0 { diff --git a/xchk/zk.go b/xchk/zk.go index a9b7bce..4011c6f 100644 --- a/xchk/zk.go +++ b/xchk/zk.go @@ -245,7 +245,13 @@ func (xchk *zkXchk) GetChildren2(xid zetcd.Xid, op *zetcd.GetChildren2Request) z return or } -func (xchk *zkXchk) Multi(xid zetcd.Xid, op *zetcd.MultiRequest) zetcd.ZKResponse { panic("wut") } +func (xchk *zkXchk) Multi(xid zetcd.Xid, op *zetcd.MultiRequest) zetcd.ZKResponse { + cf := func() zetcd.ZKResponse { return xchk.cZK.Multi(xid, op) } + of := func() zetcd.ZKResponse { return xchk.oZK.Multi(xid, op) } + cr, or, err := xchk.xchkResp(cf, of) + defer func() { xchk.reportErr(cr, or, err) }() + return or +} func (xchk *zkXchk) Close(xid zetcd.Xid, op *zetcd.CloseRequest) zetcd.ZKResponse { cf := func() zetcd.ZKResponse { return xchk.cZK.Close(xid, op) } diff --git a/zketcd.go b/zketcd.go index d396ac9..9f979c0 100644 --- a/zketcd.go +++ b/zketcd.go @@ -470,16 +470,31 @@ func (z *zkEtcd) Sync(xid Xid, op *SyncRequest) ZKResponse { func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { bs := make([]opBundle, len(mreq.Ops)) + mresp := &MultiResponse{ + Ops: make([]MultiResponseOp, len(mreq.Ops)), + DoneHeader: MultiHeader{Type: opMulti}, + } + if len(mreq.Ops) == 0 { + zxid, zerr := z.incrementAndGetZxid() + if zerr != nil { + return mkErr(zerr) + } + return mkZKResp(xid, zxid, mresp) + } for i, op := range mreq.Ops { switch req := op.Op.(type) { case *CreateRequest: bs[i] = z.mkCreateTxnOp(req) + mresp.Ops[i].Header.Type = opCreate case *DeleteRequest: bs[i] = z.mkDeleteTxnOp(req) + mresp.Ops[i].Header.Type = opDelete case *SetDataRequest: bs[i] = z.mkSetDataTxnOp(req) + mresp.Ops[i].Header.Type = opSetData case *CheckVersionRequest: bs[i] = z.mkCheckVersionPathTxnOp(req) + mresp.Ops[i].Header.Type = opCheck default: panic(fmt.Sprintf("unknown multi %+v %T", op.Op, op.Op)) } @@ -491,8 +506,13 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { } apply := func(s v3sync.STM) error { - for _, b := range bs { + for i, b := range bs { if err := b.apply(s); err != nil { + var ok bool + mresp.Ops[i].Header.Type = opError + if mresp.Ops[i].Header.Err, ok = errorToErrCode[err]; !ok { + mresp.Ops[i].Header.Err = errAPIError + } return err } } @@ -500,52 +520,29 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse { } reply := func(xid Xid, zxid ZXid) ZKResponse { - ops := make([]MultiResponseOp, len(bs)) for i, b := range bs { resp := b.reply(xid, zxid) - ops[i].Header = MultiHeader{Err: 0} switch r := resp.Resp.(type) { case *CreateResponse: - ops[i].Header.Type = opCreate - ops[i].String = r.Path + mresp.Ops[i].String = r.Path case *SetDataResponse: - ops[i].Header.Type = opSetData - ops[i].Stat = &r.Stat - case *DeleteResponse: - ops[i].Header.Type = opDelete - case *struct{}: - ops[i].Header.Type = opCheck - default: - panic(fmt.Sprintf("unknown multi %+v %T", resp, resp)) + mresp.Ops[i].Stat = &r.Stat } } - mresp := &MultiResponse{ - Ops: ops, - DoneHeader: MultiHeader{Type: opMulti}, - } return mkZKResp(xid, zxid, mresp) } - resp, err := z.doSTM(apply, prefetch...) + resp, _ := z.doSTM(apply, prefetch...) if resp == nil { - // txn aborted, possibly due to any API error - if _, ok := errorToErrCode[err]; !ok { - // aborted due to non-API error - return mkErr(err) - } zxid, zerr := z.incrementAndGetZxid() if zerr != nil { return mkErr(zerr) } - // zkdocker seems to always return API error... - zkresp := apiErrToZKErr(xid, zxid, err) - zkresp.Hdr.Err = errAPIError - return zkresp + return reply(xid, zxid) } - mresp := reply(xid, ZXid(resp.Header.Revision)) - glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v", *mreq, resp.Header.Revision, *resp) - return mresp + glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v\n", *mreq, resp.Header.Revision, *resp) + return reply(xid, ZXid(resp.Header.Revision)) } func (z *zkEtcd) mkCheckVersionPathTxnOp(op *CheckVersionRequest) opBundle {