Skip to content

Commit

Permalink
Supersede fix-expire-trigger PR 5412
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 26, 2023
1 parent 4cbfd48 commit d1e6e1c
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 12 deletions.
12 changes: 12 additions & 0 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from cylc.flow.task_id import TaskID
from cylc.flow.task_trigger import TaskTrigger
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_FAILED,
Expand All @@ -40,6 +41,8 @@
TASK_OUTPUT_SUBMIT_FAILED
)
from cylc.flow.task_qualifiers import (
QUAL_FAM_EXPIRE_ALL,
QUAL_FAM_EXPIRE_ANY,
QUAL_FAM_SUCCEED_ALL,
QUAL_FAM_SUCCEED_ANY,
QUAL_FAM_FAIL_ALL,
Expand Down Expand Up @@ -123,6 +126,8 @@ class GraphParser:
# E.g. QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True) simply maps
# "FAM:start-all" to "MEMBER:started" and "-all" (all members).
fam_to_mem_trigger_map: Dict[str, Tuple[str, bool]] = {
QUAL_FAM_EXPIRE_ALL: (TASK_OUTPUT_EXPIRED, True),
QUAL_FAM_EXPIRE_ANY: (TASK_OUTPUT_EXPIRED, False),
QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True),
QUAL_FAM_START_ANY: (TASK_OUTPUT_STARTED, False),
QUAL_FAM_SUCCEED_ALL: (TASK_OUTPUT_SUCCEEDED, True),
Expand All @@ -139,6 +144,8 @@ class GraphParser:

# Map family pseudo triggers to affected member outputs.
fam_to_mem_output_map: Dict[str, List[str]] = {
QUAL_FAM_EXPIRE_ANY: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_EXPIRE_ALL: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_START_ANY: [TASK_OUTPUT_STARTED],
QUAL_FAM_START_ALL: [TASK_OUTPUT_STARTED],
QUAL_FAM_SUCCEED_ANY: [TASK_OUTPUT_SUCCEEDED],
Expand Down Expand Up @@ -736,6 +743,11 @@ def _set_output_opt(
if suicide:
return

# TODO: OS: "This is now covered by the optional outputs stuff"
if output == TASK_OUTPUT_EXPIRED and not optional:
raise GraphParseError(
f"Expired-output {name}:{output} must be optional")

if output == TASK_OUTPUT_FINISHED:
# Interpret :finish pseudo-output
if optional:
Expand Down
26 changes: 15 additions & 11 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,8 @@ def release_queued_tasks(self):

for itask in released:
itask.state_reset(is_queued=False)
itask.waiting_on_job_prep = True
self.data_store_mgr.delta_task_queued(itask)
itask.waiting_on_job_prep = True

if cylc.flow.flags.cylc7_back_compat:
# Cylc 7 Back Compat: spawn downstream to cause Cylc 7 style
Expand Down Expand Up @@ -1025,8 +1025,7 @@ def can_stop(self, stop_mode):
and itask.state(*TASK_STATUSES_ACTIVE)
and not itask.state.kill_failed
)
# we don't need to check for preparing tasks because they will be
# reset to waiting on restart
# preparing tasks get reset to waiting on restart
for itask in self.get_tasks()
)

Expand Down Expand Up @@ -1343,7 +1342,7 @@ def remove_if_complete(self, itask):
if cylc.flow.flags.cylc7_back_compat:

if not itask.state(TASK_STATUS_FAILED):
self.remove(itask, 'finished')
self.remove(itask, 'completed')

if self.compute_runahead():
self.release_runahead_tasks()
Expand All @@ -1360,7 +1359,7 @@ def remove_if_complete(self, itask):
return

# Complete, can remove it from the pool.
self.remove(itask, 'finished and complete')
self.remove(itask, 'complete')

if itask.identity == self.stop_task_id:
self.stop_task_finished = True
Expand Down Expand Up @@ -1898,23 +1897,28 @@ def _set_expired_task(self, itask):
or itask.tdef.expiration_offset is None
):
return False

if itask.expire_time is None:
itask.expire_time = (
itask.get_point_as_seconds() +
itask.get_offset_as_seconds(itask.tdef.expiration_offset))
if time() > itask.expire_time:

if (
time() > itask.expire_time
and itask.state_reset(TASK_STATUS_EXPIRED)
):
self._expire_task(itask)
return True

return False

def _expire_task(self, itask):
msg = 'Task expired (skipping job).'
msg = 'Task expired: will not submit job.'
LOG.warning(f"[{itask}] {msg}")
self.task_events_mgr.setup_event_handlers(itask, "expired", msg)
if itask.state_reset(TASK_STATUS_EXPIRED, is_held=False):
self.data_store_mgr.delta_task_state(itask)
self.data_store_mgr.delta_task_held(itask)
self.remove(itask, 'expired')
self.data_store_mgr.delta_task_state(itask)
self.data_store_mgr.delta_task_held(itask)
self.spawn_on_output(itask, 'expired')

def task_succeeded(self, id_):
"""Return True if task with id_ is in the succeeded state."""
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/task_qualifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
QUAL_FAM_SUBMIT_ANY = "submit-any"
QUAL_FAM_SUBMIT_FAIL_ALL = "submit-fail-all"
QUAL_FAM_SUBMIT_FAIL_ANY = "submit-fail-any"
QUAL_FAM_EXPIRE_ALL = "expire-all"
QUAL_FAM_EXPIRE_ANY = "expire-any"

# alternative (shorthand) qualifiers
ALT_QUALIFIERS = {
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/hold-release/05-release.t
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
inherit = STOP
script = """
cylc__job__poll_grep_workflow_log -E \
'1/dog1 succeeded .* task proxy removed \(finished and complete\)'
'1/dog1 succeeded .* task proxy removed \(completed\)'
cylc stop "${CYLC_WORKFLOW_ID}"
"""
__FLOW_CONFIG__
Expand Down
22 changes: 22 additions & 0 deletions tests/functional/triggering/21-expire.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test expire triggering
. "$(dirname "$0")/test_header"
set_test_number 2
reftest
exit
19 changes: 19 additions & 0 deletions tests/functional/triggering/21-expire/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[scheduling]
initial cycle point = 1999
[[special tasks]]
clock-expire = foo1(PT0S), foo2(PT0S), bar1(PT0S), x(PT0S)
[[graph]]
# Expire: foo1, foo2, bar1, x
# Run: y, bar2, baz, qux
R1 = """
x:expire? => y
FOO:expire-all? => baz
BAR:expire-any? => qux
"""
[runtime]
[[FOO, BAR]]
[[foo1, foo2]]
inherit = FOO
[[bar1, bar2]]
inherit = BAR
[[x, y, baz, qux]]
4 changes: 4 additions & 0 deletions tests/functional/triggering/21-expire/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
19990101T0000Z/bar2 -triggered off [] in flow 1
19990101T0000Z/baz -triggered off ['19990101T0000Z/foo1', '19990101T0000Z/foo2'] in flow 1
19990101T0000Z/qux -triggered off ['19990101T0000Z/bar1'] in flow 1
19990101T0000Z/y -triggered off ['19990101T0000Z/x'] in flow 1
8 changes: 8 additions & 0 deletions tests/unit/test_graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,10 @@ def test_family_optional_outputs(qual, task_output):
"FAM => foo", # bare family on LHS
"Illegal family trigger"
],
[
"FAM:expire-all => foo",
"must be optional"
],
]
)
def test_family_trigger_errors(graph, error):
Expand Down Expand Up @@ -804,6 +808,10 @@ def test_family_trigger_errors(graph, error):
"a:finish? => b",
"Pseudo-output a:finished can't be optional",
],
[
"a:expire => b",
"must be optional",
],
]
)
def test_task_optional_output_errors_order(
Expand Down

0 comments on commit d1e6e1c

Please sign in to comment.