Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use a database table to hold the users that should have full presence…
Browse files Browse the repository at this point in the history
… sent to them, instead of something in-memory (#9823)
  • Loading branch information
anoadragon453 authored May 18, 2021
1 parent 206a7b5 commit 4d6e5a5
Show file tree
Hide file tree
Showing 11 changed files with 479 additions and 158 deletions.
1 change: 1 addition & 0 deletions changelog.d/9823.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`.
6 changes: 5 additions & 1 deletion docs/presence_router_module.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None
which can be given a list of local or remote MXIDs to broadcast known, online user
presence to (for those users that the receiving user is considered interested in).
It does not include state for users who are currently offline, and it can only be
called on workers that support sending federation.
called on workers that support sending federation. Additionally, this method must
only be called from the process that has been configured to write to the
the [presence stream](https://github.com/matrix-org/synapse/blob/master/docs/workers.md#stream-writers).
By default, this is the main process, but another worker can be configured to do
so.

### Module structure

Expand Down
136 changes: 110 additions & 26 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,21 @@ async def current_state_for_users(

@abc.abstractmethod
async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
) -> None:
"""Set the presence state of the user. """
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
"""

@abc.abstractmethod
async def bump_presence_active_time(self, user: UserID):
Expand Down Expand Up @@ -296,6 +308,51 @@ async def maybe_send_presence_to_interested_destinations(
for destinations, states in hosts_and_states:
self._federation.send_presence_to_destinations(states, destinations)

async def send_full_presence_to_users(self, user_ids: Collection[str]):
"""
Adds to the list of users who should receive a full snapshot of presence
upon their next sync. Note that this only works for local users.
Then, grabs the current presence state for a given set of users and adds it
to the top of the presence stream.
Args:
user_ids: The IDs of the local users to send full presence to.
"""
# Retrieve one of the users from the given set
if not user_ids:
raise Exception(
"send_full_presence_to_users must be called with at least one user"
)
user_id = next(iter(user_ids))

# Mark all users as receiving full presence on their next sync
await self.store.add_users_to_send_full_presence_to(user_ids)

# Add a new entry to the presence stream. Since we use stream tokens to determine whether a
# local user should receive a full snapshot of presence when they sync, we need to bump the
# presence stream so that subsequent syncs with no presence activity in between won't result
# in the client receiving multiple full snapshots of presence.
#
# If we bump the stream ID, then the user will get a higher stream token next sync, and thus
# correctly won't receive a second snapshot.

# Get the current presence state for one of the users (defaults to offline if not found)
current_presence_state = await self.get_state(UserID.from_string(user_id))

# Convert the UserPresenceState object into a serializable dict
state = {
"presence": current_presence_state.state,
"status_message": current_presence_state.status_msg,
}

# Copy the presence state to the tip of the presence stream.

# We set force_notify=True here so that this presence update is guaranteed to
# increment the presence stream ID (which resending the current user's presence
# otherwise would not do).
await self.set_state(UserID.from_string(user_id), state, force_notify=True)


class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
Expand Down Expand Up @@ -480,8 +537,17 @@ async def set_state(
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
) -> None:
"""Set the presence state of the user."""
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
"""
presence = state["presence"]

valid_presence = (
Expand All @@ -508,6 +574,7 @@ async def set_state(
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
)

async def bump_presence_active_time(self, user: UserID) -> None:
Expand Down Expand Up @@ -677,13 +744,19 @@ async def _persist_unpersisted_changes(self) -> None:
[self.user_to_current_state[user_id] for user_id in unpersisted]
)

async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
async def _update_states(
self, new_states: Iterable[UserPresenceState], force_notify: bool = False
) -> None:
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
should be sent to clients/servers.
Args:
new_states: The new user presence state updates to process.
force_notify: Whether to force notifying clients of this presence state update,
even if it doesn't change the state of a user's presence (e.g online -> online).
This is currently used to bump the max presence stream ID without changing any
user's presence (see PresenceHandler.add_users_to_send_full_presence_to).
"""
now = self.clock.time_msec()

Expand Down Expand Up @@ -720,6 +793,9 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
now=now,
)

if force_notify:
should_notify = True

self.user_to_current_state[user_id] = new_state

if should_notify:
Expand Down Expand Up @@ -1058,9 +1134,21 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None:
await self._update_states(updates)

async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
) -> None:
"""Set the presence state of the user."""
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
"""
status_msg = state.get("status_msg", None)
presence = state["presence"]

Expand Down Expand Up @@ -1091,7 +1179,9 @@ async def set_state(
):
new_fields["last_active_ts"] = self.clock.time_msec()

await self._update_states([prev_state.copy_and_replace(**new_fields)])
await self._update_states(
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
)

async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
"""Returns whether a user can see another user's presence."""
Expand Down Expand Up @@ -1389,11 +1479,10 @@ def __init__(self, hs: "HomeServer"):
#
# Presence -> Notifier -> PresenceEventSource -> Presence
#
# Same with get_module_api, get_presence_router
# Same with get_presence_router:
#
# AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
self.get_presence_handler = hs.get_presence_handler
self.get_module_api = hs.get_module_api
self.get_presence_router = hs.get_presence_router
self.clock = hs.get_clock()
self.store = hs.get_datastore()
Expand Down Expand Up @@ -1424,16 +1513,21 @@ async def get_new_events(
stream_change_cache = self.store.presence_stream_cache

with Measure(self.clock, "presence.get_new_events"):
if user_id in self.get_module_api()._send_full_presence_to_local_users:
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False

if from_key is not None:
from_key = int(from_key)

# Check if this user should receive all current, online user presence. We only
# bother to do this if from_key is set, as otherwise the user will receive all
# user presence anyways.
if await self.store.should_user_receive_full_presence_with_token(
user_id, from_key
):
# This user has been specified by a module to receive all current, online
# user presence. Removing from_key and setting include_offline to false
# will do effectively this.
from_key = None
include_offline = False

max_token = self.store.get_current_presence_token()
if from_key == max_token:
# This is necessary as due to the way stream ID generators work
Expand Down Expand Up @@ -1467,12 +1561,6 @@ async def get_new_events(
user_id, include_offline, from_key
)

# Remove the user from the list of users to receive all presence
if user_id in self.get_module_api()._send_full_presence_to_local_users:
self.get_module_api()._send_full_presence_to_local_users.remove(
user_id
)

return presence_updates, max_token

# Make mypy happy. users_interested_in should now be a set
Expand Down Expand Up @@ -1522,10 +1610,6 @@ async def get_new_events(
)
presence_updates = list(users_to_state.values())

# Remove the user from the list of users to receive all presence
if user_id in self.get_module_api()._send_full_presence_to_local_users:
self.get_module_api()._send_full_presence_to_local_users.remove(user_id)

if not include_offline:
# Filter out offline presence states
presence_updates = self._filter_offline_presence_state(presence_updates)
Expand Down
63 changes: 30 additions & 33 deletions synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ def __init__(self, hs, auth_handler):
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
self._public_room_list_manager = PublicRoomListManager(hs)

# The next time these users sync, they will receive the current presence
# state of all local users. Users are added by send_local_online_presence_to,
# and removed after a successful sync.
#
# We make this a private variable to deter modules from accessing it directly,
# though other classes in Synapse will still do so.
self._send_full_presence_to_local_users = set()

@property
def http_client(self):
"""Allows making outbound HTTP requests to remote resources.
Expand Down Expand Up @@ -405,39 +397,44 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None:
Updates to remote users will be sent immediately, whereas local users will receive
them on their next sync attempt.
Note that this method can only be run on the main or federation_sender worker
processes.
Note that this method can only be run on the process that is configured to write to the
presence stream. By default this is the main process.
"""
if not self._hs.should_send_federation():
if self._hs._instance_name not in self._hs.config.worker.writers.presence:
raise Exception(
"send_local_online_presence_to can only be run "
"on processes that send federation",
"on the process that is configured to write to the "
"presence stream (by default this is the main process)",
)

local_users = set()
remote_users = set()
for user in users:
if self._hs.is_mine_id(user):
# Modify SyncHandler._generate_sync_entry_for_presence to call
# presence_source.get_new_events with an empty `from_key` if
# that user's ID were in a list modified by ModuleApi somewhere.
# That user would then get all presence state on next incremental sync.

# Force a presence initial_sync for this user next time
self._send_full_presence_to_local_users.add(user)
local_users.add(user)
else:
# Retrieve presence state for currently online users that this user
# is considered interested in
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)

# Send to remote destinations.

# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()
await presence_handler.maybe_send_presence_to_interested_destinations(
presence_events
)
remote_users.add(user)

# We pull out the presence handler here to break a cyclic
# dependency between the presence router and module API.
presence_handler = self._hs.get_presence_handler()

if local_users:
# Force a presence initial_sync for these users next time they sync.
await presence_handler.send_full_presence_to_users(local_users)

for user in remote_users:
# Retrieve presence state for currently online users that this user
# is considered interested in.
presence_events, _ = await self._presence_stream.get_new_events(
UserID.from_string(user), from_key=None, include_offline=False
)

# Send to remote destinations.
destination = UserID.from_string(user).domain
presence_handler.get_federation_queue().send_presence_to_destinations(
presence_events, destination
)


class PublicRoomListManager:
Expand Down
11 changes: 9 additions & 2 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
{
"state": { ... },
"ignore_status_msg": false,
"force_notify": false
}
200 OK
Expand All @@ -91,17 +92,23 @@ def __init__(self, hs: "HomeServer"):
self._presence_handler = hs.get_presence_handler()

@staticmethod
async def _serialize_payload(user_id, state, ignore_status_msg=False):
async def _serialize_payload(
user_id, state, ignore_status_msg=False, force_notify=False
):
return {
"state": state,
"ignore_status_msg": ignore_status_msg,
"force_notify": force_notify,
}

async def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request)

await self._presence_handler.set_state(
UserID.from_string(user_id), content["state"], content["ignore_status_msg"]
UserID.from_string(user_id),
content["state"],
content["ignore_status_msg"],
content["force_notify"],
)

return (
Expand Down
8 changes: 5 additions & 3 deletions synapse/rest/admin/server_notice_servlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.txns = HttpTransactionCache(hs)
self.snm = hs.get_server_notices_manager()

def register(self, json_resource: HttpServer):
PATTERN = "/send_server_notice"
Expand All @@ -77,15 +76,18 @@ async def on_POST(
event_type = body.get("type", EventTypes.Message)
state_key = body.get("state_key")

if not self.snm.is_enabled():
# We grab the server notices manager here as its initialisation has a check for worker processes,
# but worker processes still need to initialise SendServerNoticeServlet (as it is part of the
# admin api).
if not self.hs.get_server_notices_manager().is_enabled():
raise SynapseError(400, "Server notices are not enabled on this server")

user_id = body["user_id"]
UserID.from_string(user_id)
if not self.hs.is_mine_id(user_id):
raise SynapseError(400, "Server notices can only be sent to local users")

event = await self.snm.send_notice(
event = await self.hs.get_server_notices_manager().send_notice(
user_id=body["user_id"],
type=event_type,
state_key=state_key,
Expand Down
Loading

0 comments on commit 4d6e5a5

Please sign in to comment.