Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Regularly try to wake up dests instead of waiting for next PDU/EDU #15743

Merged
merged 9 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15743.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Regularly try to send transactions to other servers after they failed instead of waiting for a new event to be available before trying.
28 changes: 14 additions & 14 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@
from typing_extensions import Literal

from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall

import synapse.metrics
from synapse.api.presence import UserPresenceState
Expand Down Expand Up @@ -184,14 +183,18 @@
"Total number of PDUs queued for sending across all destinations",
)

# Time (in s) after Synapse's startup that we will begin to wake up destinations
# that have catch-up outstanding.
CATCH_UP_STARTUP_DELAY_SEC = 15
# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding. This will also be the delay applied at startup
# before trying the same.
# Please note that rate limiting still applies, so while the loop is
# executed every X seconds the destinations may not be wake up because
# they are being rate limited following previous attempt failures.
WAKEUP_RETRY_PERIOD_SEC = 60
MatMaul marked this conversation as resolved.
Show resolved Hide resolved

# Time (in s) to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds after Synapse's startup until we have woken
# every destination has outstanding catch-up.
CATCH_UP_STARTUP_INTERVAL_SEC = 5
# will be woken up every <x> seconds until we have woken every destination
# has outstanding catch-up.
WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5


class AbstractFederationSender(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -415,12 +418,10 @@ def __init__(self, hs: "HomeServer"):
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)

# wake up destinations that have outstanding PDUs to be caught up
self._catchup_after_startup_timer: Optional[
IDelayedCall
] = self.clock.call_later(
CATCH_UP_STARTUP_DELAY_SEC,
# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call(
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
run_as_background_process,
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)
Expand Down Expand Up @@ -966,7 +967,6 @@ async def _wake_destinations_needing_catchup(self) -> None:

if not destinations_to_wake:
# finished waking all destinations!
self._catchup_after_startup_timer = None
break

last_processed = destinations_to_wake[-1]
Expand All @@ -983,4 +983,4 @@ async def _wake_destinations_needing_catchup(self) -> None:
last_processed,
)
self.wake_destination(destination)
await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
20 changes: 7 additions & 13 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,28 +431,22 @@ def test_catch_up_on_synapse_startup(self) -> None:
# ACT: call _wake_destinations_needing_catchup

# patch wake_destination to just count the destinations instead
woken = []
woken = set()

def wake_destination_track(destination: str) -> None:
woken.append(destination)
woken.add(destination)

self.federation_sender.wake_destination = wake_destination_track # type: ignore[assignment]

# cancel the pre-existing timer for _wake_destinations_needing_catchup
# this is because we are calling it manually rather than waiting for it
# to be called automatically
assert self.federation_sender._catchup_after_startup_timer is not None
self.federation_sender._catchup_after_startup_timer.cancel()

self.get_success(
self.federation_sender._wake_destinations_needing_catchup(), by=5.0
)
self.pump(by=5.0)
MatMaul marked this conversation as resolved.
Show resolved Hide resolved

# ASSERT (_wake_destinations_needing_catchup):
# - all remotes are woken up, save for zzzerver
self.assertNotIn("zzzerver", woken)
# - all destinations are woken exactly once; they appear once in woken.
self.assertCountEqual(woken, server_names[:-1])
# - all destinations are woken, potentially more than once, since the
# wake up is called regularly and we don't ack in this test that a transaction
# has been successfully sent.
self.assertCountEqual(woken, set(server_names[:-1]))

def test_not_latest_event(self) -> None:
"""Test that we send the latest event in the room even if its not ours."""
Expand Down