Skip to content
This repository has been archived by the owner on Oct 7, 2023. It is now read-only.

Issue98fix #102

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
)

const (
opError = -1
opCreate = 1
opDelete = 2
opExists = 3
Expand Down
4 changes: 4 additions & 0 deletions encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ func (r *MultiResponse) Encode(buf []byte) (int, error) {
n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.String))
case opSetData:
n, err = encodePacketValue(buf[total:], reflect.ValueOf(op.Stat))
case opError:
n, err = encodePacketValue(buf[total:], reflect.ValueOf(&op.Header.Err))
}
total += n
if err != nil {
Expand Down Expand Up @@ -371,6 +373,8 @@ func (r *MultiResponse) Decode(buf []byte) (int, error) {
res.Stat = new(Stat)
w = reflect.ValueOf(res.Stat)
case opCheck, opDelete:
case opError:
w = reflect.ValueOf(&res.Header.Err)
}
if w.IsValid() {
n, err := decodePacketValue(buf[total:], w)
Expand Down
2 changes: 1 addition & 1 deletion integration/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewContainer(containerName, dockerFile string, ports []string) (*Container,
}

func newContainerFiles(cfg ContainerConfig) (c *Container, err error) {
dc, err := docker.NewClient("unix://var/run/docker.sock")
dc, err := docker.NewClient("unix:///var/run/docker.sock")
if err != nil {
return nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions integration/drill/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
FROM java:openjdk-8-jdk
# needs jdk to submit sql queries!?

ENV DRILL_VERSION 1.15.0

RUN mkdir -p /opt/drill && \
wget -q -O - http://www-us.apache.org/dist/drill/drill-1.10.0/apache-drill-1.10.0.tar.gz | tar -zxvf - -C /opt/drill
wget -q -O - http://www-us.apache.org/dist/drill/drill-${DRILL_VERSION}/apache-drill-${DRILL_VERSION}.tar.gz | tar -zxvf - -C /opt/drill

EXPOSE 8047

ENV DRILL_HOME /opt/drill/apache-drill-1.10.0
ENV DRILL_HOME /opt/drill/apache-drill-${DRILL_VERSION}
ENV DRILL_LOG_DIR ${DRILL_HOME}/log/
ENV DRILL_LOG_PREFIX ${DRILL_LOG_PATH}/drill

Expand All @@ -23,8 +25,8 @@ ENTRYPOINT ["java",\
"-Ddrill.exec.enable-epoll=false",\
"-XX:+CMSClassUnloadingEnabled",\
"-XX:+UseG1GC",\
"-Dlog.path=/opt/drill/apache-drill-1.10.0/log/drillbit.log",\
"-Dlog.query.path=/opt/drill/apache-drill-1.10.0/log/drillbit_queries.json",\
"-Dlog.path=/opt/drill/apache-drill-${DRILL_VERSION}/log/drillbit.log",\
"-Dlog.query.path=/opt/drill/apache-drill-${DRILL_VERSION}/log/drillbit_queries.json",\
"-cp",\
"/opt/drill/apache-drill-1.10.0/conf:/opt/drill/apache-drill-1.10.0/jars/*:/opt/drill/apache-drill-1.10.0/jars/ext/*:/opt/drill/apache-drill-1.10.0/jars/3rdparty/*:/opt/drill/apache-drill-1.10.0/jars/classb/*",\
"/opt/drill/apache-drill-${DRILL_VERSION}/conf:/opt/drill/apache-drill-${DRILL_VERSION}/jars/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/ext/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/3rdparty/*:/opt/drill/apache-drill-${DRILL_VERSION}/jars/classb/*",\
"org.apache.drill.exec.server.Drillbit"]
50 changes: 23 additions & 27 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package integration

import (
"net"
"testing"
"time"

Expand Down Expand Up @@ -557,26 +556,6 @@ func TestCreateInvalidPath(t *testing.T) {
})
}

func TestRUOK(t *testing.T) {
zkclus := newZKCluster(t)
defer zkclus.Close(t)

conn, err := net.Dial("tcp", zkclus.Addr())
if err != nil {
t.Fatal(err)
}
if _, err := conn.Write([]byte("ruok")); err != nil {
t.Fatal(err)
}
buf := make([]byte, 4)
if _, err := conn.Read(buf); err != nil {
t.Fatal(err)
}
if string(buf) != "imok" {
t.Fatalf(`expected "imok", got %q`, string(buf))
}
}

func TestMultiOp(t *testing.T) { runTest(t, testMultiOp) }

func testMultiOp(t *testing.T, c *zk.Conn) {
Expand Down Expand Up @@ -626,8 +605,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
ops = []interface{}{
&zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl},
}
if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if _, err := c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err)
}
// test create+delete on same key == no key
ops = []interface{}{
Expand All @@ -653,8 +632,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
&zk.CheckVersionRequest{Path: "/foo", Version: 2},
}
_, err = c.Multi(ops...)
if err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if err == nil || err.Error() != zetcd.ErrBadVersion.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrBadVersion, err)
}
if _, s1, err = c.Get("/test1"); err == nil || err.Error() != zetcd.ErrNoNode.Error() {
t.Fatalf("expected %v, got (%v,%v)", zetcd.ErrNoNode, s1, err)
Expand All @@ -681,8 +660,8 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
ops = []interface{}{
&zk.CheckVersionRequest{Path: "/missing-key", Version: 0},
}
if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrAPIError.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrAPIError, err)
if _, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNoNode.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNoNode, err)
}
// test empty operation list
if resp, err = c.Multi(); err != nil || len(resp) != 0 {
Expand All @@ -705,6 +684,23 @@ func testMultiOp(t *testing.T, c *zk.Conn) {
if s1.Mzxid != s2.Mzxid {
t.Fatalf("expected zxids in %+v to match %+v", *s1, *s2)
}
// test partial success
ops = []interface{}{
&zk.CheckVersionRequest{Path: "/test2", Version: 0},
&zk.CreateRequest{Path: "/foo", Data: []byte("foo"), Acl: acl},
}
if resp, err = c.Multi(ops...); err == nil || err.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected %v, got %v", zetcd.ErrNodeExists, err)
}
if len(resp) != 2 {
t.Fatalf("expected %d results, got %d", 2, len(resp))
}
if resp[0].Error != nil {
t.Fatalf("expected checkop error to be nil, got %v", resp[0].Error)
}
if resp[1].Error == nil || resp[1].Error.Error() != zetcd.ErrNodeExists.Error() {
t.Fatalf("expected createop error to be %v, got %v", zetcd.ErrNodeExists.Error(), resp[1].Error)
}
}

func runTest(t *testing.T, f func(*testing.T, *zk.Conn)) {
Expand Down
28 changes: 28 additions & 0 deletions integration/integration_zetcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// +build !zkdocker,!xchk

package integration

import (
"net"
"testing"
)

func TestRUOK(t *testing.T) {
zkclus := newZKCluster(t)
defer zkclus.Close(t)

conn, err := net.Dial("tcp", zkclus.Addr())
if err != nil {
t.Fatal(err)
}
if _, err := conn.Write([]byte("ruok")); err != nil {
t.Fatal(err)
}
buf := make([]byte, 4)
if _, err := conn.Read(buf); err != nil {
t.Fatal(err)
}
if string(buf) != "imok" {
t.Fatalf(`expected "imok", got %q`, string(buf))
}
}
3 changes: 2 additions & 1 deletion integration/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ RUN apt-get install -y wget supervisor dnsutils
RUN rm -rf /var/lib/apt/lists/*; apt-get clean

ENV SCALA_VERSION 2.11
ENV KAFKA_VERSION 0.11.0.0
ENV KAFKA_VERSION 0.11.0.3
RUN wget -q http://www-us.apache.org/dist/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz
RUN tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && mv /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION" /kafka
# 9092 is kafka port
EXPOSE 9092

COPY kafka/ /kafka/config/
ADD kafka/run.sh /kafka/run.sh
RUN chmod uga+x /kafka/run.sh
ENTRYPOINT [ "/bin/bash", "/kafka/run.sh" ]
3 changes: 1 addition & 2 deletions integration/kafka/kafka.chroot.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# NOTE: assumes port 30001 for cross-checking configuration
zookeeper.connect=172.17.0.1:30001/kafka-chroot
zookeeper.connect=172.17.0.1:2181/kafka-chroot
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
3 changes: 1 addition & 2 deletions integration/kafka/kafka.server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# NOTE: assumes port 30001 for cross-checking configuration
zookeeper.connect=172.17.0.1:30001
zookeeper.connect=172.17.0.1:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func serveRequest(s Session, zke ZK, zkreq ZKRequest) error {
}
zkresp := DispatchZK(zke, zkreq.xid, zkreq.req)
if zkresp.Err != nil {
glog.V(9).Infof("dispatch error", zkresp.Err)
glog.V(9).Infof("dispatch error %v", zkresp.Err)
return zkresp.Err
}
if zkresp.Hdr.Err == 0 {
Expand Down
8 changes: 7 additions & 1 deletion xchk/zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,13 @@ func (xchk *zkXchk) GetChildren2(xid zetcd.Xid, op *zetcd.GetChildren2Request) z
return or
}

func (xchk *zkXchk) Multi(xid zetcd.Xid, op *zetcd.MultiRequest) zetcd.ZKResponse { panic("wut") }
func (xchk *zkXchk) Multi(xid zetcd.Xid, op *zetcd.MultiRequest) zetcd.ZKResponse {
cf := func() zetcd.ZKResponse { return xchk.cZK.Multi(xid, op) }
of := func() zetcd.ZKResponse { return xchk.oZK.Multi(xid, op) }
cr, or, err := xchk.xchkResp(cf, of)
defer func() { xchk.reportErr(cr, or, err) }()
return or
}

func (xchk *zkXchk) Close(xid zetcd.Xid, op *zetcd.CloseRequest) zetcd.ZKResponse {
cf := func() zetcd.ZKResponse { return xchk.cZK.Close(xid, op) }
Expand Down
57 changes: 27 additions & 30 deletions zketcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,16 +470,31 @@ func (z *zkEtcd) Sync(xid Xid, op *SyncRequest) ZKResponse {

func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse {
bs := make([]opBundle, len(mreq.Ops))
mresp := &MultiResponse{
Ops: make([]MultiResponseOp, len(mreq.Ops)),
DoneHeader: MultiHeader{Type: opMulti},
}
if len(mreq.Ops) == 0 {
zxid, zerr := z.incrementAndGetZxid()
if zerr != nil {
return mkErr(zerr)
}
return mkZKResp(xid, zxid, mresp)
}
for i, op := range mreq.Ops {
switch req := op.Op.(type) {
case *CreateRequest:
bs[i] = z.mkCreateTxnOp(req)
mresp.Ops[i].Header.Type = opCreate
case *DeleteRequest:
bs[i] = z.mkDeleteTxnOp(req)
mresp.Ops[i].Header.Type = opDelete
case *SetDataRequest:
bs[i] = z.mkSetDataTxnOp(req)
mresp.Ops[i].Header.Type = opSetData
case *CheckVersionRequest:
bs[i] = z.mkCheckVersionPathTxnOp(req)
mresp.Ops[i].Header.Type = opCheck
default:
panic(fmt.Sprintf("unknown multi %+v %T", op.Op, op.Op))
}
Expand All @@ -491,61 +506,43 @@ func (z *zkEtcd) Multi(xid Xid, mreq *MultiRequest) ZKResponse {
}

apply := func(s v3sync.STM) error {
for _, b := range bs {
for i, b := range bs {
if err := b.apply(s); err != nil {
var ok bool
mresp.Ops[i].Header.Type = opError
if mresp.Ops[i].Header.Err, ok = errorToErrCode[err]; !ok {
mresp.Ops[i].Header.Err = errAPIError
}
return err
}
}
return nil
}

reply := func(xid Xid, zxid ZXid) ZKResponse {
ops := make([]MultiResponseOp, len(bs))
for i, b := range bs {
resp := b.reply(xid, zxid)
ops[i].Header = MultiHeader{Err: 0}
switch r := resp.Resp.(type) {
case *CreateResponse:
ops[i].Header.Type = opCreate
ops[i].String = r.Path
mresp.Ops[i].String = r.Path
case *SetDataResponse:
ops[i].Header.Type = opSetData
ops[i].Stat = &r.Stat
case *DeleteResponse:
ops[i].Header.Type = opDelete
case *struct{}:
ops[i].Header.Type = opCheck
default:
panic(fmt.Sprintf("unknown multi %+v %T", resp, resp))
mresp.Ops[i].Stat = &r.Stat
}
}
mresp := &MultiResponse{
Ops: ops,
DoneHeader: MultiHeader{Type: opMulti},
}
return mkZKResp(xid, zxid, mresp)
}

resp, err := z.doSTM(apply, prefetch...)
resp, _ := z.doSTM(apply, prefetch...)
if resp == nil {
// txn aborted, possibly due to any API error
if _, ok := errorToErrCode[err]; !ok {
// aborted due to non-API error
return mkErr(err)
}
zxid, zerr := z.incrementAndGetZxid()
if zerr != nil {
return mkErr(zerr)
}
// zkdocker seems to always return API error...
zkresp := apiErrToZKErr(xid, zxid, err)
zkresp.Hdr.Err = errAPIError
return zkresp
return reply(xid, zxid)
}

mresp := reply(xid, ZXid(resp.Header.Revision))
glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v", *mreq, resp.Header.Revision, *resp)
return mresp
glog.V(7).Infof("Multi(%v) = (zxid=%v); txnresp: %+v\n", *mreq, resp.Header.Revision, *resp)
return reply(xid, ZXid(resp.Header.Revision))
}

func (z *zkEtcd) mkCheckVersionPathTxnOp(op *CheckVersionRequest) opBundle {
Expand Down