Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shell: make kvs output limit configurable and default single-user jobs to unlimited #5732

Merged
merged 7 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/man1/common/job-shell-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@

* - :option:`hwloc.xmlfile`
- Write hwloc XML gathered by job to a file and set ``HWLOC_XMLFILE``

* - :option:`output.limit`
- Set KVS output limit to SIZE bytes, where SIZE may be a floating point
value including optional SI units: k, K, M, G. This value is ignored
if output is directed to a file with :option:`--output`.
8 changes: 8 additions & 0 deletions doc/man1/flux-shell.rst
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ plugins include:
this option directly, as it will be set automatically by options
of higher level commands such as :man1:`flux-submit`.

.. option:: output.limit=SIZE

Truncate KVS output after SIZE bytes have been written. SIZE may
be a floating point value with optional SI units k, K, M, G. A value of
0 is considered unlimited. The default KVS output limit is 10M for jobs
in a multi-user instance or unlimited for single-user instance jobs.
This value is ignored if output is directed to a file.

.. option:: output.{stdout,stderr}.path=PATH

Set job stderr/out file output to PATH.
Expand Down
1 change: 1 addition & 0 deletions doc/man3/flux_shell_get_info.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ with the following layout:
::

"jobid":I,
"instance_owner":i,
"rank":i,
"size":i,
"ntasks";i,
Expand Down
1 change: 1 addition & 0 deletions src/shell/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
struct flux_shell {
flux_jobid_t jobid;
int broker_rank;
uid_t broker_owner;
char hostname [MAXHOSTNAMELEN + 1];
int protocol_fd[2];

Expand Down
66 changes: 60 additions & 6 deletions src/shell/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "src/common/libeventlog/eventlog.h"
#include "src/common/libeventlog/eventlogger.h"
#include "src/common/libioencode/ioencode.h"
#include "src/common/libutil/parse_size.h"
#include "ccan/str/str.h"

#include "task.h"
Expand All @@ -61,8 +62,7 @@
#include "builtins.h"
#include "log.h"

#define OUTPUT_LIMIT_BYTES 1024*1024*10
#define OUTPUT_LIMIT_STRING "10MB"
#define MULTIUSER_OUTPUT_LIMIT "10M"

enum {
FLUX_OUTPUT_TYPE_TERM = 1,
Expand All @@ -82,6 +82,8 @@

struct shell_output {
flux_shell_t *shell;
const char *kvs_limit_string;
size_t kvs_limit_bytes;
struct eventlogger *ev;
double batch_timeout;
int refcount;
Expand Down Expand Up @@ -341,6 +343,9 @@
size_t *bytesp;
size_t prev;

if (out->kvs_limit_bytes == 0)
return false;

if (is_stdout) {
stream = "stdout";
bytesp = &out->stdout_bytes;
Expand All @@ -353,13 +358,13 @@
prev = *bytesp;
*bytesp += len;

if (*bytesp > OUTPUT_LIMIT_BYTES) {
if (*bytesp > out->kvs_limit_bytes) {
/* Only log an error when the threshold is reached.
*/
if (prev <= OUTPUT_LIMIT_BYTES)
if (prev <= out->kvs_limit_bytes)
shell_warn ("%s will be truncated, %s limit exceeded",
stream,
OUTPUT_LIMIT_STRING);
out->kvs_limit_string);
return true;
}
return false;
Expand All @@ -385,7 +390,7 @@
out->stdout_bytes : out->stderr_bytes;
shell_warn ("%s: %zu of %zu bytes truncated",
is_stdout ? "stdout" : "stderr",
total - OUTPUT_LIMIT_BYTES,
total - out->kvs_limit_bytes,
total);
}
}
Expand Down Expand Up @@ -1135,6 +1140,53 @@
return 0;
}

static int get_output_limit (struct shell_output *out)
{
json_t *val = NULL;
uint64_t size;

/* Set default to unlimited (0) for single-user instances,
* O/w use the default multiuser output limit:
*/
if (out->shell->broker_owner == getuid())
out->kvs_limit_string = "0";
else
out->kvs_limit_string = MULTIUSER_OUTPUT_LIMIT;

Check warning on line 1154 in src/shell/output.c

View check run for this annotation

Codecov / codecov/patch

src/shell/output.c#L1154

Added line #L1154 was not covered by tests

if (flux_shell_getopt_unpack (out->shell,
"output",
"{s?o}",
"limit", &val) < 0) {
shell_log_error ("Unable to unpack shell output.limit");
return -1;

Check warning on line 1161 in src/shell/output.c

View check run for this annotation

Codecov / codecov/patch

src/shell/output.c#L1160-L1161

Added lines #L1160 - L1161 were not covered by tests
}
if (val != NULL) {
if (json_is_integer (val)) {
out->kvs_limit_bytes = (size_t) json_integer_value (val);
if (out->kvs_limit_bytes > 0) {
/* Need a string representation of limit for errors
*/
char *s = strdup (encode_size (out->kvs_limit_bytes));
if (s && flux_shell_aux_set (out->shell, NULL, s, free) < 0)
free (s);

Check warning on line 1171 in src/shell/output.c

View check run for this annotation

Codecov / codecov/patch

src/shell/output.c#L1171

Added line #L1171 was not covered by tests
else
out->kvs_limit_string = s;
}
return 0;
}
if (!(out->kvs_limit_string = json_string_value (val))) {
shell_log_error ("Unable to convert output.limit to string");
return -1;

Check warning on line 1179 in src/shell/output.c

View check run for this annotation

Codecov / codecov/patch

src/shell/output.c#L1178-L1179

Added lines #L1178 - L1179 were not covered by tests
}
}
if (parse_size (out->kvs_limit_string, &size) < 0) {
shell_log_errno ("Invalid KVS output.limit=%s", out->kvs_limit_string);
return -1;
}
out->kvs_limit_bytes = (size_t) size;
return 0;
}

struct shell_output *shell_output_create (flux_shell_t *shell)
{
struct shell_output *out;
Expand All @@ -1147,6 +1199,8 @@
out->stdout_buffer_type = "line";
out->stderr_buffer_type = "none";

if (get_output_limit (out) < 0)
goto error;
if (shell_output_check_alternate_output (out) < 0)
goto error;
if (shell_output_check_alternate_buffer_type (out) < 0)
Expand Down
24 changes: 23 additions & 1 deletion src/shell/shell.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@
return 0;
}

static uid_t get_instance_owner (flux_t *h)
{
const char *s;
char *endptr;
int id;

if (!(s = flux_attr_get (h, "security.owner"))) {
shell_log_errno ("error fetching security.owner attribute");
return (uid_t) 0;

Check warning on line 264 in src/shell/shell.c

View check run for this annotation

Codecov / codecov/patch

src/shell/shell.c#L263-L264

Added lines #L263 - L264 were not covered by tests
}
errno = 0;
id = strtoul (s, &endptr, 10);
if (errno != 0 || *endptr != '\0') {
shell_log_error ("error parsing security.owner=%s", s);
return (uid_t) 0;

Check warning on line 270 in src/shell/shell.c

View check run for this annotation

Codecov / codecov/patch

src/shell/shell.c#L269-L270

Added lines #L269 - L270 were not covered by tests
}
return (uid_t) id;
}

static void shell_connect_flux (flux_shell_t *shell)
{
uint32_t rank;
Expand All @@ -274,6 +293,8 @@
shell_log_errno ("error fetching broker rank");
shell->broker_rank = rank;

shell->broker_owner = get_instance_owner (shell->h);

if (plugstack_call (shell->plugstack, "shell.connect", NULL) < 0)
shell_log_errno ("shell.connect");
}
Expand Down Expand Up @@ -427,9 +448,10 @@
return o;

if (!(o = json_pack_ex (&err, 0,
"{ s:I s:i s:i s:i s:s s:O s:O s:{ s:i }}",
"{ s:I s:i s:i s:i s:i s:s s:O s:O s:{ s:i }}",
"jobid", shell->info->jobid,
"rank", shell->info->shell_rank,
"instance_owner", (int) shell->broker_owner,
"size", shell->info->shell_size,
"ntasks", shell->info->total_ntasks,
"service", shell_svc_name (shell->svc),
Expand Down
1 change: 1 addition & 0 deletions src/shell/shell.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const struct taskmap *flux_shell_get_taskmap (flux_shell_t *shell);
/* Return shell info as a JSON string.
* {
* "jobid":I,
* "instance_owner":i,
* "rank":i,
* "size":i,
* "ntasks";i,
Expand Down
7 changes: 7 additions & 0 deletions t/system/0001-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ test_expect_success 'flux jobs lists job with correct userid' '
test_expect_success 'flux proxy can submit jobs to system instance' '
flux proxy $(flux getattr local-uri) flux submit true
'
test_expect_success 'flux-shell limits kvs output to 10M for guest jobs' '
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >large.in &&
flux run -vvv cat large.in >large.out 2>trunc.err &&
ls -lh large* &&
test_debug "cat trunc.err" &&
grep "stdout.*truncated" trunc.err
'
27 changes: 17 additions & 10 deletions t/t2606-job-shell-output-redirection.t
Original file line number Diff line number Diff line change
Expand Up @@ -348,15 +348,22 @@ test_expect_success 'job-shell: shell errors are captured in error file' '
test_expect_code 127 flux run --error=test.err nosuchcommand &&
grep "nosuchcommand: No such file or directory" test.err
'
test_expect_success LONGTEST 'job-shell: output to kvs is truncated at 10MB' '
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >expected &&
flux run cat expected >output 2>truncate.error &&
test_debug "cat truncate.error" &&
grep "stdout.*truncated" truncate.error
'
test_expect_success LONGTEST 'job-shell: stderr to kvs is truncated at 10MB' '
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >expected &&
flux run sh -c "cat expected >&2" >truncate2.error 2>&1 &&
grep "stderr.*truncated" truncate2.error
test_expect_success 'job-shell: kvs output truncatation works' '
flux run -o output.limit=5 echo 0123456789 2>trunc.err &&
test_debug "cat trunc.err" &&
grep "stdout.*truncated" trunc.err
'
test_expect_success 'job-shell: stderr truncation works' '
flux run -o output.limit=5 \
sh -c "echo 0123456789 >&2" >trunc2.error 2>&1 &&
grep "stderr.*truncated" trunc2.error
'
test_expect_success LONGTEST 'job-shell: no truncation at 10MB for single-user job' '
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >10M+ &&
flux run cat 10M+ >10M+.output &&
test_cmp 10M+ 10M+.output
'
test_expect_success 'job-shell: invalid output.limit string is rejected' '
test_must_fail flux run -o output.limit=foo hostname
'
test_done
2 changes: 1 addition & 1 deletion t/t9000-system.t
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ alias test_expect_success='expect_success_wrap'
#
for testscript in ${FLUX_SOURCE_DIR}/t/system/${T9000_SYSTEM_GLOB}; do
TEST_LABEL="$(basename $testscript)"
source $testscript
. $testscript
done

test_done
Loading