Skip to content

Commit

Permalink
Merge pull request #3863 from MetRonnie/prereq-db
Browse files Browse the repository at this point in the history
Store task prerequisites in their own DB table
  • Loading branch information
hjoliver authored Nov 4, 2020
2 parents 0a87217 + 8b1f295 commit 06455a2
Show file tree
Hide file tree
Showing 18 changed files with 313 additions and 534 deletions.
11 changes: 8 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Cylc 8.0aX (alpha) releases are not compatible with Cylc 7 or with previous
8.0aX releases, as the API is still under heavy development.

The Cylc server program and CLI codebase is now a Python 3 package that can be
installed from PyPI with `pip` (see #2990), and has been renamed to
installed from PyPI with `pip` (see
[#2990](https://github.com/cylc/cylc-flow/pull/2990)), and has been renamed to
`cylc-flow`. The name `cylc` is now used as a native Python package namespace
to allow other projects to re-use it and extend Cylc with plug-ins.

Expand All @@ -26,13 +27,16 @@ have been removed, and `cylc graph` is only retained for text output
used in tests (it will be re-implemented in the new web UI).

The xtrigger examples were moved to a separate `cylc/cylc-xtriggers` project
(see #3123).
(see [#3123](https://github.com/cylc/cylc-flow/pull/3123)).

Jinja filters were moved from its `Jinja2Filters` folder to within the `cylc`
namespace, under `cylc.jinja.filters`.

Cylc Review was also removed in this version.

Cylc 7 suites cannot be restarted in Cylc 8 using `cylc restart`, but they
can still be run using `cylc run` ([#3863](https://github.com/cylc/cylc-flow/pull/3863)).

-------------------------------------------------------------------------------
## __cylc-8.0a3 (2020-08?)__

Expand Down Expand Up @@ -62,7 +66,8 @@ pyzmq.
command name to `cylc set-outputs` to better reflect its role in Cylc 8.

[#3796](https://github.com/cylc/cylc-flow/pull/3796) - Remote installation is
now on a per install target rather than a per platform basis. app/ bin/ etc/ lib/ directories are now installed on the target, configurable in flow.cylc.
now on a per install target rather than a per platform basis. `app/`, `bin/`,
`etc/`, `lib/` directories are now installed on the target, configurable in flow.cylc.

[#3724](https://github.com/cylc/cylc-flow/pull/3724) - Re-implemented
the `cylc scan` command line interface and added a Python API for accessing
Expand Down
17 changes: 13 additions & 4 deletions cylc/flow/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,16 @@


class Prerequisite:
"""The concrete result of an abstract logical trigger expression."""
"""The concrete result of an abstract logical trigger expression.
A single TaskProxy can have multiple Prerequisites, all of which require
satisfying. This corresponds to multiple tasks being dependencies of a task
in Cylc graphs (e.g. `a => c`, `b => c`). But a single Prerequisite can
also have multiple 'messages' (basically, subcomponents of a Prerequisite)
corresponding to parenthesised expressions in Cylc graphs (e.g.
`(a & b) => c` or `(a | b) => c`). For the OR operator (`|`), only one
message has to be satisfied for the Prerequisite to be satisfied.
"""

# Memory optimization - constrain possible attributes to this list.
__slots__ = ["satisfied", "_all_satisfied",
Expand Down Expand Up @@ -156,7 +165,7 @@ def is_satisfied(self):
return self._all_satisfied
else:
# No cached value.
if not self.satisfied:
if self.satisfied == {}:
# No prerequisites left after pre-initial simplification.
return True
if self.conditional_expression:
Expand Down Expand Up @@ -276,7 +285,7 @@ def set_satisfied(self):
self._all_satisfied = self._conditional_is_satisfied()

def set_not_satisfied(self):
"""Force this prerequiste into the un-satisfied state.
"""Force this prerequisite into the un-satisfied state.
State can be overridden by calling `self.satisfy_me`.
Expand All @@ -301,6 +310,6 @@ def get_resolved_dependencies(self):
E.G: ['foo.1', 'bar.2']
"""
return ['%s.%s' % (name, point) for
return [f'{name}.{point}' for
(name, point, _), satisfied in self.satisfied.items() if
satisfied == self.DEP_STATE_SATISFIED]
230 changes: 26 additions & 204 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Provide data access object for the suite runtime database."""

import re
import sqlite3
import traceback
from os.path import expandvars

from cylc.flow import LOG
import cylc.flow.flags
from cylc.flow.task_state import TASK_STATUS_WAITING
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.platforms import platform_from_job_info
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg


class CylcSuiteDAOTableColumn:
Expand Down Expand Up @@ -172,6 +168,7 @@ class CylcSuiteDAO:
CONN_TIMEOUT = 0.2
DB_FILE_BASE_NAME = "db"
MAX_TRIES = 100
RESTART_INCOMPAT_VERSION = "8.0a2" # Can't restart suite if <= this vers
CHECKPOINT_LATEST_ID = 0
CHECKPOINT_LATEST_EVENT = "latest"
TABLE_BROADCAST_EVENTS = "broadcast_events"
Expand All @@ -189,6 +186,7 @@ class CylcSuiteDAO:
TABLE_TASK_OUTPUTS = "task_outputs"
TABLE_TASK_POOL = "task_pool"
TABLE_TASK_POOL_CHECKPOINTS = "task_pool_checkpoints"
TABLE_TASK_PREREQUISITES = "task_prerequisites"
TABLE_TASK_STATES = "task_states"
TABLE_TASK_TIMEOUT_TIMERS = "task_timeout_timers"
TABLE_XTRIGGERS = "xtriggers"
Expand Down Expand Up @@ -288,9 +286,16 @@ class CylcSuiteDAO:
["name", {"is_primary_key": True}],
["flow_label", {"is_primary_key": True}],
["status"],
["satisfied"],
["is_held", {"datatype": "INTEGER"}],
],
TABLE_TASK_PREREQUISITES: [
["cycle", {"is_primary_key": True}],
["name", {"is_primary_key": True}],
["prereq_name", {"is_primary_key": True}],
["prereq_cycle", {"is_primary_key": True}],
["prereq_output", {"is_primary_key": True}],
["satisfied"],
],
TABLE_XTRIGGERS: [
["signature", {"is_primary_key": True}],
["results"],
Expand All @@ -301,7 +306,6 @@ class CylcSuiteDAO:
["name", {"is_primary_key": True}],
["flow_label", {"is_primary_key": True}],
["status"],
["satisfied"],
["is_held", {"datatype": "INTEGER"}],
],
TABLE_TASK_STATES: [
Expand Down Expand Up @@ -757,7 +761,6 @@ def select_task_pool_for_restart(self, callback, id_key=None):
%(task_pool)s.flow_label,
%(task_late_flags)s.value,
%(task_pool)s.status,
%(task_pool)s.satisfied,
%(task_pool)s.is_held,
%(task_states)s.submit_num,
%(task_jobs)s.try_num,
Expand Down Expand Up @@ -809,6 +812,22 @@ def select_task_pool_for_restart(self, callback, id_key=None):
for row_idx, row in enumerate(self.connect().execute(stmt, stmt_args)):
callback(row_idx, list(row))

def select_task_prerequisites(self, cycle, name):
"""Return prerequisites of a task of the given name & cycle point."""
stmt = f"""
SELECT
prereq_name,
prereq_cycle,
prereq_output,
satisfied
FROM
{self.TABLE_TASK_PREREQUISITES}
WHERE
cycle == '{cycle}' AND
name == '{name}'
"""
return list(self.connect().execute(stmt))

def select_task_times(self):
"""Select submit/start/stop times to compute job timings.
Expand Down Expand Up @@ -916,200 +935,3 @@ def remove_columns(self, table, to_drop):

# done
conn.commit()

def upgrade_retry_state(self):
"""Replace the retry state with xtriggers.
* Change *retrying tasks to waiting
* Add the required xtrigger
Note:
The retry status can be safely removed as this is really a display
state, the retry logic revolves around the TaskActionTimer.
From:
cylc<8
To:
cylc>=8
PR:
#3423
Returns:
list - (cycle, name, status) tuples of all retrying tasks.
"""
conn = self.connect()

for table in [self.TABLE_TASK_POOL_CHECKPOINTS, self.TABLE_TASK_POOL]:
tasks = list(conn.execute(
rf'''
SELECT
cycle, name, status
FROM
{table}
WHERE
status IN ('retrying', 'submit-retrying')
'''
))
if tasks:
LOG.info(f'Upgrade retrying tasks in table {table}')
conn.executemany(
rf'''
UPDATE
{table}
SET
status='{TASK_STATUS_WAITING}'
WHERE
cycle==?
and name==?
and status==?
''',
tasks
)
conn.commit()
return tasks

def upgrade_is_held(self):
"""Upgrade hold_swap => is_held.
* Add a is_held column.
* Set status and is_held as per the new schema.
* Set the swap_hold values to None
(bacause sqlite3 does not support DROP COLUMN)
From:
cylc<8
To:
cylc>=8
PR:
#3230
Returns:
bool - True if upgrade performed, False if upgrade skipped.
"""
conn = self.connect()

# check if upgrade required
schema = conn.execute(rf'PRAGMA table_info({self.TABLE_TASK_POOL})')
for _, name, *_ in schema:
if name == 'is_held':
LOG.debug('is_held column present - skipping db upgrade')
return False

# perform upgrade
for table in [self.TABLE_TASK_POOL, self.TABLE_TASK_POOL_CHECKPOINTS]:
LOG.info('Upgrade hold_swap => is_held in %s', table)
conn.execute(
rf'''
ALTER TABLE
{table}
ADD COLUMN
is_held BOOL
'''
)
for cycle, name, status, hold_swap in conn.execute(rf'''
SELECT
cycle, name, status, hold_swap
FROM
{table}
'''):
if status == 'held':
new_status = hold_swap
is_held = True
elif hold_swap == 'held':
new_status = status
is_held = True
else:
new_status = status
is_held = False
conn.execute(
rf'''
UPDATE
{table}
SET
status=?,
is_held=?,
hold_swap=?
WHERE
cycle==?
AND name==?
''',
(new_status, is_held, None, cycle, name)
)
self.remove_columns(table, ['hold_swap'])
conn.commit()
return True

def upgrade_to_platforms(self):
"""upgrade [job]batch system and [remote]host to platform
* Add 'platform' and 'user' columns to table task_jobs.
* Remove 'user_at_host' and 'batch_sys_name' columns
Returns:
bool - True if upgrade performed, False if upgrade skipped.
"""
conn = self.connect()

# check if upgrade required
schema = conn.execute(rf'PRAGMA table_info({self.TABLE_TASK_JOBS})')
for _, name, *_ in schema:
if name == 'platform_name':
LOG.debug('platform_name column present - skipping db upgrade')
return False

# Perform upgrade:
table = self.TABLE_TASK_JOBS
LOG.info('Upgrade to Cylc 8 platforms syntax')
conn.execute(
rf'''
ALTER TABLE
{table}
ADD COLUMN
user TEXT
'''
)
conn.execute(
rf'''
ALTER TABLE
{table}
ADD COLUMN
platform_name TEXT
'''
)
job_platforms = glbl_cfg(cached=False).get(['platforms'])
for cycle, name, user_at_host, batch_system in conn.execute(rf'''
SELECT
cycle, name, user_at_host, batch_system
FROM
{table}
'''):
match = re.match(r"(?P<user>\S+)@(?P<host>\S+)", user_at_host)
if match:
user = match.group('user')
host = match.group('host')
else:
user = ''
host = user_at_host
platform = platform_from_job_info(
job_platforms,
{'batch system': batch_system},
{'host': host}
)
conn.execute(
rf'''
UPDATE
{table}
SET
user=?,
platform_name=?
WHERE
cycle==?
AND name==?
''',
(user, platform, cycle, name)
)
conn.commit()
return True
3 changes: 1 addition & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ async def install(self):
* Register.
* Install authentication files.
* Build the directory tree.
* Upgrade the DB if required.
* Copy Python files.
"""
Expand Down Expand Up @@ -416,7 +415,7 @@ async def configure(self):
"""
self.profiler.log_memory("scheduler.py: start configure")
if self.is_restart:
self.suite_db_mgr.restart_upgrade()
self.suite_db_mgr.on_restart()
# This logic handles lack of initial cycle point in "flow.cylc".
# Things that can't change on suite reload.
pri_dao = self.suite_db_mgr.get_pri_dao()
Expand Down
Loading

0 comments on commit 06455a2

Please sign in to comment.