diff --git a/src/jvm/backtype/storm/messaging/TaskMessage.java b/src/jvm/backtype/storm/messaging/TaskMessage.java index 3bc9604ab..d4f08b7c2 100644 --- a/src/jvm/backtype/storm/messaging/TaskMessage.java +++ b/src/jvm/backtype/storm/messaging/TaskMessage.java @@ -1,5 +1,7 @@ package backtype.storm.messaging; +import java.nio.ByteBuffer; + public class TaskMessage { private int _task; private byte[] _message; @@ -16,4 +18,20 @@ public int task() { public byte[] message() { return _message; } + + public byte[] serialize() { + ByteBuffer bb = ByteBuffer.allocate(_message.length+2); + bb.putShort((short)_task); + bb.put(_message); + return bb.array(); + } + + public void deserialize(byte[] packet) { + if (packet==null) return; + ByteBuffer bb = ByteBuffer.wrap(packet); + _task = bb.getShort(); + _message = new byte[packet.length-2]; + bb.get(_message); + } + } diff --git a/src/jvm/backtype/storm/messaging/zmq/Connection.java b/src/jvm/backtype/storm/messaging/zmq/Connection.java index 424ff8cbf..54fa82bf1 100644 --- a/src/jvm/backtype/storm/messaging/zmq/Connection.java +++ b/src/jvm/backtype/storm/messaging/zmq/Connection.java @@ -31,29 +31,14 @@ public void close() { public TaskMessage recv(int flags) { LOG.debug("zmq.Connection:recv()"); byte[] packet = socket.recv(flags); - return parsePacket(packet); + TaskMessage message = new TaskMessage(0, null); + message.deserialize(packet); + return message; } public void send(int taskId, byte[] payload) { LOG.debug("zmq.Connection:send()"); - byte[] packet = mkPacket(new TaskMessage(taskId, payload)); + byte[] packet = new TaskMessage(taskId, payload).serialize(); socket.send(packet, ZMQ.NOBLOCK); - } - - private byte[] mkPacket(TaskMessage message) { - byte[] payload = message.message(); - ByteBuffer bb = ByteBuffer.allocate(payload.length+2); - bb.putShort((short)message.task()); - bb.put(payload); - return bb.array(); - } - - private TaskMessage parsePacket(byte[] packet) { - if (packet==null) return null; - ByteBuffer bb = ByteBuffer.wrap(packet); - int task = bb.getShort(); - byte[] payload = new byte[packet.length-2]; - bb.get(payload); - return new TaskMessage(task,payload); - } + } }