Skip to content

Commit

Permalink
Move (remote) process execution options onto ExecutionOptions, and ad…
Browse files Browse the repository at this point in the history
…d new options for pantsbuild#5904.
  • Loading branch information
stuhood committed Jun 8, 2018
1 parent 7f27a74 commit 9133ef1
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/python/pants/backend/python/pex_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ def get_local_platform():
# TODO(John Sirois): Kill some or all usages when https://github.com/pantsbuild/pex/issues/511
# is fixed.
current_platform = Platform.current()
return current_platform.platform
return current_platform.platform
2 changes: 1 addition & 1 deletion src/python/pants/bin/goal_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def __init__(self, context, goals, run_tracker, kill_nailguns, exiter=sys.exit):

@classmethod
def subsystems(cls):
"""Subsystems used outside of any task."""
"""Subsystems used outside of any task which are not already included on GlobalOptionsRegistrar."""
return {
SourceRootConfig,
Reporting,
Expand Down
18 changes: 13 additions & 5 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@
BufferBuffer,
TypeIdBuffer,
Buffer,
Buffer);
Buffer,
uint64_t,
uint64_t,
uint64_t,
uint64_t);
void scheduler_pre_fork(Scheduler*);
Value scheduler_metrics(Scheduler*, Session*);
RawNodes* scheduler_execute(Scheduler*, Session*, ExecutionRequest*);
Expand Down Expand Up @@ -720,8 +724,7 @@ def new_scheduler(self,
build_root,
work_dir,
ignore_patterns,
remote_store_server,
remote_execution_server,
execution_options,
construct_directory_digest,
construct_snapshot,
construct_file_content,
Expand Down Expand Up @@ -786,8 +789,13 @@ def tc(constraint):
self.context.utf8_buf_buf(ignore_patterns),
self.to_ids_buf(root_subject_types),
# Remote execution config.
self.context.utf8_buf(remote_store_server),
self.context.utf8_buf(remote_execution_server),
# We can't currently pass Options to the rust side, so we pass empty strings for None.
self.context.utf8_buf(execution_options.remote_store_server or ""),
self.context.utf8_buf(execution_options.remote_execution_server or ""),
execution_options.remote_store_thread_count,
execution_options.remote_store_chunk_size,
execution_options.remote_store_chunk_upload_timeout,
execution_options.process_execution_parallelism,
)
return self.gc(scheduler, self.lib.scheduler_destroy)

Expand Down
12 changes: 4 additions & 8 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ def __init__(
project_tree,
work_dir,
rules,
remote_store_server,
remote_execution_server,
execution_options,
include_trace_on_error=True,
validate=True,
):
Expand All @@ -88,16 +87,14 @@ def __init__(
:param project_tree: An instance of ProjectTree for the current build root.
:param work_dir: The pants work dir.
:param rules: A set of Rules which is used to compute values in the graph.
:param execution_options: Execution options for (remote) processes.
:param include_trace_on_error: Include the trace through the graph upon encountering errors.
:type include_trace_on_error: bool
:param validate: True to assert that the ruleset is valid.
"""

if remote_execution_server and not remote_store_server:
if execution_options.remote_execution_server and not execution_options.remote_store_server:
raise ValueError("Cannot set remote execution server without setting remote store server")
# We can't currently pass Options to the rust side, so we pass empty strings for None:
remote_execution_server = remote_execution_server or ""
remote_store_server = remote_store_server or ""

self._native = native
self.include_trace_on_error = include_trace_on_error
Expand All @@ -121,8 +118,7 @@ def __init__(
project_tree.build_root,
work_dir,
project_tree.ignore_patterns,
remote_store_server,
remote_execution_server,
execution_options,
DirectoryDigest,
Snapshot,
FileContent,
Expand Down
16 changes: 9 additions & 7 deletions src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from pants.engine.parser import SymbolTable
from pants.engine.rules import SingletonRule
from pants.engine.scheduler import Scheduler
from pants.option.global_options import GlobMatchErrorBehavior
from pants.option.global_options import (DEFAULT_EXECUTION_OPTIONS, ExecutionOptions,
GlobMatchErrorBehavior)
from pants.util.objects import datatype


Expand Down Expand Up @@ -131,8 +132,7 @@ def setup_legacy_graph(native, bootstrap_options, build_config):
exclude_target_regexps=bootstrap_options.exclude_target_regexp,
subproject_roots=bootstrap_options.subproject_roots,
include_trace_on_error=bootstrap_options.print_exception_stacktrace,
remote_store_server=bootstrap_options.remote_store_server,
remote_execution_server=bootstrap_options.remote_execution_server,
execution_options=ExecutionOptions.from_bootstrap_options(bootstrap_options),
)

@staticmethod
Expand All @@ -149,8 +149,7 @@ def setup_legacy_graph_extended(
exclude_target_regexps=None,
subproject_roots=None,
include_trace_on_error=True,
remote_store_server=None,
remote_execution_server=None
execution_options=None,
):
"""Construct and return the components necessary for LegacyBuildGraph construction.
Expand All @@ -174,6 +173,8 @@ def setup_legacy_graph_extended(
under the current build root.
:param bool include_trace_on_error: If True, when an error occurs, the error message will
include the graph trace.
:param execution_options: Option values for (remote) process execution.
:type execution_options: :class:`pants.option.global_options.ExecutionOptions`
:returns: A LegacyGraphScheduler.
"""

Expand All @@ -186,6 +187,8 @@ def setup_legacy_graph_extended(

project_tree = FileSystemProjectTree(build_root, pants_ignore_patterns)

execution_options = execution_options or DEFAULT_EXECUTION_OPTIONS

# Register "literal" subjects required for these rules.
parser = LegacyPythonCallbacksParser(
symbol_table,
Expand Down Expand Up @@ -217,8 +220,7 @@ def setup_legacy_graph_extended(
project_tree,
workdir,
rules,
remote_store_server,
remote_execution_server,
execution_options,
include_trace_on_error=include_trace_on_error,
)

Expand Down
58 changes: 54 additions & 4 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
unicode_literals, with_statement)

import logging
import multiprocessing
import os
import sys

Expand Down Expand Up @@ -59,6 +60,42 @@ def __new__(cls, *args, **kwargs):
return this_object


class ExecutionOptions(datatype([
'remote_store_server',
'remote_store_thread_count',
'remote_execution_server',
'remote_store_chunk_size',
'remote_store_chunk_upload_timeout',
'process_execution_parallelism',
])):
"""A collection of all options related to (remote) execution of processes.
TODO: These options should move to a Subsystem once we add support for "bootstrap" Subsystems (ie,
allowing Subsystems to be consumed before the Scheduler has been created).
"""

@classmethod
def from_bootstrap_options(cls, bootstrap_options):
cls(
remote_store_server=bootstrap_options.remote_store_server,
remote_execution_server=bootstrap_options.remote_execution_server,
remote_store_thread_count=bootstrap_options.remote_store_thread_count,
remote_store_chunk_size=bootstrap_options.remote_store_chunk_size,
remote_store_chunk_upload_timeout=bootstrap_options.remote_store_chunk_upload_timeout,
process_execution_parallelism=bootstrap_options.process_execution_parallelism,
)


DEFAULT_EXECUTION_OPTIONS = ExecutionOptions(
remote_store_server=None,
remote_store_thread_count=1,
remote_execution_server=None,
remote_store_chunk_size=1024*1024,
remote_store_chunk_upload_timeout=60,
process_execution_parallelism=multiprocessing.cpu_count(),
)


class GlobalOptionsRegistrar(SubsystemClientMixin, Optionable):
options_scope = GLOBAL_SCOPE
options_scope_category = ScopeInfo.GLOBAL
Expand Down Expand Up @@ -251,10 +288,23 @@ def register_bootstrap_options(cls, register):
register('--build-file-imports', choices=['allow', 'warn', 'error'], default='warn',
help='Whether to allow import statements in BUILD files')

register('--remote-store-server',
help='host:port of grpc server to use as remote execution file store')
register('--remote-execution-server',
help='host:port of grpc server to use as remote execution scheduler')
remote_store_server = '--remote-store-server'
remote_execution_server = '--remote-execution-server'
register(remote_store_server, advanced=True,
help='host:port of grpc server to use as remote execution file store.')
register('--remote-store-thread-count', type=int, default=1, advanced=True,
help='Thread count to use for the pool that interacts with the remote file store.')
register(remote_execution_server, advanced=True,
help='host:port of grpc server to use as remote execution scheduler.')
register('--remote-store-chunk-size', type=int, default=1024*1024, advanced=True,
help='Size in bytes of chunks transferred to/from the remote file store.')
register('--remote-store-chunk-upload-timeout', type=int, default=60, advanced=True,
help='Timeout (in seconds) for uploads of individual chunks to the remote file store.')

# This should eventually deprecate the RunTracker worker count, which is used for legacy cache
# lookups via CacheSetup in TaskBase.
register('--process-execution-parallelism', type=int, default=multiprocessing.cpu_count(),
help='Number of concurrent processes that may be executed either locally and remotely.')

@classmethod
def register_options(cls, register):
Expand Down
32 changes: 17 additions & 15 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use tokio::runtime::Runtime;

Expand Down Expand Up @@ -49,6 +50,10 @@ impl Core {
work_dir: &Path,
remote_store_server: Option<String>,
remote_execution_server: Option<String>,
remote_store_thread_count: usize,
remote_store_chunk_size: usize,
remote_store_chunk_upload_timeout: Duration,
process_execution_parallelism: usize,
) -> Core {
let mut snapshots_dir = PathBuf::from(work_dir);
snapshots_dir.push("snapshots");
Expand All @@ -65,19 +70,16 @@ impl Core {

let store = safe_create_dir_all_ioerror(&store_path)
.map_err(|e| format!("Error making directory {:?}: {:?}", store_path, e))
.and_then(|()| {
match remote_store_server {
Some(address) => Store::with_remote(
store_path,
fs_pool.clone(),
address,
// TODO: Allow configuration of all of the below:
1,
1024 * 1024,
std::time::Duration::from_secs(60),
),
None => Store::local_only(store_path, fs_pool.clone()),
}
.and_then(|()| match remote_store_server {
Some(address) => Store::with_remote(
store_path,
fs_pool.clone(),
address,
remote_store_thread_count,
remote_store_chunk_size,
remote_store_chunk_upload_timeout,
),
None => Store::local_only(store_path, fs_pool.clone()),
})
.unwrap_or_else(|e| panic!("Could not initialize Store: {:?}", e));

Expand All @@ -94,8 +96,8 @@ impl Core {
)),
};

// TODO: Allow configuration of process concurrency.
let command_runner = BoundedCommandRunner::new(underlying_command_runner, 16);
let command_runner =
BoundedCommandRunner::new(underlying_command_runner, process_execution_parallelism);

let rule_graph = RuleGraph::new(&tasks, root_subject_types);

Expand Down
9 changes: 9 additions & 0 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use std::mem;
use std::os::raw;
use std::panic;
use std::path::{Path, PathBuf};
use std::time::Duration;

use context::Core;
use core::{Failure, Function, Key, TypeConstraint, TypeId, Value};
Expand Down Expand Up @@ -218,6 +219,10 @@ pub extern "C" fn scheduler_create(
root_type_ids: TypeIdBuffer,
remote_store_server: Buffer,
remote_execution_server: Buffer,
remote_store_thread_count: u64,
remote_store_chunk_size: u64,
remote_store_chunk_upload_timeout: u64,
process_execution_parallelism: u64,
) -> *const Scheduler {
let root_type_ids = root_type_ids.to_vec();
let ignore_patterns = ignore_patterns_buf
Expand Down Expand Up @@ -275,6 +280,10 @@ pub extern "C" fn scheduler_create(
} else {
Some(remote_execution_server_string)
},
remote_store_thread_count as usize,
remote_store_chunk_size as usize,
Duration::from_secs(remote_store_chunk_upload_timeout),
process_execution_parallelism as usize,
))))
}

Expand Down
2 changes: 2 additions & 0 deletions tests/python/pants_test/engine/examples/planners.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from pants.engine.scheduler import Scheduler
from pants.engine.selectors import Get, Select, SelectVariant
from pants.engine.struct import HasProducts, Struct, StructWithDeps, Variants
from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS
from pants.util.meta import AbstractClass
from pants.util.objects import SubclassesOf, datatype
from pants_test.engine.examples.parsers import JsonParser
Expand Down Expand Up @@ -473,6 +474,7 @@ def setup_json_scheduler(build_root, native):
project_tree,
work_dir,
rules,
DEFAULT_EXECUTION_OPTIONS,
None,
None)
return scheduler.new_session()
4 changes: 2 additions & 2 deletions tests/python/pants_test/engine/scheduler_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pants.base.file_system_project_tree import FileSystemProjectTree
from pants.engine.nodes import Return, Throw
from pants.engine.scheduler import Scheduler
from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS
from pants.util.contextutil import temporary_file_path
from pants.util.dirutil import safe_mkdtemp, safe_rmtree
from pants_test.engine.util import init_native
Expand Down Expand Up @@ -56,8 +57,7 @@ def mk_scheduler(self,
project_tree,
work_dir,
rules,
remote_store_server=None,
remote_execution_server=None,
DEFAULT_EXECUTION_OPTIONS,
include_trace_on_error=include_trace_on_error)
return scheduler.new_session()

Expand Down
4 changes: 2 additions & 2 deletions tests/python/pants_test/engine/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pants.engine.scheduler import Scheduler
from pants.engine.selectors import Get
from pants.engine.struct import HasProducts, Struct
from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS
from pants.util.objects import SubclassesOf
from pants_test.option.util.fakes import create_options_for_optionables
from pants_test.subsystem.subsystem_util import init_subsystem
Expand Down Expand Up @@ -97,8 +98,7 @@ def create_scheduler(rules, validate=True):
FileSystemProjectTree(os.getcwd()),
'./.pants.d',
rules,
remote_store_server=None,
remote_execution_server=None,
execution_options=DEFAULT_EXECUTION_OPTIONS,
validate=validate,
)

Expand Down

0 comments on commit 9133ef1

Please sign in to comment.