Skip to content

Commit

Permalink
move serialization methods into TaskMessage class
Browse files Browse the repository at this point in the history
  • Loading branch information
afeng committed Mar 30, 2013
1 parent 8e57a6c commit 08ee9c4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
18 changes: 18 additions & 0 deletions src/jvm/backtype/storm/messaging/TaskMessage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package backtype.storm.messaging;

import java.nio.ByteBuffer;

public class TaskMessage {
private int _task;
private byte[] _message;
Expand All @@ -16,4 +18,20 @@ public int task() {
public byte[] message() {
return _message;
}

public byte[] serialize() {
ByteBuffer bb = ByteBuffer.allocate(_message.length+2);
bb.putShort((short)_task);
bb.put(_message);
return bb.array();
}

public void deserialize(byte[] packet) {
if (packet==null) return;
ByteBuffer bb = ByteBuffer.wrap(packet);
_task = bb.getShort();
_message = new byte[packet.length-2];
bb.get(_message);
}

}
25 changes: 5 additions & 20 deletions src/jvm/backtype/storm/messaging/zmq/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,14 @@ public void close() {
public TaskMessage recv(int flags) {
LOG.debug("zmq.Connection:recv()");
byte[] packet = socket.recv(flags);
return parsePacket(packet);
TaskMessage message = new TaskMessage(0, null);
message.deserialize(packet);
return message;
}

public void send(int taskId, byte[] payload) {
LOG.debug("zmq.Connection:send()");
byte[] packet = mkPacket(new TaskMessage(taskId, payload));
byte[] packet = new TaskMessage(taskId, payload).serialize();
socket.send(packet, ZMQ.NOBLOCK);
}

private byte[] mkPacket(TaskMessage message) {
byte[] payload = message.message();
ByteBuffer bb = ByteBuffer.allocate(payload.length+2);
bb.putShort((short)message.task());
bb.put(payload);
return bb.array();
}

private TaskMessage parsePacket(byte[] packet) {
if (packet==null) return null;
ByteBuffer bb = ByteBuffer.wrap(packet);
int task = bb.getShort();
byte[] payload = new byte[packet.length-2];
bb.get(payload);
return new TaskMessage(task,payload);
}
}
}

0 comments on commit 08ee9c4

Please sign in to comment.