diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index f15da1cedec4..6646fbe716ea 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -18,6 +18,7 @@ noinst_LTLIBRARIES = \ jobtap_plugin_LTLIBRARIES = \ plugins/submit-hold.la \ plugins/alloc-bypass.la \ + plugins/alloc-check.la \ plugins/perilog.la libjob_manager_la_SOURCES = \ @@ -90,6 +91,15 @@ plugins_alloc_bypass_la_LDFLAGS = \ $(fluxplugin_ldflags) \ -module +plugins_alloc_check_la_SOURCES = \ + plugins/alloc-check.c +plugins_alloc_check_la_LIBADD = \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/librlist/librlist.la +plugins_alloc_check_la_LDFLAGS = \ + $(fluxplugin_ldflags) \ + -module + plugins_perilog_la_SOURCES = \ plugins/perilog.c plugins_perilog_la_LIBADD = \ diff --git a/src/modules/job-manager/plugins/alloc-check.c b/src/modules/job-manager/plugins/alloc-check.c new file mode 100644 index 000000000000..15c2837b0e68 --- /dev/null +++ b/src/modules/job-manager/plugins/alloc-check.c @@ -0,0 +1,215 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* alloc-check.c - plugin to ensure resources are never double booked + * + * A fatal exception is raised on jobs that are granted resources already + * granted to another. + * + * In order to be sure that the exception can be raised before a short job + * becomes inactive, R is looked up in the KVS synchronously, causing the + * job manager to be briefly unresponsive. Hence, this plugin is primarily + * suited for debug/test situations. + * + * N.B. 
This plugin does not account for any jobs that might already have + * allocations when the plugin is loaded. + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include + +#include "ccan/str/str.h" +#include "src/common/librlist/rlist.h" +#include "src/common/libjob/idf58.h" + +#define PLUGIN_NAME "alloc-check" +static const char *auxname = PLUGIN_NAME "::resdb"; + +/* Start out with empty resource set. Add resources on job.event.alloc + * (scheduler has allocated resources to job). Subtract resources on + * job.event.free (job manager has returned resources to the scheduler). + */ +struct resdb { + struct rlist *allocated; +}; + +static void resdb_destroy (struct resdb *resdb) +{ + if (resdb) { + int saved_errno = errno; + rlist_destroy (resdb->allocated); + free (resdb); + errno = saved_errno; + } +} + +static struct resdb *resdb_create (void) +{ + struct resdb *resdb; + + if (!(resdb = calloc (1, sizeof (*resdb)))) + return NULL; + if (!(resdb->allocated = rlist_create())) { + free (resdb); + errno = ENOMEM; + return NULL; + } + return resdb; +} + +/* Generate the kvs path to R for a given job + */ +static int res_makekey (flux_jobid_t id, char *buf, size_t size) +{ + char dir[128]; + if (flux_job_id_encode (id, "kvs", dir, sizeof (dir)) < 0) + return -1; + if (snprintf (buf, size, "%s.R", dir) >= size) { + errno = EOVERFLOW; + return -1; + } + return 0; +} + +/* Synchronously look up R for a given job and convert it to an rlist object + * which the caller must destroy with rlist_destroy(). 
+ */
+static struct rlist *res_lookup (flux_t *h, flux_jobid_t id)
+{
+    char key[128];
+    flux_future_t *f = NULL;
+    const char *R;
+    struct rlist *rlist;
+
+    /* N.B. flux_kvs_lookup_get() blocks the job manager until the KVS
+     * responds -- intentional per the file-top comment (debug/test use).
+     */
+    if (res_makekey (id, key, sizeof (key)) < 0
+        || !(f = flux_kvs_lookup (h, NULL, 0, key))
+        || flux_kvs_lookup_get (f, &R) < 0
+        || !(rlist = rlist_from_R (R))) {
+        flux_future_destroy (f);
+        return NULL;
+    }
+    flux_future_destroy (f);
+    return rlist;
+}
+
+/* Single handler for job.new, job.event.alloc, and job.event.free
+ * (dispatched via 'tab' below).  Returns 0 on success, -1 on error.
+ */
+static int jobtap_cb (flux_plugin_t *p,
+                      const char *topic,
+                      flux_plugin_arg_t *args,
+                      void *arg)
+{
+    struct resdb *resdb = flux_plugin_aux_get (p, auxname);
+    flux_t *h = flux_jobtap_get_flux (p);
+    flux_jobid_t id;
+
+    if (flux_plugin_arg_unpack (args,
+                                FLUX_PLUGIN_ARG_IN,
+                                "{s:I}",
+                                "id", &id) < 0) {
+        flux_log (h,
+                  LOG_ERR,
+                  "%s %s: unpack: %s",
+                  PLUGIN_NAME,
+                  topic,
+                  flux_plugin_arg_strerror (args));
+        return -1;
+    }
+    /* job.event.* callbacks are not received unless subscribed on a per-job
+     * basis, so subscribe to them in the job.new callback.
+     */
+    if (streq (topic, "job.new")) {
+        /* Subscribe failure is logged but not fatal to the job. */
+        if (flux_jobtap_job_subscribe (p, id) < 0) {
+            flux_log_error (h,
+                            "%s(%s) %s: subscribe",
+                            PLUGIN_NAME,
+                            idf58 (id),
+                            topic);
+        }
+    }
+    /* Look up R that was just allocated to the job and attach it to the job
+     * aux container so we don't have to look it up again on free.  Call
+     * rlist_append() to add the resources to resdb->allocated.  If that
+     * fails, some resources are already allocated so raise a fatal exception
+     * on the job.
+     */
+    else if (streq (topic, "job.event.alloc")) {
+        struct rlist *R;
+        /* On success, aux_set takes ownership of R (freed via rlist_destroy
+         * when the job is destroyed).  On failure R was not stored, so
+         * destroying it here is safe; rlist_destroy (NULL) is a no-op.
+         */
+        if (!(R = res_lookup (h, id))
+            || flux_jobtap_job_aux_set (p,
+                                        id,
+                                        PLUGIN_NAME "::R",
+                                        R,
+                                        (flux_free_f)rlist_destroy) < 0) {
+            flux_log_error (h,
+                            "%s(%s) %s: failed to lookup or cache R",
+                            PLUGIN_NAME,
+                            idf58 (id),
+                            topic);
+            rlist_destroy (R);
+            return -1;
+        }
+        /* NOTE(review): if rlist_append() fails, R stays attached to the job
+         * and is still subtracted on job.event.free -- presumably fine since
+         * the scheduler did grant R, but confirm rlist_append() cannot
+         * partially append and skew the accounting.
+         */
+        if (rlist_append (resdb->allocated, R) < 0) {
+            flux_jobtap_raise_exception (p,
+                                         id,
+                                         "alloc-check",
+                                         0,
+                                         "resources already allocated");
+        }
+    }
+    /* Get R that was just freed from the job's aux container and remove it
+     * from resdb->allocated.  Any jobs that had allocations before the module
+     * will not have the R aux item, so silently return success in that case.
+     */
+    else if (streq (topic, "job.event.free")) {
+        struct rlist *R = flux_jobtap_job_aux_get (p, id, PLUGIN_NAME "::R");
+        if (R) {
+            struct rlist *diff;
+            if (!(diff = rlist_diff (resdb->allocated, R))) {
+                flux_log_error (h,
+                                "%s(%s) %s: rlist_diff",
+                                PLUGIN_NAME,
+                                idf58 (id),
+                                topic);
+                return -1;
+            }
+            /* Replace the accumulated set with (allocated - R). */
+            rlist_destroy (resdb->allocated);
+            resdb->allocated = diff;
+        }
+    }
+    return 0;
+}
+
+static const struct flux_plugin_handler tab[] = {
+    { "job.event.alloc", jobtap_cb, NULL },
+    { "job.event.free", jobtap_cb, NULL },
+    { "job.new", jobtap_cb, NULL },
+    { 0 }
+};
+
+/* Plugin entry point: create resdb, hand ownership to the plugin aux
+ * container (freed via resdb_destroy on unload), and register handlers.
+ */
+int flux_plugin_init (flux_plugin_t *p)
+{
+    struct resdb *resdb;
+
+    if (!(resdb = resdb_create ())
+        || flux_plugin_aux_set (p,
+                                auxname,
+                                resdb,
+                                (flux_free_f)resdb_destroy) < 0) {
+        resdb_destroy (resdb);
+        return -1;
+    }
+    return flux_plugin_register (p, "alloc-check", tab);
+}
+
+// vi:ts=4 sw=4 expandtab
diff --git a/t/Makefile.am b/t/Makefile.am
index ae216db81f1f..a0b273ab8a11 100644
--- a/t/Makefile.am
+++ b/t/Makefile.am
@@ -159,6 +159,7 @@ TESTSCRIPTS = \
 	t2300-sched-simple.t \
 	t2302-sched-simple-up-down.t \
 	t2303-sched-hello.t \
+	t2304-sched-simple-alloc-check.t \
 	t2310-resource-module.t \
 	t2311-resource-drain.t \
 	t2312-resource-exclude.t \
diff --git 
a/t/t2304-sched-simple-alloc-check.t b/t/t2304-sched-simple-alloc-check.t
new file mode 100755
index 000000000000..893d68182078
--- /dev/null
+++ b/t/t2304-sched-simple-alloc-check.t
@@ -0,0 +1,79 @@
+#!/bin/sh
+
+test_description='check that sched-simple never double books resources'
+
+# Append --logfile option if FLUX_TESTS_LOGFILE is set in environment:
+test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile
+. $(dirname $0)/sharness.sh
+
+test_under_flux 1
+
+# Verify that alloc-check plugin works using alloc-bypass
+test_expect_success 'load alloc-bypass and alloc-check plugins' '
+	flux jobtap load alloc-bypass.so &&
+	flux jobtap load alloc-check.so
+'
+# The bypass job holds all of rank 0's resources outside the scheduler,
+# so any regularly scheduled job must double book.
+test_expect_success 'run an alloc-bypass sleep job' '
+	flux submit \
+	    -vvv \
+	    --wait-event=start \
+	    --setattr=alloc-bypass.R="$(flux R encode -r 0)" \
+	    -n 1 \
+	    sleep inf
+'
+# N.B. it is the regular (non-bypass) job's id that lands in bypass.jobid.
+test_expect_success 'a regular job fails with an alloc-check exception' '
+	run_timeout 30 flux submit --flags=waitable -vvv \
+	    --wait-event=exception \
+	    -N1 /bin/true >bypass.jobid
+'
+test_expect_success 'flux job wait says the job failed' '
+	test_must_fail flux job wait -v $(cat bypass.jobid)
+'
+test_expect_success 'clean up jobs' '
+	flux cancel --all &&
+	flux queue drain
+'
+test_expect_success 'unload plugins' '
+	flux jobtap remove alloc-check.so &&
+	flux jobtap remove alloc-bypass.so
+'
+
+# Check that sched-simple doesn't suffer from time limit issue like
+# flux-framework/flux-sched#1043
+#
+# The 2s epilog delays the free event past each job's 1s time limit,
+# which is what triggered premature reallocation in the sched issue.
+test_expect_success 'configure epilog with 2s delay' '
+	flux config load <<-EOT
+	[job-manager]
+	plugins = [
+	  { load = "perilog.so" },
+	]
+	epilog.command = [ "flux", "perilog-run", "epilog", "-e", "sleep,2" ]
+	EOT
+'
+test_expect_success 'load alloc-check plugin' '
+	flux jobtap load alloc-check.so
+'
+test_expect_success 'submit consecutive jobs that exceed their time limit' '
+	(for i in $(seq 5); \
+	    do flux submit -N1 -x -t1s sleep inf; \
+	done) >jobids
+'
+test_expect_success 'wait for jobs to complete and capture their stderr' '
+	(for id in $(cat jobids); do \
+	    flux job attach $id || true; \
+	done) 2>joberr
+'
+test_expect_success 'some jobs received timeout exception' '
+	grep "job.exception type=timeout" joberr
+'
+# If resources were double booked, alloc-check would have raised this:
+test_expect_success 'no jobs received alloc-check exception' '
+	test_must_fail grep "job.exception type=alloc-check" joberr
+'
+test_expect_success 'remove alloc-check plugin' '
+	flux jobtap remove alloc-check.so
+'
+test_expect_success 'undo epilog config' '
+	flux config load