Skip to content

Commit

Permalink
Fix workflow-state command and xtrigger. (#5809)
Browse files Browse the repository at this point in the history
undefined
  • Loading branch information
hjoliver authored Jun 17, 2024
1 parent cd10adf commit 6eb8f61
Show file tree
Hide file tree
Showing 103 changed files with 2,940 additions and 1,163 deletions.
1 change: 1 addition & 0 deletions changes.d/5809.feat.d
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The workflow-state command and xtrigger are now flow-aware and take universal IDs instead of separate arguments for cycle point, task name, etc. (which are still supported, but deprecated).
2 changes: 2 additions & 0 deletions changes.d/5809.fix.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix bug where the "cylc workflow-state" command only polled for
task-specific status queries and custom outputs.
88 changes: 39 additions & 49 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,8 +1020,9 @@ def get_script_common_text(this: str, example: Optional[str] = None):
task has generated the outputs it was expected to.
If the task fails this check its outputs are considered
:term:`incomplete` and a warning will be raised alerting you
that something has gone wrong which requires investigation.
:term:`incomplete <output completion>` and a warning will be
raised alerting you that something has gone wrong which
requires investigation.
.. note::
Expand Down Expand Up @@ -1731,57 +1732,34 @@ def get_script_common_text(this: str, example: Optional[str] = None):
''')

with Conf('workflow state polling', desc=f'''
Configure automatic workflow polling tasks as described in
:ref:`WorkflowStatePolling`.
The items in this section reflect
options and defaults of the ``cylc workflow-state`` command,
except that the target workflow ID and the
``--task``, ``--cycle``, and ``--status`` options are
taken from the graph notation.
Deprecated support for automatic workflow state polling tasks
as described in :ref:`WorkflowStatePolling`. Note the Cylc 7
"user" and "host" config items are not supported.
.. versionchanged:: 8.0.0
{REPLACES}``[runtime][<namespace>]suite state polling``.
'''):
Conf('user', VDR.V_STRING, desc='''
Username of your account on the workflow host.
The polling
``cylc workflow-state`` command will be
run on the remote account.
''')
Conf('host', VDR.V_STRING, desc='''
The hostname of the target workflow.
.. deprecated:: 8.3.0
The polling
``cylc workflow-state`` command will be run there.
''')
Please use the :ref:`workflow_state xtrigger
<Built-in Workflow State Triggers>` instead.
'''):
Conf('interval', VDR.V_INTERVAL, desc='''
Polling interval.
''')
Conf('max-polls', VDR.V_INTEGER, desc='''
The maximum number of polls before timing out and entering
the "failed" state.
Maximum number of polls to attempt before the task fails.
''')
Conf('message', VDR.V_STRING, desc='''
Wait for the task in the target workflow to receive a
specified message rather than achieve a state.
Target task output (task message, not trigger name).
''')
Conf('run-dir', VDR.V_STRING, desc='''
Specify the location of the top level cylc-run directory
for the other workflow.
For your own workflows, there is no need to set this as it
is always ``~/cylc-run/``. But for other workflows,
(e.g those owned by others), or mirrored workflow databases
use this item to specify the location of the top level
cylc run directory (the database should be in a the same
place relative to this location for each workflow).
Conf('alt-cylc-run-dir', VDR.V_STRING, desc='''
The cylc-run directory location of the target workflow.
Use to poll workflows owned by other users.
''')
Conf('verbose mode', VDR.V_BOOLEAN, desc='''
Run the polling ``cylc workflow-state`` command in verbose
output mode.
Run the ``cylc workflow-state`` command in verbose mode.
''')

with Conf('environment', desc='''
Expand Down Expand Up @@ -1958,9 +1936,10 @@ def upg(cfg, descr):
"""
u = upgrader(cfg, descr)

u.obsolete(
'7.8.0',
['runtime', '__MANY__', 'suite state polling', 'template'])
'7.8.0', ['runtime', '__MANY__', 'suite state polling', 'template']
)
u.obsolete('7.8.1', ['cylc', 'events', 'reset timer'])
u.obsolete('7.8.1', ['cylc', 'events', 'reset inactivity timer'])
u.obsolete('8.0.0', ['cylc', 'force run mode'])
Expand Down Expand Up @@ -1996,6 +1975,25 @@ def upg(cfg, descr):
['cylc', 'mail', 'task event batch interval'],
silent=cylc.flow.flags.cylc7_back_compat,
)
u.deprecate(
'8.0.0',
['runtime', '__MANY__', 'suite state polling'],
['runtime', '__MANY__', 'workflow state polling'],
silent=cylc.flow.flags.cylc7_back_compat,
is_section=True,
)
u.obsolete(
'8.0.0', ['runtime', '__MANY__', 'workflow state polling', 'host'])
u.obsolete(
'8.0.0', ['runtime', '__MANY__', 'workflow state polling', 'user'])

u.deprecate(
'8.3.0',
['runtime', '__MANY__', 'workflow state polling', 'run-dir'],
['runtime', '__MANY__', 'workflow state polling', 'alt-cylc-run-dir'],
silent=cylc.flow.flags.cylc7_back_compat,
)

u.deprecate(
'8.0.0',
['cylc', 'parameters'],
Expand Down Expand Up @@ -2063,14 +2061,6 @@ def upg(cfg, descr):
silent=cylc.flow.flags.cylc7_back_compat,
)

u.deprecate(
'8.0.0',
['runtime', '__MANY__', 'suite state polling'],
['runtime', '__MANY__', 'workflow state polling'],
silent=cylc.flow.flags.cylc7_back_compat,
is_section=True
)

for job_setting in [
'execution polling intervals',
'execution retry delays',
Expand Down Expand Up @@ -2196,7 +2186,7 @@ def upgrade_graph_section(cfg: Dict[str, Any], descr: str) -> None:
keys.add(key)
if keys and not cylc.flow.flags.cylc7_back_compat:
msg = (
'deprecated graph items were automatically upgraded '
'graph items were automatically upgraded '
f'in "{descr}":\n'
f' * (8.0.0) {msg_old} -> {msg_new}'
)
Expand Down
73 changes: 32 additions & 41 deletions cylc/flow/command_polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import sys
from time import sleep
from cylc.flow import LOG


class Poller:
Expand All @@ -25,39 +26,30 @@ class Poller:

@classmethod
def add_to_cmd_options(cls, parser, d_interval=60, d_max_polls=10):
"""Add command line options for commands that can do polling"""
"""Add command line options for commands that can do polling."""
parser.add_option(
"--max-polls",
help=r"Maximum number of polls (default: %default).",
type="int",
metavar="INT",
action="store",
dest="max_polls",
default=d_max_polls)
default=d_max_polls
)
parser.add_option(
"--interval",
help=r"Polling interval in seconds (default: %default).",
type="int",
metavar="SECS",
action="store",
dest="interval",
default=d_interval)
default=d_interval
)

def __init__(self, condition, interval, max_polls, args):

self.condition = condition # e.g. "workflow stopped"

# check max_polls is an int
try:
self.max_polls = int(max_polls)
except ValueError:
sys.exit("max_polls must be an int")

# check interval is an int
try:
self.interval = int(interval)
except ValueError:
sys.exit("interval must be an integer")

self.n_polls = 0
self.interval = interval
self.max_polls = max_polls or 1 # no point in zero polls
self.args = args # any extra parameters needed by check()

async def check(self):
Expand All @@ -66,29 +58,28 @@ async def check(self):

async def poll(self):
"""Poll for the condition embodied by self.check().
Return True if condition met, or False if polling exhausted."""
if self.max_polls == 0:
# exit 1 as we can't know if the condition is satisfied
sys.exit("WARNING: nothing to do (--max-polls=0)")
elif self.max_polls == 1:
sys.stdout.write("checking for '%s'" % self.condition)
else:
sys.stdout.write("polling for '%s'" % self.condition)
Return True if condition met, or False if polling exhausted.
"""
n_polls = 0
result = False

while True:
n_polls += 1
result = await self.check()
if self.max_polls != 1:
sys.stderr.write(".")
sys.stderr.flush()
if result or n_polls >= self.max_polls:
if self.max_polls != 1:
sys.stderr.write("\n")
sys.stderr.flush()
break
sleep(self.interval)

while self.n_polls < self.max_polls:
self.n_polls += 1
if await self.check():
sys.stdout.write(": satisfied\n")
return True
if self.max_polls > 1:
sys.stdout.write(".")
sleep(self.interval)
sys.stdout.write("\n")
if self.max_polls > 1:
sys.stderr.write(
"ERROR: condition not satisfied after %d polls\n" %
self.max_polls)
if result:
return True
else:
sys.stderr.write("ERROR: condition not satisfied\n")
return False
LOG.error(f"failed after {n_polls} polls")
return False
Loading

0 comments on commit 6eb8f61

Please sign in to comment.