
Convert user directory handler and related classes to async/await #7640

Merged
merged 7 commits on Jun 5, 2020
1 change: 1 addition & 0 deletions changelog.d/7640.misc
@@ -0,0 +1 @@
+Convert user directory, state deltas, and stats handlers to async/await.
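For context, the conversion this changelog entry describes is the standard Twisted migration from @defer.inlineCallbacks generators to native coroutines. A minimal sketch of the pattern (the fetch_count helper is made up for illustration and is not code from this PR):

    from twisted.internet import defer

    def fetch_count():
        # Stand-in for a database call that returns a Deferred.
        return defer.succeed(42)

    # Before: an inlineCallbacks generator; `yield` unwraps each Deferred.
    @defer.inlineCallbacks
    def get_count_old():
        count = yield fetch_count()
        return count + 1

    # After: a native coroutine; Deferreds are awaitable, so `await` works too.
    async def get_count_new():
        count = await fetch_count()
        return count + 1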
6 changes: 4 additions & 2 deletions synapse/handlers/register.py
@@ -202,8 +202,10 @@ def register_user(

             if self.hs.config.user_directory_search_all_users:
                 profile = yield self.store.get_profileinfo(localpart)
-                yield self.user_directory_handler.handle_local_profile_change(
-                    user_id, profile
+                yield defer.ensureDeferred(
+                    self.user_directory_handler.handle_local_profile_change(
+                        user_id, profile
+                    )
                 )

         else:
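The register.py hunk above shows the bridging pattern used while a caller is still generator-based: register_user remains an @defer.inlineCallbacks generator, so the now-async handle_local_profile_change coroutine is wrapped in defer.ensureDeferred, which turns the coroutine into a Deferred that `yield` can consume. A rough sketch of the same shape, with simplified, hypothetical signatures:

    from twisted.internet import defer

    async def handle_profile_change(user_id, profile):
        # Converted to a coroutine, as in this PR; body elided.
        return None

    @defer.inlineCallbacks
    def register_user(user_id, profile):
        # The generator yields Deferreds, so the coroutine is wrapped first.
        yield defer.ensureDeferred(handle_profile_change(user_id, profile))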
9 changes: 3 additions & 6 deletions synapse/handlers/state_deltas.py
@@ -15,17 +15,14 @@

 import logging

-from twisted.internet import defer
-
 logger = logging.getLogger(__name__)


 class StateDeltasHandler(object):
     def __init__(self, hs):
         self.store = hs.get_datastore()

-    @defer.inlineCallbacks
-    def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
+    async def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
         """Given two events check if the `key_name` field in content changed
         from not matching `public_value` to doing so.

@@ -41,10 +38,10 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
         prev_event = None
         event = None
         if prev_event_id:
-            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+            prev_event = await self.store.get_event(prev_event_id, allow_none=True)

         if event_id:
-            event = yield self.store.get_event(event_id, allow_none=True)
+            event = await self.store.get_event(event_id, allow_none=True)

         if not event and not prev_event:
             logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
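Because _get_key_change is now a coroutine, any code that used to `yield` it must `await` it instead; the user directory handler, which builds on this helper, is converted in parts of this PR not shown in this excerpt. A hypothetical caller, purely to illustrate the call site (the handler name and the key_name/public_value arguments are invented):

    from synapse.handlers.state_deltas import StateDeltasHandler

    class ExampleHandler(StateDeltasHandler):
        async def _handle_visibility_delta(self, prev_event_id, event_id):
            # Previously: change = yield self._get_key_change(...)
            change = await self._get_key_change(
                prev_event_id,
                event_id,
                key_name="history_visibility",
                public_value="world_readable",
            )
            return change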
47 changes: 20 additions & 27 deletions synapse/handlers/stats.py
@@ -16,17 +16,14 @@
 import logging
 from collections import Counter

-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes, Membership
-from synapse.handlers.state_deltas import StateDeltasHandler
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process

 logger = logging.getLogger(__name__)


-class StatsHandler(StateDeltasHandler):
+class StatsHandler:
     """Handles keeping the *_stats tables updated with a simple time-series of
     information about the users, rooms and media on the server, such that admins
     have some idea of who is consuming their resources.
@@ -35,7 +32,6 @@ class StatsHandler(StateDeltasHandler):
     """

     def __init__(self, hs):
-        super(StatsHandler, self).__init__(hs)
         self.hs = hs
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -68,20 +64,18 @@ def notify_new_event(self):

         self._is_processing = True

-        @defer.inlineCallbacks
-        def process():
+        async def process():
             try:
-                yield self._unsafe_process()
+                await self._unsafe_process()
             finally:
                 self._is_processing = False

         run_as_background_process("stats.notify_new_event", process)

-    @defer.inlineCallbacks
-    def _unsafe_process(self):
+    async def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
-            self.pos = yield self.store.get_stats_positions()
+            self.pos = await self.store.get_stats_positions()

         # Loop round handling deltas until we're up to date

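The notify_new_event hunk keeps its fire-and-forget structure: the inner process function becomes a coroutine function and is handed, uncalled, to run_as_background_process, which is imported from synapse.metrics.background_process_metrics and drives it in the background. A stripped-down sketch of that shape, using a stand-in runner rather than Synapse's helper:

    from twisted.internet import defer

    def run_in_background(desc, func):
        # Stand-in for run_as_background_process: adapt the coroutine to a
        # Deferred so the Twisted reactor can drive it; `desc` would be used
        # for logging/metrics in the real helper.
        return defer.ensureDeferred(func())

    class Processor:
        def __init__(self):
            self._is_processing = False

        async def _unsafe_process(self):
            ...  # the actual work goes here

        def notify_new_event(self):
            if self._is_processing:
                return
            self._is_processing = True

            async def process():
                try:
                    await self._unsafe_process()
                finally:
                    self._is_processing = False

            # The coroutine function is passed uncalled; the runner invokes it.
            run_in_background("stats.notify_new_event", process)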
@@ -96,13 +90,13 @@ def _unsafe_process(self):
             logger.debug(
                 "Processing room stats %s->%s", self.pos, room_max_stream_ordering
             )
-            max_pos, deltas = yield self.store.get_current_state_deltas(
+            max_pos, deltas = await self.store.get_current_state_deltas(
                 self.pos, room_max_stream_ordering
             )

             if deltas:
                 logger.debug("Handling %d state deltas", len(deltas))
-                room_deltas, user_deltas = yield self._handle_deltas(deltas)
+                room_deltas, user_deltas = await self._handle_deltas(deltas)
             else:
                 room_deltas = {}
                 user_deltas = {}
@@ -111,7 +105,7 @@ def _unsafe_process(self):
             (
                 room_count,
                 user_count,
-            ) = yield self.store.get_changes_room_total_events_and_bytes(
+            ) = await self.store.get_changes_room_total_events_and_bytes(
                 self.pos, max_pos
             )

@@ -125,7 +119,7 @@ def _unsafe_process(self):
             logger.debug("user_deltas: %s", user_deltas)

             # Always call this so that we update the stats position.
-            yield self.store.bulk_update_stats_delta(
+            await self.store.bulk_update_stats_delta(
                 self.clock.time_msec(),
                 updates={"room": room_deltas, "user": user_deltas},
                 stream_id=max_pos,
@@ -137,13 +131,12 @@ def _handle_deltas(self, deltas):

             self.pos = max_pos

-    @defer.inlineCallbacks
-    def _handle_deltas(self, deltas):
+    async def _handle_deltas(self, deltas):
         """Called with the state deltas to process

         Returns:
-            Deferred[tuple[dict[str, Counter], dict[str, counter]]]
-            Resovles to two dicts, the room deltas and the user deltas,
+            tuple[dict[str, Counter], dict[str, counter]]
+            Two dicts: the room deltas and the user deltas,
             mapping from room/user ID to changes in the various fields.
         """

@@ -162,7 +155,7 @@ def _handle_deltas(self, deltas):

             logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)

-            token = yield self.store.get_earliest_token_for_stats("room", room_id)
+            token = await self.store.get_earliest_token_for_stats("room", room_id)

             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -184,7 +177,7 @@ def _handle_deltas(self, deltas):

             sender = None
             if event_id is not None:
-                event = yield self.store.get_event(event_id, allow_none=True)
+                event = await self.store.get_event(event_id, allow_none=True)
                 if event:
                     event_content = event.content or {}
                     sender = event.sender
@@ -200,16 +193,16 @@ def _handle_deltas(self, deltas):
                 room_stats_delta["current_state_events"] += 1

             if typ == EventTypes.Member:
-                # we could use _get_key_change here but it's a bit inefficient
-                # given we're not testing for a specific result; might as well
-                # just grab the prev_membership and membership strings and
-                # compare them.
+                # we could use StateDeltasHandler._get_key_change here but it's
+                # a bit inefficient given we're not testing for a specific
+                # result; might as well just grab the prev_membership and
+                # membership strings and compare them.
                 # We take None rather than leave as a previous membership
                 # in the absence of a previous event because we do not want to
                 # reduce the leave count when a new-to-the-room user joins.
                 prev_membership = None
                 if prev_event_id is not None:
-                    prev_event = yield self.store.get_event(
+                    prev_event = await self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
@@ -301,6 +294,6 @@ def _handle_deltas(self, deltas):

         for room_id, state in room_to_state_updates.items():
             logger.debug("Updating room_stats_state for %s: %s", room_id, state)
-            yield self.store.update_room_state(room_id, state)
+            await self.store.update_room_state(room_id, state)

         return room_to_stats_deltas, user_to_stats_deltas