Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Don't hammer the database for destination retry timings every ~5mins (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored May 21, 2021
1 parent e8ac9ac commit 3e831f2
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 76 deletions.
1 change: 1 addition & 0 deletions changelog.d/10036.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Properly invalidate caches for destination retry timings every (instead of expiring entries every 5 minutes).

This comment has been minimized.

Copy link
@daenney

daenney May 26, 2021

Contributor

I think this sentence is missing a word. "every ?"

2 changes: 0 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events, login, presence, room
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
Expand Down Expand Up @@ -237,7 +236,6 @@ class GenericWorkerSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedFilteringStore,
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ async def authenticate_request(self, request, content):
# If we get a valid signed request from the other side, its probably
# alive
retry_timings = await self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings["retry_last_ts"]:
if retry_timings and retry_timings.retry_last_ts:
run_in_background(self._reset_retry_timings, origin)

return origin
Expand Down
21 changes: 0 additions & 21 deletions synapse/replication/slave/storage/transactions.py

This file was deleted.

4 changes: 2 additions & 2 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from .stats import StatsStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
from .user_erasure_store import UserErasureStore
Expand All @@ -83,7 +83,7 @@ class DataStore(
StreamStore,
ProfileStore,
PresenceStore,
TransactionStore,
TransactionWorkerStore,
DirectoryStore,
KeyStore,
StateStore,
Expand Down
66 changes: 37 additions & 29 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
from collections import namedtuple
from typing import Iterable, List, Optional, Tuple

import attr
from canonicaljson import encode_canonical_json

from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage._base import db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.descriptors import cached

db_binary_type = memoryview

Expand All @@ -38,10 +40,23 @@
"_TransactionRow", ("response_code", "response_json")
)

SENTINEL = object()

@attr.s(slots=True, frozen=True, auto_attribs=True)
class DestinationRetryTimings:
"""The current destination retry timing info for a remote server."""

class TransactionWorkerStore(SQLBaseStore):
# The first time we tried and failed to reach the remote server, in ms.
failure_ts: int

# The last time we tried and failed to reach the remote server, in ms.
retry_last_ts: int

# How long since the last time we tried to reach the remote server before
# trying again, in ms.
retry_interval: int


class TransactionWorkerStore(CacheInvalidationWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

Expand All @@ -60,19 +75,6 @@ def _cleanup_transactions_txn(txn):
"_cleanup_transactions", _cleanup_transactions_txn
)


class TransactionStore(TransactionWorkerStore):
"""A collection of queries for handling PDUs."""

def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
clock=self._clock,
expiry_ms=5 * 60 * 1000,
)

async def get_received_txn_response(
self, transaction_id: str, origin: str
) -> Optional[Tuple[int, JsonDict]]:
Expand Down Expand Up @@ -145,7 +147,11 @@ async def set_received_txn_response(
desc="set_received_txn_response",
)

async def get_destination_retry_timings(self, destination):
@cached(max_entries=10000)
async def get_destination_retry_timings(
self,
destination: str,
) -> Optional[DestinationRetryTimings]:
"""Gets the current retry timings (if any) for a given destination.
Args:
Expand All @@ -156,34 +162,29 @@ async def get_destination_retry_timings(self, destination):
Otherwise a dict for the retry scheme
"""

result = self._destination_retry_cache.get(destination, SENTINEL)
if result is not SENTINEL:
return result

result = await self.db_pool.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings,
destination,
)

# We don't hugely care about race conditions between getting and
# invalidating the cache, since we time out fairly quickly anyway.
self._destination_retry_cache[destination] = result
return result

def _get_destination_retry_timings(self, txn, destination):
def _get_destination_retry_timings(
self, txn, destination: str
) -> Optional[DestinationRetryTimings]:
result = self.db_pool.simple_select_one_txn(
txn,
table="destinations",
keyvalues={"destination": destination},
retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
retcols=("failure_ts", "retry_last_ts", "retry_interval"),
allow_none=True,
)

# check we have a row and retry_last_ts is not null or zero
# (retry_last_ts can't be negative)
if result and result["retry_last_ts"]:
return result
return DestinationRetryTimings(**result)
else:
return None

Expand All @@ -204,7 +205,6 @@ async def set_destination_retry_timings(
retry_interval: how long until next retry in ms
"""

self._destination_retry_cache.pop(destination, None)
if self.database_engine.can_native_upsert:
return await self.db_pool.runInteraction(
"set_destination_retry_timings",
Expand Down Expand Up @@ -252,6 +252,10 @@ def _set_destination_retry_timings_native(

txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))

self._invalidate_cache_and_stream(
txn, self.get_destination_retry_timings, (destination,)
)

def _set_destination_retry_timings_emulated(
self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
Expand Down Expand Up @@ -295,6 +299,10 @@ def _set_destination_retry_timings_emulated(
},
)

self._invalidate_cache_and_stream(
txn, self.get_destination_retry_timings, (destination,)
)

async def store_destination_rooms_entries(
self,
destinations: Iterable[str],
Expand Down
8 changes: 3 additions & 5 deletions synapse/util/retryutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,9 @@ async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **k
retry_timings = await store.get_destination_retry_timings(destination)

if retry_timings:
failure_ts = retry_timings["failure_ts"]
retry_last_ts, retry_interval = (
retry_timings["retry_last_ts"],
retry_timings["retry_interval"],
)
failure_ts = retry_timings.failure_ts
retry_last_ts = retry_timings.retry_last_ts
retry_interval = retry_timings.retry_interval

now = int(clock.time_msec())

Expand Down
8 changes: 1 addition & 7 deletions tests/handlers/test_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,8 @@ def prepare(self, reactor, clock, hs):
self.event_source = hs.get_event_sources().sources["typing"]

self.datastore = hs.get_datastore()
retry_timings_res = {
"destination": "",
"retry_last_ts": 0,
"retry_interval": 0,
"failure_ts": None,
}
self.datastore.get_destination_retry_timings = Mock(
return_value=defer.succeed(retry_timings_res)
return_value=defer.succeed(None)
)

self.datastore.get_device_updates_by_remote = Mock(
Expand Down
8 changes: 6 additions & 2 deletions tests/storage/test_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.storage.databases.main.transactions import DestinationRetryTimings
from synapse.util.retryutils import MAX_RETRY_INTERVAL

from tests.unittest import HomeserverTestCase
Expand All @@ -36,8 +37,11 @@ def test_get_set_transactions(self):
d = self.store.get_destination_retry_timings("example.com")
r = self.get_success(d)

self.assert_dict(
{"retry_last_ts": 50, "retry_interval": 100, "failure_ts": 1000}, r
self.assertEqual(
DestinationRetryTimings(
retry_last_ts=50, retry_interval=100, failure_ts=1000
),
r,
)

def test_initial_set_transactions(self):
Expand Down
18 changes: 11 additions & 7 deletions tests/util/test_retryutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ def test_limiter(self):
except AssertionError:
pass

self.pump()

new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertEqual(new_timings["failure_ts"], failure_ts)
self.assertEqual(new_timings["retry_last_ts"], failure_ts)
self.assertEqual(new_timings["retry_interval"], MIN_RETRY_INTERVAL)
self.assertEqual(new_timings.failure_ts, failure_ts)
self.assertEqual(new_timings.retry_last_ts, failure_ts)
self.assertEqual(new_timings.retry_interval, MIN_RETRY_INTERVAL)

# now if we try again we should get a failure
self.get_failure(
Expand All @@ -77,14 +79,16 @@ def test_limiter(self):
except AssertionError:
pass

self.pump()

new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertEqual(new_timings["failure_ts"], failure_ts)
self.assertEqual(new_timings["retry_last_ts"], retry_ts)
self.assertEqual(new_timings.failure_ts, failure_ts)
self.assertEqual(new_timings.retry_last_ts, retry_ts)
self.assertGreaterEqual(
new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5
new_timings.retry_interval, MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5
)
self.assertLessEqual(
new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0
new_timings.retry_interval, MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0
)

#
Expand Down

0 comments on commit 3e831f2

Please sign in to comment.