Skip to content

Commit

Permalink
Clean up task expire handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 26, 2023
1 parent d1e6e1c commit 84ad390
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 67 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1759,7 +1759,7 @@ async def main_loop(self) -> None:
# (Could do this periodically?)
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()
self.pool.clock_expire_tasks()
self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
Expand Down
40 changes: 33 additions & 7 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,13 @@
TASK_STATUS_WAITING
)
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED)
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUBMIT_FAILED
)
from cylc.flow.wallclock import (
get_current_time_string,
get_seconds_as_interval_string as intvl_as_str
Expand Down Expand Up @@ -116,11 +121,15 @@ def log_task_job_activity(ctx, workflow, point, name, submit_num=None):
try:
with open(os.path.expandvars(job_activity_log), "ab") as handle:
handle.write((ctx_str + '\n').encode())
except IOError as exc:
# This happens when there is no job directory, e.g. if job host
# selection command causes an submission failure, there will be no job
# directory. In this case, just send the information to the log.
LOG.exception(exc)
except IOError:
# This happens when there is no job directory. E.g., if a job host
# selection command causes a submission failure, or if a waiting task
# expires before a job log directory is otherwise needed.
# (Don't log the exception content, it looks like a bug).
LOG.warning(
f"There is no log directory for {point}/{name} job:{submit_num}"
" so I'll just log the following activity."
)
LOG.info(ctx_str)
if ctx.cmd and ctx.ret_code:
LOG.error(ctx_str)
Expand Down Expand Up @@ -337,6 +346,7 @@ class TaskEventsManager():
EVENT_RETRY = "retry"
EVENT_STARTED = TASK_OUTPUT_STARTED
EVENT_SUBMITTED = TASK_OUTPUT_SUBMITTED
EVENT_EXPIRED = TASK_OUTPUT_EXPIRED
EVENT_SUBMIT_FAILED = "submission failed"
EVENT_SUBMIT_RETRY = "submission retry"
EVENT_SUCCEEDED = TASK_OUTPUT_SUCCEEDED
Expand Down Expand Up @@ -638,6 +648,11 @@ def process_message(
elif message == self.EVENT_SUCCEEDED:
self._process_message_succeeded(itask, event_time)
self.spawn_children(itask, TASK_OUTPUT_SUCCEEDED)

elif message == self.EVENT_EXPIRED:
self._process_message_expired(itask, event_time)
self.spawn_children(itask, TASK_OUTPUT_EXPIRED)

elif message == self.EVENT_FAILED:
if (
flag == self.FLAG_RECEIVED
Expand All @@ -647,6 +662,7 @@ def process_message(
if self._process_message_failed(
itask, event_time, self.JOB_FAILED):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

elif message == self.EVENT_SUBMIT_FAILED:
if (
flag == self.FLAG_RECEIVED
Expand All @@ -659,6 +675,7 @@ def process_message(
submit_num
):
self.spawn_children(itask, TASK_OUTPUT_SUBMIT_FAILED)

elif message == self.EVENT_SUBMITTED:
if (
flag == self.FLAG_RECEIVED
Expand Down Expand Up @@ -1159,6 +1176,15 @@ def _process_message_started(self, itask, event_time):
if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0

def _process_message_expired(self, itask, event_time):
    """Handle an "expired" message on behalf of process_message.

    The caller is expected to have reset the task state to expired
    already; this helper only deals with the follow-on bookkeeping:
    event handlers, the data store task-state delta, and job timers.

    Args:
        itask: The task proxy that has expired.
        event_time: Accepted for signature parity with the other
            _process_message_* helpers; not used here.
    """
    # Fire any configured "expired" event handlers with the standard text.
    self.setup_event_handlers(
        itask,
        self.EVENT_EXPIRED,
        'Task expired: will not submit job.',
    )

    # Publish the (already reset) task state to the data store.
    # NOTE(review): the held-status delta (delta_task_held) is deliberately
    # not published here; whether it should be is still an open question.
    store = self.data_store_mgr
    store.delta_task_state(itask)

    self._reset_job_timers(itask)

def _process_message_succeeded(self, itask, event_time):
"""Helper for process_message, handle a succeeded message."""

Expand Down
63 changes: 7 additions & 56 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1640,27 +1640,13 @@ def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait):
# in the previous-submit log directory.

# convert labels to messages, to send to task events manager.
good = set()
for out in outputs:
msg = itask.state.outputs.get_msg(out)
if msg is None:
LOG.warning(f"{point}/{taskdef.name} has no output {out}")
else:
good.add(msg)
if not good:
# No valid outputs requested.
return

# Try to spawn children of the outputs.
for msg in good:
if msg == TASK_OUTPUT_EXPIRED:
# not caused by task messages
self._expire_task(itask)
self.spawn_on_output(itask, expired) ## TODO CONTINUE FROM pr 5412
else:
# Try to spawn children of this output.
self.task_events_mgr.process_message(itask, logging.INFO, msg)
# TODO remove this - just log the actual spawning events
LOG.info(f"[{itask}] Forced spawning on {msg}")

def _set_prereqs(self, point, taskdef, prereqs, flow_nums, flow_wait):
"""Set given prerequisites of a target task.
Expand Down Expand Up @@ -1877,48 +1863,13 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool:
sim_task_state_changed = True
return sim_task_state_changed

def set_expired_tasks(self):
res = False
def clock_expire_tasks(self):
"""Expire any tasks past their clock-expiry time."""
for itask in self.get_tasks():
if self._set_expired_task(itask):
res = True
return res

def _set_expired_task(self, itask):
"""Check if task has expired. Set state and event handler if so.
Return True if task has expired.
"""
if (
not itask.state(
TASK_STATUS_WAITING,
is_held=False
)
or itask.tdef.expiration_offset is None
):
return False

if itask.expire_time is None:
itask.expire_time = (
itask.get_point_as_seconds() +
itask.get_offset_as_seconds(itask.tdef.expiration_offset))

if (
time() > itask.expire_time
and itask.state_reset(TASK_STATUS_EXPIRED)
):
self._expire_task(itask)
return True

return False

def _expire_task(self, itask):
msg = 'Task expired: will not submit job.'
LOG.warning(f"[{itask}] {msg}")
self.task_events_mgr.setup_event_handlers(itask, "expired", msg)
self.data_store_mgr.delta_task_state(itask)
self.data_store_mgr.delta_task_held(itask)
self.spawn_on_output(itask, 'expired')
if not itask.clock_expire():
continue
self.task_events_mgr.process_message(
itask, logging.WARNING, TASK_OUTPUT_EXPIRED)

def task_succeeded(self, id_):
"""Return True if task with id_ is in the succeeded state."""
Expand Down
31 changes: 28 additions & 3 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from collections import Counter
from copy import copy
from fnmatch import fnmatchcase
from time import time
from typing import (
Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
)
Expand All @@ -29,7 +30,11 @@
from cylc.flow.id import Tokens
from cylc.flow.platforms import get_platform
from cylc.flow.task_action_timer import TimerFlags
from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING
from cylc.flow.task_state import (
TaskState,
TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED
)
from cylc.flow.taskdef import generate_graph_children
from cylc.flow.wallclock import get_unix_time_from_time_string as str2time
from cylc.flow.cycling.iso8601 import (
Expand Down Expand Up @@ -248,16 +253,23 @@ def __init__(
self.non_unique_events = Counter() # type: ignore # TODO: figure out

self.clock_trigger_time: Optional[float] = None
self.expire_time: Optional[float] = None
self.late_time: Optional[float] = None
self.is_late = is_late
self.waiting_on_job_prep = False

self.state = TaskState(tdef, self.point, status, is_held)

# Determine graph children of this task (for spawning).
self.graph_children = generate_graph_children(tdef, self.point)

self.expire_time: Optional[float] = None
if self.tdef.expiration_offset is not None:
self.expire_time = (
self.get_point_as_seconds() +
self.get_offset_as_seconds(
self.tdef.expiration_offset
)
)

def __repr__(self) -> str:
return f"<{self.__class__.__name__} '{self.tokens}'>"

Expand Down Expand Up @@ -483,3 +495,16 @@ def satisfy_me(self, prereqs) -> bool:
for err in bad:
LOG.warning(f"{self.identity} has no prerequisites {err}")
return len(bad) == 0

def clock_expire(self) -> bool:
    """Check for, and do, clock expiry.

    The task is reset to the expired state only when all of these hold:

    * an expiry time is configured (``self.expire_time`` is not None);
    * the task is still waiting and is not held;
    * the wall-clock time has reached the expiry time.

    Restricting expiry to waiting, unheld tasks means a task that has
    already been submitted, is running, or has finished is never
    retroactively flipped to expired just because its expiry time passed.
    A task that has already expired is no longer waiting, so repeated
    calls return False rather than expiring it twice.

    Returns:
        True if this call reset the task state to expired, else False.
    """
    if self.expire_time is None:
        # Clock expiry is not configured for this task.
        return False

    if not self.state(TASK_STATUS_WAITING, is_held=False):
        # Only waiting, unheld tasks are candidates for expiry.
        return False

    if time() < self.expire_time:
        # Not time yet.
        return False

    self.state.reset(TASK_STATUS_EXPIRED)
    return True
4 changes: 4 additions & 0 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from cylc.flow.exceptions import TaskDefError
from cylc.flow.task_id import TaskID
from cylc.flow.task_state import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_SUCCEEDED,
Expand Down Expand Up @@ -212,6 +213,9 @@ def tweak_outputs(self):
):
self.set_required_output(TASK_OUTPUT_SUCCEEDED, True)

# Expired must be optional
self.set_required_output(TASK_OUTPUT_EXPIRED, False)

# In Cylc 7 back compat mode, make all success outputs required.
if cylc.flow.flags.cylc7_back_compat:
for output in [
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_id_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def _task_proxy(id_, hier):
hier.append('root')
tdef = create_autospec(TaskDef, namespace_hierarchy=hier)
tdef.name = tokens['task']
tdef.expiration_offset = None
return TaskProxy(
Tokens('~user/workflow'),
tdef,
Expand Down

0 comments on commit 84ad390

Please sign in to comment.