Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: address is always empty in STREAM DELIVER #69

Merged
merged 1 commit into from
Mar 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Problem: address is always empty in STREAM DELIVER
Solution: set this to stream name, as it should be.

Problem: sender is always empty in client API

Solution: return sender name properly

I extended the mlm_client.c self tests with checks for both these
problems.

Fixes #67
  • Loading branch information
hintjens committed Mar 19, 2015
commit 4dc0860a5cc2d4c8101aa46c3a645968f6d18706
2 changes: 1 addition & 1 deletion include/mlm_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ MLM_EXPORT int
MLM_EXPORT int
mlm_client_set_producer (mlm_client_t *self, const char *stream);

// Consume messages with matching addresses. The pattern is a regular expression
// Consume messages with matching subjects. The pattern is a regular expression
// using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the
// start and end, . to match any character, \s and \S to match whitespace and
// non-whitespace, \d and \D to match a digit and non-digit, \a and \A to match
Expand Down
59 changes: 43 additions & 16 deletions src/mlm_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pass_stream_message_to_app (client_t *self)
{
zstr_sendm (self->msgpipe, "STREAM DELIVER");
zsock_bsend (self->msgpipe, "sssp",
mlm_proto_stream (self->message),
mlm_proto_address (self->message),
mlm_proto_sender (self->message),
mlm_proto_subject (self->message),
mlm_proto_get_content (self->message));
Expand Down Expand Up @@ -281,9 +281,7 @@ signal_server_not_present (client_t *self)
void
mlm_client_test (bool verbose)
{
printf (" * mlm_client: ");
if (verbose)
printf ("\n");
printf (" * mlm_client: \n");

// @selftest
mlm_client_verbose = verbose;
Expand Down Expand Up @@ -440,12 +438,19 @@ mlm_client_test (bool verbose)
zstr_free (&content);
mlm_client_destroy (&reader);

// Test multiple readers for same message
writer = mlm_client_new ();
assert (writer);
rc = mlm_client_set_plain_auth (writer, "writer", "secret");
// Test multiple readers and multiple writers
mlm_client_t *writer1 = mlm_client_new ();
assert (writer1);
rc = mlm_client_set_plain_auth (writer1, "writer", "secret");
assert (rc == 0);
rc = mlm_client_connect (writer, "inproc://malamute", 1000, "");
rc = mlm_client_connect (writer1, "inproc://malamute", 1000, "");
assert (rc == 0);

mlm_client_t *writer2 = mlm_client_new ();
assert (writer2);
rc = mlm_client_set_plain_auth (writer2, "writer", "secret");
assert (rc == 0);
rc = mlm_client_connect (writer2, "inproc://malamute", 1000, "");
assert (rc == 0);

mlm_client_t *reader1 = mlm_client_new ();
Expand All @@ -462,25 +467,47 @@ mlm_client_test (bool verbose)
rc = mlm_client_connect (reader2, "inproc://malamute", 1000, "");
assert (rc == 0);

mlm_client_set_producer (writer, "weather");
mlm_client_set_consumer (reader1, "weather", "temp.*");
mlm_client_set_consumer (reader2, "weather", "temp.*");
mlm_client_set_producer (writer1, "weather");
mlm_client_set_producer (writer2, "traffic");
mlm_client_set_consumer (reader1, "weather", "newyork");
mlm_client_set_consumer (reader1, "traffic", "newyork");
mlm_client_set_consumer (reader2, "weather", "newyork");
mlm_client_set_consumer (reader2, "traffic", "newyork");

mlm_client_sendx (writer, "temp.newyork", "8", NULL);
mlm_client_sendx (writer1, "newyork", "8", NULL);

mlm_client_recvx (reader1, &subject, &content, NULL);
assert (streq (subject, "temp.newyork"));
assert (streq (mlm_client_address (reader1), "weather"));
assert (streq (subject, "newyork"));
assert (streq (content, "8"));
zstr_free (&subject);
zstr_free (&content);

mlm_client_recvx (reader2, &subject, &content, NULL);
assert (streq (subject, "temp.newyork"));
assert (streq (mlm_client_address (reader2), "weather"));
assert (streq (subject, "newyork"));
assert (streq (content, "8"));
zstr_free (&subject);
zstr_free (&content);

mlm_client_destroy (&writer);
mlm_client_sendx (writer2, "newyork", "85", NULL);

mlm_client_recvx (reader1, &subject, &content, NULL);
assert (streq (mlm_client_address (reader1), "traffic"));
assert (streq (subject, "newyork"));
assert (streq (content, "85"));
zstr_free (&subject);
zstr_free (&content);

mlm_client_recvx (reader2, &subject, &content, NULL);
assert (streq (mlm_client_address (reader2), "traffic"));
assert (streq (subject, "newyork"));
assert (streq (content, "85"));
zstr_free (&subject);
zstr_free (&content);

mlm_client_destroy (&writer1);
mlm_client_destroy (&writer2);
mlm_client_destroy (&reader1);
mlm_client_destroy (&reader2);

Expand Down
2 changes: 1 addition & 1 deletion src/mlm_client.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@
</method>

<method name = "set consumer" return = "status">
Consume messages with matching addresses. The pattern is a regular
Consume messages with matching subjects. The pattern is a regular
expression using the CZMQ zrex syntax. The most useful elements are:
^ and $ to match the start and end, . to match any character, \s and
\S to match whitespace and non-whitespace, \d and \D to match a digit
Expand Down
2 changes: 1 addition & 1 deletion src/mlm_client_engine.inc
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ mlm_client_set_producer (mlm_client_t *self, const char *stream)


// ---------------------------------------------------------------------------
// Consume messages with matching addresses. The pattern is a regular expression
// Consume messages with matching subjects. The pattern is a regular expression
// using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the
// start and end, . to match any character, \s and \S to match whitespace and
// non-whitespace, \d and \D to match a digit and non-digit, \a and \A to match
Expand Down
10 changes: 10 additions & 0 deletions src/mlm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ mlm_msg_subject (mlm_msg_t *self)
}


// --------------------------------------------------------------------------
// Return message address

char *
mlm_msg_address (mlm_msg_t *self)
{
return self->address;
}


// --------------------------------------------------------------------------
// Store message into mlm_proto object

Expand Down
18 changes: 11 additions & 7 deletions src/mlm_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,36 @@ typedef struct _mlm_msg_t mlm_msg_t;
// @interface
// Create a new mlm_msg; takes ownership of content, which the caller should
// not use after this call.
MLM_EXPORT mlm_msg_t *
mlm_msg_t *
mlm_msg_new (const char *sender, const char *address, const char *subject,
const char *tracker, uint timeout, zmsg_t *content);

// Destroy the mlm_msg
MLM_EXPORT void
void
mlm_msg_destroy (mlm_msg_t **self_p);

// Return message subject
MLM_EXPORT char *
char *
mlm_msg_subject (mlm_msg_t *self);

// Return message address
char *
mlm_msg_address (mlm_msg_t *self);

// Store message into mlm_proto object
MLM_EXPORT void
void
mlm_msg_set_proto (mlm_msg_t *self, mlm_proto_t *proto);

// Get reference-counted copy of message
MLM_EXPORT mlm_msg_t *
mlm_msg_t *
mlm_msg_link (mlm_msg_t *self);

// Drop reference to message
MLM_EXPORT void
void
mlm_msg_unlink (mlm_msg_t **self_p);

// Self test of this class
MLM_EXPORT int
int
mlm_msg_test (bool verbose);
// @end

Expand Down
12 changes: 8 additions & 4 deletions src/mlm_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ typedef struct _client_t client_t;
// This is a simple stream class

typedef struct {
char *name; // Stream name
zactor_t *actor; // Stream engine, zactor
zsock_t *msgpipe; // Socket to send messages to for stream
} stream_t;
Expand Down Expand Up @@ -126,18 +127,21 @@ s_stream_destroy (stream_t **self_p)
stream_t *self = *self_p;
zactor_destroy (&self->actor);
zsock_destroy (&self->msgpipe);
free (self->name);
free (self);
*self_p = NULL;
}
}

static stream_t *
s_stream_new (client_t *client)
s_stream_new (client_t *client, const char *name)
{
stream_t *self = (stream_t *) zmalloc (sizeof (stream_t));
if (self) {
zsock_t *backend;
self->msgpipe = zsys_create_pipe (&backend);
self->name = strdup (name);
if (self->name)
self->msgpipe = zsys_create_pipe (&backend);
if (self->msgpipe) {
engine_handle_socket (client->server, self->msgpipe, s_forward_stream_traffic);
self->actor = zactor_new (mlm_stream_simple, backend);
Expand All @@ -153,7 +157,7 @@ s_stream_require (client_t *self, const char *name)
{
stream_t *stream = (stream_t *) zhashx_lookup (self->server->streams, name);
if (!stream)
stream = s_stream_new (self);
stream = s_stream_new (self, name);
if (stream)
zhashx_insert (self->server->streams, name, stream);
return (stream);
Expand Down Expand Up @@ -397,7 +401,7 @@ write_message_to_stream (client_t *self)
if (self->writer) {
mlm_msg_t *msg = mlm_msg_new (
self->address,
NULL,
self->writer->name,
mlm_proto_subject (self->message),
NULL,
mlm_proto_timeout (self->message),
Expand Down