Skip to content

Commit

Permalink
Merge pull request zeromq#43 from hintjens/master
Browse files Browse the repository at this point in the history
Fixed issue zeromq#42 and other related items.
  • Loading branch information
jemc committed Dec 27, 2014
2 parents df0b45a + 4900a15 commit e4983cf
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 157 deletions.
242 changes: 121 additions & 121 deletions src/mlm_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -286,129 +286,129 @@ mlm_client_test (bool verbose)
zstr_sendx (auth, "PLAIN", "src/passwords.cfg", NULL);
zsock_wait (auth);

// // Test stream pattern
// mlm_client_t *writer = mlm_client_new ("ipc://@/malamute", 1000, "writer/secret");
// assert (writer);
// if (verbose)
// mlm_client_verbose (writer);
//
// mlm_client_t *reader = mlm_client_new ("ipc://@/malamute", 1000, "reader/secret");
// assert (reader);
// if (verbose)
// mlm_client_verbose (reader);
//
// mlm_client_set_producer (writer, "weather");
// mlm_client_set_consumer (reader, "weather", "temp.*");
//
// mlm_client_sendx (writer, "temp.moscow", "1", NULL);
// mlm_client_sendx (writer, "rain.moscow", "2", NULL);
// mlm_client_sendx (writer, "temp.madrid", "3", NULL);
// mlm_client_sendx (writer, "rain.madrid", "4", NULL);
// mlm_client_sendx (writer, "temp.london", "5", NULL);
// mlm_client_sendx (writer, "rain.london", "6", NULL);
//
// Test stream pattern
mlm_client_t *writer = mlm_client_new ("ipc://@/malamute", 1000, "writer/secret");
assert (writer);
if (verbose)
mlm_client_verbose (writer);

mlm_client_t *reader = mlm_client_new ("ipc://@/malamute", 1000, "reader/secret");
assert (reader);
if (verbose)
mlm_client_verbose (reader);

mlm_client_set_producer (writer, "weather");
mlm_client_set_consumer (reader, "weather", "temp.*");

mlm_client_sendx (writer, "temp.moscow", "1", NULL);
mlm_client_sendx (writer, "rain.moscow", "2", NULL);
mlm_client_sendx (writer, "temp.madrid", "3", NULL);
mlm_client_sendx (writer, "rain.madrid", "4", NULL);
mlm_client_sendx (writer, "temp.london", "5", NULL);
mlm_client_sendx (writer, "rain.london", "6", NULL);

char *subject, *content;
// mlm_client_recvx (reader, &subject, &content, NULL);
// assert (streq (subject, "temp.moscow"));
// assert (streq (content, "1"));
// assert (streq (mlm_client_command (reader), "STREAM DELIVER"));
// assert (streq (mlm_client_sender (reader), "writer"));
// zstr_free (&subject);
// zstr_free (&content);
//
// mlm_client_recvx (reader, &subject, &content, NULL);
// assert (streq (subject, "temp.madrid"));
// assert (streq (content, "3"));
// assert (streq (mlm_client_command (reader), "STREAM DELIVER"));
// assert (streq (mlm_client_subject (reader), "temp.madrid"));
// assert (streq (mlm_client_sender (reader), "writer"));
// zstr_free (&subject);
// zstr_free (&content);
//
// mlm_client_recvx (reader, &subject, &content, NULL);
// assert (streq (subject, "temp.london"));
// assert (streq (content, "5"));
// assert (streq (mlm_client_command (reader), "STREAM DELIVER"));
// assert (streq (mlm_client_sender (reader), "writer"));
// zstr_free (&subject);
// zstr_free (&content);
//
// // Test mailbox pattern
// mlm_client_sendtox (writer, "reader", "subject 1", "Message 1", "attachment", NULL);
//
// char *attach;
// mlm_client_recvx (reader, &subject, &content, &attach, NULL);
// assert (streq (subject, "subject 1"));
// assert (streq (content, "Message 1"));
// assert (streq (attach, "attachment"));
// assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
// assert (streq (mlm_client_subject (reader), "subject 1"));
// assert (streq (mlm_client_sender (reader), "writer"));
// zstr_free (&subject);
// zstr_free (&content);
// zstr_free (&attach);
//
// // Now test that mailbox survives reader disconnect
// mlm_client_destroy (&reader);
// mlm_client_sendtox (writer, "reader", "subject 2", "Message 2", NULL);
// mlm_client_sendtox (writer, "reader", "subject 3", "Message 3", NULL);
//
// reader = mlm_client_new ("ipc://@/malamute", 500, "reader/secret");
// assert (reader);
// if (verbose)
// mlm_client_verbose (reader);
//
// mlm_client_recvx (reader, &subject, &content, &attach, NULL);
// assert (streq (subject, "subject 2"));
// assert (streq (content, "Message 2"));
// assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
// zstr_free (&subject);
// zstr_free (&content);
//
// mlm_client_recvx (reader, &subject, &content, &attach, NULL);
// assert (streq (subject, "subject 3"));
// assert (streq (content, "Message 3"));
// assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
// zstr_free (&subject);
// zstr_free (&content);
//
// // Test service pattern
// mlm_client_set_worker (reader, "printer", "bw.*");
// mlm_client_set_worker (reader, "printer", "color.*");
//
// mlm_client_sendforx (writer, "printer", "bw.A4", "Important contract", NULL);
// mlm_client_sendforx (writer, "printer", "bw.A5", "Special conditions", NULL);
//
// mlm_client_recvx (reader, &subject, &content, NULL);
// assert (streq (subject, "bw.A4"));
// assert (streq (content, "Important contract"));
// assert (streq (mlm_client_command (reader), "SERVICE DELIVER"));
// assert (streq (mlm_client_sender (reader), "writer"));
// zstr_free (&subject);
// zstr_free (&content);
//
// mlm_client_recvx (reader, &subject, &content, NULL);
// assert (streq (subject, "bw.A5"));
// assert (streq (content, "Special conditions"));
// assert (streq (mlm_client_command (reader), "SERVICE DELIVER"));
// assert (streq (mlm_client_sender (reader), "writer"));
// zstr_free (&subject);
// zstr_free (&content);
//
// // Test that writer shutdown does not cause message loss
// mlm_client_set_consumer (reader, "weather", "temp.*");
// mlm_client_sendx (writer, "temp.brussels", "7", NULL);
// mlm_client_destroy (&writer);
//
// mlm_client_recvx (reader, &subject, &content, NULL);
// assert (streq (subject, "temp.brussels"));
// assert (streq (content, "7"));
// zstr_free (&subject);
// zstr_free (&content);
// mlm_client_destroy (&reader);
//
mlm_client_recvx (reader, &subject, &content, NULL);
assert (streq (subject, "temp.moscow"));
assert (streq (content, "1"));
assert (streq (mlm_client_command (reader), "STREAM DELIVER"));
assert (streq (mlm_client_sender (reader), "writer"));
zstr_free (&subject);
zstr_free (&content);

mlm_client_recvx (reader, &subject, &content, NULL);
assert (streq (subject, "temp.madrid"));
assert (streq (content, "3"));
assert (streq (mlm_client_command (reader), "STREAM DELIVER"));
assert (streq (mlm_client_subject (reader), "temp.madrid"));
assert (streq (mlm_client_sender (reader), "writer"));
zstr_free (&subject);
zstr_free (&content);

mlm_client_recvx (reader, &subject, &content, NULL);
assert (streq (subject, "temp.london"));
assert (streq (content, "5"));
assert (streq (mlm_client_command (reader), "STREAM DELIVER"));
assert (streq (mlm_client_sender (reader), "writer"));
zstr_free (&subject);
zstr_free (&content);

// Test mailbox pattern
mlm_client_sendtox (writer, "reader", "subject 1", "Message 1", "attachment", NULL);

char *attach;
mlm_client_recvx (reader, &subject, &content, &attach, NULL);
assert (streq (subject, "subject 1"));
assert (streq (content, "Message 1"));
assert (streq (attach, "attachment"));
assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
assert (streq (mlm_client_subject (reader), "subject 1"));
assert (streq (mlm_client_sender (reader), "writer"));
zstr_free (&subject);
zstr_free (&content);
zstr_free (&attach);

// Now test that mailbox survives reader disconnect
mlm_client_destroy (&reader);
mlm_client_sendtox (writer, "reader", "subject 2", "Message 2", NULL);
mlm_client_sendtox (writer, "reader", "subject 3", "Message 3", NULL);

reader = mlm_client_new ("ipc://@/malamute", 500, "reader/secret");
assert (reader);
if (verbose)
mlm_client_verbose (reader);

mlm_client_recvx (reader, &subject, &content, &attach, NULL);
assert (streq (subject, "subject 2"));
assert (streq (content, "Message 2"));
assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
zstr_free (&subject);
zstr_free (&content);

mlm_client_recvx (reader, &subject, &content, &attach, NULL);
assert (streq (subject, "subject 3"));
assert (streq (content, "Message 3"));
assert (streq (mlm_client_command (reader), "MAILBOX DELIVER"));
zstr_free (&subject);
zstr_free (&content);

// Test service pattern
mlm_client_set_worker (reader, "printer", "bw.*");
mlm_client_set_worker (reader, "printer", "color.*");

mlm_client_sendforx (writer, "printer", "bw.A4", "Important contract", NULL);
mlm_client_sendforx (writer, "printer", "bw.A5", "Special conditions", NULL);

mlm_client_recvx (reader, &subject, &content, NULL);
assert (streq (subject, "bw.A4"));
assert (streq (content, "Important contract"));
assert (streq (mlm_client_command (reader), "SERVICE DELIVER"));
assert (streq (mlm_client_sender (reader), "writer"));
zstr_free (&subject);
zstr_free (&content);

mlm_client_recvx (reader, &subject, &content, NULL);
assert (streq (subject, "bw.A5"));
assert (streq (content, "Special conditions"));
assert (streq (mlm_client_command (reader), "SERVICE DELIVER"));
assert (streq (mlm_client_sender (reader), "writer"));
zstr_free (&subject);
zstr_free (&content);

// Test that writer shutdown does not cause message loss
mlm_client_set_consumer (reader, "weather", "temp.*");
mlm_client_sendx (writer, "temp.brussels", "7", NULL);
mlm_client_destroy (&writer);

mlm_client_recvx (reader, &subject, &content, NULL);
assert (streq (subject, "temp.brussels"));
assert (streq (content, "7"));
zstr_free (&subject);
zstr_free (&content);
mlm_client_destroy (&reader);

// Test multiple readers for same message
mlm_client_t *writer = mlm_client_new ("ipc://@/malamute", 1000, "writer/secret");
writer = mlm_client_new ("ipc://@/malamute", 1000, "writer/secret");
assert (writer);
if (verbose)
mlm_client_verbose (writer);
Expand Down
3 changes: 0 additions & 3 deletions src/mlm_client.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@
<event name = "expired">
<action name = "send" message = "CONNECTION PING" />
</event>
<event name = "error">
<action name = "terminate" />
</event>
</state>

<state name = "confirming" inherit = "defaults">
Expand Down
14 changes: 8 additions & 6 deletions src/mlm_client_engine.inc
Original file line number Diff line number Diff line change
Expand Up @@ -564,16 +564,18 @@ s_client_execute (s_client_t *self, event_t event)
}
}
else
if (self->event == connection_pong_event) {
}
else
if (self->event == error_event) {
if (!self->exception) {
// terminate
// check status code
if (self->verbose)
zsys_debug ("mlm_client: $ terminate");
self->terminated = true;
zsys_debug ("mlm_client: $ check status code");
check_status_code (&self->client);
}
}
else
if (self->event == connection_pong_event) {
if (!self->exception)
self->state = have_error_state;
}
else {
// Handle unexpected protocol events
Expand Down
1 change: 0 additions & 1 deletion src/mlm_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,6 @@ static void
message_not_valid_in_this_state (client_t *self)
{
mlm_proto_set_status_code (self->message, MLM_PROTO_COMMAND_INVALID);
engine_set_exception (self, exception_event);
}


Expand Down
4 changes: 4 additions & 0 deletions src/mlm_server.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
<action name = "send" message = "OK" />
<action name = "check for mailbox messages" />
</event>
<event name = "*">
<action name = "message not valid in this state" />
<action name = "send" message = "ERROR" />
</event>
</state>

<state name = "connected" inherit = "external">
Expand Down
18 changes: 2 additions & 16 deletions src/mlm_server_engine.inc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ static void
register_new_client (client_t *self);
static void
check_for_mailbox_messages (client_t *self);
static void
message_not_valid_in_this_state (client_t *self);
static void
store_stream_writer (client_t *self);
static void
Expand All @@ -169,8 +171,6 @@ static void
deregister_the_client (client_t *self);
static void
allow_time_to_settle (client_t *self);
static void
message_not_valid_in_this_state (client_t *self);
static void
get_message_to_deliver (client_t *self);

Expand Down Expand Up @@ -649,20 +649,6 @@ s_client_execute (s_client_t *self, event_t event)
mlm_proto_set_routing_id (self->server->message, self->routing_id);
mlm_proto_send (self->server->message, self->server->router);
}
if (!self->exception) {
// deregister the client
if (self->server->verbose)
zsys_debug ("%s: $ deregister the client", self->log_prefix);
deregister_the_client (&self->client);
}
if (!self->exception) {
// allow time to settle
if (self->server->verbose)
zsys_debug ("%s: $ allow time to settle", self->log_prefix);
allow_time_to_settle (&self->client);
}
if (!self->exception)
self->state = settling_state;
}
break;

Expand Down
33 changes: 23 additions & 10 deletions src/mshell.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,36 @@

int main (int argc, char *argv [])
{
if (argc < 2) {
printf ("syntax: mshell stream type [ body ]\n");
int argn = 1;
bool verbose = false;
if (argc > argn && streq (argv [argn], "-v")) {
verbose = true;
argn++;
}
// Get stream, subject/pattern, and optional content to send
char *stream = argn < argc? argv [argn++]: NULL;
char *subject = argn < argc? argv [argn++]: NULL;
char *content = argn < argc? argv [argn++]: NULL;

if (!stream || !subject || streq (stream, "-h")) {
printf ("syntax: mshell [-v] stream subject [ body ]\n");
return 0;
}
mlm_client_t *client = mlm_client_new ("ipc://@/malamute", 1000, "mshell");
mlm_client_t *client = mlm_client_new ("ipc://@/malamute", 1000, "mshell/mshell");
if (!client) {
zsys_error ("mshell: server not reachable at ipc://@/malamute");
return 0;
}
if (argc == 3) {
if (verbose)
mlm_client_verbose (client);

if (content) {
mlm_client_set_producer (client, stream);
mlm_client_sendx (client, subject, content, NULL);
}
else {
// Consume the event subjects specified by the pattern
mlm_client_set_consumer (client, argv [1], argv [2]);
mlm_client_set_consumer (client, stream, subject);
while (true) {
// Now receive and print any messages we get
zmsg_t *msg = mlm_client_recv (client);
Expand All @@ -42,11 +60,6 @@ int main (int argc, char *argv [])
zmsg_destroy (&msg);
}
}
else
if (argc == 4) {
mlm_client_set_producer (client, argv [1]);
mlm_client_sendx (client, argv [2], argv [3], NULL);
}
mlm_client_destroy (&client);
return 0;
}
Loading

0 comments on commit e4983cf

Please sign in to comment.