Skip to content

Commit

Permalink
scheduler commands: refactor
Browse files Browse the repository at this point in the history
* Separate scheduler commands from the main scheduler code.
* Put all commands behind a decorator to protect against leakage.
  (e.g. an import cannot be mistaken for a command/validator).
* Integrate command validators in with the commands themselves.
  This removes the duplicate command/validate function signatures and
  makes validators implicit (i.e. they are called as part of the command
  not searched for and called separately to the command).
* Make all commands async and provide a blank validator for each
  (i.e. add a yield at the top of each function). Note this means
  that command args/kwargs are now validated as part of queueing the
  command itself by default.
* Partially addresses #3329 by making commands generators capable of
  returning multiple messages.
* Improve interface for multi-workflow commands:
  - Colour formatting for success/error cases.
  - One line per workflow (presuming single line output).
  - Exceptions in processing one workflow's response caught and logged
    rather than interrupting other workflow's output.
  - Commands exit 1 if any workflow returns an error response.
* Add multi-workflow test for "cylc broadcast".
  • Loading branch information
oliver-sanders committed Jun 6, 2024
1 parent 895ad2f commit 24adfe0
Show file tree
Hide file tree
Showing 33 changed files with 1,116 additions and 531 deletions.
36 changes: 35 additions & 1 deletion cylc/flow/async_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,43 @@ async def asyncqgen(queue):
yield await queue.get()


async def unordered_map(coroutine, iterator):
def wrap_exception(coroutine):
"""Catch and return exceptions rather than raising them.
Examples:
>>> async def myfcn():
... raise Exception('foo')
>>> mywrappedfcn = wrap_exception(myfcn)
>>> ret = asyncio.run(mywrappedfcn()) # the exception is not raised...
>>> ret # ...it is returned
Exception('foo')
"""
async def _inner(*args, **kwargs):
nonlocal coroutine
try:
return await coroutine(*args, **kwargs)
except Exception as exc:
return exc

return _inner


async def unordered_map(coroutine, iterator, wrap_exceptions=False):
"""An asynchronous map function which does not preserve order.
Use in situations where you want results as they are completed rather than
once they are all completed.
Args:
coroutine:
The async function you want to call.
iterator:
The arguments you want to call it with.
wrap_exceptions:
If True, then exceptions will be caught and returned rather than
raised.
Example:
# define your async coroutine
>>> async def square(x): return x**2
Expand All @@ -444,6 +475,9 @@ async def unordered_map(coroutine, iterator):
[((0,), 0), ((1,), 1), ((2,), 4), ((3,), 9), ((4,), 16)]
"""
if wrap_exceptions:
coroutine = wrap_exception(coroutine)

# create tasks
pending = []
for args in iterator:
Expand Down
88 changes: 29 additions & 59 deletions cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@


from typing import (
Callable,
List,
Optional,
)
Expand All @@ -36,38 +35,27 @@
)


def validate(func: Callable):
"""Decorate scheduler commands with a callable .validate attribute.
"""
# TODO: properly handle "Callable has no attribute validate"?
func.validate = globals()[ # type: ignore
func.__name__.replace("command", "validate")
]
return func


def validate_flow_opts(flows: List[str], flow_wait: bool) -> None:
def flow_opts(flows: List[str], flow_wait: bool) -> None:
"""Check validity of flow-related CLI options.
Note the schema defaults flows to ["all"].
Examples:
Good:
>>> validate_flow_opts(["new"], False)
>>> validate_flow_opts(["1", "2"], False)
>>> validate_flow_opts(["1", "2"], True)
>>> flow_opts(["new"], False)
>>> flow_opts(["1", "2"], False)
>>> flow_opts(["1", "2"], True)
Bad:
>>> validate_flow_opts(["none", "1"], False)
>>> flow_opts(["none", "1"], False)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... must all be integer valued
>>> validate_flow_opts(["cheese", "2"], True)
>>> flow_opts(["cheese", "2"], True)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ... or 'all', 'new', or 'none'
>>> validate_flow_opts(["new"], True)
>>> flow_opts(["new"], True)
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
Expand All @@ -87,36 +75,36 @@ def validate_flow_opts(flows: List[str], flow_wait: bool) -> None:
raise InputError(ERR_OPT_FLOW_WAIT)

Check warning on line 75 in cylc/flow/command_validation.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/command_validation.py#L75

Added line #L75 was not covered by tests


def validate_prereqs(prereqs: Optional[List[str]]):
def prereqs(prereqs: Optional[List[str]]):
"""Validate a list of prerequisites, add implicit ":succeeded".
Comma-separated lists should be split already, client-side.
Examples:
# Set multiple at once:
>>> validate_prereqs(['1/foo:bar', '2/foo:baz'])
>>> prereqs(['1/foo:bar', '2/foo:baz'])
['1/foo:bar', '2/foo:baz']
# --pre=all
>>> validate_prereqs(["all"])
>>> prereqs(["all"])
['all']
# implicit ":succeeded"
>>> validate_prereqs(["1/foo"])
>>> prereqs(["1/foo"])
['1/foo:succeeded']
# Error: invalid format:
>>> validate_prereqs(["fish"])
>>> prereqs(["fish"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
# Error: invalid format:
>>> validate_prereqs(["1/foo::bar"])
>>> prereqs(["1/foo::bar"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
# Error: "all" must be used alone:
>>> validate_prereqs(["all", "2/foo:baz"])
>>> prereqs(["all", "2/foo:baz"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
Expand All @@ -127,7 +115,7 @@ def validate_prereqs(prereqs: Optional[List[str]]):
prereqs2 = []
bad: List[str] = []
for pre in prereqs:
p = validate_prereq(pre)
p = prereq(pre)
if p is not None:
prereqs2.append(p)
else:
Expand All @@ -145,23 +133,23 @@ def validate_prereqs(prereqs: Optional[List[str]]):
return prereqs2


def validate_prereq(prereq: str) -> Optional[str]:
def prereq(prereq: str) -> Optional[str]:
"""Return prereq (with :succeeded) if valid, else None.
Format: cycle/task[:output]
Examples:
>>> validate_prereq('1/foo:succeeded')
>>> prereq('1/foo:succeeded')
'1/foo:succeeded'
>>> validate_prereq('1/foo')
>>> prereq('1/foo')
'1/foo:succeeded'
>>> validate_prereq('all')
>>> prereq('all')
'all'
# Error:
>>> validate_prereq('fish')
>>> prereq('fish')
"""
try:
Expand All @@ -181,25 +169,25 @@ def validate_prereq(prereq: str) -> Optional[str]:
return prereq


def validate_outputs(outputs: Optional[List[str]]):
def outputs(outputs: Optional[List[str]]):
"""Validate outputs.
Comma-separated lists should be split already, client-side.
Examples:
Good:
>>> validate_outputs(['a', 'b'])
>>> outputs(['a', 'b'])
['a', 'b']
>>> validate_outputs(["required"]) # "required" is explicit default
>>> outputs(["required"]) # "required" is explicit default
[]
Bad:
>>> validate_outputs(["required", "a"])
>>> outputs(["required", "a"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: --out=required must be used alone
>>> validate_outputs(["waiting"])
>>> outputs(["waiting"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: Tasks cannot be set to waiting...
Expand All @@ -219,39 +207,21 @@ def validate_outputs(outputs: Optional[List[str]]):
return outputs


def validate_consistency(
def consistency(
outputs: Optional[List[str]],
prereqs: Optional[List[str]]
) -> None:
"""Check global option consistency
Examples:
>>> validate_consistency(["a"], None) # OK
>>> consistency(["a"], None) # OK
>>> validate_consistency(None, ["1/a:failed"]) #OK
>>> consistency(None, ["1/a:failed"]) #OK
>>> validate_consistency(["a"], ["1/a:failed"])
>>> consistency(["a"], ["1/a:failed"])
Traceback (most recent call last):
cylc.flow.exceptions.InputError: ...
"""
if outputs and prereqs:
raise InputError("Use --prerequisite or --output, not both.")

Check warning on line 227 in cylc/flow/command_validation.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/command_validation.py#L227

Added line #L227 was not covered by tests


def validate_set(
tasks: List[str],
flow: List[str],
outputs: Optional[List[str]] = None,
prerequisites: Optional[List[str]] = None,
flow_wait: bool = False,
flow_descr: Optional[str] = None
) -> None:
"""Validate args of the scheduler "command_set" method.
Raise InputError if validation fails.
"""
validate_consistency(outputs, prerequisites)
outputs = validate_outputs(outputs)
prerequisites = validate_prereqs(prerequisites)
validate_flow_opts(flow, flow_wait)
Loading

0 comments on commit 24adfe0

Please sign in to comment.