From af986fa3d50e080cc0630e16f64f469c69f5be31 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 8 Jul 2019 12:50:03 -0700 Subject: [PATCH 1/2] Revert "Merge pull request #19581 from lidizheng/rf-signal" This reverts commit 6d62eb1b703617ff9165773b6d1e7d28ab84856d, reversing changes made to 70dd0b9b3b70e0923149f6c187834eb81b274401. --- src/python/grpcio_tests/tests/unit/_signal_handling_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py index 467b8df27f9d8..9d0d64709af01 100644 --- a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py +++ b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py @@ -13,6 +13,8 @@ # limitations under the License. """Test of responsiveness to signals.""" +from __future__ import print_function + import logging import os import signal From 2014a519fc082fa65614896ba219a31ae4b7c45f Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Mon, 8 Jul 2019 12:50:13 -0700 Subject: [PATCH 2/2] Revert "Merge pull request #19481 from gnossen/main_thread_starvation" This reverts commit 8f044f741f5a39411e04423de0dedd845459f770, reversing changes made to 5ae9afdc55b81a9046ef896cea0327faba56cf36. --- src/python/grpcio/grpc/_channel.py | 151 +++++++---------- src/python/grpcio/grpc/_common.py | 50 ------ src/python/grpcio_tests/commands.py | 2 - src/python/grpcio_tests/tests/tests.json | 1 - .../grpcio_tests/tests/unit/BUILD.bazel | 8 - .../grpcio_tests/tests/unit/_signal_client.py | 84 ---------- .../tests/unit/_signal_handling_test.py | 158 ------------------ 7 files changed, 59 insertions(+), 395 deletions(-) delete mode 100644 src/python/grpcio_tests/tests/unit/_signal_client.py delete mode 100644 src/python/grpcio_tests/tests/unit/_signal_handling_test.py diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 0bf8e03b5ce2f..24f928ef69f19 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -13,7 +13,6 @@ # limitations under the License. """Invocation-side implementation of gRPC Python.""" -import functools import logging import sys import threading @@ -82,6 +81,17 @@ def _unknown_code_details(unknown_cygrpc_code, details): unknown_cygrpc_code, details) +def _wait_once_until(condition, until): + if until is None: + condition.wait() + else: + remaining = until - time.time() + if remaining < 0: + raise grpc.FutureTimeoutError() + else: + condition.wait(timeout=remaining) + + class _RPCState(object): def __init__(self, due, initial_metadata, trailing_metadata, code, details): @@ -168,11 +178,12 @@ def handle_event(event): #pylint: disable=too-many-statements def _consume_request_iterator(request_iterator, state, call, request_serializer, event_handler): - """Consume a request iterator supplied by the user.""" + if cygrpc.is_fork_support_enabled(): + condition_wait_timeout = 1.0 + else: + condition_wait_timeout = None def consume_request_iterator(): # pylint: disable=too-many-branches - # Iterate over the request iterator until it is exhausted or an error - # condition is encountered. while True: return_from_user_request_generator_invoked = False try: @@ -213,19 +224,14 @@ def consume_request_iterator(): # pylint: disable=too-many-branches state.due.add(cygrpc.OperationType.send_message) else: return - - def _done(): - return (state.code is not None or - cygrpc.OperationType.send_message not in - state.due) - - _common.wait( - state.condition.wait, - _done, - spin_cb=functools.partial( - cygrpc.block_if_fork_in_progress, state)) - if state.code is not None: - return + while True: + state.condition.wait(condition_wait_timeout) + cygrpc.block_if_fork_in_progress(state) + if state.code is None: + if cygrpc.OperationType.send_message not in state.due: + break + else: + return else: return with state.condition: @@ -275,21 +281,13 @@ def done(self): with self._state.condition: return self._state.code is not None - def _is_complete(self): - return self._state.code is not None - def result(self, timeout=None): - """Returns the result of the computation or raises its exception. - - See grpc.Future.result for the full API contract. - """ + until = None if timeout is None else time.time() + timeout with self._state.condition: - timed_out = _common.wait( - self._state.condition.wait, self._is_complete, timeout=timeout) - if timed_out: - raise grpc.FutureTimeoutError() - else: - if self._state.code is grpc.StatusCode.OK: + while True: + if self._state.code is None: + _wait_once_until(self._state.condition, until) + elif self._state.code is grpc.StatusCode.OK: return self._state.response elif self._state.cancelled: raise grpc.FutureCancelledError() @@ -297,17 +295,12 @@ def result(self, timeout=None): raise self def exception(self, timeout=None): - """Return the exception raised by the computation. - - See grpc.Future.exception for the full API contract. - """ + until = None if timeout is None else time.time() + timeout with self._state.condition: - timed_out = _common.wait( - self._state.condition.wait, self._is_complete, timeout=timeout) - if timed_out: - raise grpc.FutureTimeoutError() - else: - if self._state.code is grpc.StatusCode.OK: + while True: + if self._state.code is None: + _wait_once_until(self._state.condition, until) + elif self._state.code is grpc.StatusCode.OK: return None elif self._state.cancelled: raise grpc.FutureCancelledError() @@ -315,17 +308,12 @@ def exception(self, timeout=None): return self def traceback(self, timeout=None): - """Access the traceback of the exception raised by the computation. - - See grpc.future.traceback for the full API contract. - """ + until = None if timeout is None else time.time() + timeout with self._state.condition: - timed_out = _common.wait( - self._state.condition.wait, self._is_complete, timeout=timeout) - if timed_out: - raise grpc.FutureTimeoutError() - else: - if self._state.code is grpc.StatusCode.OK: + while True: + if self._state.code is None: + _wait_once_until(self._state.condition, until) + elif self._state.code is grpc.StatusCode.OK: return None elif self._state.cancelled: raise grpc.FutureCancelledError() @@ -357,23 +345,17 @@ def _next(self): raise StopIteration() else: raise self - - def _response_ready(): - return ( - self._state.response is not None or - (cygrpc.OperationType.receive_message not in self._state.due - and self._state.code is not None)) - - _common.wait(self._state.condition.wait, _response_ready) - if self._state.response is not None: - response = self._state.response - self._state.response = None - return response - elif cygrpc.OperationType.receive_message not in self._state.due: - if self._state.code is grpc.StatusCode.OK: - raise StopIteration() - elif self._state.code is not None: - raise self + while True: + self._state.condition.wait() + if self._state.response is not None: + response = self._state.response + self._state.response = None + return response + elif cygrpc.OperationType.receive_message not in self._state.due: + if self._state.code is grpc.StatusCode.OK: + raise StopIteration() + elif self._state.code is not None: + raise self def __iter__(self): return self @@ -404,47 +386,32 @@ def add_callback(self, callback): def initial_metadata(self): with self._state.condition: - - def _done(): - return self._state.initial_metadata is not None - - _common.wait(self._state.condition.wait, _done) + while self._state.initial_metadata is None: + self._state.condition.wait() return self._state.initial_metadata def trailing_metadata(self): with self._state.condition: - - def _done(): - return self._state.trailing_metadata is not None - - _common.wait(self._state.condition.wait, _done) + while self._state.trailing_metadata is None: + self._state.condition.wait() return self._state.trailing_metadata def code(self): with self._state.condition: - - def _done(): - return self._state.code is not None - - _common.wait(self._state.condition.wait, _done) + while self._state.code is None: + self._state.condition.wait() return self._state.code def details(self): with self._state.condition: - - def _done(): - return self._state.details is not None - - _common.wait(self._state.condition.wait, _done) + while self._state.details is None: + self._state.condition.wait() return _common.decode(self._state.details) def debug_error_string(self): with self._state.condition: - - def _done(): - return self._state.debug_error_string is not None - - _common.wait(self._state.condition.wait, _done) + while self._state.debug_error_string is None: + self._state.condition.wait() return _common.decode(self._state.debug_error_string) def _repr(self): diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index b4b24738e8f07..f69127e38ef10 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -15,7 +15,6 @@ import logging -import time import six import grpc @@ -61,8 +60,6 @@ CYGRPC_STATUS_CODE_TO_STATUS_CODE) } -MAXIMUM_WAIT_TIMEOUT = 0.1 - def encode(s): if isinstance(s, bytes): @@ -99,50 +96,3 @@ def deserialize(serialized_message, deserializer): def fully_qualified_method(group, method): return '/{}/{}'.format(group, method) - - -def _wait_once(wait_fn, timeout, spin_cb): - wait_fn(timeout=timeout) - if spin_cb is not None: - spin_cb() - - -def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None): - """Blocks waiting for an event without blocking the thread indefinitely. - - See https://github.com/grpc/grpc/issues/19464 for full context. CPython's - `threading.Event.wait` and `threading.Condition.wait` methods, if invoked - without a timeout kwarg, may block the calling thread indefinitely. If the - call is made from the main thread, this means that signal handlers may not - run for an arbitrarily long period of time. - - This wrapper calls the supplied wait function with an arbitrary short - timeout to ensure that no signal handler has to wait longer than - MAXIMUM_WAIT_TIMEOUT before executing. - - Args: - wait_fn: A callable acceptable a single float-valued kwarg named - `timeout`. This function is expected to be one of `threading.Event.wait` - or `threading.Condition.wait`. - wait_complete_fn: A callable taking no arguments and returning a bool. - When this function returns true, it indicates that waiting should cease. - timeout: An optional float-valued number of seconds after which the wait - should cease. - spin_cb: An optional Callable taking no arguments and returning nothing. - This callback will be called on each iteration of the spin. This may be - used for, e.g. work related to forking. - - Returns: - True if a timeout was supplied and it was reached. False otherwise. - """ - if timeout is None: - while not wait_complete_fn(): - _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb) - else: - end = time.time() + timeout - while not wait_complete_fn(): - remaining = min(end - time.time(), MAXIMUM_WAIT_TIMEOUT) - if remaining < 0: - return True - _wait_once(wait_fn, remaining, spin_cb) - return False diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 166cea101a415..dc0795d4a1257 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -145,8 +145,6 @@ class TestGevent(setuptools.Command): 'unit._exit_test.ExitTest.test_in_flight_partial_unary_stream_call', 'unit._exit_test.ExitTest.test_in_flight_partial_stream_unary_call', 'unit._exit_test.ExitTest.test_in_flight_partial_stream_stream_call', - # TODO(https://github.com/grpc/grpc/issues/18980): Reenable. - 'unit._signal_handling_test.SignalHandlingTest', 'unit._metadata_flags_test', 'health_check._health_servicer_test.HealthServicerTest.test_cancelled_watch_removed_from_watch_list', # TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 16ba4847bc098..cc08d56248a0d 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -67,7 +67,6 @@ "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth", "unit._server_test.ServerTest", "unit._session_cache_test.SSLSessionCacheTest", - "unit._signal_handling_test.SignalHandlingTest", "unit._version_test.VersionTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index d21f5a59ad153..a161794f8be87 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -16,7 +16,6 @@ GRPCIO_TESTS_UNIT = [ "_credentials_test.py", "_dns_resolver_test.py", "_empty_message_test.py", - "_error_message_encoding_test.py", "_exit_test.py", "_interceptor_test.py", "_invalid_metadata_test.py", @@ -28,7 +27,6 @@ GRPCIO_TESTS_UNIT = [ # "_reconnect_test.py", "_resource_exhausted_test.py", "_rpc_test.py", - "_signal_handling_test.py", # TODO(ghostwriternr): To be added later. # "_server_ssl_cert_config_test.py", "_server_test.py", @@ -41,11 +39,6 @@ py_library( srcs = ["_tcp_proxy.py"], ) -py_library( - name = "_signal_client", - srcs = ["_signal_client.py"], -) - py_library( name = "resources", srcs = ["resources.py"], @@ -94,7 +87,6 @@ py_library( ":_server_shutdown_scenarios", ":_from_grpc_import_star", ":_tcp_proxy", - ":_signal_client", "//src/python/grpcio_tests/tests/unit/framework/common", "//src/python/grpcio_tests/tests/testing", requirement('six'), diff --git a/src/python/grpcio_tests/tests/unit/_signal_client.py b/src/python/grpcio_tests/tests/unit/_signal_client.py deleted file mode 100644 index 65ddd6d858ec4..0000000000000 --- a/src/python/grpcio_tests/tests/unit/_signal_client.py +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright 2019 the gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Client for testing responsiveness to signals.""" - -from __future__ import print_function - -import argparse -import functools -import logging -import signal -import sys - -import grpc - -SIGTERM_MESSAGE = "Handling sigterm!" - -UNARY_UNARY = "/test/Unary" -UNARY_STREAM = "/test/ServerStreaming" - -_MESSAGE = b'\x00\x00\x00' - -_ASSERTION_MESSAGE = "Control flow should never reach here." - -# NOTE(gnossen): We use a global variable here so that the signal handler can be -# installed before the RPC begins. If we do not do this, then we may receive the -# SIGINT before the signal handler is installed. I'm not happy with per-process -# global state, but the per-process global state that is signal handlers -# somewhat forces my hand. -per_process_rpc_future = None - - -def handle_sigint(unused_signum, unused_frame): - print(SIGTERM_MESSAGE) - if per_process_rpc_future is not None: - per_process_rpc_future.cancel() - sys.stderr.flush() - sys.exit(0) - - -def main_unary(server_target): - """Initiate a unary RPC to be interrupted by a SIGINT.""" - global per_process_rpc_future # pylint: disable=global-statement - with grpc.insecure_channel(server_target) as channel: - multicallable = channel.unary_unary(UNARY_UNARY) - signal.signal(signal.SIGINT, handle_sigint) - per_process_rpc_future = multicallable.future( - _MESSAGE, wait_for_ready=True) - result = per_process_rpc_future.result() - assert False, _ASSERTION_MESSAGE - - -def main_streaming(server_target): - """Initiate a streaming RPC to be interrupted by a SIGINT.""" - global per_process_rpc_future # pylint: disable=global-statement - with grpc.insecure_channel(server_target) as channel: - signal.signal(signal.SIGINT, handle_sigint) - per_process_rpc_future = channel.unary_stream(UNARY_STREAM)( - _MESSAGE, wait_for_ready=True) - for result in per_process_rpc_future: - pass - assert False, _ASSERTION_MESSAGE - - -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Signal test client.') - parser.add_argument('server', help='Server target') - parser.add_argument( - 'arity', help='RPC arity', choices=('unary', 'streaming')) - args = parser.parse_args() - if args.arity == 'unary': - main_unary(args.server) - else: - main_streaming(args.server) diff --git a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py deleted file mode 100644 index 9d0d64709af01..0000000000000 --- a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py +++ /dev/null @@ -1,158 +0,0 @@ -# Copyright 2019 the gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Test of responsiveness to signals.""" - -from __future__ import print_function - -import logging -import os -import signal -import subprocess -import tempfile -import threading -import unittest -import sys - -import grpc - -from tests.unit import test_common -from tests.unit import _signal_client - -_CLIENT_PATH = os.path.abspath(os.path.realpath(_signal_client.__file__)) -_HOST = 'localhost' - - -class _GenericHandler(grpc.GenericRpcHandler): - - def __init__(self): - self._connected_clients_lock = threading.RLock() - self._connected_clients_event = threading.Event() - self._connected_clients = 0 - - self._unary_unary_handler = grpc.unary_unary_rpc_method_handler( - self._handle_unary_unary) - self._unary_stream_handler = grpc.unary_stream_rpc_method_handler( - self._handle_unary_stream) - - def _on_client_connect(self): - with self._connected_clients_lock: - self._connected_clients += 1 - self._connected_clients_event.set() - - def _on_client_disconnect(self): - with self._connected_clients_lock: - self._connected_clients -= 1 - if self._connected_clients == 0: - self._connected_clients_event.clear() - - def await_connected_client(self): - """Blocks until a client connects to the server.""" - self._connected_clients_event.wait() - - def _handle_unary_unary(self, request, servicer_context): - """Handles a unary RPC. - - Blocks until the client disconnects and then echoes. - """ - stop_event = threading.Event() - - def on_rpc_end(): - self._on_client_disconnect() - stop_event.set() - - servicer_context.add_callback(on_rpc_end) - self._on_client_connect() - stop_event.wait() - return request - - def _handle_unary_stream(self, request, servicer_context): - """Handles a server streaming RPC. - - Blocks until the client disconnects and then echoes. - """ - stop_event = threading.Event() - - def on_rpc_end(): - self._on_client_disconnect() - stop_event.set() - - servicer_context.add_callback(on_rpc_end) - self._on_client_connect() - stop_event.wait() - yield request - - def service(self, handler_call_details): - if handler_call_details.method == _signal_client.UNARY_UNARY: - return self._unary_unary_handler - elif handler_call_details.method == _signal_client.UNARY_STREAM: - return self._unary_stream_handler - else: - return None - - -def _read_stream(stream): - stream.seek(0) - return stream.read() - - -class SignalHandlingTest(unittest.TestCase): - - def setUp(self): - self._server = test_common.test_server() - self._port = self._server.add_insecure_port('{}:0'.format(_HOST)) - self._handler = _GenericHandler() - self._server.add_generic_rpc_handlers((self._handler,)) - self._server.start() - - def tearDown(self): - self._server.stop(None) - - @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows') - def testUnary(self): - """Tests that the server unary code path does not stall signal handlers.""" - server_target = '{}:{}'.format(_HOST, self._port) - with tempfile.TemporaryFile(mode='r') as client_stdout: - with tempfile.TemporaryFile(mode='r') as client_stderr: - client = subprocess.Popen( - (sys.executable, _CLIENT_PATH, server_target, 'unary'), - stdout=client_stdout, - stderr=client_stderr) - self._handler.await_connected_client() - client.send_signal(signal.SIGINT) - self.assertFalse(client.wait(), msg=_read_stream(client_stderr)) - client_stdout.seek(0) - self.assertIn(_signal_client.SIGTERM_MESSAGE, - client_stdout.read()) - - @unittest.skipIf(os.name == 'nt', 'SIGINT not supported on windows') - def testStreaming(self): - """Tests that the server streaming code path does not stall signal handlers.""" - server_target = '{}:{}'.format(_HOST, self._port) - with tempfile.TemporaryFile(mode='r') as client_stdout: - with tempfile.TemporaryFile(mode='r') as client_stderr: - client = subprocess.Popen( - (sys.executable, _CLIENT_PATH, server_target, 'streaming'), - stdout=client_stdout, - stderr=client_stderr) - self._handler.await_connected_client() - client.send_signal(signal.SIGINT) - self.assertFalse(client.wait(), msg=_read_stream(client_stderr)) - client_stdout.seek(0) - self.assertIn(_signal_client.SIGTERM_MESSAGE, - client_stdout.read()) - - -if __name__ == '__main__': - logging.basicConfig() - unittest.main(verbosity=2)