From 9a04f94ee782d9baa372774e6a37ffa01275016f Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Thu, 4 Aug 2022 15:49:55 +0100 Subject: [PATCH] Optimise async get event lookups (#13435) Still maintains local in memory lookup optimisation, but does any external lookup as part of the deferred that prevents duplicate lookups for the same event at once. This makes the assumption that fetching from an external cache is a non-zero load operation. --- changelog.d/13435.misc | 1 + .../storage/databases/main/events_worker.py | 75 +++++++++++++++++-- synapse/storage/databases/main/roommember.py | 2 +- synapse/util/caches/lrucache.py | 17 +++++ 4 files changed, 87 insertions(+), 8 deletions(-) create mode 100644 changelog.d/13435.misc diff --git a/changelog.d/13435.misc b/changelog.d/13435.misc new file mode 100644 index 000000000000..c01b9136c809 --- /dev/null +++ b/changelog.d/13435.misc @@ -0,0 +1 @@ +Prevent unnecessary lookups to any external `get_event` cache. Contributed by Nick @ Beeper (@fizzadar). diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 98979d07c6cb..af601b1c5bb4 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -615,7 +615,11 @@ async def _get_events_from_cache_or_db( Returns: map from event id to result """ - event_entry_map = await self._get_events_from_cache( + # Shortcut: check if we have any events in the *in memory* cache - this function + # may be called repeatedly for the same event so at this point we cannot reach + # out to any external cache for performance reasons. The external cache is + # checked later on in the `get_missing_events_from_cache_or_db` function below. + event_entry_map = self._get_events_from_local_cache( event_ids, ) @@ -647,7 +651,9 @@ async def _get_events_from_cache_or_db( if missing_events_ids: - async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]: + async def get_missing_events_from_cache_or_db() -> Dict[ + str, EventCacheEntry + ]: """Fetches the events in `missing_event_ids` from the database. Also creates entries in `self._current_event_fetches` to allow @@ -672,10 +678,18 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]: # the events have been redacted, and if so pulling the redaction event # out of the database to check it. # + missing_events = {} try: - missing_events = await self._get_events_from_db( + # Try to fetch from any external cache. We already checked the + # in-memory cache above. + missing_events = await self._get_events_from_external_cache( missing_events_ids, ) + # Now actually fetch any remaining events from the DB + db_missing_events = await self._get_events_from_db( + missing_events_ids - missing_events.keys(), + ) + missing_events.update(db_missing_events) except Exception as e: with PreserveLoggingContext(): fetching_deferred.errback(e) @@ -694,7 +708,7 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]: # cancellations, since multiple `_get_events_from_cache_or_db` calls can # reuse the same fetch. missing_events: Dict[str, EventCacheEntry] = await delay_cancellation( - get_missing_events_from_db() + get_missing_events_from_cache_or_db() ) event_entry_map.update(missing_events) @@ -769,7 +783,54 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None: async def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, EventCacheEntry]: - """Fetch events from the caches. + """Fetch events from the caches, both in memory and any external. + + May return rejected events. + + Args: + events: list of event_ids to fetch + update_metrics: Whether to update the cache hit ratio metrics + """ + event_map = self._get_events_from_local_cache( + events, update_metrics=update_metrics + ) + + missing_event_ids = (e for e in events if e not in event_map) + event_map.update( + await self._get_events_from_external_cache( + events=missing_event_ids, + update_metrics=update_metrics, + ) + ) + + return event_map + + async def _get_events_from_external_cache( + self, events: Iterable[str], update_metrics: bool = True + ) -> Dict[str, EventCacheEntry]: + """Fetch events from any configured external cache. + + May return rejected events. + + Args: + events: list of event_ids to fetch + update_metrics: Whether to update the cache hit ratio metrics + """ + event_map = {} + + for event_id in events: + ret = await self._get_event_cache.get_external( + (event_id,), None, update_metrics=update_metrics + ) + if ret: + event_map[event_id] = ret + + return event_map + + def _get_events_from_local_cache( + self, events: Iterable[str], update_metrics: bool = True + ) -> Dict[str, EventCacheEntry]: + """Fetch events from the local, in memory, caches. May return rejected events. @@ -781,7 +842,7 @@ async def _get_events_from_cache( for event_id in events: # First check if it's in the event cache - ret = await self._get_event_cache.get( + ret = self._get_event_cache.get_local( (event_id,), None, update_metrics=update_metrics ) if ret: @@ -803,7 +864,7 @@ async def _get_events_from_cache( # We add the entry back into the cache as we want to keep # recently queried events in the cache. - await self._get_event_cache.set((event_id,), cache_entry) + self._get_event_cache.set_local((event_id,), cache_entry) return event_map diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 38f4c388d28f..15d9eb8073a4 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -920,7 +920,7 @@ async def _get_joined_users_from_context( # We don't update the event cache hit ratio as it completely throws off # the hit ratio counts. After all, we don't populate the cache if we # miss it here - event_map = await self._get_events_from_cache( + event_map = self._get_events_from_local_cache( member_event_ids, update_metrics=False ) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index b3bdedb04cad..aa93109d1380 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -834,9 +834,26 @@ async def get( ) -> Optional[VT]: return self._lru_cache.get(key, update_metrics=update_metrics) + async def get_external( + self, + key: KT, + default: Optional[T] = None, + update_metrics: bool = True, + ) -> Optional[VT]: + # This method should fetch from any configured external cache, in this case noop. + return None + + def get_local( + self, key: KT, default: Optional[T] = None, update_metrics: bool = True + ) -> Optional[VT]: + return self._lru_cache.get(key, update_metrics=update_metrics) + async def set(self, key: KT, value: VT) -> None: self._lru_cache.set(key, value) + def set_local(self, key: KT, value: VT) -> None: + self._lru_cache.set(key, value) + async def invalidate(self, key: KT) -> None: # This method should invalidate any external cache and then invalidate the LruCache. return self._lru_cache.invalidate(key)