Skip to content

Commit

Permalink
Implement scheduler command method arg validation.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Apr 12, 2024
1 parent 4b70574 commit a6269bc
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 355 deletions.
259 changes: 259 additions & 0 deletions cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Cylc command argument validation logic."""


from typing import (
Callable,
List,
Optional,
)

from cylc.flow.exceptions import InputError
from cylc.flow.id import Tokens
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE


ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
ERR_OPT_FLOW_WAIT = (
f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)


def validate(func: Callable):
"""Decorate scheduler commands with a callable .validate attribute.
"""
# TODO: properly handle "Callable has no attribute validate"?
func.validate = globals()[ # type: ignore
func.__name__.replace("command", "validate")
]
return func


def validate_flow_opts(flows: List[str], flow_wait: bool) -> None:
"""Check validity of flow-related CLI options.
Note the schema defaults flows to ["all"].
Examples:
>>> validate_flow_opts(["new"], False)
>>> validate_flow_opts(["1", "2"], False)
>>> validate_flow_opts(["none", "1"], False)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... must all be integer valued
>>> validate_flow_opts(None, True)
>>> validate_flow_opts(["cheese", "2"], True)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... or 'all', 'new', or 'none'
>>> validate_flow_opts(["1", "2"], True)
>>> validate_flow_opts(["new"], True)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
"""
for val in flows:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
if len(flows) != 1:
raise InputError(ERR_OPT_FLOW_INT)
else:
try:
int(val)
except ValueError:
raise InputError(ERR_OPT_FLOW_VAL.format(val))

if flow_wait and flows[0] in [FLOW_NEW, FLOW_NONE]:
raise InputError(ERR_OPT_FLOW_WAIT)


def validate_prereqs(prereqs: Optional[List[str]]):
"""Validate a list of prerequisites, add implicit ":succeeded".
Comma-separated lists should be split already, client-side.
Examples:
# Set multiple at once:
>>> validate_prereqs(['1/foo:bar', '2/foo:baz'])
['1/foo:bar', '2/foo:baz']
# --pre=all
>>> validate_prereqs(["all"])
['all']
# implicit ":succeeded"
>>> validate_prereqs(["1/foo"])
['1/foo:succeeded']
# Error: invalid format:
>>> validate_prereqs(["fish"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
# Error: invalid format:
>>> validate_prereqs(["1/foo::bar"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
# Error: "all" must be used alone:
>>> validate_prereqs(["all", "2/foo:baz"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
"""
if prereqs is None:
return []

prereqs2 = []
bad: List[str] = []
for pre in prereqs:
p = validate_prereq(pre)
if p is not None:
prereqs2.append(p)
else:
bad.append(pre)
if bad:
raise InputError(
"Use prerequisite format <cycle-point>/<task>:output\n"
"\n ".join(bad)
)

if len(prereqs2) > 1: # noqa SIM102 (anticipates "cylc set --pre=cycle")
if "all" in prereqs:
raise InputError("--pre=all must be used alone")

return prereqs2


def validate_prereq(prereq: str) -> Optional[str]:
"""Return prereq (with :succeeded) if valid, else None.
Format: cycle/task[:output]
Examples:
>>> validate_prereq('1/foo:succeeded')
'1/foo:succeeded'
>>> validate_prereq('1/foo')
'1/foo:succeeded'
>>> validate_prereq('all')
'all'
# Error:
>>> validate_prereq('fish')
"""
try:
tokens = Tokens(prereq, relative=True)
except ValueError:
return None
if (
tokens["cycle"] == prereq
and prereq != "all"
):
# Error: --pre=<word> other than "all"
return None

if prereq != "all" and tokens["task_sel"] is None:
prereq += f":{TASK_OUTPUT_SUCCEEDED}"

return prereq


def validate_outputs(outputs: Optional[List[str]]):
"""Validate outputs.
Comma-separated lists should be split already, client-side.
Examples:
Good:
>>> validate_outputs(['a', 'b'])
['a', 'b']
>>> validate_outputs(["required"]) # "required" is explicit default
[]
Bad:
>>> validate_outputs(["required", "a"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: --out=required must be used alone
>>> validate_outputs(["waiting"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: Tasks cannot be set to waiting...
"""
# If "required" is explicit just ditch it (same as the default)
if not outputs or outputs == ["required"]:
return []

if "required" in outputs:
raise InputError("--out=required must be used alone")

if "waiting" in outputs:
raise InputError(
"Tasks cannot be set to waiting. Use trigger to re-run tasks."
)

return outputs


def validate_consistency(
outputs: Optional[List[str]],
prereqs: Optional[List[str]]
) -> None:
"""Check global option consistency
Examples:
>>> validate_consistency(["a"], None) # OK
>>> validate_consistency(None, ["1/a:failed"]) #OK
>>> validate_consistency(["a"], ["1/a:failed"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
"""
if outputs and prereqs:
raise InputError("Use --prerequisite or --output, not both.")


def validate_set(
tasks: List[str],
flow: List[str],
outputs: Optional[List[str]] = None,
prerequisites: Optional[List[str]] = None,
flow_wait: bool = False,
flow_descr: Optional[str] = None
) -> None:
"""Validate args of the scheduler "command_set" method.
Raise InputError if validation fails.
"""
validate_consistency(outputs, prerequisites)
outputs = validate_outputs(outputs)
prerequisites = validate_prereqs(prerequisites)
validate_flow_opts(flow, flow_wait)
29 changes: 0 additions & 29 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import datetime

from cylc.flow import LOG
from cylc.flow.exceptions import InputError


if TYPE_CHECKING:
Expand All @@ -32,13 +31,6 @@
FLOW_NEW = "new"
FLOW_NONE = "none"

# For flow-related CLI options:
ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
ERR_OPT_FLOW_WAIT = (
f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)


def add_flow_opts(parser):
parser.add_option(
Expand All @@ -63,27 +55,6 @@ def add_flow_opts(parser):
)


def validate_flow_opts(options):
"""Check validity of flow-related CLI options."""
if options.flow is None:
# Default to all active flows
options.flow = [FLOW_ALL]

for val in options.flow:
val = val.strip()
if val in [FLOW_NONE, FLOW_NEW, FLOW_ALL]:
if len(options.flow) != 1:
raise InputError(ERR_OPT_FLOW_INT)
else:
try:
int(val)
except ValueError:
raise InputError(ERR_OPT_FLOW_VAL.format(val))

if options.flow_wait and options.flow[0] in [FLOW_NEW, FLOW_NONE]:
raise InputError(ERR_OPT_FLOW_WAIT)


def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str:
"""Return a string representation of a set of flow numbers
Expand Down
26 changes: 25 additions & 1 deletion cylc/flow/network/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,30 @@

from cylc.flow.async_util import unordered_map
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.exceptions import InputError


def print_response(multi_results):
"""Print server mutation response to stdout.
The response will be either:
- (False, argument-validation-error)
- (True, ID-of-queued-command)
Raise InputError if validation failed.
"""
for multi_result in multi_results:
for _cmd, results in multi_result.items():
for result in results.values():
for wf_res in result:
wf_id = wf_res["id"]
response = wf_res["response"]
if not response[0]:
# Validation failure
raise InputError(response[1])
else:
print(f"{wf_id}: command {response[1]} queued")


def call_multi(*args, **kwargs):
Expand Down Expand Up @@ -107,4 +131,4 @@ def _report_single(report, workflow, result):


def _report(_):
print('Command submitted; the scheduler will log any problems.')
pass
12 changes: 11 additions & 1 deletion cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW,
DELTA_ADDED, create_delta_store
)
from cylc.flow.exceptions import InputError
from cylc.flow.id import Tokens
from cylc.flow.network.schema import (
DEF_TYPES,
Expand Down Expand Up @@ -740,10 +741,19 @@ async def _mutation_mapper(
return method(**kwargs)

try:
self.schd.get_command_method(command)
meth = self.schd.get_command_method(command)
except AttributeError:
raise ValueError(f"Command '{command}' not found")

# If meth has a command validation function, call it.
try:
# TODO: properly handle "Callable has no attribute validate"?
meth.validate(**kwargs) # type: ignore
except AttributeError:
LOG.debug(f"No command validation for {command}")
except InputError as exc:
return (False, str(exc))

# Queue the command to the scheduler, with a unique command ID
cmd_uuid = str(uuid4())
LOG.info(f"{log1} ID={cmd_uuid}\n{log2}")
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
)
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow import command_validation
from cylc.flow.config import WorkflowConfig
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.id import Tokens
Expand Down Expand Up @@ -2146,6 +2147,7 @@ def command_force_trigger_tasks(
return self.pool.force_trigger_tasks(
tasks, flow, flow_wait, flow_descr)

@command_validation.validate
def command_set(
self,
tasks: List[str],
Expand Down
Loading

0 comments on commit a6269bc

Please sign in to comment.