Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Intelligently select extremities used in backfill. (#8349)
Browse files Browse the repository at this point in the history
Instead of just using the most recent extremities let's pick the
ones that will give us results that the pagination request cares about,
i.e. pick extremities only if they have a smaller depth than the
pagination token.

This is useful when we fail to backfill an extremity, as we no longer
get stuck requesting that same extremity repeatedly.
  • Loading branch information
erikjohnston authored Sep 18, 2020
1 parent 9db4c1b commit 43f2b67
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelog.d/8349.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a longstanding bug where back pagination over federation could get stuck if it failed to handle a received event.
65 changes: 57 additions & 8 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,15 +943,26 @@ async def backfill(self, dest, room_id, limit, extremities):

return events

async def maybe_backfill(self, room_id, current_depth):
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
) -> bool:
"""Checks the database to see if we should backfill before paginating,
and if so do.
Args:
room_id
current_depth: The depth from which we're paginating from. This is
used to decide if we should backfill and what extremities to
use.
limit: The number of events that the pagination request will
return. This is used as part of the heuristic to decide if we
should back paginate.
"""
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)

if not extremities:
logger.debug("Not backfilling as no extremeties found.")
return
return False

# We only want to paginate if we can actually see the events we'll get,
# as otherwise we'll just spend a lot of resources to get redacted
Expand Down Expand Up @@ -1004,16 +1015,54 @@ async def maybe_backfill(self, room_id, current_depth):
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]

# If we're approaching an extremity we trigger a backfill, otherwise we
# no-op.
#
# We chose twice the limit here as then clients paginating backwards
# will send pagination requests that trigger backfill at least twice
# using the most recent extremity before it gets removed (see below). We
# chose more than one times the limit in case of failure, but choosing a
# much larger factor will result in triggering a backfill request much
# earlier than necessary.
if current_depth - 2 * limit > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d - 2 * %d",
max_depth,
current_depth,
limit,
)
return False

logger.debug(
"room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
room_id,
current_depth,
max_depth,
sorted_extremeties_tuple,
)

# We ignore extremities that have a greater depth than our current depth
# as:
# 1. we don't really care about getting events that have happened
# before our current position; and
# 2. we have likely previously tried and failed to backfill from that
# extremity, so to avoid getting "stuck" requesting the same
# backfill repeatedly we drop those extremities.
filtered_sorted_extremeties_tuple = [
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
]

# However, we need to check that the filtered extremities are non-empty.
# If they are empty then either we can a) bail or b) still attempt to
# backill. We opt to try backfilling anyway just in case we do get
# relevant events.
if filtered_sorted_extremeties_tuple:
sorted_extremeties_tuple = filtered_sorted_extremeties_tuple

# We don't want to specify too many extremities as it causes the backfill
# request URI to be too long.
extremities = dict(sorted_extremeties_tuple[:5])

if current_depth > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d", max_depth, current_depth
)
return

# Now we need to decide which hosts to hit first.

# First we try hosts that are already in the room
Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,9 @@ async def get_messages(
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
curr_topo = room_token.topological
else:
max_topo = await self.store.get_max_topological_token(
curr_topo = await self.store.get_current_topological_token(
room_id, room_token.stream
)

Expand All @@ -380,11 +380,11 @@ async def get_messages(
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
if RoomStreamToken.parse(leave_token).topological < max_topo:
if RoomStreamToken.parse(leave_token).topological < curr_topo:
source_config.from_key = str(leave_token)

await self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
room_id, curr_topo, limit=source_config.limit,
)

events, next_key = await self.store.paginate_room_events(
Expand Down
13 changes: 5 additions & 8 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,23 +648,20 @@ async def get_topological_token_for_event(self, event_id: str) -> str:
)
return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])

async def get_max_topological_token(self, room_id: str, stream_key: int) -> int:
"""Get the max topological token in a room before the given stream
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
"""Gets the topological token in a room after or at the given stream
ordering.
Args:
room_id
stream_key
Returns:
The maximum topological token.
"""
sql = (
"SELECT coalesce(max(topological_ordering), 0) FROM events"
" WHERE room_id = ? AND stream_ordering < ?"
"SELECT coalesce(MIN(topological_ordering), 0) FROM events"
" WHERE room_id = ? AND stream_ordering >= ?"
)
row = await self.db_pool.execute(
"get_max_topological_token", None, sql, room_id, stream_key
"get_current_topological_token", None, sql, room_id, stream_key
)
return row[0][0] if row else 0

Expand Down

0 comments on commit 43f2b67

Please sign in to comment.