From 6322f1554c9e79628d770ef1d97a135744484e41 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 25 Jan 2023 13:25:56 +0000 Subject: [PATCH] id: pass tokens objects between interfaces * Change `TaskProxy.tokens` to hold the absolute ID rather than the relative ID to make the object useful in more situations. * Refactor the data_store_mgr interfaces to accept `Tokens` instances rather than raw inputs (e.g. cycle_point, task_name, etc). * This avoids doing `Tokens(str(tokens))` when passing context into the data store interfaces. --- CHANGES.md | 9 +- cylc/flow/data_store_mgr.py | 205 ++++++++++++----------- cylc/flow/scheduler.py | 6 +- cylc/flow/scripts/validate.py | 7 +- cylc/flow/task_events_mgr.py | 53 +++--- cylc/flow/task_job_mgr.py | 16 +- cylc/flow/task_pool.py | 20 ++- cylc/flow/task_proxy.py | 4 +- mypy.ini | 6 +- tests/integration/test_data_store_mgr.py | 9 +- tests/unit/test_id_match.py | 1 + tests/unit/test_xtrigger_mgr.py | 9 +- 12 files changed, 203 insertions(+), 142 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index b863a4ac5e..85e0f9505c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,7 +11,14 @@ updated. Only the first match gets replaced, so it's fine to leave the old ones in. --> ------------------------------------------------------------------------------- -## __cylc-8.1.1 (Released 2023-01-31)__ +## __cylc-8.2.0 (Coming Soon)__ + +### Fixes +[#5328](https://github.com/cylc/cylc-flow/pull/5328) - +Efficiency improvements to reduce task management overheads on the Scheduler. + +------------------------------------------------------------------------------- +## __cylc-8.1.1 (Coming Soon)__ ### Fixes diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 819f9f1d41..965fb0a142 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -61,7 +61,13 @@ from copy import deepcopy import json from time import time -from typing import Union, Tuple, TYPE_CHECKING +from typing import ( + Any, + Optional, + TYPE_CHECKING, + Tuple, + Union, +) import zlib from cylc.flow import __version__ as CYLC_VERSION, LOG @@ -651,17 +657,17 @@ def generate_definition_elements(self): self.parents = parents def increment_graph_window( - self, - source_tokens, - point, - flow_nums, - edge_distance=0, - active_id=None, - descendant=False, - is_parent=False, - is_manual_submit=False, - itask=None - ): + self, + source_tokens: Tokens, + point, + flow_nums, + edge_distance=0, + active_id: Optional[str] = None, + descendant=False, + is_parent=False, + is_manual_submit=False, + itask=None + ) -> None: """Generate graph window about active task proxy to n-edge-distance. A recursive function, that creates a node then moves to children and @@ -686,7 +692,6 @@ def increment_graph_window( Active/Other task proxy, passed in with pool invocation. Returns: - None """ @@ -735,6 +740,8 @@ def increment_graph_window( edge_distance += 1 # Don't expand window about orphan task. + child_tokens: Tokens + parent_tokens: Tokens if not is_orphan: tdef = self.schd.config.taskdefs[source_tokens['task']] if graph_children is None: @@ -826,7 +833,12 @@ def increment_graph_window( getattr(self.updated[WORKFLOW], EDGES).edges.extend( self.n_window_edges[active_id]) - def generate_edge(self, parent_tokens, child_tokens, active_id): + def generate_edge( + self, + parent_tokens: Tokens, + child_tokens: Tokens, + active_id: str, + ) -> None: """Construct edge of child and parent task proxy node.""" # Initiate edge element. e_id = self.edge_id(parent_tokens, child_tokens) @@ -884,12 +896,12 @@ def add_pool_node(self, name, point): def generate_ghost_task( self, - tokens, + tokens: Tokens, point, flow_nums, is_parent=False, itask=None - ): + ) -> Tuple[bool, Optional[dict]]: """Create task-point element populated with static data. Args: @@ -902,8 +914,7 @@ def generate_ghost_task( Update task-node from corresponding task proxy object. Returns: - - (True/False, Dict/None) + (is_orphan, graph_children) Orphan tasks with no children return (True, None) respectively. @@ -927,6 +938,7 @@ def generate_ghost_task( if itask is None: itask = TaskProxy( + self.id_, self.schd.config.get_taskdef(name), point, flow_nums, @@ -1279,12 +1291,12 @@ def _apply_broadcasts_to_runtime(self, relative_id, rtconfig): poverride(rtconfig, overrides, prepend=True) return rtconfig - def insert_job(self, name, point_string, status, job_conf): + def insert_job(self, name, cycle_point, status, job_conf): """Insert job into data-store. Args: name (str): Corresponding task name. - point_string (str): Cycle point string + cycle_point (str|PointBase): Cycle point string job_conf (dic): Dictionary of job configuration used to generate the job script. @@ -1296,17 +1308,16 @@ def insert_job(self, name, point_string, status, job_conf): """ sub_num = job_conf['submit_num'] - tp_id, tproxy = self.store_node_fetcher(name, point_string) + tp_tokens = self.id_.duplicate( + cycle=str(cycle_point), + task=name, + ) + tp_id, tproxy = self.store_node_fetcher(tp_tokens) if not tproxy: return update_time = time() - tp_tokens = Tokens(tp_id) j_tokens = tp_tokens.duplicate(job=str(sub_num)) - j_id, job = self.store_node_fetcher( - j_tokens['task'], - j_tokens['cycle'], - j_tokens['job'], - ) + j_id, job = self.store_node_fetcher(j_tokens) if job: # Job already exists (i.e. post-submission submit failure) return @@ -1371,11 +1382,13 @@ def insert_db_job(self, row_idx, row): job_id, platform_name ) = row - - tp_id, tproxy = self.store_node_fetcher(name, point_string) + tp_tokens = self.id_.duplicate( + cycle=point_string, + task=name, + ) + tp_id, tproxy = self.store_node_fetcher(tp_tokens) if not tproxy: return - tp_tokens = Tokens(tp_id) j_tokens = tp_tokens.duplicate(job=str(submit_num)) j_id = j_tokens.id @@ -1866,7 +1879,7 @@ def delta_task_state(self, itask): objects from the workflow task pool. """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) + tp_id, tproxy = self.store_node_fetcher(itask.tokens) if not tproxy: return update_time = time() @@ -1900,7 +1913,7 @@ def delta_task_state(self, itask): def delta_task_held( self, itask: Union[TaskProxy, Tuple[str, 'PointBase', bool]] - ): + ) -> None: """Create delta for change in task proxy held state. Args: @@ -1910,13 +1923,16 @@ def delta_task_held( """ if isinstance(itask, TaskProxy): - name = itask.tdef.name - cycle = itask.point + tokens = itask.tokens is_held = itask.state.is_held else: name, cycle, is_held = itask + tokens = self.id_.duplicate( + task=name, + cycle=str(cycle), + ) - tp_id, tproxy = self.store_node_fetcher(name, cycle) + tp_id, tproxy = self.store_node_fetcher(tokens) if not tproxy: return tp_delta = self.updated[TASK_PROXIES].setdefault( @@ -1926,7 +1942,7 @@ def delta_task_held( self.state_update_families.add(tproxy.first_parent) self.updates_pending = True - def delta_task_queued(self, itask): + def delta_task_queued(self, itask: TaskProxy) -> None: """Create delta for change in task proxy queued state. Args: @@ -1935,7 +1951,7 @@ def delta_task_queued(self, itask): objects from the workflow task pool. """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) + tp_id, tproxy = self.store_node_fetcher(itask.tokens) if not tproxy: return tp_delta = self.updated[TASK_PROXIES].setdefault( @@ -1945,7 +1961,7 @@ def delta_task_queued(self, itask): self.state_update_families.add(tproxy.first_parent) self.updates_pending = True - def delta_task_runahead(self, itask): + def delta_task_runahead(self, itask: TaskProxy) -> None: """Create delta for change in task proxy runahead state. Args: @@ -1954,7 +1970,7 @@ def delta_task_runahead(self, itask): objects from the workflow task pool. """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) + tp_id, tproxy = self.store_node_fetcher(itask.tokens) if not tproxy: return tp_delta = self.updated[TASK_PROXIES].setdefault( @@ -1964,7 +1980,11 @@ def delta_task_runahead(self, itask): self.state_update_families.add(tproxy.first_parent) self.updates_pending = True - def delta_task_output(self, itask, message): + def delta_task_output( + self, + itask: TaskProxy, + message: str, + ) -> None: """Create delta for change in task proxy output. Args: @@ -1973,7 +1993,7 @@ def delta_task_output(self, itask, message): objects from the workflow task pool. """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) + tp_id, tproxy = self.store_node_fetcher(itask.tokens) if not tproxy: return item = itask.state.outputs.get_item(message) @@ -1992,7 +2012,7 @@ def delta_task_output(self, itask, message): output.time = update_time self.updates_pending = True - def delta_task_outputs(self, itask): + def delta_task_outputs(self, itask: TaskProxy) -> None: """Create delta for change in all task proxy outputs. Args: @@ -2001,7 +2021,7 @@ def delta_task_outputs(self, itask): objects from the workflow task pool. """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) + tp_id, tproxy = self.store_node_fetcher(itask.tokens) if not tproxy: return update_time = time() @@ -2016,7 +2036,7 @@ def delta_task_outputs(self, itask): self.updates_pending = True - def delta_task_prerequisite(self, itask): + def delta_task_prerequisite(self, itask: TaskProxy) -> None: """Create delta for change in task proxy prerequisite. Args: @@ -2025,7 +2045,7 @@ def delta_task_prerequisite(self, itask): objects from the workflow task pool. """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) + tp_id, tproxy = self.store_node_fetcher(itask.tokens) if not tproxy: return update_time = time() @@ -2043,7 +2063,11 @@ def delta_task_prerequisite(self, itask): tp_delta.prerequisites.extend(prereq_list) self.updates_pending = True - def delta_task_clock_trigger(self, itask, check_items): + def delta_task_clock_trigger( + self, + itask: TaskProxy, + check_items: Tuple, + ) -> None: """Create delta for change in task proxy prereqs. Args: @@ -2055,7 +2079,7 @@ def delta_task_clock_trigger(self, itask, check_items): task is ready to run. """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) + tp_id, tproxy = self.store_node_fetcher(itask.tokens) if not tproxy: return if len(check_items) == 1: @@ -2063,8 +2087,8 @@ def delta_task_clock_trigger(self, itask, check_items): _, clock, _ = check_items # update task instance if ( - tproxy.HasField('clock_trigger') - and tproxy.clock_trigger.satisfied is not clock + tproxy.HasField('clock_trigger') + and tproxy.clock_trigger.satisfied is not clock ): update_time = time() tp_delta = self.updated[TASK_PROXIES].setdefault( @@ -2073,7 +2097,13 @@ def delta_task_clock_trigger(self, itask, check_items): tp_delta.clock_trigger.satisfied = clock self.updates_pending = True - def delta_task_ext_trigger(self, itask, trig, message, satisfied): + def delta_task_ext_trigger( + self, + itask: TaskProxy, + trig: str, + message: str, + satisfied: bool, + ) -> None: """Create delta for change in task proxy external_trigger. Args: @@ -2084,7 +2114,7 @@ def delta_task_ext_trigger(self, itask, trig, message, satisfied): message (str): Trigger message. """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) + tp_id, tproxy = self.store_node_fetcher(itask.tokens) if not tproxy: return # update task instance @@ -2123,14 +2153,9 @@ def delta_task_xtrigger(self, sig, satisfied): # ----------- # Job Deltas # ----------- - def delta_job_msg(self, job_d, msg): + def delta_job_msg(self, tokens: Tokens, msg: str) -> None: """Add message to job.""" - tokens = Tokens(job_d, relative=True) - j_id, job = self.store_node_fetcher( - tokens['task'], - tokens['cycle'], - tokens['job'], - ) + j_id, job = self.store_node_fetcher(tokens) if not job: return j_delta = self.updated[JOBS].setdefault( @@ -2146,14 +2171,14 @@ def delta_job_msg(self, job_d, msg): j_delta.messages.append(msg) self.updates_pending = True - def delta_job_attr(self, job_d, attr_key, attr_val): + def delta_job_attr( + self, + tokens: Tokens, + attr_key: str, + attr_val: Any, + ) -> None: """Set job attribute.""" - tokens = Tokens(job_d, relative=True) - j_id, job = self.store_node_fetcher( - tokens['task'], - tokens['cycle'], - tokens['job'], - ) + j_id, job = self.store_node_fetcher(tokens) if not job: return j_delta = PbJob(stamp=f'{j_id}@{time()}') @@ -2164,14 +2189,13 @@ def delta_job_attr(self, job_d, attr_key, attr_val): ).MergeFrom(j_delta) self.updates_pending = True - def delta_job_state(self, job_d, status): + def delta_job_state( + self, + tokens: Tokens, + status: str, + ) -> None: """Set job state.""" - tokens = Tokens(job_d, relative=True) - j_id, job = self.store_node_fetcher( - tokens['task'], - tokens['cycle'], - tokens['job'], - ) + j_id, job = self.store_node_fetcher(tokens) if not job or status not in JOB_STATUS_SET: return j_delta = PbJob( @@ -2184,17 +2208,17 @@ def delta_job_state(self, job_d, status): ).MergeFrom(j_delta) self.updates_pending = True - def delta_job_time(self, job_d, event_key, time_str=None): + def delta_job_time( + self, + tokens: Tokens, + event_key: str, + time_str: Optional[str] = None, + ) -> None: """Set an event time in job pool object. Set values of both event_key + '_time' and event_key + '_time_string'. """ - tokens = Tokens(job_d, relative=True) - j_id, job = self.store_node_fetcher( - tokens['task'], - tokens['cycle'], - tokens['job'], - ) + j_id, job = self.store_node_fetcher(tokens) if not job: return j_delta = PbJob(stamp=f'{j_id}@{time()}') @@ -2206,24 +2230,13 @@ def delta_job_time(self, job_d, event_key, time_str=None): ).MergeFrom(j_delta) self.updates_pending = True - def store_node_fetcher( - self, name, point=None, sub_num=None, node_type=TASK_PROXIES): + def store_node_fetcher(self, tokens: Tokens) -> Tuple[str, Any]: """Check that task proxy is in or being added to the store""" - if point is None: - node_id = self.definition_id(name) - node_type = TASKS - elif sub_num is None: - node_id = self.id_.duplicate( - cycle=str(point), - task=name, - ).id - else: - node_id = self.id_.duplicate( - cycle=str(point), - task=name, - job=str(sub_num), - ).id - node_type = JOBS + node_type = { + 'task': TASK_PROXIES, + 'job': JOBS, + }[tokens.lowest_token] + node_id = tokens.id if node_id in self.added[node_type]: return (node_id, self.added[node_type][node_id]) elif node_id in self.data[self.workflow_id][node_type]: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 9acac8bd28..4fa19c8187 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -273,10 +273,11 @@ def __init__(self, reg: str, options: Values) -> None: self.workflow_name = get_workflow_name_from_id(self.workflow) self.owner = get_user() self.host = get_host() - self.id = Tokens( + self.tokens = Tokens( user=self.owner, workflow=self.workflow, - ).id + ) + self.id = self.tokens.id self.uuid_str = str(uuid4()) self.options = options self.template_vars = load_template_vars( @@ -462,6 +463,7 @@ async def configure(self): get_workflow_test_log_path(self.workflow))) self.pool = TaskPool( + self.tokens, self.config, self.workflow_db_mgr, self.task_events_mgr, diff --git a/cylc/flow/scripts/validate.py b/cylc/flow/scripts/validate.py index 0db83d1bdf..2c1f7468af 100755 --- a/cylc/flow/scripts/validate.py +++ b/cylc/flow/scripts/validate.py @@ -39,6 +39,7 @@ TriggerExpressionError ) import cylc.flow.flags +from cylc.flow.id import Tokens from cylc.flow.id_cli import parse_id_async from cylc.flow.loggingutil import disable_timestamps from cylc.flow.option_parsers import ( @@ -167,7 +168,11 @@ async def wrapped_main( print('Instantiating tasks to check trigger expressions') for name, taskdef in cfg.taskdefs.items(): try: - itask = TaskProxy(taskdef, cfg.start_point) + itask = TaskProxy( + Tokens(workflow_id), + taskdef, + cfg.start_point, + ) except TaskProxySequenceBoundsError: # Should already failed above mesg = 'Task out of bounds for %s: %s\n' % (cfg.start_point, name) diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index ea2c7c1be7..5c5aaa6191 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -594,7 +594,7 @@ def process_message( else: new_msg = message self.data_store_mgr.delta_job_msg( - itask.tokens.duplicate(job=str(submit_num)).relative_id, + itask.tokens.duplicate(job=str(submit_num)), new_msg ) @@ -675,11 +675,11 @@ def process_message( # ... but either way update the job ID in the job proxy (it only # comes in via the submission message). if itask.tdef.run_mode != 'simulation': - job_d = itask.tokens.duplicate( + job_tokens = itask.tokens.duplicate( job=str(itask.submit_num) - ).relative_id + ) self.data_store_mgr.delta_job_attr( - job_d, 'job_id', itask.summary['submit_method_id']) + job_tokens, 'job_id', itask.summary['submit_method_id']) elif message.startswith(FAIL_MESSAGE_PREFIX): # Task received signal. @@ -1101,9 +1101,9 @@ def _process_message_failed(self, itask, event_time, message): if event_time is None: event_time = get_current_time_string() itask.set_summary_time('finished', event_time) - job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id - self.data_store_mgr.delta_job_time(job_d, 'finished', event_time) - self.data_store_mgr.delta_job_state(job_d, TASK_STATUS_FAILED) + job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) + self.data_store_mgr.delta_job_time(job_tokens, 'finished', event_time) + self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_FAILED) self.workflow_db_mgr.put_update_task_jobs(itask, { "run_status": 1, "time_run_exit": event_time, @@ -1133,9 +1133,9 @@ def _process_message_started(self, itask, event_time): if itask.job_vacated: itask.job_vacated = False LOG.warning(f"[{itask}] Vacated job restarted") - job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id - self.data_store_mgr.delta_job_time(job_d, 'started', event_time) - self.data_store_mgr.delta_job_state(job_d, TASK_STATUS_RUNNING) + job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) + self.data_store_mgr.delta_job_time(job_tokens, 'started', event_time) + self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_RUNNING) itask.set_summary_time('started', event_time) self.workflow_db_mgr.put_update_task_jobs(itask, { "time_run": itask.summary['started_time_string']}) @@ -1151,9 +1151,10 @@ def _process_message_started(self, itask, event_time): def _process_message_succeeded(self, itask, event_time): """Helper for process_message, handle a succeeded message.""" - job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id - self.data_store_mgr.delta_job_time(job_d, 'finished', event_time) - self.data_store_mgr.delta_job_state(job_d, TASK_STATUS_SUCCEEDED) + + job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) + self.data_store_mgr.delta_job_time(job_tokens, 'finished', event_time) + self.data_store_mgr.delta_job_state(job_tokens, TASK_STATUS_SUCCEEDED) itask.set_summary_time('finished', event_time) self.workflow_db_mgr.put_update_task_jobs(itask, { "run_status": 0, @@ -1206,9 +1207,12 @@ def _process_message_submit_failed(self, itask, event_time, submit_num): self.setup_event_handlers(itask, self.EVENT_SUBMIT_RETRY, msg) # Register newly submit-failed job with the database and datastore. - job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id + job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) self._insert_task_job(itask, event_time, self.JOB_SUBMIT_FAIL_FLAG) - self.data_store_mgr.delta_job_state(job_d, TASK_STATUS_SUBMIT_FAILED) + self.data_store_mgr.delta_job_state( + job_tokens, + TASK_STATUS_SUBMIT_FAILED + ) self._reset_job_timers(itask) @@ -1257,13 +1261,24 @@ def _process_message_submitted( # Register the newly submitted job with the database and datastore. # Do after itask has changed state self._insert_task_job(itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG) - job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id - self.data_store_mgr.delta_job_time(job_d, 'submitted', event_time) + job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) + self.data_store_mgr.delta_job_time( + job_tokens, + 'submitted', + event_time, + ) if itask.tdef.run_mode == 'simulation': # Simulate job started as well. - self.data_store_mgr.delta_job_time(job_d, 'started', event_time) + self.data_store_mgr.delta_job_time( + job_tokens, + 'started', + event_time, + ) else: - self.data_store_mgr.delta_job_state(job_d, TASK_STATUS_SUBMITTED) + self.data_store_mgr.delta_job_state( + job_tokens, + TASK_STATUS_SUBMITTED, + ) def _insert_task_job( self, diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index da995bac84..b3954a30a1 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -370,7 +370,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, self.data_store_mgr.delta_job_msg( itask.tokens.duplicate( job=str(itask.submit_num) - ).relative_id, + ), self.REMOTE_INIT_MSG, ) continue @@ -399,7 +399,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, self.data_store_mgr.delta_job_msg( itask.tokens.duplicate( job=str(itask.submit_num) - ).relative_id, + ), self.REMOTE_INIT_MSG ) continue @@ -423,7 +423,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, self.data_store_mgr.delta_job_msg( itask.tokens.duplicate( job=str(itask.submit_num) - ).relative_id, + ), self.REMOTE_INIT_MSG, ) continue @@ -457,7 +457,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, self.data_store_mgr.delta_job_msg( itask.tokens.duplicate( job=str(itask.submit_num) - ).relative_id, + ), REMOTE_FILE_INSTALL_IN_PROGRESS ) continue @@ -706,7 +706,7 @@ def _kill_task_job_callback(self, workflow, itask, cmd_ctx, line): self.data_store_mgr.delta_job_msg( itask.tokens.duplicate( job=str(itask.submit_num) - ).relative_id, + ), log_msg ) LOG.log(log_lvl, f"[{itask}] {log_msg}") @@ -811,17 +811,17 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): ctx.out = line ctx.ret_code = 0 # See cylc.flow.job_runner_mgr.JobPollContext - job_d = itask.tokens.duplicate(job=str(itask.submit_num)).relative_id + job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) try: job_log_dir, context = line.split('|')[1:3] items = json.loads(context) jp_ctx = JobPollContext(job_log_dir, **items) except TypeError: - self.data_store_mgr.delta_job_msg(job_d, self.POLL_FAIL) + self.data_store_mgr.delta_job_msg(job_tokens, self.POLL_FAIL) ctx.cmd = cmd_ctx.cmd # print original command on failure return except ValueError: - self.data_store_mgr.delta_job_msg(job_d, self.POLL_FAIL) + self.data_store_mgr.delta_job_msg(job_tokens, self.POLL_FAIL) ctx.cmd = cmd_ctx.cmd # print original command on failure return finally: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 700727ba89..1dd0d5e6c1 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -87,13 +87,14 @@ class TaskPool: def __init__( self, + tokens: 'Tokens', config: 'WorkflowConfig', workflow_db_mgr: 'WorkflowDatabaseManager', task_events_mgr: 'TaskEventsManager', data_store_mgr: 'DataStoreMgr', flow_mgr: 'FlowMgr' ) -> None: - + self.tokens = tokens self.config: 'WorkflowConfig' = config self.stop_point = config.stop_point or config.final_point self.workflow_db_mgr: 'WorkflowDatabaseManager' = workflow_db_mgr @@ -437,6 +438,7 @@ def load_db_task_pool_for_restart(self, row_idx, row): outputs_str) = row try: itask = TaskProxy( + self.tokens, self.config.get_taskdef(name), get_point(cycle), deserialise(flow_nums), @@ -906,8 +908,12 @@ def reload_taskdefs(self) -> None: ) else: new_task = TaskProxy( + self.tokens, self.config.get_taskdef(itask.tdef.name), - itask.point, itask.flow_nums, itask.state.status) + itask.point, + itask.flow_nums, + itask.state.status, + ) itask.copy_to_reload_successor(new_task) self._swap_out(new_task) LOG.info(f"[{itask}] reloaded task definition") @@ -1457,12 +1463,13 @@ def spawn_task( return None itask = TaskProxy( + self.tokens, taskdef, point, flow_nums, submit_num=submit_num, is_manual_submit=is_manual_submit, - flow_wait=flow_wait + flow_wait=flow_wait, ) if (name, point) in self.tasks_to_hold: LOG.info(f"[{itask}] holding (as requested earlier)") @@ -1538,7 +1545,12 @@ def force_spawn_children( n_warnings, task_items = self.match_taskdefs(items) for (_, point), taskdef in sorted(task_items.items()): # This the parent task: - itask = TaskProxy(taskdef, point, flow_nums=flow_nums) + itask = TaskProxy( + self.tokens, + taskdef, + point, + flow_nums=flow_nums, + ) # Spawn children of selected outputs. for trig, out, _ in itask.state.outputs.get_all(): if trig in outputs: diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 696794f44d..0136728e1b 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -185,6 +185,7 @@ class TaskProxy: def __init__( self, + scheduler_tokens: 'Tokens', tdef: 'TaskDef', start_point: 'PointBase', flow_nums: Optional[Set[int]] = None, @@ -209,8 +210,7 @@ def __init__( self.flow_nums = copy(flow_nums) self.flow_wait = flow_wait self.point = start_point - self.tokens = Tokens( - # TODO: make these absolute? + self.tokens = scheduler_tokens.duplicate( cycle=str(self.point), task=self.tdef.name, ) diff --git a/mypy.ini b/mypy.ini index 1bff04085a..4c35cf1d53 100644 --- a/mypy.ini +++ b/mypy.ini @@ -14,4 +14,8 @@ strict_equality = True show_error_codes = True # Not yet mypy compliant. -exclude= cylc/flow/etc/tutorial/.* \ No newline at end of file +exclude= cylc/flow/etc/tutorial/.* + +# Suppress the following messages: +# By default the bodies of untyped functions are not checked, consider using --check-untyped-defs +disable_error_code = annotation-unchecked diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py index 68e2d8a10d..e25330562c 100644 --- a/tests/integration/test_data_store_mgr.py +++ b/tests/integration/test_data_store_mgr.py @@ -24,6 +24,7 @@ TASK_PROXIES, WORKFLOW ) +from cylc.flow.id import Tokens from cylc.flow.task_state import ( TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED, @@ -226,10 +227,10 @@ def test_delta_job_msg(harness): """Test method adding messages to job element.""" schd, data = harness j_id = ext_id(schd) - job_d = int_id(schd) + tokens = Tokens(j_id) # First update creation assert schd.data_store_mgr.updated[JOBS].get('j_id') is None - schd.data_store_mgr.delta_job_msg(job_d, 'The Atomic Age') + schd.data_store_mgr.delta_job_msg(tokens, 'The Atomic Age') assert schd.data_store_mgr.updated[JOBS][j_id].messages @@ -237,7 +238,7 @@ def test_delta_job_attr(harness): """Test method modifying job fields to job element.""" schd, data = harness schd.data_store_mgr.delta_job_attr( - int_id(schd), 'job_runner_name', 'at') + Tokens(ext_id(schd)), 'job_runner_name', 'at') assert schd.data_store_mgr.updated[JOBS][ext_id(schd)].messages != ( schd.data_store_mgr.added[JOBS][ext_id(schd)].job_runner_name ) @@ -248,7 +249,7 @@ def test_delta_job_time(harness): schd, data = harness event_time = get_current_time_string() schd.data_store_mgr.delta_job_time( - int_id(schd), 'submitted', event_time) + Tokens(ext_id(schd)), 'submitted', event_time) job_updated = schd.data_store_mgr.updated[JOBS][ext_id(schd)] with pytest.raises(ValueError): job_updated.HasField('jumped_time') diff --git a/tests/unit/test_id_match.py b/tests/unit/test_id_match.py index 33fcf87e3c..d26e85b092 100644 --- a/tests/unit/test_id_match.py +++ b/tests/unit/test_id_match.py @@ -44,6 +44,7 @@ def _task_proxy(id_, hier): tdef = create_autospec(TaskDef, namespace_hierarchy=hier) tdef.name = tokens['task'] return TaskProxy( + Tokens('~user/workflow'), tdef, start_point=IntegerPoint(tokens['cycle']), status=tokens['task_sel'], diff --git a/tests/unit/test_xtrigger_mgr.py b/tests/unit/test_xtrigger_mgr.py index ec547364f0..89cc0024dd 100644 --- a/tests/unit/test_xtrigger_mgr.py +++ b/tests/unit/test_xtrigger_mgr.py @@ -20,6 +20,7 @@ from cylc.flow import CYLC_LOG from cylc.flow.cycling.iso8601 import ISO8601Point, ISO8601Sequence, init from cylc.flow.exceptions import XtriggerConfigError +from cylc.flow.id import Tokens from cylc.flow.subprocctx import SubFuncContext from cylc.flow.task_proxy import TaskProxy from cylc.flow.taskdef import TaskDef @@ -158,7 +159,7 @@ def test_housekeeping_with_xtrigger_satisfied(xtrigger_mgr): sequence = ISO8601Sequence('P1D', '2019') tdef.xtrig_labels[sequence] = ["get_name"] start_point = ISO8601Point('2019') - itask = TaskProxy(tdef, start_point) + itask = TaskProxy(Tokens('~user/workflow'), tdef, start_point) # pretend the function has been activated xtrigger_mgr.active.append(xtrig.get_signature()) xtrigger_mgr.callback(xtrig) @@ -205,7 +206,7 @@ def test__call_xtriggers_async(xtrigger_mgr): init() start_point = ISO8601Point('2019') # create task proxy - itask = TaskProxy(tdef, start_point) + itask = TaskProxy(Tokens('~user/workflow'), tdef, start_point) # we start with no satisfied xtriggers, and nothing active assert len(xtrigger_mgr.sat_xtrig) == 0 @@ -306,7 +307,7 @@ def test_check_xtriggers(xtrigger_mgr): sequence = ISO8601Sequence('P1D', '2019') tdef1.xtrig_labels[sequence] = ["get_name"] start_point = ISO8601Point('2019') - itask1 = TaskProxy(tdef1, start_point) + itask1 = TaskProxy(Tokens('~user/workflow'), tdef1, start_point) itask1.state.xtriggers["get_name"] = False # satisfied? # add a clock xtrigger @@ -330,7 +331,7 @@ def test_check_xtriggers(xtrigger_mgr): init() start_point = ISO8601Point('20000101T0000+05') # create task proxy - TaskProxy(tdef2, start_point) + TaskProxy(Tokens('~user/workflow'), tdef2, start_point) xtrigger_mgr.check_xtriggers(itask1, lambda foo: None) # won't be satisfied, as it is async, we are are not calling callback