Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt outputs: submitted is implicitly required #5755

Merged
merged 2 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5755.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes an issue where submit-failed tasks could be incorrectly considered as completed rather than causing the workflow to stall.
25 changes: 24 additions & 1 deletion cylc/flow/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,24 @@ def __init__(self, tdef):
self._by_message = {}
self._by_trigger = {}
self._required = set()

# Add outputs from task def.
for trigger, (message, required) in tdef.outputs.items():
self._add(message, trigger, required=required)

# Handle implicit submit requirement
if (
# "submitted" is not declared as optional/required
tdef.outputs[TASK_OUTPUT_SUBMITTED][1] is None
# and "submit-failed" is not declared as optional/required
and tdef.outputs[TASK_OUTPUT_SUBMIT_FAILED][1] is None
):
self._add(
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUBMITTED,
required=True,
)

def _add(self, message, trigger, is_completed=False, required=False):
"""Add a new output message"""
self._by_message[message] = [trigger, message, is_completed]
Expand Down Expand Up @@ -197,7 +211,16 @@ def is_incomplete(self):
)

def get_incomplete(self):
"""Return a list of required outputs that are not complete."""
"""Return a list of required outputs that are not complete.

A task is incomplete if:

* it finished executing without completing all required outputs
* or if job submission failed and the :submit output was not optional

https://github.com/cylc/cylc-admin/blob/master/docs/proposal-new-output-syntax.md#output-syntax

"""
return [
trigger
for trigger, (_, _, is_completed) in self._by_trigger.items()
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_SUBMIT_FAILED,
)
from cylc.flow.util import (
serialise,
Expand Down Expand Up @@ -1376,9 +1377,11 @@ def spawn_on_output(self, itask, output, forced=False):
self.remove(c_task, msg)

if not forced and output in [
# final task statuses
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUBMIT_FAILED,
]:
self.remove_if_complete(itask)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
[meta]
title = "Try out scenarios for intelligent host selection."
description = """
Tasks
- goodhost: a control to check that everything works
- badhost is always going to fail
- mixedhost contains some hosts that will and won't fail
Tasks:
- goodhost: a control to check that everything works
- badhost is always going to fail
- mixedhost contains some hosts that will and won't fail
"""

[scheduler]
Expand All @@ -18,7 +18,10 @@ Tasks
initial cycle point = 1
[[graph]]
# Run good and mixed as controls
R1 = badhosttask:submit-fail? => goodhosttask & mixedhosttask
R1 = """
badhosttask:submit-fail? => goodhosttask & mixedhosttask
mixedhosttask:submit-fail? # permit mixedhosttask to submit-fail
"""

[runtime]
[[root]]
Expand Down
47 changes: 47 additions & 0 deletions tests/functional/spawn-on-demand/18-submitted.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test the submitted and submit-failed triggers work correctly.
#
# The :submitted output should be considered required unless explicitly stated
# otherwise.
# See:
# * https://github.com/cylc/cylc-flow/pull/5755
# * https://github.com/cylc/cylc-admin/blob/master/docs/proposal-new-output-syntax.md#output-syntax

. "$(dirname "$0")/test_header"
# presumably one test each for validation and the reference run, plus the
# three log checks in the loop below - TODO confirm against test_header
set_test_number 5

# define a broken platform which will always result in submission failures
create_test_global_config '' '
[platforms]
[[broken]]
hosts = no-such-host
'

# validate the workflow, then run it and compare against reference.log
# (helper behaviour assumed from their names - defined in test_header)
install_and_validate
reftest_run

# a1, a2 and a3 (cycles 1-3) are the tasks whose :submitted output is required
# in flow.cylc, so each one should be reported as incomplete in the log
for number in 1 2 3; do
grep_workflow_log_ok \
"${TEST_NAME_BASE}-a${number}" \
"${number}/a${number} .* did not complete required outputs: \['submitted'\]"
done

purge
exit
56 changes: 56 additions & 0 deletions tests/functional/spawn-on-demand/18-submitted/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[scheduler]
allow implicit tasks = True
[[events]]
# shut down once the workflow has stalled
# abort on stall timeout = True
# stall timeout = PT0S
stall handlers = cylc stop %(workflow)s
expected task failures = 1/a1, 2/a2, 3/a3

[scheduling]
initial cycle point = 1
cycling mode = integer
runahead limit = P10
[[graph]]
R/1 = """
# a1 should be incomplete (submission is implicitly required)
a1? => b
"""
R/2 = """
# a2 should be incomplete (submission is implicitly required)
a2:finished => b
"""
R/3 = """
# a3 should be incomplete (submission is explicitly required)
a3? => b
a3:submitted => s
"""
R/4 = """
# a4 should be complete (submission is explicitly optional)
a4? => b
a4:submitted? => s
"""
R/5 = """
# a5 should be complete (submission is explicitly optional)
a5? => b
a5:submitted? => s
a5:submit-failed? => f # branch should run
"""
R/6 = """
# a6 should be complete (submission is explicitly optional)
a6? => b
a6:submit-failed? => f # branch should not run (a6 is not on the broken platform)
"""
R/7 = """
# a should be complete (submission is explicitly optional)
a:submit-failed? => f # branch should not run (a does not submit-fail)
"""
R/8 = """
# a should be complete (submission is explicitly optional)
a:submitted? => s # branch should run
"""

[runtime]
[[a1, a2, a3, a4, a5]]
# a task which will always submit-fail
platform = broken
11 changes: 11 additions & 0 deletions tests/functional/spawn-on-demand/18-submitted/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
7/a -triggered off [] in flow 1
6/a6 -triggered off [] in flow 1
8/a -triggered off [] in flow 1
3/a3 -triggered off [] in flow 1
2/a2 -triggered off [] in flow 1
4/a4 -triggered off [] in flow 1
1/a1 -triggered off [] in flow 1
5/a5 -triggered off [] in flow 1
5/f -triggered off ['5/a5'] in flow 1
8/s -triggered off ['8/a'] in flow 1
6/b -triggered off ['6/a6'] in flow 1
45 changes: 45 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
TASK_STATUS_EXPIRED,
TASK_STATUS_SUBMIT_FAILED,
)

# NOTE: foo and bar have no parents so at start-up (even with the workflow
Expand Down Expand Up @@ -1201,3 +1204,45 @@ async def test_runahead_offset_start(
"""
task_pool = mod_example_flow_2.pool
assert task_pool.runahead_limit_point == ISO8601Point('2004')


async def test_detect_incomplete_tasks(
    flow,
    scheduler,
    start,
    log_filter,
):
    """Tasks finishing without their required outputs are incomplete.

    One task is defined per final status that can leave required outputs
    unsatisfied. Each task is named after its status, so the task name
    doubles as the output to spawn. After spawning that output the task
    should report incomplete outputs, log the fact, and stay in the pool.
    """
    final_states = (
        TASK_STATUS_FAILED,
        TASK_STATUS_EXPIRED,
        TASK_STATUS_SUBMIT_FAILED,
    )
    # one graph line (i.e. one parentless task) per final status
    graph = '\n'.join(final_states)
    config = {
        'scheduler': {'allow implicit tasks': 'True'},
        'scheduling': {'graph': {'R1': graph}},
    }
    schd = scheduler(flow(config))
    async with start(schd) as log:
        for itask in schd.pool.get_tasks():
            # the task is named after the output it should generate
            schd.pool.spawn_on_output(itask, itask.tdef.name)

            # required outputs are missing, so the task must be incomplete
            outputs = itask.state.outputs
            assert outputs.get_incomplete()
            assert outputs.is_incomplete()
            assert log_filter(
                log,
                contains=f"[{itask}] did not complete required outputs:",
            )

            # incomplete tasks must be retained in the pool
            assert itask in schd.pool.get_tasks()
Loading