From dd1581c983b88f4320e032d7abdfa773dfa935c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2023 13:36:17 +0100 Subject: [PATCH 1/6] Speed up selecting users Adding an `ORDER BY` clause forces the use of an index, which is much faster than a sequential scan. I *think* that's because we keep deleting rows from the table, and so a sequential scan has to skip over many deleted rows. --- synapse/storage/databases/main/user_directory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index b7d58978de2b..c946d2297489 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -357,7 +357,7 @@ async def _populate_user_directory_process_users( """ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: - sql = "SELECT user_id FROM %s LIMIT %s" % ( + sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % ( TEMP_TABLE + "_users", str(batch_size), ) From fc43d01ecf65104992e21067e988cdf799d96d18 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2023 13:45:21 +0100 Subject: [PATCH 2/6] Add a batched version of `should_include_local_user_in_dir` --- .../storage/databases/main/user_directory.py | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index c946d2297489..956f99f7d09c 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -17,6 +17,7 @@ import unicodedata from typing import ( TYPE_CHECKING, + Collection, Iterable, List, Mapping, @@ -45,7 +46,7 @@ if TYPE_CHECKING: from synapse.server import HomeServer -from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules +from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, 
UserTypes from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -397,11 +398,11 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: ) # First filter down to users we want to insert into the user directory. - users_to_insert = [ - user_id - for user_id in users_to_work_on - if await self.should_include_local_user_in_dir(user_id) - ] + users_to_insert = await self.db_pool.runInteraction( + "populate_user_directory_process_users_filter", + self._filter_local_users_for_dir_txn, + users_to_work_on, + ) # Next fetch their profiles. Note that the `user_id` here is the # *localpart*, and that not all users have profiles. @@ -494,6 +495,30 @@ async def should_include_local_user_in_dir(self, user: str) -> bool: return True + def _filter_local_users_for_dir_txn( + self, txn: LoggingTransaction, users: Collection[str] + ) -> Collection[str]: + """A batched version of `should_include_local_user_in_dir`""" + users = [ + user + for user in users + if self.get_app_service_by_user_id(user) is None # type: ignore[attr-defined] + and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined] + ] + + rows = self.db_pool.simple_select_many_txn( + txn, + table="users", + column="name", + iterable=users, + keyvalues={ + "deactivated": 0, + }, + retcols=("name", "user_type"), + ) + + return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT] + async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool: """Check if the room is either world_readable or publically joinable""" From 79eb03a5a2735fe3204a0889ef7580efdd111bf5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2023 13:49:43 +0100 Subject: [PATCH 3/6] Do everything in one transaction --- .../storage/databases/main/user_directory.py | 132 +++++++++--------- 1 file changed, 64 insertions(+), 68 deletions(-) diff --git a/synapse/storage/databases/main/user_directory.py 
b/synapse/storage/databases/main/user_directory.py index 956f99f7d09c..e7557590c58b 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -357,7 +357,9 @@ async def _populate_user_directory_process_users( Add all local users to the user directory. """ - def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: + def _populate_user_directory_process_users_txn( + txn: LoggingTransaction, + ) -> Optional[int]: sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % ( TEMP_TABLE + "_users", str(batch_size), @@ -379,85 +381,79 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: assert count_result is not None progress["remaining"] = count_result[0] - return users_to_work_on + if not users_to_work_on: + return None - users_to_work_on = await self.db_pool.runInteraction( - "populate_user_directory_temp_read", _get_next_batch - ) + logger.debug( + "Processing the next %d users of %d remaining", + len(users_to_work_on), + progress["remaining"], + ) - # No more users -- complete the transaction. - if not users_to_work_on: - await self.db_pool.updates._end_background_update( - "populate_user_directory_process_users" + # First filter down to users we want to insert into the user directory. + users_to_insert = self._filter_local_users_for_dir_txn( + txn, users_to_work_on ) - return 1 - logger.debug( - "Processing the next %d users of %d remaining" - % (len(users_to_work_on), progress["remaining"]) - ) + # Next fetch their profiles. Note that the `user_id` here is the + # *localpart*, and that not all users have profiles. 
+ profile_rows = self.db_pool.simple_select_many_txn( + txn, + table="profiles", + column="user_id", + iterable=[get_localpart_from_id(u) for u in users_to_insert], + retcols=( + "user_id", + "displayname", + "avatar_url", + ), + keyvalues={}, + ) + profiles = { + f"@{row['user_id']}:{self.server_name}": _UserDirProfile( + f"@{row['user_id']}:{self.server_name}", + row["displayname"], + row["avatar_url"], + ) + for row in profile_rows + } - # First filter down to users we want to insert into the user directory. - users_to_insert = await self.db_pool.runInteraction( - "populate_user_directory_process_users_filter", - self._filter_local_users_for_dir_txn, - users_to_work_on, - ) + profiles_to_insert = [ + profiles.get(user_id) or _UserDirProfile(user_id) + for user_id in users_to_insert + ] - # Next fetch their profiles. Note that the `user_id` here is the - # *localpart*, and that not all users have profiles. - profile_rows = await self.db_pool.simple_select_many_batch( - table="profiles", - column="user_id", - iterable=[get_localpart_from_id(u) for u in users_to_insert], - retcols=( - "user_id", - "displayname", - "avatar_url", - ), - keyvalues={}, - desc="populate_user_directory_process_users_get_profiles", - ) - profiles = { - f"@{row['user_id']}:{self.server_name}": _UserDirProfile( - f"@{row['user_id']}:{self.server_name}", - row["displayname"], - row["avatar_url"], - ) - for row in profile_rows - } + # Actually insert the users with their profiles into the directory. + self._update_profiles_in_user_dir_txn(txn, profiles_to_insert) - profiles_to_insert = [ - profiles.get(user_id) or _UserDirProfile(user_id) - for user_id in users_to_insert - ] + # We've finished processing the users. Delete it from the table. + self.db_pool.simple_delete_many_txn( + txn, + table=TEMP_TABLE + "_users", + column="user_id", + values=users_to_work_on, + keyvalues={}, + ) - # Actually insert the users with their profiles into the directory. 
- await self.db_pool.runInteraction( - "populate_user_directory_process_users_insertion", - self._update_profiles_in_user_dir_txn, - profiles_to_insert, - ) + # Update the remaining counter. + progress["remaining"] -= len(users_to_work_on) + self.db_pool.updates._background_update_progress_txn( + txn, "populate_user_directory_process_users", progress + ) + return len(users_to_work_on) - # We've finished processing the users. Delete it from the table. - await self.db_pool.simple_delete_many( - table=TEMP_TABLE + "_users", - column="user_id", - iterable=users_to_work_on, - keyvalues={}, - desc="populate_user_directory_process_users_delete", + processed_count = await self.db_pool.runInteraction( + "populate_user_directory_temp", _populate_user_directory_process_users_txn ) - # Update the remaining counter. - progress["remaining"] -= len(users_to_work_on) - await self.db_pool.runInteraction( - "populate_user_directory", - self.db_pool.updates._background_update_progress_txn, - "populate_user_directory_process_users", - progress, - ) + # No more users -- complete the transaction. + if not processed_count: + await self.db_pool.updates._end_background_update( + "populate_user_directory_process_users" + ) + return 1 - return len(users_to_work_on) + return processed_count async def should_include_local_user_in_dir(self, user: str) -> bool: """Certain classes of local user are omitted from the user directory. From 57d6886b68e5fda08fd0fe22032e9b4ba7f94eaa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2023 13:57:48 +0100 Subject: [PATCH 4/6] Use DELETE.. 
RETURNING --- .../storage/databases/main/user_directory.py | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index e7557590c58b..3dfba11a9269 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -360,12 +360,23 @@ async def _populate_user_directory_process_users( def _populate_user_directory_process_users_txn( txn: LoggingTransaction, ) -> Optional[int]: - sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % ( - TEMP_TABLE + "_users", - str(batch_size), - ) - txn.execute(sql) - user_result = cast(List[Tuple[str]], txn.fetchall()) + if self.database_engine.supports_returning: + sql = f""" + DELETE FROM {TEMP_TABLE + "_users"} + WHERE user_id IN ( + SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ? + ) + RETURNING user_id + """ + txn.execute(sql, (batch_size,)) + user_result = cast(List[Tuple[str]], txn.fetchall()) + else: + sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % ( + TEMP_TABLE + "_users", + str(batch_size), + ) + txn.execute(sql) + user_result = cast(List[Tuple[str]], txn.fetchall()) if not user_result: return None @@ -426,14 +437,16 @@ def _populate_user_directory_process_users_txn( # Actually insert the users with their profiles into the directory. self._update_profiles_in_user_dir_txn(txn, profiles_to_insert) - # We've finished processing the users. Delete it from the table. - self.db_pool.simple_delete_many_txn( - txn, - table=TEMP_TABLE + "_users", - column="user_id", - values=users_to_work_on, - keyvalues={}, - ) + # We've finished processing the users. Delete them from the table, if + # we haven't already.
+ if not self.database_engine.supports_returning: + self.db_pool.simple_delete_many_txn( + txn, + table=TEMP_TABLE + "_users", + column="user_id", + values=users_to_work_on, + keyvalues={}, + ) # Update the remaining counter. progress["remaining"] -= len(users_to_work_on) From 3aa31d0ad435c5f5ff4b3aabdcff2798a7652ddb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2023 14:01:44 +0100 Subject: [PATCH 5/6] Newsfile --- changelog.d/15665.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15665.misc diff --git a/changelog.d/15665.misc b/changelog.d/15665.misc new file mode 100644 index 000000000000..7ad424d8dfd0 --- /dev/null +++ b/changelog.d/15665.misc @@ -0,0 +1 @@ +Speed up rebuilding of the user directory for local users. From 01e5c09d6b6fa0e0a20b467bf80535c830268971 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 May 2023 14:33:56 +0100 Subject: [PATCH 6/6] Add comment --- synapse/storage/databases/main/user_directory.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 3dfba11a9269..a0319575f071 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -361,6 +361,10 @@ def _populate_user_directory_process_users_txn( txn: LoggingTransaction, ) -> Optional[int]: if self.database_engine.supports_returning: + # Note: we use an ORDER BY in the SELECT to force usage of an + # index. Otherwise, Postgres does a sequential scan that is + # surprisingly slow (I think because it has to read and skip + # over lots of already-deleted rows). sql = f""" DELETE FROM {TEMP_TABLE + "_users"} WHERE user_id IN (