
Merge commit '7f837959e' into anoa/dinsic_release_1_21_x
* commit '7f837959e':
  Convert directory, e2e_room_keys, end_to_end_keys, monthly_active_users database to async (#8042)
  Convert additional database stores to async/await (#8045)
anoadragon453 committed Oct 19, 2020
2 parents d84510c + 7f83795 commit c5aaa80
Showing 16 changed files with 248 additions and 272 deletions.
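Both merged PRs apply the same mechanical transformation: generator-based Twisted coroutines written with @defer.inlineCallbacks and yield become native async def coroutines using await. A minimal runnable sketch of the before/after shape (the fetch_value_* names and the lambda arguments are illustrative, not from this commit):

    from twisted.internet import defer


    # Before: a generator-based coroutine; each `yield` suspends on a Deferred.
    @defer.inlineCallbacks
    def fetch_value_old(get_deferred):
        value = yield get_deferred()
        return value


    # After: the decorator disappears and `await` replaces `yield`.
    async def fetch_value_new(get_awaitable):
        value = await get_awaitable()
        return value


    # Both shapes remain Deferred-compatible: an already-fired Deferred
    # resolves synchronously, so this prints 42 twice with no reactor running.
    fetch_value_old(lambda: defer.succeed(42)).addCallback(print)
    defer.ensureDeferred(fetch_value_new(lambda: defer.succeed(42))).addCallback(print)

The diffs below are this pattern applied store by store.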
1 change: 1 addition & 0 deletions changelog.d/8042.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
1 change: 1 addition & 0 deletions changelog.d/8045.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
54 changes: 24 additions & 30 deletions synapse/storage/databases/main/client_ips.py
@@ -14,8 +14,7 @@
 # limitations under the License.

 import logging
-
-from twisted.internet import defer
+from typing import Dict, Optional, Tuple

 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore
@@ -82,21 +81,19 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             "devices_last_seen", self._devices_last_seen_update
         )

-    @defer.inlineCallbacks
-    def _remove_user_ip_nonunique(self, progress, batch_size):
+    async def _remove_user_ip_nonunique(self, progress, batch_size):
         def f(conn):
             txn = conn.cursor()
             txn.execute("DROP INDEX IF EXISTS user_ips_user_ip")
             txn.close()

-        yield self.db_pool.runWithConnection(f)
-        yield self.db_pool.updates._end_background_update(
+        await self.db_pool.runWithConnection(f)
+        await self.db_pool.updates._end_background_update(
             "user_ips_drop_nonunique_index"
         )
         return 1

-    @defer.inlineCallbacks
-    def _analyze_user_ip(self, progress, batch_size):
+    async def _analyze_user_ip(self, progress, batch_size):
         # Background update to analyze user_ips table before we run the
         # deduplication background update. The table may not have been analyzed
         # for ages due to the table locks.
@@ -106,14 +103,13 @@ def _analyze_user_ip(self, progress, batch_size):
         def user_ips_analyze(txn):
             txn.execute("ANALYZE user_ips")

-        yield self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)
+        await self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)

-        yield self.db_pool.updates._end_background_update("user_ips_analyze")
+        await self.db_pool.updates._end_background_update("user_ips_analyze")

         return 1

-    @defer.inlineCallbacks
-    def _remove_user_ip_dupes(self, progress, batch_size):
+    async def _remove_user_ip_dupes(self, progress, batch_size):
         # This function works by scanning the user_ips table in batches
         # based on `last_seen`. For each row in a batch it searches the rest of
         # the table to see if there are any duplicates, if there are then they
@@ -140,7 +136,7 @@ def get_last_seen(txn):
                 return None

         # Get a last seen that has roughly `batch_size` since `begin_last_seen`
-        end_last_seen = yield self.db_pool.runInteraction(
+        end_last_seen = await self.db_pool.runInteraction(
             "user_ips_dups_get_last_seen", get_last_seen
         )

@@ -275,15 +271,14 @@ def remove(txn):
                 txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
             )

-        yield self.db_pool.runInteraction("user_ips_dups_remove", remove)
+        await self.db_pool.runInteraction("user_ips_dups_remove", remove)

         if last:
-            yield self.db_pool.updates._end_background_update("user_ips_remove_dupes")
+            await self.db_pool.updates._end_background_update("user_ips_remove_dupes")

         return batch_size

-    @defer.inlineCallbacks
-    def _devices_last_seen_update(self, progress, batch_size):
+    async def _devices_last_seen_update(self, progress, batch_size):
         """Background update to insert last seen info into devices table
         """

@@ -346,12 +341,12 @@ def _devices_last_seen_update_txn(txn):

             return len(rows)

-        updated = yield self.db_pool.runInteraction(
+        updated = await self.db_pool.runInteraction(
             "_devices_last_seen_update", _devices_last_seen_update_txn
         )

         if not updated:
-            yield self.db_pool.updates._end_background_update("devices_last_seen")
+            await self.db_pool.updates._end_background_update("devices_last_seen")

         return updated

@@ -460,25 +455,25 @@ def _update_client_ips_batch_txn(self, txn, to_update):
                 # Failed to upsert, log and continue
                 logger.error("Failed to insert client IP %r: %r", entry, e)

-    @defer.inlineCallbacks
-    def get_last_client_ip_by_device(self, user_id, device_id):
+    async def get_last_client_ip_by_device(
+        self, user_id: str, device_id: Optional[str]
+    ) -> Dict[Tuple[str, str], dict]:
         """For each device_id listed, give the user_ip it was last seen on

         Args:
-            user_id (str)
-            device_id (str): If None fetches all devices for the user
+            user_id: The user to fetch devices for.
+            device_id: If None fetches all devices for the user

         Returns:
-            defer.Deferred: resolves to a dict, where the keys
-            are (user_id, device_id) tuples. The values are also dicts, with
-            keys giving the column names
+            A dictionary mapping a tuple of (user_id, device_id) to dicts, with
+            keys giving the column names from the devices table.
         """

         keyvalues = {"user_id": user_id}
         if device_id is not None:
             keyvalues["device_id"] = device_id

-        res = yield self.db_pool.simple_select_list(
+        res = await self.db_pool.simple_select_list(
             table="devices",
             keyvalues=keyvalues,
             retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
@@ -500,8 +495,7 @@ def get_last_client_ip_by_device(self, user_id, device_id):
         }
         return ret

-    @defer.inlineCallbacks
-    def get_user_ip_and_agents(self, user):
+    async def get_user_ip_and_agents(self, user):
         user_id = user.to_string()
         results = {}

@@ -511,7 +505,7 @@ def get_user_ip_and_agents(self, user):
                 user_agent, _, last_seen = self._batch_row_update[key]
                 results[(access_token, ip)] = (user_agent, last_seen)

-        rows = yield self.db_pool.simple_select_list(
+        rows = await self.db_pool.simple_select_list(
             table="user_ips",
             keyvalues={"user_id": user_id},
             retcols=["access_token", "ip", "user_agent", "last_seen"],
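The converted handlers above all follow the same background-update contract: run one batch inside runInteraction, call _end_background_update once nothing is left, and return a batch count to the scheduler. A hedged sketch of that shape, not a definitive implementation (MyStore and do_batch_txn are hypothetical names; db_pool.runInteraction and updates._end_background_update mirror the calls visible in the diff):

    class MyStore:
        """Hypothetical store illustrating the background-update shape above."""

        async def _my_background_update(self, progress, batch_size):
            def do_batch_txn(txn):
                # Process up to `batch_size` rows; report how many were touched.
                txn.execute("SELECT 1")
                return 0

            rows = await self.db_pool.runInteraction(
                "my_background_update_batch", do_batch_txn
            )

            if not rows:
                # Nothing left: mark the update done so it is not rerun.
                await self.db_pool.updates._end_background_update(
                    "my_background_update"
                )

            # The returned count feeds the scheduler's batch-size tuning.
            return rows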
12 changes: 7 additions & 5 deletions synapse/storage/databases/main/devices.py
@@ -136,7 +136,9 @@ def get_device_updates_by_remote(self, destination, from_stream_id, limit):
         master_key_by_user = {}
         self_signing_key_by_user = {}
         for user in users:
-            cross_signing_key = yield self.get_e2e_cross_signing_key(user, "master")
+            cross_signing_key = yield defer.ensureDeferred(
+                self.get_e2e_cross_signing_key(user, "master")
+            )
             if cross_signing_key:
                 key_id, verify_key = get_verify_key_from_cross_signing_key(
                     cross_signing_key
@@ -149,8 +151,8 @@ def get_device_updates_by_remote(self, destination, from_stream_id, limit):
                     "device_id": verify_key.version,
                 }

-            cross_signing_key = yield self.get_e2e_cross_signing_key(
-                user, "self_signing"
+            cross_signing_key = yield defer.ensureDeferred(
+                self.get_e2e_cross_signing_key(user, "self_signing")
             )
             if cross_signing_key:
                 key_id, verify_key = get_verify_key_from_cross_signing_key(
@@ -246,7 +248,7 @@ def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_m
             destination (str): The host the device updates are intended for
             from_stream_id (int): The minimum stream_id to filter updates by, exclusive
             query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
-                user_id/device_id to update stream_id and the relevent json-encoded
+                user_id/device_id to update stream_id and the relevant json-encoded
                 opentracing context

         Returns:
@@ -599,7 +601,7 @@ async def get_all_device_list_changes_for_remotes(
             between the requested tokens due to the limit.

             The token returned can be used in a subsequent call to this
-            function to get further updatees.
+            function to get further updates.

             The updates are a list of 2-tuples of stream ID and the row data
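Note the bridge in the first two hunks: get_device_updates_by_remote is still an inlineCallbacks generator, but get_e2e_cross_signing_key is now a native coroutine, so the call site wraps it in defer.ensureDeferred before yielding. A runnable sketch of that mixed-style call, assuming illustrative names (get_key stands in for the now-async method, legacy_caller for its unconverted caller):

    from twisted.internet import defer


    async def get_key(user):
        # Stand-in for the now-async get_e2e_cross_signing_key.
        return {"user": user, "key_id": "ed25519:abc"}


    @defer.inlineCallbacks
    def legacy_caller(user):
        # Inside @defer.inlineCallbacks, `yield` expects a Deferred, not a raw
        # coroutine object; defer.ensureDeferred converts the coroutine into a
        # Deferred, the same bridge used in devices.py above.
        key = yield defer.ensureDeferred(get_key(user))
        return key


    legacy_caller("@alice:example.com").addCallback(print)

This lets stores be converted incrementally: callers that are still generator-based keep working against methods that have already gone async.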
51 changes: 25 additions & 26 deletions synapse/storage/databases/main/directory.py
@@ -14,30 +14,29 @@
 # limitations under the License.

 from collections import namedtuple
-from typing import Optional
-
-from twisted.internet import defer
+from typing import Iterable, Optional

 from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore
 from synapse.types import RoomAlias
 from synapse.util.caches.descriptors import cached

 RoomAliasMapping = namedtuple("RoomAliasMapping", ("room_id", "room_alias", "servers"))


 class DirectoryWorkerStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def get_association_from_room_alias(self, room_alias):
-        """ Get's the room_id and server list for a given room_alias
+    async def get_association_from_room_alias(
+        self, room_alias: RoomAlias
+    ) -> Optional[RoomAliasMapping]:
+        """Gets the room_id and server list for a given room_alias

         Args:
-            room_alias (RoomAlias)
+            room_alias: The alias to translate to an ID.

         Returns:
-            Deferred: results in namedtuple with keys "room_id" and
-            "servers" or None if no association can be found
+            The room alias mapping or None if no association can be found.
         """
-        room_id = yield self.db_pool.simple_select_one_onecol(
+        room_id = await self.db_pool.simple_select_one_onecol(
             "room_aliases",
             {"room_alias": room_alias.to_string()},
             "room_id",
@@ -48,7 +47,7 @@ def get_association_from_room_alias(self, room_alias):
         if not room_id:
             return None

-        servers = yield self.db_pool.simple_select_onecol(
+        servers = await self.db_pool.simple_select_onecol(
             "room_alias_servers",
             {"room_alias": room_alias.to_string()},
             "server",
@@ -79,18 +78,20 @@ def get_aliases_for_room(self, room_id):


 class DirectoryStore(DirectoryWorkerStore):
-    @defer.inlineCallbacks
-    def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
+    async def create_room_alias_association(
+        self,
+        room_alias: RoomAlias,
+        room_id: str,
+        servers: Iterable[str],
+        creator: Optional[str] = None,
+    ) -> None:
         """ Creates an association between a room alias and room_id/servers

         Args:
-            room_alias (RoomAlias)
-            room_id (str)
-            servers (list)
-            creator (str): Optional user_id of creator.
-
-        Returns:
-            Deferred
+            room_alias: The alias to create.
+            room_id: The target of the alias.
+            servers: A list of servers through which it may be possible to join the room
+            creator: Optional user_id of creator.
         """

         def alias_txn(txn):
@@ -118,24 +119,22 @@ def alias_txn(txn):
             )

         try:
-            ret = yield self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 "create_room_alias_association", alias_txn
             )
         except self.database_engine.module.IntegrityError:
             raise SynapseError(
                 409, "Room alias %s already exists" % room_alias.to_string()
             )
-        return ret

-    @defer.inlineCallbacks
-    def delete_room_alias(self, room_alias):
-        room_id = yield self.db_pool.runInteraction(
+    async def delete_room_alias(self, room_alias: RoomAlias) -> str:
+        room_id = await self.db_pool.runInteraction(
             "delete_room_alias", self._delete_room_alias_txn, room_alias
         )

         return room_id

-    def _delete_room_alias_txn(self, txn, room_alias):
+    def _delete_room_alias_txn(self, txn, room_alias: RoomAlias) -> str:
         txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
             (room_alias.to_string(),),
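create_room_alias_association keeps its error-translation behaviour through the conversion: a duplicate-key IntegrityError from the database driver surfaces as a 409 Conflict. A self-contained sketch of that pattern using sqlite3 (the SynapseError stand-in and the table schema here are simplified illustrations, not Synapse's real ones):

    import sqlite3


    class SynapseError(Exception):
        """Simplified stand-in for synapse.api.errors.SynapseError."""

        def __init__(self, code: int, msg: str):
            super().__init__(msg)
            self.code = code


    def create_alias(conn: sqlite3.Connection, room_alias: str, room_id: str) -> None:
        # Mirrors DirectoryStore.create_room_alias_association: the driver's
        # IntegrityError on a duplicate key becomes a 409 for the API caller.
        try:
            with conn:
                conn.execute(
                    "INSERT INTO room_aliases (room_alias, room_id) VALUES (?, ?)",
                    (room_alias, room_id),
                )
        except sqlite3.IntegrityError:
            raise SynapseError(409, "Room alias %s already exists" % room_alias)


    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE room_aliases (room_alias TEXT PRIMARY KEY, room_id TEXT)")
    create_alias(conn, "#room:example.com", "!abc:example.com")
    try:
        create_alias(conn, "#room:example.com", "!other:example.com")
    except SynapseError as e:
        print(e.code, e)  # 409 Room alias #room:example.com already exists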