From 33d249f4b25fcdd5cef64b024397ab742a92afe7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 16:51:53 +0100 Subject: [PATCH 1/9] switch from `types.CoroutineType` to `typing.Coroutine` these should be identical semantically, and since `defer.ensureDeferred` is defined to take a `typing.Coroutine`, will keep mypy happy --- synapse/logging/context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 18ac50780285..02e5ddd2ef2a 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -25,7 +25,7 @@ import inspect import logging import threading -import types +import typing import warnings from typing import TYPE_CHECKING, Optional, Tuple, TypeVar, Union @@ -745,7 +745,7 @@ def run_in_background(f, *args, **kwargs) -> defer.Deferred: # by synchronous exceptions, so let's turn them into Failures. return defer.fail() - if isinstance(res, types.CoroutineType): + if isinstance(res, typing.Coroutine): res = defer.ensureDeferred(res) # At this point we should have a Deferred, if not then f was a synchronous From 071447f72ecb33bc589bc823364cb663726db6b2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 16:59:18 +0100 Subject: [PATCH 2/9] Fix some annotations on inlineCallbacks functions --- synapse/http/federation/matrix_federation_agent.py | 4 ++-- synapse/module_api/__init__.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 950770201a79..c16b7f10e645 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -27,7 +27,7 @@ ) from twisted.web.client import URI, Agent, HTTPConnectionPool from twisted.web.http_headers import Headers -from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer +from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResponse from synapse.crypto.context_factory import FederationPolicyForHTTPS from synapse.http.client import BlacklistingAgentWrapper @@ -116,7 +116,7 @@ def request( uri: bytes, headers: Optional[Headers] = None, bodyProducer: Optional[IBodyProducer] = None, - ) -> Generator[defer.Deferred, Any, defer.Deferred]: + ) -> Generator[defer.Deferred, Any, IResponse]: """ Args: method: HTTP method: GET/POST/etc diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 1259fc2d906a..473812b8e295 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -484,7 +484,7 @@ async def complete_sso_login_async( @defer.inlineCallbacks def get_state_events_in_room( self, room_id: str, types: Iterable[Tuple[str, Optional[str]]] - ) -> Generator[defer.Deferred, Any, defer.Deferred]: + ) -> Generator[defer.Deferred, Any, Iterable[EventBase]]: """Gets current state events for the given room. (This is exposed for compatibility with the old SpamCheckerApi. We should From cd007f2dea9ce434544f4d0ed61635b5c0c44e67 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 17:00:38 +0100 Subject: [PATCH 3/9] changelog --- changelog.d/10446.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10446.misc diff --git a/changelog.d/10446.misc b/changelog.d/10446.misc new file mode 100644 index 000000000000..a5a0ca80eb4d --- /dev/null +++ b/changelog.d/10446.misc @@ -0,0 +1 @@ +Update type annotations to work with forthcoming Twisted 21.7.0 release. From 7e34f6e9856f890b2b9db06364747dcb0c0d214e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 19:56:16 +0100 Subject: [PATCH 4/9] improve typing annotations in CachedCall tighten up some of the typing in CachedCall, which is going to be needed when Twisted 21.7 brings better typing on Deferred. --- changelog.d/10450.misc | 1 + synapse/util/caches/cached_call.py | 34 ++++++++++++++++++++---------- 2 files changed, 24 insertions(+), 11 deletions(-) create mode 100644 changelog.d/10450.misc diff --git a/changelog.d/10450.misc b/changelog.d/10450.misc new file mode 100644 index 000000000000..aa646f0841c7 --- /dev/null +++ b/changelog.d/10450.misc @@ -0,0 +1 @@ + Update type annotations to work with forthcoming Twisted 21.7.0 release. diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py index 891bee0b33ae..26683b4513ff 100644 --- a/synapse/util/caches/cached_call.py +++ b/synapse/util/caches/cached_call.py @@ -11,9 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import enum from typing import Awaitable, Callable, Generic, Optional, TypeVar, Union +from twisted.internet import defer from twisted.internet.defer import Deferred from twisted.python.failure import Failure @@ -22,6 +23,10 @@ TV = TypeVar("TV") +class _Sentinel(enum.Enum): + sentinel = object() + + class CachedCall(Generic[TV]): """A wrapper for asynchronous calls whose results should be shared @@ -65,7 +70,7 @@ def __init__(self, f: Callable[[], Awaitable[TV]]): """ self._callable: Optional[Callable[[], Awaitable[TV]]] = f self._deferred: Optional[Deferred] = None - self._result: Union[None, Failure, TV] = None + self._result: Union[_Sentinel, TV, Failure] = _Sentinel.sentinel async def get(self) -> TV: """Kick off the call if necessary, and return the result""" @@ -78,8 +83,9 @@ async def get(self) -> TV: self._callable = None # once the deferred completes, store the result. We cannot simply leave the - # result in the deferred, since if it's a Failure, GCing the deferred - # would then log a critical error about unhandled Failures. + # result in the deferred, since `awaiting` a deferred destroys its result. + # (Also, if it's a Failure, GCing the deferred would log a critical error + # about unhandled Failures) def got_result(r): self._result = r @@ -92,13 +98,19 @@ def got_result(r): # and any eventual exception may not be reported. # we can now await the deferred, and once it completes, return the result. - await make_deferred_yieldable(self._deferred) - - # I *think* this is the easiest way to correctly raise a Failure without having - # to gut-wrench into the implementation of Deferred. - d = Deferred() - d.callback(self._result) - return await d + if isinstance(self._result, _Sentinel): + await make_deferred_yieldable(self._deferred) + assert not isinstance(self._result, _Sentinel) + + if isinstance(self._result, Failure): + # I *think* awaiting a failed Deferred is the easiest way to correctly raise + # the right exception. + d = defer.fail(self._result) + await d + # the `await` should always raise, so this should be unreachable. + raise AssertionError("unexpected return from await on failure") + + return self._result class RetryOnExceptionCachedCall(Generic[TV]): From f8e460f658cf3f9b42bebde42a4fab59b59207fd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 20:58:47 +0100 Subject: [PATCH 5/9] bump to Twisted 21.7 --- synapse/python_dependencies.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index cdcbdd772b14..2a1f1c19dfcd 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -60,8 +60,10 @@ "service_identity>=18.1.0", # Twisted 18.9 introduces some logger improvements that the structured # logger utilises - "Twisted>=18.9.0", - "treq>=15.1", + # testing twisted 21.7 + "Twisted>=21.7.0rc2", + "Twisted[tls]>=21.7.0rc2", + "treq>=17.8", # Twisted has required pyopenssl 16.0 since about Twisted 16.6. "pyopenssl>=16.0.0", "pyyaml>=3.11", From c9705489973ff90afe8ada657911afa738396587 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 16:57:41 +0100 Subject: [PATCH 6/9] Add some simple type hints on Deferreds --- synapse/http/client.py | 4 ++-- synapse/replication/tcp/client.py | 2 +- synapse/util/async_helpers.py | 16 ++++++++-------- synapse/util/caches/descriptors.py | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index 2ac76b15c2d2..f25eba268890 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -847,7 +847,7 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: def read_body_with_max_size( response: IResponse, stream: ByteWriteable, max_size: Optional[int] -) -> defer.Deferred: +) -> defer.Deferred[int]: """ Read a HTTP response body to a file-object. Optionally enforcing a maximum file size. @@ -862,7 +862,7 @@ def read_body_with_max_size( Returns: A Deferred which resolves to the length of the read body. """ - d = defer.Deferred() + d: defer.Deferred[int] = defer.Deferred() # If the Content-Length header gives a size larger than the maximum allowed # size, do not bother downloading the body. diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 9d4859798bd3..100078b5f4cf 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -285,7 +285,7 @@ async def wait_for_stream_position( # Create a new deferred that times out after N seconds, as we don't want # to wedge here forever. - deferred = Deferred() + deferred: Deferred[None] = Deferred() deferred = timeout_deferred( deferred, _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS, self._reactor ) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 014db1355b13..2435fbb5c21e 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -49,6 +49,8 @@ logger = logging.getLogger(__name__) +_T = TypeVar("_T") + class ObservableDeferred: """Wraps a deferred object so that we can add observer deferreds. These @@ -121,7 +123,7 @@ def observe(self) -> defer.Deferred: effect the underlying deferred. """ if not self._result: - d = defer.Deferred() + d: defer.Deferred[Any] = defer.Deferred() def remove(r): self._observers.discard(d) @@ -415,7 +417,7 @@ def __init__(self): self.key_to_current_writer: Dict[str, defer.Deferred] = {} async def read(self, key: str) -> ContextManager: - new_defer = defer.Deferred() + new_defer: defer.Deferred[None] = defer.Deferred() curr_readers = self.key_to_current_readers.setdefault(key, set()) curr_writer = self.key_to_current_writer.get(key, None) @@ -438,7 +440,7 @@ def _ctx_manager(): return _ctx_manager() async def write(self, key: str) -> ContextManager: - new_defer = defer.Deferred() + new_defer: defer.Deferred[None] = defer.Deferred() curr_readers = self.key_to_current_readers.get(key, set()) curr_writer = self.key_to_current_writer.get(key, None) @@ -471,10 +473,8 @@ def _ctx_manager(): def timeout_deferred( - deferred: defer.Deferred, - timeout: float, - reactor: IReactorTime, -) -> defer.Deferred: + deferred: defer.Deferred[_T], timeout: float, reactor: IReactorTime +) -> defer.Deferred[_T]: """The in built twisted `Deferred.addTimeout` fails to time out deferreds that have a canceller that throws exceptions. This method creates a new deferred that wraps and times out the given deferred, correctly handling @@ -497,7 +497,7 @@ def timeout_deferred( Returns: A new Deferred, which will errback with defer.TimeoutError on timeout. """ - new_d = defer.Deferred() + new_d: defer.Deferred[_T] = defer.Deferred() timed_out = [False] diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 1e8e6b1d01c6..766ffc7751db 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -413,7 +413,7 @@ def arg_to_cache_key(arg): # relevant result for that key. deferreds_map = {} for arg in missing: - deferred = defer.Deferred() + deferred: defer.Deferred[Any] = defer.Deferred() deferreds_map[arg] = deferred key = arg_to_cache_key(arg) cache.set(key, deferred, callback=invalidate_callback) From a413836cebf03c3fd877eb4a235f41b7657f5505 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 17:06:52 +0100 Subject: [PATCH 7/9] type hints for Deferreds in DeferredCache --- synapse/util/caches/deferred_cache.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 8c6fafc6770a..e10dd360106c 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -16,7 +16,16 @@ import enum import threading -from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, Union +from typing import ( + Callable, + Generic, + Iterable, + MutableMapping, + Optional, + TypeVar, + Union, + cast, +) from prometheus_client import Gauge @@ -166,7 +175,7 @@ def get_immediate( def set( self, key: KT, - value: defer.Deferred, + value: defer.Deferred[VT], callback: Optional[Callable[[], None]] = None, ) -> defer.Deferred: """Adds a new entry to the cache (or updates an existing one). @@ -214,7 +223,7 @@ def set( if value.called: result = value.result if not isinstance(result, failure.Failure): - self.cache.set(key, result, callbacks) + self.cache.set(key, cast(VT, result), callbacks) return value # otherwise, we'll add an entry to the _pending_deferred_cache for now, From e2a6fdc3c04b66a6c09a7ef153c09dfe184e45e1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 20:59:06 +0100 Subject: [PATCH 8/9] Generics for ObservableDeferred --- synapse/notifier.py | 5 +++-- synapse/storage/persist_events.py | 4 +++- synapse/util/async_helpers.py | 13 +++++++------ 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index c5fbebc17d75..bbe337949ac5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -111,8 +111,9 @@ def __init__( self.last_notified_token = current_token self.last_notified_ms = time_now_ms - with PreserveLoggingContext(): - self.notify_deferred = ObservableDeferred(defer.Deferred()) + self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred( + defer.Deferred() + ) def notify( self, diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index a39877f0d5b2..0e8270746d78 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -170,7 +170,9 @@ async def add_to_queue( end_item = queue[-1] else: # need to make a new queue item - deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) + deferred: ObservableDeferred[_PersistResult] = ObservableDeferred( + defer.Deferred(), consumeErrors=True + ) end_item = _EventPersistQueueItem( events_and_contexts=[], diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 2435fbb5c21e..5e7b2c208c6a 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -23,6 +23,7 @@ Awaitable, Callable, Dict, + Generic, Hashable, Iterable, List, @@ -52,7 +53,7 @@ _T = TypeVar("_T") -class ObservableDeferred: +class ObservableDeferred(Generic[_T]): """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original deferred. @@ -70,7 +71,7 @@ class ObservableDeferred: __slots__ = ["_deferred", "_observers", "_result"] - def __init__(self, deferred: defer.Deferred, consumeErrors: bool = False): + def __init__(self, deferred: defer.Deferred[_T], consumeErrors: bool = False): object.__setattr__(self, "_deferred", deferred) object.__setattr__(self, "_result", None) object.__setattr__(self, "_observers", set()) @@ -115,7 +116,7 @@ def errback(f): deferred.addCallbacks(callback, errback) - def observe(self) -> defer.Deferred: + def observe(self) -> defer.Deferred[_T]: """Observe the underlying deferred. This returns a brand new deferred that is resolved when the underlying @@ -123,7 +124,7 @@ def observe(self) -> defer.Deferred: effect the underlying deferred. """ if not self._result: - d: defer.Deferred[Any] = defer.Deferred() + d: defer.Deferred[_T] = defer.Deferred() def remove(r): self._observers.discard(d) @@ -137,7 +138,7 @@ def remove(r): success, res = self._result return defer.succeed(res) if success else defer.fail(res) - def observers(self) -> List[defer.Deferred]: + def observers(self) -> List[defer.Deferred[_T]]: return self._observers def has_called(self) -> bool: @@ -146,7 +147,7 @@ def has_called(self) -> bool: def has_succeeded(self) -> bool: return self._result is not None and self._result[0] is True - def get_result(self) -> Any: + def get_result(self) -> _T: return self._result[1] def __getattr__(self, name: str) -> Any: From 71417508922ef4a47c3e553c26f7b121501230f0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 21 Jul 2021 21:08:00 +0100 Subject: [PATCH 9/9] newsfile --- changelog.d/10402.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10402.misc diff --git a/changelog.d/10402.misc b/changelog.d/10402.misc new file mode 100644 index 000000000000..e30141694b9d --- /dev/null +++ b/changelog.d/10402.misc @@ -0,0 +1 @@ +test twisted 21.7.