From 80d03b10bc602f895d31bde4eec03578bf034666 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 23 Jun 2020 19:01:12 +0100 Subject: [PATCH 01/40] Store unread messages in the database --- changelog.d/7736.feature | 1 + synapse/storage/data_stores/main/events.py | 80 ++++++++++++++++++- .../schema/delta/58/09unread_messages.sql | 21 +++++ 3 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 changelog.d/7736.feature create mode 100644 synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql diff --git a/changelog.d/7736.feature b/changelog.d/7736.feature new file mode 100644 index 000000000000..c97864677aac --- /dev/null +++ b/changelog.d/7736.feature @@ -0,0 +1 @@ +Add unread messages count to sync responses. diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index cfd24d2f061d..184038556fad 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -25,6 +25,7 @@ from canonicaljson import json from prometheus_client import Counter +from twisted.enterprise.adbapi import Connection from twisted.internet import defer import synapse.metrics @@ -61,6 +62,12 @@ ["type", "origin_type", "origin_entity"], ) +STATE_EVENT_TYPES_TO_MARK_UNREAD = [ + EventTypes.PowerLevels, + EventTypes.Topic, + EventTypes.Name, +] + def encode_json(json_object): """ @@ -977,7 +984,7 @@ def _update_metadata_tables_txn( txn, events=[event for event, _ in events_and_contexts] ) - for event, _ in events_and_contexts: + for event, context in events_and_contexts: if event.type == EventTypes.Name: # Insert into the event_search table. self._store_room_name_txn(txn, event) @@ -1009,6 +1016,8 @@ def _update_metadata_tables_txn( if isinstance(expiry_ts, int) and not event.is_state(): self._insert_event_expiry_txn(txn, event.event_id, expiry_ts) + self._maybe_insert_unread_event_txn(txn, event, context) + # Insert into the room_memberships table. self._store_room_members_txn( txn, @@ -1614,3 +1623,72 @@ def f(txn, stream_ordering): await self.db.runInteraction("locally_reject_invite", f, stream_ordering) return stream_ordering + + def _maybe_insert_unread_event_txn( + self, txn: Connection, event: EventBase, context: EventContext, + ): + """Mark the event as unread for every current member of the room if it passes the + conditions for that. + + These conditions are: the event must either have a body, be an encrypted message, + or be either a power levels event, a room name event or a room topic event, and + must be neither rejected or soft-failed nor an edit or a notice. + + Args: + txn: The transaction to use to retrieve room members and to mark the event + as unread. + event: The event to evaluate and maybe mark as unread. + context: The context in which the event was sent (used to figure out whether + the event has been rejected). + """ + content = event.content + + is_edit = ( + content.get("m.relates_to", {}).get("rel_type") == RelationTypes.REPLACE + ) + is_notice = not event.is_state() and content.get("msgtype") == "m.notice" + + # We don't want rejected or soft-failed events, edits or notices to be marked + # unread. + if ( + context.rejected + or is_edit + or is_notice + or event.internal_metadata.is_soft_failed() + ): + return + + body_exists = content.get("body") is not None + is_state_event_to_mark_unread = ( + event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD + ) + is_encrypted_message = ( + not event.is_state() and event.type == EventTypes.Encrypted + ) + + # We want to mark unread messages with a body, some state events (power levels, + # room name, room topic) and encrypted messages. + if not (body_exists or is_state_event_to_mark_unread or is_encrypted_message): + return + + # Get the list of users that are currently joined to the room. + users_in_room = self.db.simple_select_onecol_txn( + txn=txn, + table="room_memberships", + keyvalues={"membership": Membership.JOIN, "room_id": event.room_id}, + retcol="user_id", + ) + + # Mark the message as unread for every user currently in the room. + self.db.simple_insert_many_txn( + txn=txn, + table="unread_messages", + values=[ + { + "user_id": user_id, + "stream_ordering": event.internal_metadata.stream_ordering, + "room_id": event.room_id, + } + for user_id in users_in_room + ], + ) diff --git a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql new file mode 100644 index 000000000000..3c7d83b9bbd4 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql @@ -0,0 +1,21 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS unread_messages( + user_id TEXT NOT NULL, -- The user for which the message is unread. + stream_ordering BIGINT NOT NULL, -- The position of the message in the event stream. + room_id TEXT NOT NULL, + UNIQUE (user_id, stream_ordering) +); \ No newline at end of file From 9db0bacf1da521d87de4b364dedc1838cce2bd21 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 24 Jun 2020 17:57:41 +0100 Subject: [PATCH 02/40] Count the number of unread messages on sync requests --- synapse/handlers/sync.py | 25 +++++++++++ synapse/rest/client/v2_alpha/sync.py | 1 + synapse/storage/data_stores/main/events.py | 1 + .../storage/data_stores/main/events_worker.py | 44 ++++++++++++++++++- .../schema/delta/58/09unread_messages.sql | 1 + 5 files changed, 71 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0b82aa72a67e..43173fa9ed60 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -103,6 +103,7 @@ class JoinedSyncResult: account_data = attr.ib(type=List[JsonDict]) unread_notifications = attr.ib(type=JsonDict) summary = attr.ib(type=Optional[JsonDict]) + unread_count = attr.ib(type=int) def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used @@ -951,6 +952,27 @@ async def unread_notifs_for_room_id( # count is whatever it was last time. return None + async def unread_messages_for_room_id( + self, room_id: str, sync_config: SyncConfig, + ) -> int: + """Retrieve the count of unread message for the current user in the given room. + """ + with Measure(self.clock, "unread_messages_for_room_id"): + last_unread_event_id = await self.store.get_last_receipt_event_id_for_user( + user_id=sync_config.user.to_string(), + room_id=room_id, + receipt_type="m.read", + ) + + if last_unread_event_id: + count = await self.store.get_unread_message_count_for_user( + sync_config.user.to_string(), room_id, last_unread_event_id + ) + return count + + # There is no unread message for this user in this room. + return 0 + async def generate_sync_result( self, sync_config: SyncConfig, @@ -1877,6 +1899,8 @@ async def _generate_room_entry( if room_builder.rtype == "joined": unread_notifications = {} # type: Dict[str, str] + + unread_count = await self.unread_messages_for_room_id(room_id, sync_config) room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, @@ -1885,6 +1909,7 @@ async def _generate_room_entry( account_data=account_data_events, unread_notifications=unread_notifications, summary=summary, + unread_count=unread_count, ) if room_sync or always_include: diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 8fa68dd37f4d..427173ac51a6 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -417,6 +417,7 @@ def serialize(events): result["ephemeral"] = {"events": ephemeral_events} result["unread_notifications"] = room.unread_notifications result["summary"] = room.summary + result["org.matrix.msc2654.unread_count"] = room.unread_count return result diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 184038556fad..69a9c71cb682 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1688,6 +1688,7 @@ def _maybe_insert_unread_event_txn( "user_id": user_id, "stream_ordering": event.internal_metadata.stream_ordering, "room_id": event.room_id, + "event_id": event.event_id, } for user_id in users_in_room ], diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index a48c7a96ca08..359a6ece3a49 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -24,6 +24,7 @@ from canonicaljson import json from constantly import NamedConstant, Names +from twisted.enterprise.adbapi import Connection from twisted.internet import defer from synapse.api.constants import EventTypes @@ -39,7 +40,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause -from synapse.storage.database import Database +from synapse.storage.database import Database, LoggingTransaction from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import get_domain_from_id from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks @@ -1360,6 +1361,47 @@ def get_next_event_to_expire_txn(txn): desc="get_next_event_to_expire", func=get_next_event_to_expire_txn ) + async def get_unread_message_count_for_user( + self, + user_id: str, + room_id: str, + last_read_event_id: str, + ): + return await self.db.runInteraction( + "get_unread_message_count_for_user", + self._get_unread_message_count_for_user_txn, + user_id, + room_id, + last_read_event_id, + ) + + def _get_unread_message_count_for_user_txn( + self, + txn: LoggingTransaction, + user_id: str, + room_id: str, + last_read_event_id: str, + ): + # Get the stream ordering for the last read event. + stream_ordering = self.db.simple_select_one_onecol_txn( + txn=txn, + table="events", + keyvalues={"room_id": room_id, "event_id": last_read_event_id}, + retcol="stream_ordering", + ) + + # Count the messages that qualify as unread after the stream ordering we've just + # retrieved. + sql = """ + SELECT COUNT(*) FROM unread_messages + WHERE user_id = ? AND room_id = ? AND stream_ordering > ? + """ + + txn.execute(sql, (user_id, room_id, stream_ordering)) + row = txn.fetchone() + + return row[0] if row else 0 + AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql index 3c7d83b9bbd4..e84f9833d070 100644 --- a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql +++ b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql @@ -17,5 +17,6 @@ CREATE TABLE IF NOT EXISTS unread_messages( user_id TEXT NOT NULL, -- The user for which the message is unread. stream_ordering BIGINT NOT NULL, -- The position of the message in the event stream. room_id TEXT NOT NULL, + event_id TEXT NOT NULL, -- The ID of the message, we need it to handle redactions. UNIQUE (user_id, stream_ordering) ); \ No newline at end of file From e33b8916b6ef3e15462b5248a797fb93142830b2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Jun 2020 09:47:44 +0100 Subject: [PATCH 03/40] Handle redactions --- synapse/storage/data_stores/main/events.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 69a9c71cb682..c9dfdf334fc1 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -997,6 +997,8 @@ def _update_metadata_tables_txn( elif event.type == EventTypes.Redaction and event.redacts is not None: # Insert into the redactions table. self._store_redaction(txn, event) + # If the redacted event was unread, revert that. + self._handle_redacted_unread_event_txn(txn, event) elif event.type == EventTypes.Retention: # Update the room_retention table. self._store_retention_policy_for_room_txn(txn, event) @@ -1693,3 +1695,11 @@ def _maybe_insert_unread_event_txn( for user_id in users_in_room ], ) + + def _handle_redacted_unread_event_txn(self, txn: Connection, event: EventBase): + # Redact every row for this event in the unread_messages table. + self.db.simple_delete_txn( + txn=txn, + table="unread_messages", + keyvalues={"event_id": event.redacts} + ) From 0ee19635913e02ad4ac6a12c09609fddd6a7134c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Jun 2020 10:05:48 +0100 Subject: [PATCH 04/40] Only insert rows for local users Also run the linters --- synapse/storage/data_stores/main/events.py | 11 +++++++---- synapse/storage/data_stores/main/events_worker.py | 6 +----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index c9dfdf334fc1..1fff5fc5f009 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1679,6 +1679,11 @@ def _maybe_insert_unread_event_txn( table="room_memberships", keyvalues={"membership": Membership.JOIN, "room_id": event.room_id}, retcol="user_id", + ) # type: list + + # Only insert rows for local users. + local_users_in_room = list( + filter(lambda user_id: self.hs.is_mine_id(user_id), users_in_room) ) # Mark the message as unread for every user currently in the room. @@ -1692,14 +1697,12 @@ def _maybe_insert_unread_event_txn( "room_id": event.room_id, "event_id": event.event_id, } - for user_id in users_in_room + for user_id in local_users_in_room ], ) def _handle_redacted_unread_event_txn(self, txn: Connection, event: EventBase): # Redact every row for this event in the unread_messages table. self.db.simple_delete_txn( - txn=txn, - table="unread_messages", - keyvalues={"event_id": event.redacts} + txn=txn, table="unread_messages", keyvalues={"event_id": event.redacts} ) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 359a6ece3a49..8ace3c9715ef 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -24,7 +24,6 @@ from canonicaljson import json from constantly import NamedConstant, Names -from twisted.enterprise.adbapi import Connection from twisted.internet import defer from synapse.api.constants import EventTypes @@ -1362,10 +1361,7 @@ def get_next_event_to_expire_txn(txn): ) async def get_unread_message_count_for_user( - self, - user_id: str, - room_id: str, - last_read_event_id: str, + self, user_id: str, room_id: str, last_read_event_id: str, ): return await self.db.runInteraction( "get_unread_message_count_for_user", From 9cb93781afcd4ea918bd16d1e0841de0f1580f35 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Jun 2020 11:33:13 +0100 Subject: [PATCH 05/40] Handle lack of read receipt in a room --- synapse/handlers/sync.py | 12 +++---- .../storage/data_stores/main/events_worker.py | 33 ++++++++++++++----- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 43173fa9ed60..cd4fecb8d06c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -964,14 +964,10 @@ async def unread_messages_for_room_id( receipt_type="m.read", ) - if last_unread_event_id: - count = await self.store.get_unread_message_count_for_user( - sync_config.user.to_string(), room_id, last_unread_event_id - ) - return count - - # There is no unread message for this user in this room. - return 0 + count = await self.store.get_unread_message_count_for_user( + sync_config.user.to_string(), room_id, last_unread_event_id + ) + return count async def generate_sync_result( self, diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 8ace3c9715ef..b960b683127a 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -1376,15 +1376,32 @@ def _get_unread_message_count_for_user_txn( txn: LoggingTransaction, user_id: str, room_id: str, - last_read_event_id: str, + last_read_event_id: Optional[str], ): - # Get the stream ordering for the last read event. - stream_ordering = self.db.simple_select_one_onecol_txn( - txn=txn, - table="events", - keyvalues={"room_id": room_id, "event_id": last_read_event_id}, - retcol="stream_ordering", - ) + if last_read_event_id: + # Get the stream ordering for the last read event. + stream_ordering = self.db.simple_select_one_onecol_txn( + txn=txn, + table="events", + keyvalues={"room_id": room_id, "event_id": last_read_event_id}, + retcol="stream_ordering", + ) + else: + # If there's no read receipt for that room, it probably means the user hasn't + # opened it yet, in which case use the stream ID of their join event. + # We can't just set it to 0 otherwise messages from other local users from + # before this user joined will be counted as well. + txn.execute( + """ + SELECT stream_ordering FROM room_memberships + LEFT JOIN events USING (event_id, room_id) + WHERE membership = 'join' + AND user_id = ? + AND room_id = ? + """, (user_id, room_id) + ) + row = txn.fetchone() + stream_ordering = row[0] # Count the messages that qualify as unread after the stream ordering we've just # retrieved. From 9e723ddc84ce7220b3f565bf95bd5c39473481eb Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Jun 2020 18:14:59 +0100 Subject: [PATCH 06/40] Don't mark an event as unread for its own sender --- synapse/storage/data_stores/main/events.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 1fff5fc5f009..d02734409fcf 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1686,7 +1686,9 @@ def _maybe_insert_unread_event_txn( filter(lambda user_id: self.hs.is_mine_id(user_id), users_in_room) ) - # Mark the message as unread for every user currently in the room. + # Mark the message as unread for every user currently in the room, except the + # sender of the event (because even if they haven't sent a read receipt for the + # event, it seems dumb to show it as unread to its sender). self.db.simple_insert_many_txn( txn=txn, table="unread_messages", @@ -1698,6 +1700,7 @@ def _maybe_insert_unread_event_txn( "event_id": event.event_id, } for user_id in local_users_in_room + if user_id != event.sender ], ) From 5399a7bdfa51df70dd7b8ca231ab4d142ee9e16b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 25 Jun 2020 18:19:58 +0100 Subject: [PATCH 07/40] Run linter --- synapse/storage/data_stores/main/events_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index b960b683127a..c87c242e41e0 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -1398,7 +1398,8 @@ def _get_unread_message_count_for_user_txn( WHERE membership = 'join' AND user_id = ? AND room_id = ? - """, (user_id, room_id) + """, + (user_id, room_id), ) row = txn.fetchone() stream_ordering = row[0] From 580b499a08202314158d4a4d41494e91655d59a4 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 26 Jun 2020 12:33:34 +0100 Subject: [PATCH 08/40] Fetch joined users from current_state_events instead of room_memberships --- synapse/storage/data_stores/main/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index d02734409fcf..5baf04c7feee 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1676,9 +1676,9 @@ def _maybe_insert_unread_event_txn( # Get the list of users that are currently joined to the room. users_in_room = self.db.simple_select_onecol_txn( txn=txn, - table="room_memberships", + table="current_state_events", keyvalues={"membership": Membership.JOIN, "room_id": event.room_id}, - retcol="user_id", + retcol="state_key", ) # type: list # Only insert rows for local users. From ff8b39d82a814abebbd9f8daaa32fca2115d005c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 26 Jun 2020 17:17:18 +0100 Subject: [PATCH 09/40] Send unread count to the push gateway --- synapse/push/push_tools.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 4ea683fee010..84d36fa4d6b8 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -32,17 +32,14 @@ def get_badge_count(store, user_id): if room_id in my_receipts_by_room: last_unread_event_id = my_receipts_by_room[room_id] - notifs = yield ( - store.get_unread_event_push_actions_by_room_for_user( - room_id, user_id, last_unread_event_id + unread_count = yield defer.ensureDeferred( + store.get_unread_message_count_for_user( + user_id, room_id, last_unread_event_id ) ) # return one badge count per conversation, as count per # message is so noisy as to be almost useless - # We're populating this badge using the unread_count (instead of the - # notify_count) as this badge is the number of missed messages, not the - # number of missed notifications. - badge += 1 if notifs.get("unread_count") else 0 + badge += 1 if unread_count else 0 return badge From eafdf9f5dead782ad37340684917a76621da07d9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 26 Jun 2020 19:24:58 +0100 Subject: [PATCH 10/40] Add a redaction helper for tests --- tests/rest/client/v1/utils.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 22d734e7630a..dec3b64d52f7 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -143,6 +143,28 @@ def send_event( return channel.json_body + def redact( + self, room_id, event_id, txn_id=None, tok=None, expect_code=200 + ): + if txn_id is None: + txn_id = "m%s" % (str(time.time())) + + path = "/_matrix/client/r0/rooms/%s/redact/%s/%s" % (room_id, event_id, txn_id) + if tok: + path = path + "?access_token=%s" % tok + + request, channel = make_request( + self.hs.get_reactor(), "PUT", path, json.dumps({}).encode("utf8") + ) + render(request, self.resource, self.hs.get_reactor()) + + assert int(channel.result["code"]) == expect_code, ( + "Expected: %d, got: %d, resp: %r" + % (expect_code, int(channel.result["code"]), channel.result["body"]) + ) + + return channel.json_body + def _read_write_state( self, room_id: str, From 994f267b50270031f4a48bbeabe3f8cdf7690c8b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 26 Jun 2020 19:25:14 +0100 Subject: [PATCH 11/40] Add test case for unread counts --- tests/rest/client/v2_alpha/test_sync.py | 164 +++++++++++++++++++++++- 1 file changed, 162 insertions(+), 2 deletions(-) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index fa3a3ec1bddd..6f4441c10247 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -16,9 +16,9 @@ import json import synapse.rest.admin -from synapse.api.constants import EventContentFields, EventTypes +from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.rest.client.v1 import login, room -from synapse.rest.client.v2_alpha import sync +from synapse.rest.client.v2_alpha import read_marker, sync from tests import unittest from tests.server import TimedOutException @@ -324,3 +324,163 @@ def test_sync_backwards_typing(self): "GET", sync_url % (access_token, next_batch) ) self.assertRaises(TimedOutException, self.render, request) + + +class UnreadMessagesTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + read_marker.register_servlets, + room.register_servlets, + sync.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.url = "/sync?since=%s" + self.next_batch = "s0" + + # Register the first user (used to check the unread counts). + self.user_id = self.register_user("kermit", "monkey") + self.tok = self.login("kermit", "monkey") + + # Create the room we'll check unread counts for. + self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok) + + # Register the second user (used to send events to the room). + self.user2 = self.register_user("kermit2", "monkey") + self.tok2 = self.login("kermit2", "monkey") + + # Change the power levels of the room so that the second user can send state + # events. + self.power_levels = { + "users": { + self.user_id: 100, + self.user2: 100, + }, + "users_default": 0, + "events": { + "m.room.name": 50, + "m.room.power_levels": 100, + "m.room.history_visibility": 100, + "m.room.canonical_alias": 50, + "m.room.avatar": 50, + "m.room.tombstone": 100, + "m.room.server_acl": 100, + "m.room.encryption": 100 + }, + "events_default": 0, + "state_default": 50, + "ban": 50, + "kick": 50, + "redact": 50, + "invite": 0 + } + self.helper.send_state( + self.room_id, EventTypes.PowerLevels, self.power_levels, tok=self.tok, + ) + + def test_unread_counts(self): + """Tests that /sync returns the right value for the unread count (MSC2654).""" + + # Check that our own messages don't increase the unread count. + self.helper.send(self.room_id, "hello", tok=self.tok) + self._check_unread_count(0) + + # Join the new user and check that this doesn't increase the unread count. + self.helper.join(room=self.room_id, user=self.user2, tok=self.tok2) + self._check_unread_count(0) + + # Check that the new user sending a message increases our unread count. + res = self.helper.send(self.room_id, "hello", tok=self.tok2) + self._check_unread_count(1) + + # Check that redacting that message decreases our unread count. + self.helper.redact(self.room_id, res["event_id"], tok=self.tok2) + self._check_unread_count(0) + + # Re-send a message to prepare for the next check. + res = self.helper.send(self.room_id, "hello", tok=self.tok2) + self._check_unread_count(1) + + # Send a read receipt to tell the server we've read the latest event. + body = json.dumps({"m.read": res["event_id"]}).encode("utf8") + request, channel = self.make_request( + "POST", "/rooms/%s/read_markers" % self.room_id, body, access_token=self.tok, + ) + self.render(request) + self.assertEqual(channel.code, 200, channel.json_body) + + # Check that the unread counter is back to 0. + self._check_unread_count(0) + + # Check that room name changes increase the unread counter. + self.helper.send_state( + self.room_id, "m.room.name", {"name": "my super room"}, tok=self.tok2, + ) + self._check_unread_count(1) + + # Check that room topic changes increase the unread counter. + self.helper.send_state( + self.room_id, "m.room.topic", {"topic": "welcome!!!"}, tok=self.tok2, + ) + self._check_unread_count(2) + + # Check that encrypted messages increase the unread counter. + self.helper.send_event(self.room_id, EventTypes.Encrypted, {}, tok=self.tok2) + self._check_unread_count(3) + + # Check that custom events with a body increase the unread counter. + self.helper.send_event( + self.room_id, "org.matrix.custom_type", {"body": "hello"}, tok=self.tok2, + ) + self._check_unread_count(4) + + # Check that power level changes increase the unread counter. + self.power_levels["invite"] = 50 + self.helper.send_state( + self.room_id, EventTypes.PowerLevels, self.power_levels, tok=self.tok2, + ) + self._check_unread_count(5) + + # Check that edits don't increase the unread counter. + self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "body": "hello", + "msgtype": "m.text", + "m.relates_to": {"rel_type": RelationTypes.REPLACE} + }, + tok=self.tok2, + ) + self._check_unread_count(5) + + # Check that notices don't increase the unread counter. + self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={ + "body": "hello", + "msgtype": "m.notice", + }, + tok=self.tok2, + ) + self._check_unread_count(5) + + def _check_unread_count(self, expected_count: True): + """Syncs and compares the unread count with the expected value.""" + + request, channel = self.make_request( + "GET", self.url % self.next_batch, access_token=self.tok, + ) + self.render(request) + + self.assertEqual(channel.code, 200, channel.json_body) + + room_entry = channel.json_body["rooms"]["join"][self.room_id] + self.assertEqual( + room_entry["org.matrix.msc2654.unread_count"], expected_count, room_entry, + ) + + # Store the next batch for the next request. + self.next_batch = channel.json_body["next_batch"] From 00fd9518cd3d18fcd3583786c7eb6e3fabf9e2f2 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 26 Jun 2020 19:41:51 +0100 Subject: [PATCH 12/40] Run linter on tests --- tests/rest/client/v1/utils.py | 4 +--- tests/rest/client/v2_alpha/test_sync.py | 21 +++++++++------------ 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index dec3b64d52f7..7f8252330a44 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -143,9 +143,7 @@ def send_event( return channel.json_body - def redact( - self, room_id, event_id, txn_id=None, tok=None, expect_code=200 - ): + def redact(self, room_id, event_id, txn_id=None, tok=None, expect_code=200): if txn_id is None: txn_id = "m%s" % (str(time.time())) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 6f4441c10247..265f54fbb4ff 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -353,10 +353,7 @@ def prepare(self, reactor, clock, hs): # Change the power levels of the room so that the second user can send state # events. self.power_levels = { - "users": { - self.user_id: 100, - self.user2: 100, - }, + "users": {self.user_id: 100, self.user2: 100}, "users_default": 0, "events": { "m.room.name": 50, @@ -366,14 +363,14 @@ def prepare(self, reactor, clock, hs): "m.room.avatar": 50, "m.room.tombstone": 100, "m.room.server_acl": 100, - "m.room.encryption": 100 + "m.room.encryption": 100, }, "events_default": 0, "state_default": 50, "ban": 50, "kick": 50, "redact": 50, - "invite": 0 + "invite": 0, } self.helper.send_state( self.room_id, EventTypes.PowerLevels, self.power_levels, tok=self.tok, @@ -405,7 +402,10 @@ def test_unread_counts(self): # Send a read receipt to tell the server we've read the latest event. body = json.dumps({"m.read": res["event_id"]}).encode("utf8") request, channel = self.make_request( - "POST", "/rooms/%s/read_markers" % self.room_id, body, access_token=self.tok, + "POST", + "/rooms/%s/read_markers" % self.room_id, + body, + access_token=self.tok, ) self.render(request) self.assertEqual(channel.code, 200, channel.json_body) @@ -449,7 +449,7 @@ def test_unread_counts(self): content={ "body": "hello", "msgtype": "m.text", - "m.relates_to": {"rel_type": RelationTypes.REPLACE} + "m.relates_to": {"rel_type": RelationTypes.REPLACE}, }, tok=self.tok2, ) @@ -459,10 +459,7 @@ def test_unread_counts(self): self.helper.send_event( room_id=self.room_id, type=EventTypes.Message, - content={ - "body": "hello", - "msgtype": "m.notice", - }, + content={"body": "hello", "msgtype": "m.notice"}, tok=self.tok2, ) self._check_unread_count(5) From a008f6fc71060fb0253ebf40b1864f4ea2a6579d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 2 Jul 2020 15:36:11 +0100 Subject: [PATCH 13/40] Implement the latest changes in the MSC --- synapse/storage/data_stores/main/events.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 5baf04c7feee..b7d51b8229f5 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -66,6 +66,7 @@ EventTypes.PowerLevels, EventTypes.Topic, EventTypes.Name, + EventTypes.RoomAvatar, ] @@ -1632,9 +1633,9 @@ def _maybe_insert_unread_event_txn( """Mark the event as unread for every current member of the room if it passes the conditions for that. - These conditions are: the event must either have a body, be an encrypted message, - or be either a power levels event, a room name event or a room topic event, and - must be neither rejected or soft-failed nor an edit or a notice. + These conditions are: the event must either have a non-empty string body, be an + encrypted message, or be either a power levels event, a room name event or a room + topic event, and must be neither rejected or soft-failed nor an edit or a notice. Args: txn: The transaction to use to retrieve room members and to mark the event @@ -1660,7 +1661,7 @@ def _maybe_insert_unread_event_txn( ): return - body_exists = content.get("body") is not None + body_exists = isinstance(content.get("body"), str) is_state_event_to_mark_unread = ( event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD ) @@ -1668,8 +1669,8 @@ def _maybe_insert_unread_event_txn( not event.is_state() and event.type == EventTypes.Encrypted ) - # We want to mark unread messages with a body, some state events (power levels, - # room name, room topic) and encrypted messages. + # We want to mark unread messages with a non-empty string body, some state events + # (power levels, room name, room topic, room avatar) and encrypted messages. if not (body_exists or is_state_event_to_mark_unread or is_encrypted_message): return From d980c8699199d1b4c10ff1202b79e66440c181a3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 12:42:45 +0100 Subject: [PATCH 14/40] Incorporate most review comments --- synapse/storage/data_stores/main/events.py | 135 ++++++------------ .../storage/data_stores/main/events_worker.py | 8 +- .../schema/delta/58/09unread_messages.sql | 10 +- 3 files changed, 54 insertions(+), 99 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index b7d51b8229f5..7a5e057b0840 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -62,12 +62,48 @@ ["type", "origin_type", "origin_entity"], ) -STATE_EVENT_TYPES_TO_MARK_UNREAD = [ +STATE_EVENT_TYPES_TO_MARK_UNREAD = { EventTypes.PowerLevels, EventTypes.Topic, EventTypes.Name, EventTypes.RoomAvatar, -] +} + + +def count_as_unread(event: EventBase, context: EventContext) -> bool: + # Exclude rejected and soft-failed events. + if context.rejected or event.internal_metadata.is_soft_failed(): + return False + + # Exclude notices. + if ( + not event.is_state() + and event.type == EventTypes.Message + and event.content.get("msgtype") == "m.notice" + ): + return False + + # Exclude edits. + relates_to = event.content.get("m.relates_to", {}) + if ( + relates_to.get("rel_type") == RelationTypes.REPLACE + ): + return False + + # Mark events that have a non-empty string body as unread. + body = event.content.get("body") + if isinstance(body, str) and body: + return True + + # Mark some state events as unread. + if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD: + return True + + # Mark encrypted events as unread. + if not event.is_state() and event.type == EventTypes.Encrypted: + return True + + return False def encode_json(json_object): @@ -900,8 +936,9 @@ def event_dict(event): "contains_url": ( "url" in event.content and isinstance(event.content["url"], str) ), + "count_as_unread": count_as_unread(event, context), } - for event, _ in events_and_contexts + for event, context in events_and_contexts ], ) @@ -985,7 +1022,7 @@ def _update_metadata_tables_txn( txn, events=[event for event, _ in events_and_contexts] ) - for event, context in events_and_contexts: + for event, _ in events_and_contexts: if event.type == EventTypes.Name: # Insert into the event_search table. self._store_room_name_txn(txn, event) @@ -998,7 +1035,7 @@ def _update_metadata_tables_txn( elif event.type == EventTypes.Redaction and event.redacts is not None: # Insert into the redactions table. self._store_redaction(txn, event) - # If the redacted event was unread, revert that. + # Prevent the redacted event from counting towards the unread count. self._handle_redacted_unread_event_txn(txn, event) elif event.type == EventTypes.Retention: # Update the room_retention table. @@ -1019,8 +1056,6 @@ def _update_metadata_tables_txn( if isinstance(expiry_ts, int) and not event.is_state(): self._insert_event_expiry_txn(txn, event.event_id, expiry_ts) - self._maybe_insert_unread_event_txn(txn, event, context) - # Insert into the room_memberships table. self._store_room_members_txn( txn, @@ -1627,86 +1662,10 @@ def f(txn, stream_ordering): return stream_ordering - def _maybe_insert_unread_event_txn( - self, txn: Connection, event: EventBase, context: EventContext, - ): - """Mark the event as unread for every current member of the room if it passes the - conditions for that. - - These conditions are: the event must either have a non-empty string body, be an - encrypted message, or be either a power levels event, a room name event or a room - topic event, and must be neither rejected or soft-failed nor an edit or a notice. - - Args: - txn: The transaction to use to retrieve room members and to mark the event - as unread. - event: The event to evaluate and maybe mark as unread. - context: The context in which the event was sent (used to figure out whether - the event has been rejected). - """ - content = event.content - - is_edit = ( - content.get("m.relates_to", {}).get("rel_type") == RelationTypes.REPLACE - ) - is_notice = not event.is_state() and content.get("msgtype") == "m.notice" - - # We don't want rejected or soft-failed events, edits or notices to be marked - # unread. - if ( - context.rejected - or is_edit - or is_notice - or event.internal_metadata.is_soft_failed() - ): - return - - body_exists = isinstance(content.get("body"), str) - is_state_event_to_mark_unread = ( - event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD - ) - is_encrypted_message = ( - not event.is_state() and event.type == EventTypes.Encrypted - ) - - # We want to mark unread messages with a non-empty string body, some state events - # (power levels, room name, room topic, room avatar) and encrypted messages. - if not (body_exists or is_state_event_to_mark_unread or is_encrypted_message): - return - - # Get the list of users that are currently joined to the room. - users_in_room = self.db.simple_select_onecol_txn( - txn=txn, - table="current_state_events", - keyvalues={"membership": Membership.JOIN, "room_id": event.room_id}, - retcol="state_key", - ) # type: list - - # Only insert rows for local users. - local_users_in_room = list( - filter(lambda user_id: self.hs.is_mine_id(user_id), users_in_room) - ) - - # Mark the message as unread for every user currently in the room, except the - # sender of the event (because even if they haven't sent a read receipt for the - # event, it seems dumb to show it as unread to its sender). - self.db.simple_insert_many_txn( - txn=txn, - table="unread_messages", - values=[ - { - "user_id": user_id, - "stream_ordering": event.internal_metadata.stream_ordering, - "room_id": event.room_id, - "event_id": event.event_id, - } - for user_id in local_users_in_room - if user_id != event.sender - ], - ) - def _handle_redacted_unread_event_txn(self, txn: Connection, event: EventBase): - # Redact every row for this event in the unread_messages table. - self.db.simple_delete_txn( - txn=txn, table="unread_messages", keyvalues={"event_id": event.redacts} + self.db.simple_update_txn( + txn=txn, + table="events", + keyvalues={"event_id": event.redacts}, + updatevalues={"count_as_unread": False}, ) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index c87c242e41e0..31c4a5e039e1 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -1393,10 +1393,10 @@ def _get_unread_message_count_for_user_txn( # before this user joined will be counted as well. txn.execute( """ - SELECT stream_ordering FROM room_memberships + SELECT stream_ordering FROM current_state_events LEFT JOIN events USING (event_id, room_id) WHERE membership = 'join' - AND user_id = ? + AND state_key = ? AND room_id = ? """, (user_id, room_id), @@ -1407,8 +1407,8 @@ def _get_unread_message_count_for_user_txn( # Count the messages that qualify as unread after the stream ordering we've just # retrieved. sql = """ - SELECT COUNT(*) FROM unread_messages - WHERE user_id = ? AND room_id = ? AND stream_ordering > ? + SELECT COUNT(*) FROM events + WHERE sender != ? AND room_id = ? AND stream_ordering > ? AND count_as_unread """ txn.execute(sql, (user_id, room_id, stream_ordering)) diff --git a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql index e84f9833d070..a70e8956411d 100644 --- a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql +++ b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql @@ -13,10 +13,6 @@ * limitations under the License. */ -CREATE TABLE IF NOT EXISTS unread_messages( - user_id TEXT NOT NULL, -- The user for which the message is unread. - stream_ordering BIGINT NOT NULL, -- The position of the message in the event stream. - room_id TEXT NOT NULL, - event_id TEXT NOT NULL, -- The ID of the message, we need it to handle redactions. - UNIQUE (user_id, stream_ordering) -); \ No newline at end of file +-- Store a boolean value in the events table for whether the event should be counted in +-- the unread_count property of sync responses. +ALTER TABLE events ADD COLUMN count_as_unread SMALLINT NOT NULL DEFAULT 0; From bf1b1ecbacd91f708dbb3503f75068e80fd8a454 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 14:05:45 +0100 Subject: [PATCH 15/40] Lint --- synapse/storage/data_stores/main/events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index d636f85222b5..d6fa54603308 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -84,9 +84,7 @@ def count_as_unread(event: EventBase, context: EventContext) -> bool: # Exclude edits. relates_to = event.content.get("m.relates_to", {}) - if ( - relates_to.get("rel_type") == RelationTypes.REPLACE - ): + if (relates_to.get("rel_type") == RelationTypes.REPLACE): return False # Mark events that have a non-empty string body as unread. From e07fef1911515594dab49f7c4935685f51f259e7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 14:30:48 +0100 Subject: [PATCH 16/40] Lint --- synapse/storage/data_stores/main/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index d6fa54603308..007603de4ef8 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -84,7 +84,7 @@ def count_as_unread(event: EventBase, context: EventContext) -> bool: # Exclude edits. relates_to = event.content.get("m.relates_to", {}) - if (relates_to.get("rel_type") == RelationTypes.REPLACE): + if relates_to.get("rel_type") == RelationTypes.REPLACE: return False # Mark events that have a non-empty string body as unread. From 8b9d073e620591d47bbb71d717e12736b2bf9ccf Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 15:09:42 +0100 Subject: [PATCH 17/40] Fix column type --- .../data_stores/main/schema/delta/58/09unread_messages.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql index a70e8956411d..ac884e999f70 100644 --- a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql +++ b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql @@ -15,4 +15,4 @@ -- Store a boolean value in the events table for whether the event should be counted in -- the unread_count property of sync responses. -ALTER TABLE events ADD COLUMN count_as_unread SMALLINT NOT NULL DEFAULT 0; +ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN NOT NULL DEFAULT 0; From 287c2630ca406c3e38699f57fce20f013ad36eb1 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 15:16:03 +0100 Subject: [PATCH 18/40] Fix default value --- .../data_stores/main/schema/delta/58/09unread_messages.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql index ac884e999f70..373a0d65e809 100644 --- a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql +++ b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql @@ -15,4 +15,4 @@ -- Store a boolean value in the events table for whether the event should be counted in -- the unread_count property of sync responses. -ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN NOT NULL DEFAULT 0; +ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN NOT NULL DEFAULT FALSE; From b5972a9124e7f55d8a5c1af23524be3c8819a1a9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 15:48:50 +0100 Subject: [PATCH 19/40] Update port_db's list of bool columns --- scripts/synapse_port_db | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 2eb795192ff8..0f102d60597f 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -68,7 +68,7 @@ logger = logging.getLogger("synapse_port_db") BOOLEAN_COLUMNS = { - "events": ["processed", "outlier", "contains_url"], + "events": ["processed", "outlier", "contains_url", "count_as_unread"], "rooms": ["is_public"], "event_edges": ["is_state"], "presence_list": ["accepted"], From e9e2d8852649fd0fcf857f39f85af1f6713900ee Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 15:49:37 +0100 Subject: [PATCH 20/40] Add a cache to get_unread_message_count_for_user --- synapse/handlers/sync.py | 8 +------- synapse/replication/slave/storage/receipts.py | 1 + synapse/storage/data_stores/main/events.py | 3 +++ synapse/storage/data_stores/main/events_worker.py | 9 ++++++++- synapse/storage/data_stores/main/receipts.py | 5 +++++ 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d2c51acfaac9..7deaed0bb5b9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -958,14 +958,8 @@ async def unread_messages_for_room_id( """Retrieve the count of unread message for the current user in the given room. """ with Measure(self.clock, "unread_messages_for_room_id"): - last_unread_event_id = await self.store.get_last_receipt_event_id_for_user( - user_id=sync_config.user.to_string(), - room_id=room_id, - receipt_type="m.read", - ) - count = await self.store.get_unread_message_count_for_user( - sync_config.user.to_string(), room_id, last_unread_event_id + sync_config.user.to_string(), room_id, ) return count diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index 6982686eb512..3fbbd88e5b2e 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -43,6 +43,7 @@ def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): ) self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id) self.get_receipts_for_room.invalidate((room_id, receipt_type)) + self.get_unread_message_count_for_user.invalidate((user_id, room_id)) def process_replication_rows(self, stream_name, instance_name, token, rows): if stream_name == ReceiptsStream.NAME: diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 007603de4ef8..0a9002fc994d 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -489,6 +489,9 @@ def _persist_events_txn( backfilled=backfilled, ) + for event, _ in events_and_contexts: + self.get_unread_message_count_for_user.invalidate((event.room_id,)) + # We call this last as it assumes we've inserted the events into # room_memberships, where applicable. self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index be83388400d6..477701abfebd 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -1362,9 +1362,16 @@ def get_next_event_to_expire_txn(txn): desc="get_next_event_to_expire", func=get_next_event_to_expire_txn ) + @cached(tree=True) async def get_unread_message_count_for_user( - self, user_id: str, room_id: str, last_read_event_id: str, + self, room_id: str, user_id: str, ): + last_read_event_id = await self.get_last_receipt_event_id_for_user( + user_id=user_id, + room_id=room_id, + receipt_type="m.read", + ) + return await self.db.runInteraction( "get_unread_message_count_for_user", self._get_unread_message_count_for_user_txn, diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index 8f5505bd674f..ba46ac368106 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -446,6 +446,11 @@ def insert_linearized_receipt_txn( (user_id, room_id, receipt_type), ) + txn.call_after( + self.get_unread_message_count_for_user.invalidate, + (room_id, user_id), + ) + self.db.simple_upsert_txn( txn, table="receipts_linearized", From bc44ad437ccd35399d939c92af988a0b32208c97 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 17:14:25 +0100 Subject: [PATCH 21/40] Fix function call and cache invalidation --- synapse/handlers/sync.py | 2 +- synapse/storage/data_stores/main/events.py | 2 +- synapse/storage/data_stores/main/events_worker.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7deaed0bb5b9..57bd30a9b639 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -959,7 +959,7 @@ async def unread_messages_for_room_id( """ with Measure(self.clock, "unread_messages_for_room_id"): count = await self.store.get_unread_message_count_for_user( - sync_config.user.to_string(), room_id, + room_id, sync_config.user.to_string(), ) return count diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 0a9002fc994d..0f744acd55e6 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -490,7 +490,7 @@ def _persist_events_txn( ) for event, _ in events_and_contexts: - self.get_unread_message_count_for_user.invalidate((event.room_id,)) + self.store.get_unread_message_count_for_user.invalidate_many((event.room_id,)) # We call this last as it assumes we've inserted the events into # room_memberships, where applicable. diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 477701abfebd..24a988bbfc82 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -1402,10 +1402,10 @@ def _get_unread_message_count_for_user_txn( # before this user joined will be counted as well. txn.execute( """ - SELECT stream_ordering FROM current_state_events + SELECT stream_ordering FROM local_current_membership LEFT JOIN events USING (event_id, room_id) WHERE membership = 'join' - AND state_key = ? + AND user_id = ? AND room_id = ? """, (user_id, room_id), From 9b9f6e207cbcbba63f03fb677c6b4fc0e4bd29e8 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 17:24:18 +0100 Subject: [PATCH 22/40] Lint --- synapse/replication/slave/storage/receipts.py | 1 - synapse/storage/data_stores/main/events.py | 4 +++- synapse/storage/data_stores/main/events_worker.py | 4 +--- synapse/storage/data_stores/main/receipts.py | 3 +-- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py index 3fbbd88e5b2e..6982686eb512 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py @@ -43,7 +43,6 @@ def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): ) self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id) self.get_receipts_for_room.invalidate((room_id, receipt_type)) - self.get_unread_message_count_for_user.invalidate((user_id, room_id)) def process_replication_rows(self, stream_name, instance_name, token, rows): if stream_name == ReceiptsStream.NAME: diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 0f744acd55e6..d6050c663a0c 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -490,7 +490,9 @@ def _persist_events_txn( ) for event, _ in events_and_contexts: - self.store.get_unread_message_count_for_user.invalidate_many((event.room_id,)) + self.store.get_unread_message_count_for_user.invalidate_many( + (event.room_id,), + ) # We call this last as it assumes we've inserted the events into # room_memberships, where applicable. diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 24a988bbfc82..b01eaa56bc54 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -1367,9 +1367,7 @@ async def get_unread_message_count_for_user( self, room_id: str, user_id: str, ): last_read_event_id = await self.get_last_receipt_event_id_for_user( - user_id=user_id, - room_id=room_id, - receipt_type="m.read", + user_id=user_id, room_id=room_id, receipt_type="m.read", ) return await self.db.runInteraction( diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index ba46ac368106..a076357ee4fd 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -447,8 +447,7 @@ def insert_linearized_receipt_txn( ) txn.call_after( - self.get_unread_message_count_for_user.invalidate, - (room_id, user_id), + self.get_unread_message_count_for_user.invalidate, (room_id, user_id), ) self.db.simple_upsert_txn( From 881fed372790bcc7a4ba704204dc3c5c9ee8058d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 17:49:10 +0100 Subject: [PATCH 23/40] Invalidate the cache in the main thread --- synapse/storage/data_stores/main/events.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index d6050c663a0c..aa622e392166 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -250,6 +250,11 @@ def _persist_events_and_state_updates( ) persist_event_counter.inc(len(events_and_contexts)) + for event, _ in events_and_contexts: + self.store.get_unread_message_count_for_user.invalidate_many( + (event.room_id,), + ) + if not backfilled: # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. @@ -489,11 +494,6 @@ def _persist_events_txn( backfilled=backfilled, ) - for event, _ in events_and_contexts: - self.store.get_unread_message_count_for_user.invalidate_many( - (event.room_id,), - ) - # We call this last as it assumes we've inserted the events into # room_memberships, where applicable. self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) From 52ddc4dd83908a6e0cdd217dd6da4867fe1c32bf Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 18:28:58 +0100 Subject: [PATCH 24/40] Fix push badge computation --- synapse/push/push_tools.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 84d36fa4d6b8..8902ba9de32e 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -24,22 +24,17 @@ def get_badge_count(store, user_id): invites = yield store.get_invited_rooms_for_local_user(user_id) joins = yield store.get_rooms_for_user(user_id) - my_receipts_by_room = yield store.get_receipts_for_user(user_id, "m.read") - badge = len(invites) for room_id in joins: - if room_id in my_receipts_by_room: - last_unread_event_id = my_receipts_by_room[room_id] - - unread_count = yield defer.ensureDeferred( - store.get_unread_message_count_for_user( - user_id, room_id, last_unread_event_id - ) + unread_count = yield defer.ensureDeferred( + store.get_unread_message_count_for_user( + room_id, user_id, ) - # return one badge count per conversation, as count per - # message is so noisy as to be almost useless - badge += 1 if unread_count else 0 + ) + # return one badge count per conversation, as count per + # message is so noisy as to be almost useless + badge += 1 if unread_count else 0 return badge From ea01eff7ece1b302d2fef2ff407ee3ccfcca0362 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 6 Jul 2020 18:33:10 +0100 Subject: [PATCH 25/40] Lint --- synapse/push/push_tools.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 8902ba9de32e..044b377973ab 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -28,9 +28,7 @@ def get_badge_count(store, user_id): for room_id in joins: unread_count = yield defer.ensureDeferred( - store.get_unread_message_count_for_user( - room_id, user_id, - ) + store.get_unread_message_count_for_user(room_id, user_id) ) # return one badge count per conversation, as count per # message is so noisy as to be almost useless From aa0a56ba6af0d5bf7d70a41bc301abdbbdd380ea Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 8 Jul 2020 10:42:58 +0100 Subject: [PATCH 26/40] Use invalidate_cache_and_stream to invalidate the cache across workers --- synapse/storage/data_stores/main/receipts.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index a076357ee4fd..7295cb8bfdf8 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -446,10 +446,6 @@ def insert_linearized_receipt_txn( (user_id, room_id, receipt_type), ) - txn.call_after( - self.get_unread_message_count_for_user.invalidate, (room_id, user_id), - ) - self.db.simple_upsert_txn( txn, table="receipts_linearized", @@ -527,6 +523,12 @@ def graph_to_linear(txn): stream_id=stream_id, ) + yield defer.ensureDeferred( + self.invalidate_cache_and_stream( + "get_unread_message_count_for_user", (room_id, user_id), + ) + ) + if event_ts is None: return None From 420573a8ea744f42b3a111d406e254ea476a4b2d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 8 Jul 2020 14:55:35 +0100 Subject: [PATCH 27/40] Process the cache stream first for incoming replication Receiving new data about e.g. events, read receipts, etc., is likely to result in some database actions. An example of that is that we send a push on new read receipts, which involves calculating a badge from unread counts. Because of this, we probably want to process cache updates/invalidations first so these actions are not using outdated cache data. In the previous example, we want to make sure we invalidate the cache on calculating the unread count for the user/room the read receipt is for before sending a push. --- synapse/replication/tcp/streams/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index d1a61c331480..34db0ff298e9 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -48,6 +48,7 @@ STREAMS_MAP = { stream.NAME: stream for stream in ( + CachesStream, EventsStream, BackfillStream, PresenceStream, @@ -55,7 +56,6 @@ ReceiptsStream, PushRulesStream, PushersStream, - CachesStream, PublicRoomsStream, DeviceListsStream, ToDeviceStream, From 79464f8784deeb7f71c0ea4fe1f9d80c47a8812e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 8 Jul 2020 17:28:38 +0100 Subject: [PATCH 28/40] Fix receipts replication test --- tests/replication/tcp/streams/test_receipts.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py index 56b062ecc1d6..b36c4d5867df 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py @@ -39,9 +39,13 @@ def test_receipt(self): ) self.replicate() - # there should be one RDATA command - self.test_handler.on_rdata.assert_called_once() - stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0] + # there should be two RDATA commands, one for invalidating the unread counts + # cache in sync responses and pushes, and one for the actual read receipt. + self.assertEqual(self.test_handler.on_rdata.call_count, 2) + + # The first call will be the cache invalidation, so we ignore it. + stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[1] + self.assertEqual(stream_name, "receipts") self.assertEqual(1, len(rdata_rows)) row = rdata_rows[0] # type: ReceiptsStream.ReceiptsStreamRow From 071007891707537c52501b3efa0a9d51ee7fe1a1 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 20 Jul 2020 17:20:18 +0100 Subject: [PATCH 29/40] Revert "Fix receipts replication test" This reverts commit 79464f8784deeb7f71c0ea4fe1f9d80c47a8812e. --- tests/replication/tcp/streams/test_receipts.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py index b36c4d5867df..56b062ecc1d6 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py @@ -39,13 +39,9 @@ def test_receipt(self): ) self.replicate() - # there should be two RDATA commands, one for invalidating the unread counts - # cache in sync responses and pushes, and one for the actual read receipt. - self.assertEqual(self.test_handler.on_rdata.call_count, 2) - - # The first call will be the cache invalidation, so we ignore it. - stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[1] - + # there should be one RDATA command + self.test_handler.on_rdata.assert_called_once() + stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0] self.assertEqual(stream_name, "receipts") self.assertEqual(1, len(rdata_rows)) row = rdata_rows[0] # type: ReceiptsStream.ReceiptsStreamRow From ea10fc17a42104b82a3afd7d24f115a54fa4d7e3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 24 Jul 2020 14:57:15 +0100 Subject: [PATCH 30/40] Revert "Process the cache stream first for incoming replication" This reverts commit 420573a8ea744f42b3a111d406e254ea476a4b2d. --- synapse/replication/tcp/streams/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 34db0ff298e9..d1a61c331480 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -48,7 +48,6 @@ STREAMS_MAP = { stream.NAME: stream for stream in ( - CachesStream, EventsStream, BackfillStream, PresenceStream, @@ -56,6 +55,7 @@ ReceiptsStream, PushRulesStream, PushersStream, + CachesStream, PublicRoomsStream, DeviceListsStream, ToDeviceStream, From 0ea07925d8a75069ba4cf1da9a6c207f5032a8e6 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 24 Jul 2020 14:57:37 +0100 Subject: [PATCH 31/40] Revert "Use invalidate_cache_and_stream to invalidate the cache across workers" This reverts commit aa0a56ba6af0d5bf7d70a41bc301abdbbdd380ea. --- synapse/storage/data_stores/main/receipts.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index 221a644f230e..db6d903963fa 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -446,6 +446,10 @@ def insert_linearized_receipt_txn( (user_id, room_id, receipt_type), ) + txn.call_after( + self.get_unread_message_count_for_user.invalidate, (room_id, user_id), + ) + self.db.simple_upsert_txn( txn, table="receipts_linearized", @@ -523,12 +527,6 @@ def graph_to_linear(txn): stream_id=stream_id, ) - yield defer.ensureDeferred( - self.invalidate_cache_and_stream( - "get_unread_message_count_for_user", (room_id, user_id), - ) - ) - if event_ts is None: return None From 096aca9d9833c73cf244f8edf3a3b81b267b4893 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 24 Jul 2020 15:25:22 +0100 Subject: [PATCH 32/40] Fix read receipt cache invalidation (hopefully) --- synapse/storage/data_stores/main/events_worker.py | 12 +++++++++--- synapse/storage/data_stores/main/receipts.py | 4 ---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 0980dde6a376..383ee33bb188 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -43,7 +43,12 @@ from synapse.storage.database import Database, LoggingTransaction from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import get_domain_from_id -from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks +from synapse.util.caches.descriptors import ( + _CacheContext, + Cache, + cached, + cachedInlineCallbacks, +) from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -1358,12 +1363,13 @@ def get_next_event_to_expire_txn(txn): desc="get_next_event_to_expire", func=get_next_event_to_expire_txn ) - @cached(tree=True) + @cached(tree=True, cache_context=True) async def get_unread_message_count_for_user( - self, room_id: str, user_id: str, + self, room_id: str, user_id: str, cache_context: _CacheContext, ): last_read_event_id = await self.get_last_receipt_event_id_for_user( user_id=user_id, room_id=room_id, receipt_type="m.read", + on_invalidate=cache_context.invalidate, ) return await self.db.runInteraction( diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index db6d903963fa..1d723f2d347e 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -446,10 +446,6 @@ def insert_linearized_receipt_txn( (user_id, room_id, receipt_type), ) - txn.call_after( - self.get_unread_message_count_for_user.invalidate, (room_id, user_id), - ) - self.db.simple_upsert_txn( txn, table="receipts_linearized", From 28f3e2bec2964fac0787beb2c113ddca257c81b6 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 24 Jul 2020 15:29:46 +0100 Subject: [PATCH 33/40] Lint --- synapse/storage/data_stores/main/events_worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 383ee33bb188..b97dfd93133d 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -44,8 +44,8 @@ from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import get_domain_from_id from synapse.util.caches.descriptors import ( - _CacheContext, Cache, + _CacheContext, cached, cachedInlineCallbacks, ) @@ -1368,7 +1368,9 @@ async def get_unread_message_count_for_user( self, room_id: str, user_id: str, cache_context: _CacheContext, ): last_read_event_id = await self.get_last_receipt_event_id_for_user( - user_id=user_id, room_id=room_id, receipt_type="m.read", + user_id=user_id, + room_id=room_id, + receipt_type="m.read", on_invalidate=cache_context.invalidate, ) From d40557bdf809f310eeee61a89d3689fd08ddf927 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 27 Jul 2020 16:41:33 +0100 Subject: [PATCH 34/40] Incorporate review --- synapse/handlers/sync.py | 15 ++---- synapse/push/push_tools.py | 4 +- synapse/storage/data_stores/main/cache.py | 1 + synapse/storage/data_stores/main/events.py | 19 +------ .../storage/data_stores/main/events_worker.py | 49 ++++++++++++------- ...ead_messages.sql => 12unread_messages.sql} | 0 6 files changed, 39 insertions(+), 49 deletions(-) rename synapse/storage/data_stores/main/schema/delta/58/{09unread_messages.sql => 12unread_messages.sql} (100%) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index dbc4309ea3e5..eaa4eeadf744 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -949,17 +949,6 @@ async def unread_notifs_for_room_id( # count is whatever it was last time. return None - async def unread_messages_for_room_id( - self, room_id: str, sync_config: SyncConfig, - ) -> int: - """Retrieve the count of unread message for the current user in the given room. - """ - with Measure(self.clock, "unread_messages_for_room_id"): - count = await self.store.get_unread_message_count_for_user( - room_id, sync_config.user.to_string(), - ) - return count - async def generate_sync_result( self, sync_config: SyncConfig, @@ -1899,7 +1888,9 @@ async def _generate_room_entry( if room_builder.rtype == "joined": unread_notifications = {} # type: Dict[str, str] - unread_count = await self.unread_messages_for_room_id(room_id, sync_config) + unread_count = await self.store.get_unread_message_count_for_user( + room_id, sync_config.user.to_string(), + ) room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 044b377973ab..b62262401ef9 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -27,9 +27,7 @@ def get_badge_count(store, user_id): badge = len(invites) for room_id in joins: - unread_count = yield defer.ensureDeferred( - store.get_unread_message_count_for_user(room_id, user_id) - ) + unread_count = yield store.get_unread_message_count_for_user(room_id, user_id) # return one badge count per conversation, as count per # message is so noisy as to be almost useless badge += 1 if unread_count else 0 diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index f39f556c2098..edc3624fed6a 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -172,6 +172,7 @@ def _invalidate_caches_for_event( self.get_latest_event_ids_in_room.invalidate((room_id,)) + self.get_unread_message_count_for_user.invalidate_many((room_id,)) self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,)) if not backfilled: diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index d7f1a68f443e..41b9c8dc7909 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -62,7 +62,7 @@ } -def count_as_unread(event: EventBase, context: EventContext) -> bool: +def should_count_as_unread(event: EventBase, context: EventContext) -> bool: # Exclude rejected and soft-failed events. if context.rejected or event.internal_metadata.is_soft_failed(): return False @@ -218,11 +218,6 @@ def _persist_events_and_state_updates( ) persist_event_counter.inc(len(events_and_contexts)) - for event, _ in events_and_contexts: - self.store.get_unread_message_count_for_user.invalidate_many( - (event.room_id,), - ) - if not backfilled: # backfilled events have negative stream orderings, so we don't # want to set the event_persisted_position to that. @@ -864,7 +859,7 @@ def event_dict(event): "contains_url": ( "url" in event.content and isinstance(event.content["url"], str) ), - "count_as_unread": count_as_unread(event, context), + "count_as_unread": should_count_as_unread(event, context), } for event, context in events_and_contexts ], @@ -963,8 +958,6 @@ def _update_metadata_tables_txn( elif event.type == EventTypes.Redaction and event.redacts is not None: # Insert into the redactions table. self._store_redaction(txn, event) - # Prevent the redacted event from counting towards the unread count. - self._handle_redacted_unread_event_txn(txn, event) elif event.type == EventTypes.Retention: # Update the room_retention table. self._store_retention_policy_for_room_txn(txn, event) @@ -1523,11 +1516,3 @@ def _update_backward_extremeties(self, txn, events): if not ev.internal_metadata.is_outlier() ], ) - - def _handle_redacted_unread_event_txn(self, txn: Connection, event: EventBase): - self.db.simple_update_txn( - txn=txn, - table="events", - keyvalues={"event_id": event.redacts}, - updatevalues={"count_as_unread": False}, - ) diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index b97dfd93133d..b03b25963691 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -40,7 +40,8 @@ from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause -from synapse.storage.database import Database, LoggingTransaction +from synapse.storage.database import Database +from synapse.storage.types import Cursor from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import get_domain_from_id from synapse.util.caches.descriptors import ( @@ -1366,29 +1367,39 @@ def get_next_event_to_expire_txn(txn): @cached(tree=True, cache_context=True) async def get_unread_message_count_for_user( self, room_id: str, user_id: str, cache_context: _CacheContext, - ): - last_read_event_id = await self.get_last_receipt_event_id_for_user( - user_id=user_id, - room_id=room_id, - receipt_type="m.read", - on_invalidate=cache_context.invalidate, - ) + ) -> int: + """Retrieve the count of unread messages for the given room and user. - return await self.db.runInteraction( - "get_unread_message_count_for_user", - self._get_unread_message_count_for_user_txn, - user_id, - room_id, - last_read_event_id, - ) + Args: + room_id: The ID of the room to count unread messages in. + user_id: The ID of the user to count unread messages for. + + Returns: + The number of unread messages for the given user in the given room. + """ + with Measure(self._clock, "get_unread_message_count_for_user"): + last_read_event_id = await self.get_last_receipt_event_id_for_user( + user_id=user_id, + room_id=room_id, + receipt_type="m.read", + on_invalidate=cache_context.invalidate, + ) + + return await self.db.runInteraction( + "get_unread_message_count_for_user", + self._get_unread_message_count_for_user_txn, + user_id, + room_id, + last_read_event_id, + ) def _get_unread_message_count_for_user_txn( self, - txn: LoggingTransaction, + txn: Cursor, user_id: str, room_id: str, last_read_event_id: Optional[str], - ): + ) -> int: if last_read_event_id: # Get the stream ordering for the last read event. stream_ordering = self.db.simple_select_one_onecol_txn( @@ -1413,6 +1424,10 @@ def _get_unread_message_count_for_user_txn( (user_id, room_id), ) row = txn.fetchone() + + if row is None: + return 0 + stream_ordering = row[0] # Count the messages that qualify as unread after the stream ordering we've just diff --git a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql similarity index 100% rename from synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql rename to synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql From 5d3aaf75fcc7901394702e6160a86ad730fd2088 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 27 Jul 2020 16:45:26 +0100 Subject: [PATCH 35/40] Fix unread messages test --- synapse/storage/data_stores/main/events.py | 4 ++++ tests/rest/client/v2_alpha/test_sync.py | 8 -------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 41b9c8dc7909..dcb4e6f154d3 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -238,6 +238,10 @@ def _persist_events_and_state_updates( event_counter.labels(event.type, origin_type, origin_entity).inc() + self.store.get_unread_message_count_for_user.invalidate_many( + (event.room_id,), + ) + for room_id, new_state in current_state_for_room.items(): self.store.get_current_state_ids.prefill((room_id,), new_state) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 265f54fbb4ff..12db997d5846 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -391,14 +391,6 @@ def test_unread_counts(self): res = self.helper.send(self.room_id, "hello", tok=self.tok2) self._check_unread_count(1) - # Check that redacting that message decreases our unread count. - self.helper.redact(self.room_id, res["event_id"], tok=self.tok2) - self._check_unread_count(0) - - # Re-send a message to prepare for the next check. - res = self.helper.send(self.room_id, "hello", tok=self.tok2) - self._check_unread_count(1) - # Send a read receipt to tell the server we've read the latest event. body = json.dumps({"m.read": res["event_id"]}).encode("utf8") request, channel = self.make_request( From 6f50007122e386cdf8ea1aa8e346a6a5094216f5 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 27 Jul 2020 16:48:58 +0100 Subject: [PATCH 36/40] Lint --- synapse/storage/data_stores/main/events.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index dcb4e6f154d3..2d7c3bff03cf 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -22,7 +22,6 @@ import attr from prometheus_client import Counter -from twisted.enterprise.adbapi import Connection from twisted.internet import defer import synapse.metrics From debedb335f78b661f82ebc2c3ebb88c2f2803dd3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 29 Jul 2020 11:02:41 +0100 Subject: [PATCH 37/40] Remove default value for count_as_unread So we don't end up rewriting the whole events table when running postgres < 11. --- .../data_stores/main/schema/delta/58/12unread_messages.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql index 373a0d65e809..531b532c7387 100644 --- a/synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql +++ b/synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql @@ -15,4 +15,4 @@ -- Store a boolean value in the events table for whether the event should be counted in -- the unread_count property of sync responses. -ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN NOT NULL DEFAULT FALSE; +ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN; From 82d9f39b91e93ceefc24a96c45b81367207e0de1 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 29 Jul 2020 16:30:29 +0100 Subject: [PATCH 38/40] Match the latest changes on the MSC --- synapse/storage/data_stores/main/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 2d7c3bff03cf..0c9c02afa181 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -54,10 +54,10 @@ ) STATE_EVENT_TYPES_TO_MARK_UNREAD = { - EventTypes.PowerLevels, EventTypes.Topic, EventTypes.Name, EventTypes.RoomAvatar, + EventTypes.Tombstone, } From 17e922a403bcd6dcf615dc6cccdfa2605d015445 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 29 Jul 2020 17:36:57 +0100 Subject: [PATCH 39/40] Fix test --- tests/rest/client/v2_alpha/test_sync.py | 63 +++++++++++++------------ 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 12db997d5846..d05379e1d087 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -352,28 +352,30 @@ def prepare(self, reactor, clock, hs): # Change the power levels of the room so that the second user can send state # events. - self.power_levels = { - "users": {self.user_id: 100, self.user2: 100}, - "users_default": 0, - "events": { - "m.room.name": 50, - "m.room.power_levels": 100, - "m.room.history_visibility": 100, - "m.room.canonical_alias": 50, - "m.room.avatar": 50, - "m.room.tombstone": 100, - "m.room.server_acl": 100, - "m.room.encryption": 100, - }, - "events_default": 0, - "state_default": 50, - "ban": 50, - "kick": 50, - "redact": 50, - "invite": 0, - } self.helper.send_state( - self.room_id, EventTypes.PowerLevels, self.power_levels, tok=self.tok, + self.room_id, + EventTypes.PowerLevels, + { + "users": {self.user_id: 100, self.user2: 100}, + "users_default": 0, + "events": { + "m.room.name": 50, + "m.room.power_levels": 100, + "m.room.history_visibility": 100, + "m.room.canonical_alias": 50, + "m.room.avatar": 50, + "m.room.tombstone": 100, + "m.room.server_acl": 100, + "m.room.encryption": 100, + }, + "events_default": 0, + "state_default": 50, + "ban": 50, + "kick": 50, + "redact": 50, + "invite": 0, + }, + tok=self.tok, ) def test_unread_counts(self): @@ -427,13 +429,6 @@ def test_unread_counts(self): ) self._check_unread_count(4) - # Check that power level changes increase the unread counter. - self.power_levels["invite"] = 50 - self.helper.send_state( - self.room_id, EventTypes.PowerLevels, self.power_levels, tok=self.tok2, - ) - self._check_unread_count(5) - # Check that edits don't increase the unread counter. self.helper.send_event( room_id=self.room_id, @@ -445,7 +440,7 @@ def test_unread_counts(self): }, tok=self.tok2, ) - self._check_unread_count(5) + self._check_unread_count(4) # Check that notices don't increase the unread counter. self.helper.send_event( @@ -454,6 +449,16 @@ def test_unread_counts(self): content={"body": "hello", "msgtype": "m.notice"}, tok=self.tok2, ) + self._check_unread_count(4) + + # Check that tombstone events changes increase the unread counter. + self.power_levels["invite"] = 50 + self.helper.send_state( + self.room_id, + EventTypes.Tombstone, + {"replacement_room": "!someroom:test"}, + tok=self.tok2, + ) self._check_unread_count(5) def _check_unread_count(self, expected_count: True): From d7fdc350c4c03fd4ff8fafd46f5935061abcf8e6 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 29 Jul 2020 17:52:18 +0100 Subject: [PATCH 40/40] Typo --- tests/rest/client/v2_alpha/test_sync.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index d05379e1d087..a31e44c97e15 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -452,7 +452,6 @@ def test_unread_counts(self): self._check_unread_count(4) # Check that tombstone events changes increase the unread counter. - self.power_levels["invite"] = 50 self.helper.send_state( self.room_id, EventTypes.Tombstone,