From 7da96a00f9cff8ebd4bfb3a5b432ff477c3cf572 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Nov 2022 11:49:31 -0500 Subject: [PATCH 01/13] Include thread information when sending receipts over federation. --- changelog.d/14466.bugfix | 1 + .../sender/per_destination_queue.py | 7 ++- synapse/handlers/receipts.py | 1 - tests/federation/test_federation_sender.py | 43 +++++++++++++++++++ 4 files changed, 49 insertions(+), 3 deletions(-) create mode 100644 changelog.d/14466.bugfix diff --git a/changelog.d/14466.bugfix b/changelog.d/14466.bugfix new file mode 100644 index 000000000000..82f6e6b68e1f --- /dev/null +++ b/changelog.d/14466.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse 1.70.0 where a receipt's thread ID was not sent over federation. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 084c45a95ca1..eed965090f7f 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -35,7 +35,7 @@ from synapse.logging.opentracing import SynapseTags, set_tag from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import JsonDict, ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.visibility import filter_events_for_server @@ -202,9 +202,12 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None: Args: receipt: receipt to be queued """ + serialized_receipt: JsonDict = {"event_ids": receipt.event_ids, "data": receipt.data} + if receipt.thread_id is not None: + serialized_receipt["data"]["thread_id"] = receipt.thread_id self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( receipt.receipt_type, {} - )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} + )[receipt.user_id] = serialized_receipt def flush_read_receipts_for_room(self, room_id: str) -> None: # if we don't have any read-receipts for this room, it may be that we've already diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index ac015824423f..6a4fed115671 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -92,7 +92,6 @@ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None continue # Check if these receipts apply to a thread. - thread_id = None data = user_values.get("data", {}) thread_id = data.get("thread_id") # If the thread ID is invalid, consider it missing. diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index f1e357764ff4..a7b4e6050ef0 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -83,6 +83,49 @@ def test_send_receipts(self): ], ) + @override_config({"send_federation": True}) + def test_send_receipts_thread(self): + mock_send_transaction = ( + self.hs.get_federation_transport_client().send_transaction + ) + mock_send_transaction.return_value = make_awaitable({}) + + sender = self.hs.get_federation_sender() + receipt = ReadReceipt( + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id="thread_id", + data={"ts": 1234}, + ) + self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + + self.pump() + + # expect a call to send_transaction + mock_send_transaction.assert_called_once() + json_cb = mock_send_transaction.call_args[0][1] + data = json_cb() + self.assertEqual( + data["edus"], + [ + { + "edu_type": EduTypes.RECEIPT, + "content": { + "room_id": { + "m.read": { + "user_id": { + "event_ids": ["event_id"], + "data": {"ts": 1234, "thread_id": "thread_id"}, + } + } + } + }, + } + ], + ) + @override_config({"send_federation": True}) def test_send_receipts_with_backoff(self): """Send two receipts in quick succession; the second should be flushed, but From 11518594e2e5e3663e2e2742c05d16df84327bc6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Nov 2022 12:24:46 -0500 Subject: [PATCH 02/13] Potentially send multiple EDUs to separate threads. --- .../sender/per_destination_queue.py | 56 +++++++++++++++---- tests/federation/test_federation_sender.py | 30 +++++++++- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index eed965090f7f..f097a376375f 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -136,8 +136,10 @@ def __init__( # destination self._pending_presence: Dict[str, UserPresenceState] = {} - # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {} + # room_id -> receipt_type -> thread_id -> user_id -> receipt_dict + self._pending_rrs: Dict[ + str, Dict[str, Dict[Optional[str], Dict[str, dict]]] + ] = {} self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -202,12 +204,15 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None: Args: receipt: receipt to be queued """ - serialized_receipt: JsonDict = {"event_ids": receipt.event_ids, "data": receipt.data} + serialized_receipt: JsonDict = { + "event_ids": receipt.event_ids, + "data": receipt.data, + } if receipt.thread_id is not None: serialized_receipt["data"]["thread_id"] = receipt.thread_id self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( receipt.receipt_type, {} - )[receipt.user_id] = serialized_receipt + ).setdefault(receipt.thread_id, {})[receipt.user_id] = serialized_receipt def flush_read_receipts_for_room(self, room_id: str) -> None: # if we don't have any read-receipts for this room, it may be that we've already @@ -552,15 +557,44 @@ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: # not yet time for this lot return - edu = Edu( - origin=self._server_name, - destination=self._destination, - edu_type=EduTypes.RECEIPT, - content=self._pending_rrs, - ) + # Build the EDUs needed to send these receipts. This is a bit complicated + # since we can share one for each unique (room, receipt type, user), but + # need additional ones for different threads. The result is that we will + # send N EDUs where N is the maximum number of threads in a room. + # + # This could be slightly more efficient by bundling users who have only + # send receipts for different threads. + while self._pending_rrs: + # The next EDU's content. + content: JsonDict = {} + + # Iterate each room's receipt types and threads, adding it to the content. + for room_id in list(self._pending_rrs.keys()): + for receipt_type in list(self._pending_rrs[room_id].keys()): + thread_ids = self._pending_rrs[room_id][receipt_type] + # The thread ID itself doesn't matter at this point. + content.setdefault(room_id, {})[ + receipt_type + ] = thread_ids.popitem()[1] + + # If there are no threads left in this room / receipt type. + # Clear it out. + if not thread_ids: + del self._pending_rrs[room_id][receipt_type] + + # Again, clear out any blank rooms. + if not self._pending_rrs[room_id]: + del self._pending_rrs[room_id] + + yield Edu( + origin=self._server_name, + destination=self._destination, + edu_type=EduTypes.RECEIPT, + content=content, + ) + self._pending_rrs = {} self._rrs_pending_flush = False - yield edu def _pop_pending_edus(self, limit: int) -> List[Edu]: pending_edus = self._pending_edus diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index a7b4e6050ef0..06588ed0135c 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -90,7 +90,17 @@ def test_send_receipts_thread(self): ) mock_send_transaction.return_value = make_awaitable({}) + # Create receipts for the same room and user, but on two different threads, sender = self.hs.get_federation_sender() + receipt = ReadReceipt( + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id=None, + data={"ts": 1234}, + ) + self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) receipt = ReadReceipt( "room_id", "m.read", @@ -103,11 +113,12 @@ def test_send_receipts_thread(self): self.pump() - # expect a call to send_transaction + # expect a call to send_transaction with two EDUs to separate threads. mock_send_transaction.assert_called_once() json_cb = mock_send_transaction.call_args[0][1] data = json_cb() - self.assertEqual( + # Note that the ordering of the EDUs doesn't matter. + self.assertCountEqual( data["edus"], [ { @@ -122,7 +133,20 @@ def test_send_receipts_thread(self): } } }, - } + }, + { + "edu_type": EduTypes.RECEIPT, + "content": { + "room_id": { + "m.read": { + "user_id": { + "event_ids": ["event_id"], + "data": {"ts": 1234}, + } + } + } + }, + }, ], ) From 2d9ae2dbfec81c623d835c00ccd6d4a1f5020d4f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 16 Nov 2022 12:31:42 -0500 Subject: [PATCH 03/13] Update tests further. --- .../sender/per_destination_queue.py | 6 +-- tests/federation/test_federation_sender.py | 47 ++++++++++--------- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index f097a376375f..e66a72393bdb 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -560,10 +560,10 @@ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: # Build the EDUs needed to send these receipts. This is a bit complicated # since we can share one for each unique (room, receipt type, user), but # need additional ones for different threads. The result is that we will - # send N EDUs where N is the maximum number of threads in a room. + # send N EDUs where N is the number of unique threads across rooms. # - # This could be slightly more efficient by bundling users who have only - # send receipts for different threads. + # This could be more efficient by bundling users who have sent receipts + # for different threads. while self._pending_rrs: # The next EDU's content. content: JsonDict = {} diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 06588ed0135c..3258f90e7e75 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -90,26 +90,23 @@ def test_send_receipts_thread(self): ) mock_send_transaction.return_value = make_awaitable({}) - # Create receipts for the same room and user, but on two different threads, + # Create receipts for: + # + # * The same room / user on multiple threads. + # * A different user in the same room. sender = self.hs.get_federation_sender() - receipt = ReadReceipt( - "room_id", - "m.read", - "user_id", - ["event_id"], - thread_id=None, - data={"ts": 1234}, - ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) - receipt = ReadReceipt( - "room_id", - "m.read", - "user_id", - ["event_id"], - thread_id="thread_id", - data={"ts": 1234}, - ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + for user, thread in (("alice", None), ("alice", "thread"), ("bob", None)): + receipt = ReadReceipt( + "room_id", + "m.read", + user, + ["event_id"], + thread_id=thread, + data={"ts": 1234}, + ) + self.successResultOf( + defer.ensureDeferred(sender.send_read_receipt(receipt)) + ) self.pump() @@ -126,9 +123,9 @@ def test_send_receipts_thread(self): "content": { "room_id": { "m.read": { - "user_id": { + "alice": { "event_ids": ["event_id"], - "data": {"ts": 1234, "thread_id": "thread_id"}, + "data": {"ts": 1234, "thread_id": "thread"}, } } } @@ -139,10 +136,14 @@ def test_send_receipts_thread(self): "content": { "room_id": { "m.read": { - "user_id": { + "alice": { "event_ids": ["event_id"], "data": {"ts": 1234}, - } + }, + "bob": { + "event_ids": ["event_id"], + "data": {"ts": 1234}, + }, } } }, From 423321ac31e7e64ee6605593b7ed6e1178965b44 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 17 Nov 2022 15:10:59 -0500 Subject: [PATCH 04/13] Add limiting of read-receipt EDUs. --- .../sender/per_destination_queue.py | 90 +++++++++++-------- 1 file changed, 53 insertions(+), 37 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index e66a72393bdb..bab392afdcc1 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -550,7 +550,7 @@ async def _catch_up_transmission_loop(self) -> None: self._destination, last_successful_stream_ordering ) - def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: + def _get_rr_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: if not self._pending_rrs: return if not force_flush and not self._rrs_pending_flush: @@ -564,7 +564,8 @@ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: # # This could be more efficient by bundling users who have sent receipts # for different threads. - while self._pending_rrs: + generated_edus = 0 + while self._pending_rrs and generated_edus < limit: # The next EDU's content. content: JsonDict = {} @@ -592,6 +593,7 @@ def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: edu_type=EduTypes.RECEIPT, content=content, ) + generated_edus += 1 self._pending_rrs = {} self._rrs_pending_flush = False @@ -681,27 +683,61 @@ class _TransactionQueueManager: async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # First we calculate the EDUs we want to send, if any. - # We start by fetching device related EDUs, i.e device updates and to - # device messages. We have to keep 2 free slots for presence and rr_edus. - device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2 + # There's a maximum number of EDUs that can be sent with a transaction, + # generally device udates and to-device messages get priority, but we + # want to ensure that there's room for some other EDUs as well. + # + # This is done by: + # + # * Add a presence EDU, if one exists. + # * Add up-to a small limit of read receipt EDUs. + # * Add to-device EDUs, but leave some space for device list updates. + # * Add device list updates EDUs. + # * If there's any remaining room, add other EDUs. + pending_edus = [] + + # Add presence EDU. + if self.queue._pending_presence: + pending_edus.append( + Edu( + origin=self.queue._server_name, + destination=self.queue._destination, + edu_type=EduTypes.PRESENCE, + content={ + "push": [ + format_user_presence_state( + presence, self.queue._clock.time_msec() + ) + for presence in self.queue._pending_presence.values() + ] + }, + ) + ) + self.queue._pending_presence = {} - # We prioritize to-device messages so that existing encryption channels + # Add read receipt EDUs. + pending_edus.extend(self.queue._get_rr_edus(force_flush=False, limit=5)) + edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus) + + # Next, prioritize to-device messages so that existing encryption channels # work. We also keep a few slots spare (by reducing the limit) so that # we can still trickle out some device list updates. ( to_device_edus, device_stream_id, - ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10) + ) = await self.queue._get_to_device_message_edus(edu_limit - 10) if to_device_edus: self._device_stream_id = device_stream_id else: self.queue._last_device_stream_id = device_stream_id - device_edu_limit -= len(to_device_edus) + pending_edus.extend(to_device_edus) + edu_limit -= len(to_device_edus) + # Add device list update EDUs. device_update_edus, dev_list_id = await self.queue._get_device_update_edus( - device_edu_limit + edu_limit ) if device_update_edus: @@ -709,40 +745,18 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: else: self.queue._last_device_list_stream_id = dev_list_id - pending_edus = device_update_edus + to_device_edus - - # Now add the read receipt EDU. - pending_edus.extend(self.queue._get_rr_edus(force_flush=False)) - - # And presence EDU. - if self.queue._pending_presence: - pending_edus.append( - Edu( - origin=self.queue._server_name, - destination=self.queue._destination, - edu_type=EduTypes.PRESENCE, - content={ - "push": [ - format_user_presence_state( - presence, self.queue._clock.time_msec() - ) - for presence in self.queue._pending_presence.values() - ] - }, - ) - ) - self.queue._pending_presence = {} + pending_edus.extend(device_update_edus) + edu_limit -= len(device_update_edus) # Finally add any other types of EDUs if there is room. - pending_edus.extend( - self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) - ) + pending_edus.extend(self.queue._pop_pending_edus(edu_limit)) while ( len(pending_edus) < MAX_EDUS_PER_TRANSACTION and self.queue._pending_edus_keyed ): _, val = self.queue._pending_edus_keyed.popitem() pending_edus.append(val) + edu_limit -= 1 # Now we look for any PDUs to send, by getting up to 50 PDUs from the # queue @@ -753,8 +767,10 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # if we've decided to send a transaction anyway, and we have room, we # may as well send any pending RRs - if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: - pending_edus.extend(self.queue._get_rr_edus(force_flush=True)) + if edu_limit: + pending_edus.extend( + self.queue._get_rr_edus(force_flush=True, limit=edu_limit) + ) if self._pdus: self._last_stream_ordering = self._pdus[ From 48f2e80c3b17f51b4f76e5f13d65432f14dda9eb Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 18 Nov 2022 08:03:52 -0500 Subject: [PATCH 05/13] Don't re-create the _pending_rrs variable. --- synapse/federation/sender/per_destination_queue.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index bab392afdcc1..47ae060c8b21 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -595,7 +595,6 @@ def _get_rr_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: ) generated_edus += 1 - self._pending_rrs = {} self._rrs_pending_flush = False def _pop_pending_edus(self, limit: int) -> List[Edu]: From 4b149dd103151979c180796dbab5a63aa6a73556 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 18 Nov 2022 08:05:24 -0500 Subject: [PATCH 06/13] Add another note. --- synapse/federation/sender/per_destination_queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 47ae060c8b21..db0de0c7b264 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -570,6 +570,10 @@ def _get_rr_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: content: JsonDict = {} # Iterate each room's receipt types and threads, adding it to the content. + # + # _pending_rrs is mutated inside of this loop so take care to iterate + # a copy of the keys. Similarly, the dictionary values of _pending_rrs + # are mutated during iteration. for room_id in list(self._pending_rrs.keys()): for receipt_type in list(self._pending_rrs[room_id].keys()): thread_ids = self._pending_rrs[room_id][receipt_type] From 90611e552f3f739d04490bca46c5d8b3055c1484 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 18 Nov 2022 08:09:04 -0500 Subject: [PATCH 07/13] Add more comments. --- synapse/federation/sender/per_destination_queue.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index db0de0c7b264..7d637ca0d622 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -562,8 +562,8 @@ def _get_rr_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: # need additional ones for different threads. The result is that we will # send N EDUs where N is the number of unique threads across rooms. # - # This could be more efficient by bundling users who have sent receipts - # for different threads. + # TODO This could be more efficient by bundling users who have sent + # receipts for different threads. generated_edus = 0 while self._pending_rrs and generated_edus < limit: # The next EDU's content. @@ -577,17 +577,18 @@ def _get_rr_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: for room_id in list(self._pending_rrs.keys()): for receipt_type in list(self._pending_rrs[room_id].keys()): thread_ids = self._pending_rrs[room_id][receipt_type] - # The thread ID itself doesn't matter at this point. + # The thread ID itself doesn't matter at this point -- it is + # included in the receipt data already. content.setdefault(room_id, {})[ receipt_type ] = thread_ids.popitem()[1] - # If there are no threads left in this room / receipt type. - # Clear it out. + # If there are no threads left in the receipt type for this + # room, then delete the entire receipt type. if not thread_ids: del self._pending_rrs[room_id][receipt_type] - # Again, clear out any blank rooms. + # If there are no receipt types left in this room, delete the room. if not self._pending_rrs[room_id]: del self._pending_rrs[room_id] From bffd05294d6d4e3effe6dfcca734464f4fa5b51a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 18 Nov 2022 08:11:02 -0500 Subject: [PATCH 08/13] Do not always reset the flush flag. --- synapse/federation/sender/per_destination_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 7d637ca0d622..94e752fb389a 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -600,7 +600,10 @@ def _get_rr_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: ) generated_edus += 1 - self._rrs_pending_flush = False + # If there are still pending read-receipts, don't reset the pending flush + # flag. + if not self._pending_rrs: + self._rrs_pending_flush = False def _pop_pending_edus(self, limit: int) -> List[Edu]: pending_edus = self._pending_edus From 276fdc9e86f32e7dd22c302dbb44319fd5c23b46 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 21 Nov 2022 14:26:56 -0500 Subject: [PATCH 09/13] Combine EDUs for different threads by different users. --- .../sender/per_destination_queue.py | 104 +++++++++--------- tests/federation/test_federation_sender.py | 13 ++- 2 files changed, 61 insertions(+), 56 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 94e752fb389a..c6ab7eb81089 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -136,10 +136,11 @@ def __init__( # destination self._pending_presence: Dict[str, UserPresenceState] = {} - # room_id -> receipt_type -> thread_id -> user_id -> receipt_dict - self._pending_rrs: Dict[ - str, Dict[str, Dict[Optional[str], Dict[str, dict]]] - ] = {} + # List of room_id -> receipt_type -> user_id -> receipt_dict, + # + # Each receipt can only have a single receipt per + # (room ID, receipt type, user ID, thread ID) tuple. + self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = [] self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -210,17 +211,45 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None: } if receipt.thread_id is not None: serialized_receipt["data"]["thread_id"] = receipt.thread_id - self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( - receipt.receipt_type, {} - ).setdefault(receipt.thread_id, {})[receipt.user_id] = serialized_receipt + + # Find which EDU to add this receipt to. There's three situations depending + # on the (room ID, receipt type, user, thread ID) tuple: + # + # 1. If it fully matches, clobber the information. + # 2. If it is missing, add the information. + # 3. If the subset tuple of (room ID, receipt type, user) matches, check + # the next EDU (or add a new EDU). + for edu in self._pending_receipt_edus: + receipt_content = edu.setdefault(receipt.room_id, {}).setdefault( + receipt.receipt_type, {} + ) + # If this room ID, receipt type, user ID is not in this EDU, OR if + # the full tuple matches, use the current EDU. + if ( + receipt.user_id not in receipt_content + or receipt_content[receipt.user_id].get("thread_id") + == receipt.thread_id + ): + receipt_content[receipt.user_id] = serialized_receipt + break + + # If no matching EDU was found, create a new one. + else: + self._pending_receipt_edus.append( + { + receipt.room_id: { + receipt.receipt_type: {receipt.user_id: serialized_receipt} + } + } + ) def flush_read_receipts_for_room(self, room_id: str) -> None: - # if we don't have any read-receipts for this room, it may be that we've already + # if we don't have any receipts for this room, it may be that we've already # sent them out, so we don't need to flush. - if room_id not in self._pending_rrs: - return - self._rrs_pending_flush = True - self.attempt_new_transaction() + for edu in self._pending_receipt_edus: + if room_id in edu: + self._rrs_pending_flush = True + self.attempt_new_transaction() def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu @@ -359,7 +388,7 @@ async def _transaction_transmission_loop(self) -> None: self._pending_edus = [] self._pending_edus_keyed = {} self._pending_presence = {} - self._pending_rrs = {} + self._pending_receipt_edus = [] self._start_catching_up() except FederationDeniedError as e: @@ -550,59 +579,26 @@ async def _catch_up_transmission_loop(self) -> None: self._destination, last_successful_stream_ordering ) - def _get_rr_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: - if not self._pending_rrs: + def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: + if not self._pending_receipt_edus: return if not force_flush and not self._rrs_pending_flush: # not yet time for this lot return - # Build the EDUs needed to send these receipts. This is a bit complicated - # since we can share one for each unique (room, receipt type, user), but - # need additional ones for different threads. The result is that we will - # send N EDUs where N is the number of unique threads across rooms. - # - # TODO This could be more efficient by bundling users who have sent - # receipts for different threads. - generated_edus = 0 - while self._pending_rrs and generated_edus < limit: - # The next EDU's content. - content: JsonDict = {} - - # Iterate each room's receipt types and threads, adding it to the content. - # - # _pending_rrs is mutated inside of this loop so take care to iterate - # a copy of the keys. Similarly, the dictionary values of _pending_rrs - # are mutated during iteration. - for room_id in list(self._pending_rrs.keys()): - for receipt_type in list(self._pending_rrs[room_id].keys()): - thread_ids = self._pending_rrs[room_id][receipt_type] - # The thread ID itself doesn't matter at this point -- it is - # included in the receipt data already. - content.setdefault(room_id, {})[ - receipt_type - ] = thread_ids.popitem()[1] - - # If there are no threads left in the receipt type for this - # room, then delete the entire receipt type. - if not thread_ids: - del self._pending_rrs[room_id][receipt_type] - - # If there are no receipt types left in this room, delete the room. - if not self._pending_rrs[room_id]: - del self._pending_rrs[room_id] - + # Send at most limit EDUs for receipts. + for content in self._pending_receipt_edus[:limit]: yield Edu( origin=self._server_name, destination=self._destination, edu_type=EduTypes.RECEIPT, content=content, ) - generated_edus += 1 + self._pending_receipt_edus = self._pending_receipt_edus[limit:] # If there are still pending read-receipts, don't reset the pending flush # flag. - if not self._pending_rrs: + if not self._pending_receipt_edus: self._rrs_pending_flush = False def _pop_pending_edus(self, limit: int) -> List[Edu]: @@ -723,7 +719,7 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: self.queue._pending_presence = {} # Add read receipt EDUs. - pending_edus.extend(self.queue._get_rr_edus(force_flush=False, limit=5)) + pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus) # Next, prioritize to-device messages so that existing encryption channels @@ -776,7 +772,7 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # may as well send any pending RRs if edu_limit: pending_edus.extend( - self.queue._get_rr_edus(force_flush=True, limit=edu_limit) + self.queue._get_receipt_edus(force_flush=True, limit=edu_limit) ) if self._pdus: diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 3258f90e7e75..01f147418b9b 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -95,7 +95,12 @@ def test_send_receipts_thread(self): # * The same room / user on multiple threads. # * A different user in the same room. sender = self.hs.get_federation_sender() - for user, thread in (("alice", None), ("alice", "thread"), ("bob", None)): + for user, thread in ( + ("alice", None), + ("alice", "thread"), + ("bob", None), + ("bob", "diff-thread"), + ): receipt = ReadReceipt( "room_id", "m.read", @@ -126,7 +131,11 @@ def test_send_receipts_thread(self): "alice": { "event_ids": ["event_id"], "data": {"ts": 1234, "thread_id": "thread"}, - } + }, + "bob": { + "event_ids": ["event_id"], + "data": {"ts": 1234, "thread_id": "diff-thread"}, + }, } } }, From ac11302b2b246d39776343600f9d4beedd11814d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 21 Nov 2022 14:42:52 -0500 Subject: [PATCH 10/13] Handle review comments. --- synapse/federation/sender/per_destination_queue.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index c6ab7eb81089..981161563515 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -752,11 +752,10 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: edu_limit -= len(device_update_edus) # Finally add any other types of EDUs if there is room. - pending_edus.extend(self.queue._pop_pending_edus(edu_limit)) - while ( - len(pending_edus) < MAX_EDUS_PER_TRANSACTION - and self.queue._pending_edus_keyed - ): + other_edus = self.queue._pop_pending_edus(edu_limit) + pending_edus.extend(other_edus) + edu_limit -= len(other_edus) + while edu_limit > 0 and self.queue._pending_edus_keyed: _, val = self.queue._pending_edus_keyed.popitem() pending_edus.append(val) edu_limit -= 1 From a6ad64439cf9244e14d35fb0979706af201e7da4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 28 Nov 2022 08:47:26 -0500 Subject: [PATCH 11/13] Fix typo. Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/federation/sender/per_destination_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 981161563515..e0c4dda03506 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -687,7 +687,7 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # First we calculate the EDUs we want to send, if any. # There's a maximum number of EDUs that can be sent with a transaction, - # generally device udates and to-device messages get priority, but we + # generally device updates and to-device messages get priority, but we # want to ensure that there's room for some other EDUs as well. # # This is done by: From 79a1e357f6e2f8b5459c11bd2f215cabdb95b557 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 28 Nov 2022 08:53:56 -0500 Subject: [PATCH 12/13] Break on first EDU. Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/federation/sender/per_destination_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index e0c4dda03506..adb15b833586 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -250,6 +250,7 @@ def flush_read_receipts_for_room(self, room_id: str) -> None: if room_id in edu: self._rrs_pending_flush = True self.attempt_new_transaction() + break def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu From b8aee7e522f52518be76a86a2f3a7c99db8b3540 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 28 Nov 2022 08:56:26 -0500 Subject: [PATCH 13/13] Clarify comments. --- synapse/federation/sender/per_destination_queue.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index adb15b833586..465eadbee868 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -244,12 +244,13 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None: ) def flush_read_receipts_for_room(self, room_id: str) -> None: - # if we don't have any receipts for this room, it may be that we've already - # sent them out, so we don't need to flush. + # If there are any pending receipts for this room then force-flush them + # in a new transaction. for edu in self._pending_receipt_edus: if room_id in edu: self._rrs_pending_flush = True self.attempt_new_transaction() + # No use in checking remaining EDUs if the room was found. break def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: