From 5afc264dd54221142b0acd0a56cbe07c3eac2113 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Fri, 29 Oct 2021 02:43:08 -0500
Subject: [PATCH] WIP: Sort events topologically when we receive them over backfill

---
Review notes (text here is ignored by `git am`; not part of the commit message):
 * The patch had lost its line breaks; they are restored here. Hunk line
   counts and the diffstat are unchanged.
 * batch_id_map, event_map and successor_event_id_map are now read with
   .get(), so a miss falls through to the store lookup (or is skipped)
   instead of raising KeyError.
 * sorted_events is reversed in place. reversed() returned a one-shot
   iterator which the debug logging consumed, leaving the processing loop
   with nothing to iterate.
 * The blob hashes on the federation_event.py "index" line predate these
   edits to the added lines.

 synapse/handlers/federation_event.py         | 125 ++++++++++++++++++-
 synapse/handlers/message.py                  |   2 +-
 synapse/rest/client/room_batch.py            |   2 +-
 synapse/storage/databases/main/room_batch.py |   2 +-
 4 files changed, 126 insertions(+), 5 deletions(-)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 610a4e48c53b..66d3da871900 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -72,7 +72,7 @@
     get_domain_from_id,
 )
 from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, sorted_topologically
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 
@@ -665,9 +665,130 @@ async def _process_pulled_events(
                 notification to clients, and validation of device keys.)
         """
 
+        logger.info(
+            "backfill events=%s",
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in events
+            ],
+        )
+
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
-        sorted_events = sorted(events, key=lambda x: x.depth)
+        # sorted_events = sorted(events, key=lambda x: x.depth)
+
+        event_ids = [event.event_id for event in events]
+        event_map = {event.event_id: event for event in events}
+
+        # Since the insertion event we try to reference later on might be in the
+        # backfill chunk itself, we need to make it easy to lookup. Maps a given
+        # batch_id to the insertion event.
+        batch_id_map = {
+            event.content.get(
+                EventContentFields.MSC2716_NEXT_BATCH_ID, None
+            ): event.event_id
+            for event in events
+            if event.type == EventTypes.MSC2716_INSERTION
+        }
+
+        successor_event_id_map = {}
+        for event in events:
+            for prev_event_id in event.prev_event_ids():
+                successor_event_id_map.setdefault(prev_event_id, []).append(
+                    event.event_id
+                )
+
+        event_id_graph = {}
+        for event in events:
+            # Assign the real edges to the graph.
+            # Make a copy so we don't modify the actual prev_events when we extend them below.
+            event_id_graph.setdefault(event.event_id, []).extend(
+                event.prev_event_ids().copy()
+            )
+
+            # We need to make some fake edge connections from the batch event at
+            # the bottom of the historical batch to the insertion event. This
+            # way the historical batch topologically sorts in ahead-in-time of
+            # the event we branched off of.
+            batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID, None)
+            if event.type == EventTypes.MSC2716_BATCH and batch_id:
+                # Maybe we can get lucky and save ourselves a lookup
+                # by checking the events in the backfill first
+                insertion_event_id = batch_id_map.get(
+                    batch_id
+                ) or await self._store.get_insertion_event_id_by_batch_id(
+                    event.room_id, batch_id
+                )
+
+                if insertion_event_id:
+                    # Add the insertion event as a fake edge connection to the batch
+                    # event so the historical batch topologically sorts below
+                    # the "live" event we branched off of.
+                    event_id_graph.setdefault(event.event_id, []).append(
+                        insertion_event_id
+                    )
+
+                    # Maybe we can get lucky and save ourselves a lookup
+                    # by checking the events in the backfill first
+                    insertion_event = event_map.get(
+                        insertion_event_id
+                    ) or await self._store.get_event(
+                        insertion_event_id, allow_none=True
+                    )
+
+                    if insertion_event:
+                        # Also add some fake edges to connect the insertion
+                        # event to its prev_event successors so it sorts
+                        # topologically behind-in-time the successor. Nestled
+                        # perfectly between the prev_event and the successor.
+                        for insertion_prev_event_id in insertion_event.prev_event_ids():
+                            successor_event_ids = successor_event_id_map.get(
+                                insertion_prev_event_id
+                            )
+                            logger.info(
+                                "insertion_event_id=%s successor_event_ids=%s",
+                                insertion_event_id,
+                                successor_event_ids,
+                            )
+                            if successor_event_ids:
+
+                                event_id_graph.setdefault(
+                                    insertion_event_id, []
+                                ).extend(
+                                    [
+                                        successor_event_id
+                                        for successor_event_id in successor_event_ids
+                                        # Don't add itself back as a successor
+                                        if successor_event_id != insertion_event_id
+                                    ]
+                                )
+
+        # Sort topologically so we process them and tell clients about them in
+        # order. Reverse in place so this stays a list we can iterate twice.
+        sorted_events = []
+        for event_id in sorted_topologically(event_ids, event_id_graph):
+            sorted_events.append(event_map[event_id])
+        sorted_events.reverse()
+
+        logger.info(
+            "backfill sorted_events=%s",
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in sorted_events
+            ],
+        )
 
         for ev in sorted_events:
             with nested_logging_context(ev.event_id):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d6f0b99f5887..2f4b458d4564 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1511,7 +1511,7 @@ async def persist_and_notify_client_event(
                         EventContentFields.MSC2716_NEXT_BATCH_ID
                     )
                     conflicting_insertion_event_id = (
-                        await self.store.get_insertion_event_by_batch_id(
+                        await self.store.get_insertion_event_id_by_batch_id(
                             event.room_id, next_batch_id
                         )
                     )
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index 99f8156ad0ec..5423d39efde1 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -112,7 +112,7 @@ async def on_POST(
         # and have the batch connected.
         if batch_id_from_query:
             corresponding_insertion_event_id = (
-                await self.store.get_insertion_event_by_batch_id(
+                await self.store.get_insertion_event_id_by_batch_id(
                     room_id, batch_id_from_query
                 )
             )
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index dcbce8fdcf03..97b261843782 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -18,7 +18,7 @@
 
 
 class RoomBatchStore(SQLBaseStore):
-    async def get_insertion_event_by_batch_id(
+    async def get_insertion_event_id_by_batch_id(
         self, room_id: str, batch_id: str
     ) -> Optional[str]:
         """Retrieve a insertion event ID.