Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support enabling/disabling pushers (from MSC3881) #13799

Merged
merged 18 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13799.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for [MSC3881: Remotely toggle push notifications for another client](https://github.com/matrix-org/matrix-spec-proposals/pull/3881).
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
"e2e_fallback_keys_json": ["used"],
"access_tokens": ["used"],
"device_lists_changes_in_room": ["converted_to_destinations"],
"pushers": ["enabled"],
}


Expand Down
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:

# MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices.
self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False)

# MSC3881: Remotely toggle push notifications for another client
self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False)
4 changes: 2 additions & 2 deletions synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,15 +997,15 @@ async def _register_email_threepid(
assert user_tuple
token_id = user_tuple.token_id

await self.pusher_pool.add_pusher(
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=token_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
device_display_name=threepid["address"],
pushkey=threepid["address"],
lang=None, # We don't know a user's language here
lang=None,
data={},
)

Expand Down
2 changes: 2 additions & 0 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PusherConfig:
last_stream_ordering: int
last_success: Optional[int]
failing_since: Optional[int]
enabled: bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks correct due to the COALESCEs in the relevant queries. But I think you said elsewhere that this field is meant to be nullable when given to clients. (MSC: "A new nullable field enabled is added to the Pusher model.")

Does "nullable" in that sentence actually mean "optional"? In other words, does it mean that

  • the field may not be present,
  • but if it is, it must be a boolean?

Copy link
Contributor Author

@babolivier babolivier Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MSC does say nullable but as you pointed out it's a mistake. It's optional, and its absence should be interpreted as true. The field that's nullable is device_id in GET /pushers responses, which is implemented in #13831.


def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
Expand All @@ -128,6 +129,7 @@ def as_dict(self) -> Dict[str, Any]:
"lang": self.lang,
"profile_tag": self.profile_tag,
"pushkey": self.pushkey,
"enabled": self.enabled,
}


Expand Down
81 changes: 61 additions & 20 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def start(self) -> None:
return
run_as_background_process("start_pushers", self._start_pushers)

async def add_pusher(
async def add_or_update_pusher(
self,
user_id: str,
access_token: Optional[int],
Expand All @@ -106,6 +106,7 @@ async def add_pusher(
lang: Optional[str],
data: JsonDict,
profile_tag: str = "",
enabled: bool = True,
) -> Optional[Pusher]:
"""Creates a new pusher and adds it to the pool
Expand Down Expand Up @@ -147,9 +148,20 @@ async def add_pusher(
last_stream_ordering=last_stream_ordering,
last_success=None,
failing_since=None,
enabled=enabled,
)
)

# Before we actually persist the pusher, we check if the user already has one
# for this app ID and pushkey. If so, we want to keep the access token in place,
# since this could be one device modifying (e.g. enabling/disabling) another
# device's pusher.
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
access_token = existing_config.access_token

await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
Expand All @@ -163,8 +175,9 @@ async def add_pusher(
data=data,
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
enabled=enabled,
)
pusher = await self.start_pusher_by_id(app_id, pushkey, user_id)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)

return pusher

Expand Down Expand Up @@ -276,10 +289,25 @@ async def on_new_receipts(
except Exception:
logger.exception("Exception in pusher on_new_receipts")

async def start_pusher_by_id(
async def _get_pusher_config_for_user_by_app_id_and_pushkey(
self, user_id: str, app_id: str, pushkey: str
) -> Optional[PusherConfig]:
resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)

pusher_config = None
for r in resultlist:
if r.user_name == user_id:
pusher_config = r

return pusher_config

async def process_pusher_change_by_id(
self, app_id: str, pushkey: str, user_id: str
) -> Optional[Pusher]:
"""Look up the details for the given pusher, and start it
"""Look up the details for the given pusher, and either start it if its
"enabled" flag is True, or try to stop it otherwise.
If the pusher is new and its "enabled" flag is False, the stop is a noop.
Returns:
The pusher started, if any
Expand All @@ -290,12 +318,13 @@ async def start_pusher_by_id(
if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
return None

resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
pusher_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)

pusher_config = None
for r in resultlist:
if r.user_name == user_id:
pusher_config = r
if pusher_config and not pusher_config.enabled:
self.maybe_stop_pusher(app_id, pushkey, user_id)
return None

pusher = None
if pusher_config:
Expand All @@ -305,7 +334,7 @@ async def start_pusher_by_id(

async def _start_pushers(self) -> None:
"""Start all the pushers"""
pushers = await self.store.get_all_pushers()
pushers = await self.store.get_enabled_pushers()

# Stagger starting up the pushers so we don't completely drown the
# process on start up.
Expand Down Expand Up @@ -363,6 +392,8 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).inc()

logger.info("Starting pusher %s / %s", pusher.user_id, appid_pushkey)

# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
# push.
Expand All @@ -382,16 +413,7 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
return pusher

async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
appid_pushkey = "%s:%s" % (app_id, pushkey)

byuser = self.pushers.get(user_id, {})

if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
self.maybe_stop_pusher(app_id, pushkey, user_id)

# We can only delete pushers on master.
if self._remove_pusher_client:
Expand All @@ -402,3 +424,22 @@ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
await self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)

def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
"""Stops a pusher with the given app ID and push key if one is running.
Args:
app_id: the pusher's app ID.
pushkey: the pusher's push key.
user_id: the user the pusher belongs to. Only used for logging.
"""
appid_pushkey = "%s:%s" % (app_id, pushkey)

byuser = self.pushers.get(user_id, {})

if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
10 changes: 7 additions & 3 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ async def on_rdata(
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
await self.process_pusher_change(
row.user_id, row.app_id, row.pushkey
)
elif stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
Expand Down Expand Up @@ -334,13 +336,15 @@ def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
logger.info("Stopping pusher %r / %r", user_id, key)
pusher.on_stop()

async def start_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
async def process_pusher_change(
self, user_id: str, app_id: str, pushkey: str
) -> None:
if not self._notify_pushers:
return

key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
await self._pusher_pool.process_pusher_change_by_id(app_id, pushkey, user_id)


class FederationSenderHandler:
Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,15 +375,15 @@ async def on_PUT(
and self.hs.config.email.email_notif_for_new_users
and medium == "email"
):
await self.pusher_pool.add_pusher(
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=None,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
device_display_name=address,
pushkey=address,
lang=None, # We don't know a user's language here
lang=None,
data={},
)

Expand Down
18 changes: 15 additions & 3 deletions synapse/rest/client/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
Expand All @@ -51,9 +52,14 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
user.to_string()
)

filtered_pushers = [p.as_dict() for p in pushers]
pusher_dicts = [p.as_dict() for p in pushers]

return 200, {"pushers": filtered_pushers}
for pusher in pusher_dicts:
if self._msc3881_enabled:
pusher["org.matrix.msc3881.enabled"] = pusher["enabled"]
del pusher["enabled"]

return 200, {"pushers": pusher_dicts}


class PushersSetRestServlet(RestServlet):
Expand All @@ -65,6 +71,7 @@ def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.notifier = hs.get_notifier()
self.pusher_pool = self.hs.get_pusherpool()
self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled

async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
Expand Down Expand Up @@ -103,6 +110,10 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if "append" in content:
append = content["append"]

enabled = True
if self._msc3881_enabled and "org.matrix.msc3881.enabled" in content:
enabled = content["org.matrix.msc3881.enabled"]

if not append:
await self.pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
app_id=content["app_id"],
Expand All @@ -111,7 +122,7 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
)

try:
await self.pusher_pool.add_pusher(
await self.pusher_pool.add_or_update_pusher(
user_id=user.to_string(),
access_token=requester.access_token_id,
kind=content["kind"],
Expand All @@ -122,6 +133,7 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
lang=content["lang"],
data=content["data"],
profile_tag=content.get("profile_tag", ""),
enabled=enabled,
)
except PusherConfigException as pce:
raise SynapseError(
Expand Down
Loading