Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pool: prevent no-flow tasks from spawning downstreams in compat mode #5614

Merged
merged 1 commit into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
pool: prevent no-flow tasks from spawning downstreams in compat mode
* Closes #5613
  • Loading branch information
oliver-sanders committed Jul 11, 2023
commit 258ad0fd28ded5fff9c7e9fb34968a012d629964
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ in a timely fashion when tasks completed.
Task outputs and messages are now validated to avoid conflicts with built-in
outputs, messages, qualifiers and Cylc keywords.

[#5614](https://github.com/cylc/cylc-flow/pull/5614) -
Fix a bug in Cylc 7 compatibility mode where tasks running in the `none` flow
(e.g. via `cylc trigger --flow=none`) would trigger downstream tasks.

[#5604](https://github.com/cylc/cylc-flow/pull/5604) -
Fix a possible issue where workflows started using
`cylc play --start-cycle-point` could hang during startup.
Expand Down
12 changes: 7 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,11 +1270,11 @@ def spawn_on_output(self, itask, output, forced=False):
and itask.identity not in self.expected_failed_tasks
):
self.abort_task_failed = True
try:
children = itask.graph_children[output]
except KeyError:
# No children depend on this output
children = []

children = []
if itask.flow_nums or forced:
with suppress(KeyError):
children = itask.graph_children[output]

suicide = []
for c_name, c_point, is_abs in children:
Expand Down Expand Up @@ -1404,6 +1404,8 @@ def spawn_on_all_outputs(
associated prerequisites of spawned children to satisfied.

"""
if not itask.flow_nums:
return
if completed_only:
outputs = itask.state.outputs.get_completed()
else:
Expand Down
70 changes: 67 additions & 3 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow import CYLC_LOG
from copy import deepcopy
import logging
from typing import AsyncGenerator, Callable, Iterable, List, Tuple, Union

import pytest
from pytest import param
from typing import AsyncGenerator, Callable, Iterable, List, Tuple, Union

from cylc.flow import CYLC_LOG
from cylc.flow.cycling import PointBase
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.exceptions import PlatformLookupError
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
from cylc.flow.scheduler import Scheduler
from cylc.flow.flow_mgr import FLOW_ALL
from cylc.flow.task_state import (
Expand Down Expand Up @@ -1051,3 +1052,66 @@ async def test_db_update_on_removal(

# the task should be gone from the DB
assert list_pool_from_db(schd) == []


async def test_no_flow_tasks_dont_spawn(
    flow,
    scheduler,
    start,
):
    """Check that tasks outside of any flow do not spawn their children.

    A no-flow task (i.e. one run with `--flow=none`) is a one-off which
    does not belong to any "flow", so outputs it yields should not cause
    downstream tasks to be spawned unless spawning is explicitly forced.

    See https://github.com/cylc/cylc-flow/issues/5613
    """
    workflow_id = flow({
        'scheduling': {
            'graph': {
                'R1': 'a => b => c'
            }
        },
        'scheduler': {
            'allow implicit tasks': 'true',
        },
    })
    schd = scheduler(workflow_id)

    async with start(schd):
        # put 1/a into the succeeded state
        task_a = schd.pool.get_tasks()[0]
        task_a.state_reset(TASK_OUTPUT_SUCCEEDED)

        # each scenario is:
        #   (flow numbers to give 1/a, whether to force, expected pool)
        # NOTE: the scenarios run in order against the same scheduler
        scenarios = (
            # no-flow task, not forced: nothing should be spawned
            (set(), False, []),
            # no-flow task, forced: the child is spawned with no flow
            # numbers of its own
            (set(), True, [('1/b', set())]),
            # task in flow 1, not forced: the child is spawned in flow 1
            ({1}, False, [('1/b', {1})]),
            # task in flow 1, forced: same result as the unforced case
            ({1}, True, [('1/b', {1})]),
        )

        for flow_nums, force, expected_pool in scenarios:
            # assign the flow numbers under test to 1/a
            task_a.flow_nums = flow_nums

            # spawn on the succeeded output, then on all outputs
            schd.pool.spawn_on_output(
                task_a,
                TASK_OUTPUT_SUCCEEDED,
                forced=force,
            )
            schd.pool.spawn_on_all_outputs(task_a)

            # collect (identity, flow numbers) from the main pool followed
            # by the hidden pool
            observed_pool = [
                (itask.identity, itask.flow_nums)
                for task_list in (
                    schd.pool.get_tasks(),
                    schd.pool.get_hidden_tasks(),
                )
                for itask in task_list
            ]

            # the pool contents must match this scenario's expectation
            assert observed_pool == expected_pool
Loading