Skip to content

Commit

Permalink
Xtrig arg validate (cylc#5955)
Browse files Browse the repository at this point in the history
* Xtrigger function arg validation.
* Add integration tests for xtrigger validation, which provides simple examples for each of the built in xtriggers.
- Xrandom validate function
- Init test xrandom validate function
- Add unit tests for validation of built in xtriggers
- Automatically validate xtrigger function signature

---------

Co-authored-by: Hilary James Oliver <hilary.j.oliver@gmail.com>

* Apply suggestions from code review

Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>

* fix flake8

* Improve xtrigger validation (#60)

* Improve xtrigger validation

* wall_clock: use placeholder function for signature validation & autodocs

* Fix docstring for autodoc [skip ci]

---------

Co-authored-by: Hilary Oliver <hilary.j.oliver@gmail.com>
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 22, 2024
1 parent 24caa24 commit 2b2fc77
Show file tree
Hide file tree
Showing 22 changed files with 602 additions and 199 deletions.
1 change: 1 addition & 0 deletions changes.d/5955.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support xtrigger argument validation.
42 changes: 20 additions & 22 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval

from cylc.flow.exceptions import (
CylcError,
InputError,
Expand Down Expand Up @@ -1718,28 +1719,25 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
else:
# Allow "@wall_clock" in the graph as an undeclared
# zero-offset clock xtrigger.
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
# Allow "@wall_clock" in graph as implicit zero-offset.
xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {})

if xtrig.func_name == 'wall_clock':
if self.cycling_type == INTEGER_CYCLING_TYPE:
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)
else:
# Convert offset arg to kwarg for certainty later.
if "offset" not in xtrig.func_kwargs:
xtrig.func_kwargs["offset"] = None
with suppress(IndexError):
xtrig.func_kwargs["offset"] = xtrig.func_args[0]
if (
xtrig.func_name == 'wall_clock'
and self.cycling_type == INTEGER_CYCLING_TYPE
):
raise WorkflowConfigError(
"Clock xtriggers require datetime cycling:"
f" {label} = {xtrig.get_signature()}"
)

if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
# Generic xtrigger validation.
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)

if self.xtrigger_mgr:
# (not available during validation)
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)

self.taskdefs[right].add_xtrig_label(label, seq)

def get_actual_first_point(self, start_point):
Expand Down Expand Up @@ -2427,10 +2425,10 @@ def upgrade_clock_triggers(self):
# Derive an xtrigger label.
label = '_'.join(('_cylc', 'wall_clock', task_name))
# Define the xtrigger function.
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
args = [] if offset is None else [offset]
xtrig = SubFuncContext(label, 'wall_clock', args, {})
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
XtriggerManager.check_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
Expand Down
7 changes: 3 additions & 4 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,12 @@ class XtriggerConfigError(WorkflowConfigError):
"""

def __init__(self, label: str, trigger: str, message: str):
def __init__(self, label: str, message: str):
self.label: str = label
self.trigger: str = trigger
self.message: str = message

def __str__(self):
return f'[{self.label}] {self.message}'
def __str__(self) -> str:
return f'[@{self.label}] {self.message}'


class ClientError(CylcError):
Expand Down
11 changes: 7 additions & 4 deletions cylc/flow/scripts/function_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
(This command is for internal use.)
Run a Python function "<name>(*args, **kwargs)" in the process pool. It must be
defined in a module of the same name. Positional and keyword arguments must be
passed in as JSON strings. <src-dir> is the workflow source dir, needed to find
local xtrigger modules.
Run a Python xtrigger function "<name>(*args, **kwargs)" in the process pool.
It must be in a module of the same name. Positional and keyword arguments must
be passed in as JSON strings.
Python entry points are the preferred way to make xtriggers available to the
scheduler, but local xtriggers can be stored in <src-dir>.
"""
import sys

Expand Down
77 changes: 48 additions & 29 deletions cylc/flow/subprocpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.wallclock import get_current_time_string

_XTRIG_FUNCS: dict = {}
_XTRIG_MOD_CACHE: dict = {}
_XTRIG_FUNC_CACHE: dict = {}


def _killpg(proc, signal):
Expand All @@ -66,68 +67,86 @@ def _killpg(proc, signal):
return True


def get_func(func_name, src_dir):
"""Find and return an xtrigger function from a module of the same name.
def get_xtrig_mod(mod_name, src_dir):
"""Find, cache, and return a named xtrigger module.
These locations are checked in this order:
- <src_dir>/lib/python/
- `$CYLC_PYTHONPATH`
- defined via a `cylc.xtriggers` entry point for an
installed Python package.
Locations checked in this order:
- <src_dir>/lib/python (prepend to sys.path)
- $CYLC_PYTHONPATH (already in sys.path)
- `cylc.xtriggers` entry point
Workflow source directory passed in as this is executed in an independent
process in the command pool and therefore doesn't know about the workflow.
(Check entry point last so users can override with local implementations).
Workflow source dir passed in - this executes in an independent subprocess.
Raises:
ImportError, if the module is not found
"""
if func_name in _XTRIG_FUNCS:
return _XTRIG_FUNCS[func_name]
if mod_name in _XTRIG_MOD_CACHE:
# Found and cached already.
return _XTRIG_MOD_CACHE[mod_name]

# First look in <src-dir>/lib/python.
sys.path.insert(0, os.path.join(src_dir, 'lib', 'python'))
mod_name = func_name
try:
mod_by_name = __import__(mod_name, fromlist=[mod_name])
_XTRIG_MOD_CACHE[mod_name] = __import__(mod_name, fromlist=[mod_name])
except ImportError:
# Look for xtriggers via entry_points for external sources.
# Do this after the lib/python and PYTHONPATH approaches to allow
# users to override entry_point definitions with local/custom
# implementations.
# Then entry point.
for entry_point in iter_entry_points('cylc.xtriggers'):
if func_name == entry_point.name:
_XTRIG_FUNCS[func_name] = entry_point.load()
return _XTRIG_FUNCS[func_name]

if mod_name == entry_point.name:
_XTRIG_MOD_CACHE[mod_name] = entry_point.load()
return _XTRIG_MOD_CACHE[mod_name]
# Still unable to find anything so abort
raise

try:
_XTRIG_FUNCS[func_name] = getattr(mod_by_name, func_name)
except AttributeError:
# Module func_name has no function func_name, nor an entry_point entry.
raise
return _XTRIG_FUNCS[func_name]
return _XTRIG_MOD_CACHE[mod_name]


def get_xtrig_func(mod_name, func_name, src_dir):
"""Find, cache, and return a function from an xtrigger module.
Raises:
ImportError, if the module is not found
AttributeError, if the function is not found in the module
"""
if (mod_name, func_name) in _XTRIG_FUNC_CACHE:
return _XTRIG_FUNC_CACHE[(mod_name, func_name)]

mod = get_xtrig_mod(mod_name, src_dir)

_XTRIG_FUNC_CACHE[(mod_name, func_name)] = getattr(mod, func_name)

return _XTRIG_FUNC_CACHE[(mod_name, func_name)]


def run_function(func_name, json_args, json_kwargs, src_dir):
"""Run a Python function in the process pool.
func_name(*func_args, **func_kwargs)
The function is presumed to be in a module of the same name.
Redirect any function stdout to stderr (and workflow log in debug mode).
Return value printed to stdout as a JSON string - allows use of the
existing process pool machinery as-is. src_dir is for local modules.
"""
func_args = json.loads(json_args)
func_kwargs = json.loads(json_kwargs)

# Find and import then function.
func = get_func(func_name, src_dir)
func = get_xtrig_func(func_name, func_name, src_dir)

# Redirect stdout to stderr.
orig_stdout = sys.stdout
sys.stdout = sys.stderr
res = func(*func_args, **func_kwargs)

# Restore stdout.
sys.stdout = orig_stdout

# Write function return value as JSON to stdout.
sys.stdout.write(json.dumps(res))

Expand Down
Loading

0 comments on commit 2b2fc77

Please sign in to comment.