diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index c72b9efe4d4..cd4d344f418 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -204,7 +204,11 @@ BufferBuffer, TypeIdBuffer, Buffer, - Buffer); + Buffer, + uint64_t, + uint64_t, + uint64_t, + uint64_t); void scheduler_pre_fork(Scheduler*); Value scheduler_metrics(Scheduler*, Session*); RawNodes* scheduler_execute(Scheduler*, Session*, ExecutionRequest*); @@ -720,8 +724,7 @@ def new_scheduler(self, build_root, work_dir, ignore_patterns, - remote_store_server, - remote_execution_server, + execution_options, construct_directory_digest, construct_snapshot, construct_file_content, @@ -786,8 +789,13 @@ def tc(constraint): self.context.utf8_buf_buf(ignore_patterns), self.to_ids_buf(root_subject_types), # Remote execution config. - self.context.utf8_buf(remote_store_server), - self.context.utf8_buf(remote_execution_server), + # We can't currently pass Options to the rust side, so we pass empty strings for None. + self.context.utf8_buf(execution_options.remote_store_server or ""), + self.context.utf8_buf(execution_options.remote_execution_server or ""), + execution_options.remote_store_thread_count, + execution_options.remote_store_chunk_bytes, + execution_options.remote_store_chunk_upload_timeout_seconds, + execution_options.process_execution_parallelism, ) return self.gc(scheduler, self.lib.scheduler_destroy) diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 34ae1a10fa7..2f2fb7c6ac2 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -78,8 +78,7 @@ def __init__( project_tree, work_dir, rules, - remote_store_server, - remote_execution_server, + execution_options, include_trace_on_error=True, validate=True, ): @@ -88,16 +87,14 @@ def __init__( :param project_tree: An instance of ProjectTree for the current build root. :param work_dir: The pants work dir. :param rules: A set of Rules which is used to compute values in the graph. + :param execution_options: Execution options for (remote) processes. :param include_trace_on_error: Include the trace through the graph upon encountering errors. :type include_trace_on_error: bool :param validate: True to assert that the ruleset is valid. """ - if remote_execution_server and not remote_store_server: + if execution_options.remote_execution_server and not execution_options.remote_store_server: raise ValueError("Cannot set remote execution server without setting remote store server") - # We can't currently pass Options to the rust side, so we pass empty strings for None: - remote_execution_server = remote_execution_server or "" - remote_store_server = remote_store_server or "" self._native = native self.include_trace_on_error = include_trace_on_error @@ -121,8 +118,7 @@ def __init__( project_tree.build_root, work_dir, project_tree.ignore_patterns, - remote_store_server, - remote_execution_server, + execution_options, DirectoryDigest, Snapshot, FileContent, diff --git a/src/python/pants/init/engine_initializer.py b/src/python/pants/init/engine_initializer.py index 1ebbd82b643..40d3d216eee 100644 --- a/src/python/pants/init/engine_initializer.py +++ b/src/python/pants/init/engine_initializer.py @@ -28,7 +28,8 @@ from pants.engine.rules import SingletonRule from pants.engine.scheduler import Scheduler from pants.init.options_initializer import BuildConfigInitializer -from pants.option.global_options import GlobMatchErrorBehavior +from pants.option.global_options import (DEFAULT_EXECUTION_OPTIONS, ExecutionOptions, + GlobMatchErrorBehavior) from pants.option.options_bootstrapper import OptionsBootstrapper from pants.util.objects import datatype @@ -134,8 +135,7 @@ def setup_legacy_graph(native, bootstrap_options, build_configuration): exclude_target_regexps=bootstrap_options.exclude_target_regexp, subproject_roots=bootstrap_options.subproject_roots, include_trace_on_error=bootstrap_options.print_exception_stacktrace, - remote_store_server=bootstrap_options.remote_store_server, - remote_execution_server=bootstrap_options.remote_execution_server, + execution_options=ExecutionOptions.from_bootstrap_options(bootstrap_options), ) @staticmethod @@ -152,8 +152,7 @@ def setup_legacy_graph_extended( exclude_target_regexps=None, subproject_roots=None, include_trace_on_error=True, - remote_store_server=None, - remote_execution_server=None + execution_options=None, ): """Construct and return the components necessary for LegacyBuildGraph construction. @@ -177,6 +176,8 @@ def setup_legacy_graph_extended( under the current build root. :param bool include_trace_on_error: If True, when an error occurs, the error message will include the graph trace. + :param execution_options: Option values for (remote) process execution. + :type execution_options: :class:`pants.option.global_options.ExecutionOptions` :returns: A LegacyGraphScheduler. """ @@ -189,6 +190,8 @@ def setup_legacy_graph_extended( project_tree = FileSystemProjectTree(build_root, pants_ignore_patterns) + execution_options = execution_options or DEFAULT_EXECUTION_OPTIONS + # Register "literal" subjects required for these rules. parser = LegacyPythonCallbacksParser( symbol_table, @@ -223,8 +226,7 @@ def setup_legacy_graph_extended( project_tree, workdir, rules, - remote_store_server, - remote_execution_server, + execution_options, include_trace_on_error=include_trace_on_error, ) diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index a5d9588130a..15339c8ceb1 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -6,6 +6,7 @@ unicode_literals, with_statement) import logging +import multiprocessing import os import sys @@ -59,6 +60,42 @@ def __new__(cls, *args, **kwargs): return this_object +class ExecutionOptions(datatype([ + 'remote_store_server', + 'remote_store_thread_count', + 'remote_execution_server', + 'remote_store_chunk_bytes', + 'remote_store_chunk_upload_timeout_seconds', + 'process_execution_parallelism', +])): + """A collection of all options related to (remote) execution of processes. + + TODO: These options should move to a Subsystem once we add support for "bootstrap" Subsystems (ie, + allowing Subsystems to be consumed before the Scheduler has been created). + """ + + @classmethod + def from_bootstrap_options(cls, bootstrap_options): + cls( + remote_store_server=bootstrap_options.remote_store_server, + remote_execution_server=bootstrap_options.remote_execution_server, + remote_store_thread_count=bootstrap_options.remote_store_thread_count, + remote_store_chunk_bytes=bootstrap_options.remote_store_chunk_bytes, + remote_store_chunk_upload_timeout_seconds=bootstrap_options.remote_store_chunk_upload_timeout_seconds, + process_execution_parallelism=bootstrap_options.process_execution_parallelism, + ) + + +DEFAULT_EXECUTION_OPTIONS = ExecutionOptions( + remote_store_server=None, + remote_store_thread_count=1, + remote_execution_server=None, + remote_store_chunk_bytes=1024*1024, + remote_store_chunk_upload_timeout_seconds=60, + process_execution_parallelism=multiprocessing.cpu_count()*2, + ) + + class GlobalOptionsRegistrar(SubsystemClientMixin, Optionable): options_scope = GLOBAL_SCOPE options_scope_category = ScopeInfo.GLOBAL @@ -256,10 +293,24 @@ def register_bootstrap_options(cls, register): register('--build-file-imports', choices=['allow', 'warn', 'error'], default='warn', help='Whether to allow import statements in BUILD files') - register('--remote-store-server', - help='host:port of grpc server to use as remote execution file store') - register('--remote-execution-server', - help='host:port of grpc server to use as remote execution scheduler') + register('--remote-store-server', advanced=True, + help='host:port of grpc server to use as remote execution file store.') + register('--remote-store-thread-count', type=int, advanced=True, + default=DEFAULT_EXECUTION_OPTIONS.remote_store_thread_count, + help='Thread count to use for the pool that interacts with the remote file store.') + register('--remote-execution-server', advanced=True, + help='host:port of grpc server to use as remote execution scheduler.') + register('--remote-store-chunk-bytes', type=int, advanced=True, + default=DEFAULT_EXECUTION_OPTIONS.remote_store_chunk_bytes, + help='Size in bytes of chunks transferred to/from the remote file store.') + register('--remote-store-chunk-upload-timeout-seconds', type=int, advanced=True, + default=DEFAULT_EXECUTION_OPTIONS.remote_store_chunk_upload_timeout_seconds, + help='Timeout (in seconds) for uploads of individual chunks to the remote file store.') + + # This should eventually deprecate the RunTracker worker count, which is used for legacy cache + # lookups via CacheSetup in TaskBase. + register('--process-execution-parallelism', type=int, default=multiprocessing.cpu_count(), + help='Number of concurrent processes that may be executed either locally and remotely.') @classmethod def register_options(cls, register): diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index a0863d9d30f..1f6d33c4036 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -4,6 +4,7 @@ use std; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::Duration; use tokio::runtime::Runtime; @@ -49,6 +50,10 @@ impl Core { work_dir: &Path, remote_store_server: Option, remote_execution_server: Option, + remote_store_thread_count: usize, + remote_store_chunk_bytes: usize, + remote_store_chunk_upload_timeout: Duration, + process_execution_parallelism: usize, ) -> Core { let mut snapshots_dir = PathBuf::from(work_dir); snapshots_dir.push("snapshots"); @@ -65,19 +70,16 @@ impl Core { let store = safe_create_dir_all_ioerror(&store_path) .map_err(|e| format!("Error making directory {:?}: {:?}", store_path, e)) - .and_then(|()| { - match remote_store_server { - Some(address) => Store::with_remote( - store_path, - fs_pool.clone(), - address, - // TODO: Allow configuration of all of the below: - 1, - 1024 * 1024, - std::time::Duration::from_secs(60), - ), - None => Store::local_only(store_path, fs_pool.clone()), - } + .and_then(|()| match remote_store_server { + Some(address) => Store::with_remote( + store_path, + fs_pool.clone(), + address, + remote_store_thread_count, + remote_store_chunk_bytes, + remote_store_chunk_upload_timeout, + ), + None => Store::local_only(store_path, fs_pool.clone()), }) .unwrap_or_else(|e| panic!("Could not initialize Store: {:?}", e)); @@ -85,7 +87,8 @@ impl Core { match remote_execution_server { Some(address) => Box::new(process_execution::remote::CommandRunner::new( address, - 1, + // Allow for some overhead for bookkeeping threads (if any). + process_execution_parallelism + 2, store.clone(), )), None => Box::new(process_execution::local::CommandRunner::new( @@ -94,8 +97,8 @@ impl Core { )), }; - // TODO: Allow configuration of process concurrency. - let command_runner = BoundedCommandRunner::new(underlying_command_runner, 16); + let command_runner = + BoundedCommandRunner::new(underlying_command_runner, process_execution_parallelism); let rule_graph = RuleGraph::new(&tasks, root_subject_types); diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index dcb3d538cf5..e1f78374ba3 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -39,6 +39,7 @@ use std::mem; use std::os::raw; use std::panic; use std::path::{Path, PathBuf}; +use std::time::Duration; use context::Core; use core::{Failure, Function, Key, TypeConstraint, TypeId, Value}; @@ -218,6 +219,10 @@ pub extern "C" fn scheduler_create( root_type_ids: TypeIdBuffer, remote_store_server: Buffer, remote_execution_server: Buffer, + remote_store_thread_count: u64, + remote_store_chunk_bytes: u64, + remote_store_chunk_upload_timeout_seconds: u64, + process_execution_parallelism: u64, ) -> *const Scheduler { let root_type_ids = root_type_ids.to_vec(); let ignore_patterns = ignore_patterns_buf @@ -275,6 +280,10 @@ pub extern "C" fn scheduler_create( } else { Some(remote_execution_server_string) }, + remote_store_thread_count as usize, + remote_store_chunk_bytes as usize, + Duration::from_secs(remote_store_chunk_upload_timeout_seconds), + process_execution_parallelism as usize, )))) } diff --git a/tests/python/pants_test/engine/examples/planners.py b/tests/python/pants_test/engine/examples/planners.py index d0b812fed8b..84b9bac746f 100644 --- a/tests/python/pants_test/engine/examples/planners.py +++ b/tests/python/pants_test/engine/examples/planners.py @@ -24,6 +24,7 @@ from pants.engine.scheduler import Scheduler from pants.engine.selectors import Get, Select, SelectVariant from pants.engine.struct import HasProducts, Struct, StructWithDeps, Variants +from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS from pants.util.meta import AbstractClass from pants.util.objects import SubclassesOf, datatype from pants_test.engine.examples.parsers import JsonParser @@ -473,6 +474,7 @@ def setup_json_scheduler(build_root, native): project_tree, work_dir, rules, + DEFAULT_EXECUTION_OPTIONS, None, None) return scheduler.new_session() diff --git a/tests/python/pants_test/engine/scheduler_test_base.py b/tests/python/pants_test/engine/scheduler_test_base.py index b82f68e6068..899054d9a86 100644 --- a/tests/python/pants_test/engine/scheduler_test_base.py +++ b/tests/python/pants_test/engine/scheduler_test_base.py @@ -11,6 +11,7 @@ from pants.base.file_system_project_tree import FileSystemProjectTree from pants.engine.nodes import Return, Throw from pants.engine.scheduler import Scheduler +from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS from pants.util.contextutil import temporary_file_path from pants.util.dirutil import safe_mkdtemp, safe_rmtree from pants_test.engine.util import init_native @@ -56,8 +57,7 @@ def mk_scheduler(self, project_tree, work_dir, rules, - remote_store_server=None, - remote_execution_server=None, + DEFAULT_EXECUTION_OPTIONS, include_trace_on_error=include_trace_on_error) return scheduler.new_session() diff --git a/tests/python/pants_test/engine/util.py b/tests/python/pants_test/engine/util.py index 9ad417380d5..459e7a9bb95 100644 --- a/tests/python/pants_test/engine/util.py +++ b/tests/python/pants_test/engine/util.py @@ -17,6 +17,7 @@ from pants.engine.scheduler import Scheduler from pants.engine.selectors import Get from pants.engine.struct import HasProducts, Struct +from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS from pants.util.objects import SubclassesOf from pants_test.option.util.fakes import create_options_for_optionables from pants_test.subsystem.subsystem_util import init_subsystem @@ -97,8 +98,7 @@ def create_scheduler(rules, validate=True): FileSystemProjectTree(os.getcwd()), './.pants.d', rules, - remote_store_server=None, - remote_execution_server=None, + execution_options=DEFAULT_EXECUTION_OPTIONS, validate=validate, )