Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move update_client_ip background job from the main process to the background worker. #12251

Merged
merged 13 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12251.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Offload the `update_client_ip` background job from the main process to the background worker, when using Redis-based replication.
2 changes: 0 additions & 2 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
Expand Down Expand Up @@ -61,7 +60,6 @@ class AdminCmdSlavedStore(
SlavedDeviceStore,
SlavedPushRuleStore,
SlavedEventStore,
SlavedClientIpStore,
BaseSlavedStore,
RoomWorkerStore,
):
Expand Down
2 changes: 0 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
Expand Down Expand Up @@ -247,7 +246,6 @@ class GenericWorkerSlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
Expand Down
59 changes: 0 additions & 59 deletions synapse/replication/slave/storage/client_ips.py

This file was deleted.

8 changes: 7 additions & 1 deletion synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def __init__(
access_token: str,
ip: str,
user_agent: str,
device_id: str,
device_id: Optional[str],
last_seen: int,
):
self.user_id = user_id
Expand Down Expand Up @@ -389,6 +389,12 @@ def to_line(self) -> str:
)
)

def __repr__(self) -> str:
return (
f"UserIpCommand({self.user_id!r}, .., {self.ip!r}, "
f"{self.user_agent!r}, {self.device_id!r}, {self.last_seen})"
)


class RemoteServerUpCommand(_SimpleCommand):
"""Sent when a worker has detected that a remote server is no longer
Expand Down
48 changes: 35 additions & 13 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ def __init__(self, hs: "HomeServer"):
if self._is_master:
self._server_notices_sender = hs.get_server_notices_sender()

if hs.config.redis.redis_enabled:
# If we're using Redis, it's the background worker that should
# receive USER_IP commands and store the relevant client IPs.
self._should_insert_client_ips = hs.config.worker.run_background_tasks
else:
# If we're NOT using Redis, this must be handled by the master
self._should_insert_client_ips = hs.get_instance_name() == "master"

def _add_command_to_stream_queue(
self, conn: IReplicationConnection, cmd: Union[RdataCommand, PositionCommand]
) -> None:
Expand Down Expand Up @@ -401,23 +409,37 @@ def on_USER_IP(
) -> Optional[Awaitable[None]]:
user_ip_cache_counter.inc()

if self._is_master:
if self._is_master or self._should_insert_client_ips:
# We make a point of only returning an awaitable if there's actually
# something to do; on_USER_IP is not an async function, but
# _handle_user_ip is.
# If on_USER_IP returns an awaitable, it gets scheduled as a
# background process (see `BaseReplicationStreamProtocol.handle_command`).
return self._handle_user_ip(cmd)
else:
# Returning None when this process definitely has nothing to do
# reduces the overhead of handling the USER_IP command, which is
# currently broadcast to all workers regardless of utility.
return None

async def _handle_user_ip(self, cmd: UserIpCommand) -> None:
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
cmd.user_agent,
cmd.device_id,
cmd.last_seen,
)

assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)
"""
Handles a User IP, branching depending on whether we are the main process
and/or the background worker.
"""
if self._is_master:
assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)

if self._should_insert_client_ips:
await self._store.insert_client_ip(
cmd.user_id,
cmd.access_token,
cmd.ip,
cmd.user_agent,
cmd.device_id,
cmd.last_seen,
)

def on_RDATA(self, conn: IReplicationConnection, cmd: RdataCommand) -> None:
if cmd.instance_name == self._instance_name:
Expand Down Expand Up @@ -698,7 +720,7 @@ def send_user_ip(
access_token: str,
ip: str,
user_agent: str,
device_id: str,
device_id: Optional[str],
last_seen: int,
) -> None:
"""Tell the master that the user made a request."""
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
from .cache import CacheInvalidationWorkerStore
from .censor_events import CensorEventsStore
from .client_ips import ClientIpStore
from .client_ips import ClientIpWorkerStore
from .deviceinbox import DeviceInboxStore
from .devices import DeviceStore
from .directory import DirectoryStore
Expand All @@ -49,7 +49,7 @@
from .lock import LockStore
from .media_repository import MediaRepositoryStore
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
from .monthly_active_users import MonthlyActiveUsersWorkerStore
from .openid import OpenIdStore
from .presence import PresenceStore
from .profile import ProfileStore
Expand Down Expand Up @@ -112,13 +112,13 @@ class DataStore(
AccountDataStore,
EventPushActionsStore,
OpenIdStore,
ClientIpStore,
ClientIpWorkerStore,
DeviceStore,
DeviceInboxStore,
UserDirectoryStore,
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
MonthlyActiveUsersWorkerStore,
StatsStore,
RelationsStore,
CensorEventsStore,
Expand Down
Loading