Skip to content

Commit

Permalink
Problem: need service distribution
Browse files Browse the repository at this point in the history
Solution: implement SERVICE commands

Fixes zeromq#22

Note, until credit based flow control is implemented, service requests
are sent on round-robin basis without any waiting. This is not yet a
load-balanced queue.
  • Loading branch information
hintjens committed Dec 6, 2014
1 parent d24543c commit 5dd135e
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 20 deletions.
5 changes: 5 additions & 0 deletions include/mlm_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ MLM_EXPORT void
MLM_EXPORT void
mlm_client_verbose (mlm_client_t *self);

// Return actor, when caller wants to work with multiple actors and/or
// input sockets asynchronously.
MLM_EXPORT zactor_t *
mlm_client_actor (mlm_client_t *self);

// Return message pipe for asynchronous message I/O. In the high-volume case,
// we send methods and get replies to the actor, in a synchronous manner, and
// we send/recv high volume message data to a second pipe, the msgpipe. In
Expand Down
39 changes: 37 additions & 2 deletions src/mlm_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@ pass_mailbox_message_to_app (client_t *self)
}


// ---------------------------------------------------------------------------
// pass_service_message_to_app
//

static void
pass_service_message_to_app (client_t *self)
{
zstr_sendm (self->msgpipe, "SERVICE DELIVER");
zsock_bsend (self->msgpipe, "ssssp",
mlm_msg_sender (self->message),
mlm_msg_service (self->message),
mlm_msg_subject (self->message),
mlm_msg_tracker (self->message),
mlm_msg_get_content (self->message));
}


// ---------------------------------------------------------------------------
// signal_success
//
Expand Down Expand Up @@ -374,8 +391,26 @@ mlm_client_test (bool verbose)

msg = zmsg_new ();
zmsg_addstr (msg, "Special conditions");
mlm_client_service_send (writer, "printer", "bw.A4", "", 0, &msg);

mlm_client_service_send (writer, "printer", "bw.A5", "", 0, &msg);

msg = mlm_client_recv (reader);
assert (streq (mlm_client_command (reader), "SERVICE DELIVER"));
assert (streq (mlm_client_subject (reader), "bw.A4"));
assert (streq (mlm_client_sender (reader), "writer"));
content = zmsg_popstr (msg);
assert (streq (content, "Important contract"));
zstr_free (&content);
zmsg_destroy (&msg);

msg = mlm_client_recv (reader);
assert (streq (mlm_client_command (reader), "SERVICE DELIVER"));
assert (streq (mlm_client_subject (reader), "bw.A5"));
assert (streq (mlm_client_sender (reader), "writer"));
content = zmsg_popstr (msg);
assert (streq (content, "Special conditions"));
zstr_free (&content);
zmsg_destroy (&msg);

// Done, shut down
mlm_client_destroy (&reader);
mlm_client_destroy (&writer);
Expand Down
3 changes: 3 additions & 0 deletions src/mlm_client.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
<event name = "MAILBOX DELIVER">
<action name = "pass mailbox message to app" />
</event>
<event name = "SERVICE DELIVER">
<action name = "pass service message to app" />
</event>
<event name = "expired">
<action name = "send" message = "CONNECTION PING" />
</event>
Expand Down
36 changes: 32 additions & 4 deletions src/mlm_client_engine.inc
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ typedef enum {
destructor_event = 7,
stream_deliver_event = 8,
mailbox_deliver_event = 9,
error_event = 10,
connection_pong_event = 11,
command_invalid_event = 12,
other_event = 13
service_deliver_event = 10,
error_event = 11,
connection_pong_event = 12,
command_invalid_event = 13,
other_event = 14
} event_t;

// Names for state machine logging and error reporting
Expand Down Expand Up @@ -77,6 +78,7 @@ s_event_name [] = {
"destructor",
"STREAM_DELIVER",
"MAILBOX_DELIVER",
"SERVICE_DELIVER",
"ERROR",
"CONNECTION_PONG",
"command_invalid",
Expand Down Expand Up @@ -155,6 +157,8 @@ static void
pass_stream_message_to_app (client_t *self);
static void
pass_mailbox_message_to_app (client_t *self);
static void
pass_service_message_to_app (client_t *self);
static void
signal_failure (client_t *self);
static void
Expand Down Expand Up @@ -331,6 +335,9 @@ s_protocol_event (s_client_t *self, mlm_msg_t *message)
case MLM_MSG_MAILBOX_DELIVER:
return mailbox_deliver_event;
break;
case MLM_MSG_SERVICE_DELIVER:
return service_deliver_event;
break;
case MLM_MSG_OK:
return ok_event;
break;
Expand Down Expand Up @@ -540,6 +547,15 @@ s_client_execute (s_client_t *self, event_t event)
}
}
else
if (self->event == service_deliver_event) {
if (!self->exception) {
// pass service message to app
if (self->verbose)
zsys_debug ("mlm_client: $ pass service message to app");
pass_service_message_to_app (&self->client);
}
}
else
if (self->event == expired_event) {
if (!self->exception) {
// send CONNECTION_PING
Expand Down Expand Up @@ -1044,6 +1060,18 @@ mlm_client_verbose (mlm_client_t *self)
}


// ---------------------------------------------------------------------------
// Return actor, when caller wants to work with multiple actors and/or
// input sockets asynchronously.

zactor_t *
mlm_client_actor (mlm_client_t *self)
{
assert (self);
return self->actor;
}


// ---------------------------------------------------------------------------
// Return message pipe for asynchronous message I/O. In the high-volume case,
// we send methods and get replies to the actor, in a synchronous manner, and
Expand Down
77 changes: 64 additions & 13 deletions src/mlm_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ typedef struct {

typedef struct {
char *name; // Service name
client_t *client; // Destination client, if any
zlistx_t *offers; // Service offers
zlistx_t *messages; // Pending messages
} service_t;
Expand All @@ -79,6 +78,7 @@ typedef struct {

typedef struct {
char *sender; // Originating client
char *address; // Message address or service
char *subject; // Message subject
char *tracker; // Message tracker
int64_t expiry; // Message expiry time
Expand All @@ -100,10 +100,13 @@ struct _server_t {
zhashx_t *mailboxes; // Holds mailboxes by address
zhashx_t *services; // Holds services by name

// Hold currently dispatching message here
// Hold currently dispatching stream message here
char *sender; // Originating client
char *subject; // Message subject
zmsg_t *content; // Message content

// Hold currently dispatching service message here
message_t *message; // Message structure
};


Expand Down Expand Up @@ -188,11 +191,12 @@ s_stream_require (client_t *self, const char *name)
// Work with a mailbox/service message instance

static message_t *
s_message_new (const char *sender, mlm_msg_t *msg)
s_message_new (const char *sender, const char *address, mlm_msg_t *msg)
{
message_t *self = (message_t *) zmalloc (sizeof (message_t));
if (self) {
self->sender = strdup (sender);
self->address = strdup (address);
self->subject = strdup (mlm_msg_subject (msg));
self->tracker = strdup (mlm_msg_tracker (msg));
self->content = mlm_msg_get_content (msg);
Expand Down Expand Up @@ -263,7 +267,7 @@ s_mailbox_require (client_t *self, const char *name)
static void
s_mailbox_dispatch (mailbox_t *self)
{
if (self->client)
if (self->client && zlistx_size (self->messages))
engine_send_event (self->client, mailbox_message_event);
}

Expand Down Expand Up @@ -348,10 +352,27 @@ s_service_require (client_t *self, const char *name)
}

void
s_service_dispatch (service_t *self)
s_service_dispatch (service_t *self, server_t *server)
{
// for each message, check regexp and dispatch if possible
// pop message and pass via server...
if (zlistx_size (self->offers)) {
message_t *message = (message_t *) zlistx_first (self->messages);
while (message) {
offer_t *offer = (offer_t *) zlistx_first (self->offers);
while (offer) {
if (zrex_matches (offer->rex, message->subject)) {
assert (!server->message);
server->message = (message_t *) zlistx_detach (
self->messages, zlistx_cursor (self->messages));
engine_send_event (offer->client, service_message_event);
zlistx_move_end (self->offers, zlistx_cursor (self->offers));
break;
}
offer = (offer_t *) zlistx_next (self->offers);
}
message = (message_t *) zlistx_next (self->messages);
}
}
}


Expand Down Expand Up @@ -497,7 +518,8 @@ write_message_to_mailbox (client_t *self)
{
mailbox_t *mailbox = s_mailbox_require (self, mlm_msg_address (self->message));
assert (mailbox);
zlistx_add_end (mailbox->messages, s_message_new (self->address, self->message));
zlistx_add_end (mailbox->messages,
s_message_new (self->address, mailbox->name, self->message));
s_mailbox_dispatch (mailbox);
}

Expand All @@ -511,8 +533,9 @@ write_message_to_service (client_t *self)
{
service_t *service = s_service_require (self, mlm_msg_service (self->message));
assert (service);
zlistx_add_end (service->messages, s_message_new (self->address, self->message));
s_service_dispatch (service);
zlistx_add_end (service->messages,
s_message_new (self->address, service->name, self->message));
s_service_dispatch (service, self->server);
}


Expand Down Expand Up @@ -557,8 +580,8 @@ get_mailbox_message_to_deliver (client_t *self)
// Get next message in mailbox queue
message_t *message = (message_t *) zlistx_detach (self->mailbox->messages, NULL);
assert (message);
mlm_msg_set_address (self->message, message->sender);
mlm_msg_set_sender (self->message, message->sender);
mlm_msg_set_address (self->message, message->address);
mlm_msg_set_subject (self->message, message->subject);
mlm_msg_set_content (self->message, &message->content);
s_message_destroy (&message);
Expand All @@ -572,11 +595,28 @@ get_mailbox_message_to_deliver (client_t *self)
static void
check_for_mailbox_messages (client_t *self)
{
if (self->mailbox && zlistx_size (self->mailbox->messages))
engine_set_exception (self, mailbox_message_event);
if (self->mailbox)
s_mailbox_dispatch (self->mailbox);
}


// ---------------------------------------------------------------------------
// get_service_message_to_deliver
//

static void
get_service_message_to_deliver (client_t *self)
{
// We pass the message via the server
message_t *message = self->server->message;
assert (message);
mlm_msg_set_sender (self->message, message->sender);
mlm_msg_set_service (self->message, message->address);
mlm_msg_set_subject (self->message, message->subject);
mlm_msg_set_content (self->message, &message->content);
s_message_destroy (&self->server->message);
}

// ---------------------------------------------------------------------------
// have_message_confirmation
//
Expand Down Expand Up @@ -615,7 +655,18 @@ deregister_the_client (client_t *self)
// Detach from mailbox, if any
if (self->mailbox)
self->mailbox->client = NULL;


// Cancel all service offerings
service_t *service = (service_t *) zhashx_first (self->server->services);
while (service) {
offer_t *offer = (offer_t *) zlistx_first (service->offers);
while (offer) {
if (offer->client == self)
zlistx_delete (service->offers, zlistx_cursor (service->offers));
offer = (offer_t *) zlistx_next (service->offers);
}
service = (service_t *) zhashx_next (self->server->services);
}
mlm_msg_set_status_code (self->message, MLM_MSG_SUCCESS);
}

Expand Down
4 changes: 4 additions & 0 deletions src/mlm_server.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@
<action name = "send" message = "MAILBOX DELIVER" />
<action name = "check for mailbox messages" />
</event>
<event name = "service message" next = "connected">
<action name = "get service message to deliver" />
<action name = "send" message = "SERVICE DELIVER" />
</event>
<!-- This built-in event hits on a client timeout -->
<event name = "expired" next = "settling">
<action name = "deregister the client" />
Expand Down
Loading

0 comments on commit 5dd135e

Please sign in to comment.