From ccbbfa27adb40c47229fdd8d103054cc5c8bbe23 Mon Sep 17 00:00:00 2001 From: Electron Date: Tue, 11 Apr 2023 18:54:23 +0530 Subject: [PATCH 01/10] Removed single user devices resync rest servlet class implementation --- synapse/replication/http/devices.py | 58 ----------------------------- 1 file changed, 58 deletions(-) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index cc3929dcf565..ddb3091c8778 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -27,63 +27,6 @@ logger = logging.getLogger(__name__) - -class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): - """Ask master to resync the device list for a user by contacting their - server. - - This must happen on master so that the results can be correctly cached in - the database and streamed to workers. - - Request format: - - POST /_synapse/replication/user_device_resync/:user_id - - {} - - Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id` - response, e.g.: - - { - "user_id": "@alice:example.org", - "devices": [ - { - "device_id": "JLAFKJWSCS", - "keys": { ... }, - "device_display_name": "Alice's Mobile Phone" - } - ] - } - """ - - NAME = "user_device_resync" - PATH_ARGS = ("user_id",) - CACHE = False - - def __init__(self, hs: "HomeServer"): - super().__init__(hs) - - from synapse.handlers.device import DeviceHandler - - handler = hs.get_device_handler() - assert isinstance(handler, DeviceHandler) - self.device_list_updater = handler.device_list_updater - - self.store = hs.get_datastores().main - self.clock = hs.get_clock() - - @staticmethod - async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override] - return {} - - async def _handle_request( # type: ignore[override] - self, request: Request, content: JsonDict, user_id: str - ) -> Tuple[int, Optional[JsonDict]]: - user_devices = await self.device_list_updater.user_device_resync(user_id) - - return 200, user_devices - - class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint): """Ask master to resync the device list for multiple users from the same remote server by contacting their server. @@ -216,6 +159,5 @@ async def _handle_request( # type: ignore[override] def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - ReplicationUserDevicesResyncRestServlet(hs).register(http_server) ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server) ReplicationUploadKeysForUserRestServlet(hs).register(http_server) From 3e3a5f152fe2c72cd9fd6750597d2ba2d62d7156 Mon Sep 17 00:00:00 2001 From: Electron Date: Tue, 11 Apr 2023 19:03:00 +0530 Subject: [PATCH 02/10] Removed single user device resync api and its usage from DeviceListWorkerUpdater --- synapse/handlers/device.py | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d2063d443558..ed37f7093da0 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -918,13 +918,9 @@ class DeviceListWorkerUpdater: def __init__(self, hs: "HomeServer"): from synapse.replication.http.devices import ( - ReplicationMultiUserDevicesResyncRestServlet, - ReplicationUserDevicesResyncRestServlet, + ReplicationMultiUserDevicesResyncRestServlet ) - self._user_device_resync_client = ( - ReplicationUserDevicesResyncRestServlet.make_client(hs) - ) self._multi_user_device_resync_client = ( ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ) @@ -948,17 +944,8 @@ async def multi_user_device_resync( try: return await self._multi_user_device_resync_client(user_ids=user_ids) - except SynapseError as err: - if not ( - err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED - ): - raise - - # Fall back to single requests - result: Dict[str, Optional[JsonDict]] = {} - for user_id in user_ids: - result[user_id] = await self._user_device_resync_client(user_id=user_id) - return result + except: + raise async def user_device_resync( self, user_id: str, mark_failed_as_stale: bool = True From efa83584232d7a8335cd694f5ecd10d74fdb3ae6 Mon Sep 17 00:00:00 2001 From: Electron Date: Tue, 11 Apr 2023 19:29:21 +0530 Subject: [PATCH 03/10] Fixed mistake of removing ReplicationUserDevicesResyncRestServlet class --- synapse/handlers/device.py | 3 +- synapse/replication/http/devices.py | 58 +++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ed37f7093da0..2aa11e0f8afe 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from http import HTTPStatus from typing import ( TYPE_CHECKING, Any, @@ -918,7 +917,7 @@ class DeviceListWorkerUpdater: def __init__(self, hs: "HomeServer"): from synapse.replication.http.devices import ( - ReplicationMultiUserDevicesResyncRestServlet + ReplicationMultiUserDevicesResyncRestServlet, ) self._multi_user_device_resync_client = ( diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index ddb3091c8778..cc3929dcf565 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -27,6 +27,63 @@ logger = logging.getLogger(__name__) + +class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): + """Ask master to resync the device list for a user by contacting their + server. + + This must happen on master so that the results can be correctly cached in + the database and streamed to workers. + + Request format: + + POST /_synapse/replication/user_device_resync/:user_id + + {} + + Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id` + response, e.g.: + + { + "user_id": "@alice:example.org", + "devices": [ + { + "device_id": "JLAFKJWSCS", + "keys": { ... }, + "device_display_name": "Alice's Mobile Phone" + } + ] + } + """ + + NAME = "user_device_resync" + PATH_ARGS = ("user_id",) + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + from synapse.handlers.device import DeviceHandler + + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.device_list_updater = handler.device_list_updater + + self.store = hs.get_datastores().main + self.clock = hs.get_clock() + + @staticmethod + async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override] + return {} + + async def _handle_request( # type: ignore[override] + self, request: Request, content: JsonDict, user_id: str + ) -> Tuple[int, Optional[JsonDict]]: + user_devices = await self.device_list_updater.user_device_resync(user_id) + + return 200, user_devices + + class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint): """Ask master to resync the device list for multiple users from the same remote server by contacting their server. @@ -159,5 +216,6 @@ async def _handle_request( # type: ignore[override] def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + ReplicationUserDevicesResyncRestServlet(hs).register(http_server) ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server) ReplicationUploadKeysForUserRestServlet(hs).register(http_server) From 7f21e3d2d09a49191b55a02b95c442ae6bc7c33d Mon Sep 17 00:00:00 2001 From: Electron Date: Tue, 11 Apr 2023 19:59:01 +0530 Subject: [PATCH 04/10] changelog added and Signed-off-by: Alok Kumar Singh alokaks601@gmail.com --- changelog.d/15418.removal | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15418.removal diff --git a/changelog.d/15418.removal b/changelog.d/15418.removal new file mode 100644 index 000000000000..b9f7b99c1cbb --- /dev/null +++ b/changelog.d/15418.removal @@ -0,0 +1 @@ +Since multi-user device resync api is now available, single user counterpart is removed. \ No newline at end of file From ad34524a7b8f6dc9c5cc91ae194271b7b73511d4 Mon Sep 17 00:00:00 2001 From: Electron Date: Wed, 12 Apr 2023 23:44:33 +0530 Subject: [PATCH 05/10] Updated changelog for current PR --- changelog.d/15418.misc | 1 + changelog.d/15418.removal | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 changelog.d/15418.misc delete mode 100644 changelog.d/15418.removal diff --git a/changelog.d/15418.misc b/changelog.d/15418.misc new file mode 100644 index 000000000000..ca6f995a9c70 --- /dev/null +++ b/changelog.d/15418.misc @@ -0,0 +1 @@ +Always use multi-user device resync replication endpoints. \ No newline at end of file diff --git a/changelog.d/15418.removal b/changelog.d/15418.removal deleted file mode 100644 index b9f7b99c1cbb..000000000000 --- a/changelog.d/15418.removal +++ /dev/null @@ -1 +0,0 @@ -Since multi-user device resync api is now available, single user counterpart is removed. \ No newline at end of file From 34ad2be7be2f7a9d30d898d025207f8f42881ae6 Mon Sep 17 00:00:00 2001 From: Electron Date: Fri, 14 Apr 2023 14:11:31 +0530 Subject: [PATCH 06/10] Removed single-user resync usage and updated it to use multi-user counterpart --- synapse/handlers/devicemessage.py | 14 ++++--- synapse/handlers/federation_event.py | 14 ++++--- synapse/replication/http/devices.py | 57 ---------------------------- 3 files changed, 17 insertions(+), 68 deletions(-) diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 00c403db4925..3caf9b31cc8b 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -25,7 +25,9 @@ log_kv, set_tag, ) -from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet +from synapse.replication.http.devices import ( + ReplicationMultiUserDevicesResyncRestServlet, +) from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.util import json_encoder from synapse.util.stringutils import random_string @@ -71,12 +73,12 @@ def __init__(self, hs: "HomeServer"): # sync. We do all device list resyncing on the master instance, so if # we're on a worker we hit the device resync replication API. if hs.config.worker.worker_app is None: - self._user_device_resync = ( - hs.get_device_handler().device_list_updater.user_device_resync + self._multi_user_device_resync = ( + hs.get_device_handler().device_list_updater.multi_user_device_resync ) else: - self._user_device_resync = ( - ReplicationUserDevicesResyncRestServlet.make_client(hs) + self._multi_user_device_resync = ( + ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ) # a rate limiter for room key requests. The keys are @@ -198,7 +200,7 @@ async def _check_for_unknown_devices( await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,)) # Immediately attempt a resync in the background - run_in_background(self._user_device_resync, user_id=sender_user_id) + run_in_background(self._multi_user_device_resync, user_ids=[sender_user_id]) async def send_device_message( self, diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 8d5be81a9207..6f41535b7776 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -70,7 +70,9 @@ trace, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet +from synapse.replication.http.devices import ( + ReplicationMultiUserDevicesResyncRestServlet, +) from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) @@ -167,8 +169,8 @@ def __init__(self, hs: "HomeServer"): self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) if hs.config.worker.worker_app: - self._user_device_resync = ( - ReplicationUserDevicesResyncRestServlet.make_client(hs) + self.multi_user_device_resync = ( + ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ) else: self._device_list_updater = hs.get_device_handler().device_list_updater @@ -1487,9 +1489,11 @@ async def _resync_device(self, sender: str) -> None: # Immediately attempt a resync in the background if self._config.worker.worker_app: - await self._user_device_resync(user_id=sender) + await self.multi_user_device_resync(user_ids=[sender]) else: - await self._device_list_updater.user_device_resync(sender) + await self._device_list_updater.multi_user_device_resync( + user_ids=[sender] + ) except Exception: logger.exception("Failed to resync device for %s", sender) diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index cc3929dcf565..f874f072f901 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -28,62 +28,6 @@ logger = logging.getLogger(__name__) -class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint): - """Ask master to resync the device list for a user by contacting their - server. - - This must happen on master so that the results can be correctly cached in - the database and streamed to workers. - - Request format: - - POST /_synapse/replication/user_device_resync/:user_id - - {} - - Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id` - response, e.g.: - - { - "user_id": "@alice:example.org", - "devices": [ - { - "device_id": "JLAFKJWSCS", - "keys": { ... }, - "device_display_name": "Alice's Mobile Phone" - } - ] - } - """ - - NAME = "user_device_resync" - PATH_ARGS = ("user_id",) - CACHE = False - - def __init__(self, hs: "HomeServer"): - super().__init__(hs) - - from synapse.handlers.device import DeviceHandler - - handler = hs.get_device_handler() - assert isinstance(handler, DeviceHandler) - self.device_list_updater = handler.device_list_updater - - self.store = hs.get_datastores().main - self.clock = hs.get_clock() - - @staticmethod - async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override] - return {} - - async def _handle_request( # type: ignore[override] - self, request: Request, content: JsonDict, user_id: str - ) -> Tuple[int, Optional[JsonDict]]: - user_devices = await self.device_list_updater.user_device_resync(user_id) - - return 200, user_devices - - class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint): """Ask master to resync the device list for multiple users from the same remote server by contacting their server. @@ -216,6 +160,5 @@ async def _handle_request( # type: ignore[override] def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - ReplicationUserDevicesResyncRestServlet(hs).register(http_server) ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server) ReplicationUploadKeysForUserRestServlet(hs).register(http_server) From 4275b728621a08ff4ee3b69415fc70975a8e1dd0 Mon Sep 17 00:00:00 2001 From: Electron Date: Fri, 14 Apr 2023 14:31:38 +0530 Subject: [PATCH 07/10] Type fixed --- synapse/handlers/federation_event.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 6f41535b7776..06609fab93af 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -169,7 +169,7 @@ def __init__(self, hs: "HomeServer"): self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) if hs.config.worker.worker_app: - self.multi_user_device_resync = ( + self._multi_user_device_resync = ( ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ) else: @@ -1489,7 +1489,7 @@ async def _resync_device(self, sender: str) -> None: # Immediately attempt a resync in the background if self._config.worker.worker_app: - await self.multi_user_device_resync(user_ids=[sender]) + await self._multi_user_device_resync(user_ids=[sender]) else: await self._device_list_updater.multi_user_device_resync( user_ids=[sender] From 8ef08bb97f380712710937ee06b3d333c2b0e29c Mon Sep 17 00:00:00 2001 From: Electron Date: Sat, 15 Apr 2023 10:44:55 +0530 Subject: [PATCH 08/10] Removed user_device_resync implementation --- synapse/handlers/device.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2aa11e0f8afe..4ee65ecac060 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1244,18 +1244,6 @@ async def multi_user_device_resync( return result - async def user_device_resync( - self, user_id: str, mark_failed_as_stale: bool = True - ) -> Optional[JsonDict]: - result, failed = await self._user_device_resync_returning_failed(user_id) - - if failed and mark_failed_as_stale: - # Mark the remote user's device list as stale so we know we need to retry - # it later. - await self.store.mark_remote_users_device_caches_as_stale((user_id,)) - - return result - async def _user_device_resync_returning_failed( self, user_id: str ) -> Tuple[Optional[JsonDict], bool]: From 41ecc8837590e030eda1e18c67fd3e44dddde36e Mon Sep 17 00:00:00 2001 From: Electron Date: Sat, 15 Apr 2023 11:15:25 +0530 Subject: [PATCH 09/10] Removed single user resync test and implementation --- synapse/handlers/device.py | 27 ++++----------------------- tests/test_federation.py | 4 +++- 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 4ee65ecac060..e2706a363efa 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -946,24 +946,6 @@ async def multi_user_device_resync( except: raise - async def user_device_resync( - self, user_id: str, mark_failed_as_stale: bool = True - ) -> Optional[JsonDict]: - """Fetches all devices for a user and updates the device cache with them. - - Args: - user_id: The user's id whose device_list will be updated. - mark_failed_as_stale: Whether to mark the user's device list as stale - if the attempt to resync failed. - Returns: - A dict with device info as under the "devices" in the result of this - request: - https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid - None when we weren't able to fetch the device info for some reason, - e.g. due to a connection problem. - """ - return (await self.multi_user_device_resync([user_id]))[user_id] - class DeviceListUpdater(DeviceListWorkerUpdater): "Handles incoming device list updates from federation and updates the DB" @@ -1115,7 +1097,7 @@ async def _handle_device_updates(self, user_id: str) -> None: ) if resync: - await self.user_device_resync(user_id) + await self.multi_user_device_resync([user_id]) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) @@ -1182,10 +1164,9 @@ async def _maybe_retry_device_resync(self) -> None: for user_id in need_resync: try: # Try to resync the current user's devices list. - result = await self.user_device_resync( - user_id=user_id, - mark_failed_as_stale=False, - ) + result = (await self.multi_user_device_resync([user_id], False))[ + user_id + ] # user_device_resync only returns a result if it managed to # successfully resync and update the database. Updating the table diff --git a/tests/test_federation.py b/tests/test_federation.py index 46d2f99eacc6..6d15ac759785 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -267,7 +267,9 @@ def test_cross_signing_keys_retry(self) -> None: # Resync the device list. device_handler = self.hs.get_device_handler() self.get_success( - device_handler.device_list_updater.user_device_resync(remote_user_id), + device_handler.device_list_updater.multi_user_device_resync( + [remote_user_id] + ), ) # Retrieve the cross-signing keys for this user. From fd6a7b6d53e540c751513a5ac14473c1b0ebcc44 Mon Sep 17 00:00:00 2001 From: Electron Date: Tue, 18 Apr 2023 17:22:40 +0530 Subject: [PATCH 10/10] Updated multi user device resync implementation --- synapse/handlers/device.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index e2706a363efa..4280d5dccbca 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -941,10 +941,7 @@ async def multi_user_device_resync( # Shortcut empty requests return {} - try: - return await self._multi_user_device_resync_client(user_ids=user_ids) - except: - raise + return await self._multi_user_device_resync_client(user_ids=user_ids) class DeviceListUpdater(DeviceListWorkerUpdater):