Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track notification counts per thread (implement MSC3773) #13181

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2c7a568
Extract the thread ID when processing push rules.
clokep Jun 14, 2022
dfd921d
Return thread notification counts down sync.
clokep Jun 16, 2022
e0ed95a
Add an experimental config option.
clokep Jul 6, 2022
d56296a
Add a sync flag for unread thread notifications
clokep Jul 28, 2022
18ea92b
Reset the notif/unread counts for all summaries before updating the c…
clokep Aug 5, 2022
8978bb7
Merge remote-tracking branch 'origin/develop' into clokep/thread-notifs
clokep Aug 15, 2022
dd96e07
Sync tests with non-thread version.
clokep Aug 16, 2022
9349642
Merge remote-tracking branch 'origin/develop' into clokep/thread-notifs
clokep Aug 16, 2022
2e85ec6
Make thread_id nullable.
clokep Aug 18, 2022
15afd70
Properly count the number of items cached by get_unread_event_push_ac…
clokep Aug 19, 2022
621b300
Merge remote-tracking branch 'origin/develop' into clokep/thread-notifs
clokep Aug 19, 2022
42e6da0
Tweaks to index on nulls.
clokep Aug 25, 2022
9ae86cb
Fix join when rotating notifications.
clokep Aug 25, 2022
b6e0e68
Add a where clause when upserting with nulls.
clokep Aug 25, 2022
000fed4
Merge remote-tracking branch 'origin/develop' into clokep/thread-notifs
clokep Aug 25, 2022
6dcf16d
Remove an XXX comment -- this should be fine.
clokep Aug 25, 2022
430cc0b
Merge remote-tracking branch 'origin/develop' into clokep/thread-notifs
clokep Aug 30, 2022
4c21565
Add a versions flag.
clokep Aug 31, 2022
15bdb62
Use an unstable identifier in the sync response.
clokep Aug 31, 2022
dbb1df3
Use unstable prefixes in filters.
clokep Aug 31, 2022
247132d
Merge branch 'develop' into clokep/thread-notifs
erikjohnston Sep 2, 2022
7d29206
Fix tests.
clokep Sep 6, 2022
b789c00
Merge remote-tracking branch 'origin/develop' into clokep/thread-notifs
clokep Sep 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class JoinedSyncResult:
ephemeral: List[JsonDict]
account_data: List[JsonDict]
unread_notifications: JsonDict
unread_thread_notifications: JsonDict
summary: Optional[JsonDict]
unread_count: int

Expand Down Expand Up @@ -1053,7 +1054,7 @@ async def compute_state_delta(

async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> NotifCounts:
) -> Tuple[NotifCounts, Dict[str, NotifCounts]]:
with Measure(self.clock, "unread_notifs_for_room_id"):

return await self.store.get_unread_event_push_actions_by_room_for_user(
Expand Down Expand Up @@ -2115,18 +2116,32 @@ async def _generate_room_entry(
ephemeral=ephemeral,
account_data=account_data_events,
unread_notifications=unread_notifications,
unread_thread_notifications={},
summary=summary,
unread_count=0,
)

if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
notifs, thread_notifs = await self.unread_notifs_for_room_id(
room_id, sync_config
)

# Notifications for the main timeline.
unread_notifications["notification_count"] = notifs.notify_count
unread_notifications["highlight_count"] = notifs.highlight_count

room_sync.unread_count = notifs.unread_count

# And add info for each thread.
room_sync.unread_thread_notifications = {
thread_id: {
"notification_count": thread_notifs.notify_count,
"highlight_count": thread_notifs.highlight_count,
}
for thread_id, thread_notifs in thread_notifs.items()
if thread_id is not None
}

sync_result_builder.joined.append(room_sync)

if batch.limited and since_token:
Expand Down
11 changes: 8 additions & 3 deletions synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,26 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
badge = len(invites)

for room_id in joins:
notifs = await (
notifs, thread_notifs = await (
store.get_unread_event_push_actions_by_room_for_user(
room_id,
user_id,
)
)
if notifs.notify_count == 0:
# Combine the counts from all the threads.
notify_count = notifs.notify_count + sum(
n.notify_count for n in thread_notifs.values()
)

if notify_count == 0:
continue

if group_by_room:
# return one badge count per conversation
badge += 1
else:
# increment the badge count by the number of unread messages in the room
badge += notifs.notify_count
badge += notify_count
return badge


Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ async def encode_room(
ephemeral_events = room.ephemeral
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["unread_thread_notifications"] = room.unread_thread_notifications
result["summary"] = room.summary
if self._msc2654_enabled:
result["org.matrix.msc2654.unread_count"] = room.unread_count
Expand Down
143 changes: 100 additions & 43 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ def __init__(
replaces_index="event_push_summary_unique_index",
)

@cached(tree=True, max_entries=5000)
@cached(tree=True, max_entries=5000, iterable=True)
clokep marked this conversation as resolved.
Show resolved Hide resolved
async def get_unread_event_push_actions_by_room_for_user(
self,
room_id: str,
user_id: str,
) -> NotifCounts:
) -> Tuple[NotifCounts, Dict[str, NotifCounts]]:
"""Get the notification count, the highlight count and the unread message count
for a given user in a given room after the given read receipt.

Expand Down Expand Up @@ -263,7 +263,7 @@ def _get_unread_counts_by_receipt_txn(
txn: LoggingTransaction,
room_id: str,
user_id: str,
) -> NotifCounts:
) -> Tuple[NotifCounts, Dict[str, NotifCounts]]:
result = self.get_last_receipt_for_user_txn(
txn,
user_id,
Expand Down Expand Up @@ -295,12 +295,20 @@ def _get_unread_counts_by_receipt_txn(

def _get_unread_counts_by_pos_txn(
self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
) -> NotifCounts:
) -> Tuple[NotifCounts, Dict[str, NotifCounts]]:
"""Get the number of unread messages for a user/room that have happened
since the given stream ordering.

Returns:
A tuple of:
The unread messages for the main timeline

A dictionary of thread ID to unread messages for that thread.
Only contains threads with unread messages.
"""

counts = NotifCounts()
thread_counts = {}

# First we pull the counts from the summary table.
#
Expand All @@ -317,7 +325,7 @@ def _get_unread_counts_by_pos_txn(
# receipt).
txn.execute(
"""
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0)
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id
FROM event_push_summary
WHERE room_id = ? AND user_id = ?
AND (
Expand All @@ -327,39 +335,67 @@ def _get_unread_counts_by_pos_txn(
""",
(room_id, user_id, stream_ordering, stream_ordering),
)
row = txn.fetchone()
max_summary_stream_ordering = 0
for summary_stream_ordering, notif_count, unread_count, thread_id in txn:
if not thread_id:
counts = NotifCounts(
notify_count=notif_count, unread_count=unread_count
)
# TODO Delete zeroed out threads completely from the database.
elif notif_count or unread_count:
thread_counts[thread_id] = NotifCounts(
notify_count=notif_count, unread_count=unread_count
)
Comment on lines +393 to +397
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth looking at this again briefly now that some other bugs have been fixed.


summary_stream_ordering = 0
if row:
summary_stream_ordering = row[0]
counts.notify_count += row[1]
counts.unread_count += row[2]
# XXX All threads should have the same stream ordering?
max_summary_stream_ordering = max(
summary_stream_ordering, max_summary_stream_ordering
)
Comment on lines +399 to +402
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to figure out what's going on here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at my test server a bit, this isn't true (that all event_push_summary for a room/user have the same stream_ordering). I'm not really sure why I thought this, but it likely causes subtle bugs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that stream ordering is the "max" stream ordering that we rotated from the event_push_actions table:

https://github.com/matrix-org/synapse/pull/13181/files#diff-f121377d76a7b35c60092da2c4bf8c849544459a2c676cf8e87057aff24ece54R1218

I think that means it's equivalent to the stream ordering we rotated up to? They may differ, but there should never be any relevant rows in EPA between the per-thread stream ordering and the max stream ordering.


# Next we need to count highlights, which aren't summarised
sql = """
SELECT COUNT(*) FROM event_push_actions
SELECT COUNT(*), thread_id FROM event_push_actions
WHERE user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND highlight = 1
GROUP BY thread_id
"""
txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
if row:
counts.highlight_count += row[0]
for highlight_count, thread_id in txn:
if not thread_id:
counts.highlight_count += highlight_count
elif highlight_count:
if thread_id in thread_counts:
thread_counts[thread_id].highlight_count += highlight_count
else:
thread_counts[thread_id] = NotifCounts(
notify_count=0, unread_count=0, highlight_count=highlight_count
)

# Finally we need to count push actions that aren't included in the
# summary returned above, e.g. recent events that haven't been
# summarised yet, or the summary is empty due to a recent read receipt.
stream_ordering = max(stream_ordering, summary_stream_ordering)
notify_count, unread_count = self._get_notif_unread_count_for_user_room(
stream_ordering = max(stream_ordering, max_summary_stream_ordering)
unread_counts = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering
)

counts.notify_count += notify_count
counts.unread_count += unread_count
for notif_count, unread_count, thread_id in unread_counts:
if not thread_id:
counts.notify_count += notif_count
counts.unread_count += unread_count
elif thread_id in thread_counts:
thread_counts[thread_id].notify_count += notif_count
thread_counts[thread_id].unread_count += unread_count
else:
thread_counts[thread_id] = NotifCounts(
notify_count=notif_count,
unread_count=unread_count,
highlight_count=0,
)

return counts
return counts, thread_counts

def _get_notif_unread_count_for_user_room(
self,
Expand All @@ -368,7 +404,7 @@ def _get_notif_unread_count_for_user_room(
user_id: str,
stream_ordering: int,
max_stream_ordering: Optional[int] = None,
) -> Tuple[int, int]:
) -> List[Tuple[int, int, str]]:
"""Returns the notify and unread counts from `event_push_actions` for
the given user/room in the given range.

Expand All @@ -390,7 +426,7 @@ def _get_notif_unread_count_for_user_room(
# If there have been no events in the room since the stream ordering,
# there can't be any push actions either.
if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
return 0, 0
return []

clause = ""
args = [user_id, room_id, stream_ordering]
Expand All @@ -401,26 +437,23 @@ def _get_notif_unread_count_for_user_room(
# If the max stream ordering is less than the min stream ordering,
# then obviously there are zero push actions in that range.
if max_stream_ordering <= stream_ordering:
return 0, 0
return []

sql = f"""
SELECT
COUNT(CASE WHEN notif = 1 THEN 1 END),
COUNT(CASE WHEN unread = 1 THEN 1 END)
FROM event_push_actions ea
WHERE user_id = ?
COUNT(CASE WHEN unread = 1 THEN 1 END),
thread_id
FROM event_push_actions ea
WHERE user_id = ?
AND room_id = ?
AND ea.stream_ordering > ?
{clause}
GROUP BY thread_id
"""

txn.execute(sql, args)
row = txn.fetchone()

if row:
return cast(Tuple[int, int], row)

return 0, 0
return cast(List[Tuple[int, int, str]], txn.fetchall())

async def get_push_action_users_in_range(
self, min_stream_ordering: int, max_stream_ordering: int
Expand Down Expand Up @@ -1010,21 +1043,42 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:

# Fetch the notification counts between the stream ordering of the
# latest receipt and what was previously summarised.
notif_count, unread_count = self._get_notif_unread_count_for_user_room(
unread_counts = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

# Replace the previous summary with the new counts.
self.db_pool.simple_upsert_txn(
# Updated threads get their notification count and unread count updated.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
values={
"notif_count": notif_count,
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
},
key_names=("room_id", "user_id", "thread_id"),
key_values=[(room_id, user_id, row[2]) for row in unread_counts],
value_names=(
"notif_count",
"unread_count",
"stream_ordering",
"last_receipt_stream_ordering",
),
value_values=[
(row[0], row[1], old_rotate_stream_ordering, stream_ordering)
for row in unread_counts
],
)

# Other threads should be marked as reset at the old stream ordering.
txn.execute(
"""
UPDATE event_push_summary SET notif_count = 0, unread_count = 0, stream_ordering = ?, last_receipt_stream_ordering = ?
WHERE user_id = ? AND room_id = ? AND
stream_ordering <= ?
""",
(
old_rotate_stream_ordering,
stream_ordering,
user_id,
room_id,
old_rotate_stream_ordering,
),
)
clokep marked this conversation as resolved.
Show resolved Hide resolved

# We always update `event_push_summary_last_receipt_stream_id` to
Expand Down Expand Up @@ -1178,7 +1232,10 @@ def _rotate_notifs_before_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id", "thread_id"),
key_values=[(user_id, room_id, thread_id) for user_id, room_id, thread_id in summaries],
key_values=[
(user_id, room_id, thread_id)
for user_id, room_id, thread_id in summaries
],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_values=[
(
Expand Down
6 changes: 3 additions & 3 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def test_push_actions_for_user(self, send_receipt: bool):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2],
NotifCounts(highlight_count=0, unread_count=0, notify_count=0),
(NotifCounts(highlight_count=0, unread_count=0, notify_count=0), {}),
)

self.persist(
Expand All @@ -191,7 +191,7 @@ def test_push_actions_for_user(self, send_receipt: bool):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2],
NotifCounts(highlight_count=0, unread_count=0, notify_count=1),
(NotifCounts(highlight_count=0, unread_count=0, notify_count=1), {}),
)

self.persist(
Expand All @@ -206,7 +206,7 @@ def test_push_actions_for_user(self, send_receipt: bool):
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2],
NotifCounts(highlight_count=1, unread_count=0, notify_count=2),
(NotifCounts(highlight_count=1, unread_count=0, notify_count=2), {}),
)

def test_get_rooms_for_user_with_stream_ordering(self):
Expand Down
Loading