Skip to content

Commit

Permalink
Implement scheduler command method arg validation.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Apr 11, 2024
1 parent 4b70574 commit 74e7f30
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 6 deletions.
77 changes: 77 additions & 0 deletions cylc/flow/command_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Cylc command argument validation funcions."""


from typing import (
Callable,
List,
Optional,
)

from cylc.flow.exceptions import InputError


def print_command_response(multi_results):
"""Print server mutation response, for the CLI.
"""
for multi_result in multi_results:
for _cmd, results in multi_result.items():
for result in results.values():
for wf_res in result:
wf_id = wf_res["id"]
response = wf_res["response"]
if not response[0]:
# Validation failure
print(

Check warning on line 41 in cylc/flow/command_validation.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/command_validation.py#L41

Added line #L41 was not covered by tests
f"{wf_id}: command validation failed:\n"
f" {response[1]}"
)
else:
print(f"{wf_id}: command queued (ID {response[1]})")


def validate(func: Callable):
"""Decorate scheduler commands with a callable .validate attribute.
"""
# TODO: properly handle "Callable has no attribute validate"?
func.validate = globals()[ # type: ignore
func.__name__.replace("command", "validate")
]
return func


def validate_set(
tasks: List[str],
flow: List[str],
outputs: Optional[List[str]] = None,
prerequisites: Optional[List[str]] = None,
flow_wait: bool = False,
flow_descr: Optional[str] = None
) -> None:
"""Validate args of the scheduler "command_set" method.
Raise InputError if validation fails.
"""
# TODO: validate the actual args

# For initial testing...
if False:
raise InputError(" ...args wrong innit...")
2 changes: 1 addition & 1 deletion cylc/flow/network/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ def _report_single(report, workflow, result):


def _report(_):
print('Command submitted; the scheduler will log any problems.')
pass
12 changes: 11 additions & 1 deletion cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW,
DELTA_ADDED, create_delta_store
)
from cylc.flow.exceptions import InputError
from cylc.flow.id import Tokens
from cylc.flow.network.schema import (
DEF_TYPES,
Expand Down Expand Up @@ -740,10 +741,19 @@ async def _mutation_mapper(
return method(**kwargs)

try:
self.schd.get_command_method(command)
meth = self.schd.get_command_method(command)
except AttributeError:
raise ValueError(f"Command '{command}' not found")

# If meth has a command validation function, call it.
try:
# TODO: properly handle "Callable has no attribute validate"?
meth.validate(**kwargs) # type: ignore
except AttributeError:
LOG.debug(f"No command validation for {command}")
except InputError as exc:
return (False, str(exc))

Check warning on line 755 in cylc/flow/network/resolvers.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/resolvers.py#L754-L755

Added lines #L754 - L755 were not covered by tests

# Queue the command to the scheduler, with a unique command ID
cmd_uuid = str(uuid4())
LOG.info(f"{log1} ID={cmd_uuid}\n{log2}")
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
)
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow import command_validation
from cylc.flow.config import WorkflowConfig
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.id import Tokens
Expand Down Expand Up @@ -2146,6 +2147,7 @@ def command_force_trigger_tasks(
return self.pool.force_trigger_tasks(
tasks, flow, flow_wait, flow_descr)

@command_validation.validate
def command_set(
self,
tasks: List[str],
Expand Down
10 changes: 6 additions & 4 deletions cylc/flow/scripts/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
from functools import partial
from typing import TYPE_CHECKING, List, Optional

from cylc.flow.command_validation import print_command_response
from cylc.flow.exceptions import InputError
from cylc.flow.network.client_factory import get_client
from cylc.flow.network.multi import call_multi
Expand Down Expand Up @@ -411,14 +412,15 @@ async def run(
}
}

await pclient.async_request('graphql', mutation_kwargs)
return await pclient.async_request('graphql', mutation_kwargs)


@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', *ids) -> None:

validate_opts(options.outputs, options.prerequisites)
validate_flow_opts(options)
call_multi(
partial(run, options),
*ids,

print_command_response(
call_multi(partial(run, options), *ids)
)

0 comments on commit 74e7f30

Please sign in to comment.