From 7fbfedb230bd5bcead8335f3e4c466086eae6638 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 4 Nov 2021 17:55:12 +0000 Subject: [PATCH 01/45] Add experimental config option to send to-device messages to AS's --- synapse/config/experimental.py | 8 ++++++++ synapse/handlers/appservice.py | 3 +++ 2 files changed, 11 insertions(+) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 8b098ad48d56..593195ae90cb 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -46,3 +46,11 @@ def read_config(self, config: JsonDict, **kwargs): # MSC3266 (room summary api) self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False) + + # MSC2409 (this setting only relates to optionally sending to-device messages). + # Presence, typing and read receipt EDUs are already sent to application services that + # have opted in to receive them. This setting, if enabled, adds to-device messages + # to that list. + self.msc2409_to_device_messages_enabled: bool = experimental.get( + "msc2409_to_device_messages_enabled", False + ) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9abdad262b78..cd042e35b729 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -55,6 +55,9 @@ def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.notify_appservices = hs.config.appservice.notify_appservices self.event_sources = hs.get_event_sources() + self.msc2409_to_device_messages_enabled = ( + hs.config.experimental.msc2409_to_device_messages_enabled + ) self.current_max = 0 self.is_processing = False From b7a44d44024b88dcfa1ebe98b5a8a54fe5074c10 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 5 Nov 2021 15:47:33 +0000 Subject: [PATCH 02/45] Add a new ephemeral AS handler for to_device message edus Here we add the ability for the application service ephemeral message processor to handle new events on the "to_device" stream. We keep track of a stream id (token) per application service, and every time a new to-device message comes in, for each appservice we pull the messages between the last-recorded and current stream id and check whether any of the messages are for a user in that appservice's user namespace. get_new_messages is implemented in the next commit. since we rebased off latest develop. --- synapse/handlers/appservice.py | 103 +++++++++++++++++++++++++++++++-- synapse/notifier.py | 26 ++++++--- 2 files changed, 114 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index cd042e35b729..8994313a3ea8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -202,8 +202,9 @@ def notify_interested_services_ephemeral( Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other - value for `stream_key` will cause this function to return early. + `stream_key` can be "typing_key", "receipt_key", "presence_key" or + "to_device_key". Any other value for `stream_key` will cause this function + to return early. Ephemeral events will only be pushed to appservices that have opted into receiving them by setting `push_ephemeral` to true in their registration @@ -219,10 +220,6 @@ def notify_interested_services_ephemeral( if not self.notify_appservices: return - # Ignore any unsupported streams - if stream_key not in ("typing_key", "receipt_key", "presence_key"): - return - # Assert that new_token is an integer (and not a RoomStreamToken). # All of the supported streams that this function handles use an # integer to track progress (rather than a RoomStreamToken - a @@ -236,6 +233,13 @@ def notify_interested_services_ephemeral( # Additional context: https://github.com/matrix-org/synapse/pull/11137 assert isinstance(new_token, int) + # Ignore to-device messages if the feature flag is not enabled + if ( + stream_key == "to_device_key" + and not self.msc2409_to_device_messages_enabled + ): + return + # Check whether there are any appservices which have registered to receive # ephemeral events. # @@ -310,6 +314,23 @@ async def _notify_interested_services_ephemeral( service, "presence", new_token ) + elif ( + stream_key == "to_device_key" + and self.msc2409_to_device_messages_enabled + ): + # Retrieve an iterable of to-device message events, as well as the + # maximum stream token of the messages we were able to retrieve. + events = await self._handle_to_device(service, new_token, users) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "to_device", new_token + ) + async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: @@ -443,6 +464,76 @@ async def _handle_presence( return events + async def _handle_to_device( + self, + service: ApplicationService, + new_token: int, + users: Collection[Union[str, UserID]], + ) -> List[JsonDict]: + """ + Given an application service, determine which events it should receive + from those between the last-recorded typing event stream token for this + appservice and the given stream token. + + Args: + service: The application service to check for which events it should receive. + new_token: The latest to-device event stream token. + users: The users that should receive new to-device messages. + + Returns: + A list of JSON dictionaries containing data derived from the typing events + that should be sent to the given application service. + """ + # Get the stream token that this application service has processed up until + from_key = await self.store.get_type_stream_id_for_appservice( + service, "to_device" + ) + + # Filter out users that this appservice is not interested in + users_appservice_is_interested_in: List[str] = [] + for user in users: + if isinstance(user, UserID): + user = user.to_string() + + if service.is_interested_in_user(user): + users_appservice_is_interested_in.append(user) + + if not users_appservice_is_interested_in: + # Return early if the AS was not interested in any of these users + return [] + + # Retrieve the to-device messages for each user + recipient_user_id_device_id_to_messages = await self.store.get_new_messages( + users_appservice_is_interested_in, + from_key, + new_token, + ) + + # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields + # to the event JSON so that the application service will know which user/device + # combination this messages was intended for. + # + # So we mangle this dict into a flat list of to-device messages with the relevant + # user ID and device ID embedded inside each message dict. + message_payload: List[JsonDict] = [] + for ( + user_id, + device_id, + ), messages in recipient_user_id_device_id_to_messages.items(): + for message_json in messages: + # Remove 'message_id' from the to-device message, as it's an internal ID + message_json.pop("message_id", None) + + message_payload.append( + { + "to_user_id": user_id, + "to_device_id": device_id, + **message_json, + } + ) + + return message_payload + async def query_user_exists(self, user_id: str) -> bool: """Check if any application service knows this user_id exists. diff --git a/synapse/notifier.py b/synapse/notifier.py index 60e5409895e6..eb316ae41982 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -444,15 +444,23 @@ def on_new_event( self.notify_replication() - # Notify appservices. - try: - self.appservice_handler.notify_interested_services_ephemeral( - stream_key, - new_token, - users, - ) - except Exception: - logger.exception("Error notifying application services of event") + # Notify appservices of updates in ephemeral event streams. + # Only the following streams are currently supported. + if stream_key in ( + "typing_key", + "receipt_key", + "presence_key", + "to_device_key", + ): + # Notify appservices. + try: + self.appservice_handler.notify_interested_services_ephemeral( + stream_key, + new_token, + users, + ) + except Exception: + logger.exception("Error notifying application services of event") def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happened From 78bd5eaa4fff1fad0139693f6a1b652fa8f4b3a3 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 5 Nov 2021 15:52:39 +0000 Subject: [PATCH 03/45] Allow setting/getting stream id per appservice for to-device messages --- synapse/storage/databases/main/appservice.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index baec35ee27b2..92e6a491c98d 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -387,7 +387,7 @@ def get_new_events_for_appservice_txn(txn): async def get_type_stream_id_for_appservice( self, service: ApplicationService, type: str ) -> int: - if type not in ("read_receipt", "presence"): + if type not in ("read_receipt", "presence", "to_device"): raise ValueError( "Expected type to be a valid application stream id type, got %s" % (type,) @@ -414,7 +414,7 @@ def get_type_stream_id_for_appservice_txn(txn): async def set_type_stream_id_for_appservice( self, service: ApplicationService, stream_type: str, pos: Optional[int] ) -> None: - if stream_type not in ("read_receipt", "presence"): + if stream_type not in ("read_receipt", "presence", "to_device"): raise ValueError( "Expected type to be a valid application stream id type, got %s" % (stream_type,) From 7899f823ae6f6987811e027b3c74104197cf2482 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 5 Nov 2021 15:59:08 +0000 Subject: [PATCH 04/45] Add database method to fetch to-device messages by user_ids from db This method is quite similar to the one below, except that it doesn't support device ids, and supports querying with more than one user id, both of which are relevant to application services. The results are also formatted in a different data structure, so I'm not sure how much we could really share here between the two methods. --- synapse/storage/databases/main/deviceinbox.py | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 7c0f95336561..554c7a549dfb 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple from synapse.logging import issue9533_logger from synapse.logging.opentracing import log_kv, set_tag, trace @@ -24,6 +24,7 @@ DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + make_in_list_sql_clause, ) from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import ( @@ -136,6 +137,79 @@ def process_replication_rows(self, stream_name, instance_name, token, rows): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() + async def get_new_messages( + self, + user_ids: Collection[str], + from_stream_id: int, + to_stream_id: int, + ) -> Dict[Tuple[str, str], List[JsonDict]]: + """ + Retrieve to-device messages for a given set of user IDs. + + Only to-device messages with stream ids between the given boundaries + (from < X <= to) are returned. + + Note that a stream ID can be shared by multiple copies of the same message with + different recipient devices. Each (device, message_content) tuple has their own + row in the device_inbox table. + + Args: + user_ids: The users to retrieve to-device messages for. + from_stream_id: The lower boundary of stream id to filter with (exclusive). + to_stream_id: The upper boundary of stream id to filter with (inclusive). + + Returns: + A list of to-device messages. + """ + # Bail out if none of these users have any messages + for user_id in user_ids: + if self._device_inbox_stream_cache.has_entity_changed( + user_id, from_stream_id + ): + break + else: + return {} + + def get_new_messages_txn(txn: LoggingTransaction): + # Build a query to select messages from any of the given users that are between + # the given stream id bounds + + # Scope to only the given users. We need to use this method as doing so is + # different across database engines. + many_clause_sql, many_clause_args = make_in_list_sql_clause( + self.database_engine, "user_id", user_ids + ) + + sql = f""" + SELECT user_id, device_id, message_json FROM device_inbox + WHERE {many_clause_sql} + AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + + txn.execute(sql, (*many_clause_args, from_stream_id, to_stream_id)) + + # Create a dictionary of (user ID, device ID) -> list of messages that + # that device is meant to receive. + recipient_user_id_device_id_to_messages: Dict[ + Tuple[str, str], List[JsonDict] + ] = {} + + for row in txn: + recipient_user_id = row[0] + recipient_device_id = row[1] + message_dict = db_to_json(row[2]) + + recipient_user_id_device_id_to_messages.setdefault( + (recipient_user_id, recipient_device_id), [] + ).append(message_dict) + + return recipient_user_id_device_id_to_messages + + return await self.db_pool.runInteraction( + "get_new_messages", get_new_messages_txn + ) + async def get_new_messages_for_device( self, user_id: str, From 103f410bef4417fc22d9b0bb8548e6c2892f6f4c Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 5 Nov 2021 16:00:35 +0000 Subject: [PATCH 05/45] Add a to_device_stream_id column to the application_services_state table This is for tracking the stream id that each application service has been sent up to. In other words, there shouldn't be any need to process stream ids below the recorded one here as the AS should have already received them. Note that there is no reliability built-in here. Reliability of delivery if intended for a separate PR. --- ...09_add_device_id_appservice_stream_type.sql | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql diff --git a/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql b/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql new file mode 100644 index 000000000000..3ad9780d1e78 --- /dev/null +++ b/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a column to track what to_device stream id that this application +-- service has been caught up to. +ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT; \ No newline at end of file From e914f1d734ff7a151e0201801cab7fd38291628c Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 12 Nov 2021 18:49:49 +0000 Subject: [PATCH 06/45] Add tests I decided to spin up another test class for this as the existing one is 1. quite old and 2. was mocking away too much of the infrastructure to my liking. I've named the new class alluding to ephemeral messages, and while we already have some ephemeral tests in AppServiceHandlerTestCase, ideally I'd like to migrate those over. There's two new tests here. One for testing that to-device messages for a local user are received by any application services that have registered interest in that user - and that those that haven't won't receive those messages. The next test is similar, but tests with a whole bunch of to-device messages. Rather than actually registering tons of devices - which would make for a very slow unit test - we just directly insert them into the database. --- tests/handlers/test_appservice.py | 250 +++++++++++++++++++++++++++++- 1 file changed, 246 insertions(+), 4 deletions(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index d6f14e2dba79..af50bfdee84d 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -1,4 +1,4 @@ -# Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2015-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,18 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict, Iterable, List, Optional, Tuple from unittest.mock import Mock from twisted.internet import defer +import synapse.rest.admin +import synapse.storage +from synapse.appservice import ApplicationService from synapse.handlers.appservice import ApplicationServicesHandler +from synapse.rest.client import login, receipts, room, sendtodevice from synapse.types import RoomStreamToken +from synapse.util.stringutils import random_string +from tests import unittest from tests.test_utils import make_awaitable from tests.utils import MockClock -from .. import unittest - class AppServiceHandlerTestCase(unittest.TestCase): """Tests the ApplicationServicesHandler.""" @@ -261,7 +266,6 @@ def test_notify_interested_services_ephemeral(self): """ interested_service = self._mkservice(is_interested=True) services = [interested_service] - self.mock_store.get_app_services.return_value = services self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( 579 @@ -321,3 +325,241 @@ def _mkservice_alias(self, is_interested_in_alias): service.token = "mock_service_token" service.url = "mock_service_url" return service + + +class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + login.register_servlets, + room.register_servlets, + sendtodevice.register_servlets, + receipts.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + # Mock the application service scheduler so that we can track any + # outgoing transactions + self.mock_scheduler = Mock() + self.mock_scheduler.submit_ephemeral_events_for_as = Mock() + + hs.get_application_service_handler().scheduler = self.mock_scheduler + + self.device1 = "device1" + self.user1 = self.register_user("user1", "password") + self.token1 = self.login("user1", "password", self.device1) + + self.device2 = "device2" + self.user2 = self.register_user("user2", "password") + self.token2 = self.login("user2", "password", self.device2) + + @unittest.override_config( + {"experimental_features": {"msc2409_to_device_messages_enabled": True}} + ) + def test_application_services_receive_local_to_device(self): + """ + Test that when a user sends a to-device message to another user, and + that is in an application service's user namespace, that application + service will receive it. + """ + ( + interested_services, + _, + ) = self._register_interested_and_uninterested_application_services() + interested_service = interested_services[0] + + # Have user1 send a to-device message to user2 + message_content = {"some_key": "some really interesting value"} + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.room_key_request/3", + content={"messages": {self.user2: {self.device2: message_content}}}, + access_token=self.token1, + ) + self.assertEqual(chan.code, 200, chan.result) + + # Have user2 send a to-device message to user1 + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.room_key_request/4", + content={"messages": {self.user1: {self.device1: message_content}}}, + access_token=self.token2, + ) + self.assertEqual(chan.code, 200, chan.result) + + # Check if our application service - that is interested in user2 - received + # the to-device message as part of an AS transaction. + # Only the user1 -> user2 to-device message should have been forwarded to the AS. + # + # The uninterested application service should not have been notified at all. + self.assertEqual( + 1, self.mock_scheduler.submit_ephemeral_events_for_as.call_count + ) + service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[ + 0 + ] + + # Assert that this was the same to-device message that user1 sent + self.assertEqual(service, interested_service) + self.assertEqual(events[0]["type"], "m.room_key_request") + self.assertEqual(events[0]["sender"], self.user1) + + # Additional fields 'to_user_id' and 'to_device_id' specifically for + # to-device messages via the AS API + self.assertEqual(events[0]["to_user_id"], self.user2) + self.assertEqual(events[0]["to_device_id"], self.device2) + self.assertEqual(events[0]["content"], message_content) + + @unittest.override_config( + {"experimental_features": {"msc2409_to_device_messages_enabled": True}} + ) + def test_application_services_receive_bursts_of_to_device(self): + """ + Test that when a user sends >100 to-device messages at once, any + interested AS's will receive them in separate transactions. + """ + ( + interested_services, + _, + ) = self._register_interested_and_uninterested_application_services( + interested_count=2, + uninterested_count=2, + ) + + to_device_message_content = { + "some key": "some interesting value", + } + + # We need to send a large burst of to-device messages. We also would like to + # include them all in the same application service transaction so that we can + # test large transactions. + # + # To do this, we can send a single to-device message to many user devices at + # once. + # + # We insert number_of_messages - 1 messages into the database directly. We'll then + # send a final to-device message to the real device, which will also kick off + # an AS transaction (as just inserting messages into the DB won't). + number_of_messages = 150 + fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)] + messages = { + self.user2: { + device_id: to_device_message_content for device_id in fake_device_ids + } + } + + # Create a fake device per message. We can't send to-device messages to + # a device that doesn't exist. + self.get_success( + self.hs.get_datastore().db_pool.simple_insert_many( + desc="test_application_services_receive_burst_of_to_device", + table="devices", + values=[ + { + "user_id": self.user2, + "device_id": device_id, + } + for device_id in fake_device_ids + ], + ) + ) + + # Seed the device_inbox table with our fake messages + self.get_success( + self.hs.get_datastore().add_messages_to_device_inbox(messages, {}) + ) + + # Now have user1 send a final to-device message to user2. All unsent + # to-device messages should be sent to any application services + # interested in user2. + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.room_key_request/4", + content={ + "messages": {self.user2: {self.device2: to_device_message_content}} + }, + access_token=self.token1, + ) + self.assertEqual(chan.code, 200, chan.result) + + # Count the total number of to-device messages that were sent out per-service. + # Ensure that we only sent to-device messages to interested services, and that + # each interested service received the full count of to-device messages. + service_id_to_message_count: Dict[str, int] = {} + self.assertGreater( + self.mock_scheduler.submit_ephemeral_events_for_as.call_count, 0 + ) + for call in self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list: + service, events = call[0] + + # Check that this was made to an interested service + self.assertIn(service, interested_services) + + # Add to the count of messages for this application service + service_id_to_message_count.setdefault(service.id, 0) + service_id_to_message_count[service.id] += len(events) + + # Assert that each interested service received the full count of messages + for count in service_id_to_message_count.values(): + self.assertEqual(count, number_of_messages) + + def _register_interested_and_uninterested_application_services( + self, + interested_count: int = 1, + uninterested_count: int = 1, + ) -> Tuple[List[ApplicationService], List[ApplicationService]]: + """ + Create application services with and without exclusive interest + in user2. + + Args: + interested_count: The number of application services to create + and register with exclusive interest. + uninterested_count: The number of application services to create + and register without any interest. + + Returns: + A two-tuple containing: + * Interested application services + * Uninterested application services + """ + # Create an application service with exclusive interest in user2 + interested_services = [] + uninterested_services = [] + for _ in range(interested_count): + interested_service = self._make_application_service( + namespaces={ + ApplicationService.NS_USERS: [ + { + "regex": "@user2:.+", + "exclusive": True, + } + ], + }, + ) + interested_services.append(interested_service) + + for _ in range(uninterested_count): + uninterested_services.append(self._make_application_service()) + + # Register this application service, along with another, uninterested one + services = [ + *uninterested_services, + *interested_services, + ] + self.hs.get_datastore().get_app_services = Mock(return_value=services) + + return interested_services, uninterested_services + + def _make_application_service( + self, + namespaces: Optional[Dict[str, Iterable[Dict]]] = None, + ) -> ApplicationService: + return ApplicationService( + token=None, + hostname="example.com", + id=random_string(10), + sender="@as:example.com", + rate_limited=False, + namespaces=namespaces, + supports_ephemeral=True, + ) From 2930fe6feada2743fd5b23e68bccd1e840dded0e Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 5 Nov 2021 16:07:44 +0000 Subject: [PATCH 07/45] Changelog --- changelog.d/11215.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11215.feature diff --git a/changelog.d/11215.feature b/changelog.d/11215.feature new file mode 100644 index 000000000000..468020834b3d --- /dev/null +++ b/changelog.d/11215.feature @@ -0,0 +1 @@ +Add experimental support for sending to-device messages to application services, as specified by [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409). Disabled by default. From f65846b55b24fe98bf99ba5ee0e7a6acd828a35f Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Nov 2021 15:09:46 +0000 Subject: [PATCH 08/45] Make msc2409_to_device_messages_enabled private; remove unnecessary check The second check for self._msc2409_to_device_messages_enabled was not necessary. It's already checked in notify_interested_services_ephemeral earlier. --- synapse/handlers/appservice.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8994313a3ea8..8d5658f11417 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -55,7 +55,7 @@ def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.notify_appservices = hs.config.appservice.notify_appservices self.event_sources = hs.get_event_sources() - self.msc2409_to_device_messages_enabled = ( + self._msc2409_to_device_messages_enabled = ( hs.config.experimental.msc2409_to_device_messages_enabled ) @@ -236,7 +236,7 @@ def notify_interested_services_ephemeral( # Ignore to-device messages if the feature flag is not enabled if ( stream_key == "to_device_key" - and not self.msc2409_to_device_messages_enabled + and not self._msc2409_to_device_messages_enabled ): return @@ -314,10 +314,7 @@ async def _notify_interested_services_ephemeral( service, "presence", new_token ) - elif ( - stream_key == "to_device_key" - and self.msc2409_to_device_messages_enabled - ): + elif stream_key == "to_device_key": # Retrieve an iterable of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. events = await self._handle_to_device(service, new_token, users) From ce020c30fc16418a56a0b7e34e17660f923374ef Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Nov 2021 15:21:49 +0000 Subject: [PATCH 09/45] Move stream filter back into AppserviceHandler --- synapse/handlers/appservice.py | 10 ++++++++++ synapse/notifier.py | 28 +++++++++++----------------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8d5658f11417..fb9ad3ba62d3 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -220,6 +220,16 @@ def notify_interested_services_ephemeral( if not self.notify_appservices: return + # Notify appservices of updates in ephemeral event streams. + # Only the following streams are currently supported. + if stream_key not in ( + "typing_key", + "receipt_key", + "presence_key", + "to_device_key", + ): + return + # Assert that new_token is an integer (and not a RoomStreamToken). # All of the supported streams that this function handles use an # integer to track progress (rather than a RoomStreamToken - a diff --git a/synapse/notifier.py b/synapse/notifier.py index eb316ae41982..3b24a7f4ba7c 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -444,23 +444,17 @@ def on_new_event( self.notify_replication() - # Notify appservices of updates in ephemeral event streams. - # Only the following streams are currently supported. - if stream_key in ( - "typing_key", - "receipt_key", - "presence_key", - "to_device_key", - ): - # Notify appservices. - try: - self.appservice_handler.notify_interested_services_ephemeral( - stream_key, - new_token, - users, - ) - except Exception: - logger.exception("Error notifying application services of event") + # Notify appservices. + try: + self.appservice_handler.notify_interested_services_ephemeral( + stream_key, + new_token, + users, + ) + except Exception: + logger.exception( + "Error notifying application services of ephemeral event" + ) def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happened From 8f1183cf7b5c52012defa2eede2c9ede8918a2fa Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Nov 2021 18:17:14 +0000 Subject: [PATCH 10/45] Broaden type hints; update comment --- synapse/appservice/scheduler.py | 10 ++++++---- synapse/handlers/appservice.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6a2ce99b55dc..45cdee33f47d 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,7 +48,7 @@ components. """ import logging -from typing import List, Optional +from typing import Iterable, List, Optional from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.events import EventBase @@ -95,8 +95,8 @@ def submit_event_for_as(self, service: ApplicationService, event: EventBase): self.queuer.enqueue_event(service, event) def submit_ephemeral_events_for_as( - self, service: ApplicationService, events: List[JsonDict] - ): + self, service: ApplicationService, events: Iterable[JsonDict] + ) -> None: self.queuer.enqueue_ephemeral(service, events) @@ -130,7 +130,9 @@ def enqueue_event(self, service: ApplicationService, event: EventBase): self.queued_events.setdefault(service.id, []).append(event) self._start_background_request(service) - def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]): + def enqueue_ephemeral( + self, service: ApplicationService, events: Iterable[JsonDict] + ) -> None: self.queued_ephemeral.setdefault(service.id, []).extend(events) self._start_background_request(service) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fb9ad3ba62d3..1b08ded9fea8 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -325,7 +325,7 @@ async def _notify_interested_services_ephemeral( ) elif stream_key == "to_device_key": - # Retrieve an iterable of to-device message events, as well as the + # Retrieve a list of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. events = await self._handle_to_device(service, new_token, users) if events: From 401cb2bbda7ae0899800ff98e054033e195ab2ab Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Nov 2021 18:26:51 +0000 Subject: [PATCH 11/45] Deduplicate ephemeral events to send conditional Test cases needed to be updated, as we now always call submit_ephemeral_events_for_as, it may just be with an empty events list. --- synapse/appservice/scheduler.py | 12 +++++++++++ synapse/handlers/appservice.py | 15 +++---------- tests/handlers/test_appservice.py | 35 +++++++++++++++++-------------- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 45cdee33f47d..a21125708864 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -97,6 +97,18 @@ def submit_event_for_as(self, service: ApplicationService, event: EventBase): def submit_ephemeral_events_for_as( self, service: ApplicationService, events: Iterable[JsonDict] ) -> None: + """ + Send ephemeral events to application services, and schedule a new + outgoing AS transaction. + + Args: + service: The service to send ephemeral events to. + events: The ephemeral events to send. + """ + # Ensure we have some events to send + if not events: + return + self.queuer.enqueue_ephemeral(service, events) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 1b08ded9fea8..88806bb78c60 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -302,10 +302,7 @@ async def _notify_interested_services_ephemeral( ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.submit_ephemeral_events_for_as(service, events) # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( @@ -314,10 +311,7 @@ async def _notify_interested_services_ephemeral( elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.submit_ephemeral_events_for_as(service, events) # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( @@ -328,10 +322,7 @@ async def _notify_interested_services_ephemeral( # Retrieve a list of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. events = await self._handle_to_device(service, new_token, users) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.submit_ephemeral_events_for_as(service, events) # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index af50bfdee84d..fc37b8729d0f 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -47,6 +47,12 @@ def setUp(self): self.handler = ApplicationServicesHandler(hs) self.event_source = hs.get_event_sources() + # Mock the ApplicationServiceScheduler queuer so that we can track any + # outgoing ephemeral events + self.mock_service_queuer = Mock() + self.mock_service_queuer.enqueue_ephemeral = Mock() + hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer + def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) services = [ @@ -279,7 +285,7 @@ def test_notify_interested_services_ephemeral(self): self.handler.notify_interested_services_ephemeral( "receipt_key", 580, ["@fakerecipient:example.com"] ) - self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( + self.mock_service_queuer.enqueue_ephemeral.assert_called_once_with( interested_service, [event] ) self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( @@ -309,7 +315,7 @@ def test_notify_interested_services_ephemeral_out_of_order(self): self.handler.notify_interested_services_ephemeral( "receipt_key", 580, ["@fakerecipient:example.com"] ) - self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() + self.mock_service_queuer.enqueue_ephemeral.assert_not_called() def _mkservice(self, is_interested, protocols=None): service = Mock() @@ -337,12 +343,11 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase): ] def prepare(self, reactor, clock, hs): - # Mock the application service scheduler so that we can track any - # outgoing transactions - self.mock_scheduler = Mock() - self.mock_scheduler.submit_ephemeral_events_for_as = Mock() - - hs.get_application_service_handler().scheduler = self.mock_scheduler + # Mock the ApplicationServiceScheduler queuer so that we can track any + # outgoing ephemeral events + self.mock_service_queuer = Mock() + self.mock_service_queuer.enqueue_ephemeral = Mock() + hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer self.device1 = "device1" self.user1 = self.register_user("user1", "password") @@ -391,10 +396,8 @@ def test_application_services_receive_local_to_device(self): # Only the user1 -> user2 to-device message should have been forwarded to the AS. # # The uninterested application service should not have been notified at all. - self.assertEqual( - 1, self.mock_scheduler.submit_ephemeral_events_for_as.call_count - ) - service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[ + self.mock_service_queuer.enqueue_ephemeral.assert_called_once() + service, events = self.mock_service_queuer.enqueue_ephemeral.call_args[ 0 ] @@ -481,14 +484,14 @@ def test_application_services_receive_bursts_of_to_device(self): ) self.assertEqual(chan.code, 200, chan.result) + self.mock_service_queuer.enqueue_ephemeral.assert_called() + # Count the total number of to-device messages that were sent out per-service. # Ensure that we only sent to-device messages to interested services, and that # each interested service received the full count of to-device messages. service_id_to_message_count: Dict[str, int] = {} - self.assertGreater( - self.mock_scheduler.submit_ephemeral_events_for_as.call_count, 0 - ) - for call in self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list: + + for call in self.mock_service_queuer.enqueue_ephemeral.call_args_list: service, events = call[0] # Check that this was made to an interested service From 179dd5ae0cc3c25181435e5108bd50be2b610ab0 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Nov 2021 18:37:02 +0000 Subject: [PATCH 12/45] _handle_to_device -> _get_to_device_messages --- synapse/handlers/appservice.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 88806bb78c60..816b51e682d7 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -321,8 +321,12 @@ async def _notify_interested_services_ephemeral( elif stream_key == "to_device_key": # Retrieve a list of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. - events = await self._handle_to_device(service, new_token, users) - self.scheduler.submit_ephemeral_events_for_as(service, events) + to_device_messages = await self._get_to_device_messages( + service, new_token, users + ) + self.scheduler.submit_ephemeral_events_for_as( + service, to_device_messages + ) # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( @@ -462,7 +466,7 @@ async def _handle_presence( return events - async def _handle_to_device( + async def _get_to_device_messages( self, service: ApplicationService, new_token: int, From 8b0bbc1fb484115c4e5ad69fc471130c6876755b Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Nov 2021 19:44:13 +0000 Subject: [PATCH 13/45] Rename ApplicationServiceEphemeralEventsTestCase --- tests/handlers/test_appservice.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index fc37b8729d0f..a2260129d49c 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -333,7 +333,11 @@ def _mkservice_alias(self, is_interested_in_alias): return service -class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase): +class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): + """ + Tests that the ApplicationServicesHandler sends events to application + services correctly. + """ servlets = [ synapse.rest.admin.register_servlets_for_client_rest_resource, login.register_servlets, From 31c4b4093b15610ee6a1ad2729c188ce382ff686 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Nov 2021 19:51:11 +0000 Subject: [PATCH 14/45] Rename user1, user2 in tests to something more useful --- tests/handlers/test_appservice.py | 81 +++++++++++++++++++------------ 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a2260129d49c..23dbd6e40c57 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -338,6 +338,7 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): Tests that the ApplicationServicesHandler sends events to application services correctly. """ + servlets = [ synapse.rest.admin.register_servlets_for_client_rest_resource, login.register_servlets, @@ -353,22 +354,28 @@ def prepare(self, reactor, clock, hs): self.mock_service_queuer.enqueue_ephemeral = Mock() hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer - self.device1 = "device1" - self.user1 = self.register_user("user1", "password") - self.token1 = self.login("user1", "password", self.device1) + # A user on the homeserver. + self.local_user_device_id = "local_device" + self.local_user = self.register_user("local_user", "password") + self.local_user_token = self.login( + "local_user", "password", self.local_user_device_id + ) - self.device2 = "device2" - self.user2 = self.register_user("user2", "password") - self.token2 = self.login("user2", "password", self.device2) + # A user on the homeserver which lies within an appservice's exclusive user namespace. + self.exclusive_as_user_device_id = "exclusive_as_device" + self.exclusive_as_user = self.register_user("exclusive_as_user", "password") + self.exclusive_as_user_token = self.login( + "exclusive_as_user", "password", self.exclusive_as_user_device_id + ) @unittest.override_config( {"experimental_features": {"msc2409_to_device_messages_enabled": True}} ) def test_application_services_receive_local_to_device(self): """ - Test that when a user sends a to-device message to another user, and - that is in an application service's user namespace, that application - service will receive it. + Test that when a user sends a to-device message to another user + that is an application service's user namespace, the + application service will receive it. """ ( interested_services, @@ -376,28 +383,38 @@ def test_application_services_receive_local_to_device(self): ) = self._register_interested_and_uninterested_application_services() interested_service = interested_services[0] - # Have user1 send a to-device message to user2 + # Have local_user send a to-device message to exclusive_as_user message_content = {"some_key": "some really interesting value"} chan = self.make_request( "PUT", "/_matrix/client/r0/sendToDevice/m.room_key_request/3", - content={"messages": {self.user2: {self.device2: message_content}}}, - access_token=self.token1, + content={ + "messages": { + self.exclusive_as_user: { + self.exclusive_as_user_device_id: message_content + } + } + }, + access_token=self.local_user_token, ) self.assertEqual(chan.code, 200, chan.result) - # Have user2 send a to-device message to user1 + # Have exclusive_as_user send a to-device message to local_user chan = self.make_request( "PUT", "/_matrix/client/r0/sendToDevice/m.room_key_request/4", - content={"messages": {self.user1: {self.device1: message_content}}}, - access_token=self.token2, + content={ + "messages": { + self.local_user: {self.local_user_device_id: message_content} + } + }, + access_token=self.exclusive_as_user_token, ) self.assertEqual(chan.code, 200, chan.result) - # Check if our application service - that is interested in user2 - received + # Check if our application service - that is interested in exclusive_as_user - received # the to-device message as part of an AS transaction. - # Only the user1 -> user2 to-device message should have been forwarded to the AS. + # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS. # # The uninterested application service should not have been notified at all. self.mock_service_queuer.enqueue_ephemeral.assert_called_once() @@ -405,15 +422,15 @@ def test_application_services_receive_local_to_device(self): 0 ] - # Assert that this was the same to-device message that user1 sent + # Assert that this was the same to-device message that local_user sent self.assertEqual(service, interested_service) self.assertEqual(events[0]["type"], "m.room_key_request") - self.assertEqual(events[0]["sender"], self.user1) + self.assertEqual(events[0]["sender"], self.local_user) # Additional fields 'to_user_id' and 'to_device_id' specifically for # to-device messages via the AS API - self.assertEqual(events[0]["to_user_id"], self.user2) - self.assertEqual(events[0]["to_device_id"], self.device2) + self.assertEqual(events[0]["to_user_id"], self.exclusive_as_user) + self.assertEqual(events[0]["to_device_id"], self.exclusive_as_user_device_id) self.assertEqual(events[0]["content"], message_content) @unittest.override_config( @@ -449,7 +466,7 @@ def test_application_services_receive_bursts_of_to_device(self): number_of_messages = 150 fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)] messages = { - self.user2: { + self.exclusive_as_user: { device_id: to_device_message_content for device_id in fake_device_ids } } @@ -462,7 +479,7 @@ def test_application_services_receive_bursts_of_to_device(self): table="devices", values=[ { - "user_id": self.user2, + "user_id": self.exclusive_as_user, "device_id": device_id, } for device_id in fake_device_ids @@ -475,16 +492,20 @@ def test_application_services_receive_bursts_of_to_device(self): self.hs.get_datastore().add_messages_to_device_inbox(messages, {}) ) - # Now have user1 send a final to-device message to user2. All unsent + # Now have local_user send a final to-device message to exclusive_as_user. All unsent # to-device messages should be sent to any application services - # interested in user2. + # interested in exclusive_as_user. chan = self.make_request( "PUT", "/_matrix/client/r0/sendToDevice/m.room_key_request/4", content={ - "messages": {self.user2: {self.device2: to_device_message_content}} + "messages": { + self.exclusive_as_user: { + self.exclusive_as_user_device_id: to_device_message_content + } + } }, - access_token=self.token1, + access_token=self.local_user_token, ) self.assertEqual(chan.code, 200, chan.result) @@ -516,7 +537,7 @@ def _register_interested_and_uninterested_application_services( ) -> Tuple[List[ApplicationService], List[ApplicationService]]: """ Create application services with and without exclusive interest - in user2. + in exclusive_as_user. Args: interested_count: The number of application services to create @@ -529,7 +550,7 @@ def _register_interested_and_uninterested_application_services( * Interested application services * Uninterested application services """ - # Create an application service with exclusive interest in user2 + # Create an application service with exclusive interest in exclusive_as_user interested_services = [] uninterested_services = [] for _ in range(interested_count): @@ -537,7 +558,7 @@ def _register_interested_and_uninterested_application_services( namespaces={ ApplicationService.NS_USERS: [ { - "regex": "@user2:.+", + "regex": "@exclusive_as_user:.+", "exclusive": True, } ], From bd9d963af2c2c33774b35652087a37ac7c0114a9 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 19 Nov 2021 20:25:37 +0000 Subject: [PATCH 15/45] Simplify registration of appservices in tests --- tests/handlers/test_appservice.py | 115 +++++++++++++----------------- 1 file changed, 51 insertions(+), 64 deletions(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 23dbd6e40c57..90c97d2f8e75 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional from unittest.mock import Mock from twisted.internet import defer @@ -354,6 +354,10 @@ def prepare(self, reactor, clock, hs): self.mock_service_queuer.enqueue_ephemeral = Mock() hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer + # Mock out application services, and allow defining our own in tests + self._services: List[ApplicationService] = [] + self.hs.get_datastore().get_app_services = Mock(return_value=self._services) + # A user on the homeserver. self.local_user_device_id = "local_device" self.local_user = self.register_user("local_user", "password") @@ -377,11 +381,16 @@ def test_application_services_receive_local_to_device(self): that is an application service's user namespace, the application service will receive it. """ - ( - interested_services, - _, - ) = self._register_interested_and_uninterested_application_services() - interested_service = interested_services[0] + interested_appservice = self._register_application_service( + namespaces={ + ApplicationService.NS_USERS: [ + { + "regex": "@exclusive_as_user:.+", + "exclusive": True, + } + ], + }, + ) # Have local_user send a to-device message to exclusive_as_user message_content = {"some_key": "some really interesting value"} @@ -418,12 +427,10 @@ def test_application_services_receive_local_to_device(self): # # The uninterested application service should not have been notified at all. self.mock_service_queuer.enqueue_ephemeral.assert_called_once() - service, events = self.mock_service_queuer.enqueue_ephemeral.call_args[ - 0 - ] + service, events = self.mock_service_queuer.enqueue_ephemeral.call_args[0] # Assert that this was the same to-device message that local_user sent - self.assertEqual(service, interested_service) + self.assertEqual(service, interested_appservice) self.assertEqual(events[0]["type"], "m.room_key_request") self.assertEqual(events[0]["sender"], self.local_user) @@ -440,14 +447,26 @@ def test_application_services_receive_bursts_of_to_device(self): """ Test that when a user sends >100 to-device messages at once, any interested AS's will receive them in separate transactions. + + Also tests that uninterested application services do not receive messages. """ - ( - interested_services, - _, - ) = self._register_interested_and_uninterested_application_services( - interested_count=2, - uninterested_count=2, - ) + # Register two application services with exclusive interest in a user + interested_appservices = [] + for _ in range(2): + appservice = self._register_application_service( + namespaces={ + ApplicationService.NS_USERS: [ + { + "regex": "@exclusive_as_user:.+", + "exclusive": True, + } + ], + }, + ) + interested_appservices.append(appservice) + + # ...and an application service which does not have any user interest. + self._register_application_service() to_device_message_content = { "some key": "some interesting value", @@ -520,7 +539,7 @@ def test_application_services_receive_bursts_of_to_device(self): service, events = call[0] # Check that this was made to an interested service - self.assertIn(service, interested_services) + self.assertIn(service, interested_appservices) # Add to the count of messages for this application service service_id_to_message_count.setdefault(service.id, 0) @@ -530,59 +549,22 @@ def test_application_services_receive_bursts_of_to_device(self): for count in service_id_to_message_count.values(): self.assertEqual(count, number_of_messages) - def _register_interested_and_uninterested_application_services( + def _register_application_service( self, - interested_count: int = 1, - uninterested_count: int = 1, - ) -> Tuple[List[ApplicationService], List[ApplicationService]]: + namespaces: Optional[Dict[str, Iterable[Dict]]] = None, + ) -> ApplicationService: """ - Create application services with and without exclusive interest - in exclusive_as_user. + Register a new application service, with the given namespaces of interest. Args: - interested_count: The number of application services to create - and register with exclusive interest. - uninterested_count: The number of application services to create - and register without any interest. + namespaces: A dictionary containing any user, room or alias namespaces that + the application service is interested in. Returns: - A two-tuple containing: - * Interested application services - * Uninterested application services + The registered application service. """ - # Create an application service with exclusive interest in exclusive_as_user - interested_services = [] - uninterested_services = [] - for _ in range(interested_count): - interested_service = self._make_application_service( - namespaces={ - ApplicationService.NS_USERS: [ - { - "regex": "@exclusive_as_user:.+", - "exclusive": True, - } - ], - }, - ) - interested_services.append(interested_service) - - for _ in range(uninterested_count): - uninterested_services.append(self._make_application_service()) - - # Register this application service, along with another, uninterested one - services = [ - *uninterested_services, - *interested_services, - ] - self.hs.get_datastore().get_app_services = Mock(return_value=services) - - return interested_services, uninterested_services - - def _make_application_service( - self, - namespaces: Optional[Dict[str, Iterable[Dict]]] = None, - ) -> ApplicationService: - return ApplicationService( + # Create an application service + appservice = ApplicationService( token=None, hostname="example.com", id=random_string(10), @@ -591,3 +573,8 @@ def _make_application_service( namespaces=namespaces, supports_ephemeral=True, ) + + # Register the application service + self._services.append(appservice) + + return appservice From 8f8226af3a1977f7ec2fdcc839074ab3093554f7 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 22 Nov 2021 16:50:50 +0000 Subject: [PATCH 16/45] Fix existing unit tests There is so much mocking going on here. I look forward to replacing these one day. --- tests/handlers/test_appservice.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 90c97d2f8e75..0bbbce71fd8c 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -47,12 +47,6 @@ def setUp(self): self.handler = ApplicationServicesHandler(hs) self.event_source = hs.get_event_sources() - # Mock the ApplicationServiceScheduler queuer so that we can track any - # outgoing ephemeral events - self.mock_service_queuer = Mock() - self.mock_service_queuer.enqueue_ephemeral = Mock() - hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer - def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) services = [ @@ -285,7 +279,7 @@ def test_notify_interested_services_ephemeral(self): self.handler.notify_interested_services_ephemeral( "receipt_key", 580, ["@fakerecipient:example.com"] ) - self.mock_service_queuer.enqueue_ephemeral.assert_called_once_with( + self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( interested_service, [event] ) self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( @@ -315,7 +309,10 @@ def test_notify_interested_services_ephemeral_out_of_order(self): self.handler.notify_interested_services_ephemeral( "receipt_key", 580, ["@fakerecipient:example.com"] ) - self.mock_service_queuer.enqueue_ephemeral.assert_not_called() + # This method will be called, but with an empty list of events + self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( + interested_service, [] + ) def _mkservice(self, is_interested, protocols=None): service = Mock() From b4a4b45dffb682c58030468e1aa94a922373086e Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 24 Nov 2021 15:19:57 +0000 Subject: [PATCH 17/45] rename set_type_stream_id_for_appservice -> set_appservice_stream_type_pos --- synapse/handlers/appservice.py | 8 ++++---- synapse/storage/databases/main/appservice.py | 6 +++--- tests/handlers/test_appservice.py | 2 +- tests/storage/test_appservice.py | 10 +++++----- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 816b51e682d7..c7ef6d2a9cae 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -283,7 +283,7 @@ async def _notify_interested_services_ephemeral( with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: if stream_key == "typing_key": - # Note that we don't persist the token (via set_type_stream_id_for_appservice) + # Note that we don't persist the token (via set_appservice_stream_type_pos) # for typing_key due to performance reasons and due to their highly # ephemeral nature. # @@ -305,7 +305,7 @@ async def _notify_interested_services_ephemeral( self.scheduler.submit_ephemeral_events_for_as(service, events) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "read_receipt", new_token ) @@ -314,7 +314,7 @@ async def _notify_interested_services_ephemeral( self.scheduler.submit_ephemeral_events_for_as(service, events) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "presence", new_token ) @@ -329,7 +329,7 @@ async def _notify_interested_services_ephemeral( ) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "to_device", new_token ) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 92e6a491c98d..1986e36d5230 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -411,7 +411,7 @@ def get_type_stream_id_for_appservice_txn(txn): "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn ) - async def set_type_stream_id_for_appservice( + async def set_appservice_stream_type_pos( self, service: ApplicationService, stream_type: str, pos: Optional[int] ) -> None: if stream_type not in ("read_receipt", "presence", "to_device"): @@ -420,7 +420,7 @@ async def set_type_stream_id_for_appservice( % (stream_type,) ) - def set_type_stream_id_for_appservice_txn(txn): + def set_appservice_stream_type_pos_txn(txn): stream_id_type = "%s_stream_id" % stream_type txn.execute( "UPDATE application_services_state SET %s = ? WHERE as_id=?" @@ -429,7 +429,7 @@ def set_type_stream_id_for_appservice_txn(txn): ) await self.db_pool.runInteraction( - "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn + "set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn ) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 0bbbce71fd8c..6ca9c7f9f1f1 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -282,7 +282,7 @@ def test_notify_interested_services_ephemeral(self): self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( interested_service, [event] ) - self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( + self.mock_store.set_appservice_stream_type_pos.assert_called_once_with( interested_service, "read_receipt", 580, diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index f26d5acf9c29..de10b69b7cb3 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -433,10 +433,10 @@ def test_get_type_stream_id_for_appservice_invalid_type(self): ValueError, ) - def test_set_type_stream_id_for_appservice(self): + def test_set_appservice_stream_type_pos(self): read_receipt_value = 1024 self.get_success( - self.store.set_type_stream_id_for_appservice( + self.store.set_appservice_stream_type_pos( self.service, "read_receipt", read_receipt_value ) ) @@ -446,7 +446,7 @@ def test_set_type_stream_id_for_appservice(self): self.assertEqual(result, read_receipt_value) self.get_success( - self.store.set_type_stream_id_for_appservice( + self.store.set_appservice_stream_type_pos( self.service, "presence", read_receipt_value ) ) @@ -455,9 +455,9 @@ def test_set_type_stream_id_for_appservice(self): ) self.assertEqual(result, read_receipt_value) - def test_set_type_stream_id_for_appservice_invalid_type(self): + def test_set_appservice_stream_type_pos_invalid_type(self): self.get_failure( - self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024), + self.store.set_appservice_stream_type_pos(self.service, "foobar", 1024), ValueError, ) From c691ef0992da5ad3b4998a698a39b97acfb81aea Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 24 Nov 2021 16:08:49 +0000 Subject: [PATCH 18/45] Add some FIXME comments --- synapse/handlers/appservice.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index c7ef6d2a9cae..4a7613b26261 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -222,6 +222,7 @@ def notify_interested_services_ephemeral( # Notify appservices of updates in ephemeral event streams. # Only the following streams are currently supported. + # FIXME: We should use constants for these values. if stream_key not in ( "typing_key", "receipt_key", @@ -494,6 +495,8 @@ async def _get_to_device_messages( # Filter out users that this appservice is not interested in users_appservice_is_interested_in: List[str] = [] for user in users: + # FIXME: We should do this farther up the call stack. We currently repeat + # this operation in _handle_presence. if isinstance(user, UserID): user = user.to_string() From 7cf6ad91972f230dbd59caab25f5edc9445cce44 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 24 Nov 2021 16:20:41 +0000 Subject: [PATCH 19/45] Add comment on why we don't NOT NULL to_device_stream_id --- .../65/06_msc2409_add_device_id_appservice_stream_type.sql | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql b/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql index 3ad9780d1e78..7b4024128226 100644 --- a/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql +++ b/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql @@ -15,4 +15,9 @@ -- Add a column to track what to_device stream id that this application -- service has been caught up to. + +-- We explicitly don't set this field as "NOT NULL", as having NULL as a possible +-- state is useful for determining if we've ever sent traffic for a stream type +-- to an appservice. See https://github.com/matrix-org/synapse/issues/10836 for +-- one way this can be used. ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT; \ No newline at end of file From 6d68b8a4e8ac160f1e050a547a76efaa02793929 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 3 Dec 2021 20:00:30 +0000 Subject: [PATCH 20/45] Refactor and generalise the sending of arbitrary fields over AS transactions Things were starting to get a little inflexible as we kept adding new types of data to send to application services. It's better to just have one method for adding data to AS transactions, than one for each type of data. Note that subsequent PRs will need to add device lists, one-time keys and fallback keys to these transactions. Adding those are additional arguments to a method is much nicer than a new method for each one. Plus with this setup we can add multiple different types of data at once without kicking off an AS transaction for each type. This will be useful for OTK/fallback keys, as we plan to lazily attach those when handling other event types. --- synapse/appservice/scheduler.py | 62 ++++++++++++++++++--------------- synapse/handlers/appservice.py | 16 +++++---- 2 files changed, 44 insertions(+), 34 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index a21125708864..305e751d46c4 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,7 +48,7 @@ components. """ import logging -from typing import Iterable, List, Optional +from typing import Dict, Iterable, List, Optional from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.events import EventBase @@ -91,25 +91,31 @@ async def start(self): for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as(self, service: ApplicationService, event: EventBase): - self.queuer.enqueue_event(service, event) - - def submit_ephemeral_events_for_as( - self, service: ApplicationService, events: Iterable[JsonDict] + def enqueue_for_appservice( + self, + appservice: ApplicationService, + events: Optional[Iterable[EventBase]] = None, + ephemeral: Optional[Iterable[JsonDict]] = None, ) -> None: """ - Send ephemeral events to application services, and schedule a new - outgoing AS transaction. - + Enqueue some data to be sent off to an application service. Args: - service: The service to send ephemeral events to. - events: The ephemeral events to send. + appservice: The application service to create and send a transaction to. + events: The persistent room events to send. + ephemeral: The ephemeral events to send. """ - # Ensure we have some events to send - if not events: + # We purposefully allow this method to run with empty events/ephemeral + # iterables, so that callers do not need to check iterable size themselves. + if not events and not ephemeral and not to_device_messages: return - self.queuer.enqueue_ephemeral(service, events) + if events: + self.queuer.queued_events.setdefault(appservice.id, []).extend(events) + if ephemeral: + self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral) + + # Kick off a new application service transaction + self.queuer.start_background_request(appservice) class _ServiceQueuer: @@ -121,15 +127,17 @@ class _ServiceQueuer: """ def __init__(self, txn_ctrl, clock): - self.queued_events = {} # dict of {service_id: [events]} - self.queued_ephemeral = {} # dict of {service_id: [events]} + # dict of {service_id: [events]} + self.queued_events: Dict[str, List[EventBase]] = {} + # dict of {service_id: [event_json]} + self.queued_ephemeral: Dict[str, List[JsonDict]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock - def _start_background_request(self, service): + def start_background_request(self, service): # start a sender for this appservice if we don't already have one if service.id in self.requests_in_flight: return @@ -138,16 +146,6 @@ def _start_background_request(self, service): "as-sender-%s" % (service.id,), self._send_request, service ) - def enqueue_event(self, service: ApplicationService, event: EventBase): - self.queued_events.setdefault(service.id, []).append(event) - self._start_background_request(service) - - def enqueue_ephemeral( - self, service: ApplicationService, events: Iterable[JsonDict] - ) -> None: - self.queued_ephemeral.setdefault(service.id, []).extend(events) - self._start_background_request(service) - async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender # running. @@ -205,7 +203,15 @@ async def send( service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, - ): + ) -> None: + """ + Create a transaction with the given data and send to the provided + application service. + Args: + service: The application service to send the transaction to. + events: The persistent events to include in the transaction. + ephemeral: The ephemeral events to include in the transaction. + """ try: txn = await self.store.create_appservice_txn( service=service, events=events, ephemeral=ephemeral or [] diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 4a7613b26261..5a28ceac4382 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -135,7 +135,9 @@ async def start_scheduler() -> None: # Fork off pushes to these services for service in services: - self.scheduler.submit_event_for_as(service, event) + self.scheduler.enqueue_for_appservice( + service, events=[event] + ) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -292,7 +294,7 @@ async def _notify_interested_services_ephemeral( # and, if they apply to this application service, send it off. events = await self._handle_typing(service, new_token) if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) continue # Since we read/update the stream position for this AS/stream @@ -303,7 +305,7 @@ async def _notify_interested_services_ephemeral( ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice await self.store.set_appservice_stream_type_pos( @@ -312,7 +314,7 @@ async def _notify_interested_services_ephemeral( elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice await self.store.set_appservice_stream_type_pos( @@ -325,8 +327,10 @@ async def _notify_interested_services_ephemeral( to_device_messages = await self._get_to_device_messages( service, new_token, users ) - self.scheduler.submit_ephemeral_events_for_as( - service, to_device_messages + # REVIEW: In a subsequent commit, we'll move this to a to-device-specific + # key in the AS transaction. + self.scheduler.enqueue_for_appservice( + service, ephemeral=to_device_messages ) # Persist the latest handled stream token for this appservice From 13b25cf51e41e30104c628032284b045a052bcc0 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 3 Dec 2021 20:01:10 +0000 Subject: [PATCH 21/45] Fix tests to mock _TransactionController.send of ApplicationServiceScheduler.enqueue* With enqueue_for_appservice being called per-event per-appservice, it's not a great target for mocking. So we use _TransactionController.send instead, to track things that are actually sent out to AS's. This also has the benefit of testing a wider bit of the AS txn pipeline --- synapse/appservice/scheduler.py | 1 + tests/appservice/test_scheduler.py | 46 ++++++++++++++++++------------ tests/handlers/test_appservice.py | 13 +++++---- 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 305e751d46c4..31b297c7a136 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -207,6 +207,7 @@ async def send( """ Create a transaction with the given data and send to the provided application service. + Args: service: The application service to send the transaction to. events: The persistent events to include in the transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 55f0899bae7d..7ca65a23df60 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,14 +14,17 @@ from unittest.mock import Mock from twisted.internet import defer +from twisted.internet.testing import MemoryReactor from synapse.appservice import ApplicationServiceState from synapse.appservice.scheduler import ( + ApplicationServiceScheduler, _Recoverer, - _ServiceQueuer, _TransactionController, ) from synapse.logging.context import make_deferred_yieldable +from synapse.server import HomeServer +from synapse.util import Clock from tests import unittest from tests.test_utils import simple_async_mock @@ -189,17 +192,21 @@ def take_txn(*args, **kwargs): self.callback.assert_called_once_with(self.recoverer) -class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): - def setUp(self): +class ApplicationServiceSchedulerQueuerTestCase(unittest.HomeserverTestCase): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): + self.scheduler = ApplicationServiceScheduler(hs) self.txn_ctrl = Mock() self.txn_ctrl.send = simple_async_mock() - self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock()) + + # Replace instantiated _TransactionController instances with our Mock + self.scheduler.txn_ctrl = self.txn_ctrl + self.scheduler.queuer.txn_ctrl = self.txn_ctrl def test_send_single_event_no_queue(self): # Expect the event to be sent immediately. service = Mock(id=4) event = Mock() - self.queuer.enqueue_event(service, event) + self.scheduler.enqueue_for_appservice(service, events=[event]) self.txn_ctrl.send.assert_called_once_with(service, [event], []) def test_send_single_event_with_queue(self): @@ -212,10 +219,11 @@ def test_send_single_event_with_queue(self): event2 = Mock(event_id="second") event3 = Mock(event_id="third") # Send an event and don't resolve it just yet. - self.queuer.enqueue_event(service, event) + self.scheduler.enqueue_for_appservice(service, events=[event]) # Send more events: expect send() to NOT be called multiple times. - self.queuer.enqueue_event(service, event2) - self.queuer.enqueue_event(service, event3) + # (call enqueue_for_appservice multiple times deliberately) + self.scheduler.enqueue_for_appservice(service, events=[event2]) + self.scheduler.enqueue_for_appservice(service, events=[event3]) self.txn_ctrl.send.assert_called_with(service, [event], []) self.assertEquals(1, self.txn_ctrl.send.call_count) # Resolve the send event: expect the queued events to be sent @@ -244,11 +252,11 @@ def do_send(x, y, z): self.txn_ctrl.send = Mock(side_effect=do_send) # send events for different ASes and make sure they are sent - self.queuer.enqueue_event(srv1, srv_1_event) - self.queuer.enqueue_event(srv1, srv_1_event2) + self.scheduler.enqueue_for_appservice(srv1, events=[srv_1_event]) + self.scheduler.enqueue_for_appservice(srv1, events=[srv_1_event2]) self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], []) - self.queuer.enqueue_event(srv2, srv_2_event) - self.queuer.enqueue_event(srv2, srv_2_event2) + self.scheduler.enqueue_for_appservice(srv2, events=[srv_2_event]) + self.scheduler.enqueue_for_appservice(srv2, events=[srv_2_event2]) self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], []) # make sure callbacks for a service only send queued events for THAT @@ -270,7 +278,7 @@ def do_send(x, y, z): service = Mock(id=4, name="service") event_list = [Mock(name="event%i" % (i + 1)) for i in range(200)] for event in event_list: - self.queuer.enqueue_event(service, event) + self.scheduler.enqueue_for_appservice(service, [event], []) # Expect the first event to be sent immediately. self.txn_ctrl.send.assert_called_with(service, [event_list[0]], []) @@ -286,14 +294,14 @@ def test_send_single_ephemeral_no_queue(self): # Expect the event to be sent immediately. service = Mock(id=4, name="service") event_list = [Mock(name="event")] - self.queuer.enqueue_ephemeral(service, event_list) + self.scheduler.enqueue_for_appservice(service, ephemeral=event_list) self.txn_ctrl.send.assert_called_once_with(service, [], event_list) def test_send_multiple_ephemeral_no_queue(self): # Expect the event to be sent immediately. service = Mock(id=4, name="service") event_list = [Mock(name="event1"), Mock(name="event2"), Mock(name="event3")] - self.queuer.enqueue_ephemeral(service, event_list) + self.scheduler.enqueue_for_appservice(service, ephemeral=event_list) self.txn_ctrl.send.assert_called_once_with(service, [], event_list) def test_send_single_ephemeral_with_queue(self): @@ -307,10 +315,10 @@ def test_send_single_ephemeral_with_queue(self): event_list_3 = [Mock(event_id="event5"), Mock(event_id="event6")] # Send an event and don't resolve it just yet. - self.queuer.enqueue_ephemeral(service, event_list_1) + self.scheduler.enqueue_for_appservice(service, ephemeral=event_list_1) # Send more events: expect send() to NOT be called multiple times. - self.queuer.enqueue_ephemeral(service, event_list_2) - self.queuer.enqueue_ephemeral(service, event_list_3) + self.scheduler.enqueue_for_appservice(service, ephemeral=event_list_2) + self.scheduler.enqueue_for_appservice(service, ephemeral=event_list_3) self.txn_ctrl.send.assert_called_with(service, [], event_list_1) self.assertEquals(1, self.txn_ctrl.send.call_count) # Resolve txn_ctrl.send @@ -329,7 +337,7 @@ def test_send_large_txns_ephemeral(self): first_chunk = [Mock(name="event%i" % (i + 1)) for i in range(100)] second_chunk = [Mock(name="event%i" % (i + 101)) for i in range(50)] event_list = first_chunk + second_chunk - self.queuer.enqueue_ephemeral(service, event_list) + self.scheduler.enqueue_for_appservice(service, ephemeral=event_list) self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk) d.callback(service) self.txn_ctrl.send.assert_called_with(service, [], second_chunk) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 6ca9c7f9f1f1..c1c5dc79f966 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -41,6 +41,7 @@ def setUp(self): hs.get_datastore.return_value = self.mock_store self.mock_store.get_received_ts.return_value = make_awaitable(0) self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None) + self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable(None) hs.get_application_service_api.return_value = self.mock_as_api hs.get_application_service_scheduler.return_value = self.mock_scheduler hs.get_clock.return_value = MockClock() @@ -68,8 +69,8 @@ def test_notify_interested_services(self): ] self.handler.notify_interested_services(RoomStreamToken(None, 1)) - self.mock_scheduler.submit_event_for_as.assert_called_once_with( - interested_service, event + self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( + interested_service, events=[event] ) def test_query_user_exists_unknown_user(self): @@ -279,8 +280,8 @@ def test_notify_interested_services_ephemeral(self): self.handler.notify_interested_services_ephemeral( "receipt_key", 580, ["@fakerecipient:example.com"] ) - self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( - interested_service, [event] + self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( + interested_service, ephemeral=[event] ) self.mock_store.set_appservice_stream_type_pos.assert_called_once_with( interested_service, @@ -310,8 +311,8 @@ def test_notify_interested_services_ephemeral_out_of_order(self): "receipt_key", 580, ["@fakerecipient:example.com"] ) # This method will be called, but with an empty list of events - self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( - interested_service, [] + self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( + interested_service, ephemeral=[] ) def _mkservice(self, is_interested, protocols=None): From 275e1e0b3af2ddf6a8abd33834edb49613fd8ace Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 3 Dec 2021 19:17:32 +0000 Subject: [PATCH 22/45] Add to-device messages as their own special section in AS txns --- synapse/appservice/__init__.py | 3 ++ synapse/appservice/api.py | 30 ++++++++++++++++---- synapse/storage/databases/main/appservice.py | 14 +++++++-- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 6504c6bd3f59..0ca8b2ae4029 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -329,11 +329,13 @@ def __init__( id: int, events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], ): self.service = service self.id = id self.events = events self.ephemeral = ephemeral + self.to_device_messages = to_device_messages async def send(self, as_api: "ApplicationServiceApi") -> bool: """Sends this transaction using the provided AS API interface. @@ -347,6 +349,7 @@ async def send(self, as_api: "ApplicationServiceApi") -> bool: service=self.service, events=self.events, ephemeral=self.ephemeral, + to_device_messages=self.to_device_messages, txn_id=self.id, ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index d08f6bbd7f2e..a54b4e867dd6 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -13,7 +13,7 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from prometheus_client import Counter @@ -204,12 +204,25 @@ async def push_bulk( service: "ApplicationService", events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], txn_id: Optional[int] = None, - ): + ) -> bool: + """ + Push data to an application service. + Args: + service: The application service to send to. + events: The persistent events to send. + ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. + txn_id: An unique ID to assign to this transaction. Application services should + deduplicate transactions received with identitical IDs. + Returns: + True if the task succeeded, False if it failed. + """ if service.url is None: return True - events = self._serialize(service, events) + serialized_events = self._serialize(service, events) if txn_id is None: logger.warning( @@ -220,10 +233,15 @@ async def push_bulk( uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) # Never send ephemeral events to appservices that do not support it + body: Dict[str, List[JsonDict]] = {"events": serialized_events} if service.supports_ephemeral: - body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral} - else: - body = {"events": events} + body.update( + { + # TODO: Update to stable prefixes once MSC2409 completes FCP merge. + "de.sorunome.msc2409.ephemeral": ephemeral, + "de.sorunome.msc2409.to_device": to_device_messages, + } + ) try: await self.put_json( diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 1986e36d5230..b88e6e1a7578 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -194,6 +194,7 @@ async def create_appservice_txn( service: ApplicationService, events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], ) -> AppServiceTransaction: """Atomically creates a new transaction for this application service with the given list of events. Ephemeral events are NOT persisted to the @@ -203,6 +204,7 @@ async def create_appservice_txn( service: The service who the transaction is for. events: A list of persistent events to put in the transaction. ephemeral: A list of ephemeral events to put in the transaction. + to_device_messages: A list of to-device messages to put in the transaction. Returns: A new transaction. @@ -233,7 +235,11 @@ def _create_appservice_txn(txn): (service.id, new_txn_id, event_ids), ) return AppServiceTransaction( - service=service, id=new_txn_id, events=events, ephemeral=ephemeral + service=service, + id=new_txn_id, + events=events, + ephemeral=ephemeral, + to_device_messages=to_device_messages, ) return await self.db_pool.runInteraction( @@ -326,7 +332,11 @@ def _get_oldest_unsent_txn(txn): events = await self.get_events_as_list(event_ids) return AppServiceTransaction( - service=service, id=entry["txn_id"], events=events, ephemeral=[] + service=service, + id=entry["txn_id"], + events=events, + ephemeral=[], + to_device_messages=[], ) def _get_last_txn(self, txn, service_id: Optional[str]) -> int: From 403490de8b633327e300f249bfd798097bc12c8a Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 3 Dec 2021 19:22:22 +0000 Subject: [PATCH 23/45] Insert to-device messages into the new to-device part of AS txns --- synapse/appservice/scheduler.py | 34 ++++++++++++++++++++++++++++++--- synapse/handlers/appservice.py | 4 +--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 31b297c7a136..dae952dc1362 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -65,6 +65,9 @@ # Maximum number of ephemeral events to provide in an AS transaction. MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100 +# Maximum number of to-device messages to provide in an AS transaction. +MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100 + class ApplicationServiceScheduler: """Public facing API for this module. Does the required DI to tie the @@ -96,6 +99,7 @@ def enqueue_for_appservice( appservice: ApplicationService, events: Optional[Iterable[EventBase]] = None, ephemeral: Optional[Iterable[JsonDict]] = None, + to_device_messages: Optional[Iterable[JsonDict]] = None, ) -> None: """ Enqueue some data to be sent off to an application service. @@ -103,6 +107,9 @@ def enqueue_for_appservice( appservice: The application service to create and send a transaction to. events: The persistent room events to send. ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. These differ from normal + to-device messages sent to clients, as they have 'to_device_id' and + 'to_user_id' fields. """ # We purposefully allow this method to run with empty events/ephemeral # iterables, so that callers do not need to check iterable size themselves. @@ -113,6 +120,10 @@ def enqueue_for_appservice( self.queuer.queued_events.setdefault(appservice.id, []).extend(events) if ephemeral: self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral) + if to_device_messages: + self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend( + to_device_messages + ) # Kick off a new application service transaction self.queuer.start_background_request(appservice) @@ -131,6 +142,8 @@ def __init__(self, txn_ctrl, clock): self.queued_events: Dict[str, List[EventBase]] = {} # dict of {service_id: [event_json]} self.queued_ephemeral: Dict[str, List[JsonDict]] = {} + # dict of {service_id: [to_device_message_json]} + self.queued_to_device_messages: Dict[str, List[JsonDict]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight = set() @@ -162,11 +175,21 @@ async def _send_request(self, service: ApplicationService): ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] - if not events and not ephemeral: + all_to_device_messages = self.queued_to_device_messages.get( + service.id, [] + ) + to_device_messages_to_send = all_to_device_messages[ + :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION + ] + del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION] + + if not events and not ephemeral and not to_device_messages_to_send: return try: - await self.txn_ctrl.send(service, events, ephemeral) + await self.txn_ctrl.send( + service, events, ephemeral, to_device_messages_to_send + ) except Exception: logger.exception("AS request failed") finally: @@ -203,6 +226,7 @@ async def send( service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, + to_device_messages: Optional[List[JsonDict]] = None, ) -> None: """ Create a transaction with the given data and send to the provided @@ -212,10 +236,14 @@ async def send( service: The application service to send the transaction to. events: The persistent events to include in the transaction. ephemeral: The ephemeral events to include in the transaction. + to_device_messages: The to-device messages to include in the transaction. """ try: txn = await self.store.create_appservice_txn( - service=service, events=events, ephemeral=ephemeral or [] + service=service, + events=events, + ephemeral=ephemeral or [], + to_device_messages=to_device_messages or [], ) service_is_up = await self._is_service_up(service) if service_is_up: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 5a28ceac4382..c92668642a5d 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -327,10 +327,8 @@ async def _notify_interested_services_ephemeral( to_device_messages = await self._get_to_device_messages( service, new_token, users ) - # REVIEW: In a subsequent commit, we'll move this to a to-device-specific - # key in the AS transaction. self.scheduler.enqueue_for_appservice( - service, ephemeral=to_device_messages + service, to_device_messages=to_device_messages ) # Persist the latest handled stream token for this appservice From 385b3bf0561a208fa8bd17076ba3d7b2a0588c4d Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 3 Dec 2021 19:39:23 +0000 Subject: [PATCH 24/45] Modify tests to handle new location of to-device messages in AS txns --- tests/appservice/test_scheduler.py | 60 +++++++++++++++--------------- tests/handlers/test_appservice.py | 39 ++++++++++--------- 2 files changed, 52 insertions(+), 47 deletions(-) diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 7ca65a23df60..8f9afa85386c 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -61,7 +61,10 @@ def test_single_service_up_txn_sent(self): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events, ephemeral=[] # txn made and saved + service=service, + events=events, + ephemeral=[], + to_device_messages=[], # txn made and saved ) self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made txn.complete.assert_called_once_with(self.store) # txn completed @@ -82,7 +85,10 @@ def test_single_service_down(self): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events, ephemeral=[] # txn made and saved + service=service, + events=events, + ephemeral=[], + to_device_messages=[], # txn made and saved ) self.assertEquals(0, txn.send.call_count) # txn not sent though self.assertEquals(0, txn.complete.call_count) # or completed @@ -105,7 +111,7 @@ def test_single_service_up_txn_not_sent(self): self.successResultOf(defer.ensureDeferred(self.txnctrl.send(service, events))) self.store.create_appservice_txn.assert_called_once_with( - service=service, events=events, ephemeral=[] + service=service, events=events, ephemeral=[], to_device_messages=[] ) self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made self.assertEquals(1, self.recoverer.recover.call_count) # and invoked @@ -207,13 +213,11 @@ def test_send_single_event_no_queue(self): service = Mock(id=4) event = Mock() self.scheduler.enqueue_for_appservice(service, events=[event]) - self.txn_ctrl.send.assert_called_once_with(service, [event], []) + self.txn_ctrl.send.assert_called_once_with(service, [event], [], []) def test_send_single_event_with_queue(self): d = defer.Deferred() - self.txn_ctrl.send = Mock( - side_effect=lambda x, y, z: make_deferred_yieldable(d) - ) + self.txn_ctrl.send = Mock(return_value=make_deferred_yieldable(d)) service = Mock(id=4) event = Mock(event_id="first") event2 = Mock(event_id="second") @@ -224,11 +228,11 @@ def test_send_single_event_with_queue(self): # (call enqueue_for_appservice multiple times deliberately) self.scheduler.enqueue_for_appservice(service, events=[event2]) self.scheduler.enqueue_for_appservice(service, events=[event3]) - self.txn_ctrl.send.assert_called_with(service, [event], []) + self.txn_ctrl.send.assert_called_with(service, [event], [], []) self.assertEquals(1, self.txn_ctrl.send.call_count) # Resolve the send event: expect the queued events to be sent d.callback(service) - self.txn_ctrl.send.assert_called_with(service, [event2, event3], []) + self.txn_ctrl.send.assert_called_with(service, [event2, event3], [], []) self.assertEquals(2, self.txn_ctrl.send.call_count) def test_multiple_service_queues(self): @@ -246,7 +250,7 @@ def test_multiple_service_queues(self): send_return_list = [srv_1_defer, srv_2_defer] - def do_send(x, y, z): + def do_send(*args, **kwargs): return make_deferred_yieldable(send_return_list.pop(0)) self.txn_ctrl.send = Mock(side_effect=do_send) @@ -254,15 +258,15 @@ def do_send(x, y, z): # send events for different ASes and make sure they are sent self.scheduler.enqueue_for_appservice(srv1, events=[srv_1_event]) self.scheduler.enqueue_for_appservice(srv1, events=[srv_1_event2]) - self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], []) + self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event], [], []) self.scheduler.enqueue_for_appservice(srv2, events=[srv_2_event]) self.scheduler.enqueue_for_appservice(srv2, events=[srv_2_event2]) - self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], []) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event], [], []) # make sure callbacks for a service only send queued events for THAT # service srv_2_defer.callback(srv2) - self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], []) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], [], []) self.assertEquals(3, self.txn_ctrl.send.call_count) def test_send_large_txns(self): @@ -270,7 +274,7 @@ def test_send_large_txns(self): srv_2_defer = defer.Deferred() send_return_list = [srv_1_defer, srv_2_defer] - def do_send(x, y, z): + def do_send(*args, **kwargs): return make_deferred_yieldable(send_return_list.pop(0)) self.txn_ctrl.send = Mock(side_effect=do_send) @@ -281,13 +285,13 @@ def do_send(x, y, z): self.scheduler.enqueue_for_appservice(service, [event], []) # Expect the first event to be sent immediately. - self.txn_ctrl.send.assert_called_with(service, [event_list[0]], []) + self.txn_ctrl.send.assert_called_with(service, [event_list[0]], [], []) srv_1_defer.callback(service) # Then send the next 100 events - self.txn_ctrl.send.assert_called_with(service, event_list[1:101], []) + self.txn_ctrl.send.assert_called_with(service, event_list[1:101], [], []) srv_2_defer.callback(service) # Then the final 99 events - self.txn_ctrl.send.assert_called_with(service, event_list[101:], []) + self.txn_ctrl.send.assert_called_with(service, event_list[101:], [], []) self.assertEquals(3, self.txn_ctrl.send.call_count) def test_send_single_ephemeral_no_queue(self): @@ -295,20 +299,18 @@ def test_send_single_ephemeral_no_queue(self): service = Mock(id=4, name="service") event_list = [Mock(name="event")] self.scheduler.enqueue_for_appservice(service, ephemeral=event_list) - self.txn_ctrl.send.assert_called_once_with(service, [], event_list) + self.txn_ctrl.send.assert_called_once_with(service, [], event_list, []) def test_send_multiple_ephemeral_no_queue(self): # Expect the event to be sent immediately. service = Mock(id=4, name="service") event_list = [Mock(name="event1"), Mock(name="event2"), Mock(name="event3")] self.scheduler.enqueue_for_appservice(service, ephemeral=event_list) - self.txn_ctrl.send.assert_called_once_with(service, [], event_list) + self.txn_ctrl.send.assert_called_once_with(service, [], event_list, []) def test_send_single_ephemeral_with_queue(self): d = defer.Deferred() - self.txn_ctrl.send = Mock( - side_effect=lambda x, y, z: make_deferred_yieldable(d) - ) + self.txn_ctrl.send = Mock(return_value=make_deferred_yieldable(d)) service = Mock(id=4) event_list_1 = [Mock(event_id="event1"), Mock(event_id="event2")] event_list_2 = [Mock(event_id="event3"), Mock(event_id="event4")] @@ -319,26 +321,26 @@ def test_send_single_ephemeral_with_queue(self): # Send more events: expect send() to NOT be called multiple times. self.scheduler.enqueue_for_appservice(service, ephemeral=event_list_2) self.scheduler.enqueue_for_appservice(service, ephemeral=event_list_3) - self.txn_ctrl.send.assert_called_with(service, [], event_list_1) + self.txn_ctrl.send.assert_called_with(service, [], event_list_1, []) self.assertEquals(1, self.txn_ctrl.send.call_count) # Resolve txn_ctrl.send d.callback(service) # Expect the queued events to be sent - self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3) + self.txn_ctrl.send.assert_called_with( + service, [], event_list_2 + event_list_3, [] + ) self.assertEquals(2, self.txn_ctrl.send.call_count) def test_send_large_txns_ephemeral(self): d = defer.Deferred() - self.txn_ctrl.send = Mock( - side_effect=lambda x, y, z: make_deferred_yieldable(d) - ) + self.txn_ctrl.send = Mock(return_value=make_deferred_yieldable(d)) # Expect the event to be sent immediately. service = Mock(id=4, name="service") first_chunk = [Mock(name="event%i" % (i + 1)) for i in range(100)] second_chunk = [Mock(name="event%i" % (i + 101)) for i in range(50)] event_list = first_chunk + second_chunk self.scheduler.enqueue_for_appservice(service, ephemeral=event_list) - self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk) + self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk, []) d.callback(service) - self.txn_ctrl.send.assert_called_with(service, [], second_chunk) + self.txn_ctrl.send.assert_called_with(service, [], second_chunk, []) self.assertEquals(2, self.txn_ctrl.send.call_count) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index c1c5dc79f966..e4ec14927358 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -26,7 +26,7 @@ from synapse.util.stringutils import random_string from tests import unittest -from tests.test_utils import make_awaitable +from tests.test_utils import make_awaitable, simple_async_mock from tests.utils import MockClock @@ -41,7 +41,9 @@ def setUp(self): hs.get_datastore.return_value = self.mock_store self.mock_store.get_received_ts.return_value = make_awaitable(0) self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None) - self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable(None) + self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable( + None + ) hs.get_application_service_api.return_value = self.mock_as_api hs.get_application_service_scheduler.return_value = self.mock_scheduler hs.get_clock.return_value = MockClock() @@ -346,11 +348,10 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): ] def prepare(self, reactor, clock, hs): - # Mock the ApplicationServiceScheduler queuer so that we can track any - # outgoing ephemeral events - self.mock_service_queuer = Mock() - self.mock_service_queuer.enqueue_ephemeral = Mock() - hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer + # Mock the ApplicationServiceScheduler's _TransactionController's send method so that + # we can track any outgoing ephemeral events + self.send_mock = simple_async_mock() + hs.get_application_service_handler().scheduler.txn_ctrl.send = self.send_mock # Mock out application services, and allow defining our own in tests self._services: List[ApplicationService] = [] @@ -424,19 +425,21 @@ def test_application_services_receive_local_to_device(self): # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS. # # The uninterested application service should not have been notified at all. - self.mock_service_queuer.enqueue_ephemeral.assert_called_once() - service, events = self.mock_service_queuer.enqueue_ephemeral.call_args[0] + self.send_mock.assert_called_once() + service, _events, _ephemeral, to_device_messages = self.send_mock.call_args[0] # Assert that this was the same to-device message that local_user sent self.assertEqual(service, interested_appservice) - self.assertEqual(events[0]["type"], "m.room_key_request") - self.assertEqual(events[0]["sender"], self.local_user) + self.assertEqual(to_device_messages[0]["type"], "m.room_key_request") + self.assertEqual(to_device_messages[0]["sender"], self.local_user) # Additional fields 'to_user_id' and 'to_device_id' specifically for # to-device messages via the AS API - self.assertEqual(events[0]["to_user_id"], self.exclusive_as_user) - self.assertEqual(events[0]["to_device_id"], self.exclusive_as_user_device_id) - self.assertEqual(events[0]["content"], message_content) + self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user) + self.assertEqual( + to_device_messages[0]["to_device_id"], self.exclusive_as_user_device_id + ) + self.assertEqual(to_device_messages[0]["content"], message_content) @unittest.override_config( {"experimental_features": {"msc2409_to_device_messages_enabled": True}} @@ -526,22 +529,22 @@ def test_application_services_receive_bursts_of_to_device(self): ) self.assertEqual(chan.code, 200, chan.result) - self.mock_service_queuer.enqueue_ephemeral.assert_called() + self.send_mock.assert_called() # Count the total number of to-device messages that were sent out per-service. # Ensure that we only sent to-device messages to interested services, and that # each interested service received the full count of to-device messages. service_id_to_message_count: Dict[str, int] = {} - for call in self.mock_service_queuer.enqueue_ephemeral.call_args_list: - service, events = call[0] + for call in self.send_mock.call_args_list: + service, _events, _ephemeral, to_device_messages = call[0] # Check that this was made to an interested service self.assertIn(service, interested_appservices) # Add to the count of messages for this application service service_id_to_message_count.setdefault(service.id, 0) - service_id_to_message_count[service.id] += len(events) + service_id_to_message_count[service.id] += len(to_device_messages) # Assert that each interested service received the full count of messages for count in service_id_to_message_count.values(): From ba9143817fa759bb92027d1a8036f4c7a7bf9c50 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 7 Dec 2021 10:45:30 +0000 Subject: [PATCH 25/45] Fix calls to create_appservice_txn in tests --- tests/storage/test_appservice.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index db157781fc70..ddcb7f554918 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -266,7 +266,9 @@ def test_create_appservice_txn_first( service = Mock(id=self.as_list[0]["id"]) events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) txn = self.get_success( - defer.ensureDeferred(self.store.create_appservice_txn(service, events, [])) + defer.ensureDeferred( + self.store.create_appservice_txn(service, events, [], []) + ) ) self.assertEquals(txn.id, 1) self.assertEquals(txn.events, events) @@ -280,7 +282,9 @@ def test_create_appservice_txn_older_last_txn( self.get_success(self._set_last_txn(service.id, 9643)) # AS is falling behind self.get_success(self._insert_txn(service.id, 9644, events)) self.get_success(self._insert_txn(service.id, 9645, events)) - txn = self.get_success(self.store.create_appservice_txn(service, events, [])) + txn = self.get_success( + self.store.create_appservice_txn(service, events, [], []) + ) self.assertEquals(txn.id, 9646) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) @@ -291,7 +295,9 @@ def test_create_appservice_txn_up_to_date_last_txn( service = Mock(id=self.as_list[0]["id"]) events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) self.get_success(self._set_last_txn(service.id, 9643)) - txn = self.get_success(self.store.create_appservice_txn(service, events, [])) + txn = self.get_success( + self.store.create_appservice_txn(service, events, [], []) + ) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) @@ -313,7 +319,9 @@ def test_create_appservice_txn_up_fuzzing( self.get_success(self._insert_txn(self.as_list[2]["id"], 10, events)) self.get_success(self._insert_txn(self.as_list[3]["id"], 9643, events)) - txn = self.get_success(self.store.create_appservice_txn(service, events, [])) + txn = self.get_success( + self.store.create_appservice_txn(service, events, [], []) + ) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) From 06850217221760fc9ba684c14fa9eac8296c7ca4 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 11 Jan 2022 11:21:31 +0000 Subject: [PATCH 26/45] Update synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- .../65/06_msc2409_add_device_id_appservice_stream_type.sql | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql b/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql index 7b4024128226..87a6598f49ee 100644 --- a/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql +++ b/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql @@ -16,8 +16,6 @@ -- Add a column to track what to_device stream id that this application -- service has been caught up to. --- We explicitly don't set this field as "NOT NULL", as having NULL as a possible --- state is useful for determining if we've ever sent traffic for a stream type --- to an appservice. See https://github.com/matrix-org/synapse/issues/10836 for --- one way this can be used. +-- NULL indicates that this appservice has never received any to_device messages. This +-- can be used, for example, to avoid sending a huge dump of messages at startup. ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT; \ No newline at end of file From 3d1661f35a00e8497907eb91cf596831df41201a Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 11 Jan 2022 18:18:10 +0000 Subject: [PATCH 27/45] rename recipient_user_id_device_id_to_messages -> recipient_device_to_messages --- synapse/handlers/appservice.py | 4 ++-- synapse/storage/databases/main/deviceinbox.py | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 426844dce541..88b74e17f0c6 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -510,7 +510,7 @@ async def _get_to_device_messages( return [] # Retrieve the to-device messages for each user - recipient_user_id_device_id_to_messages = await self.store.get_new_messages( + recipient_device_to_messages = await self.store.get_new_messages( users_appservice_is_interested_in, from_key, new_token, @@ -526,7 +526,7 @@ async def _get_to_device_messages( for ( user_id, device_id, - ), messages in recipient_user_id_device_id_to_messages.items(): + ), messages in recipient_device_to_messages.items(): for message_json in messages: # Remove 'message_id' from the to-device message, as it's an internal ID message_json.pop("message_id", None) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 0f222b06d913..45e7edfc688a 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -191,20 +191,18 @@ def get_new_messages_txn(txn: LoggingTransaction): # Create a dictionary of (user ID, device ID) -> list of messages that # that device is meant to receive. - recipient_user_id_device_id_to_messages: Dict[ - Tuple[str, str], List[JsonDict] - ] = {} + recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {} for row in txn: recipient_user_id = row[0] recipient_device_id = row[1] message_dict = db_to_json(row[2]) - recipient_user_id_device_id_to_messages.setdefault( + recipient_device_to_messages.setdefault( (recipient_user_id, recipient_device_id), [] ).append(message_dict) - return recipient_user_id_device_id_to_messages + return recipient_device_to_messages return await self.db_pool.runInteraction( "get_new_messages", get_new_messages_txn From 0ac079b8d2db10fae62ac25282f6d6ff5f98b646 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 11 Jan 2022 18:52:01 +0000 Subject: [PATCH 28/45] lint --- tests/handlers/test_appservice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index e4ec14927358..0c37f39c0ae3 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -566,7 +566,7 @@ def _register_application_service( """ # Create an application service appservice = ApplicationService( - token=None, + token=random_string(10), hostname="example.com", id=random_string(10), sender="@as:example.com", From 822e92a62f35f52a1cefe42d5af6bed75da93c62 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 25 Jan 2022 17:43:20 +0000 Subject: [PATCH 29/45] Refactor storage methods to retrieve to-device messages This commit refactors the previously rather duplicated 'get_new_messages_for_device' and 'get_new_messages' methods into one new private method with combined logic, and two small public methods. The public methods expose the correct interface for querying to-device messages for either a single device (where a limit can be used) and multiple devices (where using a limit is infeasible). --- synapse/storage/databases/main/deviceinbox.py | 281 ++++++++++++------ 1 file changed, 194 insertions(+), 87 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 45e7edfc688a..761c1fe0bd63 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple, cast from synapse.logging import issue9533_logger from synapse.logging.opentracing import log_kv, set_tag, trace @@ -137,134 +137,241 @@ def process_replication_rows(self, stream_name, instance_name, token, rows): def get_to_device_stream_token(self): return self._device_inbox_id_gen.get_current_token() - async def get_new_messages( + async def get_messages_for_user_devices( self, user_ids: Collection[str], from_stream_id: int, to_stream_id: int, ) -> Dict[Tuple[str, str], List[JsonDict]]: """ - Retrieve to-device messages for a given set of user IDs. + Retrieve to-device messages for a given set of users. Only to-device messages with stream ids between the given boundaries (from < X <= to) are returned. - Note that a stream ID can be shared by multiple copies of the same message with - different recipient devices. Each (device, message_content) tuple has their own - row in the device_inbox table. - Args: user_ids: The users to retrieve to-device messages for. from_stream_id: The lower boundary of stream id to filter with (exclusive). to_stream_id: The upper boundary of stream id to filter with (inclusive). Returns: - A list of to-device messages. + A dictionary of (user id, device id) -> list of to-device messages. """ - # Bail out if none of these users have any messages - for user_id in user_ids: - if self._device_inbox_stream_cache.has_entity_changed( - user_id, from_stream_id - ): - break - else: - return {} - - def get_new_messages_txn(txn: LoggingTransaction): - # Build a query to select messages from any of the given users that are between - # the given stream id bounds + # We expect the stream ID returned by _get_new_device_messages to always + # return to_stream_id. So, no need to return it from this function. + user_id_device_id_to_messages, _ = await self._get_device_messages( + user_ids=user_ids, + from_stream_id=from_stream_id, + to_stream_id=to_stream_id, + ) - # Scope to only the given users. We need to use this method as doing so is - # different across database engines. - many_clause_sql, many_clause_args = make_in_list_sql_clause( - self.database_engine, "user_id", user_ids - ) + return user_id_device_id_to_messages - sql = f""" - SELECT user_id, device_id, message_json FROM device_inbox - WHERE {many_clause_sql} - AND ? < stream_id AND stream_id <= ? - ORDER BY stream_id ASC - """ + async def get_messages_for_device( + self, + user_id: str, + device_id: str, + from_stream_id: int, + to_stream_id: int, + limit: int = 100, + ) -> Tuple[List[JsonDict], int]: + """ + Retrieve to-device messages for a single user device. - txn.execute(sql, (*many_clause_args, from_stream_id, to_stream_id)) + Only to-device messages with stream ids between the given boundaries + (from < X <= to) are returned. - # Create a dictionary of (user ID, device ID) -> list of messages that - # that device is meant to receive. - recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {} + Args: + user_id: The ID of the user to retrieve messages for. + device_id: The ID of the device to retrieve to-device messages for. + from_stream_id: The lower boundary of stream id to filter with (exclusive). + to_stream_id: The upper boundary of stream id to filter with (inclusive). + limit: A limit on the number of to-device messages returned. - for row in txn: - recipient_user_id = row[0] - recipient_device_id = row[1] - message_dict = db_to_json(row[2]) + Returns: + A tuple containing: + * A dictionary of (user id, device id) -> list of to-device messages. + * The last-processed stream ID. Subsequent calls of this function with the + same device should pass this value as 'from_stream_id'. + """ + ( + user_id_device_id_to_messages, + last_processed_stream_id, + ) = await self._get_device_messages( + user_ids=[user_id], + device_ids=[device_id], + from_stream_id=from_stream_id, + to_stream_id=to_stream_id, + limit=limit, + ) - recipient_device_to_messages.setdefault( - (recipient_user_id, recipient_device_id), [] - ).append(message_dict) + if not user_id_device_id_to_messages: + # There were no messages! + return [], to_stream_id - return recipient_device_to_messages + # Extract the messages, no need to return the user and device ID again + to_device_messages = list(user_id_device_id_to_messages.values())[0] - return await self.db_pool.runInteraction( - "get_new_messages", get_new_messages_txn - ) + return to_device_messages, last_processed_stream_id - async def get_new_messages_for_device( + async def _get_device_messages( self, - user_id: str, - device_id: Optional[str], - last_stream_id: int, - current_stream_id: int, - limit: int = 100, - ) -> Tuple[List[dict], int]: + user_ids: Collection[str], + from_stream_id: int, + to_stream_id: int, + device_ids: Optional[Collection[str]] = None, + limit: Optional[int] = None, + ) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]: """ + Retrieve pending to-device messages for a collection of user devices. + + Only to-device messages with stream ids between the given boundaries + (from < X <= to) are returned. + + Note that a stream ID can be shared by multiple copies of the same message with + different recipient devices. Stream IDs are only unique in the context of a single + user ID / device ID pair Thus, applying a limit (of messages to return) when working + with a sliding window of stream IDs is only possible when querying messages of a + single user device. + + Finally, note that device IDs are not unique across users. + Args: - user_id: The recipient user_id. - device_id: The recipient device_id. - last_stream_id: The last stream ID checked. - current_stream_id: The current position of the to device - message stream. - limit: The maximum number of messages to retrieve. + user_ids: The user IDs to filter device messages by. + from_stream_id: The lower boundary of stream id to filter with (exclusive). + to_stream_id: The upper boundary of stream id to filter with (inclusive). + device_ids: If provided, only messages destined for these device IDs will be returned. + If not provided, all device IDs for the given user IDs will be used. + limit: The maximum number of to-device messages to return. Can only be used when + passing a single user ID / device ID tuple. Returns: A tuple containing: - * A list of messages for the device. - * The max stream token of these messages. There may be more to retrieve - if the given limit was reached. + * A dict of (user_id, device_id) -> list of to-device messages + * The last-processed stream ID. If this is less than `to_stream_id`, then + there may be more messages to retrieve. If `limit` is not set, then this + is always equal to 'to_stream_id'. """ - has_changed = self._device_inbox_stream_cache.has_entity_changed( - user_id, last_stream_id - ) - if not has_changed: - return [], current_stream_id + # A limit can only be applied when querying for a single user ID / device ID tuple. + if limit: + if not device_ids: + raise AssertionError( + "Programming error: _get_new_device_messages was passed 'limit' " + "but not device_ids. This could lead to querying multiple user ID " + "/ device ID pairs, which is not compatible with 'limit'" + ) - def get_new_messages_for_device_txn(txn): - sql = ( - "SELECT stream_id, message_json FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - " LIMIT ?" + if len(user_ids) > 1 or len(device_ids) > 1: + raise AssertionError( + "Programming error: _get_new_device_messages was passed 'limit' " + "with >1 user id/device id pair" + ) + + user_ids_to_query: Set[str] = set() + device_ids_to_query: Set[str] = set() + + if device_ids is not None: + # If a collection of device IDs were passed, use them to filter results. + # Otherwise, device IDs will be derived from the given collection of user IDs. + device_ids_to_query.update(device_ids) + + # Determine which users have devices with pending messages + for user_id in user_ids: + if self._device_inbox_stream_cache.has_entity_changed( + user_id, from_stream_id + ): + # This user has new messages sent to them. Query messages for them + user_ids_to_query.add(user_id) + + def get_new_device_messages_txn(txn: LoggingTransaction): + # Build a query to select messages from any of the given devices that + # are between the given stream id bounds. + + # If a list of device IDs was not provided, retrieve all devices IDs + # for the given users. We explicitly do not query hidden devices, as + # hidden devices should not receive to-device messages. + if not device_ids: + user_device_dicts = self.db_pool.simple_select_many_txn( + txn, + table="devices", + column="user_id", + iterable=user_ids_to_query, + keyvalues={"user_id": user_id, "hidden": False}, + retcols=("device_id",), + ) + + device_ids_to_query.update( + {row["device_id"] for row in user_device_dicts} + ) + + if not user_ids_to_query or not device_ids_to_query: + # We've ended up with no devices to query. + return {}, to_stream_id + + # We include both user IDs and device IDs in this query, as we have an index + # (device_inbox_user_stream_id) for them. + user_id_many_clause_sql, user_id_many_clause_args = make_in_list_sql_clause( + self.database_engine, "user_id", user_ids_to_query ) - txn.execute( - sql, (user_id, device_id, last_stream_id, current_stream_id, limit) + ( + device_id_many_clause_sql, + device_id_many_clause_args, + ) = make_in_list_sql_clause( + self.database_engine, "device_id", device_ids_to_query ) - messages = [] - stream_pos = current_stream_id + sql = f""" + SELECT stream_id, user_id, device_id, message_json FROM device_inbox + WHERE {user_id_many_clause_sql} + AND {device_id_many_clause_sql} + AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + sql_args = ( + *user_id_many_clause_args, + *device_id_many_clause_args, + from_stream_id, + to_stream_id, + ) - for row in txn: - stream_pos = row[0] - messages.append(db_to_json(row[1])) + # If a limit was provided, limit the data retrieved from the database + if limit: + sql += "LIMIT ?" + sql_args += (limit,) - # If the limit was not reached we know that there's no more data for this - # user/device pair up to current_stream_id. - if len(messages) < limit: - stream_pos = current_stream_id + txn.execute(sql, sql_args) - return messages, stream_pos + # Create and fill a dictionary of (user ID, device ID) -> list of messages + # intended for each device. + recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {} + for message_count, row in enumerate(txn, start=1): + last_processed_stream_pos = row[0] + recipient_user_id = row[1] + recipient_device_id = row[2] + message_dict = db_to_json(row[3]) + + # Store the device details + recipient_device_to_messages.setdefault( + (recipient_user_id, recipient_device_id), [] + ).append(message_dict) + + if limit and message_count == limit: + # We ended up hitting the message limit. There may be more messages to retrieve. + # Return what we have, as well as the last stream position that was processed. + # + # The caller is expected to set this as the lower (exclusive) bound + # for the next query of this device. + return recipient_device_to_messages, last_processed_stream_pos + + # The limit was not reached, thus we know that recipient_device_to_messages + # contains all to-device messages for the given device and stream id range. + # + # We return to_stream_id, which the caller should then provide as the lower + # (exclusive) bound on the next query of this device. + return recipient_device_to_messages, to_stream_id return await self.db_pool.runInteraction( - "get_new_messages_for_device", get_new_messages_for_device_txn + "get_new_device_messages", get_new_device_messages_txn ) @trace From 25488fa880928f6091fae4fd8e3bbaa0cbc1f554 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 25 Jan 2022 18:00:35 +0000 Subject: [PATCH 30/45] Update old references to get_new_messages_for_device --- synapse/handlers/appservice.py | 2 +- synapse/handlers/sync.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 88b74e17f0c6..6222ed7c6720 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -510,7 +510,7 @@ async def _get_to_device_messages( return [] # Retrieve the to-device messages for each user - recipient_device_to_messages = await self.store.get_new_messages( + recipient_device_to_messages = await self.store.get_messages_for_user_devices( users_appservice_is_interested_in, from_key, new_token, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4b3f1ea0595c..3f80bc83ea3f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1336,7 +1336,7 @@ async def _generate_sync_entry_for_to_device( since_stream_id = int(sync_result_builder.since_token.to_device_key) if since_stream_id != int(now_token.to_device_key): - messages, stream_id = await self.store.get_new_messages_for_device( + messages, stream_id = await self.store.get_messages_for_device( user_id, device_id, since_stream_id, now_token.to_device_key ) From 026cb8a5ca5f43d1b42e31ec500d97f4947a3535 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 26 Jan 2022 15:07:16 +0000 Subject: [PATCH 31/45] Don't query for to-device messages without a device --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3f80bc83ea3f..7801aeb28802 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1335,7 +1335,7 @@ async def _generate_sync_entry_for_to_device( if sync_result_builder.since_token is not None: since_stream_id = int(sync_result_builder.since_token.to_device_key) - if since_stream_id != int(now_token.to_device_key): + if device_id is not None and since_stream_id != int(now_token.to_device_key): messages, stream_id = await self.store.get_messages_for_device( user_id, device_id, since_stream_id, now_token.to_device_key ) From 81296579c2ecfe12b8ebb0084a3bd45dc36cb6e2 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 26 Jan 2022 15:42:17 +0000 Subject: [PATCH 32/45] Fix shape of simple_insert_many argument --- tests/handlers/test_appservice.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 0c37f39c0ae3..fe57ff267119 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -497,11 +497,12 @@ def test_application_services_receive_bursts_of_to_device(self): self.hs.get_datastore().db_pool.simple_insert_many( desc="test_application_services_receive_burst_of_to_device", table="devices", + keys=("user_id", "device_id"), values=[ - { - "user_id": self.exclusive_as_user, - "device_id": device_id, - } + ( + self.exclusive_as_user, + device_id, + ) for device_id in fake_device_ids ], ) From de48ab41a5bb4fd0911969d08ac7f5b0a5f96224 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 26 Jan 2022 16:40:10 +0000 Subject: [PATCH 33/45] Only import MemoryReactor if type-checking in test_scheduler; fix old-deps --- tests/appservice/test_scheduler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 8f9afa85386c..8fb6687f89eb 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -11,10 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING from unittest.mock import Mock from twisted.internet import defer -from twisted.internet.testing import MemoryReactor from synapse.appservice import ApplicationServiceState from synapse.appservice.scheduler import ( @@ -31,6 +31,9 @@ from ..utils import MockClock +if TYPE_CHECKING: + from twisted.internet.testing import MemoryReactor + class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): def setUp(self): @@ -199,7 +202,7 @@ def take_txn(*args, **kwargs): class ApplicationServiceSchedulerQueuerTestCase(unittest.HomeserverTestCase): - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): + def prepare(self, reactor: "MemoryReactor", clock: Clock, hs: HomeServer): self.scheduler = ApplicationServiceScheduler(hs) self.txn_ctrl = Mock() self.txn_ctrl.send = simple_async_mock() From d8b8f74a6bb3897b32182b206e05da2fecc65afc Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 26 Jan 2022 16:43:51 +0000 Subject: [PATCH 34/45] Update _txn function to match outer method name --- synapse/storage/databases/main/deviceinbox.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 05637cdf8a1a..14a53cf3b36a 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -283,7 +283,7 @@ async def _get_device_messages( # This user has new messages sent to them. Query messages for them user_ids_to_query.add(user_id) - def get_new_device_messages_txn(txn: LoggingTransaction): + def get_device_messages_txn(txn: LoggingTransaction): # Build a query to select messages from any of the given devices that # are between the given stream id bounds. @@ -371,7 +371,7 @@ def get_new_device_messages_txn(txn: LoggingTransaction): return recipient_device_to_messages, to_stream_id return await self.db_pool.runInteraction( - "get_new_device_messages", get_new_device_messages_txn + "get_device_messages", get_device_messages_txn ) @trace From ced13148934b9c7c846d685b6d0133bdcd118ebe Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 28 Jan 2022 10:12:06 +0000 Subject: [PATCH 35/45] Clarify users arg in _get_to_device_messages docstring --- synapse/handlers/appservice.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 6222ed7c6720..d90c3c6b9a7d 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -483,7 +483,8 @@ async def _get_to_device_messages( Args: service: The application service to check for which events it should receive. new_token: The latest to-device event stream token. - users: The users that should receive new to-device messages. + users: The users to be notified for the new to-device messages + (ie, the recipients of the messages). Returns: A list of JSON dictionaries containing data derived from the typing events From c749fcb8edc820b07a24e845d0e74c81a2e46154 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 28 Jan 2022 13:13:50 +0000 Subject: [PATCH 36/45] assert stream_id returned by get_device_messages is as expected --- synapse/storage/databases/main/deviceinbox.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 14a53cf3b36a..f1bc6555ef70 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -159,12 +159,19 @@ async def get_messages_for_user_devices( """ # We expect the stream ID returned by _get_new_device_messages to always # return to_stream_id. So, no need to return it from this function. - user_id_device_id_to_messages, _ = await self._get_device_messages( + ( + user_id_device_id_to_messages, + last_processed_stream_id, + ) = await self._get_device_messages( user_ids=user_ids, from_stream_id=from_stream_id, to_stream_id=to_stream_id, ) + assert ( + last_processed_stream_id == to_stream_id + ), "Expected _get_device_messages to process all to-device messages up to `to_stream_id`" + return user_id_device_id_to_messages async def get_messages_for_device( From 24512fb45b405f6bac9ca3caf249aff8f17aee0b Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 28 Jan 2022 13:32:02 +0000 Subject: [PATCH 37/45] Fix get_messages_for_device return type documentation --- synapse/storage/databases/main/deviceinbox.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index f1bc6555ef70..aede8bef1886 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -197,7 +197,8 @@ async def get_messages_for_device( Returns: A tuple containing: - * A dictionary of (user id, device id) -> list of to-device messages. + * A list of to-device messages within the given stream id range intended for + the given user / device combo. * The last-processed stream ID. Subsequent calls of this function with the same device should pass this value as 'from_stream_id'. """ From 24bc3c59eea7577a744e0f22d1d1768922cd2e16 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 28 Jan 2022 15:05:47 +0000 Subject: [PATCH 38/45] Clean up limit checking --- synapse/storage/databases/main/deviceinbox.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index aede8bef1886..0e3daa589961 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -218,7 +218,7 @@ async def get_messages_for_device( return [], to_stream_id # Extract the messages, no need to return the user and device ID again - to_device_messages = list(user_id_device_id_to_messages.values())[0] + to_device_messages = user_id_device_id_to_messages.get((user_id, device_id), []) return to_device_messages, last_processed_stream_id @@ -343,7 +343,7 @@ def get_device_messages_txn(txn: LoggingTransaction): ) # If a limit was provided, limit the data retrieved from the database - if limit: + if limit is not None: sql += "LIMIT ?" sql_args += (limit,) @@ -351,8 +351,9 @@ def get_device_messages_txn(txn: LoggingTransaction): # Create and fill a dictionary of (user ID, device ID) -> list of messages # intended for each device. + last_processed_stream_pos = to_stream_id recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {} - for message_count, row in enumerate(txn, start=1): + for row in txn: last_processed_stream_pos = row[0] recipient_user_id = row[1] recipient_device_id = row[2] @@ -363,13 +364,13 @@ def get_device_messages_txn(txn: LoggingTransaction): (recipient_user_id, recipient_device_id), [] ).append(message_dict) - if limit and message_count == limit: - # We ended up hitting the message limit. There may be more messages to retrieve. - # Return what we have, as well as the last stream position that was processed. - # - # The caller is expected to set this as the lower (exclusive) bound - # for the next query of this device. - return recipient_device_to_messages, last_processed_stream_pos + if limit is not None and txn.rowcount == limit: + # We ended up hitting the message limit. There may be more messages to retrieve. + # Return what we have, as well as the last stream position that was processed. + # + # The caller is expected to set this as the lower (exclusive) bound + # for the next query of this device. + return recipient_device_to_messages, last_processed_stream_pos # The limit was not reached, thus we know that recipient_device_to_messages # contains all to-device messages for the given device and stream id range. From 30b74a5292aeacb0fe05bdad335a2db7409be379 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 28 Jan 2022 15:07:48 +0000 Subject: [PATCH 39/45] Move DB migration to schema v68 --- .../02_msc2409_add_device_id_appservice_stream_type.sql} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename synapse/storage/schema/main/delta/{65/06_msc2409_add_device_id_appservice_stream_type.sql => 68/02_msc2409_add_device_id_appservice_stream_type.sql} (94%) diff --git a/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql b/synapse/storage/schema/main/delta/68/02_msc2409_add_device_id_appservice_stream_type.sql similarity index 94% rename from synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql rename to synapse/storage/schema/main/delta/68/02_msc2409_add_device_id_appservice_stream_type.sql index 87a6598f49ee..bbf0af53110f 100644 --- a/synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_appservice_stream_type.sql +++ b/synapse/storage/schema/main/delta/68/02_msc2409_add_device_id_appservice_stream_type.sql @@ -1,4 +1,4 @@ -/* Copyright 2021 The Matrix.org Foundation C.I.C +/* Copyright 2022 The Matrix.org Foundation C.I.C * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 80c37214c36a9c43c9ff8b2564d0742a9c9d5be0 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 28 Jan 2022 15:15:45 +0000 Subject: [PATCH 40/45] Accept iterables in enqueue_for_appservice --- synapse/appservice/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index efe2a9f9ce1c..71342fa13f16 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -52,8 +52,8 @@ TYPE_CHECKING, Awaitable, Callable, + Collection, Dict, - Iterable, List, Optional, Set, @@ -112,9 +112,9 @@ async def start(self) -> None: def enqueue_for_appservice( self, appservice: ApplicationService, - events: Optional[Iterable[EventBase]] = None, - ephemeral: Optional[Iterable[JsonDict]] = None, - to_device_messages: Optional[Iterable[JsonDict]] = None, + events: Optional[Collection[EventBase]] = None, + ephemeral: Optional[Collection[JsonDict]] = None, + to_device_messages: Optional[Collection[JsonDict]] = None, ) -> None: """ Enqueue some data to be sent off to an application service. From 3d8e50d4fda4462a7b772d0ba102925f1311b728 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 28 Jan 2022 17:19:35 +0000 Subject: [PATCH 41/45] wording fixes --- synapse/appservice/scheduler.py | 3 ++- synapse/config/experimental.py | 3 +-- synapse/handlers/appservice.py | 2 +- synapse/storage/databases/main/deviceinbox.py | 9 +++++---- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 71342fa13f16..c42fa32fff32 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -118,6 +118,7 @@ def enqueue_for_appservice( ) -> None: """ Enqueue some data to be sent off to an application service. + Args: appservice: The application service to create and send a transaction to. events: The persistent room events to send. @@ -127,7 +128,7 @@ def enqueue_for_appservice( 'to_user_id' fields. """ # We purposefully allow this method to run with empty events/ephemeral - # iterables, so that callers do not need to check iterable size themselves. + # collections, so that callers do not need to check iterable size themselves. if not events and not ephemeral and not to_device_messages: return diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index e1068ec5e63e..1f2c976814e2 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -57,8 +57,7 @@ def read_config(self, config: JsonDict, **kwargs): # MSC2409 (this setting only relates to optionally sending to-device messages). # Presence, typing and read receipt EDUs are already sent to application services that - # have opted in to receive them. This setting, if enabled, adds to-device messages - # to that list. + # have opted in to receive them. If enabled, this adds to-device messages to that list. self.msc2409_to_device_messages_enabled: bool = experimental.get( "msc2409_to_device_messages_enabled", False ) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index d90c3c6b9a7d..fb5299c779db 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -477,7 +477,7 @@ async def _get_to_device_messages( ) -> List[JsonDict]: """ Given an application service, determine which events it should receive - from those between the last-recorded typing event stream token for this + from those between the last-recorded to-device message stream token for this appservice and the given stream token. Args: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 0e3daa589961..163bf89b1e9a 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -157,7 +157,7 @@ async def get_messages_for_user_devices( Returns: A dictionary of (user id, device id) -> list of to-device messages. """ - # We expect the stream ID returned by _get_new_device_messages to always + # We expect the stream ID returned by _get_device_messages to always # return to_stream_id. So, no need to return it from this function. ( user_id_device_id_to_messages, @@ -238,7 +238,7 @@ async def _get_device_messages( Note that a stream ID can be shared by multiple copies of the same message with different recipient devices. Stream IDs are only unique in the context of a single - user ID / device ID pair Thus, applying a limit (of messages to return) when working + user ID / device ID pair. Thus, applying a limit (of messages to return) when working with a sliding window of stream IDs is only possible when querying messages of a single user device. @@ -365,8 +365,9 @@ def get_device_messages_txn(txn: LoggingTransaction): ).append(message_dict) if limit is not None and txn.rowcount == limit: - # We ended up hitting the message limit. There may be more messages to retrieve. - # Return what we have, as well as the last stream position that was processed. + # We ended up bumping up against the message limit. There may be more messages + # to retrieve. Return what we have, as well as the last stream position that + # was processed. # # The caller is expected to set this as the lower (exclusive) bound # for the next query of this device. From 50ebcb9b8ff512934f128b5a06d9dcd4fc5ec614 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Fri, 28 Jan 2022 17:58:02 +0000 Subject: [PATCH 42/45] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/appservice.py | 2 +- synapse/notifier.py | 2 +- synapse/storage/databases/main/deviceinbox.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fb5299c779db..0fb919acf672 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -487,7 +487,7 @@ async def _get_to_device_messages( (ie, the recipients of the messages). Returns: - A list of JSON dictionaries containing data derived from the typing events + A list of JSON dictionaries containing data derived from the to-device events that should be sent to the given application service. """ # Get the stream token that this application service has processed up until diff --git a/synapse/notifier.py b/synapse/notifier.py index e8c937f8474e..5988c67d9076 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -462,7 +462,7 @@ def on_new_event( ) except Exception: logger.exception( - "Error notifying application services of ephemeral event" + "Error notifying application services of ephemeral events" ) def on_new_replication_data(self) -> None: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 163bf89b1e9a..d2aa904a9c75 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -158,7 +158,7 @@ async def get_messages_for_user_devices( A dictionary of (user id, device id) -> list of to-device messages. """ # We expect the stream ID returned by _get_device_messages to always - # return to_stream_id. So, no need to return it from this function. + # be to_stream_id. So, no need to return it from this function. ( user_id_device_id_to_messages, last_processed_stream_id, From 089e04168f64be088b89cb696dab76eb8b9f68ad Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 31 Jan 2022 12:15:59 +0000 Subject: [PATCH 43/45] Only allow querying either a single device ID, or all device IDs of users --- synapse/storage/databases/main/deviceinbox.py | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index d2aa904a9c75..54e808a86338 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -207,7 +207,7 @@ async def get_messages_for_device( last_processed_stream_id, ) = await self._get_device_messages( user_ids=[user_id], - device_ids=[device_id], + device_id=device_id, from_stream_id=from_stream_id, to_stream_id=to_stream_id, limit=limit, @@ -227,7 +227,7 @@ async def _get_device_messages( user_ids: Collection[str], from_stream_id: int, to_stream_id: int, - device_ids: Optional[Collection[str]] = None, + device_id: Optional[str] = None, limit: Optional[int] = None, ) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]: """ @@ -248,8 +248,9 @@ async def _get_device_messages( user_ids: The user IDs to filter device messages by. from_stream_id: The lower boundary of stream id to filter with (exclusive). to_stream_id: The upper boundary of stream id to filter with (inclusive). - device_ids: If provided, only messages destined for these device IDs will be returned. - If not provided, all device IDs for the given user IDs will be used. + device_id: A device ID to query to-device messages for. If not provided, to-device + messages from all device IDs for the given user IDs will be queried. May not be + provided if `user_ids` contains more than one entry. limit: The maximum number of to-device messages to return. Can only be used when passing a single user ID / device ID tuple. @@ -260,28 +261,32 @@ async def _get_device_messages( there may be more messages to retrieve. If `limit` is not set, then this is always equal to 'to_stream_id'. """ - # A limit can only be applied when querying for a single user ID / device ID tuple. - if limit: - if not device_ids: + if not user_ids: + logger.warning("No users provided upon querying for device IDs") + return {}, to_stream_id + + if len(user_ids) > 1: + if device_id is not None: raise AssertionError( - "Programming error: _get_new_device_messages was passed 'limit' " - "but not device_ids. This could lead to querying multiple user ID " - "/ device ID pairs, which is not compatible with 'limit'" + "Programming error: 'device_id' cannot be supplied to " + "_get_device_messages when >1 user_id has been provided" ) - if len(user_ids) > 1 or len(device_ids) > 1: + # A limit can only be applied when querying for a single user ID / device ID tuple. + if limit: raise AssertionError( - "Programming error: _get_new_device_messages was passed 'limit' " - "with >1 user id/device id pair" + "Programming error: _get_device_messages was passed 'limit' " + "with >1 user_id" ) user_ids_to_query: Set[str] = set() device_ids_to_query: Set[str] = set() - if device_ids is not None: - # If a collection of device IDs were passed, use them to filter results. + # Note that a device ID could be an empty str + if device_id is not None: + # If a device ID was passed, use it to filter results. # Otherwise, device IDs will be derived from the given collection of user IDs. - device_ids_to_query.update(device_ids) + device_ids_to_query.add(device_id) # Determine which users have devices with pending messages for user_id in user_ids: @@ -298,7 +303,7 @@ def get_device_messages_txn(txn: LoggingTransaction): # If a list of device IDs was not provided, retrieve all devices IDs # for the given users. We explicitly do not query hidden devices, as # hidden devices should not receive to-device messages. - if not device_ids: + if not device_ids_to_query: user_device_dicts = self.db_pool.simple_select_many_txn( txn, table="devices", @@ -312,7 +317,7 @@ def get_device_messages_txn(txn: LoggingTransaction): {row["device_id"] for row in user_device_dicts} ) - if not user_ids_to_query or not device_ids_to_query: + if not device_ids_to_query: # We've ended up with no devices to query. return {}, to_stream_id From d06781e4636d0aa48a947bb556aa0322c16eafef Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 1 Feb 2022 11:19:52 +0000 Subject: [PATCH 44/45] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/deviceinbox.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 54e808a86338..b6c6d722e49f 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -273,7 +273,7 @@ async def _get_device_messages( ) # A limit can only be applied when querying for a single user ID / device ID tuple. - if limit: + if limit is not None: raise AssertionError( "Programming error: _get_device_messages was passed 'limit' " "with >1 user_id" @@ -303,6 +303,8 @@ def get_device_messages_txn(txn: LoggingTransaction): # If a list of device IDs was not provided, retrieve all devices IDs # for the given users. We explicitly do not query hidden devices, as # hidden devices should not receive to-device messages. + # Note that this is more efficient than just dropping `device_id` from the query, + # since device_inbox has an index on `(user_id, device_id, stream_id)` if not device_ids_to_query: user_device_dicts = self.db_pool.simple_select_many_txn( txn, From c334eef8ba7492d87938c868db7aebfc30d149af Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 1 Feb 2022 11:27:58 +0000 Subject: [PATCH 45/45] Require a user id/device id pair if passing 'limit' --- synapse/storage/databases/main/deviceinbox.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index b6c6d722e49f..8801b7b2dd83 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -265,19 +265,21 @@ async def _get_device_messages( logger.warning("No users provided upon querying for device IDs") return {}, to_stream_id - if len(user_ids) > 1: - if device_id is not None: - raise AssertionError( - "Programming error: 'device_id' cannot be supplied to " - "_get_device_messages when >1 user_id has been provided" - ) + # Prevent a query for one user's device also retrieving another user's device with + # the same device ID (device IDs are not unique across users). + if len(user_ids) > 1 and device_id is not None: + raise AssertionError( + "Programming error: 'device_id' cannot be supplied to " + "_get_device_messages when >1 user_id has been provided" + ) - # A limit can only be applied when querying for a single user ID / device ID tuple. - if limit is not None: - raise AssertionError( - "Programming error: _get_device_messages was passed 'limit' " - "with >1 user_id" - ) + # A limit can only be applied when querying for a single user ID / device ID tuple. + # See the docstring of this function for more details. + if limit is not None and device_id is None: + raise AssertionError( + "Programming error: _get_device_messages was passed 'limit' " + "without a specific user_id/device_id" + ) user_ids_to_query: Set[str] = set() device_ids_to_query: Set[str] = set()