Skip to content

Commit

Permalink
Merge pull request grpc#19583 from gnossen/revert_signal_handling
Browse files Browse the repository at this point in the history
Revert signal handling
  • Loading branch information
gnossen committed Jul 8, 2019
2 parents 6d62eb1 + 2014a51 commit 1e7ec75
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 393 deletions.
151 changes: 59 additions & 92 deletions src/python/grpcio/grpc/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
"""Invocation-side implementation of gRPC Python."""

import functools
import logging
import sys
import threading
Expand Down Expand Up @@ -82,6 +81,17 @@ def _unknown_code_details(unknown_cygrpc_code, details):
unknown_cygrpc_code, details)


def _wait_once_until(condition, until):
if until is None:
condition.wait()
else:
remaining = until - time.time()
if remaining < 0:
raise grpc.FutureTimeoutError()
else:
condition.wait(timeout=remaining)


class _RPCState(object):

def __init__(self, due, initial_metadata, trailing_metadata, code, details):
Expand Down Expand Up @@ -168,11 +178,12 @@ def handle_event(event):
#pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator, state, call, request_serializer,
event_handler):
"""Consume a request iterator supplied by the user."""
if cygrpc.is_fork_support_enabled():
condition_wait_timeout = 1.0
else:
condition_wait_timeout = None

def consume_request_iterator(): # pylint: disable=too-many-branches
# Iterate over the request iterator until it is exhausted or an error
# condition is encountered.
while True:
return_from_user_request_generator_invoked = False
try:
Expand Down Expand Up @@ -213,19 +224,14 @@ def consume_request_iterator(): # pylint: disable=too-many-branches
state.due.add(cygrpc.OperationType.send_message)
else:
return

def _done():
return (state.code is not None or
cygrpc.OperationType.send_message not in
state.due)

_common.wait(
state.condition.wait,
_done,
spin_cb=functools.partial(
cygrpc.block_if_fork_in_progress, state))
if state.code is not None:
return
while True:
state.condition.wait(condition_wait_timeout)
cygrpc.block_if_fork_in_progress(state)
if state.code is None:
if cygrpc.OperationType.send_message not in state.due:
break
else:
return
else:
return
with state.condition:
Expand Down Expand Up @@ -275,57 +281,39 @@ def done(self):
with self._state.condition:
return self._state.code is not None

def _is_complete(self):
return self._state.code is not None

def result(self, timeout=None):
"""Returns the result of the computation or raises its exception.
See grpc.Future.result for the full API contract.
"""
until = None if timeout is None else time.time() + timeout
with self._state.condition:
timed_out = _common.wait(
self._state.condition.wait, self._is_complete, timeout=timeout)
if timed_out:
raise grpc.FutureTimeoutError()
else:
if self._state.code is grpc.StatusCode.OK:
while True:
if self._state.code is None:
_wait_once_until(self._state.condition, until)
elif self._state.code is grpc.StatusCode.OK:
return self._state.response
elif self._state.cancelled:
raise grpc.FutureCancelledError()
else:
raise self

def exception(self, timeout=None):
"""Return the exception raised by the computation.
See grpc.Future.exception for the full API contract.
"""
until = None if timeout is None else time.time() + timeout
with self._state.condition:
timed_out = _common.wait(
self._state.condition.wait, self._is_complete, timeout=timeout)
if timed_out:
raise grpc.FutureTimeoutError()
else:
if self._state.code is grpc.StatusCode.OK:
while True:
if self._state.code is None:
_wait_once_until(self._state.condition, until)
elif self._state.code is grpc.StatusCode.OK:
return None
elif self._state.cancelled:
raise grpc.FutureCancelledError()
else:
return self

def traceback(self, timeout=None):
"""Access the traceback of the exception raised by the computation.
See grpc.future.traceback for the full API contract.
"""
until = None if timeout is None else time.time() + timeout
with self._state.condition:
timed_out = _common.wait(
self._state.condition.wait, self._is_complete, timeout=timeout)
if timed_out:
raise grpc.FutureTimeoutError()
else:
if self._state.code is grpc.StatusCode.OK:
while True:
if self._state.code is None:
_wait_once_until(self._state.condition, until)
elif self._state.code is grpc.StatusCode.OK:
return None
elif self._state.cancelled:
raise grpc.FutureCancelledError()
Expand Down Expand Up @@ -357,23 +345,17 @@ def _next(self):
raise StopIteration()
else:
raise self

def _response_ready():
return (
self._state.response is not None or
(cygrpc.OperationType.receive_message not in self._state.due
and self._state.code is not None))

_common.wait(self._state.condition.wait, _response_ready)
if self._state.response is not None:
response = self._state.response
self._state.response = None
return response
elif cygrpc.OperationType.receive_message not in self._state.due:
if self._state.code is grpc.StatusCode.OK:
raise StopIteration()
elif self._state.code is not None:
raise self
while True:
self._state.condition.wait()
if self._state.response is not None:
response = self._state.response
self._state.response = None
return response
elif cygrpc.OperationType.receive_message not in self._state.due:
if self._state.code is grpc.StatusCode.OK:
raise StopIteration()
elif self._state.code is not None:
raise self

def __iter__(self):
return self
Expand Down Expand Up @@ -404,47 +386,32 @@ def add_callback(self, callback):

def initial_metadata(self):
with self._state.condition:

def _done():
return self._state.initial_metadata is not None

_common.wait(self._state.condition.wait, _done)
while self._state.initial_metadata is None:
self._state.condition.wait()
return self._state.initial_metadata

def trailing_metadata(self):
with self._state.condition:

def _done():
return self._state.trailing_metadata is not None

_common.wait(self._state.condition.wait, _done)
while self._state.trailing_metadata is None:
self._state.condition.wait()
return self._state.trailing_metadata

def code(self):
with self._state.condition:

def _done():
return self._state.code is not None

_common.wait(self._state.condition.wait, _done)
while self._state.code is None:
self._state.condition.wait()
return self._state.code

def details(self):
with self._state.condition:

def _done():
return self._state.details is not None

_common.wait(self._state.condition.wait, _done)
while self._state.details is None:
self._state.condition.wait()
return _common.decode(self._state.details)

def debug_error_string(self):
with self._state.condition:

def _done():
return self._state.debug_error_string is not None

_common.wait(self._state.condition.wait, _done)
while self._state.debug_error_string is None:
self._state.condition.wait()
return _common.decode(self._state.debug_error_string)

def _repr(self):
Expand Down
50 changes: 0 additions & 50 deletions src/python/grpcio/grpc/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import logging

import time
import six

import grpc
Expand Down Expand Up @@ -61,8 +60,6 @@
CYGRPC_STATUS_CODE_TO_STATUS_CODE)
}

MAXIMUM_WAIT_TIMEOUT = 0.1


def encode(s):
if isinstance(s, bytes):
Expand Down Expand Up @@ -99,50 +96,3 @@ def deserialize(serialized_message, deserializer):

def fully_qualified_method(group, method):
return '/{}/{}'.format(group, method)


def _wait_once(wait_fn, timeout, spin_cb):
wait_fn(timeout=timeout)
if spin_cb is not None:
spin_cb()


def wait(wait_fn, wait_complete_fn, timeout=None, spin_cb=None):
"""Blocks waiting for an event without blocking the thread indefinitely.
See https://github.com/grpc/grpc/issues/19464 for full context. CPython's
`threading.Event.wait` and `threading.Condition.wait` methods, if invoked
without a timeout kwarg, may block the calling thread indefinitely. If the
call is made from the main thread, this means that signal handlers may not
run for an arbitrarily long period of time.
This wrapper calls the supplied wait function with an arbitrary short
timeout to ensure that no signal handler has to wait longer than
MAXIMUM_WAIT_TIMEOUT before executing.
Args:
wait_fn: A callable acceptable a single float-valued kwarg named
`timeout`. This function is expected to be one of `threading.Event.wait`
or `threading.Condition.wait`.
wait_complete_fn: A callable taking no arguments and returning a bool.
When this function returns true, it indicates that waiting should cease.
timeout: An optional float-valued number of seconds after which the wait
should cease.
spin_cb: An optional Callable taking no arguments and returning nothing.
This callback will be called on each iteration of the spin. This may be
used for, e.g. work related to forking.
Returns:
True if a timeout was supplied and it was reached. False otherwise.
"""
if timeout is None:
while not wait_complete_fn():
_wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
else:
end = time.time() + timeout
while not wait_complete_fn():
remaining = min(end - time.time(), MAXIMUM_WAIT_TIMEOUT)
if remaining < 0:
return True
_wait_once(wait_fn, remaining, spin_cb)
return False
2 changes: 0 additions & 2 deletions src/python/grpcio_tests/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ class TestGevent(setuptools.Command):
'unit._exit_test.ExitTest.test_in_flight_partial_unary_stream_call',
'unit._exit_test.ExitTest.test_in_flight_partial_stream_unary_call',
'unit._exit_test.ExitTest.test_in_flight_partial_stream_stream_call',
# TODO(https://github.com/grpc/grpc/issues/18980): Reenable.
'unit._signal_handling_test.SignalHandlingTest',
'unit._metadata_flags_test',
'health_check._health_servicer_test.HealthServicerTest.test_cancelled_watch_removed_from_watch_list',
# TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests
Expand Down
1 change: 0 additions & 1 deletion src/python/grpcio_tests/tests/tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
"unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth",
"unit._server_test.ServerTest",
"unit._session_cache_test.SSLSessionCacheTest",
"unit._signal_handling_test.SignalHandlingTest",
"unit._version_test.VersionTest",
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
Expand Down
8 changes: 0 additions & 8 deletions src/python/grpcio_tests/tests/unit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ GRPCIO_TESTS_UNIT = [
"_credentials_test.py",
"_dns_resolver_test.py",
"_empty_message_test.py",
"_error_message_encoding_test.py",
"_exit_test.py",
"_interceptor_test.py",
"_invalid_metadata_test.py",
Expand All @@ -28,7 +27,6 @@ GRPCIO_TESTS_UNIT = [
# "_reconnect_test.py",
"_resource_exhausted_test.py",
"_rpc_test.py",
"_signal_handling_test.py",
# TODO(ghostwriternr): To be added later.
# "_server_ssl_cert_config_test.py",
"_server_test.py",
Expand All @@ -41,11 +39,6 @@ py_library(
srcs = ["_tcp_proxy.py"],
)

py_library(
name = "_signal_client",
srcs = ["_signal_client.py"],
)

py_library(
name = "resources",
srcs = ["resources.py"],
Expand Down Expand Up @@ -94,7 +87,6 @@ py_library(
":_server_shutdown_scenarios",
":_from_grpc_import_star",
":_tcp_proxy",
":_signal_client",
"//src/python/grpcio_tests/tests/unit/framework/common",
"//src/python/grpcio_tests/tests/testing",
requirement('six'),
Expand Down
Loading

0 comments on commit 1e7ec75

Please sign in to comment.