Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support enabling/disabling pushers (from MSC3881) #13799

Merged
merged 18 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13799.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for enabling or disabling individual pushers (as a partial implementation of [MSC3881](https://github.com/matrix-org/matrix-spec-proposals/pull/3881)).
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:

# MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices.
self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False)

# MSC3881: Remotely toggle push notifications for another client
self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False)
2 changes: 2 additions & 0 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PusherConfig:
last_stream_ordering: int
last_success: Optional[int]
failing_since: Optional[int]
enabled: bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks correct due to the COALESCEs in the relevant queries. But I think you said elsewhere that this field is meant to be nullable when given to clients. (MSC: "A new nullable field enabled is added to the Pusher model.")

Does "nullable" in that sentence actually mean "optional"? In other words, does it mean that

  • the field may not be present,
  • but if it is, it must be a boolean?

Copy link
Contributor Author

@babolivier babolivier Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MSC does say nullable but as you pointed out it's a mistake. It's optional, and its absence should be interpreted as true. The field that's nullable is device_id in GET /pushers responses, which is implemented in #13831.


def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
Expand All @@ -128,6 +129,7 @@ def as_dict(self) -> Dict[str, Any]:
"lang": self.lang,
"profile_tag": self.profile_tag,
"pushkey": self.pushkey,
"enabled": self.enabled,
}


Expand Down
50 changes: 36 additions & 14 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ async def add_pusher(
lang: Optional[str],
data: JsonDict,
profile_tag: str = "",
enabled: bool = True,
) -> Optional[Pusher]:
"""Creates a new pusher and adds it to the pool

Expand Down Expand Up @@ -147,6 +148,7 @@ async def add_pusher(
last_stream_ordering=last_stream_ordering,
last_success=None,
failing_since=None,
enabled=enabled,
)
)

Expand All @@ -163,8 +165,9 @@ async def add_pusher(
data=data,
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
enabled=enabled,
)
pusher = await self.start_pusher_by_id(app_id, pushkey, user_id)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)

return pusher

Expand Down Expand Up @@ -276,10 +279,13 @@ async def on_new_receipts(
except Exception:
logger.exception("Exception in pusher on_new_receipts")

async def start_pusher_by_id(
async def process_pusher_change_by_id(
self, app_id: str, pushkey: str, user_id: str
) -> Optional[Pusher]:
"""Look up the details for the given pusher, and start it
"""Look up the details for the given pusher, and either start it if its
"enabled" flag is True, or try to stop it otherwise.

If the pusher is new and its "enabled" flag is False, the stop is a noop.

Returns:
The pusher started, if any
Expand All @@ -297,6 +303,10 @@ async def start_pusher_by_id(
if r.user_name == user_id:
pusher_config = r

if pusher_config and not pusher_config.enabled:
self.maybe_stop_pusher(app_id, pushkey, user_id)
return None

pusher = None
if pusher_config:
pusher = await self._start_pusher(pusher_config)
Expand All @@ -305,7 +315,7 @@ async def start_pusher_by_id(

async def _start_pushers(self) -> None:
"""Start all the pushers"""
pushers = await self.store.get_all_pushers()
pushers = await self.store.get_enabled_pushers()

# Stagger starting up the pushers so we don't completely drown the
# process on start up.
Expand Down Expand Up @@ -363,6 +373,8 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).inc()

logger.info("Starting pusher %s / %s", pusher.user_id, appid_pushkey)

# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
# push.
Expand All @@ -382,16 +394,7 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
return pusher

async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
appid_pushkey = "%s:%s" % (app_id, pushkey)

byuser = self.pushers.get(user_id, {})

if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
self.maybe_stop_pusher(app_id, pushkey, user_id)

# We can only delete pushers on master.
if self._remove_pusher_client:
Expand All @@ -402,3 +405,22 @@ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
await self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)

def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
"""Stops a pusher with the given app ID and push key if one is running.

Args:
app_id: the pusher's app ID.
pushkey: the pusher's push key.
user_id: the user the pusher belongs to. Only used for logging.
"""
appid_pushkey = "%s:%s" % (app_id, pushkey)

byuser = self.pushers.get(user_id, {})

if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
10 changes: 7 additions & 3 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ async def on_rdata(
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
await self.process_pusher_change(
row.user_id, row.app_id, row.pushkey
)
elif stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
Expand Down Expand Up @@ -334,13 +336,15 @@ def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
logger.info("Stopping pusher %r / %r", user_id, key)
pusher.on_stop()

async def start_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
async def process_pusher_change(
self, user_id: str, app_id: str, pushkey: str
) -> None:
if not self._notify_pushers:
return

key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
await self._pusher_pool.process_pusher_change_by_id(app_id, pushkey, user_id)


class FederationSenderHandler:
Expand Down
12 changes: 12 additions & 0 deletions synapse/rest/client/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
Expand All @@ -53,6 +54,11 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:

filtered_pushers = [p.as_dict() for p in pushers]

for pusher in filtered_pushers:
if self._msc3881_enabled:
pusher["org.matrix.msc3881.enabled"] = bool(pusher["enabled"])
del pusher["enabled"]

babolivier marked this conversation as resolved.
Show resolved Hide resolved
return 200, {"pushers": filtered_pushers}


Expand All @@ -65,6 +71,7 @@ def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.notifier = hs.get_notifier()
self.pusher_pool = self.hs.get_pusherpool()
self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled

async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
Expand Down Expand Up @@ -103,6 +110,10 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if "append" in content:
append = content["append"]

enabled = True
if self._msc3881_enabled and "org.matrix.msc3881.enabled" in content:
enabled = content["org.matrix.msc3881.enabled"]

if not append:
await self.pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
app_id=content["app_id"],
Expand All @@ -122,6 +133,7 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
lang=content["lang"],
data=content["data"],
profile_tag=content.get("profile_tag", ""),
enabled=enabled,
)
except PusherConfigException as pce:
raise SynapseError(
Expand Down
13 changes: 9 additions & 4 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,22 @@ async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConf
"last_stream_ordering",
"last_success",
"failing_since",
"enabled",
],
desc="get_pushers_by",
)
return self._decode_pushers_rows(ret)

async def get_all_pushers(self) -> Iterator[PusherConfig]:
def get_pushers(txn: LoggingTransaction) -> Iterator[PusherConfig]:
txn.execute("SELECT * FROM pushers")
async def get_enabled_pushers(self) -> Iterator[PusherConfig]:
def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]:
txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)")
rows = self.db_pool.cursor_to_dict(txn)

return self._decode_pushers_rows(rows)

return await self.db_pool.runInteraction("get_all_pushers", get_pushers)
return await self.db_pool.runInteraction(
"get_enabled_pushers", get_enabled_pushers_txn
)

async def get_all_updated_pushers_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
Expand Down Expand Up @@ -476,6 +479,7 @@ async def add_pusher(
data: Optional[JsonDict],
last_stream_ordering: int,
profile_tag: str = "",
enabled: bool = True,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
Expand All @@ -494,6 +498,7 @@ async def add_pusher(
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
"enabled": enabled,
},
desc="add_pusher",
lock=False,
Expand Down
16 changes: 16 additions & 0 deletions synapse/storage/schema/main/delta/73/02add_pusher_enabled.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE pushers ADD COLUMN enabled BOOLEAN;
babolivier marked this conversation as resolved.
Show resolved Hide resolved
81 changes: 77 additions & 4 deletions tests/push/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import synapse.rest.admin
from synapse.logging.context import make_deferred_yieldable
from synapse.push import PusherConfigException
from synapse.rest.client import login, push_rule, receipts, room
from synapse.rest.client import login, push_rule, pusher, receipts, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
Expand All @@ -35,6 +35,7 @@ class HTTPPusherTests(HomeserverTestCase):
login.register_servlets,
receipts.register_servlets,
push_rule.register_servlets,
pusher.register_servlets,
]
user_id = True
hijack_auth = False
Expand Down Expand Up @@ -728,11 +729,31 @@ def _send_read_request(
)
self.assertEqual(channel.code, 200, channel.json_body)

def _make_user_with_pusher(self, username: str) -> Tuple[str, str]:
def _make_user_with_pusher(
self, username: str, enabled: bool = True
) -> Tuple[str, str]:
"""Registers a user and creates a pusher for them.

Args:
username: the localpart of the new user's Matrix ID.
enabled: whether to create the pusher in an enabled or disabled state.
"""
user_id = self.register_user(username, "pass")
access_token = self.login(username, "pass")

# Register the pusher
self._set_pusher(user_id, access_token, enabled)

return user_id, access_token

def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None:
"""Creates or updates the pusher for the given user.

Args:
user_id: the user's Matrix ID.
access_token: the access token associated with the pusher.
enabled: whether to enable or disable the pusher.
"""
user_tuple = self.get_success(
self.hs.get_datastores().main.get_user_by_access_token(access_token)
)
Expand All @@ -749,11 +770,10 @@ def _make_user_with_pusher(self, username: str) -> Tuple[str, str]:
pushkey="a@example.com",
lang=None,
data={"url": "http://example.com/_matrix/push/v1/notify"},
enabled=enabled,
)
)

return user_id, access_token

def test_dont_notify_rule_overrides_message(self) -> None:
"""
The override push rule will suppress notification
Expand Down Expand Up @@ -791,3 +811,56 @@ def test_dont_notify_rule_overrides_message(self) -> None:
# The user sends a message back (sends a notification)
self.helper.send(room, body="Hello", tok=access_token)
self.assertEqual(len(self.push_attempts), 1)

@override_config({"experimental_features": {"msc3881_enabled": True}})
def test_disable(self) -> None:
"""Tests that disabling a pusher means it's not pushed to anymore."""
user_id, access_token = self._make_user_with_pusher("user")
other_user_id, other_access_token = self._make_user_with_pusher("otheruser")

room = self.helper.create_room_as(user_id, tok=access_token)
self.helper.join(room=room, user=other_user_id, tok=other_access_token)

# Send a message and check that it generated a push.
self.helper.send(room, body="Hi!", tok=other_access_token)
self.assertEqual(len(self.push_attempts), 1)

# Disable the pusher.
self._set_pusher(user_id, access_token, enabled=False)

# Send another message and check that it did not generate a push.
self.helper.send(room, body="Hi!", tok=other_access_token)
self.assertEqual(len(self.push_attempts), 1)

# Get the pushers for the user and check that it is marked as disabled.
channel = self.make_request("GET", "/pushers", access_token=access_token)
self.assertEqual(channel.code, 200)
self.assertEqual(len(channel.json_body["pushers"]), 1)
self.assertFalse(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"])

@override_config({"experimental_features": {"msc3881_enabled": True}})
def test_enable(self) -> None:
"""Tests that enabling a disabled pusher means it gets pushed to."""
# Create the user with the pusher already disabled.
user_id, access_token = self._make_user_with_pusher("user", enabled=False)
other_user_id, other_access_token = self._make_user_with_pusher("otheruser")

room = self.helper.create_room_as(user_id, tok=access_token)
self.helper.join(room=room, user=other_user_id, tok=other_access_token)

# Send a message and check that it did not generate a push.
self.helper.send(room, body="Hi!", tok=other_access_token)
self.assertEqual(len(self.push_attempts), 0)

# Enable the pusher.
self._set_pusher(user_id, access_token, enabled=True)

# Send another message and check that it did generate a push.
self.helper.send(room, body="Hi!", tok=other_access_token)
self.assertEqual(len(self.push_attempts), 1)

# Get the pushers for the user and check that it is marked as enabled.
channel = self.make_request("GET", "/pushers", access_token=access_token)
self.assertEqual(channel.code, 200)
self.assertEqual(len(channel.json_body["pushers"]), 1)
self.assertTrue(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"])