Skip to content

Commit

Permalink
Merge pull request flux-framework#6330 from grondo/issue#6323
Browse files Browse the repository at this point in the history
perilog: never send SIGKILL to prolog/epilog, drain active nodes after kill-timeout instead
  • Loading branch information
mergify[bot] authored Oct 1, 2024
2 parents 12017cc + bea1f0c commit f7793b5
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 110 deletions.
21 changes: 13 additions & 8 deletions doc/man5/flux-config-job-manager.rst
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ prolog
exception raised). The default prolog timeout is 30m. To disable
the timeout use ``0`` or ``infinity``.
kill-timeout
(optional) Floating-point number of seconds to wait after sending
SIGTERM to send SIGKILL to the prolog when the prolog has been
canceled due to timeout or exception. The default is 5.0. (This
key is mainly used for testing purposes).
(optional) If a job exception is raised during the job prolog and
``cancel-on-exception`` is true, the prolog will be canceled by
sending it a SIGTERM signal. ``kill-timeout`` is the number of
seconds to wait until any nodes with prolog tasks that are still
active will be drained. The drain reason will include the string
"canceled then timed out". The default is 10.
cancel-on-exception
(optional) A boolean indicating whether a fatal job exception raised
while the prolog is active terminates the prolog. The default is true.
Expand All @@ -151,10 +153,13 @@ epilog
timeout for the epilog, after which it is terminated (and a job
exception raised). By default, the epilog timeout is disabled.
kill-timeout
(optional) Floating-point number of seconds to wait after sending
SIGTERM to send SIGKILL to the epilog when it has been canceled due
to timeout or exception. The default is 5.0. (This key is mainly
used for testing purposes)
(optional) If a job exception is raised during the job epilog and
``cancel-on-exception`` is ``true``, then the epilog will be canceled
by sending it a SIGTERM signal. ``kill-timeout`` is the number of
seconds to wait until any nodes with epilog tasks that are still
active will be drained. The drain reason will include the string
"canceled then timed out". The default is 10. (``kill-timeout`` with
the job epilog should only be used for testing purposes.)
cancel-on-exception
(optional) A boolean indicating whether a fatal job exception raised
while the epilog is active terminates the epilog. The default is true.
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/flux-perilog-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ async def kill_process_tree(parent_pid, sig):
# Iterate the list in reverse so that parent_pid comes last and
# (for the most part) children are signaled before their parent:
for pid in reversed(await get_children(parent_pid)):
os.kill(pid, sig)
try:
os.kill(pid, sig)
except ProcessLookupError:
# No error if process no longer exists
pass


async def run_with_timeout(cmd, label, timeout=1800.0):
Expand Down
162 changes: 104 additions & 58 deletions src/modules/job-manager/plugins/perilog.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
* - The job manager prolog is started at the RUN state.
*
* - If a job gets a fatal exception while the prolog is
* running, the prolog is terminated with SIGTERM, followed
* by SIGKILL
* running, the prolog is canceled and a SIGTERM signal
* is sent. After a configurable timeout, ranks on which
* the prolog is still active are drained.
*
* - The epilog is started as a result of a "finish" event or
* when the prolog completes if a fatal job exception has been
Expand Down Expand Up @@ -105,6 +106,7 @@ struct perilog_proc {
flux_jobid_t id;
uint32_t userid;
json_t *R;
bool per_rank;
bool prolog;
bool cancel_on_exception;
bool canceled;
Expand Down Expand Up @@ -247,7 +249,7 @@ static struct perilog_procdesc *perilog_procdesc_create (json_t *o,
}

pd->cmd = cmd;
pd->kill_timeout = kill_timeout > 0. ? kill_timeout : 5.;
pd->kill_timeout = kill_timeout > 0. ? kill_timeout : 10.;
pd->per_rank = per_rank;
pd->prolog = prolog;
pd->uses_imp = uses_imp;
Expand Down Expand Up @@ -435,11 +437,23 @@ static void emit_finish_event (struct perilog_proc *proc,
}
}

static flux_future_t *drain_failed_ranks (struct perilog_proc *proc)
/* Return true if subprocess 'p' is in the FLUX_SUBPROCESS_FAILED state
 * or reports a nonzero status, false otherwise.
 */
static bool subprocess_failed (flux_subprocess_t *p)
{
    return flux_subprocess_state (p) == FLUX_SUBPROCESS_FAILED
           || flux_subprocess_status (p) != 0;
}

/* Drain ranks that failed, are still active or both. */
static flux_future_t *proc_drain_ranks (struct perilog_proc *proc,
bool drain_failed,
bool drain_active)
{
struct idset *failed = NULL;
flux_future_t *f = NULL;
unsigned long rank;
const char *msg;
char reason[256];
flux_t *h = flux_jobtap_get_flux (proc->p);

Expand All @@ -452,8 +466,8 @@ static flux_future_t *drain_failed_ranks (struct perilog_proc *proc)
while (rank != IDSET_INVALID_ID) {
flux_subprocess_t *p;
if ((p = bulk_exec_get_subprocess (proc->bulk_exec, rank))) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_FAILED
|| flux_subprocess_status (p) != 0) {
if ((drain_failed && subprocess_failed (p))
|| (drain_active && flux_subprocess_active (p))) {
if (idset_set (failed, rank) < 0){
flux_log_error (h,
"failed to add rank=%lu to drain set",
Expand All @@ -470,11 +484,19 @@ static flux_future_t *drain_failed_ranks (struct perilog_proc *proc)
perilog_proc_name (proc));
goto out;
}

if (proc->canceled)
msg = "canceled then timed out";
else if (proc->timedout)
msg = "timed out";
else
msg = "failed";

(void) snprintf (reason,
sizeof (reason),
"%s %s for job %s",
perilog_proc_name (proc),
proc->timedout ? "timed out" : "failed",
msg,
idf58 (proc->id));

if (!(f = flux_rpc_pack (h,
Expand Down Expand Up @@ -599,42 +621,45 @@ static void drain_failed_cb (flux_future_t *f, void *arg)
perilog_proc_finish (proc);
}

static bool perilog_per_rank (struct perilog_proc *proc)
static void proc_drain_and_finish (struct perilog_proc *proc,
bool drain_failed,
bool drain_active)
{
if (proc->prolog)
return perilog_config.prolog->per_rank;
return perilog_config.epilog->per_rank;

if (drain_failed || drain_active) {
/* Drain the set of ranks that failed the prolog/epilog. If the
* drain RPC is successful, then wait for the response before
* emitting the "prolog/epilog-finish" event. O/w, resources could
* be freed and handed out to new jobs before they are drained.
*/
if ((proc->drain_f = proc_drain_ranks (proc,
drain_failed,
drain_active))
&& flux_future_then (proc->drain_f,
-1.,
drain_failed_cb,
proc) == 0)
return;

/* O/w, drain RPC failed, fall through so finish event is still
* emitted.
*/
}
perilog_proc_finish (proc);
}

static void completion_cb (struct bulk_exec *bulk_exec, void *arg)
{
struct perilog_proc *proc = bulk_exec_aux_get (bulk_exec, "perilog_proc");
if (proc) {
if (perilog_per_rank (proc)
bool drain_failed = false;

if (proc->per_rank
&& !proc->canceled
&& bulk_exec_rc (bulk_exec) != 0) {
&& bulk_exec_rc (bulk_exec) != 0)
drain_failed = true;

/* Drain the set of ranks that failed the prolog/epilog. If the
* drain RPC is successful, then wait for the response before
* emitting the "prolog/epilog-finish" event. O/w, resources could
* be freed and handed out to new jobs before they are drained.
*/
if ((proc->drain_f = drain_failed_ranks (proc))
&& flux_future_then (proc->drain_f,
-1.,
drain_failed_cb,
proc) == 0)
return;

/* O/w, drain RPC failed, report error and fall through so finish
* event is still emitted.
*/
flux_log_error (flux_jobtap_get_flux (proc->p),
"%s: failed to drain %s failed ranks",
idf58 (proc->id),
perilog_proc_name (proc));
}
perilog_proc_finish (proc);
proc_drain_and_finish (proc, drain_failed, false);
}
}

Expand Down Expand Up @@ -713,8 +738,18 @@ static void io_cb (struct bulk_exec *bulk_exec,
}
}


/* bulk-exec start callback: once the processes have started, start the
 * timeout timer attached to the associated perilog_proc, if there is one.
 * The perilog_proc is looked up via the "perilog_proc" aux item on 'exec';
 * 'arg' is unused.
 */
static void start_cb (struct bulk_exec *exec, void *arg)
{
    struct perilog_proc *proc = bulk_exec_aux_get (exec, "perilog_proc");

    /* Nothing to do without a proc or when no timeout timer was created */
    if (!proc || !proc->timer)
        return;
    flux_watcher_start (proc->timer);
}

static struct bulk_exec_ops ops = {
.on_start = NULL,
.on_start = start_cb,
.on_exit = NULL,
.on_complete = completion_cb,
.on_error = error_cb,
Expand Down Expand Up @@ -816,12 +851,14 @@ static struct perilog_proc *procdesc_run (flux_t *h,
perilog_proc_name (proc));
goto error;
}
flux_watcher_start (w);
proc->timer = w;
/* Note: watcher will be started in bulk-exec start callback
*/
}
proc->R = json_incref (R);
proc->bulk_exec = bulk_exec;
proc->ranks = ranks;
proc->per_rank = pd->per_rank;
proc->cancel_on_exception = pd->cancel_on_exception;
proc->kill_timeout = pd->kill_timeout;

Expand Down Expand Up @@ -974,27 +1011,20 @@ static int proc_kill (struct perilog_proc *proc)
return 0;
}

static void proc_kill_timer_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
static void proc_kill_timeout_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
flux_t *h;
flux_future_t *f;
struct perilog_proc *proc = arg;

if (!proc || !(h = flux_jobtap_get_flux (proc->p)))
return;

if (!(f = bulk_exec_kill (proc->bulk_exec, NULL, SIGKILL))) {
flux_log_error (h,
"%s: failed to send SIGKILL to %s",
idf58 (proc->id),
perilog_proc_name (proc));
return;
}
/* Do not wait for any response */
flux_future_destroy (f);
flux_t *h = flux_jobtap_get_flux (proc->p);
flux_log_error (h,
"%s: timed out waiting for SIGTERM to terminate %s",
idf58 (proc->id),
perilog_proc_name (proc));
/* Drain active ranks and post finish event
*/
proc_drain_and_finish (proc, false, true);
}

static int proc_kill_timer_start (struct perilog_proc *proc, double timeout)
Expand All @@ -1005,13 +1035,16 @@ static int proc_kill_timer_start (struct perilog_proc *proc, double timeout)
proc->kill_timer = flux_timer_watcher_create (r,
timeout,
0.,
proc_kill_timer_cb,
proc_kill_timeout_cb,
proc);
if (!proc->kill_timer) {
flux_log_error (h,
"%s: failed to start %s kill timer",
idf58 (proc->id),
perilog_proc_name (proc));
/* Since timer cb won't be run, drain and send finish event now
*/
proc_drain_and_finish (proc, false, true);
return -1;
}
flux_watcher_start (proc->kill_timer);
Expand Down Expand Up @@ -1235,11 +1268,18 @@ static int conf_init (flux_plugin_t *p, struct perilog_conf *conf)
return -1;
}

/* Return the difference between the next wakeup time of watcher 'w' and
 * the current reactor time, rounded to the nearest integer.
 * NOTE(review): units are presumably seconds, matching the timer timeouts
 * used elsewhere in this file -- confirm against the reactor API.
 */
static int watcher_remaining_time (flux_watcher_t *w)
{
    const double wakeup = flux_watcher_next_wakeup (w);
    const double now = flux_reactor_time ();

    return (int) round (wakeup - now);
}

static json_t *proc_to_json (struct perilog_proc *proc)
{
const char *state;
struct idset *active_ranks;
char *ranks = NULL;
int remaining_time = -1.;
json_t *o;
int total = bulk_exec_total (proc->bulk_exec);
int active = total - bulk_exec_complete (proc->bulk_exec);
Expand All @@ -1254,12 +1294,18 @@ static json_t *proc_to_json (struct perilog_proc *proc)
if ((active_ranks = bulk_exec_active_ranks (proc->bulk_exec)))
ranks = idset_encode (active_ranks, IDSET_FLAG_RANGE);

o = json_pack ("{s:s s:s s:i s:i s:s}",
if (proc->kill_timer)
remaining_time = watcher_remaining_time (proc->kill_timer);
else if (proc->timer)
remaining_time = watcher_remaining_time (proc->timer);

o = json_pack ("{s:s s:s s:i s:i s:s s:i}",
"name", perilog_proc_name (proc),
"state", state,
"total", total,
"active", active,
"active_ranks", ranks ? ranks : "");
"active_ranks", ranks ? ranks : "",
"remaining_time", remaining_time);
free (ranks);
idset_destroy (active_ranks);
return o;
Expand Down
21 changes: 20 additions & 1 deletion t/t2274-manager-perilog-per-rank.t
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ test_expect_success 'perilog: load a basic per-rank prolog config' '
flux config load <<-EOF &&
[job-manager.prolog]
per-rank = true
command = [ "sleep", "5" ]
command = [ "sleep", "30" ]
[job-manager.epilog]
per-rank = true
command = [ "sleep", "30" ]
Expand All @@ -175,6 +175,7 @@ test_expect_success 'perilog: load a basic per-rank prolog config' '
flux jobtap query perilog.so | jq .conf.prolog
'
test_expect_success 'perilog: prolog runs on all 4 ranks of a 4 node job' '
flux dmesg -c &&
jobid=$(flux submit -N4 hostname) &&
flux job wait-event -vt 30 $jobid prolog-start &&
flux jobtap query perilog.so | jq .procs &&
Expand Down Expand Up @@ -222,6 +223,24 @@ test_expect_success 'perilog: epilog triggered by prolog has correct userid' '
flux job wait-event -vHt 30 $jobid clean &&
flux dmesg -H | grep FLUX_JOB_USERID=$(id -u)
'
# Verify that a canceled prolog which does not exit after SIGTERM causes
# only the still-active ranks to be drained once kill-timeout expires.
# The prolog command ignores signal 15 (SIGTERM) on every rank and sleeps
# for 10s on rank 3 only, while kill-timeout is set to 0.5s. After the job
# is canceled, the test expects prolog-finish to be posted, rank 3 to be
# the only drained rank, and the drain reason to contain
# "canceled then timed out". Rank 3 is undrained at the end so later
# tests start with all ranks available.
test_expect_success 'perilog: canceled prolog drains active ranks after kill_timeout' '
flux config load <<-EOF &&
[job-manager.prolog]
per-rank = true
command = [ "sh", "-c", "trap \"\" 15; if test \$(flux getattr rank) -eq 3; then sleep 10; fi" ]
kill-timeout = 0.5
EOF
flux jobtap query perilog.so | jq .conf.prolog &&
jobid=$(flux submit -N4 hostname) &&
flux job wait-event $jobid prolog-start &&
flux cancel $jobid &&
flux job wait-event $jobid prolog-finish &&
test_debug "echo drained_ranks=$(drained_ranks)" &&
test "$(drained_ranks)" = "3" &&
flux resource drain -no {reason} | grep "canceled then timed out" &&
flux job wait-event -vt 15 $jobid clean &&
flux resource undrain 3
'
test_expect_success 'perilog: signaled prolog is reported' '
flux config load <<-EOF &&
[job-manager.prolog]
Expand Down
Loading

0 comments on commit f7793b5

Please sign in to comment.