Skip to content

Commit

Permalink
Merge pull request #3796 from datamel/rsync_file_install_with_platform
Browse files Browse the repository at this point in the history
File installation via rsync and install target installation added
  • Loading branch information
hjoliver authored Oct 4, 2020
2 parents 785ffbb + 71d47ea commit 7497ae4
Show file tree
Hide file tree
Showing 43 changed files with 668 additions and 155 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ compatibility, the `cylc run` command will automatically symlink an existing
[#3816](https://github.com/cylc/cylc-flow/pull/3816) - change `cylc spawn`
command name to `cylc set-outputs` to better reflect its role in Cylc 8.

[#3796](https://github.com/cylc/cylc-flow/pull/3796) - Remote installation is
now on a per install target rather than a per platform basis. app/ bin/ etc/ lib/ directories are now installed on the target, configurable in flow.cylc.

[#3724](https://github.com/cylc/cylc-flow/pull/3724) - Re-implemented
the `cylc scan` command line interface and added a Python API for accessing
workflow scanning functionality.
Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@


CYLC_LOG = 'cylc'
FILE_INSTALL_LOG = 'cylc-rsync'

LOG = logging.getLogger(CYLC_LOG)
LOG.addHandler(logging.NullHandler()) # Start with a null handler
RSYNC_LOG = logging.getLogger(FILE_INSTALL_LOG)
RSYNC_LOG.addHandler(logging.NullHandler())

LOG_LEVELS = {
"INFO": logging.INFO,
Expand Down
15 changes: 14 additions & 1 deletion cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@
similar interface to ``scp``.
''')
Conf('ssh command',
VDR.V_STRING, 'ssh -oBatchMode=yes -oConnectTimeout=10',
VDR.V_STRING,
'ssh -oBatchMode=yes -oConnectTimeout=10',
desc='''
A string for the command used to invoke commands on this host.
This is not used on the suite host unless you run local tasks
Expand Down Expand Up @@ -476,6 +477,18 @@
accepts up to 236 characters.
''')
Conf('owner', VDR.V_STRING)
Conf('install target', VDR.V_STRING, desc='''
This defaults to the platform name. This will be used as the
target for remote file installation.
For example, to indicate to Cylc that Platform_A shares a file
system with localhost, we would configure as follows:
.. code-block:: cylc
[platforms]
[[Platform_A]]
install target = localhost
''')
with Conf('localhost', meta=Platform):
Conf('hosts', VDR.V_STRING_LIST, ['localhost'])

Expand Down
36 changes: 36 additions & 0 deletions cylc/flow/cfgspec/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,42 @@
"suite-priority".
''')

with Conf('scheduler'):
Conf('install', VDR.V_STRING_LIST, desc='''
Configure the directories and files to be included in the remote
file installation.
.. note::
These, as standard, include the following directories:
* app
* bin
* etc
* lib
And include the server.key file (from the .service
directory), this is required for authentication.
These should be located in the top level of your Cylc workflow,
i.e. the directory that contains your flow.cylc file.
Directories must have a trailing slash.
For example, to add the following items to your file installation:
.. code-block:: none
~/cylc-run/workflow_x
|__dir1/
|__dir2/
|__file1
|__file2
.. code-block:: cylc
[scheduler]
install = dir/, dir2/, file1, file2
''')

with Conf('cylc'):
Conf('UTC mode', VDR.V_BOOLEAN)
Conf('cycle point format', VDR.V_CYCLE_POINT_FORMAT, desc='''
Expand Down
17 changes: 17 additions & 0 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ def __init__(
self.cfg = self.pcfg.get(sparse=True)
self.mem_log("config.py: after get(sparse=True)")

if 'scheduler' in self.cfg and 'install' in self.cfg['scheduler']:
self.get_validated_rsync_includes()

# First check for the essential scheduling section.
if 'scheduling' not in self.cfg:
raise SuiteConfigError("missing [scheduling] section.")
Expand Down Expand Up @@ -2333,3 +2336,17 @@ def get_expected_failed_tasks(self):
return []
else:
return None

def get_validated_rsync_includes(self):
"""Validate and return items to be included in the file installation"""
includes = self.cfg['scheduler']['install']
illegal_includes = []
for include in includes:
if include.count("/") > 1:
illegal_includes.append(f"{include}")
if len(illegal_includes) > 0:
raise SuiteConfigError(
"Error in [scheduler] install. "
"Directories can only be from the top level, please "
"reconfigure:" + str(illegal_includes)[1:-1])
return includes
5 changes: 2 additions & 3 deletions cylc/flow/loggingutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from cylc.flow.wallclock import (get_current_time_string,
get_time_string_from_unix_time)
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.pathutil import get_suite_run_log_name


class CylcLogFormatter(logging.Formatter):
Expand Down Expand Up @@ -118,8 +117,8 @@ class TimestampRotatingFileHandler(logging.FileHandler):
GLBL_KEY = 'suite logging'
MIN_BYTES = 1024

def __init__(self, suite, no_detach=False, timestamp=True):
logging.FileHandler.__init__(self, get_suite_run_log_name(suite))
def __init__(self, log_file_path, no_detach=False, timestamp=True):
logging.FileHandler.__init__(self, log_file_path)
self.no_detach = no_detach
self.stamp = None
self.formatter = CylcLogFormatter(timestamp=timestamp)
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def key_housekeeping(reg, platform=None, create=True):
"client_public_key": KeyInfo(
KeyType.PUBLIC,
KeyOwner.CLIENT,
suite_srv_dir=suite_srv_dir, platform=platform),
suite_srv_dir=suite_srv_dir, install_target=platform),
"client_private_key": KeyInfo(
KeyType.PRIVATE,
KeyOwner.CLIENT,
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from ansimarkup import parse as cparse

from cylc.flow import LOG
from cylc.flow import LOG, RSYNC_LOG
import cylc.flow.flags
from cylc.flow.loggingutil import CylcLogFormatter

Expand Down Expand Up @@ -255,6 +255,7 @@ def parse_args(self, remove_opts=None):
else:
LOG.setLevel(logging.INFO)
# Remove NullHandler before add the StreamHandler
RSYNC_LOG.setLevel(logging.INFO)
while LOG.handlers:
LOG.handlers[0].close()
LOG.removeHandler(LOG.handlers[0])
Expand Down
6 changes: 6 additions & 0 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ def get_suite_run_log_name(suite):
return expandvars(path)


def get_suite_file_install_log_name(suite):
"""Return suite file install log file path."""
path = get_suite_run_dir(suite, 'log', 'suite', 'file-installation-log')
return expandvars(path)


def get_suite_run_config_log_dir(suite, *args):
"""Return suite run flow.cylc log directory, join any extra args."""
return expandvars(get_suite_run_dir(suite, 'log', 'flow-config', *args))
Expand Down
25 changes: 24 additions & 1 deletion cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from cylc.flow.exceptions import PlatformLookupError
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.hostuserutil import is_remote_host


FORBIDDEN_WITH_PLATFORM = (
Expand Down Expand Up @@ -315,7 +316,7 @@ def platform_from_job_info(platforms, job, remote):
# We have some special logic to identify whether task host and task
# batch system match the platform in question.
if (
task_host == 'localhost' and
not is_remote_host(task_host) and
task_batch_system == 'background'
):
return 'localhost'
Expand Down Expand Up @@ -402,3 +403,25 @@ def fail_if_platform_and_host_conflict(task_conf, task_name, warn_only=False):
f"\"{task_name}\" has the following settings which "
f"are not compatible:\n{fail_items}"
)


def get_install_target_from_platform(platform):
"""Sets install target to configured or default platform name.
Args:
platform (dict):
A dict representing a platform.
Returns install target."""

if not platform['install target']:
platform['install target'] = platform['name']

return platform.get('install target')


def is_platform_with_target_in_list(
install_target, distinct_platforms_list):
"""Determines whether install target is in the list of platforms"""
for distinct_platform in distinct_platforms_list:
return install_target == distinct_platform['install target']
68 changes: 68 additions & 0 deletions cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import cylc.flow.flags
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow import LOG
from cylc.flow.platforms import get_platform, get_host_from_platform


Expand Down Expand Up @@ -154,6 +155,73 @@ def construct_platform_ssh_cmd(raw_cmd, platform, **kwargs):
return ret


def get_includes_to_rsync(rsync_includes=None):
"""Returns list of configured dirs/files for remote file installation."""

configured_includes = []

if rsync_includes is not None:
for include in rsync_includes:
if include.endswith("/"): # item is a directory
configured_includes.append("/" + include + "***")
else: # item is a file
configured_includes.append("/" + include)

return configured_includes


def construct_rsync_over_ssh_cmd(
src_path, dst_path, platform, rsync_includes=None):
"""Constructs the rsync command used for remote file installation.
Includes as standard the directories: app, bin, etc, lib; and the server
key, used for ZMQ authentication.
Args:
src_path(string): source path
dst_path(string): path of target
platform(dict)): contains info relating to platform
logfile(str): the path to the file logging the rsync
rsync_includes(list): files and directories to be included in the rsync
"""
dst_host = get_host_from_platform(platform)
rsync_cmd = ["rsync"]
ssh_cmd = platform['ssh command']
rsync_options = [
"-v",
"--perms",
"--recursive",
"--links",
"--checksum",
"--delete",
"--rsh=" + ssh_cmd,
"--include=/.service/",
"--include=/.service/server.key"
]
rsync_cmd.extend(rsync_options)
# Note to future devs - be wary of changing the order of the following
# rsync options, rsync is very particular about order of in/ex-cludes.

for exclude in ['log', 'share', 'work']:
rsync_cmd.append(f"--exclude={exclude}")
default_includes = [
'/app/***',
'/bin/***',
'/etc/***',
'/lib/***']
for include in default_includes:
rsync_cmd.append(f"--include={include}")
for include in get_includes_to_rsync(rsync_includes):
rsync_cmd.append(f"--include={include}")
# The following excludes are required in case these are added to the
rsync_cmd.append("--exclude=*") # exclude everything else
rsync_cmd.append(f"{src_path}/")
rsync_cmd.append(f"{dst_host}:{dst_path}/")
LOG.debug(f"rsync cmd use for file install: {' '.join(rsync_cmd)}")
return rsync_cmd


def construct_ssh_cmd(
raw_cmd, user=None, host=None, forward_x11=False, stdin=False,
ssh_cmd=None, ssh_login_shell=None, ssh_cylc=None, set_UTC=False,
Expand Down
46 changes: 29 additions & 17 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
get_suite_test_log_name,
make_suite_run_tree,
)
from cylc.flow.platforms import (
get_install_target_from_platform,
get_platform,
is_platform_with_target_in_list)
from cylc.flow.profiler import Profiler
from cylc.flow.resources import extract_resources
from cylc.flow.subprocpool import SubProcPool
Expand Down Expand Up @@ -101,7 +105,6 @@
get_time_string_from_unix_time as time2str,
get_utc_mode)
from cylc.flow.xtrigger_mgr import XtriggerManager
from cylc.flow.platforms import get_platform


class SchedulerStop(CylcError):
Expand Down Expand Up @@ -715,26 +718,33 @@ def restart_remote_init(self):
Note: tasks should all be in the runahead pool at this point.
"""
auths = set()

distinct_install_target_platforms = []

for itask in self.pool.get_rh_tasks():
itask.platform['install target'] = (
get_install_target_from_platform(itask.platform))
if itask.state(*TASK_STATUSES_ACTIVE):
auths.add(itask.platform['name'])
while auths:
for platform_name in auths.copy():
if (
self.task_job_mgr.task_remote_mgr.remote_init(
platform_name, self.curve_auth,
self.client_pub_key_dir
if not (
is_platform_with_target_in_list(
itask.platform['install target'],
distinct_install_target_platforms
)
is not None
):
auths.remove(
platform_name
)
if auths:
sleep(1.0)
# Remote init is done via process pool
self.proc_pool.process()
distinct_install_target_platforms.append(itask.platform)

incomplete_init = False
for platform in distinct_install_target_platforms:
if (self.task_job_mgr.task_remote_mgr.remote_init(
platform, self.curve_auth,
self.client_pub_key_dir) is None):
incomplete_init = True
break
if incomplete_init:
# TODO: Review whether this sleep is needed.
sleep(1.0)
# Remote init is done via process pool
self.proc_pool.process()
self.command_poll_tasks()

def _load_task_run_times(self, row_idx, row):
Expand Down Expand Up @@ -1228,6 +1238,8 @@ def process_task_pool(self):
itasks = self.pool.get_ready_tasks()
if itasks:
self.is_updated = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
for itask in self.task_job_mgr.submit_task_jobs(
self.suite,
itasks,
Expand Down
Loading

0 comments on commit 7497ae4

Please sign in to comment.