Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support enabling/disabling pushers (from MSC3881) #13799

Merged
merged 18 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13799.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for enabling or disabling individual pushers (as a partial implementation of [MSC3881](https://github.com/matrix-org/matrix-spec-proposals/pull/3881)).
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
"e2e_fallback_keys_json": ["used"],
"access_tokens": ["used"],
"device_lists_changes_in_room": ["converted_to_destinations"],
"pushers": ["enabled"],
}


Expand Down
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,6 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:

# MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices.
self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False)

# MSC3881: Remotely toggle push notifications for another client
self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False)
2 changes: 2 additions & 0 deletions synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class PusherConfig:
last_stream_ordering: int
last_success: Optional[int]
failing_since: Optional[int]
enabled: bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks correct due to the COALESCEs in the relevant queries. But I think you said elsewhere that this field is meant to be nullable when given to clients. (MSC: "A new nullable field enabled is added to the Pusher model.")

Does "nullable" in that sentence actually mean "optional"? In other words, does it mean that

  • the field may not be present,
  • but if it is, it must be a boolean?

Copy link
Contributor Author

@babolivier babolivier Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MSC does say nullable but as you pointed out it's a mistake. It's optional, and its absence should be interpreted as true. The field that's nullable is device_id in GET /pushers responses, which is implemented in #13831.


def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
Expand All @@ -128,6 +129,7 @@ def as_dict(self) -> Dict[str, Any]:
"lang": self.lang,
"profile_tag": self.profile_tag,
"pushkey": self.pushkey,
"enabled": self.enabled,
}


Expand Down
79 changes: 60 additions & 19 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ async def add_pusher(
lang: Optional[str],
data: JsonDict,
profile_tag: str = "",
enabled: bool = True,
) -> Optional[Pusher]:
"""Creates a new pusher and adds it to the pool

Expand Down Expand Up @@ -147,9 +148,20 @@ async def add_pusher(
last_stream_ordering=last_stream_ordering,
last_success=None,
failing_since=None,
enabled=enabled,
)
)

# Before we actually create the pusher, we check if the user already has one for
babolivier marked this conversation as resolved.
Show resolved Hide resolved
# this app ID and pushkey. If so, we want to keep the access token in place,
# since this could be one device modifying (e.g. enabling/disabling) another
# device's pusher.
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
access_token = existing_config.access_token

DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
Expand All @@ -163,8 +175,9 @@ async def add_pusher(
data=data,
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
enabled=enabled,
)
pusher = await self.start_pusher_by_id(app_id, pushkey, user_id)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)

return pusher

Expand Down Expand Up @@ -276,10 +289,25 @@ async def on_new_receipts(
except Exception:
logger.exception("Exception in pusher on_new_receipts")

async def start_pusher_by_id(
async def _get_pusher_config_for_user_by_app_id_and_pushkey(
self, user_id: str, app_id: str, pushkey: str
) -> Optional[PusherConfig]:
resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)

pusher_config = None
for r in resultlist:
if r.user_name == user_id:
pusher_config = r

return pusher_config

async def process_pusher_change_by_id(
self, app_id: str, pushkey: str, user_id: str
) -> Optional[Pusher]:
"""Look up the details for the given pusher, and start it
"""Look up the details for the given pusher, and either start it if its
"enabled" flag is True, or try to stop it otherwise.

If the pusher is new and its "enabled" flag is False, the stop is a noop.

Returns:
The pusher started, if any
Expand All @@ -290,12 +318,13 @@ async def start_pusher_by_id(
if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
return None

resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
pusher_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)

pusher_config = None
for r in resultlist:
if r.user_name == user_id:
pusher_config = r
if pusher_config and not pusher_config.enabled:
self.maybe_stop_pusher(app_id, pushkey, user_id)
return None

pusher = None
if pusher_config:
Expand All @@ -305,7 +334,7 @@ async def start_pusher_by_id(

async def _start_pushers(self) -> None:
"""Start all the pushers"""
pushers = await self.store.get_all_pushers()
pushers = await self.store.get_enabled_pushers()

# Stagger starting up the pushers so we don't completely drown the
# process on start up.
Expand Down Expand Up @@ -363,6 +392,8 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).inc()

logger.info("Starting pusher %s / %s", pusher.user_id, appid_pushkey)

# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
# push.
Expand All @@ -382,16 +413,7 @@ async def _start_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]:
return pusher

async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
appid_pushkey = "%s:%s" % (app_id, pushkey)

byuser = self.pushers.get(user_id, {})

if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
self.maybe_stop_pusher(app_id, pushkey, user_id)

# We can only delete pushers on master.
if self._remove_pusher_client:
Expand All @@ -402,3 +424,22 @@ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
await self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)

def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
"""Stops a pusher with the given app ID and push key if one is running.

Args:
app_id: the pusher's app ID.
pushkey: the pusher's push key.
user_id: the user the pusher belongs to. Only used for logging.
"""
appid_pushkey = "%s:%s" % (app_id, pushkey)

byuser = self.pushers.get(user_id, {})

if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
pusher = byuser.pop(appid_pushkey)
pusher.on_stop()

synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
10 changes: 7 additions & 3 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ async def on_rdata(
if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
await self.start_pusher(row.user_id, row.app_id, row.pushkey)
await self.process_pusher_change(
row.user_id, row.app_id, row.pushkey
)
elif stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
Expand Down Expand Up @@ -334,13 +336,15 @@ def stop_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
logger.info("Stopping pusher %r / %r", user_id, key)
pusher.on_stop()

async def start_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
async def process_pusher_change(
self, user_id: str, app_id: str, pushkey: str
) -> None:
if not self._notify_pushers:
return

key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
await self._pusher_pool.process_pusher_change_by_id(app_id, pushkey, user_id)


class FederationSenderHandler:
Expand Down
16 changes: 14 additions & 2 deletions synapse/rest/client/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
Expand All @@ -51,9 +52,14 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
user.to_string()
)

filtered_pushers = [p.as_dict() for p in pushers]
pusher_dicts = [p.as_dict() for p in pushers]

return 200, {"pushers": filtered_pushers}
for pusher in pusher_dicts:
if self._msc3881_enabled:
pusher["org.matrix.msc3881.enabled"] = pusher["enabled"]
del pusher["enabled"]

return 200, {"pushers": pusher_dicts}


class PushersSetRestServlet(RestServlet):
Expand All @@ -65,6 +71,7 @@ def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.notifier = hs.get_notifier()
self.pusher_pool = self.hs.get_pusherpool()
self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled

async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
Expand Down Expand Up @@ -103,6 +110,10 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
if "append" in content:
append = content["append"]

enabled = True
if self._msc3881_enabled and "org.matrix.msc3881.enabled" in content:
enabled = content["org.matrix.msc3881.enabled"]

if not append:
await self.pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
app_id=content["app_id"],
Expand All @@ -122,6 +133,7 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
lang=content["lang"],
data=content["data"],
profile_tag=content.get("profile_tag", ""),
enabled=enabled,
)
except PusherConfigException as pce:
raise SynapseError(
Expand Down
26 changes: 22 additions & 4 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
)
continue

# Pushers created while support for the 'enabled' field is not active
# (either because they were created before said support existed or because
# they were created while the experimental implementation is turned off)
# will have the 'enabled' column set to NULL, which needs to be interpreted
# as True.
if r["enabled"] is None:
r["enabled"] = True
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

# If we're using SQLite, then boolean values are integers. This is
# troublesome since some code using the return value of this method might
# expect it to be a boolean, or will expose it to clients (in responses).
r["enabled"] = bool(r["enabled"])

yield PusherConfig(**r)

async def get_pushers_by_app_id_and_pushkey(
Expand Down Expand Up @@ -119,19 +132,22 @@ async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConf
"last_stream_ordering",
"last_success",
"failing_since",
"enabled",
],
desc="get_pushers_by",
)
return self._decode_pushers_rows(ret)

async def get_all_pushers(self) -> Iterator[PusherConfig]:
def get_pushers(txn: LoggingTransaction) -> Iterator[PusherConfig]:
txn.execute("SELECT * FROM pushers")
async def get_enabled_pushers(self) -> Iterator[PusherConfig]:
def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]:
txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)")
rows = self.db_pool.cursor_to_dict(txn)

return self._decode_pushers_rows(rows)

return await self.db_pool.runInteraction("get_all_pushers", get_pushers)
return await self.db_pool.runInteraction(
"get_enabled_pushers", get_enabled_pushers_txn
)

async def get_all_updated_pushers_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
Expand Down Expand Up @@ -476,6 +492,7 @@ async def add_pusher(
data: Optional[JsonDict],
last_stream_ordering: int,
profile_tag: str = "",
enabled: bool = True,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
Expand All @@ -494,6 +511,7 @@ async def add_pusher(
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,
"enabled": enabled,
},
desc="add_pusher",
lock=False,
Expand Down
16 changes: 16 additions & 0 deletions synapse/storage/schema/main/delta/73/02add_pusher_enabled.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE pushers ADD COLUMN enabled BOOLEAN;
babolivier marked this conversation as resolved.
Show resolved Hide resolved
Loading