From b1d45ea7315b7d00285b75a606184952250aca8e Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Mon, 26 Sep 2022 10:09:50 +0100 Subject: [PATCH 1/5] fix reversed data-store edge source-target (#5156) Co-authored-by: David Sutherland --- cylc/flow/data_store_mgr.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index a53e9cdca4f..d5e4bd5acef 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -733,11 +733,18 @@ def _expand_graph_window( and e_id not in self.added[EDGES] and edge_distance <= self.n_edge_distance ): - self.added[EDGES][e_id] = PbEdge( - id=e_id, - source=s_tokens.id, - target=t_tokens.id - ) + if is_parent: + self.added[EDGES][e_id] = PbEdge( + id=e_id, + source=t_tokens.id, + target=s_tokens.id + ) + else: + self.added[EDGES][e_id] = PbEdge( + id=e_id, + source=s_tokens.id, + target=t_tokens.id + ) # Add edge id to node field for resolver reference self.updated[TASK_PROXIES].setdefault( t_tokens.id, From a438c69001ebfc01a4194f6a36c9357943f4391e Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Mon, 26 Sep 2022 10:34:38 +0100 Subject: [PATCH 2/5] `log_vc_info`: Redirect diff straight to file to avoid blocking pipe (#5139) * log_vc_info: redirect diff straight to file to avoid blocking pipe * Update changelog --- CHANGES.md | 4 + cylc/flow/install_plugins/log_vc_info.py | 98 +++++++++++++-------- tests/unit/post_install/test_log_vc_info.py | 46 ++++++---- 3 files changed, 94 insertions(+), 54 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a70cfee8406..e5aea3b3559 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -38,6 +38,10 @@ Maintenance release. [#5104](https://github.com/cylc/cylc-flow/pull/5104) - Fix retriggering of failed tasks after a reload. +[#5139](https://github.com/cylc/cylc-flow/pull/5139) - Fix bug where +`cylc install` could hang if there was a large uncommitted diff in the +source dir (for git/svn repos). + [#5131](https://github.com/cylc/cylc-flow/pull/5131) - Infer workflow run number for `workflow_state` xtrigger. diff --git a/cylc/flow/install_plugins/log_vc_info.py b/cylc/flow/install_plugins/log_vc_info.py index 9be634c5b0f..1aec3eecd5d 100644 --- a/cylc/flow/install_plugins/log_vc_info.py +++ b/cylc/flow/install_plugins/log_vc_info.py @@ -62,7 +62,9 @@ import json from pathlib import Path from subprocess import Popen, DEVNULL, PIPE -from typing import Any, Dict, Iterable, List, Optional, TYPE_CHECKING, Union +from typing import ( + Any, Dict, Iterable, List, Optional, TYPE_CHECKING, TextIO, Union, overload +) from cylc.flow import LOG from cylc.flow.exceptions import CylcError @@ -80,8 +82,6 @@ GIT: ['describe', '--always', '--dirty'] } -# git ['show', '--quiet', '--format=short'], - STATUS_COMMANDS: Dict[str, List[str]] = { SVN: ['status', '--non-interactive'], GIT: ['status', '--short'] @@ -189,13 +189,40 @@ def get_vc_info(path: Union[Path, str]) -> Optional[Dict[str, Any]]: return None -def _run_cmd(vcs: str, args: Iterable[str], cwd: Union[Path, str]) -> str: - """Run a VCS command, return stdout. +@overload +def _run_cmd( + vcs: str, args: Iterable[str], cwd: Union[Path, str], stdout: int = PIPE +) -> str: + ... + + +@overload +def _run_cmd( + vcs: str, args: Iterable[str], cwd: Union[Path, str], stdout: TextIO +) -> None: + ... + + +def _run_cmd( + vcs: str, + args: Iterable[str], + cwd: Union[Path, str], + stdout: Union[TextIO, int] = PIPE +) -> Optional[str]: + """Run a VCS command. Args: vcs: The version control system. args: The args to pass to the version control command. cwd: Directory to run the command in. + stdout: Where to redirect output (either PIPE or a + text stream/file object). Note: only use PIPE for + commands that will not generate a large output, otherwise + the pipe might get blocked. + + Returns: + Stdout output if stdout=PIPE, else None as the output has been + written directly to the specified file. Raises: VCSNotInstalledError: The VCS is not found. @@ -208,7 +235,7 @@ def _run_cmd(vcs: str, args: Iterable[str], cwd: Union[Path, str]) -> str: cmd, cwd=cwd, stdin=DEVNULL, - stdout=PIPE, + stdout=stdout, stderr=PIPE, text=True, ) @@ -275,41 +302,40 @@ def _parse_svn_info(info_text: str) -> Dict[str, Any]: return ret -def get_diff(vcs: str, path: Union[Path, str]) -> Optional[str]: - """Return the diff of uncommitted changes for a repository. +def write_diff( + vcs: str, repo_path: Union[Path, str], run_dir: Union[Path, str] +) -> Path: + """Get and write the diff of uncommitted changes for a repository to the + workflow's vcs log dir. Args: vcs: The version control system. - path: The path to the repo. - """ - args_ = DIFF_COMMANDS[vcs] - if Path(path).is_absolute(): - args_.append(str(path)) - else: - args_.append(str(Path().cwd() / path)) - - try: - diff = _run_cmd(vcs, args_, cwd=path) - except (VCSNotInstalledError, VCSMissingBaseError): - return None - header = ( - "# Auto-generated diff of uncommitted changes in the Cylc " - "workflow repository:\n" - f"# {path}") - return f"{header}\n{diff}" - - -def write_diff(diff: str, run_dir: Union[Path, str]) -> None: - """Write a diff to the workflow's vcs log dir. - - Args: - diff: The diff. + repo_path: The path to the repo. run_dir: The workflow run directory. + + Returns the path to diff file. """ + args = DIFF_COMMANDS[vcs] + args.append( + str(repo_path) if Path(repo_path).is_absolute() else + str(Path().cwd() / repo_path) + ) + diff_file = Path(run_dir, LOG_VERSION_DIR, DIFF_FILENAME) diff_file.parent.mkdir(exist_ok=True) - with open(diff_file, 'w') as f: - f.write(diff) + + with open(diff_file, 'a') as f: + f.write( + "# Auto-generated diff of uncommitted changes in the Cylc " + "workflow repository:\n" + f"# {repo_path}\n" + ) + f.flush() + try: + _run_cmd(vcs, args, repo_path, stdout=f) + except VCSMissingBaseError as exc: + f.write(f"# No diff - {exc}") + return diff_file # Entry point: @@ -331,8 +357,6 @@ def main( if vc_info is None: return False vcs = vc_info['version control system'] - diff = get_diff(vcs, srcdir) write_vc_info(vc_info, rundir) - if diff is not None: - write_diff(diff, rundir) + write_diff(vcs, srcdir, rundir) return True diff --git a/tests/unit/post_install/test_log_vc_info.py b/tests/unit/post_install/test_log_vc_info.py index cddb8012997..58e16e241b5 100644 --- a/tests/unit/post_install/test_log_vc_info.py +++ b/tests/unit/post_install/test_log_vc_info.py @@ -26,13 +26,15 @@ from cylc.flow.install_plugins.log_vc_info import ( INFO_FILENAME, LOG_VERSION_DIR, - get_diff, _get_git_commit, get_status, get_vc_info, main, + write_diff, ) +from cylc.flow.workflow_files import WorkflowFiles + Fixture = Any @@ -161,12 +163,14 @@ def test_get_vc_info_git(git_source_repo: Tuple[str, str]): @require_git -def test_get_diff_git(git_source_repo: Tuple[str, str]): - """Test get_diff() for a git repo""" - source_dir, commit_sha = git_source_repo - diff = get_diff('git', source_dir) - assert diff is not None - diff_lines = diff.splitlines() +def test_write_diff_git(git_source_repo: Tuple[str, str], tmp_path: Path): + """Test write_diff() for a git repo""" + source_dir, _ = git_source_repo + run_dir = tmp_path / 'run_dir' + (run_dir / WorkflowFiles.LOG_DIR).mkdir(parents=True) + diff_file = write_diff('git', source_dir, run_dir) + diff_lines = diff_file.read_text().splitlines() + assert diff_lines[0].startswith("# Auto-generated diff") for line in ("diff --git a/flow.cylc b/flow.cylc", "- R1 = foo", "+ R1 = bar"): @@ -205,12 +209,14 @@ def test_get_vc_info_svn(svn_source_repo: Tuple[str, str, str]): @require_svn -def test_get_diff_svn(svn_source_repo: Tuple[str, str, str]): - """Test get_diff() for an svn working copy""" - source_dir, uuid, repo_path = svn_source_repo - diff = get_diff('svn', source_dir) - assert diff is not None - diff_lines = diff.splitlines() +def test_write_diff_svn(svn_source_repo: Tuple[str, str, str], tmp_path: Path): + """Test write_diff() for an svn working copy""" + source_dir, _, _ = svn_source_repo + run_dir = tmp_path / 'run_dir' + (run_dir / WorkflowFiles.LOG_DIR).mkdir(parents=True) + diff_file = write_diff('svn', source_dir, run_dir) + diff_lines = diff_file.read_text().splitlines() + assert diff_lines[0].startswith("# Auto-generated diff") for line in (f"--- {source_dir}/flow.cylc (revision 1)", f"+++ {source_dir}/flow.cylc (working copy)", "- R1 = foo", @@ -239,20 +245,26 @@ def test_not_repo(tmp_path: Path, monkeypatch: MonkeyPatch): @require_git def test_no_base_commit_git(tmp_path: Path): - """Test get_vc_info() and get_diff() for a recently init'd git source dir + """Test get_vc_info() and write_diff() for a recently init'd git source dir that does not have a base commit yet.""" source_dir = Path(tmp_path, 'new_git_repo') source_dir.mkdir() subprocess.run(['git', 'init'], cwd=source_dir, check=True) flow_file = source_dir.joinpath('flow.cylc') flow_file.write_text(BASIC_FLOW_1) + run_dir = tmp_path / 'run_dir' + (run_dir / WorkflowFiles.LOG_DIR).mkdir(parents=True) vc_info = get_vc_info(source_dir) assert vc_info is not None - expected = [ + assert list(vc_info.items()) == [ ('version control system', "git"), ('working copy root path', str(source_dir)), ('status', ["?? flow.cylc"]) ] - assert list(vc_info.items()) == expected - assert get_diff('git', source_dir) is None + + # Diff file expected to be empty (only containing comment lines), + # but should work without raising + diff_file = write_diff('git', source_dir, run_dir) + for line in diff_file.read_text().splitlines(): + assert line.startswith('#') From 6d479660550c97a957b2f4034a03b1cf56a49557 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 26 Sep 2022 22:49:05 +1300 Subject: [PATCH 3/5] A no-flow task should not merge and retrigger incomplete children (#5146) Prevent no-flow merge. --- CHANGES.md | 3 ++ cylc/flow/task_pool.py | 8 ++-- .../flow-triggers/13-noflow-nomerge.t | 47 +++++++++++++++++++ .../flow-triggers/13-noflow-nomerge/flow.cylc | 7 +++ 4 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 tests/functional/flow-triggers/13-noflow-nomerge.t create mode 100644 tests/functional/flow-triggers/13-noflow-nomerge/flow.cylc diff --git a/CHANGES.md b/CHANGES.md index e5aea3b3559..05b3f99aa4d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -35,6 +35,9 @@ Maintenance release. ### Fixes +[#5146](https://github.com/cylc/cylc-flow/pull/5146) - no-flow tasks should not +retrigger incomplete children. + [#5104](https://github.com/cylc/cylc-flow/pull/5104) - Fix retriggering of failed tasks after a reload. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 5d86663fd12..cdf10156dda 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1910,9 +1910,11 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None: This also performs required spawning / state changing for edge cases. """ - if flow_nums == itask.flow_nums: - # Don't do anything if trying to spawn the same task in the same - # flow. This arises downstream of an AND trigger (if "A & B => C" + if not flow_nums or (flow_nums == itask.flow_nums): + # Don't do anything if: + # 1. merging from a no-flow task, or + # 2. trying to spawn the same task in the same flow. This arises + # downstream of an AND trigger (if "A & B => C" # and A spawns C first, B will find C is already in the pool), # and via suicide triggers ("A =>!A": A tries to spawn itself). return diff --git a/tests/functional/flow-triggers/13-noflow-nomerge.t b/tests/functional/flow-triggers/13-noflow-nomerge.t new file mode 100644 index 00000000000..c8b4528a2f9 --- /dev/null +++ b/tests/functional/flow-triggers/13-noflow-nomerge.t @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +. "$(dirname "$0")/test_header" +set_test_number 7 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" +DB="${WORKFLOW_RUN_DIR}/log/db" + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" +run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" +poll_grep_workflow_log "Workflow stalled" + +run_ok "${TEST_NAME_BASE}-trigger" cylc trigger --flow=none "${WORKFLOW_NAME}//1/a" +poll_grep_workflow_log -E "1/a running job:02 flows:none.*=> succeeded" + +cylc stop --now --now --max-polls=5 --interval=2 "$WORKFLOW_NAME" + +TEST_NAME="${TEST_NAME_BASE}-count" +QUERY="SELECT COUNT(*) FROM task_states WHERE name=='a'" +run_ok "${TEST_NAME}" sqlite3 "${DB}" "$QUERY" +cmp_ok "${TEST_NAME}.stdout" <<__END__ +2 +__END__ + +QUERY="SELECT COUNT(*) FROM task_states WHERE name=='b'" +run_ok "${TEST_NAME}" sqlite3 "${DB}" "$QUERY" +cmp_ok "${TEST_NAME}.stdout" <<__END__ +1 +__END__ + +purge diff --git a/tests/functional/flow-triggers/13-noflow-nomerge/flow.cylc b/tests/functional/flow-triggers/13-noflow-nomerge/flow.cylc new file mode 100644 index 00000000000..9b7e2b90a25 --- /dev/null +++ b/tests/functional/flow-triggers/13-noflow-nomerge/flow.cylc @@ -0,0 +1,7 @@ +[scheduling] + [[graph]] + R1 = "a => b" +[runtime] + [[a]] + [[b]] + script = false # die as incomplete From c44563a25591eef8acb6cc5465da214af1e09708 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 27 Sep 2022 09:42:44 +0100 Subject: [PATCH 4/5] remote-install: add "ana/" to the default install list (#5137) * The `ana/` directory is used by `rose_ana` to load comparison modules. * These are typically run where the data is generated which is often remote. * These directories typically contain a small number of Python files. --- CHANGES.md | 4 +++ cylc/flow/cfgspec/workflow.py | 15 ++++++++--- cylc/flow/remote.py | 16 +++++++----- tests/functional/remote/01-file-install.t | 10 +++++--- tests/unit/test_remote.py | 31 +++++++++++++++++------ 5 files changed, 54 insertions(+), 22 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 05b3f99aa4d..9d2f958bf90 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -35,6 +35,10 @@ Maintenance release. ### Fixes + +[#5137](https://github.com/cylc/cylc-flow/pull/5137) - +Install the `ana/` directory to remote platforms by default. + [#5146](https://github.com/cylc/cylc-flow/pull/5146) - no-flow tasks should not retrigger incomplete children. diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index 1e048e830c8..36d52f5a985 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -222,10 +222,17 @@ def get_script_common_text(this: str, example: Optional[str] = None): The following directories already get installed by default: - * ``app/`` - * ``bin/`` - * ``etc/`` - * ``lib/`` + ``ana/`` + Rose ana analysis modules + ``app/`` + Rose applications + ``bin/`` + Cylc bin directory (added to ``PATH``) + ``etc/`` + Miscellaneous resources + ``lib/`` + Cylc lib directory (``lib/python`` added to ``PYTHONPATH`` + for workflow config) These should be located in the top level of your Cylc workflow, i.e. the directory that contains your ``flow.cylc`` file. diff --git a/cylc/flow/remote.py b/cylc/flow/remote.py index ccbb5558000..9bc0fe36d53 100644 --- a/cylc/flow/remote.py +++ b/cylc/flow/remote.py @@ -172,6 +172,15 @@ def get_includes_to_rsync(rsync_includes=None): '--no-t' ] +DEFAULT_INCLUDES = [ + '/ana/***', # Rose ana analysis modules + '/app/***', # Rose applications + '/bin/***', # Cylc bin directory (added to PATH) + '/etc/***', # Miscellaneous resources + '/lib/***', # Cylc lib directory (lib/python added to PYTHONPATH for + # workflow config) +] + def construct_rsync_over_ssh_cmd( src_path: str, dst_path: str, platform: Dict[str, Any], @@ -209,12 +218,7 @@ def construct_rsync_over_ssh_cmd( rsync_cmd.extend(rsync_options) for exclude in ['log', 'share', 'work']: rsync_cmd.append(f"--exclude={exclude}") - default_includes = [ - '/app/***', - '/bin/***', - '/etc/***', - '/lib/***'] - for include in default_includes: + for include in DEFAULT_INCLUDES: rsync_cmd.append(f"--include={include}") for include in get_includes_to_rsync(rsync_includes): rsync_cmd.append(f"--include={include}") diff --git a/tests/functional/remote/01-file-install.t b/tests/functional/remote/01-file-install.t index e673b7c7c9d..9f89c49f79a 100644 --- a/tests/functional/remote/01-file-install.t +++ b/tests/functional/remote/01-file-install.t @@ -23,7 +23,7 @@ set_test_number 6 create_files () { # dump some files into the run dir - for DIR in "bin" "app" "etc" "lib" "dir1" "dir2" + for DIR in "bin" "ana" "app" "etc" "lib" "dir1" "dir2" do mkdir -p "${WORKFLOW_RUN_DIR}/${DIR}" touch "${WORKFLOW_RUN_DIR}/${DIR}/moo" @@ -35,7 +35,7 @@ create_files () { } # Test configured files/directories along with default files/directories -# (app, bin, etc, lib) are correctly installed on the remote platform. +# (ana, app, bin, etc, lib) are correctly installed on the remote platform. TEST_NAME="${TEST_NAME_BASE}-default-paths" init_workflow "${TEST_NAME}" <<__FLOW_CONFIG__ [scheduling] @@ -59,8 +59,9 @@ workflow_run_ok "${TEST_NAME}-run1" cylc play "${WORKFLOW_NAME}" \ # ensure these files get installed on the remote platform SSH="$(cylc config -d -i "[platforms][$CYLC_TEST_PLATFORM]ssh command")" ${SSH} "${CYLC_TEST_HOST}" \ - find "${RUN_DIR_REL}/"{app,bin,etc,lib} -type f | sort > 'find.out' + find "${RUN_DIR_REL}/"{ana,app,bin,etc,lib} -type f | sort > 'find.out' cmp_ok 'find.out' <<__OUT__ +${RUN_DIR_REL}/ana/moo ${RUN_DIR_REL}/app/moo ${RUN_DIR_REL}/bin/moo ${RUN_DIR_REL}/etc/moo @@ -93,8 +94,9 @@ workflow_run_ok "${TEST_NAME}-run2" cylc play "${WORKFLOW_NAME}" \ -s "CYLC_TEST_PLATFORM='${CYLC_TEST_PLATFORM}'" ${SSH} "${CYLC_TEST_HOST}" \ - find "${RUN_DIR_REL}/"{app,bin,dir1,dir2,file1,file2,etc,lib} -type f | sort > 'find.out' + find "${RUN_DIR_REL}/"{ana,app,bin,dir1,dir2,file1,file2,etc,lib} -type f | sort > 'find.out' cmp_ok 'find.out' <<__OUT__ +${RUN_DIR_REL}/ana/moo ${RUN_DIR_REL}/app/moo ${RUN_DIR_REL}/bin/moo ${RUN_DIR_REL}/dir1/moo diff --git a/tests/unit/test_remote.py b/tests/unit/test_remote.py index 1645665a59f..a1d28cf63e0 100644 --- a/tests/unit/test_remote.py +++ b/tests/unit/test_remote.py @@ -63,11 +63,26 @@ def test_construct_rsync_over_ssh_cmd(): } ) assert host == 'miklegard' - assert ' '.join(cmd) == ( - 'rsync command --delete --rsh=strange_ssh --include=/.service/ ' - '--include=/.service/server.key -a --checksum ' - '--out-format=%o %n%L --no-t --exclude=log --exclude=share ' - '--exclude=work --include=/app/*** --include=/bin/*** ' - '--include=/etc/*** --include=/lib/*** --exclude=* ' - '/foo/ miklegard:/bar/' - ) + assert cmd == [ + 'rsync', + 'command', + '--delete', + '--rsh=strange_ssh', + '--include=/.service/', + '--include=/.service/server.key', + '-a', + '--checksum', + '--out-format=%o %n%L', + '--no-t', + '--exclude=log', + '--exclude=share', + '--exclude=work', + '--include=/ana/***', + '--include=/app/***', + '--include=/bin/***', + '--include=/etc/***', + '--include=/lib/***', + '--exclude=*', + '/foo/', + 'miklegard:/bar/', + ] From 20bc9a097668fd365163a9c87d03b6e5ee3c54c5 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Tue, 27 Sep 2022 22:55:33 +1300 Subject: [PATCH 5/5] Db store force triggered (#5023) Store force-triggered flag in the run DB. * Add a new functional test. * Remove some redundant DB updates. * Update change log. --- CHANGES.md | 3 + cylc/flow/rundb.py | 6 +- cylc/flow/task_job_mgr.py | 1 - cylc/flow/task_pool.py | 80 +++++++++---------- cylc/flow/workflow_db_mgr.py | 20 +++-- .../database/00-simple/schema.out | 2 +- .../functional/flow-triggers/11-wait-merge.t | 2 +- .../01-job-nn-localhost/db.sqlite3 | 4 +- .../restart/57-ghost-job/db.sqlite3 | 4 +- .../restart/58-waiting-manual-triggered.t | 47 +++++++++++ .../58-waiting-manual-triggered/flow.cylc | 22 +++++ 11 files changed, 135 insertions(+), 56 deletions(-) create mode 100644 tests/functional/restart/58-waiting-manual-triggered.t create mode 100644 tests/functional/restart/58-waiting-manual-triggered/flow.cylc diff --git a/CHANGES.md b/CHANGES.md index 9d2f958bf90..84a9dd8dc28 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -28,6 +28,7 @@ creating a new release entry be sure to copy & paste the span tag with the `actions:bind` attribute, which is used by a regex to find the text to be updated. Only the first match gets replaced, so it's fine to leave the old ones in. --> + ------------------------------------------------------------------------------- ## __cylc-8.0.3 (Pending YYYY-MM-DD)__ @@ -35,6 +36,8 @@ Maintenance release. ### Fixes +[#5023](https://github.com/cylc/cylc-flow/pull/5023) - tasks force-triggered +after a shutdown was ordered should submit to run immediately on restart. [#5137](https://github.com/cylc/cylc-flow/pull/5137) - Install the `ana/` directory to remote platforms by default. diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 4cb3c630638..42a4cca3e9a 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -297,6 +297,7 @@ class CylcWorkflowDAO: ["submit_num", {"datatype": "INTEGER"}], ["status"], ["flow_wait", {"datatype": "INTEGER"}], + ["is_manual_submit", {"datatype": "INTEGER"}], ], TABLE_TASK_TIMEOUT_TIMERS: [ ["cycle", {"is_primary_key": True}], @@ -802,14 +803,15 @@ def select_task_pool_for_restart(self, callback): """Select from task_pool+task_states+task_jobs for restart. Invoke callback(row_idx, row) on each row, where each row contains: - [cycle, name, is_late, status, is_held, submit_num, - try_num, platform_name, time_submit, time_run, timeout, outputs] + the fields in the SELECT statement below. """ form_stmt = r""" SELECT %(task_pool)s.cycle, %(task_pool)s.name, %(task_pool)s.flow_nums, + %(task_states)s.flow_wait, + %(task_states)s.is_manual_submit, %(task_late_flags)s.value, %(task_pool)s.status, %(task_pool)s.is_held, diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 7d18bd347a1..1188e3cab01 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -238,7 +238,6 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True): itask.submit_num += 1 itask.state_reset(TASK_STATUS_PREPARING) self.data_store_mgr.delta_task_state(itask) - self.workflow_db_mgr.put_update_task_state(itask) prep_task = self._prep_submit_task_job( workflow, itask, check_syntax=check_syntax) if prep_task: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index cdf10156dda..c9d7ca483ea 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -173,13 +173,32 @@ def load_from_point(self): point = tdef.first_point(self.config.start_point) self.spawn_to_rh_limit(tdef, point, {flow_num}) - def add_to_pool(self, itask, is_new: bool = True) -> None: + def db_add_new_flow_rows(self, itask: TaskProxy) -> None: + """Add new rows to DB task tables that record flow_nums. + + Call when a new task is spawned or a flow merge occurs. + """ + # Add row to task_states table. + now = get_current_time_string() + self.workflow_db_mgr.put_insert_task_states( + itask, + { + "time_created": now, + "time_updated": now, + "status": itask.state.status, + "flow_nums": serialise(itask.flow_nums), + "flow_wait": itask.flow_wait, + "is_manual_submit": itask.is_manual_submit + } + ) + # Add row to task_outputs table: + self.workflow_db_mgr.put_insert_task_outputs(itask) + + def add_to_pool(self, itask) -> None: """Add a task to the hidden (if not satisfied) or main task pool. If the task already exists in the hidden pool and is satisfied, move it to the main pool. - - (is_new is False inidcates load from DB at restart). """ if itask.is_task_prereqs_not_done() and not itask.is_manual_submit: # Add to hidden pool if not satisfied. @@ -205,21 +224,6 @@ def add_to_pool(self, itask, is_new: bool = True) -> None: self.create_data_store_elements(itask) - if is_new: - # Add row to "task_states" table. - now = get_current_time_string() - self.workflow_db_mgr.put_insert_task_states( - itask, - { - "time_created": now, - "time_updated": now, - "status": itask.state.status, - "flow_nums": serialise(itask.flow_nums) - } - ) - # Add row to "task_outputs" table: - self.workflow_db_mgr.put_insert_task_outputs(itask) - if itask.tdef.max_future_prereq_offset is not None: # (Must do this once added to the pool). self.set_max_future_offset() @@ -416,9 +420,9 @@ def load_db_task_pool_for_restart(self, row_idx, row): if row_idx == 0: LOG.info("LOADING task proxies") # Create a task proxy corresponding to this DB entry. - (cycle, name, flow_nums, is_late, status, is_held, submit_num, _, - platform_name, time_submit, time_run, timeout, outputs_str) = row - + (cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status, + is_held, submit_num, _, platform_name, time_submit, time_run, timeout, + outputs_str) = row try: itask = TaskProxy( self.config.get_taskdef(name), @@ -427,7 +431,9 @@ def load_db_task_pool_for_restart(self, row_idx, row): status=status, is_held=is_held, submit_num=submit_num, - is_late=bool(is_late) + is_late=bool(is_late), + flow_wait=bool(flow_wait), + is_manual_submit=bool(is_manual_submit) ) except WorkflowConfigError: LOG.exception( @@ -491,7 +497,7 @@ def load_db_task_pool_for_restart(self, row_idx, row): if itask.state_reset(status, is_runahead=True): self.data_store_mgr.delta_task_runahead(itask) - self.add_to_pool(itask, is_new=False) + self.add_to_pool(itask) # All tasks load as runahead-limited, but finished and manually # triggered tasks (incl. --start-task's) can be released now. @@ -628,8 +634,9 @@ def _get_spawned_or_merged_task( def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: """Spawn parentless task instances from point to runahead limit.""" - if not flow_nums: - # force-triggered no-flow task. + if not flow_nums or point is None: + # Force-triggered no-flow task. + # Or called with an invalid next_point. return if self.runahead_limit_point is None: self.compute_runahead() @@ -1205,14 +1212,6 @@ def spawn_on_output(self, itask, output, forced=False): if c_task is not None and c_task != itask: # (Avoid self-suicide: A => !A) self.merge_flows(c_task, itask.flow_nums) - self.workflow_db_mgr.put_insert_task_states( - c_task, - { - "status": c_task.state.status, - "flow_nums": serialise(c_task.flow_nums) - } - ) - # self.workflow_db_mgr.process_queued_ops() elif ( c_task is None and (itask.flow_nums or forced) @@ -1482,6 +1481,7 @@ def spawn_task( return None LOG.info(f"[{itask}] spawned") + self.db_add_new_flow_rows(itask) return itask def force_spawn_children( @@ -1588,7 +1588,6 @@ def force_trigger_tasks( ) if itask is None: continue - self.add_to_pool(itask, is_new=True) itasks.append(itask) # Trigger matched tasks if not already active. @@ -1616,7 +1615,6 @@ def force_trigger_tasks( # De-queue it to run now. self.task_queue_mgr.force_release_task(itask) - self.workflow_db_mgr.put_update_task_state(itask) return len(unmatched) def sim_time_check(self, message_queue): @@ -1919,24 +1917,26 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None: # and via suicide triggers ("A =>!A": A tries to spawn itself). return + merge_with_no_flow = not itask.flow_nums + + itask.merge_flows(flow_nums) + # Merged tasks get a new row in the db task_states table. + self.db_add_new_flow_rows(itask) + if ( itask.state(*TASK_STATUSES_FINAL) and itask.state.outputs.get_incomplete() ): # Re-queue incomplete task to run again in the merged flow. LOG.info(f"[{itask}] incomplete task absorbed by new flow.") - itask.merge_flows(flow_nums) itask.state_reset(TASK_STATUS_WAITING) self.queue_task(itask) self.data_store_mgr.delta_task_state(itask) - elif not itask.flow_nums or itask.flow_wait: + elif merge_with_no_flow or itask.flow_wait: # 2. Retro-spawn on completed outputs and continue as merged flow. LOG.info(f"[{itask}] spawning on pre-merge outputs") - itask.merge_flows(flow_nums) itask.flow_wait = False self.spawn_on_all_outputs(itask, completed_only=True) self.spawn_to_rh_limit( itask.tdef, itask.next_point(), itask.flow_nums) - else: - itask.merge_flows(flow_nums) diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index e8f82a27669..18265d67b79 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -431,14 +431,15 @@ def put_xtriggers(self, sat_xtrig): def put_update_task_state(self, itask): """Update task_states table for current state of itask. - For final event-driven update before removing finished tasks. - No need to update task_pool table as finished tasks are immediately - removed from the pool. + NOTE the task_states table is normally updated along with the task pool + table. This method is only needed as a final update for finished tasks, + when they get removed from the task_pool. """ set_args = { "time_updated": itask.state.time_updated, "status": itask.state.status, - "flow_wait": itask.flow_wait + "flow_wait": itask.flow_wait, + "is_manual_submit": itask.is_manual_submit } where_args = { "cycle": str(itask.point), @@ -451,10 +452,15 @@ def put_update_task_state(self, itask): (set_args, where_args)) def put_task_pool(self, pool: 'TaskPool') -> None: - """Update various task tables for current pool, in runtime database. + """Delete task pool table content and recreate from current task pool. - Queue delete (everything) statements to wipe the tables, and queue the - relevant insert statements for the current tasks in the pool. + Also recreate: + - prerequisites table + - timeout timers table + - action timers table + + And update: + - task states table """ self.db_deletes_map[self.TABLE_TASK_POOL].append({}) # Comment this out to retain the trigger-time prereq status of past diff --git a/tests/flakyfunctional/database/00-simple/schema.out b/tests/flakyfunctional/database/00-simple/schema.out index ac62d48639d..814faac2a59 100644 --- a/tests/flakyfunctional/database/00-simple/schema.out +++ b/tests/flakyfunctional/database/00-simple/schema.out @@ -10,7 +10,7 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums)); CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums)); CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output)); -CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums)); +CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums)); CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name)); CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT); CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num)); diff --git a/tests/functional/flow-triggers/11-wait-merge.t b/tests/functional/flow-triggers/11-wait-merge.t index 25b6443776a..cb3218ae463 100644 --- a/tests/functional/flow-triggers/11-wait-merge.t +++ b/tests/functional/flow-triggers/11-wait-merge.t @@ -34,8 +34,8 @@ cmp_ok "${TEST_NAME}.stdout" <<\__END__ 1|b|[1]|["submitted", "started", "succeeded"] 1|a|[2]|["submitted", "started", "succeeded"] 1|c|[2]|["submitted", "started", "x"] -1|x|[1, 2]|["submitted", "started", "succeeded"] 1|c|[1, 2]|["submitted", "started", "succeeded", "x"] +1|x|[1, 2]|["submitted", "started", "succeeded"] 1|d|[1, 2]|["submitted", "started", "succeeded"] 1|b|[2]|["submitted", "started", "succeeded"] __END__ diff --git a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 index 9453b6960e3..15d9e84418f 100644 --- a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 +++ b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 @@ -18,8 +18,8 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums)); CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums)); INSERT INTO task_pool VALUES('1','foo','["1", "2"]','waiting', 0); -CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums)); -INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0'); +CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums)); +INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0', '0'); CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output)); CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name)); CREATE TABLE xtriggers(signature TEXT, results TEXT, PRIMARY KEY(signature)); diff --git a/tests/functional/restart/57-ghost-job/db.sqlite3 b/tests/functional/restart/57-ghost-job/db.sqlite3 index d6837d6bd0c..4230831602f 100644 --- a/tests/functional/restart/57-ghost-job/db.sqlite3 +++ b/tests/functional/restart/57-ghost-job/db.sqlite3 @@ -19,8 +19,8 @@ INSERT INTO task_outputs VALUES('1','foo','[1]','[]'); CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums)); INSERT INTO task_pool VALUES('1','foo','[1]','preparing',0); CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output)); -CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums)); -INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL); +CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums)); +INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL, '0'); CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name)); CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT); CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num)); diff --git a/tests/functional/restart/58-waiting-manual-triggered.t b/tests/functional/restart/58-waiting-manual-triggered.t new file mode 100644 index 00000000000..efba9f42b70 --- /dev/null +++ b/tests/functional/restart/58-waiting-manual-triggered.t @@ -0,0 +1,47 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +#------------------------------------------------------------------------------- +# Test that a task manually triggered just before shutdown will run on restart. + +. "$(dirname "$0")/test_header" + +set_test_number 6 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" + +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" + +DB_FILE="${WORKFLOW_RUN_DIR}/log/db" + +# It should have shut down with 2/foo waiting with the is_manual_submit flag on. +TEST_NAME="${TEST_NAME_BASE}-db-task-states" +QUERY='SELECT status, is_manual_submit FROM task_states WHERE cycle IS 2;' +run_ok "$TEST_NAME" sqlite3 "$DB_FILE" "$QUERY" +cmp_ok "${TEST_NAME}.stdout" << '__EOF__' +waiting|1 +__EOF__ + +# It should restart and shut down normally, not stall with 2/foo waiting on 1/foo. +workflow_run_ok "${TEST_NAME_BASE}-restart" cylc play --no-detach "${WORKFLOW_NAME}" +# Check that 2/foo job 02 did run before shutdown. +grep_workflow_log_ok "${TEST_NAME_BASE}-grep" "\[2\/foo running job:02 flows:1\] => succeeded" + +purge +exit diff --git a/tests/functional/restart/58-waiting-manual-triggered/flow.cylc b/tests/functional/restart/58-waiting-manual-triggered/flow.cylc new file mode 100644 index 00000000000..ea5f47c46d7 --- /dev/null +++ b/tests/functional/restart/58-waiting-manual-triggered/flow.cylc @@ -0,0 +1,22 @@ +[scheduler] + [[events]] + stall timeout = PT0S + abort on stall timeout = True +[scheduling] + cycling mode = integer + runahead limit = P1 + final cycle point = 3 + [[graph]] + P1 = foo[-P1] => foo +[runtime] + [[foo]] + script = """ + if (( CYLC_TASK_CYCLE_POINT == 3 )); then + # Order a normal shutdown: no more job submissions, and shut + # down after active jobs (i.e. this one) finish. + cylc stop "$CYLC_WORKFLOW_ID" + # Force-trigger 2/foo before shutdown. On restart it should be + # in the waiting state with the force-triggered flag set. + cylc trigger "${CYLC_WORKFLOW_ID}//2/foo" + fi + """