Skip to content

Commit

Permalink
workflow events: fix an issue where "timeout" events would not fire
Browse files Browse the repository at this point in the history
* Fix a small bug where "workflow timeout" and "inactivity timeout"
  events might not fire depending on the global config.
* Add integration tests to lock down the behaviour of workflow events.
  • Loading branch information
oliver-sanders committed Feb 7, 2024
1 parent 57fe4ae commit ee762a5
Show file tree
Hide file tree
Showing 2 changed files with 301 additions and 109 deletions.
221 changes: 112 additions & 109 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1696,139 +1696,143 @@ def update_profiler_logs(self, tinit):
async def main_loop(self) -> None:
"""The scheduler main loop."""
while True: # MAIN LOOP
tinit = time()
await self._main_loop()

# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)
if self.incomplete_ri_map:
self.manage_remote_init()
async def _main_loop(self) -> None:
"""A single iteration of the main loop."""
tinit = time()

await self.process_command_queue()
self.proc_pool.process()
# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)
if self.incomplete_ri_map:
self.manage_remote_init()

# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
or itask.state.is_queued
or itask.state.is_runahead
):
continue
await self.process_command_queue()
self.proc_pool.process()

if (
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
self.xtrigger_mgr.call_xtriggers_async(itask)
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
or itask.state.is_queued
or itask.state.is_runahead
):
continue

if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
):
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)
if (
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
self.xtrigger_mgr.call_xtriggers_async(itask)

if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)
if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
):
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)

if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

self.pool.set_expired_tasks()
self.release_queued_tasks()
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

if self.pool.sim_time_check(self.message_queue):
# A simulated task state change occurred.
self.reset_inactivity_timer()
self.pool.set_expired_tasks()
self.release_queued_tasks()

self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()
if self.pool.sim_time_check(self.message_queue):
# A simulated task state change occurred.
self.reset_inactivity_timer()

self.process_queued_task_messages()
await self.process_command_queue()
self.task_events_mgr.process_events(self)
self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()

# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
self.process_queued_task_messages()
await self.process_command_queue()
self.task_events_mgr.process_events(self)

# List of tasks whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated
# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)

if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False
# List of tasks whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated

if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure()
if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False

if has_updated:
if not self.is_reloaded:
# (A reload cannot un-stall workflow by itself)
self.is_stalled = False
self.is_reloaded = False
if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure()

# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False
if has_updated:
if not self.is_reloaded:
# (A reload cannot un-stall workflow by itself)
self.is_stalled = False
self.is_reloaded = False

if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()
# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False

self.process_workflow_db_queue()
if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()

# If public database is stuck, blast it away by copying the content
# of the private database into it.
self.database_health_check()
self.process_workflow_db_queue()

# Shutdown workflow if timeouts have occurred
self.timeout_check()
# If public database is stuck, blast it away by copying the content
# of the private database into it.
self.database_health_check()

# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()
# Shutdown workflow if timeouts have occurred
self.timeout_check()

if self.options.profile_mode:
self.update_profiler_logs(tinit)
# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()

# Run plugin functions
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.Periodic,
self
)
if self.options.profile_mode:
self.update_profiler_logs(tinit)

# Run plugin functions
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.Periodic,
self
)
)

if not has_updated and not self.stop_mode:
# Has the workflow stalled?
self.check_workflow_stalled()

# Sleep a bit for things to catch up.
# Quick sleep if there are items pending in process pool.
# (Should probably use quick sleep logic for other queues?)
elapsed = time() - tinit
quick_mode = self.proc_pool.is_not_done()
if (elapsed >= self.INTERVAL_MAIN_LOOP or
quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
# Main loop has taken quite a bit to get through
# Still yield control to other threads by sleep(0.0)
duration: float = 0
elif quick_mode:
duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
else:
duration = self.INTERVAL_MAIN_LOOP - elapsed
await asyncio.sleep(duration)
# Record latest main loop interval
self.main_loop_intervals.append(time() - tinit)
# END MAIN LOOP
if not has_updated and not self.stop_mode:
# Has the workflow stalled?
self.check_workflow_stalled()

# Sleep a bit for things to catch up.
# Quick sleep if there are items pending in process pool.
# (Should probably use quick sleep logic for other queues?)
elapsed = time() - tinit
quick_mode = self.proc_pool.is_not_done()
if (elapsed >= self.INTERVAL_MAIN_LOOP or
quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
# Main loop has taken quite a bit to get through
# Still yield control to other threads by sleep(0.0)
duration: float = 0
elif quick_mode:
duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
else:
duration = self.INTERVAL_MAIN_LOOP - elapsed
await asyncio.sleep(duration)
# Record latest main loop interval
self.main_loop_intervals.append(time() - tinit)
# END MAIN LOOP

def _update_workflow_state(self):
"""Update workflow state in the data store and push out any deltas.
Expand Down Expand Up @@ -1867,12 +1871,11 @@ def check_workflow_timers(self):
for event, timer in self.timers.items():
if not timer.timed_out():
continue
self.run_event_handlers(event)
abort_conf = f"abort on {event}"
if self._get_events_conf(abort_conf):
# "cylc play" needs to exit with error status here.
raise SchedulerError(f'"{abort_conf}" is set')
if self._get_events_conf(f"{event} handlers") is not None:
self.run_event_handlers(event)
if event == self.EVENT_RESTART_TIMEOUT:
# Unset wait flag to allow normal shutdown.
self.is_restart_timeout_wait = False
Expand Down
Loading

0 comments on commit ee762a5

Please sign in to comment.