Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Properly handle unknown results for the stream change cache.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Dec 1, 2022
1 parent 854a688 commit 195bc0b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1764,14 +1764,14 @@ async def _filter_all_presence_updates_for_user(
Returns:
A list of presence states for the given user to receive.
"""
updated_users = None
if from_key:
# Only return updates since the last sync
updated_users = self.store.presence_stream_cache.get_all_entities_changed(
from_key
)
if not updated_users:
updated_users = []

if updated_users is not None:
# Get the actual presence update for each change
users_to_state = await self.get_presence_handler().current_state_for_users(
updated_users
Expand Down
33 changes: 19 additions & 14 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,12 +842,11 @@ async def get_users_whose_devices_changed(
user_ids, from_key
)

if not user_ids_to_check:
# If an empty set was returned, there's nothing to do.
if user_ids_to_check is not None and not user_ids_to_check:
return set()

def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
changes: Set[str] = set()

stream_id_where_clause = "stream_id > ?"
sql_args = [from_key]

Expand All @@ -858,19 +857,25 @@ def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
sql = f"""
SELECT DISTINCT user_id FROM device_lists_stream
WHERE {stream_id_where_clause}
AND
"""

# Query device changes with a batch of users at a time
# Assertion for mypy's benefit; see also
# https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
assert user_ids_to_check is not None
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + clause, sql_args + args)
changes.update(user_id for user_id, in txn)
# If the stream change cache gave us no information, fetch *all*
# users between the stream IDs.
if user_ids_to_check is None:
txn.execute(sql, sql_args)
return {user_id for user_id, in txn}

# Otherwise, fetch changes for the given users.
else:
changes: Set[str] = set()

# Query device changes with a batch of users at a time
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
)
txn.execute(sql + " AND " + clause, sql_args + args)
changes.update(user_id for user_id, in txn)

return changes

Expand Down

0 comments on commit 195bc0b

Please sign in to comment.