Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick notify watcher series #9741

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build-support/githooks/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ if git rev-parse --verify "${MERGE_BASE}" &>/dev/null; then
./build-support/bin/mypy.py || exit 1

if git diff "${MERGE_BASE}" --name-only | grep '\.rs$' > /dev/null; then
echo "* Checking formatting of Rust files"
./build-support/bin/check_rust_formatting.sh || exit 1
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
# Clippy happens on a different Travis CI shard because of separate caching concerns.
# The TRAVIS env var is documented here:
# https://docs.travis-ci.com/user/environment-variables/#default-environment-variables
if [[ "${TRAVIS}" != "true" ]]; then
echo "* Running cargo clippy"
./build-support/bin/check_clippy.sh || exit 1
fi
echo "* Checking formatting of Rust files"
./build-support/bin/check_rust_formatting.sh || exit 1
echo "* Checking Rust target headers"
build-support/bin/check_rust_target_headers.sh || exit 1
fi
Expand Down
11 changes: 9 additions & 2 deletions pants.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,22 @@ backend_packages2 = [
"internal_backend.rules_for_testing",
]

# The pants script in this repo consumes these files to run pants
pantsd_invalidation_globs.add = ["src/python/**/*.py"]
# The invalidation globs cover the PYTHONPATH by default, but we additionally add the rust code.
pantsd_invalidation_globs.add = [
"!*_test.py",
# NB: The `target` directory is ignored via `pants_ignore` below.
"src/rust/engine/**/*.rs",
"src/rust/engine/**/*.toml",
]
# Path patterns to ignore for filesystem operations on top of the builtin patterns.
pants_ignore.add = [
# venv directories under build-support.
"/build-support/virtualenvs/",
"/build-support/*.venv/",
# An absolute symlink to the Pants Rust toolchain sources.
"/build-support/bin/native/src",
# We shouldn't walk or watch the rust compiler artifacts because it is slow.
"/src/rust/engine/target",
]

build_file_imports = "error"
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.40.0
1.42.0
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ def _hydrate_pex_file(self, hydrated_pex_file):

# Perform a fully pinned intransitive resolve to hydrate the install cache.
resolver_settings = ipex_info["resolver_settings"]
fetchers = (
[Fetcher([url]) for url in resolver_settings.pop('find_links')] +
[PyPIFetcher(url) for url in resolver_settings.pop('indexes')]
)
resolver_settings['fetchers'] = fetchers
fetchers = [Fetcher([url]) for url in resolver_settings.pop("find_links")] + [
PyPIFetcher(url) for url in resolver_settings.pop("indexes")
]
resolver_settings["fetchers"] = fetchers

resolved_distributions = resolver.resolve(
requirements=bootstrap_info.requirements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from pants.backend.native.subsystems.native_toolchain import NativeToolchain
from pants.backend.native.targets.native_library import NativeLibrary
from pants.backend.python.subsystems.executable_pex_tool import ExecutablePexTool
from pants.python.python_requirement import PythonRequirement
from pants.backend.python.targets.python_distribution import PythonDistribution
from pants.base.exceptions import IncompatiblePlatformsError
from pants.engine.rules import rule, subsystem_rule
from pants.python import pex_build_util
from pants.python.python_requirement import PythonRequirement
from pants.python.python_setup import PythonSetup
from pants.subsystem.subsystem import Subsystem
from pants.util.memo import memoized_property
Expand Down
4 changes: 3 additions & 1 deletion src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def _maybe_init_graph_session(
)

v2_ui = options.for_global_scope().get("v2_ui", False)
use_colors = options.for_global_scope().get("colors", True)
zipkin_trace_v2 = options.for_scope("reporting").zipkin_trace_v2
# TODO(#8658) This should_report_workunits flag must be set to True for
# StreamingWorkunitHandler to receive WorkUnits. It should eventually
Expand All @@ -137,7 +138,8 @@ def _maybe_init_graph_session(
graph_session = graph_scheduler_helper.new_session(
zipkin_trace_v2,
RunTracker.global_instance().run_id,
v2_ui,
v2_ui=v2_ui,
use_colors=use_colors,
should_report_workunits=stream_workunits,
)
return graph_session, graph_session.scheduler_session
Expand Down
1 change: 1 addition & 0 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ def ti(type_obj):
execution_options.process_execution_use_local_cache,
self.context.utf8_dict(execution_options.remote_execution_headers),
execution_options.process_execution_local_enable_nailgun,
execution_options.experimental_fs_watcher,
)
if scheduler_result.is_throw:
value = self.context.from_value(scheduler_result.throw_handle)
Expand Down
117 changes: 88 additions & 29 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import traceback
from dataclasses import dataclass
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Dict, Tuple
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Tuple, Type, Union, cast

from pants.base.exception_sink import ExceptionSink
from pants.base.exiter import PANTS_FAILED_EXIT_CODE
Expand Down Expand Up @@ -59,6 +59,10 @@ def end_user_messages(self):
return [str(exc) for exc in self.wrapped_exceptions]


class ExecutionTimeoutError(ExecutionError):
"""An ExecutionRequest specified a timeout which elapsed before the request completed."""


class Scheduler:
def __init__(
self,
Expand Down Expand Up @@ -242,10 +246,14 @@ def invalidate_files(self, direct_filenames):
def invalidate_all_files(self):
return self._native.lib.graph_invalidate_all_paths(self._scheduler)

def check_invalidation_watcher_liveness(self):
res = self._native.lib.check_invalidation_watcher_liveness(self._scheduler)
self._raise_or_return(res)

def graph_len(self):
return self._native.lib.graph_len(self._scheduler)

def add_root_selection(self, execution_request, subject_or_params, product):
def execution_add_root_select(self, execution_request, subject_or_params, product):
if isinstance(subject_or_params, Params):
params = subject_or_params.params
else:
Expand All @@ -255,6 +263,17 @@ def add_root_selection(self, execution_request, subject_or_params, product):
)
self._raise_or_return(res)

def execution_set_timeout(self, execution_request, timeout: float):
timeout_in_ms = int(timeout * 1000)
self._native.lib.execution_set_timeout(execution_request, timeout_in_ms)

def execution_set_poll(self, execution_request, poll: bool):
self._native.lib.execution_set_poll(execution_request, poll)

def execution_set_poll_delay(self, execution_request, poll_delay: float):
poll_delay_in_ms = int(poll_delay * 1000)
self._native.lib.execution_set_poll_delay(execution_request, poll_delay_in_ms)

@property
def visualize_to_dir(self):
return self._visualize_to_dir
Expand All @@ -275,8 +294,14 @@ def with_fork_context(self, func):

def _run_and_return_roots(self, session, execution_request):
raw_roots = self._native.lib.scheduler_execute(self._scheduler, session, execution_request)
if raw_roots == self._native.ffi.NULL:
if raw_roots.err == self._native.lib.NoError:
pass
elif raw_roots.err == self._native.lib.KeyboardInterrupt:
raise KeyboardInterrupt
elif raw_roots.err == self._native.lib.Timeout:
raise ExecutionTimeoutError("Timed out")
else:
raise Exception(f"Unrecognized error type from native execution: {raw_roots.err}")

remaining_runtime_exceptions_to_capture = list(
self._native.consume_cffi_extern_method_runtime_exceptions()
Expand Down Expand Up @@ -354,8 +379,6 @@ class SchedulerSession:
Session.
"""

execution_error_type = ExecutionError

def __init__(self, scheduler, session):
self._scheduler = scheduler
self._session = session
Expand All @@ -372,6 +395,15 @@ def poll_workunits(self) -> Tuple[Dict[str, Any], ...]:
def graph_len(self):
return self._scheduler.graph_len()

def new_run_id(self):
"""Assigns a new "run id" to this Session, without creating a new Session.

Usually each Session corresponds to one end user "run", but there are exceptions: notably,
the `--loop` feature uses one Session, but would like to observe new values for uncacheable
nodes in each iteration of its loop.
"""
self._scheduler._native.lib.session_new_run_id(self._session)

def trace(self, execution_request):
"""Yields a stringified 'stacktrace' starting from the scheduler's roots."""
for line in self._scheduler.graph_trace(self._session, execution_request.native):
Expand All @@ -387,30 +419,41 @@ def visualize_graph_to_file(self, filename):
def visualize_rule_graph_to_file(self, filename):
self._scheduler.visualize_rule_graph_to_file(filename)

def execution_request_literal(self, request_specs):
native_execution_request = self._scheduler._native.new_execution_request()
for subject, product in request_specs:
self._scheduler.add_root_selection(native_execution_request, subject, product)
return ExecutionRequest(request_specs, native_execution_request)

def execution_request(self, products, subjects):
def execution_request(
self,
products: Sequence[Type],
subjects: Sequence[Union[Any, Params]],
poll: bool = False,
poll_delay: Optional[float] = None,
timeout: Optional[float] = None,
) -> ExecutionRequest:
"""Create and return an ExecutionRequest for the given products and subjects.

The resulting ExecutionRequest object will contain keys tied to this scheduler's product Graph,
and so it will not be directly usable with other scheduler instances without being re-created.

NB: This method does a "cross product", mapping all subjects to all products. To create a
request for just the given list of subject -> product tuples, use `execution_request_literal()`!
NB: This method does a "cross product", mapping all subjects to all products.

:param products: A list of product types to request for the roots.
:type products: list of types
:param subjects: A list of AddressSpec and/or PathGlobs objects.
:type subject: list of :class:`pants.base.specs.AddressSpec`, `pants.build_graph.Address`, and/or
:class:`pants.engine.fs.PathGlobs` objects.
:param subjects: A list of singleton input parameters or Params instances.
:param poll: True to wait for _all_ of the given roots to
have changed since their last observed values in this SchedulerSession.
:param poll_delay: A delay (in seconds) to wait after observing a change, and before
beginning to compute a new value.
:param timeout: An optional timeout to wait for the request to complete (in seconds). If the
request has not completed before the timeout has elapsed, ExecutionTimeoutError is raised.
:returns: An ExecutionRequest for the given products and subjects.
"""
roots = tuple((s, p) for s in subjects for p in products)
return self.execution_request_literal(roots)
request_specs = tuple((s, p) for s in subjects for p in products)
native_execution_request = self._scheduler._native.new_execution_request()
for subject, product in request_specs:
self._scheduler.execution_add_root_select(native_execution_request, subject, product)
if timeout:
self._scheduler.execution_set_timeout(native_execution_request, timeout)
if poll_delay:
self._scheduler.execution_set_poll_delay(native_execution_request, poll_delay)
self._scheduler.execution_set_poll(native_execution_request, poll)
return ExecutionRequest(request_specs, native_execution_request)

def invalidate_files(self, direct_filenames):
"""Invalidates the given filenames in an internal product Graph instance."""
Expand Down Expand Up @@ -444,7 +487,7 @@ def _maybe_visualize(self):
self._run_count += 1
self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir, name))

def execute(self, execution_request):
def execute(self, execution_request: ExecutionRequest):
"""Invoke the engine for the given ExecutionRequest, returning Return and Throw states.

:return: A tuple of (root, Return) tuples and (root, Throw) tuples.
Expand All @@ -470,7 +513,7 @@ def execute(self, execution_request):

returns = tuple((root, state) for root, state in roots if type(state) is Return)
throws = tuple((root, state) for root, state in roots if type(state) is Throw)
return returns, throws
return cast(Tuple[Tuple[Return, ...], Tuple[Throw, ...]], (returns, throws))

def _trace_on_error(self, unique_exceptions, request):
exception_noun = pluralize(len(unique_exceptions), "Exception")
Expand All @@ -490,13 +533,21 @@ def _trace_on_error(self, unique_exceptions, request):
unique_exceptions,
)

def run_goal_rule(self, product, subject):
def run_goal_rule(
self,
product: Type,
subject: Union[Any, Params],
poll: bool = False,
poll_delay: Optional[float] = None,
) -> int:
"""
:param product: A Goal subtype.
:param subject: subject for the request.
:param poll: See self.execution_request.
:param poll_delay: See self.execution_request.
:returns: An exit_code for the given Goal.
"""
request = self.execution_request([product], [subject])
request = self.execution_request([product], [subject], poll=poll, poll_delay=poll_delay)
returns, throws = self.execute(request)

if throws:
Expand All @@ -507,17 +558,25 @@ def run_goal_rule(self, product, subject):
_, state = returns[0]
return state.value.exit_code

def product_request(self, product, subjects):
def product_request(
self,
product: Type,
subjects: Sequence[Union[Any, Params]],
poll: bool = False,
timeout: Optional[float] = None,
):
"""Executes a request for a single product for some subjects, and returns the products.

:param class product: A product type for the request.
:param list subjects: A list of subjects or Params instances for the request.
:param product: A product type for the request.
:param subjects: A list of subjects or Params instances for the request.
:param poll: See self.execution_request.
:param timeout: See self.execution_request.
:returns: A list of the requested products, with length match len(subjects).
"""
request = None
raised_exception = None
try:
request = self.execution_request([product], subjects)
request = self.execution_request([product], subjects, poll=poll, timeout=timeout)
except: # noqa: T803
# If there are any exceptions during CFFI extern method calls, we want to return an error with
# them and whatever failure results from it. This typically results from unhashable types.
Expand Down Expand Up @@ -576,7 +635,7 @@ def product_request(self, product, subjects):
)
)

returns, throws = self.execute(request)
returns, throws = self.execute(cast(ExecutionRequest, request))

# Throw handling.
if throws:
Expand Down
Loading