Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8.0.x #5158

Merged
merged 5 commits into from
Sep 27, 2022
Merged

8.0.x #5158

Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Db store force triggered (#5023)
Store force-triggered flag in the run DB.

* Add a new functional test.
* Remove some redundant DB updates.
* Update change log.
  • Loading branch information
hjoliver committed Sep 27, 2022
commit 20bc9a097668fd365163a9c87d03b6e5ee3c54c5
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ creating a new release entry be sure to copy & paste the span tag with the
`actions:bind` attribute, which is used by a regex to find the text to be
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->

-------------------------------------------------------------------------------
## __cylc-8.0.3 (<span actions:bind='release-date'>Pending YYYY-MM-DD</span>)__

Maintenance release.

### Fixes

[#5023](https://github.com/cylc/cylc-flow/pull/5023) - Tasks force-triggered
after a shutdown was ordered should submit to run immediately on restart.

[#5137](https://github.com/cylc/cylc-flow/pull/5137) -
Install the `ana/` directory to remote platforms by default.
Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ class CylcWorkflowDAO:
["submit_num", {"datatype": "INTEGER"}],
["status"],
["flow_wait", {"datatype": "INTEGER"}],
["is_manual_submit", {"datatype": "INTEGER"}],
],
TABLE_TASK_TIMEOUT_TIMERS: [
["cycle", {"is_primary_key": True}],
Expand Down Expand Up @@ -802,14 +803,15 @@ def select_task_pool_for_restart(self, callback):
"""Select from task_pool+task_states+task_jobs for restart.

Invoke callback(row_idx, row) on each row, where each row contains:
[cycle, name, is_late, status, is_held, submit_num,
try_num, platform_name, time_submit, time_run, timeout, outputs]
the fields in the SELECT statement below.
"""
form_stmt = r"""
SELECT
%(task_pool)s.cycle,
%(task_pool)s.name,
%(task_pool)s.flow_nums,
%(task_states)s.flow_wait,
%(task_states)s.is_manual_submit,
%(task_late_flags)s.value,
%(task_pool)s.status,
%(task_pool)s.is_held,
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
itask.submit_num += 1
itask.state_reset(TASK_STATUS_PREPARING)
self.data_store_mgr.delta_task_state(itask)
self.workflow_db_mgr.put_update_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task:
Expand Down
80 changes: 40 additions & 40 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,32 @@ def load_from_point(self):
point = tdef.first_point(self.config.start_point)
self.spawn_to_rh_limit(tdef, point, {flow_num})

def add_to_pool(self, itask, is_new: bool = True) -> None:
def db_add_new_flow_rows(self, itask: TaskProxy) -> None:
    """Queue fresh task_states and task_outputs DB rows for a task.

    Both tables record flow_nums, so this should be called whenever a
    task is newly spawned or takes on extra flow numbers in a flow merge.
    """
    timestamp = get_current_time_string()
    db_mgr = self.workflow_db_mgr

    # New task_states row: created and updated stamps start out equal.
    state_row = {
        "time_created": timestamp,
        "time_updated": timestamp,
        "status": itask.state.status,
        "flow_nums": serialise(itask.flow_nums),
        "flow_wait": itask.flow_wait,
        "is_manual_submit": itask.is_manual_submit,
    }
    db_mgr.put_insert_task_states(itask, state_row)

    # Matching new task_outputs row.
    db_mgr.put_insert_task_outputs(itask)

def add_to_pool(self, itask) -> None:
"""Add a task to the hidden (if not satisfied) or main task pool.

If the task already exists in the hidden pool and is satisfied, move it
to the main pool.

(is_new is False indicates load from DB at restart).
"""
if itask.is_task_prereqs_not_done() and not itask.is_manual_submit:
# Add to hidden pool if not satisfied.
Expand All @@ -205,21 +224,6 @@ def add_to_pool(self, itask, is_new: bool = True) -> None:

self.create_data_store_elements(itask)

if is_new:
# Add row to "task_states" table.
now = get_current_time_string()
self.workflow_db_mgr.put_insert_task_states(
itask,
{
"time_created": now,
"time_updated": now,
"status": itask.state.status,
"flow_nums": serialise(itask.flow_nums)
}
)
# Add row to "task_outputs" table:
self.workflow_db_mgr.put_insert_task_outputs(itask)

if itask.tdef.max_future_prereq_offset is not None:
# (Must do this once added to the pool).
self.set_max_future_offset()
Expand Down Expand Up @@ -416,9 +420,9 @@ def load_db_task_pool_for_restart(self, row_idx, row):
if row_idx == 0:
LOG.info("LOADING task proxies")
# Create a task proxy corresponding to this DB entry.
(cycle, name, flow_nums, is_late, status, is_held, submit_num, _,
platform_name, time_submit, time_run, timeout, outputs_str) = row

(cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status,
is_held, submit_num, _, platform_name, time_submit, time_run, timeout,
outputs_str) = row
try:
itask = TaskProxy(
self.config.get_taskdef(name),
Expand All @@ -427,7 +431,9 @@ def load_db_task_pool_for_restart(self, row_idx, row):
status=status,
is_held=is_held,
submit_num=submit_num,
is_late=bool(is_late)
is_late=bool(is_late),
flow_wait=bool(flow_wait),
is_manual_submit=bool(is_manual_submit)
)
except WorkflowConfigError:
LOG.exception(
Expand Down Expand Up @@ -491,7 +497,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):

if itask.state_reset(status, is_runahead=True):
self.data_store_mgr.delta_task_runahead(itask)
self.add_to_pool(itask, is_new=False)
self.add_to_pool(itask)

# All tasks load as runahead-limited, but finished and manually
# triggered tasks (incl. --start-task's) can be released now.
Expand Down Expand Up @@ -628,8 +634,9 @@ def _get_spawned_or_merged_task(

def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
"""Spawn parentless task instances from point to runahead limit."""
if not flow_nums:
# force-triggered no-flow task.
if not flow_nums or point is None:
# Force-triggered no-flow task.
# Or called with an invalid next_point.
return
if self.runahead_limit_point is None:
self.compute_runahead()
Expand Down Expand Up @@ -1205,14 +1212,6 @@ def spawn_on_output(self, itask, output, forced=False):
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
self.workflow_db_mgr.put_insert_task_states(
c_task,
{
"status": c_task.state.status,
"flow_nums": serialise(c_task.flow_nums)
}
)
# self.workflow_db_mgr.process_queued_ops()
elif (
c_task is None
and (itask.flow_nums or forced)
Expand Down Expand Up @@ -1482,6 +1481,7 @@ def spawn_task(
return None

LOG.info(f"[{itask}] spawned")
self.db_add_new_flow_rows(itask)
return itask

def force_spawn_children(
Expand Down Expand Up @@ -1588,7 +1588,6 @@ def force_trigger_tasks(
)
if itask is None:
continue
self.add_to_pool(itask, is_new=True)
itasks.append(itask)

# Trigger matched tasks if not already active.
Expand Down Expand Up @@ -1616,7 +1615,6 @@ def force_trigger_tasks(
# De-queue it to run now.
self.task_queue_mgr.force_release_task(itask)

self.workflow_db_mgr.put_update_task_state(itask)
return len(unmatched)

def sim_time_check(self, message_queue):
Expand Down Expand Up @@ -1919,24 +1917,26 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
# and via suicide triggers ("A =>!A": A tries to spawn itself).
return

merge_with_no_flow = not itask.flow_nums

itask.merge_flows(flow_nums)
# Merged tasks get a new row in the db task_states table.
self.db_add_new_flow_rows(itask)

if (
itask.state(*TASK_STATUSES_FINAL)
and itask.state.outputs.get_incomplete()
):
# Re-queue incomplete task to run again in the merged flow.
LOG.info(f"[{itask}] incomplete task absorbed by new flow.")
itask.merge_flows(flow_nums)
itask.state_reset(TASK_STATUS_WAITING)
self.queue_task(itask)
self.data_store_mgr.delta_task_state(itask)

elif not itask.flow_nums or itask.flow_wait:
elif merge_with_no_flow or itask.flow_wait:
# 2. Retro-spawn on completed outputs and continue as merged flow.
LOG.info(f"[{itask}] spawning on pre-merge outputs")
itask.merge_flows(flow_nums)
itask.flow_wait = False
self.spawn_on_all_outputs(itask, completed_only=True)
self.spawn_to_rh_limit(
itask.tdef, itask.next_point(), itask.flow_nums)
else:
itask.merge_flows(flow_nums)
20 changes: 13 additions & 7 deletions cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,14 +431,15 @@ def put_xtriggers(self, sat_xtrig):
def put_update_task_state(self, itask):
"""Update task_states table for current state of itask.

For final event-driven update before removing finished tasks.
No need to update task_pool table as finished tasks are immediately
removed from the pool.
NOTE the task_states table is normally updated along with the task pool
table. This method is only needed as a final update for finished tasks,
when they get removed from the task_pool.
"""
set_args = {
"time_updated": itask.state.time_updated,
"status": itask.state.status,
"flow_wait": itask.flow_wait
"flow_wait": itask.flow_wait,
"is_manual_submit": itask.is_manual_submit
}
where_args = {
"cycle": str(itask.point),
Expand All @@ -451,10 +452,15 @@ def put_update_task_state(self, itask):
(set_args, where_args))

def put_task_pool(self, pool: 'TaskPool') -> None:
"""Update various task tables for current pool, in runtime database.
"""Delete task pool table content and recreate from current task pool.

Queue delete (everything) statements to wipe the tables, and queue the
relevant insert statements for the current tasks in the pool.
Also recreate:
- prerequisites table
- timeout timers table
- action timers table

And update:
- task states table
"""
self.db_deletes_map[self.TABLE_TASK_POOL].append({})
# Comment this out to retain the trigger-time prereq status of past
Expand Down
2 changes: 1 addition & 1 deletion tests/flakyfunctional/database/00-simple/schema.out
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT);
CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num));
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/flow-triggers/11-wait-merge.t
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ cmp_ok "${TEST_NAME}.stdout" <<\__END__
1|b|[1]|["submitted", "started", "succeeded"]
1|a|[2]|["submitted", "started", "succeeded"]
1|c|[2]|["submitted", "started", "x"]
1|x|[1, 2]|["submitted", "started", "succeeded"]
1|c|[1, 2]|["submitted", "started", "succeeded", "x"]
1|x|[1, 2]|["submitted", "started", "succeeded"]
1|d|[1, 2]|["submitted", "started", "succeeded"]
1|b|[2]|["submitted", "started", "succeeded"]
__END__
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c
CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums));
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','["1", "2"]','waiting', 0);
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0');
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0', '0');
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE xtriggers(signature TEXT, results TEXT, PRIMARY KEY(signature));
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/restart/57-ghost-job/db.sqlite3
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ INSERT INTO task_outputs VALUES('1','foo','[1]','[]');
CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums));
INSERT INTO task_pool VALUES('1','foo','[1]','preparing',0);
CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output));
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL);
CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums));
INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL, '0');
CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name));
CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT);
CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num));
Expand Down
47 changes: 47 additions & 0 deletions tests/functional/restart/58-waiting-manual-triggered.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

#-------------------------------------------------------------------------------
# Test that a task manually triggered just before shutdown will run on restart.
#
# Outline:
#   1. Validate and run the workflow until it shuts itself down.
#   2. Inspect the run DB: cycle 2 should be recorded as waiting with the
#      is_manual_submit flag set.
#   3. Restart the workflow and check the log shows 2/foo job 02 succeeding.

# Source the shared test harness that sits next to this script.
. "$(dirname "$0")/test_header"

# Number of checks declared to the harness (presumably one per run_ok,
# workflow_run_ok, cmp_ok and grep_workflow_log_ok call below - confirm against
# test_header if a check is added or removed).
set_test_number 6

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"

# First run, in the foreground (--no-detach) so the script waits for shutdown.
workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}"

# Run database written by the scheduler under the workflow run directory.
DB_FILE="${WORKFLOW_RUN_DIR}/log/db"

# It should have shut down with 2/foo waiting with the is_manual_submit flag on.
TEST_NAME="${TEST_NAME_BASE}-db-task-states"
QUERY='SELECT status, is_manual_submit FROM task_states WHERE cycle IS 2;'
run_ok "$TEST_NAME" sqlite3 "$DB_FILE" "$QUERY"
# Expect exactly one row: status "waiting", flag value 1.
cmp_ok "${TEST_NAME}.stdout" << '__EOF__'
waiting|1
__EOF__

# It should restart and shut down normally, not stall with 2/foo waiting on 1/foo.
workflow_run_ok "${TEST_NAME_BASE}-restart" cylc play --no-detach "${WORKFLOW_NAME}"
# Check that 2/foo job 02 did run before shutdown.
grep_workflow_log_ok "${TEST_NAME_BASE}-grep" "\[2\/foo running job:02 flows:1\] => succeeded"

# Clean up the installed workflow (harness function).
purge
exit
22 changes: 22 additions & 0 deletions tests/functional/restart/58-waiting-manual-triggered/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[scheduler]
[[events]]
stall timeout = PT0S
abort on stall timeout = True
[scheduling]
cycling mode = integer
runahead limit = P1
final cycle point = 3
[[graph]]
P1 = foo[-P1] => foo
[runtime]
[[foo]]
script = """
if (( CYLC_TASK_CYCLE_POINT == 3 )); then
# Order a normal shutdown: no more job submissions, and shut
# down after active jobs (i.e. this one) finish.
cylc stop "$CYLC_WORKFLOW_ID"
# Force-trigger 2/foo before shutdown. On restart it should be
# in the waiting state with the force-triggered flag set.
cylc trigger "${CYLC_WORKFLOW_ID}//2/foo"
fi
"""