From 84ad3902310d291c06fd3f17f24eff8904efe72b Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Tue, 26 Sep 2023 12:22:34 +1300 Subject: [PATCH] Clean up task expire handling. --- cylc/flow/scheduler.py | 2 +- cylc/flow/task_events_mgr.py | 40 +++++++++++++++++++---- cylc/flow/task_pool.py | 63 ++++-------------------------------- cylc/flow/task_proxy.py | 31 ++++++++++++++++-- cylc/flow/taskdef.py | 4 +++ tests/unit/test_id_match.py | 1 + 6 files changed, 74 insertions(+), 67 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 5eb3e03583..ae5a3951f9 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1759,7 +1759,7 @@ async def main_loop(self) -> None: # (Could do this periodically?) self.xtrigger_mgr.housekeep(self.pool.get_tasks()) - self.pool.set_expired_tasks() + self.pool.clock_expire_tasks() self.release_queued_tasks() if self.pool.sim_time_check(self.message_queue): diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 46d46aae94..d0ca803926 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -71,8 +71,13 @@ TASK_STATUS_WAITING ) from cylc.flow.task_outputs import ( - TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED) + TASK_OUTPUT_EXPIRED, + TASK_OUTPUT_SUBMITTED, + TASK_OUTPUT_STARTED, + TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUT_FAILED, + TASK_OUTPUT_SUBMIT_FAILED +) from cylc.flow.wallclock import ( get_current_time_string, get_seconds_as_interval_string as intvl_as_str @@ -116,11 +121,15 @@ def log_task_job_activity(ctx, workflow, point, name, submit_num=None): try: with open(os.path.expandvars(job_activity_log), "ab") as handle: handle.write((ctx_str + '\n').encode()) - except IOError as exc: - # This happens when there is no job directory, e.g. if job host - # selection command causes an submission failure, there will be no job - # directory. In this case, just send the information to the log. - LOG.exception(exc) + except IOError: + # This happens when there is no job directory. E.g., if a job host + # selection command causes a submission failure, or if a waiting task + # expires before a job log directory is otherwise needed. + # (Don't log the exception content, it looks like a bug). + LOG.warning( + f"There is no log directory for {point}/{name} job:{submit_num}" + " so I'll just log the following activity." + ) LOG.info(ctx_str) if ctx.cmd and ctx.ret_code: LOG.error(ctx_str) @@ -337,6 +346,7 @@ class TaskEventsManager(): EVENT_RETRY = "retry" EVENT_STARTED = TASK_OUTPUT_STARTED EVENT_SUBMITTED = TASK_OUTPUT_SUBMITTED + EVENT_EXPIRED = TASK_OUTPUT_EXPIRED EVENT_SUBMIT_FAILED = "submission failed" EVENT_SUBMIT_RETRY = "submission retry" EVENT_SUCCEEDED = TASK_OUTPUT_SUCCEEDED @@ -638,6 +648,11 @@ def process_message( elif message == self.EVENT_SUCCEEDED: self._process_message_succeeded(itask, event_time) self.spawn_children(itask, TASK_OUTPUT_SUCCEEDED) + + elif message == self.EVENT_EXPIRED: + self._process_message_expired(itask, event_time) + self.spawn_children(itask, TASK_OUTPUT_EXPIRED) + elif message == self.EVENT_FAILED: if ( flag == self.FLAG_RECEIVED @@ -647,6 +662,7 @@ def process_message( if self._process_message_failed( itask, event_time, self.JOB_FAILED): self.spawn_children(itask, TASK_OUTPUT_FAILED) + elif message == self.EVENT_SUBMIT_FAILED: if ( flag == self.FLAG_RECEIVED @@ -659,6 +675,7 @@ def process_message( submit_num ): self.spawn_children(itask, TASK_OUTPUT_SUBMIT_FAILED) + elif message == self.EVENT_SUBMITTED: if ( flag == self.FLAG_RECEIVED @@ -1159,6 +1176,15 @@ def _process_message_started(self, itask, event_time): if TimerFlags.SUBMISSION_RETRY in itask.try_timers: itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0 + def _process_message_expired(self, itask, event_time): + """Helper for process_message, handle task expiry.""" + # state reset already done for expired + msg = 'Task expired: will not submit job.' + self.setup_event_handlers(itask, self.EVENT_EXPIRED, msg) + self.data_store_mgr.delta_task_state(itask) + # self.data_store_mgr.delta_task_held(itask) # ?? + self._reset_job_timers(itask) + def _process_message_succeeded(self, itask, event_time): """Helper for process_message, handle a succeeded message.""" diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index ae4521caf8..add2e6933b 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1640,27 +1640,13 @@ def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait): # in the previous-submit log directory. # convert labels to messages, to send to task events manager. - good = set() for out in outputs: msg = itask.state.outputs.get_msg(out) if msg is None: LOG.warning(f"{point}/{taskdef.name} has no output {out}") else: - good.add(msg) - if not good: - # No valid outputs requested. - return - - # Try to spawn children of the outputs. - for msg in good: - if msg == TASK_OUTPUT_EXPIRED: - # not caused by task messages - self._expire_task(itask) - self.spawn_on_output(itask, expired) ## TODO CONTINUE FROM pr 5412 - else: + # Try to spawn children of this output. self.task_events_mgr.process_message(itask, logging.INFO, msg) - # TODO remove this - just log the actual spawning events - LOG.info(f"[{itask}] Forced spawning on {msg}") def _set_prereqs(self, point, taskdef, prereqs, flow_nums, flow_wait): """Set given prerequisites of a target task. @@ -1877,48 +1863,13 @@ def sim_time_check(self, message_queue: 'Queue[TaskMsg]') -> bool: sim_task_state_changed = True return sim_task_state_changed - def set_expired_tasks(self): - res = False + def clock_expire_tasks(self): + """Expire any tasks past their clock-expiry time.""" for itask in self.get_tasks(): - if self._set_expired_task(itask): - res = True - return res - - def _set_expired_task(self, itask): - """Check if task has expired. Set state and event handler if so. - - Return True if task has expired. - """ - if ( - not itask.state( - TASK_STATUS_WAITING, - is_held=False - ) - or itask.tdef.expiration_offset is None - ): - return False - - if itask.expire_time is None: - itask.expire_time = ( - itask.get_point_as_seconds() + - itask.get_offset_as_seconds(itask.tdef.expiration_offset)) - - if ( - time() > itask.expire_time - and itask.state_reset(TASK_STATUS_EXPIRED) - ): - self._expire_task(itask) - return True - - return False - - def _expire_task(self, itask): - msg = 'Task expired: will not submit job.' - LOG.warning(f"[{itask}] {msg}") - self.task_events_mgr.setup_event_handlers(itask, "expired", msg) - self.data_store_mgr.delta_task_state(itask) - self.data_store_mgr.delta_task_held(itask) - self.spawn_on_output(itask, 'expired') + if not itask.clock_expire(): + continue + self.task_events_mgr.process_message( + itask, logging.WARNING, TASK_OUTPUT_EXPIRED) def task_succeeded(self, id_): """Return True if task with id_ is in the succeeded state.""" diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index d166e94271..4791d5c670 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -19,6 +19,7 @@ from collections import Counter from copy import copy from fnmatch import fnmatchcase +from time import time from typing import ( Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING ) @@ -29,7 +30,11 @@ from cylc.flow.id import Tokens from cylc.flow.platforms import get_platform from cylc.flow.task_action_timer import TimerFlags -from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING +from cylc.flow.task_state import ( + TaskState, + TASK_STATUS_WAITING, + TASK_STATUS_EXPIRED +) from cylc.flow.taskdef import generate_graph_children from cylc.flow.wallclock import get_unix_time_from_time_string as str2time from cylc.flow.cycling.iso8601 import ( @@ -248,16 +253,23 @@ def __init__( self.non_unique_events = Counter() # type: ignore # TODO: figure out self.clock_trigger_time: Optional[float] = None - self.expire_time: Optional[float] = None self.late_time: Optional[float] = None self.is_late = is_late self.waiting_on_job_prep = False self.state = TaskState(tdef, self.point, status, is_held) - # Determine graph children of this task (for spawning). self.graph_children = generate_graph_children(tdef, self.point) + self.expire_time: Optional[float] = None + if self.tdef.expiration_offset is not None: + self.expire_time = ( + self.get_point_as_seconds() + + self.get_offset_as_seconds( + self.tdef.expiration_offset + ) + ) + def __repr__(self) -> str: return f"<{self.__class__.__name__} '{self.tokens}'>" @@ -483,3 +495,16 @@ def satisfy_me(self, prereqs) -> bool: for err in bad: LOG.warning(f"{self.identity} has no prerequisites {err}") return len(bad) == 0 + + def clock_expire(self) -> bool: + """Check for, and do, clock expiry. Return True if expired.""" + + if ( + self.expire_time is None # expiry not configured + or self.state(TASK_STATUS_EXPIRED) # already expired + or time() < self.expire_time # not time yet + ): + return False + + self.state.reset(TASK_STATUS_EXPIRED) + return True diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 0d95fdc3f3..ede3287ef2 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -23,6 +23,7 @@ from cylc.flow.exceptions import TaskDefError from cylc.flow.task_id import TaskID from cylc.flow.task_state import ( + TASK_OUTPUT_EXPIRED, TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_SUCCEEDED, @@ -212,6 +213,9 @@ def tweak_outputs(self): ): self.set_required_output(TASK_OUTPUT_SUCCEEDED, True) + # Expired must be optional + self.set_required_output(TASK_OUTPUT_EXPIRED, False) + # In Cylc 7 back compat mode, make all success outputs required. if cylc.flow.flags.cylc7_back_compat: for output in [ diff --git a/tests/unit/test_id_match.py b/tests/unit/test_id_match.py index 3fda5527f2..8727023a86 100644 --- a/tests/unit/test_id_match.py +++ b/tests/unit/test_id_match.py @@ -43,6 +43,7 @@ def _task_proxy(id_, hier): hier.append('root') tdef = create_autospec(TaskDef, namespace_hierarchy=hier) tdef.name = tokens['task'] + tdef.expiration_offset = None return TaskProxy( Tokens('~user/workflow'), tdef,