Skip to content

Commit

Permalink
Merge pull request flux-framework#5304 from garlick/alloc_check
Browse files Browse the repository at this point in the history
job-manager: add alloc-check plugin
  • Loading branch information
mergify[bot] authored Jul 5, 2023
2 parents 49d65d0 + 4ec4d4d commit e0f2211
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/modules/job-manager/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ noinst_LTLIBRARIES = \
jobtap_plugin_LTLIBRARIES = \
plugins/submit-hold.la \
plugins/alloc-bypass.la \
plugins/alloc-check.la \
plugins/perilog.la

libjob_manager_la_SOURCES = \
Expand Down Expand Up @@ -90,6 +91,15 @@ plugins_alloc_bypass_la_LDFLAGS = \
$(fluxplugin_ldflags) \
-module

# alloc-check jobtap plugin: built from plugins/alloc-check.c and linked
# against libflux-internal and librlist.
plugins_alloc_check_la_SOURCES = \
plugins/alloc-check.c
plugins_alloc_check_la_LIBADD = \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/librlist/librlist.la
plugins_alloc_check_la_LDFLAGS = \
$(fluxplugin_ldflags) \
-module

plugins_perilog_la_SOURCES = \
plugins/perilog.c
plugins_perilog_la_LIBADD = \
Expand Down
215 changes: 215 additions & 0 deletions src/modules/job-manager/plugins/alloc-check.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* alloc-check.c - plugin to ensure resources are never double booked
*
* A fatal exception is raised on jobs that are granted resources already
* granted to another.
*
* In order to be sure that the exception can be raised before a short job
* becomes inactive, R is looked up in the KVS synchronously, causing the
* job manager to be briefly unresponsive. Hence, this plugin is primarily
* suited for debug/test situations.
*
* N.B. This plugin does not account for any jobs that might already have
* allocations when the plugin is loaded.
*/

#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <jansson.h>
#include <flux/core.h>
#include <flux/jobtap.h>

#include "ccan/str/str.h"
#include "src/common/librlist/rlist.h"
#include "src/common/libjob/idf58.h"

/* Plugin name; used as a prefix in log messages and in aux item keys. */
#define PLUGIN_NAME "alloc-check"
/* Key under which the struct resdb is stored in the plugin aux container. */
static const char *auxname = PLUGIN_NAME "::resdb";

/* Start out with empty resource set.  Add resources on job.event.alloc
 * (scheduler has allocated resources to job).  Subtract resources on
 * job.event.free (job manager has returned resources to the scheduler).
 */
struct resdb {
/* Resources added on job.event.alloc and not yet subtracted on
 * job.event.free.  Owned by the resdb; released by resdb_destroy().
 */
struct rlist *allocated;
};

/* Release a resdb together with the resource set it owns.
 * A NULL argument is accepted and ignored.  errno is saved and restored
 * around the cleanup calls so the caller's error code is not clobbered.
 */
static void resdb_destroy (struct resdb *resdb)
{
    int errnum;

    if (resdb == NULL)
        return;
    errnum = errno;
    rlist_destroy (resdb->allocated);
    free (resdb);
    errno = errnum;
}

/* Allocate a resdb whose 'allocated' set starts out empty.
 * Returns the new object, or NULL on failure.  When the resource set
 * cannot be created, errno is set to ENOMEM.
 * The caller releases the result with resdb_destroy().
 */
static struct resdb *resdb_create (void)
{
    struct resdb *db = calloc (1, sizeof (*db));

    if (db == NULL)
        return NULL;
    db->allocated = rlist_create ();
    if (db->allocated == NULL) {
        free (db);
        errno = ENOMEM;
        return NULL;
    }
    return db;
}

/* Build the KVS key of R for job 'id' ("<job kvs directory>.R") in 'buf',
 * which holds 'size' bytes.
 * Returns 0 on success.  Returns -1 if the job directory cannot be encoded,
 * or -1 with errno set to EOVERFLOW if the key does not fit in 'buf'.
 */
static int res_makekey (flux_jobid_t id, char *buf, size_t size)
{
    char jobdir[128];
    int len;

    if (flux_job_id_encode (id, "kvs", jobdir, sizeof (jobdir)) < 0)
        return -1;
    len = snprintf (buf, size, "%s.R", jobdir);
    /* A negative snprintf() result (output error) is reported the same way
     * as truncation.
     */
    if (len < 0 || (size_t)len >= size) {
        errno = EOVERFLOW;
        return -1;
    }
    return 0;
}

/* Fetch R for job 'id' from the KVS and parse it into an rlist.
 * The lookup is synchronous: flux_kvs_lookup_get() is called right after
 * the request is sent.
 * Returns a new rlist that the caller must release with rlist_destroy(),
 * or NULL if the key cannot be built, the lookup fails, or R cannot be
 * parsed.
 */
static struct rlist *res_lookup (flux_t *h, flux_jobid_t id)
{
    char path[128];
    const char *encoded;
    flux_future_t *fut = NULL;
    struct rlist *result = NULL;

    if (res_makekey (id, path, sizeof (path)) < 0)
        goto done;
    if (!(fut = flux_kvs_lookup (h, NULL, 0, path)))
        goto done;
    if (flux_kvs_lookup_get (fut, &encoded) < 0)
        goto done;
    /* 'encoded' is consumed here, before the future is destroyed below. */
    result = rlist_from_R (encoded);
done:
    flux_future_destroy (fut);
    return result;
}

static int jobtap_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *arg)
{
struct resdb *resdb = flux_plugin_aux_get (p, auxname);
flux_t *h = flux_jobtap_get_flux (p);
flux_jobid_t id;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:I}",
"id", &id) < 0) {
flux_log (h,
LOG_ERR,
"%s %s: unpack: %s",
PLUGIN_NAME,
topic,
flux_plugin_arg_strerror (args));
return -1;
}
/* job.event.* callbacks are not received unless subscribed on a per-job
* basis, so subscribe to them in the job.new callback.
*/
if (streq (topic, "job.new")) {
if (flux_jobtap_job_subscribe (p, id) < 0) {
flux_log_error (h,
"%s(%s) %s: subscribe",
PLUGIN_NAME,
idf58 (id),
topic);
}
}
/* Look up R that was just allocated to the job and attach it to the job
* aux container so we don't have to look it up again on free. Call
* rlist_append() to add the resources to resdb->allocated. If that
* fails, some resources are already allocated so raise a fatal exception
* on the job.
*/
else if (streq (topic, "job.event.alloc")) {
struct rlist *R;
if (!(R = res_lookup (h, id))
|| flux_jobtap_job_aux_set (p,
id,
PLUGIN_NAME "::R",
R,
(flux_free_f)rlist_destroy) < 0) {
flux_log_error (h,
"%s(%s) %s: failed to lookup or cache R",
PLUGIN_NAME,
idf58 (id),
topic);
rlist_destroy (R);
return -1;
}
if (rlist_append (resdb->allocated, R) < 0) {
flux_jobtap_raise_exception (p,
id,
"alloc-check",
0,
"resources already allocated");
}
}
/* Get R that was just freed from the job's aux container and remove it
* from resdb->allocated. Any jobs that had allocations before the module
* will not have the R aux item, so silently return success in that case.
*/
else if (streq (topic, "job.event.free")) {
struct rlist *R = flux_jobtap_job_aux_get (p, id, PLUGIN_NAME "::R");
if (R) {
struct rlist *diff;
if (!(diff = rlist_diff (resdb->allocated, R))) {
flux_log_error (h,
"%s(%s) %s: rlist_diff",
PLUGIN_NAME,
idf58 (id),
topic);
return -1;
}
rlist_destroy (resdb->allocated);
resdb->allocated = diff;
}
}
return 0;
}

/* Handler table passed to flux_plugin_register().  Every topic is routed to
 * jobtap_cb(), which selects its behavior by comparing the topic string.
 * The trailing { 0 } entry presumably marks the end of the table - confirm
 * against the flux_plugin_register() documentation.
 */
static const struct flux_plugin_handler tab[] = {
{ "job.event.alloc", jobtap_cb, NULL },
{ "job.event.free", jobtap_cb, NULL },
{ "job.new", jobtap_cb, NULL },
{ 0 }
};

/* Plugin entry point.
 * Creates the (initially empty) resource database, stores it in the plugin
 * aux container with resdb_destroy() as its destructor, then registers the
 * handler table.
 * Returns -1 if the database cannot be created or stored; otherwise returns
 * the result of flux_plugin_register().
 */
int flux_plugin_init (flux_plugin_t *p)
{
    struct resdb *db = resdb_create ();

    if (db == NULL)
        return -1;
    if (flux_plugin_aux_set (p,
                             auxname,
                             db,
                             (flux_free_f)resdb_destroy) < 0) {
        /* The aux container did not take the object, so free it here. */
        resdb_destroy (db);
        return -1;
    }
    return flux_plugin_register (p, "alloc-check", tab);
}

// vi:ts=4 sw=4 expandtab
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ TESTSCRIPTS = \
t2300-sched-simple.t \
t2302-sched-simple-up-down.t \
t2303-sched-hello.t \
t2304-sched-simple-alloc-check.t \
t2310-resource-module.t \
t2311-resource-drain.t \
t2312-resource-exclude.t \
Expand Down
79 changes: 79 additions & 0 deletions t/t2304-sched-simple-alloc-check.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/bin/sh

test_description='check that sched-simple never double books resources'

# Append --logfile option if FLUX_TESTS_LOGFILE is set in environment:
test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile
. $(dirname $0)/sharness.sh

test_under_flux 1

# Verify that alloc-check plugin works using alloc-bypass
#
# Scenario: a 'sleep inf' job is handed R for rank 0 directly through the
# alloc-bypass.R attribute, then a regular one-node job is submitted and is
# expected to end with an exception (presumably because the scheduler grants
# it the same rank - verify against alloc-bypass behavior).
test_expect_success 'load alloc-bypass and alloc-check plugins' '
flux jobtap load alloc-bypass.so &&
flux jobtap load alloc-check.so
'
test_expect_success 'run an alloc-bypass sleep job' '
flux submit \
-vvv \
--wait-event=start \
--setattr=alloc-bypass.R="$(flux R encode -r 0)" \
-n 1 \
sleep inf
'
# N.B. despite its name, bypass.jobid receives the output of this submit,
# i.e. the id of the regular job, not of the alloc-bypass job above.
test_expect_success 'a regular job fails with an alloc-check exception' '
run_timeout 30 flux submit --flags=waitable -vvv \
--wait-event=exception \
-N1 /bin/true >bypass.jobid
'
test_expect_success 'flux job wait says the job failed' '
test_must_fail flux job wait -v $(cat bypass.jobid)
'
# Cancel the still-running sleep job and wait for the queue to empty before
# the plugins are removed.
test_expect_success 'clean up jobs' '
flux cancel --all &&
flux queue drain
'
test_expect_success 'unload plugins' '
flux jobtap remove alloc-check.so &&
flux jobtap remove alloc-bypass.so
'

# Check that sched-simple doesn't suffer from time limit issue like
# flux-framework/flux-sched#1043
#
# Scenario: with an epilog that sleeps 2s configured, five 'sleep inf' jobs
# with a 1s time limit are submitted back to back.  The jobs' stderr must
# show timeout exceptions and must not show any alloc-check exception.
test_expect_success 'configure epilog with 2s delay' '
flux config load <<-EOT
[job-manager]
plugins = [
{ load = "perilog.so" },
]
epilog.command = [ "flux", "perilog-run", "epilog", "-e", "sleep,2" ]
EOT
'
test_expect_success 'load alloc-check plugin' '
flux jobtap load alloc-check.so
'
# The job ids printed by flux submit are collected in 'jobids'.
test_expect_success 'submit consecutive jobs that exceed their time limit' '
(for i in $(seq 5); \
do flux submit -N1 -x -t1s sleep inf; \
done) >jobids
'
# '|| true' keeps the loop going when an attach exits nonzero; only the
# captured stderr in 'joberr' is examined by the tests below.
test_expect_success 'wait for jobs to complete and capture their stderr' '
(for id in $(cat jobids); do \
flux job attach $id || true; \
done) 2>joberr
'
test_expect_success 'some jobs received timeout exception' '
grep "job.exception type=timeout" joberr
'
test_expect_success 'no jobs received alloc-check exception' '
test_must_fail grep "job.exception type=alloc-check" joberr
'
test_expect_success 'remove alloc-check plugin' '
flux jobtap remove alloc-check.so
'
# Loading an empty config undoes the epilog configuration loaded above.
test_expect_success 'undo epilog config' '
flux config load </dev/null
'

test_done

0 comments on commit e0f2211

Please sign in to comment.