diff --git a/doc/man1/common/job-shell-options.rst b/doc/man1/common/job-shell-options.rst index 905698b83705..cf06436052e3 100644 --- a/doc/man1/common/job-shell-options.rst +++ b/doc/man1/common/job-shell-options.rst @@ -39,3 +39,8 @@ * - :option:`hwloc.xmlfile` - Write hwloc XML gathered by job to a file and set ``HWLOC_XMLFILE`` + + * - :option:`output.limit` + - Set KVS output limit to SIZE bytes, where SIZE may be a floating point + value including optional SI units: k, K, M, G. This value is ignored + if output is directed to a file with :option:`--output`. diff --git a/doc/man1/flux-shell.rst b/doc/man1/flux-shell.rst index b7f5c9eea3e8..c5f545e7dee5 100644 --- a/doc/man1/flux-shell.rst +++ b/doc/man1/flux-shell.rst @@ -318,6 +318,14 @@ plugins include: this option directly, as it will be set automatically by options of higher level commands such as :man1:`flux-submit`. +.. option:: output.limit=SIZE + + Truncate KVS output after SIZE bytes have been written. SIZE may + be a floating point value with optional SI units k, K, M, G. A value of + 0 is considered unlimited. The default KVS output limit is 10M for jobs + in a multi-user instance or unlimited for single-user instance jobs. + This value is ignored if output is directed to a file. + .. option:: output.{stdout,stderr}.path=PATH Set job stderr/out file output to PATH. 
diff --git a/doc/man3/flux_shell_get_info.rst b/doc/man3/flux_shell_get_info.rst index 2fe6efd3d112..cd1146d9b4df 100644 --- a/doc/man3/flux_shell_get_info.rst +++ b/doc/man3/flux_shell_get_info.rst @@ -39,6 +39,7 @@ with the following layout: :: "jobid":I, + "instance_owner":i, "rank":i, "size":i, "ntasks";i, diff --git a/src/shell/internal.h b/src/shell/internal.h index 0d91d81c049e..01be0140c16b 100644 --- a/src/shell/internal.h +++ b/src/shell/internal.h @@ -26,6 +26,7 @@ struct flux_shell { flux_jobid_t jobid; int broker_rank; + uid_t broker_owner; char hostname [MAXHOSTNAMELEN + 1]; int protocol_fd[2]; diff --git a/src/shell/output.c b/src/shell/output.c index dc3b4260326b..c44ca16c0024 100644 --- a/src/shell/output.c +++ b/src/shell/output.c @@ -53,6 +53,7 @@ #include "src/common/libeventlog/eventlog.h" #include "src/common/libeventlog/eventlogger.h" #include "src/common/libioencode/ioencode.h" +#include "src/common/libutil/parse_size.h" #include "ccan/str/str.h" #include "task.h" @@ -61,8 +62,7 @@ #include "builtins.h" #include "log.h" -#define OUTPUT_LIMIT_BYTES 1024*1024*10 -#define OUTPUT_LIMIT_STRING "10MB" +#define MULTIUSER_OUTPUT_LIMIT "10M" enum { FLUX_OUTPUT_TYPE_TERM = 1, @@ -82,6 +82,8 @@ struct shell_output_type_file { struct shell_output { flux_shell_t *shell; + const char *kvs_limit_string; + size_t kvs_limit_bytes; struct eventlogger *ev; double batch_timeout; int refcount; @@ -341,6 +343,9 @@ static bool check_kvs_output_limit (struct shell_output *out, size_t *bytesp; size_t prev; + if (out->kvs_limit_bytes == 0) + return false; + if (is_stdout) { stream = "stdout"; bytesp = &out->stdout_bytes; @@ -353,13 +358,13 @@ static bool check_kvs_output_limit (struct shell_output *out, prev = *bytesp; *bytesp += len; - if (*bytesp > OUTPUT_LIMIT_BYTES) { + if (*bytesp > out->kvs_limit_bytes) { /* Only log an error when the threshold is reached. 
*/
-        if (prev <= OUTPUT_LIMIT_BYTES)
+        if (prev <= out->kvs_limit_bytes)
             shell_warn ("%s will be truncated, %s limit exceeded",
                         stream,
-                        OUTPUT_LIMIT_STRING);
+                        out->kvs_limit_string);
         return true;
     }
     return false;
@@ -385,7 +390,7 @@ static int shell_output_kvs (struct shell_output *out)
                            out->stdout_bytes : out->stderr_bytes;
             shell_warn ("%s: %zu of %zu bytes truncated",
                         is_stdout ? "stdout" : "stderr",
-                        total - OUTPUT_LIMIT_BYTES,
+                        total - out->kvs_limit_bytes,
                         total);
         }
     }
@@ -1135,6 +1140,79 @@ static int shell_lost (flux_plugin_t *p,
     return 0;
 }
 
+/* Determine the KVS output limit for this job and store it in 'out'
+ * as both a byte count (kvs_limit_bytes, 0 means unlimited) and a
+ * string (kvs_limit_string) suitable for use in warning messages.
+ *
+ * The default is unlimited when the instance owner is the current
+ * user, otherwise MULTIUSER_OUTPUT_LIMIT. The shell option
+ * output.limit overrides the default and may be a non-negative
+ * integer or a string accepted by parse_size().
+ *
+ * Returns 0 on success, -1 on failure with an error logged.
+ */
+static int get_output_limit (struct shell_output *out)
+{
+    json_t *val = NULL;
+    uint64_t size;
+
+    /* Set default to unlimited (0) for single-user instances,
+     * O/w use the default multiuser output limit:
+     */
+    if (out->shell->broker_owner == getuid())
+        out->kvs_limit_string = "0";
+    else
+        out->kvs_limit_string = MULTIUSER_OUTPUT_LIMIT;
+
+    if (flux_shell_getopt_unpack (out->shell,
+                                  "output",
+                                  "{s?o}",
+                                  "limit", &val) < 0) {
+        shell_log_error ("Unable to unpack shell output.limit");
+        return -1;
+    }
+    if (val != NULL) {
+        if (json_is_integer (val)) {
+            json_int_t limit = json_integer_value (val);
+
+            /* A negative value would wrap to a huge size_t when cast,
+             * so reject it instead of silently disabling the limit.
+             */
+            if (limit < 0) {
+                shell_log_error ("Invalid KVS output.limit=%lld",
+                                 (long long) limit);
+                return -1;
+            }
+            out->kvs_limit_bytes = (size_t) limit;
+            if (out->kvs_limit_bytes > 0) {
+                /* Need a string representation of limit for errors.
+                 * The copy is handed to the shell with free() as its
+                 * destructor. Never leave kvs_limit_string NULL or
+                 * stale, since it is later passed to a "%s" format.
+                 */
+                char *s = strdup (encode_size (out->kvs_limit_bytes));
+                if (!s || flux_shell_aux_set (out->shell, NULL, s, free) < 0) {
+                    shell_log_errno ("Unable to save output.limit string");
+                    free (s);
+                    return -1;
+                }
+                out->kvs_limit_string = s;
+            }
+            return 0;
+        }
+        if (!(out->kvs_limit_string = json_string_value (val))) {
+            shell_log_error ("Unable to convert output.limit to string");
+            return -1;
+        }
+    }
+    if (parse_size (out->kvs_limit_string, &size) < 0) {
+        shell_log_errno ("Invalid KVS output.limit=%s", out->kvs_limit_string);
+        return -1;
+    }
+    out->kvs_limit_bytes = (size_t) size;
+    return 0;
+}
+
 struct shell_output *shell_output_create (flux_shell_t *shell)
 {
     struct shell_output *out;
@@ -1147,6 +1225,8 @@ struct shell_output *shell_output_create (flux_shell_t *shell)
out->stdout_buffer_type = "line";
     out->stderr_buffer_type = "none";
 
+    if (get_output_limit (out) < 0)
+        goto error;
     if (shell_output_check_alternate_output (out) < 0)
         goto error;
     if (shell_output_check_alternate_buffer_type (out) < 0)
diff --git a/src/shell/shell.c b/src/shell/shell.c
index ce95988f218a..e7eeb34e19e7 100644
--- a/src/shell/shell.c
+++ b/src/shell/shell.c
@@ -253,6 +253,37 @@ static int reconnect (flux_t *h, void *arg)
     return 0;
 }
 
+/* Return the instance owner's userid, obtained by parsing the
+ * "security.owner" broker attribute as a decimal integer.
+ *
+ * On failure to fetch or parse the attribute, an error is logged and
+ * (uid_t) 0 is returned.
+ */
+static uid_t get_instance_owner (flux_t *h)
+{
+    const char *s;
+    char *endptr;
+    unsigned long id;
+
+    if (!(s = flux_attr_get (h, "security.owner"))) {
+        shell_log_errno ("error fetching security.owner attribute");
+        return (uid_t) 0;
+    }
+    errno = 0;
+    id = strtoul (s, &endptr, 10);
+    /* Besides conversion errors and trailing garbage, reject an empty
+     * string (no digits consumed) and any value that does not survive
+     * a round trip through uid_t, rather than silently truncating it.
+     */
+    if (errno != 0
+        || endptr == s
+        || *endptr != '\0'
+        || (unsigned long) (uid_t) id != id) {
+        shell_log_error ("error parsing security.owner=%s", s);
+        return (uid_t) 0;
+    }
+    return (uid_t) id;
+}
+
 static void shell_connect_flux (flux_shell_t *shell)
 {
     uint32_t rank;
@@ -274,6 +305,8 @@ static void shell_connect_flux (flux_shell_t *shell)
         shell_log_errno ("error fetching broker rank");
     shell->broker_rank = rank;
 
+    shell->broker_owner = get_instance_owner (shell->h);
+
     if (plugstack_call (shell->plugstack, "shell.connect", NULL) < 0)
         shell_log_errno ("shell.connect");
 }
@@ -427,9 +460,10 @@ static json_t *flux_shell_get_info_object (flux_shell_t *shell)
         return o;
 
     if (!(o = json_pack_ex (&err, 0,
-                            "{ s:I s:i s:i s:i s:s s:O s:O s:{ s:i }}",
+                            "{ s:I s:i s:i s:i s:i s:s s:O s:O s:{ s:i }}",
                            "jobid", shell->info->jobid,
                            "rank", shell->info->shell_rank,
+                           "instance_owner", (int) shell->broker_owner,
                            "size", shell->info->shell_size,
                            "ntasks", shell->info->total_ntasks,
                            "service", shell_svc_name (shell->svc),
diff --git a/src/shell/shell.h b/src/shell/shell.h
index b84591927401..2ce166466e91 100644
--- a/src/shell/shell.h
+++ b/src/shell/shell.h
@@ -105,6 +105,7 @@ const struct taskmap *flux_shell_get_taskmap (flux_shell_t *shell);
 /* Return shell info as a JSON string.
* {
  *   "jobid":I,
+ *   "instance_owner":i,
  *   "rank":i,
  *   "size":i,
  *   "ntasks";i,
diff --git a/t/system/0001-basic.t b/t/system/0001-basic.t
index 6b3de9ff2e98..e484225e8f99 100755
--- a/t/system/0001-basic.t
+++ b/t/system/0001-basic.t
@@ -15,3 +15,10 @@ test_expect_success 'flux jobs lists job with correct userid' '
 test_expect_success 'flux proxy can submit jobs to system instance' '
     flux proxy $(flux getattr local-uri) flux submit true
 '
+test_expect_success 'flux-shell limits kvs output to 10M for guest jobs' '
+    dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >large.in &&
+    flux run -vvv cat large.in >large.out 2>trunc.err &&
+    ls -lh large* &&
+    test_debug "cat trunc.err" &&
+    grep "stdout.*truncated" trunc.err
+'
diff --git a/t/t2606-job-shell-output-redirection.t b/t/t2606-job-shell-output-redirection.t
index a10717c77bd8..43c648d2a3d7 100755
--- a/t/t2606-job-shell-output-redirection.t
+++ b/t/t2606-job-shell-output-redirection.t
@@ -348,15 +348,22 @@ test_expect_success 'job-shell: shell errors are captured in error file' '
     test_expect_code 127 flux run --error=test.err nosuchcommand &&
     grep "nosuchcommand: No such file or directory" test.err
 '
-test_expect_success LONGTEST 'job-shell: output to kvs is truncated at 10MB' '
-    dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >expected &&
-    flux run cat expected >output 2>truncate.error &&
-    test_debug "cat truncate.error" &&
-    grep "stdout.*truncated" truncate.error
-'
-test_expect_success LONGTEST 'job-shell: stderr to kvs is truncated at 10MB' '
-    dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >expected &&
-    flux run sh -c "cat expected >&2" >truncate2.error 2>&1 &&
-    grep "stderr.*truncated" truncate2.error
+test_expect_success 'job-shell: kvs output truncation works' '
+    flux run -o output.limit=5 echo 0123456789 2>trunc.err &&
+    test_debug "cat trunc.err" &&
+    grep "stdout.*truncated" trunc.err
+'
+test_expect_success 'job-shell: stderr truncation works' '
+    flux run -o output.limit=5 \
+        sh -c "echo 0123456789 >&2" >trunc2.error 2>&1 &&
+    grep "stderr.*truncated" trunc2.error
+'
+test_expect_success LONGTEST 'job-shell: no truncation at 10MB for single-user job' '
+    dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >10M+ &&
+    flux run cat 10M+ >10M+.output &&
+    test_cmp 10M+ 10M+.output
+'
+test_expect_success 'job-shell: invalid output.limit string is rejected' '
+    test_must_fail flux run -o output.limit=foo hostname
 '
 test_done
diff --git a/t/t9000-system.t b/t/t9000-system.t
index 53fcb4ac68ce..e982b8d3b218 100755
--- a/t/t9000-system.t
+++ b/t/t9000-system.t
@@ -65,7 +65,7 @@ alias test_expect_success='expect_success_wrap'
 #
 for testscript in ${FLUX_SOURCE_DIR}/t/system/${T9000_SYSTEM_GLOB}; do
     TEST_LABEL="$(basename $testscript)"
-    source $testscript
+    . $testscript
 done
 
 test_done