Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Simplify cache invalidation after event persist txn #13796

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13796.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use shared methods for cache invalidation when persisting events, remove duplicate codepaths. Contributed by Nick @ Beeper (@fizzadar).
3 changes: 3 additions & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (user_id,)
)

# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
Expand Down
34 changes: 21 additions & 13 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,16 @@ def _invalidate_caches_for_event(
# process triggering the invalidation is responsible for clearing any external
# cached objects.
self._invalidate_local_get_event_cache(event_id)
self.have_seen_event.invalidate((room_id, event_id))

self.get_latest_event_ids_in_room.invalidate((room_id,))

self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)

# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
self._get_membership_from_event_id.invalidate((event_id,))
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))

if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
Expand All @@ -240,19 +241,26 @@ def _invalidate_caches_for_event(
self._invalidate_local_get_event_cache(redacts)
# Caches which might leak edits must be invalidated for the event being
# redacted.
self.get_relations_for_event.invalidate((redacts,))
self.get_applicable_edit.invalidate((redacts,))
self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,))

if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_local_user.invalidate((state_key,))
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)

if relates_to:
self.get_relations_for_event.invalidate((relates_to,))
self.get_aggregation_groups_for_event.invalidate((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))
self.get_thread_summary.invalidate((relates_to,))
self.get_thread_participated.invalidate((relates_to,))
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
self._attempt_to_invalidate_cache(
"get_aggregation_groups_for_event", (relates_to,)
)
self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
self._attempt_to_invalidate_cache(
"get_mutual_event_relations_for_rel_type", (relates_to,)
)
Comment on lines +254 to +263
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is significantly greedier than the old version which only invalidated caches based on the relation type. Not sure if this matters much, but we know ahead of time which of these might need to be invalidated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Also it is quite annoying that you can no longer jump to these definitions via PyCharm or anything since they're strings now. 😢 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this function should take relation instead of relates_to and it could evaluate the lost conditionals to reduce the invalidation amount?

(Also it is quite annoying that you can no longer jump to these definitions via PyCharm or anything since they're strings now. 😢 )

:( not ideal at all, nor do I have any ideas on how to solve this unfortunately! If all the DB classes were fully shared this could just assume each method existed and call them direct...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this function should take relation instead of relates_to and it could evaluate the lost conditionals to reduce the invalidation amount?

Maybe, I'm not sure it is a huge deal or not, but definitely fits in the bucket of "code that doesn't need to run". _attempt_to_invalidate_cache sounds scary (and looking at it, it seems to use quite a few getattr calls, which aren't known for optimizations...)

(Also it is quite annoying that you can no longer jump to these definitions via PyCharm or anything since they're strings now. 😢 )

:( not ideal at all, nor do I have any ideas on how to solve this unfortunately! If all the DB classes were fully shared this could just assume each method existed and call them direct...

I think ideally it would be nice if we referred to everything by the store it is on, instead of via a god object. Like the changes we did with the config objects. I think this is doable, but a lot of work. 😢 See #11165

Anyway -- I don't think there's anything actionable to do here, just wanted to note my concerns down for posterity.


async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
Expand Down
133 changes: 27 additions & 106 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from prometheus_client import Counter

import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.constants import EventContentFields, EventTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
Expand Down Expand Up @@ -410,6 +410,31 @@ def _persist_events_txn(
assert min_stream_order
assert max_stream_order

# Once the txn completes, invalidate all of the relevant caches. Note that we do this
# up here because it captures all the events_and_contexts before any are removed.
for event, _ in events_and_contexts:
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
if event.redacts:
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)

relates_to = None
relation = relation_from_event(event)
if relation:
relates_to = relation.parent_id

assert event.internal_metadata.stream_ordering is not None
txn.call_after(
self.store._invalidate_caches_for_event,
event.internal_metadata.stream_ordering,
event.event_id,
event.room_id,
event.type,
getattr(event, "state_key", None),
event.redacts,
relates_to,
backfilled=False,
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremities,
Expand Down Expand Up @@ -459,6 +484,7 @@ def _persist_events_txn(

# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

def _persist_event_auth_chain_txn(
Expand Down Expand Up @@ -1172,13 +1198,6 @@ def _update_current_state_txn(
)

# Invalidate the various caches

for member in members_changed:
txn.call_after(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
)

self.store._invalidate_state_caches_and_stream(
txn, room_id, members_changed
)
Expand Down Expand Up @@ -1222,9 +1241,6 @@ def _update_forward_extremities_txn(
self.db_pool.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
txn.call_after(
self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
)

self.db_pool.simple_insert_many_txn(
txn,
Expand Down Expand Up @@ -1294,8 +1310,6 @@ def _update_room_depths_txn(
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
Expand Down Expand Up @@ -1697,16 +1711,7 @@ async def prefill() -> None:
txn.async_call_after(prefill)

def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
"""Invalidate the caches for the redacted event.

Note that these caches are also cleared as part of event replication in
_invalidate_caches_for_event.
"""
assert event.redacts is not None
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))

self.db_pool.simple_upsert_txn(
txn,
table="redactions",
Expand Down Expand Up @@ -1807,34 +1812,6 @@ def _store_room_members_txn(

for event in events:
assert event.internal_metadata.stream_ordering is not None
txn.call_after(
self.store._membership_stream_cache.entity_has_changed,
event.state_key,
event.internal_metadata.stream_ordering,
)
txn.call_after(
self.store.get_invited_rooms_for_local_user.invalidate,
(event.state_key,),
)
txn.call_after(
self.store.get_local_users_in_room.invalidate,
(event.room_id,),
)
txn.call_after(
self.store.get_number_joined_users_in_room.invalidate,
(event.room_id,),
)
txn.call_after(
self.store.get_user_in_room_with_profile.invalidate,
(event.room_id, event.state_key),
)

# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
txn.call_after(
self.store._get_membership_from_event_id.invalidate,
(event.event_id,),
)

# We update the local_current_membership table only if the event is
# "current", i.e., its something that has just happened.
Expand Down Expand Up @@ -1883,35 +1860,6 @@ def _handle_event_relations(
},
)

txn.call_after(
self.store.get_relations_for_event.invalidate, (relation.parent_id,)
)
txn.call_after(
self.store.get_aggregation_groups_for_event.invalidate,
(relation.parent_id,),
)
txn.call_after(
self.store.get_mutual_event_relations_for_rel_type.invalidate,
(relation.parent_id,),
)

if relation.rel_type == RelationTypes.REPLACE:
txn.call_after(
self.store.get_applicable_edit.invalidate, (relation.parent_id,)
)

if relation.rel_type == RelationTypes.THREAD:
txn.call_after(
self.store.get_thread_summary.invalidate, (relation.parent_id,)
)
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
# potentially error-prone) so it is always invalidated.
txn.call_after(
self.store.get_thread_participated.invalidate,
(relation.parent_id, event.sender),
)

def _handle_insertion_event(
self, txn: LoggingTransaction, event: EventBase
) -> None:
Expand Down Expand Up @@ -2213,28 +2161,6 @@ def _set_push_actions_for_event_and_users_txn(
),
)

room_to_event_ids: Dict[str, List[str]] = {}
for e in non_outlier_events:
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)

for room_id, event_ids in room_to_event_ids.items():
rows = self.db_pool.simple_select_many_txn(
txn,
table="event_push_actions_staging",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=("user_id",),
)

user_ids = {row["user_id"] for row in rows}

for user_id in user_ids:
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
Expand All @@ -2249,11 +2175,6 @@ def _set_push_actions_for_event_and_users_txn(
def _remove_push_actions_for_event_id_txn(
self, txn: LoggingTransaction, room_id: str, event_id: str
) -> None:
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id,),
)
Comment on lines -2252 to -2256
Copy link
Contributor

@MadLittleMods MadLittleMods Sep 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To check the review box that everything is accounted for, all of these removals are accounted for in _invalidate_caches_for_event and _invalidate_state_caches

_persist_events_txn takes care of calling _invalidate_caches_for_event and _update_current_state_txn -> _invalidate_state_caches_and_stream -> _invalidate_state_caches

And _invalidate_caches_for_event/_invalidate_state_caches are already called over replication (workers).

txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id),
Expand Down