Skip to content

Commit

Permalink
Merge pull request #6347 from garlick/issue#6345
Browse files Browse the repository at this point in the history
add --full option to display payloads in message tracing programs
  • Loading branch information
mergify[bot] authored Oct 7, 2024
2 parents d4cdf62 + 13a68f5 commit 13430c8
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 32 deletions.
7 changes: 6 additions & 1 deletion doc/man1/flux-module.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ SYNOPSIS
| **flux** **module** **list** [*-l*]
| **flux** **module** **stats** [*-R*] [*--clear*] *name*
| **flux** **module** **debug** [*--setbit=VAL*] [*--clearbit=VAL*] [*--set=MASK*] [*--clear=MASK*] *name*
| **flux** **module** **trace** [*-t TYPE,...*] [-T *topic-glob*] *name...*
| **flux** **module** **trace** [-f] [*-t TYPE,...*] [-T *topic-glob*] *name...*


Expand Down Expand Up @@ -151,6 +151,11 @@ trace
Display message summaries for messages transmitted and received by the
named modules.

.. option:: -f, --full

Include JSON payload in output, if any. Payloads that are not JSON are
not displayed.

.. option:: -T, --topic=GLOB

Filter output by topic string.
Expand Down
7 changes: 6 additions & 1 deletion doc/man1/flux-overlay.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ SYNOPSIS
| **flux** **overlay** **lookup** *target*
| **flux** **overlay** **parentof** *rank*
| **flux** **overlay** **disconnect** [*--parent=RANK*] *target*
| **flux** **overlay** **trace** [*-r rank*] [*-t TYPE,...*] [*topic-glob*]
| **flux** **overlay** **trace** [-f] [*-r rank*] [*-t TYPE,...*] [*topic-glob*]

DESCRIPTION
Expand Down Expand Up @@ -130,6 +130,11 @@ Display message summaries for messages transmitted and received on the
overlay network. A topic string glob pattern may be supplied as a positional
argument.

.. option:: -f, --full

Include JSON payload in output, if any. Payloads that are not JSON are
not displayed.

.. option:: -r, --rank=NODEID

Filter output by overlay network peer rank. Note that this rank is not
Expand Down
2 changes: 2 additions & 0 deletions etc/completions/flux.pre
Original file line number Diff line number Diff line change
Expand Up @@ -1168,6 +1168,7 @@ _flux_overlay()
-L --color= \
-H --human \
-d --delta \
-f --full \
"
_flux_split_longopt && split=true
case $prev in
Expand Down Expand Up @@ -1305,6 +1306,7 @@ _flux_module()
-L --color= \
-H --human \
-d --delta \
-f --full \
"
if [[ $cmd != "module" ]]; then

Expand Down
16 changes: 12 additions & 4 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ static void message_trace (module_t *p,
char buf[64];
const char *topic = NULL;
int payload_size = 0;
json_t *payload_json = NULL;

(void)flux_msg_get_type (msg, &type);
if (type == FLUX_MSGTYPE_CONTROL) {
Expand All @@ -477,22 +478,29 @@ static void message_trace (module_t *p,
req = flux_msglist_first (p->trace_requests);
while (req) {
struct flux_match match = FLUX_MATCH_ANY;
int full = 0;
if (flux_request_unpack (req,
NULL,
"{s:i s:s}",
"{s:i s:s s?b}",
"typemask", &match.typemask,
"topic_glob", &match.topic_glob) < 0
"topic_glob", &match.topic_glob,
"full", &full) < 0
|| !flux_msg_cmp (msg, match))
goto next;

if (full && !payload_json && payload_size > 0)
(void)flux_msg_unpack (msg, "o", &payload_json);

if (flux_respond_pack (p->h,
req,
"{s:f s:s s:i s:s s:s s:i}",
"{s:f s:s s:i s:s s:s s:i s:O?}",
"timestamp", now,
"prefix", prefix,
"type", type,
"name", p->name,
"topic", topic ? topic : "NO-TOPIC",
"payload_size", payload_size) < 0)
"payload_size", payload_size,
"payload", payload_json) < 0)
flux_log_error (p->h, "error responding to module.trace");
next:
req = flux_msglist_next (p->trace_requests);
Expand Down
15 changes: 11 additions & 4 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ static void message_trace (struct overlay *ov,
char buf[64];
const char *topic = NULL;
int payload_size = 0;
json_t *payload_json = NULL;

(void)flux_msg_get_type (msg, &type);
if (type == FLUX_MSGTYPE_CONTROL) {
Expand All @@ -958,25 +959,31 @@ static void message_trace (struct overlay *ov,
while (req) {
struct flux_match match = FLUX_MATCH_ANY;
int nodeid;
int full = 0;
if (flux_request_unpack (req,
NULL,
"{s:i s:s s:i}",
"{s:i s:s s:i s?b}",
"typemask", &match.typemask,
"topic_glob", &match.topic_glob,
"nodeid", &nodeid) < 0
"nodeid", &nodeid,
"full", &full) < 0
|| (nodeid != FLUX_NODEID_ANY && nodeid != rank)
|| !flux_msg_cmp (msg, match))
goto next;

if (full && !payload_json && payload_size > 0)
(void)flux_msg_unpack (msg, "o", &payload_json);

if (flux_respond_pack (ov->h,
req,
"{s:f s:s s:i s:i s:s s:i}",
"{s:f s:s s:i s:i s:s s:i s:O?}",
"timestamp", now,
"prefix", prefix,
"rank", rank,
"type", type,
"topic", topic ? topic : "NO-TOPIC",
"payload_size", payload_size) < 0)
"payload_size", payload_size,
"payload", payload_json) < 0)
flux_log_error (ov->h, "error responding to overlay.trace");
next:
req = flux_msglist_next (ov->trace_requests);
Expand Down
38 changes: 28 additions & 10 deletions src/cmd/builtin/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ static struct optparse_option trace_opts[] = {
{ .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "NODEID",
.usage = "Filter output by peer rank",
},
{ .name = "full", .key = 'f', .has_arg = 0,
.usage = "Show JSON message payload, if any",
},
{ .name = "type", .key = 't', .has_arg = 1,
.flags = OPTPARSE_OPT_AUTOSPLIT,
.arginfo = "TYPE,...",
Expand Down Expand Up @@ -1208,12 +1211,15 @@ static void trace_print_human_timestamp (struct trace_ctx *ctx,
static void trace_print_human (struct trace_ctx *ctx,
double timestamp,
int message_type,
const char *s)
const char *s,
const char *payload_str)
{
trace_print_human_timestamp (ctx, timestamp);
printf (" %s%s%s\n",
printf (" %s%s%s%s%s\n",
trace_color (ctx, message_type),
s,
payload_str ? "\n" : "",
payload_str ? payload_str : "",
trace_color_reset (ctx));
}

Expand All @@ -1236,12 +1242,15 @@ static void trace_print_timestamp (struct trace_ctx *ctx, double timestamp)
static void trace_print (struct trace_ctx *ctx,
double timestamp,
int message_type,
const char *s)
const char *s,
const char *payload_str)
{
trace_print_timestamp (ctx, timestamp);
printf (" %s%s%s\n",
printf (" %s%s%s%s%s\n",
trace_color (ctx, message_type),
s,
payload_str ? "\n" : "",
payload_str ? payload_str : "",
trace_color_reset (ctx));
}

Expand Down Expand Up @@ -1302,10 +1311,11 @@ static int subcmd_trace (optparse_t *p, int ac, char *av[])
"overlay.trace",
FLUX_NODEID_ANY,
FLUX_RPC_STREAMING,
"{s:i s:s s:i}",
"{s:i s:s s:i s:b}",
"typemask", ctx.match.typemask,
"topic_glob", ctx.match.topic_glob,
"nodeid", ctx.nodeid)))
"nodeid", ctx.nodeid,
"full", optparse_hasopt (p, "full") ? 1 : 0)))
log_err_exit ("error sending overlay.trace request");
do {
double timestamp;
Expand All @@ -1314,18 +1324,24 @@ static int subcmd_trace (optparse_t *p, int ac, char *av[])
int type;
const char *topic;
int payload_size;
json_t *payload_json = json_null ();
char *payload_str = NULL;
char buf[160];

if (flux_rpc_get_unpack (f,
"{s:F s:s s:i s:i s:s s:i}",
"{s:F s:s s:i s:i s:s s:i s?o}",
"timestamp", &timestamp,
"prefix", &prefix,
"rank", &rank,
"type", &type,
"topic", &topic,
"payload_size", &payload_size) < 0)
"payload_size", &payload_size,
"payload", &payload_json) < 0)
log_err_exit ("%s", future_strerror (f, errno));

if (!json_is_null (payload_json))
payload_str = json_dumps (payload_json, JSON_INDENT(2));

char rankstr[16];
if (rank < 0)
snprintf (rankstr, sizeof (rankstr), "*");
Expand All @@ -1342,11 +1358,13 @@ static int subcmd_trace (optparse_t *p, int ac, char *av[])
encode_size (payload_size));

if (optparse_hasopt (p, "human"))
trace_print_human (&ctx, timestamp, type, buf);
trace_print_human (&ctx, timestamp, type, buf, payload_str);
else
trace_print (&ctx, timestamp, type, buf);
trace_print (&ctx, timestamp, type, buf, payload_str);
fflush (stdout);

free (payload_str);

flux_future_reset (f);
} while (1);
}
Expand Down
38 changes: 28 additions & 10 deletions src/cmd/flux-module.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ static struct optparse_option debug_opts[] = {
};

static struct optparse_option trace_opts[] = {
{ .name = "full", .key = 'f', .has_arg = 0,
.usage = "Show JSON message payload, if any",
},
{ .name = "topic", .key = 'T', .has_arg = 1,
.arginfo = "GLOB",
.usage = "Filter output by message topic glob",
Expand Down Expand Up @@ -840,12 +843,15 @@ static void trace_print_human_timestamp (struct trace_ctx *ctx,
static void trace_print_human (struct trace_ctx *ctx,
double timestamp,
int message_type,
const char *s)
const char *s,
const char *payload_str)
{
trace_print_human_timestamp (ctx, timestamp);
printf (" %s%s%s\n",
printf (" %s%s%s%s%s\n",
trace_color (ctx, message_type),
s,
payload_str ? "\n" : "",
payload_str ? payload_str : "",
trace_color_reset (ctx));
}

Expand All @@ -869,12 +875,15 @@ static void trace_print_timestamp (struct trace_ctx *ctx, double timestamp)
static void trace_print (struct trace_ctx *ctx,
double timestamp,
int message_type,
const char *s)
const char *s,
const char *payload_str)
{
trace_print_timestamp (ctx, timestamp);
printf (" %s%s%s\n",
printf (" %s%s%s%s%s\n",
trace_color (ctx, message_type),
s,
payload_str ? "\n" : "",
payload_str ? payload_str : "",
trace_color_reset (ctx));
}

Expand Down Expand Up @@ -986,10 +995,11 @@ int cmd_trace (optparse_t *p, int ac, char *av[])
"module.trace",
FLUX_NODEID_ANY,
FLUX_RPC_STREAMING,
"{s:i s:s s:O}",
"{s:i s:s s:O s:b}",
"typemask", ctx.match.typemask,
"topic_glob", ctx.match.topic_glob,
"names", ctx.names)))
"names", ctx.names,
"full", optparse_hasopt (p, "full") ? 1 : 0)))
log_err_exit ("error sending module.trace request");

signal (SIGINT, sighandler);
Expand All @@ -1002,18 +1012,24 @@ int cmd_trace (optparse_t *p, int ac, char *av[])
int type;
const char *topic;
int payload_size;
json_t *payload_json = json_null ();
char *payload_str = NULL;
char buf[160];

if (flux_rpc_get_unpack (f,
"{s:F s:s s:s s:i s:s s:i}",
"{s:F s:s s:s s:i s:s s:i s?o}",
"timestamp", &timestamp,
"prefix", &prefix,
"name", &name,
"type", &type,
"topic", &topic,
"payload_size", &payload_size) < 0)
"payload_size", &payload_size,
"payload", &payload_json) < 0)
log_msg_exit ("%s", future_strerror (f, errno));

if (!json_is_null (payload_json))
payload_str = json_dumps (payload_json, JSON_INDENT(2));

snprintf (buf,
sizeof (buf),
"%*s %s %s %s [%s]",
Expand All @@ -1025,11 +1041,13 @@ int cmd_trace (optparse_t *p, int ac, char *av[])
encode_size (payload_size));

if (optparse_hasopt (p, "human"))
trace_print_human (&ctx, timestamp, type, buf);
trace_print_human (&ctx, timestamp, type, buf, payload_str);
else
trace_print (&ctx, timestamp, type, buf);
trace_print (&ctx, timestamp, type, buf, payload_str);
fflush (stdout);

free (payload_str);

flux_future_reset (f);
} while (1);

Expand Down
19 changes: 18 additions & 1 deletion t/t3400-overlay-trace.t
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,33 @@ test_expect_success NO_CHAIN_LINT 'start background trace' '
flux overlay trace >trace.out &
echo $! >trace.pid
'
test_expect_success NO_CHAIN_LINT 'start second background trace with --full' '
flux overlay trace --full >trace2.out &
echo $! >trace2.pid
'
test_expect_success NO_CHAIN_LINT 'heartbeat.pulse event was captured' '
$waitfile -t 60 -p heartbeat.pulse trace.out
'
test_expect_success NO_CHAIN_LINT 'heartbeat.pulse event was captured with --full' '
$waitfile -t 60 -p heartbeat.pulse trace2.out
'
test_expect_success NO_CHAIN_LINT 'send one kvs.ping to rank 1' '
flux ping -r 1 -c 1 kvs
'
test_expect_success NO_CHAIN_LINT 'kvs.ping request/response was captured' '
$waitfile -t 60 -c 2 -p kvs.ping trace.out
'
test_expect_success NO_CHAIN_LINT 'kvs.ping request/response was captured with --full' '
$waitfile -t 60 -c 2 -p kvs.ping trace2.out
'
test_expect_success NO_CHAIN_LINT 'stop background trace' '
kill -15 $(cat trace.pid); wait || true
pid=$(cat trace.pid) &&
kill -15 $pid &&
wait $pid || true
'
test_expect_success NO_CHAIN_LINT 'stop second background trace' '
pid=$(cat trace2.pid) &&
kill -15 $pid &&
wait $pid || true
'
test_done
Loading

0 comments on commit 13430c8

Please sign in to comment.