Skip to content

Commit

Permalink
Enable caching and retrieval for section 2 of auth chain difference a…
Browse files Browse the repository at this point in the history
…lgo txn
  • Loading branch information
realtyem committed Oct 23, 2023
1 parent bac8e9b commit 20dac9c
Showing 1 changed file with 32 additions and 1 deletion.
33 changes: 32 additions & 1 deletion synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,26 @@ def fetch_chain_info(events_to_fetch: Collection[str]) -> None:

chains[chain_id] = max(seq_no, chains.get(chain_id, 0))

# Check the links list cache for anything we don't have to pull out
seen_in_link_list_cache = set()
seen_targeted_chains = set()
for chain in seen_chains:
cache_entry = self._authchain_links_list.get(chain)
if cache_entry is not None:
for origin_sequence_number, target_info in cache_entry.items():
for target_chain_id, target_sequence_number in target_info:
for chains in set_to_chain:
# chains are only reachable if the origin sequence number of
# the link is less than the max sequence number in the
# origin chain.
if origin_sequence_number <= chains.get(chain, 0):
chains[target_chain_id] = max(
target_sequence_number,
chains.get(target_chain_id, 0),
)
seen_targeted_chains.add(target_chain_id)
seen_in_link_list_cache.add(chain)
difference_from_seen_from_cached = set(seen_chains).difference(seen_in_link_list_cache)
# Now we look up all links for the chains we have, adding chains to
# set_to_chain that are reachable from each set.
sql = """
Expand All @@ -685,7 +705,7 @@ def fetch_chain_info(events_to_fetch: Collection[str]) -> None:

# (We need to take a copy of `seen_chains` as we want to mutate it in
# the loop)
for batch2 in batch_iter(set(seen_chains), 1000):
for batch2 in batch_iter(set(difference_from_seen_from_cached), 1000):
clause, args = make_in_list_sql_clause(
txn.database_engine, "origin_chain_id", batch2
)
Expand All @@ -697,6 +717,16 @@ def fetch_chain_info(events_to_fetch: Collection[str]) -> None:
target_chain_id,
target_sequence_number,
) in txn:
# Cache everything, in case it is needed later
cache_entry = self._authchain_links_list.get(
origin_chain_id, update_last_access=False
)
if cache_entry is None:
cache_entry = {}
cache_entry.setdefault(origin_sequence_number, set()).add(
(target_chain_id, target_sequence_number)
)
self._authchain_links_list.set(origin_chain_id, cache_entry)
for chains in set_to_chain:
# chains are only reachable if the origin sequence number of
# the link is less than the max sequence number in the
Expand All @@ -708,6 +738,7 @@ def fetch_chain_info(events_to_fetch: Collection[str]) -> None:
)

seen_chains.add(target_chain_id)
seen_chains = seen_chains.union(seen_targeted_chains)

# Now for each chain we figure out the maximum sequence number reachable
# from *any* state set and the minimum sequence number reachable from
Expand Down

0 comments on commit 20dac9c

Please sign in to comment.