workflow events: fix an issue where "timeout" events would not fire #5959

Merged · 3 commits · Mar 25, 2024
1 change: 1 addition & 0 deletions changes.d/5959.fix.md
```diff
@@ -0,0 +1 @@
+Fix an issue where workflow "timeout" events were not fired in all situations when they should have been.
```
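
For context, a minimal `flow.cylc` snippet of the failing scenario described in the review below (the event and interval here are illustrative, not taken from the PR): a workflow event is configured via `mail events` only, with no corresponding `<event> handlers` item, so before this fix the event never fired.

```ini
[scheduler]
    [[events]]
        # fire the "inactivity timeout" workflow event after an hour idle
        inactivity timeout = PT1H
        # mail on that event; before this fix it was silently skipped
        # unless "inactivity timeout handlers" was also configured
        mail events = inactivity timeout
```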
227 changes: 113 additions & 114 deletions cylc/flow/scheduler.py
```diff
@@ -650,7 +650,8 @@ async def run_scheduler(self) -> None:
             # Non-async sleep - yield to other threads rather than event loop
             sleep(0)
             self.profiler.start()
-            await self.main_loop()
+            while True:  # MAIN LOOP
+                await self._main_loop()
         except SchedulerStop as exc:
             # deliberate stop
```
Comment on lines -653 to +654

Member: On changing to this, it might make sense (for clarity) to move the sleep code from the end of the `_main_loop` method into the `while` block?

Member Author: Reasonable. I think something (besides the sleep) is using `tinit`, which might be why I left it.

Member: True. In that case, fine to leave as-is.

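For the record, a condensed sketch of the alternative floated above (not adopted; the class, names, and timing logic are hypothetical simplifications of what actually stays at the end of `_main_loop`):

```python
import asyncio
from time import time

class SchedulerSketch:
    """Hypothetical stand-in for the Scheduler, showing the discussed shape."""

    INTERVAL_MAIN_LOOP = 1.0

    async def run_scheduler(self):
        while True:  # MAIN LOOP
            tinit = time()
            await self._main_loop()  # one pass, with the sleep hoisted out
            # timing logic that currently lives at the end of _main_loop
            await asyncio.sleep(
                max(0.0, self.INTERVAL_MAIN_LOOP - (time() - tinit)))

    async def _main_loop(self):
        """Process queues, fire events, etc. (omitted)."""
```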

```diff
@@ -1693,142 +1694,141 @@ def update_profiler_logs(self, tinit):
                 self.count, get_current_time_string()))
         self.count += 1

-    async def main_loop(self) -> None:
-        """The scheduler main loop."""
-        while True:  # MAIN LOOP
-            tinit = time()
+    async def _main_loop(self) -> None:
+        """A single iteration of the main loop."""
+        tinit = time()

-            # Useful for debugging core scheduler issues:
-            # self.pool.log_task_pool(logging.CRITICAL)
-            if self.incomplete_ri_map:
-                self.manage_remote_init()
+        # Useful for debugging core scheduler issues:
+        # self.pool.log_task_pool(logging.CRITICAL)
+        if self.incomplete_ri_map:
+            self.manage_remote_init()

-            await self.process_command_queue()
-            self.proc_pool.process()
+        await self.process_command_queue()
+        self.proc_pool.process()

-            # Unqueued tasks with satisfied prerequisites must be waiting on
-            # xtriggers or ext_triggers. Check these and queue tasks if ready.
-            for itask in self.pool.get_tasks():
-                if (
-                    not itask.state(TASK_STATUS_WAITING)
-                    or itask.state.is_queued
-                    or itask.state.is_runahead
-                ):
-                    continue
+        # Unqueued tasks with satisfied prerequisites must be waiting on
+        # xtriggers or ext_triggers. Check these and queue tasks if ready.
+        for itask in self.pool.get_tasks():
+            if (
+                not itask.state(TASK_STATUS_WAITING)
+                or itask.state.is_queued
+                or itask.state.is_runahead
+            ):
+                continue

-                if (
-                    itask.state.xtriggers
-                    and not itask.state.xtriggers_all_satisfied()
-                ):
-                    self.xtrigger_mgr.call_xtriggers_async(itask)
+            if (
+                itask.state.xtriggers
+                and not itask.state.xtriggers_all_satisfied()
+            ):
+                self.xtrigger_mgr.call_xtriggers_async(itask)

-                if (
-                    itask.state.external_triggers
-                    and not itask.state.external_triggers_all_satisfied()
-                ):
-                    self.broadcast_mgr.check_ext_triggers(
-                        itask, self.ext_trigger_queue)
+            if (
+                itask.state.external_triggers
+                and not itask.state.external_triggers_all_satisfied()
+            ):
+                self.broadcast_mgr.check_ext_triggers(
+                    itask, self.ext_trigger_queue)

-                if all(itask.is_ready_to_run()):
-                    self.pool.queue_task(itask)
+            if all(itask.is_ready_to_run()):
+                self.pool.queue_task(itask)

-            if self.xtrigger_mgr.do_housekeeping:
-                self.xtrigger_mgr.housekeep(self.pool.get_tasks())
+        if self.xtrigger_mgr.do_housekeeping:
+            self.xtrigger_mgr.housekeep(self.pool.get_tasks())

-            self.pool.set_expired_tasks()
-            self.release_queued_tasks()
+        self.pool.set_expired_tasks()
+        self.release_queued_tasks()

-            if self.pool.sim_time_check(self.message_queue):
-                # A simulated task state change occurred.
-                self.reset_inactivity_timer()
+        if self.pool.sim_time_check(self.message_queue):
+            # A simulated task state change occurred.
+            self.reset_inactivity_timer()

-            self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
-            self.late_tasks_check()
+        self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
+        self.late_tasks_check()

-            self.process_queued_task_messages()
-            await self.process_command_queue()
-            self.task_events_mgr.process_events(self)
+        self.process_queued_task_messages()
+        await self.process_command_queue()
+        self.task_events_mgr.process_events(self)

-            # Update state summary, database, and uifeed
-            self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
+        # Update state summary, database, and uifeed
+        self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)

-            # List of task whose states have changed.
-            updated_task_list = [
-                t for t in self.pool.get_tasks() if t.state.is_updated]
-            has_updated = updated_task_list or self.is_updated
+        # List of task whose states have changed.
+        updated_task_list = [
+            t for t in self.pool.get_tasks() if t.state.is_updated]
+        has_updated = updated_task_list or self.is_updated

-            if updated_task_list and self.is_restart_timeout_wait:
-                # Stop restart timeout if action has been triggered.
-                with suppress(KeyError):
-                    self.timers[self.EVENT_RESTART_TIMEOUT].stop()
-                self.is_restart_timeout_wait = False
+        if updated_task_list and self.is_restart_timeout_wait:
+            # Stop restart timeout if action has been triggered.
+            with suppress(KeyError):
+                self.timers[self.EVENT_RESTART_TIMEOUT].stop()
+            self.is_restart_timeout_wait = False

-            if has_updated or self.data_store_mgr.updates_pending:
-                # Update the datastore.
-                await self.update_data_structure()
+        if has_updated or self.data_store_mgr.updates_pending:
+            # Update the datastore.
+            await self.update_data_structure()

-            if has_updated:
-                if not self.is_reloaded:
-                    # (A reload cannot un-stall workflow by itself)
-                    self.is_stalled = False
-                self.is_reloaded = False
+        if has_updated:
+            if not self.is_reloaded:
+                # (A reload cannot un-stall workflow by itself)
+                self.is_stalled = False
+            self.is_reloaded = False

-                # Reset workflow and task updated flags.
-                self.is_updated = False
-                for itask in updated_task_list:
-                    itask.state.is_updated = False
+            # Reset workflow and task updated flags.
+            self.is_updated = False
+            for itask in updated_task_list:
+                itask.state.is_updated = False

-                if not self.is_stalled:
-                    # Stop the stalled timer.
-                    with suppress(KeyError):
-                        self.timers[self.EVENT_STALL_TIMEOUT].stop()
+            if not self.is_stalled:
+                # Stop the stalled timer.
+                with suppress(KeyError):
+                    self.timers[self.EVENT_STALL_TIMEOUT].stop()

-            self.process_workflow_db_queue()
+        self.process_workflow_db_queue()

-            # If public database is stuck, blast it away by copying the content
-            # of the private database into it.
-            self.database_health_check()
+        # If public database is stuck, blast it away by copying the content
+        # of the private database into it.
+        self.database_health_check()

-            # Shutdown workflow if timeouts have occurred
-            self.timeout_check()
+        # Shutdown workflow if timeouts have occurred
+        self.timeout_check()

-            # Does the workflow need to shutdown on task failure?
-            await self.workflow_shutdown()
+        # Does the workflow need to shutdown on task failure?
+        await self.workflow_shutdown()

-            if self.options.profile_mode:
-                self.update_profiler_logs(tinit)
+        if self.options.profile_mode:
+            self.update_profiler_logs(tinit)

-            # Run plugin functions
-            await asyncio.gather(
-                *main_loop.get_runners(
-                    self.main_loop_plugins,
-                    main_loop.CoroTypes.Periodic,
-                    self
-                )
-            )
+        # Run plugin functions
+        await asyncio.gather(
+            *main_loop.get_runners(
+                self.main_loop_plugins,
+                main_loop.CoroTypes.Periodic,
+                self
+            )
+        )

-            if not has_updated and not self.stop_mode:
-                # Has the workflow stalled?
-                self.check_workflow_stalled()
+        if not has_updated and not self.stop_mode:
+            # Has the workflow stalled?
+            self.check_workflow_stalled()

-            # Sleep a bit for things to catch up.
-            # Quick sleep if there are items pending in process pool.
-            # (Should probably use quick sleep logic for other queues?)
-            elapsed = time() - tinit
-            quick_mode = self.proc_pool.is_not_done()
-            if (elapsed >= self.INTERVAL_MAIN_LOOP or
-                    quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
-                # Main loop has taken quite a bit to get through
-                # Still yield control to other threads by sleep(0.0)
-                duration: float = 0
-            elif quick_mode:
-                duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
-            else:
-                duration = self.INTERVAL_MAIN_LOOP - elapsed
-            await asyncio.sleep(duration)
-            # Record latest main loop interval
-            self.main_loop_intervals.append(time() - tinit)
-            # END MAIN LOOP
+        # Sleep a bit for things to catch up.
+        # Quick sleep if there are items pending in process pool.
+        # (Should probably use quick sleep logic for other queues?)
+        elapsed = time() - tinit
+        quick_mode = self.proc_pool.is_not_done()
+        if (elapsed >= self.INTERVAL_MAIN_LOOP or
+                quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
+            # Main loop has taken quite a bit to get through
+            # Still yield control to other threads by sleep(0.0)
+            duration: float = 0
+        elif quick_mode:
+            duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
+        else:
+            duration = self.INTERVAL_MAIN_LOOP - elapsed
+        await asyncio.sleep(duration)
+        # Record latest main loop interval
+        self.main_loop_intervals.append(time() - tinit)
+        # END MAIN LOOP

     def _update_workflow_state(self):
         """Update workflow state in the data store and push out any deltas.
```
```diff
@@ -1867,12 +1867,11 @@ def check_workflow_timers(self):
         for event, timer in self.timers.items():
             if not timer.timed_out():
                 continue
+            self.run_event_handlers(event)
             abort_conf = f"abort on {event}"
             if self._get_events_conf(abort_conf):
                 # "cylc play" needs to exit with error status here.
                 raise SchedulerError(f'"{abort_conf}" is set')
-            if self._get_events_conf(f"{event} handlers") is not None:
-                self.run_event_handlers(event)
             if event == self.EVENT_RESTART_TIMEOUT:
                 # Unset wait flag to allow normal shutdown.
                 self.is_restart_timeout_wait = False
```
Comment on the removed lines:

Member Author: This was the source of the bug.

In my case I had `mail events = <event>` set, but not `<event> handlers`, and this was preventing mail events from running.

I think the workflow events manager already has logic for determining when events should be run, so this check was superfluous.
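
A minimal, self-contained illustration of that gating bug (plain Python with stand-in functions, not the real cylc API):

```python
# Stand-ins for cylc's config lookup and event dispatch, just to show the logic.
events_conf = {"mail events": ["inactivity timeout"]}  # no "<event> handlers" set

def get_events_conf(key):
    return events_conf.get(key)

def run_event_handlers(event):
    # In cylc this dispatches both custom handlers AND mail events.
    print(f"actioned workflow event: {event}")

event = "inactivity timeout"

# Old logic: gated on "<event> handlers", so a mail-only config did nothing.
if get_events_conf(f"{event} handlers") is not None:
    run_event_handlers(event)  # never reached with the config above

# New logic: dispatch unconditionally; the events manager decides whether
# any handlers or mail events are actually configured.
run_event_handlers(event)
```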

2 changes: 1 addition & 1 deletion tests/integration/test_examples.py
```diff
@@ -155,7 +155,7 @@ class MyException(Exception):
     def killer():
         raise MyException('mess')

-    one.main_loop = killer
+    one._main_loop = killer

     # make sure that this error causes the flow to shutdown
     with pytest.raises(MyException):
```