Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
WIP: Sort events topologically when we receive them over backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
MadLittleMods committed Oct 29, 2021
1 parent 3e09d49 commit 5afc264
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 5 deletions.
125 changes: 123 additions & 2 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.iterutils import batch_iter, sorted_topologically
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr

Expand Down Expand Up @@ -665,9 +665,130 @@ async def _process_pulled_events(
notification to clients, and validation of device keys.)
"""

logger.info(
"backfill events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
event.prev_event_ids(),
)
for event in events
],
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
# sorted_events = sorted(events, key=lambda x: x.depth)

event_ids = [event.event_id for event in events]
event_map = {event.event_id: event for event in events}

# Since the insertion event we try to reference later on might be in the
# backfill chunk itself, we need to make it easy to lookup. Maps a given
# batch_id to the insertion event.
batch_id_map = {
event.content.get(
EventContentFields.MSC2716_NEXT_BATCH_ID, None
): event.event_id
for event in events
if event.type == EventTypes.MSC2716_INSERTION
}

successor_event_id_map = {}
for event in events:
for prev_event_id in event.prev_event_ids():
successor_event_id_map.setdefault(prev_event_id, []).append(
event.event_id
)

event_id_graph = {}
for event in events:
# Assign the real edges to the graph.
# Make a copy so we don't modify the actual prev_events when we extend them below.
event_id_graph.setdefault(event.event_id, []).extend(
event.prev_event_ids().copy()
)

# We need to make some fake edge connections from the batch event at
# the bottom of the historical batch to the insertion event. This
# way the historical batch topologically sorts in ahead-in-time of
# the event we branched off of.
batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID, None)
if event.type == EventTypes.MSC2716_BATCH and batch_id:
# Maybe we can get lucky and save ourselves a lookup
# by checking the events in the backfill first
insertion_event_id = batch_id_map[
batch_id
] or await self._store.get_insertion_event_id_by_batch_id(
event.room_id, batch_id
)

if insertion_event_id:
# Add the insertion event as a fake edge connection to the batch
# event so the historical batch topologically sorts below
# the "live" event we branched off of.
event_id_graph.setdefault(event.event_id, []).append(
insertion_event_id
)

# Maybe we can get lucky and save ourselves a lookup
# by checking the events in the backfill first
insertion_event = event_map[
insertion_event_id
] or await self._store.get_event(
insertion_event_id, allow_none=True
)

if insertion_event:
# Also add some fake edges to connect the insertion
# event to it's prev_event successors so it sorts
# topologically behind-in-time the successor. Nestled
# perfectly between the prev_event and the successor.
for insertion_prev_event_id in insertion_event.prev_event_ids():
successor_event_ids = successor_event_id_map[
insertion_prev_event_id
]
logger.info(
"insertion_event_id=%s successor_event_ids=%s",
insertion_event_id,
successor_event_ids,
)
if successor_event_ids:

event_id_graph.setdefault(
insertion_event_id, []
).extend(
[
successor_event_id
for successor_event_id in successor_event_ids
# Don't add itself back as a successor
if successor_event_id != insertion_event_id
]
)

# We want to sort topologically so we process them and tell clients
# about them in order.
sorted_events = []
for event_id in sorted_topologically(event_ids, event_id_graph):
sorted_events.append(event_map[event_id])
sorted_events = reversed(sorted_events)

logger.info(
"backfill sorted_events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
event.prev_event_ids(),
)
for event in sorted_events
],
)

for ev in sorted_events:
with nested_logging_context(ev.event_id):
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,7 +1511,7 @@ async def persist_and_notify_client_event(
EventContentFields.MSC2716_NEXT_BATCH_ID
)
conflicting_insertion_event_id = (
await self.store.get_insertion_event_by_batch_id(
await self.store.get_insertion_event_id_by_batch_id(
event.room_id, next_batch_id
)
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async def on_POST(
# and have the batch connected.
if batch_id_from_query:
corresponding_insertion_event_id = (
await self.store.get_insertion_event_by_batch_id(
await self.store.get_insertion_event_id_by_batch_id(
room_id, batch_id_from_query
)
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


class RoomBatchStore(SQLBaseStore):
async def get_insertion_event_by_batch_id(
async def get_insertion_event_id_by_batch_id(
self, room_id: str, batch_id: str
) -> Optional[str]:
"""Retrieve a insertion event ID.
Expand Down

0 comments on commit 5afc264

Please sign in to comment.