Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert presence handler helpers to async/await. #7939

Merged
merged 6 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7939.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert presence handler helpers to async/await.
4 changes: 3 additions & 1 deletion synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,9 @@ def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
hosts_and_states = yield defer.ensureDeferred(
get_interested_remotes(self.store, states, self.state)
)

for destinations, states in hosts_and_states:
for destination in destinations:
Expand Down
43 changes: 20 additions & 23 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
from prometheus_client import Counter
from typing_extensions import ContextManager

from twisted.internet import defer

import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateHandler
from synapse.storage.data_stores.main import DataStore
from synapse.storage.presence import UserPresenceState
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -895,16 +895,9 @@ async def _handle_state_delta(self, deltas):

await self._on_user_joined_room(room_id, state_key)

async def _on_user_joined_room(self, room_id, user_id):
async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
"""Called when we detect a user joining the room via the current state
delta stream.

Args:
room_id (str)
user_id (str)

Returns:
Deferred
"""

if self.is_mine_id(user_id):
Expand Down Expand Up @@ -1296,22 +1289,24 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
return new_state, persist_and_notify, federation_ping


@defer.inlineCallbacks
def get_interested_parties(store, states):
async def get_interested_parties(
store: DataStore, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
"""Given a list of states return which entities (rooms, users)
are interested in the given states.

Args:
states (list(UserPresenceState))
store
states

Returns:
2-tuple: `(room_ids_to_states, users_to_states)`,
A 2-tuple of `(room_ids_to_states, users_to_states)`,
with each item being a dict of `entity_name` -> `[UserPresenceState]`
"""
room_ids_to_states = {} # type: Dict[str, List[UserPresenceState]]
users_to_states = {} # type: Dict[str, List[UserPresenceState]]
for state in states:
room_ids = yield store.get_rooms_for_user(state.user_id)
room_ids = await store.get_rooms_for_user(state.user_id)
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)

Expand All @@ -1321,31 +1316,33 @@ def get_interested_parties(store, states):
return room_ids_to_states, users_to_states


@defer.inlineCallbacks
def get_interested_remotes(store, states, state_handler):
async def get_interested_remotes(
store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
) -> List[Tuple[List[str], List[UserPresenceState]]]:
"""Given a list of presence states figure out which remote servers
should be sent which.

All the presence states should be for local users only.

Args:
store (DataStore)
states (list(UserPresenceState))
store
states
state_handler

Returns:
Deferred list of ([destinations], [UserPresenceState]), where for
each row the list of UserPresenceState should be sent to each
A list of 2-tuples of destinations and states, where for
each tuple the list of UserPresenceState should be sent to each
destination
"""
hosts_and_states = []

# First we look up the rooms each user is in (as well as any explicit
# subscriptions), then for each distinct room we look up the remote
# hosts in those rooms.
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
room_ids_to_states, users_to_states = await get_interested_parties(store, states)

for room_id, states in room_ids_to_states.items():
hosts = yield state_handler.get_current_hosts_in_room(room_id)
hosts = await state_handler.get_current_hosts_in_room(room_id)
hosts_and_states.append((hosts, states))

for user_id, states in users_to_states.items():
Expand Down