Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🤖 Merge 8.2.x-sync into master #6040

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
WorkflowFiles,
check_deprecation,
)
from cylc.flow.workflow_status import RunMode
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
Expand Down Expand Up @@ -521,9 +522,9 @@ def __init__(

self.process_runahead_limit()

if self.run_mode('simulation', 'dummy'):
configure_sim_modes(
self.taskdefs.values(), self.run_mode())
run_mode = self.run_mode()
if run_mode in {RunMode.SIMULATION, RunMode.DUMMY}:
configure_sim_modes(self.taskdefs.values(), run_mode)

self.configure_workflow_state_polling_tasks()

Expand Down Expand Up @@ -1494,20 +1495,9 @@ def process_config_env(self):
os.environ['PATH'] = os.pathsep.join([
os.path.join(self.fdir, 'bin'), os.environ['PATH']])

def run_mode(self, *reqmodes):
"""Return the run mode.

Combine command line option with configuration setting.
If "reqmodes" is specified, return the boolean (mode in reqmodes).
Otherwise, return the mode as a str.
"""
mode = getattr(self.options, 'run_mode', None)
if not mode:
mode = 'live'
if reqmodes:
return mode in reqmodes
else:
return mode
def run_mode(self) -> str:
"""Return the run mode."""
return RunMode.get(self.options)

def _check_task_event_handlers(self):
"""Check custom event handler templates can be expanded.
Expand Down
13 changes: 0 additions & 13 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,19 +617,6 @@ def select_workflow_params_restart_count(self):
result = self.connect().execute(stmt).fetchone()
return int(result[0]) if result else 0

def select_workflow_params_run_mode(self):
"""Return original run_mode for workflow_params."""
stmt = rf"""
SELECT
value
FROM
{self.TABLE_WORKFLOW_PARAMS}
WHERE
key == 'run_mode'
""" # nosec (table name is code constant)
result = self.connect().execute(stmt).fetchone()
return result[0] if result else None

def select_workflow_template_vars(self, callback):
"""Select from workflow_template_vars.

Expand Down
53 changes: 22 additions & 31 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
from cylc.flow.templatevars import eval_var
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.workflow_events import WorkflowEventHandler
from cylc.flow.workflow_status import StopMode, AutoRestartMode
from cylc.flow.workflow_status import RunMode, StopMode, AutoRestartMode
from cylc.flow import workflow_files
from cylc.flow.taskdef import TaskDef
from cylc.flow.task_events_mgr import TaskEventsManager
Expand Down Expand Up @@ -425,7 +425,14 @@ async def configure(self, params):
self._check_startup_opts()

if self.is_restart:
run_mode = self.get_run_mode()
self._set_workflow_params(params)
# Prevent changing run mode on restart:
og_run_mode = self.get_run_mode()
if run_mode != og_run_mode:
raise InputError(
f'This workflow was originally run in {og_run_mode} mode:'
f' Will not restart in {run_mode} mode.')

self.profiler.log_memory("scheduler.py: before load_flow_file")
try:
Expand All @@ -435,18 +442,6 @@ async def configure(self, params):
# Mark this exc as expected (see docstring for .schd_expected):
exc.schd_expected = True
raise exc

# Prevent changing mode on restart.
if self.is_restart:
# check run mode against db
og_run_mode = self.workflow_db_mgr.get_pri_dao(
).select_workflow_params_run_mode() or 'live'
run_mode = self.config.run_mode()
if run_mode != og_run_mode:
raise InputError(
f'This workflow was originally run in {og_run_mode} mode:'
f' Will not restart in {run_mode} mode.')

self.profiler.log_memory("scheduler.py: after load_flow_file")

self.workflow_db_mgr.on_workflow_start(self.is_restart)
Expand Down Expand Up @@ -605,7 +600,7 @@ def log_start(self) -> None:
# Note that the following lines must be present at the top of
# the workflow log file for use in reference test runs.
LOG.info(
f'Run mode: {self.config.run_mode()}',
f'Run mode: {self.get_run_mode()}',
extra=RotatingLogFileHandler.header_extra
)
LOG.info(
Expand Down Expand Up @@ -1053,7 +1048,7 @@ def command_resume(self) -> None:

def command_poll_tasks(self, tasks: Iterable[str]) -> int:
"""Poll pollable tasks or a task or family if options are provided."""
if self.config.run_mode('simulation'):
if self.get_run_mode() == RunMode.SIMULATION:
return 0
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
self.task_job_mgr.poll_task_jobs(self.workflow, itasks)
Expand All @@ -1062,7 +1057,7 @@ def command_poll_tasks(self, tasks: Iterable[str]) -> int:
def command_kill_tasks(self, tasks: Iterable[str]) -> int:
"""Kill all tasks or a task/family if options are provided."""
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
if self.config.run_mode('simulation'):
if self.get_run_mode() == RunMode.SIMULATION:
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(TASK_STATUS_FAILED)
Expand Down Expand Up @@ -1360,6 +1355,9 @@ def _set_workflow_params(
"""
LOG.info('LOADING workflow parameters')
for key, value in params:
if key == self.workflow_db_mgr.KEY_RUN_MODE:
self.options.run_mode = value or RunMode.LIVE
LOG.info(f"+ run mode = {value}")
if value is None:
continue
if key in self.workflow_db_mgr.KEY_INITIAL_CYCLE_POINT_COMPATS:
Expand All @@ -1380,12 +1378,6 @@ def _set_workflow_params(
elif self.options.stopcp is None:
self.options.stopcp = value
LOG.info(f"+ stop point = {value}")
elif (
key == self.workflow_db_mgr.KEY_RUN_MODE
and self.options.run_mode is None
):
self.options.run_mode = value
LOG.info(f"+ run mode = {value}")
elif key == self.workflow_db_mgr.KEY_UUID_STR:
self.uuid_str = value
LOG.info(f"+ workflow UUID = {value}")
Expand Down Expand Up @@ -1431,12 +1423,8 @@ def run_event_handlers(self, event, reason=""):

Run workflow events in simulation and dummy mode ONLY if enabled.
"""
conf = self.config
with suppress(KeyError):
if (
conf.run_mode('simulation', 'dummy')
):
return
if self.get_run_mode() in {RunMode.SIMULATION, RunMode.DUMMY}:
return
self.workflow_event_handler.handle(self, event, str(reason))

def release_queued_tasks(self) -> bool:
Expand Down Expand Up @@ -1509,7 +1497,7 @@ def release_queued_tasks(self) -> bool:
pre_prep_tasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
is_simulation=self.config.run_mode('simulation')
is_simulation=(self.get_run_mode() == RunMode.SIMULATION)
):
if itask.flow_nums:
flow = ','.join(str(i) for i in itask.flow_nums)
Expand Down Expand Up @@ -1560,7 +1548,7 @@ def timeout_check(self):
"""Check workflow and task timers."""
self.check_workflow_timers()
# check submission and execution timeout and polling timers
if not self.config.run_mode('simulation'):
if self.get_run_mode() != RunMode.SIMULATION:
self.task_job_mgr.check_task_jobs(self.workflow, self.pool)

async def workflow_shutdown(self):
Expand Down Expand Up @@ -1759,7 +1747,7 @@ async def _main_loop(self) -> None:
self.release_queued_tasks()

if (
self.pool.config.run_mode('simulation')
self.get_run_mode() == RunMode.SIMULATION
and sim_time_check(
self.task_events_mgr,
self.pool.get_tasks(),
Expand Down Expand Up @@ -2252,6 +2240,9 @@ def _check_startup_opts(self) -> None:
f"option --{opt}=reload is only valid for restart"
)

def get_run_mode(self) -> str:
return RunMode.get(self.options)

async def handle_exception(self, exc: BaseException) -> NoReturn:
"""Gracefully shut down the scheduler given a caught exception.

Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
is_terminal,
prompt,
)
from cylc.flow.workflow_status import RunMode

if TYPE_CHECKING:
from optparse import Values
Expand Down Expand Up @@ -130,7 +131,7 @@
["-m", "--mode"],
help="Run mode: live, dummy, simulation (default live).",
metavar="STRING", action='store', dest="run_mode",
choices=['live', 'dummy', 'simulation'],
choices=[RunMode.LIVE, RunMode.DUMMY, RunMode.SIMULATION],
)

PLAY_RUN_MODE = deepcopy(RUN_MODE)
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/scripts/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from cylc.flow.templatevars import get_template_vars
from cylc.flow.terminal import cli_function
from cylc.flow.scheduler_cli import RUN_MODE
from cylc.flow.workflow_status import RunMode

if TYPE_CHECKING:
from cylc.flow.option_parsers import Values
Expand Down Expand Up @@ -126,7 +127,7 @@ def get_option_parser():
{
'check_circular': False,
'profile_mode': False,
'run_mode': 'live'
'run_mode': RunMode.LIVE
}
)

Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from time import time

from metomi.isodatetime.parsers import DurationParser

from cylc.flow import LOG
from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import PointParsingError
Expand All @@ -30,8 +32,8 @@
TASK_STATUS_SUCCEEDED,
)
from cylc.flow.wallclock import get_unix_time_from_time_string
from cylc.flow.workflow_status import RunMode

from metomi.isodatetime.parsers import DurationParser

if TYPE_CHECKING:
from cylc.flow.task_events_mgr import TaskEventsManager
Expand Down Expand Up @@ -134,7 +136,7 @@ def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task defs for simulation and dummy mode.

"""
dummy_mode = bool(sim_mode == 'dummy')
dummy_mode = (sim_mode == RunMode.DUMMY)

for tdef in taskdefs:
# Compute simulated run time by scaling the execution limit.
Expand Down
14 changes: 8 additions & 6 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
get_template_variables as get_workflow_template_variables,
process_mail_footer,
)
from cylc.flow.workflow_status import RunMode


if TYPE_CHECKING:
Expand Down Expand Up @@ -769,7 +770,7 @@ def process_message(

# ... but either way update the job ID in the job proxy (it only
# comes in via the submission message).
if itask.tdef.run_mode != 'simulation':
if itask.tdef.run_mode != RunMode.SIMULATION:
job_tokens = itask.tokens.duplicate(
job=str(itask.submit_num)
)
Expand Down Expand Up @@ -887,7 +888,8 @@ def _process_message_check(

if (
itask.state(TASK_STATUS_WAITING)
and itask.tdef.run_mode == 'live' # Polling in live mode only.
# Polling in live mode only:
and itask.tdef.run_mode == RunMode.LIVE
and (
(
# task has a submit-retry lined up
Expand Down Expand Up @@ -932,7 +934,7 @@ def _process_message_check(

def setup_event_handlers(self, itask, event, message):
"""Set up handlers for a task event."""
if itask.tdef.run_mode != 'live':
if itask.tdef.run_mode != RunMode.LIVE:
return
msg = ""
if message != f"job {event}":
Expand Down Expand Up @@ -1457,7 +1459,7 @@ def _process_message_submitted(
)

itask.set_summary_time('submitted', event_time)
if itask.tdef.run_mode == 'simulation':
if itask.tdef.run_mode == RunMode.SIMULATION:
# Simulate job started as well.
itask.set_summary_time('started', event_time)
if itask.state_reset(TASK_STATUS_RUNNING, forced=forced):
Expand Down Expand Up @@ -1494,7 +1496,7 @@ def _process_message_submitted(
'submitted',
event_time,
)
if itask.tdef.run_mode == 'simulation':
if itask.tdef.run_mode == RunMode.SIMULATION:
# Simulate job started as well.
self.data_store_mgr.delta_job_time(
job_tokens,
Expand Down Expand Up @@ -1527,7 +1529,7 @@ def _insert_task_job(
# not see previous submissions (so can't use itask.jobs[submit_num-1]).
# And transient tasks, used for setting outputs and spawning children,
# do not submit jobs.
if itask.tdef.run_mode == "simulation" or forced:
if (itask.tdef.run_mode == RunMode.SIMULATION) or forced:
job_conf = {"submit_num": 0}
else:
job_conf = itask.jobs[-1]
Expand Down
19 changes: 19 additions & 0 deletions cylc/flow/workflow_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from cylc.flow.wallclock import get_time_string_from_unix_time as time2str

if TYPE_CHECKING:
from optparse import Values
from cylc.flow.scheduler import Scheduler

# Keys for identify API call
Expand Down Expand Up @@ -198,3 +199,21 @@ def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]:
status_msg = 'running'

return (status.value, status_msg)


class RunMode:
"""The possible run modes of a workflow."""

LIVE = 'live'
"""Workflow will run normally."""

SIMULATION = 'simulation'
"""Workflow will run in simulation mode."""

DUMMY = 'dummy'
"""Workflow will run in dummy mode."""

@staticmethod
def get(options: 'Values') -> str:
"""Return the run mode from the options."""
return getattr(options, 'run_mode', None) or RunMode.LIVE
Loading
Loading