Skip to content

Commit

Permalink
prevent cylc submitting more than 100 jobs at once and blocking the S…
Browse files Browse the repository at this point in the history
…TD pipes
  • Loading branch information
wxtim committed Nov 19, 2018
1 parent e6ab1d3 commit a6ee1d6
Showing 1 changed file with 55 additions and 34 deletions.
89 changes: 55 additions & 34 deletions lib/cylc/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):
continue
# Build the "cylc jobs-submit" command
cmd = ['cylc', self.JOBS_SUBMIT]
if LOG.isEnabledFor(DEBUG):
if cylc.flags.debug:
cmd.append('--debug')
if get_utc_mode():
cmd.append('--utc-mode')
Expand All @@ -285,33 +285,53 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):
cmd.append('--')
cmd.append(glbl_cfg().get_derived_host_item(
suite, 'suite job log directory', host, owner))
stdin_file_paths = []
job_log_dirs = []
for itask in sorted(itasks, key=lambda itask: itask.identity):
if remote_mode:
stdin_file_paths.append(
get_task_job_job_log(
suite, itask.point, itask.tdef.name,
itask.submit_num))
job_log_dirs.append(get_task_job_id(
itask.point, itask.tdef.name, itask.submit_num))
# The job file is now (about to be) used: reset the file write
# flag so that subsequent manual retrigger will generate a new
# job file.
itask.local_job_file_path = None
itask.state.reset_state(TASK_STATUS_READY)
if itask.state.outputs.has_custom_triggers():
self.suite_db_mgr.put_update_task_outputs(itask)
cmd += job_log_dirs
self.proc_pool.put_command(
SubProcContext(
self.JOBS_SUBMIT,
cmd,
stdin_file_paths=stdin_file_paths,
job_log_dirs=job_log_dirs,
**kwargs
),
self._submit_task_jobs_callback, [suite, itasks])

def list_chopper(list_, n):
""" Iterator returning sublists of length n
Args:
list_ (list):
list to be chopped
n (int):
number of pieces into which to chop list_
Yields:
A chunk, size n of the original list
"""
for i in range(0, len(list_), n):
yield list_[i:i + n]

# Chop itasks into a series of shorter lists if it's very big
# to prevent overloading of stdout and stderr pipes.
itasks_batches = [i for i in list_chopper(itasks, 100)]
for itasks_batch in itasks_batches:
stdin_file_paths = []
job_log_dirs = []
for itask in sorted(itasks_batch,
key=lambda itask: itask.identity):
if remote_mode:
stdin_file_paths.append(
get_task_job_job_log(
suite, itask.point, itask.tdef.name,
itask.submit_num))
job_log_dirs.append(get_task_job_id(
itask.point, itask.tdef.name, itask.submit_num))
# The job file is now (about to be) used: reset the file
# write flag so that subsequent manual retrigger will
# generate a new job file.
itask.local_job_file_path = None
itask.state.reset_state(TASK_STATUS_READY)
if itask.state.outputs.has_custom_triggers():
self.suite_db_mgr.put_update_task_outputs(itask)
cmd += job_log_dirs
self.proc_pool.put_command(
SubProcContext(
self.JOBS_SUBMIT,
cmd,
stdin_file_paths=stdin_file_paths,
job_log_dirs=job_log_dirs,
**kwargs
),
self._submit_task_jobs_callback, [suite, itasks_batch])
return done_tasks

@staticmethod
Expand Down Expand Up @@ -370,7 +390,7 @@ def _get_job_scripts(itask, rtconfig):
comstr = "cylc suite-state " + \
" --task=" + itask.tdef.suite_polling_cfg['task'] + \
" --point=" + str(itask.point)
if LOG.isEnabledFor(DEBUG):
if cylc.flags.debug:
comstr += ' --debug'
for key, fmt in [
('user', ' --%s=%s'),
Expand Down Expand Up @@ -505,10 +525,11 @@ def _manip_task_jobs_callback(
point, name, submit_num = path.split(os.sep, 2)
itask = tasks[(point, name, submit_num)]
callback(suite, itask, ctx, line)
except (LookupError, ValueError) as exc:
LOG.warning(
'Unhandled %s output: %s', ctx.cmd_key, line)
LOG.exception(exc)
except (KeyError, ValueError):
if cylc.flags.debug:
LOG.warning('Unhandled %s output: %s' % (
ctx.cmd_key, line))
LOG.warning(traceback.format_exc())

def _poll_task_jobs_callback(self, ctx, suite, itasks):
"""Callback when poll tasks command exits."""
Expand Down Expand Up @@ -624,7 +645,7 @@ def _run_job_cmd(self, cmd_key, suite, itasks, callback):
auth_itasks[(itask.task_host, itask.task_owner)].append(itask)
for (host, owner), itasks in sorted(auth_itasks.items()):
cmd = ["cylc", cmd_key]
if LOG.isEnabledFor(DEBUG):
if cylc.flags.debug:
cmd.append("--debug")
if is_remote_host(host):
cmd.append("--host=%s" % (host))
Expand Down

0 comments on commit a6ee1d6

Please sign in to comment.