Skip to content

Commit

Permalink
Merge pull request flux-framework#6359 from garlick/trace_resp
Browse files Browse the repository at this point in the history
show response result in message traces
  • Loading branch information
mergify[bot] authored Oct 11, 2024
2 parents 47040bc + d462260 commit 9ed4cfb
Show file tree
Hide file tree
Showing 14 changed files with 1,705 additions and 185 deletions.
1 change: 1 addition & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ AC_REPLACE_FUNCS( \
strlcat \
argz_add \
envz_add \
strerrorname_np \
)
X_AC_CHECK_PTHREADS
X_AC_CHECK_COND_LIB(rt, clock_gettime)
Expand Down
4 changes: 3 additions & 1 deletion src/broker/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ libbroker_la_SOURCES = \
shutdown.h \
shutdown.c \
topology.h \
topology.c
topology.c \
trace.h \
trace.c

flux_broker_LDADD = \
$(builddir)/libbroker.la \
Expand Down
72 changes: 3 additions & 69 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#include "module.h"
#include "modservice.h"
#include "trace.h"

struct broker_module {
flux_t *h; /* ref to broker's internal flux_t handle */
Expand Down Expand Up @@ -441,77 +442,11 @@ int module_get_status (module_t *p)
return p ? p->status : 0;
}

/* Report 'msg' to every pending module.trace request that matches it.
 *
 * p      - module whose traffic is being traced; p->trace_requests holds
 *          the pending trace requests and p->name is reported as "name".
 * prefix - caller-supplied direction tag (callers in this file pass "tx"
 *          or "rx"); copied verbatim into the response.
 * msg    - the message being traced (not modified).
 *
 * Each response carries a timestamp, the prefix, the message type, the
 * module name, a topic string, the payload size and, when the request set
 * "full", the decoded JSON payload.
 *
 * Control messages have no topic, so a "<state> <status>" description is
 * synthesized and reported in the topic field instead.  Messages that are
 * neither control messages nor carry a topic are reported as "NO-TOPIC".
 *
 * Errors are not returned: a failure to respond is logged and the
 * remaining requests are still processed.
 */
static void message_trace (module_t *p,
                           const char *prefix,
                           const flux_msg_t *msg)
{
    const flux_msg_t *req;
    double now = flux_reactor_now (flux_get_reactor (p->h));
    int type = 0;
    char buf[64];
    const char *topic = NULL;
    int payload_size = 0;
    json_t *payload_json = NULL;

    (void)flux_msg_get_type (msg, &type);
    if (type == FLUX_MSGTYPE_CONTROL) {
        int ctype;
        int cstatus;
        if (flux_control_decode (msg, &ctype, &cstatus) == 0) {
            snprintf (buf,
                      sizeof (buf),
                      "%s %d",
                      ctype == FLUX_MODSTATE_INIT ? "init" :
                      ctype == FLUX_MODSTATE_RUNNING ? "running" :
                      ctype == FLUX_MODSTATE_FINALIZING ? "finalizing" :
                      ctype == FLUX_MODSTATE_EXITED ? "exited" : "unknown",
                      cstatus);
            /* Use the synthesized description as the reported topic.
             * Without this assignment the formatted text was discarded
             * and control messages always showed up as "NO-TOPIC".
             */
            topic = buf;
        }
    }
    else {
        (void)flux_msg_get_topic (msg, &topic);
        (void)flux_msg_get_payload (msg, NULL, &payload_size);
        /* Messages on the module.trace topic itself are never traced. */
        if (topic && streq (topic, "module.trace"))
            return;
    }

    req = flux_msglist_first (p->trace_requests);
    while (req) {
        struct flux_match match = FLUX_MATCH_ANY;
        int full = 0;
        /* Skip requests that cannot be decoded or whose type mask /
         * topic glob does not select this message.
         */
        if (flux_request_unpack (req,
                                 NULL,
                                 "{s:i s:s s?b}",
                                 "typemask", &match.typemask,
                                 "topic_glob", &match.topic_glob,
                                 "full", &full) < 0
            || !flux_msg_cmp (msg, match))
            goto next;

        /* Decode the payload at most once, and only if some matching
         * request asked for it; later requests reuse the same object.
         */
        if (full && !payload_json && payload_size > 0)
            (void)flux_msg_unpack (msg, "o", &payload_json);

        /* payload_json may still be NULL here; the '?' in "s:O?" lets
         * the pack call accept that.
         */
        if (flux_respond_pack (p->h,
                               req,
                               "{s:f s:s s:i s:s s:s s:i s:O?}",
                               "timestamp", now,
                               "prefix", prefix,
                               "type", type,
                               "name", p->name,
                               "topic", topic ? topic : "NO-TOPIC",
                               "payload_size", payload_size,
                               "payload", payload_json) < 0)
            flux_log_error (p->h, "error responding to module.trace");
next:
        req = flux_msglist_next (p->trace_requests);
    }
}

flux_msg_t *module_recvmsg (module_t *p)
{
flux_msg_t *msg;
msg = flux_recv (p->h_broker_end, FLUX_MATCH_ANY, FLUX_O_NONBLOCK);
if (msg && flux_msglist_count (p->trace_requests) > 0)
message_trace (p, "tx", msg);
trace_module_msg (p->h, "tx", p->name, p->trace_requests, msg);
return msg;
}

Expand Down Expand Up @@ -541,8 +476,7 @@ int module_sendmsg_new (module_t *p, flux_msg_t **msg)
*msg = NULL;
return 0;
}
if (flux_msglist_count (p->trace_requests) > 0)
message_trace (p, "rx", *msg);
trace_module_msg (p->h, "rx", p->name, p->trace_requests, *msg);
return flux_send_new (p->h_broker_end, msg, 0);
}

Expand Down
118 changes: 26 additions & 92 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

#include "overlay.h"
#include "attr.h"
#include "trace.h"

/* How long to wait (seconds) for a peer broker's TCP ACK before disconnecting.
* This can be configured via TOML and on the broker command line.
Expand Down Expand Up @@ -67,14 +68,6 @@ static bool have_connect_timeout = false;

#define FLUX_ZAP_DOMAIN "flux"

/* Overlay control messages
 *
 * Type codes for control messages on the overlay network.  Code in this
 * file compares the type obtained from flux_control_decode() against
 * these enumerators.  The numeric values are assigned explicitly.
 */
enum control_type {
CONTROL_HEARTBEAT = 0, // child sends when connection is idle
CONTROL_STATUS = 1, // child tells parent of subtree status change
CONTROL_DISCONNECT = 2,// parent tells child to immediately disconnect
};

/* Numerical values for "subtree health" so we can send them in control
* messages. Textual values below will be used for communication with front
* end diagnostic tool.
Expand Down Expand Up @@ -205,10 +198,6 @@ static int overlay_control_parent (struct overlay *ov,
int status);
static void overlay_health_respond_all (struct overlay *ov);
static struct child *child_lookup_byrank (struct overlay *ov, uint32_t rank);
static void message_trace (struct overlay *ov,
const char *prefix,
int rank,
const flux_msg_t *msg);

/* Convenience iterator for ov->children
*/
Expand Down Expand Up @@ -525,8 +514,11 @@ static int overlay_sendmsg_parent (struct overlay *ov, const flux_msg_t *msg)
rc = zmqutil_msg_send (ov->parent.zsock, msg);
if (rc == 0) {
ov->parent.lastsent = flux_reactor_now (ov->reactor);
if (flux_msglist_count (ov->trace_requests) > 0)
message_trace (ov, "tx", ov->parent.rank, msg);
trace_overlay_msg (ov->h,
"tx",
ov->parent.rank,
ov->trace_requests,
msg);
}
done:
return rc;
Expand Down Expand Up @@ -833,7 +825,7 @@ static int overlay_sendmsg_child (struct overlay *ov, const flux_msg_t *msg)
if ((uuid = flux_msg_route_last (msg))
&& (child = child_lookup_online (ov, uuid)))
rank = child->rank;
message_trace (ov, "tx", rank, msg);
trace_overlay_msg (ov->h, "tx", rank, ov->trace_requests, msg);
}
}
done:
Expand Down Expand Up @@ -873,8 +865,13 @@ static void overlay_mcast_child (struct overlay *ov, flux_msg_t *msg)
count++;
}
}
if (count > 0 && flux_msglist_count (ov->trace_requests) > 0)
message_trace (ov, "tx", -1, msg);
if (count > 0) {
trace_overlay_msg (ov->h,
"tx",
FLUX_NODEID_ANY,
ov->trace_requests,
msg);
}
}

static void logdrop (struct overlay *ov,
Expand Down Expand Up @@ -921,74 +918,6 @@ static int clear_msg_role (flux_msg_t *msg, uint32_t role)
return 0;
}

/* Report 'msg' to every pending overlay.trace request whose filter
 * accepts it.
 *
 * ov     - overlay context; ov->trace_requests holds the pending requests.
 * prefix - caller-supplied direction tag, copied into the response.
 * rank   - peer rank associated with the message, reported as "rank" and
 *          compared against each request's "nodeid" filter.
 * msg    - the message being traced (not modified).
 *
 * Control messages carry no topic, so a "<type> <status>" description is
 * reported in the topic field instead.  Response failures are logged and
 * do not stop processing of the remaining requests.
 */
static void message_trace (struct overlay *ov,
                           const char *prefix,
                           int rank,
                           const flux_msg_t *msg)
{
    double timestamp = flux_reactor_now (ov->reactor);
    const flux_msg_t *request;
    json_t *payload = NULL;
    const char *topic = NULL;
    char description[64];
    int size = 0;
    int type = 0;

    (void)flux_msg_get_type (msg, &type);
    if (type != FLUX_MSGTYPE_CONTROL) {
        (void)flux_msg_get_topic (msg, &topic);
        (void)flux_msg_get_payload (msg, NULL, &size);
    }
    else {
        int ctype;
        int cstatus;
        if (flux_control_decode (msg, &ctype, &cstatus) == 0) {
            const char *label;
            switch (ctype) {
                case CONTROL_HEARTBEAT:
                    label = "heartbeat";
                    break;
                case CONTROL_STATUS:
                    label = "status";
                    break;
                case CONTROL_DISCONNECT:
                    label = "disconnect";
                    break;
                default:
                    label = "unknown";
                    break;
            }
            snprintf (description,
                      sizeof (description),
                      "%s %d",
                      label,
                      cstatus);
            topic = description;
        }
    }
    /* Placeholder reported when no topic could be determined. */
    if (!topic)
        topic = "NO-TOPIC";

    for (request = flux_msglist_first (ov->trace_requests);
         request != NULL;
         request = flux_msglist_next (ov->trace_requests)) {
        struct flux_match match = FLUX_MATCH_ANY;
        int nodeid;
        int full = 0;

        /* Undecodable requests are ignored. */
        if (flux_request_unpack (request,
                                 NULL,
                                 "{s:i s:s s:i s?b}",
                                 "typemask", &match.typemask,
                                 "topic_glob", &match.topic_glob,
                                 "nodeid", &nodeid,
                                 "full", &full) < 0)
            continue;
        /* Rank filter: the request must name this rank or any rank. */
        if (nodeid != FLUX_NODEID_ANY && nodeid != rank)
            continue;
        /* Type mask / topic glob filter. */
        if (!flux_msg_cmp (msg, match))
            continue;

        /* Decode the payload lazily, once, for the first request that
         * wants it; subsequent requests reuse the decoded object.
         */
        if (full && !payload && size > 0)
            (void)flux_msg_unpack (msg, "o", &payload);

        if (flux_respond_pack (ov->h,
                               request,
                               "{s:f s:s s:i s:i s:s s:i s:O?}",
                               "timestamp", timestamp,
                               "prefix", prefix,
                               "rank", rank,
                               "type", type,
                               "topic", topic,
                               "payload_size", size,
                               "payload", payload) < 0)
            flux_log_error (ov->h, "error responding to overlay.trace");
    }
}

/* Handle a message received from TBON child (downstream).
*/
static void child_cb (flux_reactor_t *r,
Expand Down Expand Up @@ -1027,7 +956,11 @@ static void child_cb (flux_reactor_t *r,
&& flux_msg_get_topic (msg, &topic) == 0
&& streq (topic, "overlay.hello")
&& !ov->shutdown_in_progress) {
message_trace (ov, "rx", -1, msg);
trace_overlay_msg (ov->h,
"rx",
FLUX_NODEID_ANY,
ov->trace_requests,
msg);
hello_request_handler (ov, msg);
}
/* Or one of the following cases occurred that requires (or at least
Expand Down Expand Up @@ -1058,7 +991,11 @@ static void child_cb (flux_reactor_t *r,
int type, status;
if (flux_control_decode (msg, &type, &status) == 0
&& type == CONTROL_STATUS) {
message_trace (ov, "rx", child->rank, msg);
trace_overlay_msg (ov->h,
"rx",
child->rank,
ov->trace_requests,
msg);
overlay_child_status_update (ov, child, status, NULL);
}
goto done;
Expand All @@ -1078,8 +1015,7 @@ static void child_cb (flux_reactor_t *r,
case FLUX_MSGTYPE_EVENT:
break;
}
if (flux_msglist_count (ov->trace_requests) > 0)
message_trace (ov, "rx", child->rank, msg);
trace_overlay_msg (ov->h, "rx", child->rank, ov->trace_requests, msg);
if (ov->recv_cb (&msg, OVERLAY_DOWNSTREAM, ov->recv_arg) < 0)
goto done;
return;
Expand Down Expand Up @@ -1169,9 +1105,7 @@ static void parent_cb (flux_reactor_t *r,
default:
break;
}
if (flux_msglist_count (ov->trace_requests) > 0) {
message_trace (ov, "rx", ov->parent.rank, msg);
}
trace_overlay_msg (ov->h, "rx", ov->parent.rank, ov->trace_requests, msg);
if (ov->recv_cb (&msg, OVERLAY_UPSTREAM, ov->recv_arg) < 0)
goto done;
return;
Expand Down
8 changes: 8 additions & 0 deletions src/broker/overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ typedef enum {
OVERLAY_DOWNSTREAM,
} overlay_where_t;

/* Overlay control messages
 *
 * Type codes carried in control messages on the overlay network.  The
 * numeric values are assigned explicitly.
 */
enum control_type {
CONTROL_HEARTBEAT = 0, // child sends when connection is idle
CONTROL_STATUS = 1, // child tells parent of subtree status change
CONTROL_DISCONNECT = 2,// parent tells child to immediately disconnect
};

struct overlay;

typedef void (*overlay_monitor_f)(struct overlay *ov, uint32_t rank, void *arg);
Expand Down
Loading

0 comments on commit 9ed4cfb

Please sign in to comment.