Skip to content

Commit

Permalink
Merge pull request zeromq#32 from hintjens/master
Browse files Browse the repository at this point in the history
Implementing service pattern
  • Loading branch information
jemc committed Dec 6, 2014
2 parents ccda23b + 5dd135e commit 4f9d421
Show file tree
Hide file tree
Showing 11 changed files with 591 additions and 116 deletions.
11 changes: 11 additions & 0 deletions include/mlm_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ MLM_EXPORT void
MLM_EXPORT void
mlm_client_verbose (mlm_client_t *self);

// Return actor, when caller wants to work with multiple actors and/or
// input sockets asynchronously.
MLM_EXPORT zactor_t *
mlm_client_actor (mlm_client_t *self);

// Return message pipe for asynchronous message I/O. In the high-volume case,
// we send methods and get replies to the actor, in a synchronous manner, and
// we send/recv high volume message data to a second pipe, the msgpipe. In
Expand All @@ -72,6 +77,12 @@ MLM_EXPORT int
MLM_EXPORT int
mlm_client_consume (mlm_client_t *self, const char *stream, const char *pattern);

// Offer a particular named service, where the pattern matches request subjects
// using the CZMQ zrex syntax.
// Returns >= 0 if successful, -1 if interrupted.
MLM_EXPORT int
mlm_client_provide (mlm_client_t *self, const char *service, const char *pattern);

// Send STREAM SEND message to server
MLM_EXPORT int
mlm_client_stream_send (mlm_client_t *self, char *subject, zmsg_t **content_p);
Expand Down
2 changes: 1 addition & 1 deletion include/mlm_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ discards it and returns CONFIRM with a TIMEOUT-EXPIRED status.
SERVICE_OFFER - Worker client offers a named service, specifying a pattern to match
message subjects. An empty pattern matches anything. A worker can offer
many different services at once. Server does not reply to this message.
many different services at once. Server replies with OK or ERROR.
service string Service name
pattern string Match message subjects
Expand Down
106 changes: 95 additions & 11 deletions src/mlm_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,31 +107,45 @@ use_heartbeat_timer (client_t *self)
}



// ---------------------------------------------------------------------------
// prepare_for_stream_write
// prepare_stream_write_command
//

static void
prepare_for_stream_write (client_t *self)
prepare_stream_write_command (client_t *self)
{
mlm_msg_set_stream (self->message, self->args->stream);
}


// ---------------------------------------------------------------------------
// prepare_for_stream_read
// prepare_stream_read_command
//

static void
prepare_for_stream_read (client_t *self)
prepare_stream_read_command (client_t *self)
{
mlm_msg_set_stream (self->message, self->args->stream);
mlm_msg_set_pattern (self->message, self->args->pattern);
}


// ---------------------------------------------------------------------------
// prepare_service_offer_command
//

static void
prepare_service_offer_command (client_t *self)
{
mlm_msg_set_service (self->message, self->args->service);
mlm_msg_set_pattern (self->message, self->args->pattern);
}


// ---------------------------------------------------------------------------
// pass_stream_message_to_app
// TODO: these methods could be generated automatically from the protocol
//

static void
Expand Down Expand Up @@ -163,6 +177,23 @@ pass_mailbox_message_to_app (client_t *self)
}


// ---------------------------------------------------------------------------
// pass_service_message_to_app
//

static void
pass_service_message_to_app (client_t *self)
{
zstr_sendm (self->msgpipe, "SERVICE DELIVER");
zsock_bsend (self->msgpipe, "ssssp",
mlm_msg_sender (self->message),
mlm_msg_service (self->message),
mlm_msg_subject (self->message),
mlm_msg_tracker (self->message),
mlm_msg_get_content (self->message));
}


// ---------------------------------------------------------------------------
// signal_success
//
Expand Down Expand Up @@ -238,7 +269,7 @@ mlm_client_test (bool verbose)
zstr_send (server, "VERBOSE");
zstr_sendx (server, "BIND", "ipc://@/malamute", NULL);

// Test stream access
// Test stream pattern
mlm_client_t *writer = mlm_client_new ("ipc://@/malamute", 500, "writer");
assert (writer);
if (verbose)
Expand Down Expand Up @@ -308,29 +339,82 @@ mlm_client_test (bool verbose)
zstr_free (&content);
zmsg_destroy (&msg);

// Test mailbox access
// Test mailbox pattern
msg = zmsg_new ();
zmsg_addstr (msg, "This is a multipart mailbox message");
zmsg_addstr (msg, "Message 1");
zmsg_addmem (msg, "attachment", sizeof ("attachment"));
mlm_client_mailbox_send (writer, "reader", "subject", "", 0, &msg);
mlm_client_mailbox_send (writer, "reader", "subject 1", "", 0, &msg);

msg = mlm_client_recv (reader);
assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
assert (streq (mlm_client_subject (reader), "subject"));
assert (streq (mlm_client_subject (reader), "subject 1"));
assert (streq (mlm_client_sender (reader), "writer"));
content = zmsg_popstr (msg);
assert (streq (content, "This is a multipart mailbox message"));
assert (streq (content, "Message 1"));
zstr_free (&content);
content = zmsg_popstr (msg);
assert (streq (content, "attachment"));
zstr_free (&content);
zmsg_destroy (&msg);

// - connect, disconnect, send, send, connect, recv, recv
// Now test that mailbox survives reader disconnect
mlm_client_destroy (&reader);

msg = zmsg_new ();
zmsg_addstr (msg, "Message 2");
mlm_client_mailbox_send (writer, "reader", "subject 2", "", 0, &msg);

msg = zmsg_new ();
zmsg_addstr (msg, "Message 3");
mlm_client_mailbox_send (writer, "reader", "subject 3", "", 0, &msg);

reader = mlm_client_new ("ipc://@/malamute", 500, "reader");
assert (reader);
if (verbose)
mlm_client_verbose (reader);

msg = mlm_client_recv (reader);
assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
assert (streq (mlm_client_subject (reader), "subject 2"));

msg = mlm_client_recv (reader);
assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
assert (streq (mlm_client_subject (reader), "subject 3"));

// Test service pattern
mlm_client_provide (reader, "printer", "bw.*");
mlm_client_provide (reader, "printer", "color.*");

msg = zmsg_new ();
zmsg_addstr (msg, "Important contract");
mlm_client_service_send (writer, "printer", "bw.A4", "", 0, &msg);

msg = zmsg_new ();
zmsg_addstr (msg, "Special conditions");
mlm_client_service_send (writer, "printer", "bw.A5", "", 0, &msg);

msg = mlm_client_recv (reader);
assert (streq (mlm_client_command (reader), "SERVICE DELIVER"));
assert (streq (mlm_client_subject (reader), "bw.A4"));
assert (streq (mlm_client_sender (reader), "writer"));
content = zmsg_popstr (msg);
assert (streq (content, "Important contract"));
zstr_free (&content);
zmsg_destroy (&msg);

msg = mlm_client_recv (reader);
assert (streq (mlm_client_command (reader), "SERVICE DELIVER"));
assert (streq (mlm_client_subject (reader), "bw.A5"));
assert (streq (mlm_client_sender (reader), "writer"));
content = zmsg_popstr (msg);
assert (streq (content, "Special conditions"));
zstr_free (&content);
zmsg_destroy (&msg);

// Done, shut down
mlm_client_destroy (&reader);
mlm_client_destroy (&writer);

zactor_destroy (&server);
// @end
printf ("OK\n");
Expand Down
20 changes: 18 additions & 2 deletions src/mlm_client.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@

<state name = "connected" inherit = "defaults">
<event name = "produce" next = "confirming">
<action name = "prepare for stream write" />
<action name = "prepare stream write command" />
<action name = "send" message = "STREAM WRITE" />
</event>
<event name = "consume" next = "confirming">
<action name = "prepare for stream read" />
<action name = "prepare stream read command" />
<action name = "send" message = "STREAM READ" />
</event>
<event name = "provide" next = "confirming">
<action name = "prepare service offer command" />
<action name = "send" message = "SERVICE OFFER" />
</event>
<event name = "destructor" next = "disconnecting">
<action name = "send" message = "CONNECTION CLOSE" />
</event>
Expand All @@ -48,6 +52,9 @@
<event name = "MAILBOX DELIVER">
<action name = "pass mailbox message to app" />
</event>
<event name = "SERVICE DELIVER">
<action name = "pass service message to app" />
</event>
<event name = "expired">
<action name = "send" message = "CONNECTION PING" />
</event>
Expand Down Expand Up @@ -155,6 +162,15 @@
<accept reply = "FAILURE" />
</method>

<method name = "provide" return = "status">
Offer a particular named service, where the pattern matches request
subjects using the CZMQ zrex syntax.
<field name = "service" type = "string" />
<field name = "pattern" type = "string" />
<accept reply = "SUCCESS" />
<accept reply = "FAILURE" />
</method>

<!-- This defines the asynchronous send interface -->
<send>
<message name = "STREAM SEND" />
Expand Down
Loading

0 comments on commit 4f9d421

Please sign in to comment.