From f77130a2d0dba58a9208fcdf2df53cac427efc01 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 4 Jun 2020 14:32:17 -0400 Subject: [PATCH 1/7] Convert the user directory handler to async/await. --- synapse/handlers/register.py | 6 +- synapse/handlers/user_directory.py | 110 ++++++++++++-------------- tests/handlers/test_user_directory.py | 8 +- 3 files changed, 58 insertions(+), 66 deletions(-) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 55a03e53ead4..e282683b2507 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -202,8 +202,10 @@ def register_user( if self.hs.config.user_directory_search_all_users: profile = yield self.store.get_profileinfo(localpart) - yield self.user_directory_handler.handle_local_profile_change( - user_id, profile + yield defer.ensureDeferred( + self.user_directory_handler.handle_local_profile_change( + user_id, profile + ) ) else: diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 722760c59d9f..8bc583c3d099 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -17,8 +17,6 @@ from six import iteritems, iterkeys -from twisted.internet import defer - import synapse.metrics from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.handlers.state_deltas import StateDeltasHandler @@ -103,43 +101,39 @@ def notify_new_event(self): if self._is_processing: return - @defer.inlineCallbacks - def process(): + async def process(): try: - yield self._unsafe_process() + await self._unsafe_process() finally: self._is_processing = False self._is_processing = True run_as_background_process("user_directory.notify_new_event", process) - @defer.inlineCallbacks - def handle_local_profile_change(self, user_id, profile): + async def handle_local_profile_change(self, user_id, profile): """Called to update index of our local user profiles when they change irrespective of any rooms the user may be in. """ # FIXME(#3714): We should probably do this in the same worker as all # the other changes. - is_support = yield self.store.is_support_user(user_id) + is_support = await self.store.is_support_user(user_id) # Support users are for diagnostics and should not appear in the user directory. if not is_support: - yield self.store.update_profile_in_user_dir( + await self.store.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url ) - @defer.inlineCallbacks - def handle_user_deactivated(self, user_id): + async def handle_user_deactivated(self, user_id): """Called when a user ID is deactivated """ # FIXME(#3714): We should probably do this in the same worker as all # the other changes. - yield self.store.remove_from_user_dir(user_id) + await self.store.remove_from_user_dir(user_id) - @defer.inlineCallbacks - def _unsafe_process(self): + async def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB if self.pos is None: - self.pos = yield self.store.get_user_directory_stream_pos() + self.pos = await self.store.get_user_directory_stream_pos() # If still None then the initial background update hasn't happened yet if self.pos is None: @@ -155,12 +149,12 @@ def _unsafe_process(self): logger.debug( "Processing user stats %s->%s", self.pos, room_max_stream_ordering ) - max_pos, deltas = yield self.store.get_current_state_deltas( + max_pos, deltas = await self.store.get_current_state_deltas( self.pos, room_max_stream_ordering ) logger.debug("Handling %d state deltas", len(deltas)) - yield self._handle_deltas(deltas) + await self._handle_deltas(deltas) self.pos = max_pos @@ -169,10 +163,9 @@ def _unsafe_process(self): max_pos ) - yield self.store.update_user_directory_stream_pos(max_pos) + await self.store.update_user_directory_stream_pos(max_pos) - @defer.inlineCallbacks - def _handle_deltas(self, deltas): + async def _handle_deltas(self, deltas): """Called with the state deltas to process """ for delta in deltas: @@ -187,11 +180,11 @@ def _handle_deltas(self, deltas): # For join rule and visibility changes we need to check if the room # may have become public or not and add/remove the users in said room if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules): - yield self._handle_room_publicity_change( + await self._handle_room_publicity_change( room_id, prev_event_id, event_id, typ ) elif typ == EventTypes.Member: - change = yield self._get_key_change( + change = await self._get_key_change( prev_event_id, event_id, key_name="membership", @@ -201,7 +194,7 @@ def _handle_deltas(self, deltas): if change is False: # Need to check if the server left the room entirely, if so # we might need to remove all the users in that room - is_in_room = yield self.store.is_host_joined( + is_in_room = await self.store.is_host_joined( room_id, self.server_name ) if not is_in_room: @@ -209,40 +202,41 @@ def _handle_deltas(self, deltas): # Fetch all the users that we marked as being in user # directory due to being in the room and then check if # need to remove those users or not - user_ids = yield self.store.get_users_in_dir_due_to_room( + user_ids = await self.store.get_users_in_dir_due_to_room( room_id ) for user_id in user_ids: - yield self._handle_remove_user(room_id, user_id) + await self._handle_remove_user(room_id, user_id) return else: logger.debug("Server is still in room: %r", room_id) - is_support = yield self.store.is_support_user(state_key) + is_support = await self.store.is_support_user(state_key) if not is_support: if change is None: # Handle any profile changes - yield self._handle_profile_change( + await self._handle_profile_change( state_key, room_id, prev_event_id, event_id ) continue if change: # The user joined - event = yield self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(event_id, allow_none=True) profile = ProfileInfo( avatar_url=event.content.get("avatar_url"), display_name=event.content.get("displayname"), ) - yield self._handle_new_user(room_id, state_key, profile) + await self._handle_new_user(room_id, state_key, profile) else: # The user left - yield self._handle_remove_user(room_id, state_key) + await self._handle_remove_user(room_id, state_key) else: logger.debug("Ignoring irrelevant type: %r", typ) - @defer.inlineCallbacks - def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): + async def _handle_room_publicity_change( + self, room_id, prev_event_id, event_id, typ + ): """Handle a room having potentially changed from/to world_readable/publically joinable. @@ -255,14 +249,14 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): logger.debug("Handling change for %s: %s", typ, room_id) if typ == EventTypes.RoomHistoryVisibility: - change = yield self._get_key_change( + change = await self._get_key_change( prev_event_id, event_id, key_name="history_visibility", public_value="world_readable", ) elif typ == EventTypes.JoinRules: - change = yield self._get_key_change( + change = await self._get_key_change( prev_event_id, event_id, key_name="join_rule", @@ -278,7 +272,7 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): # There's been a change to or from being world readable. - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + is_public = await self.store.is_room_world_readable_or_publicly_joinable( room_id ) @@ -293,11 +287,11 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): # ignore the change return - users_with_profile = yield self.state.get_current_users_in_room(room_id) + users_with_profile = await self.state.get_current_users_in_room(room_id) # Remove every user from the sharing tables for that room. for user_id in iterkeys(users_with_profile): - yield self.store.remove_user_who_share_room(user_id, room_id) + await self.store.remove_user_who_share_room(user_id, room_id) # Then, re-add them to the tables. # NOTE: this is not the most efficient method, as handle_new_user sets @@ -306,26 +300,24 @@ def _handle_room_publicity_change(self, room_id, prev_event_id, event_id, typ): # being added multiple times. The batching upserts shouldn't make this # too bad, though. for user_id, profile in iteritems(users_with_profile): - yield self._handle_new_user(room_id, user_id, profile) + await self._handle_new_user(room_id, user_id, profile) - @defer.inlineCallbacks - def _handle_local_user(self, user_id): + async def _handle_local_user(self, user_id): """Adds a new local roomless user into the user_directory_search table. Used to populate up the user index when we have an user_directory_search_all_users specified. """ logger.debug("Adding new local user to dir, %r", user_id) - profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id)) + profile = await self.store.get_profileinfo(get_localpart_from_id(user_id)) - row = yield self.store.get_user_in_directory(user_id) + row = await self.store.get_user_in_directory(user_id) if not row: - yield self.store.update_profile_in_user_dir( + await self.store.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url ) - @defer.inlineCallbacks - def _handle_new_user(self, room_id, user_id, profile): + async def _handle_new_user(self, room_id, user_id, profile): """Called when we might need to add user to directory Args: @@ -334,18 +326,18 @@ def _handle_new_user(self, room_id, user_id, profile): """ logger.debug("Adding new user to dir, %r", user_id) - yield self.store.update_profile_in_user_dir( + await self.store.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url ) - is_public = yield self.store.is_room_world_readable_or_publicly_joinable( + is_public = await self.store.is_room_world_readable_or_publicly_joinable( room_id ) # Now we update users who share rooms with users. - users_with_profile = yield self.state.get_current_users_in_room(room_id) + users_with_profile = await self.state.get_current_users_in_room(room_id) if is_public: - yield self.store.add_users_in_public_rooms(room_id, (user_id,)) + await self.store.add_users_in_public_rooms(room_id, (user_id,)) else: to_insert = set() @@ -376,10 +368,9 @@ def _handle_new_user(self, room_id, user_id, profile): to_insert.add((other_user_id, user_id)) if to_insert: - yield self.store.add_users_who_share_private_room(room_id, to_insert) + await self.store.add_users_who_share_private_room(room_id, to_insert) - @defer.inlineCallbacks - def _handle_remove_user(self, room_id, user_id): + async def _handle_remove_user(self, room_id, user_id): """Called when we might need to remove user from directory Args: @@ -389,24 +380,23 @@ def _handle_remove_user(self, room_id, user_id): logger.debug("Removing user %r", user_id) # Remove user from sharing tables - yield self.store.remove_user_who_share_room(user_id, room_id) + await self.store.remove_user_who_share_room(user_id, room_id) # Are they still in any rooms? If not, remove them entirely. - rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id) + rooms_user_is_in = await self.store.get_user_dir_rooms_user_is_in(user_id) if len(rooms_user_is_in) == 0: - yield self.store.remove_from_user_dir(user_id) + await self.store.remove_from_user_dir(user_id) - @defer.inlineCallbacks - def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): + async def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): """Check member event changes for any profile changes and update the database if there are. """ if not prev_event_id or not event_id: return - prev_event = yield self.store.get_event(prev_event_id, allow_none=True) - event = yield self.store.get_event(event_id, allow_none=True) + prev_event = await self.store.get_event(prev_event_id, allow_none=True) + event = await self.store.get_event(event_id, allow_none=True) if not prev_event or not event: return @@ -421,4 +411,4 @@ def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id): new_avatar = event.content.get("avatar_url") if prev_name != new_name or prev_avatar != new_avatar: - yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) + await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 572df8d80bf2..c15bce5bef77 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -14,6 +14,8 @@ # limitations under the License. from mock import Mock +from twisted.internet import defer + import synapse.rest.admin from synapse.api.constants import UserTypes from synapse.rest.client.v1 import login, room @@ -75,18 +77,16 @@ def test_handle_user_deactivated_support_user(self): ) ) - self.store.remove_from_user_dir = Mock() - self.store.remove_from_user_in_public_room = Mock() + self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None)) self.get_success(self.handler.handle_user_deactivated(s_user_id)) self.store.remove_from_user_dir.not_called() - self.store.remove_from_user_in_public_room.not_called() def test_handle_user_deactivated_regular_user(self): r_user_id = "@regular:test" self.get_success( self.store.register_user(user_id=r_user_id, password_hash=None) ) - self.store.remove_from_user_dir = Mock() + self.store.remove_from_user_dir = Mock(return_value=defer.succeed(None)) self.get_success(self.handler.handle_user_deactivated(r_user_id)) self.store.remove_from_user_dir.called_once_with(r_user_id) From 7c3ed583a7e31cf1d4f92799d20230b6375505b5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 4 Jun 2020 14:35:30 -0400 Subject: [PATCH 2/7] Make state deltas handler async/await. --- synapse/handlers/state_deltas.py | 9 +++------ synapse/handlers/stats.py | 12 +++++------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py index f065970c401a..8590c1eff428 100644 --- a/synapse/handlers/state_deltas.py +++ b/synapse/handlers/state_deltas.py @@ -15,8 +15,6 @@ import logging -from twisted.internet import defer - logger = logging.getLogger(__name__) @@ -24,8 +22,7 @@ class StateDeltasHandler(object): def __init__(self, hs): self.store = hs.get_datastore() - @defer.inlineCallbacks - def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + async def _get_key_change(self, prev_event_id, event_id, key_name, public_value): """Given two events check if the `key_name` field in content changed from not matching `public_value` to doing so. @@ -41,10 +38,10 @@ def _get_key_change(self, prev_event_id, event_id, key_name, public_value): prev_event = None event = None if prev_event_id: - prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + prev_event = await self.store.get_event(prev_event_id, allow_none=True) if event_id: - event = yield self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(event_id, allow_none=True) if not event and not prev_event: logger.debug("Neither event exists: %r %r", prev_event_id, event_id) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index d93a2766930a..0000eaab50b8 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -19,14 +19,13 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.handlers.state_deltas import StateDeltasHandler from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process logger = logging.getLogger(__name__) -class StatsHandler(StateDeltasHandler): +class StatsHandler: """Handles keeping the *_stats tables updated with a simple time-series of information about the users, rooms and media on the server, such that admins have some idea of who is consuming their resources. @@ -35,7 +34,6 @@ class StatsHandler(StateDeltasHandler): """ def __init__(self, hs): - super(StatsHandler, self).__init__(hs) self.hs = hs self.store = hs.get_datastore() self.state = hs.get_state_handler() @@ -200,10 +198,10 @@ def _handle_deltas(self, deltas): room_stats_delta["current_state_events"] += 1 if typ == EventTypes.Member: - # we could use _get_key_change here but it's a bit inefficient - # given we're not testing for a specific result; might as well - # just grab the prev_membership and membership strings and - # compare them. + # we could use StateDeltasHandler._get_key_change here but it's + # a bit inefficient given we're not testing for a specific + # result; might as well just grab the prev_membership and + # membership strings and compare them. # We take None rather than leave as a previous membership # in the absence of a previous event because we do not want to # reduce the leave count when a new-to-the-room user joins. From 52c3a8625e33d63a4ee8a5732d6083593c447887 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 4 Jun 2020 14:40:18 -0400 Subject: [PATCH 3/7] Change the stats handler to be async/await. --- synapse/handlers/stats.py | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 0000eaab50b8..fef09668bc25 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -16,8 +16,6 @@ import logging from collections import Counter -from twisted.internet import defer - from synapse.api.constants import EventTypes, Membership from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process @@ -66,20 +64,18 @@ def notify_new_event(self): self._is_processing = True - @defer.inlineCallbacks - def process(): + async def process(): try: - yield self._unsafe_process() + await self._unsafe_process() finally: self._is_processing = False run_as_background_process("stats.notify_new_event", process) - @defer.inlineCallbacks - def _unsafe_process(self): + async def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB if self.pos is None: - self.pos = yield self.store.get_stats_positions() + self.pos = await self.store.get_stats_positions() # Loop round handling deltas until we're up to date @@ -94,13 +90,13 @@ def _unsafe_process(self): logger.debug( "Processing room stats %s->%s", self.pos, room_max_stream_ordering ) - max_pos, deltas = yield self.store.get_current_state_deltas( + max_pos, deltas = await self.store.get_current_state_deltas( self.pos, room_max_stream_ordering ) if deltas: logger.debug("Handling %d state deltas", len(deltas)) - room_deltas, user_deltas = yield self._handle_deltas(deltas) + room_deltas, user_deltas = await self._handle_deltas(deltas) else: room_deltas = {} user_deltas = {} @@ -109,7 +105,7 @@ def _unsafe_process(self): ( room_count, user_count, - ) = yield self.store.get_changes_room_total_events_and_bytes( + ) = await self.store.get_changes_room_total_events_and_bytes( self.pos, max_pos ) @@ -123,7 +119,7 @@ def _unsafe_process(self): logger.debug("user_deltas: %s", user_deltas) # Always call this so that we update the stats position. - yield self.store.bulk_update_stats_delta( + await self.store.bulk_update_stats_delta( self.clock.time_msec(), updates={"room": room_deltas, "user": user_deltas}, stream_id=max_pos, @@ -135,13 +131,12 @@ def _unsafe_process(self): self.pos = max_pos - @defer.inlineCallbacks - def _handle_deltas(self, deltas): + async def _handle_deltas(self, deltas): """Called with the state deltas to process Returns: - Deferred[tuple[dict[str, Counter], dict[str, counter]]] - Resovles to two dicts, the room deltas and the user deltas, + tuple[dict[str, Counter], dict[str, counter]] + Resolves to two dicts, the room deltas and the user deltas, mapping from room/user ID to changes in the various fields. """ @@ -160,7 +155,7 @@ def _handle_deltas(self, deltas): logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id) - token = yield self.store.get_earliest_token_for_stats("room", room_id) + token = await self.store.get_earliest_token_for_stats("room", room_id) # If the earliest token to begin from is larger than our current # stream ID, skip processing this delta. @@ -182,7 +177,7 @@ def _handle_deltas(self, deltas): sender = None if event_id is not None: - event = yield self.store.get_event(event_id, allow_none=True) + event = await self.store.get_event(event_id, allow_none=True) if event: event_content = event.content or {} sender = event.sender @@ -207,7 +202,7 @@ def _handle_deltas(self, deltas): # reduce the leave count when a new-to-the-room user joins. prev_membership = None if prev_event_id is not None: - prev_event = yield self.store.get_event( + prev_event = await self.store.get_event( prev_event_id, allow_none=True ) if prev_event: @@ -299,6 +294,6 @@ def _handle_deltas(self, deltas): for room_id, state in room_to_state_updates.items(): logger.debug("Updating room_stats_state for %s: %s", room_id, state) - yield self.store.update_room_state(room_id, state) + await self.store.update_room_state(room_id, state) return room_to_stats_deltas, user_to_stats_deltas From 3bd2b310b35e19c4f443b347bfe191947d945f73 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 4 Jun 2020 14:57:14 -0400 Subject: [PATCH 4/7] Add a newsfragement. --- changelog.d/7640.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7640.misc diff --git a/changelog.d/7640.misc b/changelog.d/7640.misc new file mode 100644 index 000000000000..55edc1c78151 --- /dev/null +++ b/changelog.d/7640.misc @@ -0,0 +1 @@ +Convert user directory, state deltas, and stats handlers to async/await. From 20ba27315642db1119ed9f6d70f367b36040f1c0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 5 Jun 2020 12:30:32 -0400 Subject: [PATCH 5/7] Clarify comment from review. Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index fef09668bc25..149f861239da 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -136,7 +136,7 @@ async def _handle_deltas(self, deltas): Returns: tuple[dict[str, Counter], dict[str, counter]] - Resolves to two dicts, the room deltas and the user deltas, + Two dicts: the room deltas and the user deltas, mapping from room/user ID to changes in the various fields. """ From 6dcf7ec7aeab63ec41f200dd40d9d59a3111a6b1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 5 Jun 2020 12:36:41 -0400 Subject: [PATCH 6/7] Remove dead code. --- synapse/handlers/user_directory.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 8bc583c3d099..7ea511f6de93 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -302,21 +302,6 @@ async def _handle_room_publicity_change( for user_id, profile in iteritems(users_with_profile): await self._handle_new_user(room_id, user_id, profile) - async def _handle_local_user(self, user_id): - """Adds a new local roomless user into the user_directory_search table. - Used to populate up the user index when we have an - user_directory_search_all_users specified. - """ - logger.debug("Adding new local user to dir, %r", user_id) - - profile = await self.store.get_profileinfo(get_localpart_from_id(user_id)) - - row = await self.store.get_user_in_directory(user_id) - if not row: - await self.store.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url - ) - async def _handle_new_user(self, room_id, user_id, profile): """Called when we might need to add user to directory From bba3924177d72f47fb391dbdfd9833f30d034f47 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 5 Jun 2020 13:52:13 -0400 Subject: [PATCH 7/7] Remove unused import. --- synapse/handlers/user_directory.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 7ea511f6de93..12423b909ace 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -22,7 +22,6 @@ from synapse.handlers.state_deltas import StateDeltasHandler from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.roommember import ProfileInfo -from synapse.types import get_localpart_from_id from synapse.util.metrics import Measure logger = logging.getLogger(__name__)