Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix unread counts on large servers #13140

Merged
merged 9 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13140.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix unread counts for users on large servers. Introduced in v1.62.0rc1.
48 changes: 26 additions & 22 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,18 +854,20 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:

limit = 100

min_stream_id = self.db_pool.simple_select_one_onecol_txn(
min_receipts_stream_id = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_last_receipt_stream_id",
keyvalues={},
retcol="stream_id",
)

max_receipts_stream_id = self._receipts_id_gen.get_current_token()

sql = """
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
FROM receipts_linearized AS r
INNER JOIN events AS e USING (event_id)
WHERE r.stream_id > ? AND user_id LIKE ?
WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
ORDER BY r.stream_id ASC
LIMIT ?
"""
Expand All @@ -877,13 +879,21 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
txn.execute(
sql,
(
min_stream_id,
min_receipts_stream_id,
max_receipts_stream_id,
user_filter,
limit,
),
)
rows = txn.fetchall()

old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)

# For each new read receipt we delete push actions from before it and
# recalculate the summary.
for _, room_id, user_id, stream_ordering in rows:
Expand All @@ -902,13 +912,6 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
(room_id, user_id, stream_ordering),
)

old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)
Copy link
Member Author

@erikjohnston erikjohnston Jun 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this out of the loop as no point requerying every iteration


notif_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)
Expand All @@ -927,18 +930,19 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:

# We always update `event_push_summary_last_receipt_stream_id` to
# ensure that we don't rescan the same receipts for remote users.
#
# This requires repeatable read to be safe, as we need the
# `MAX(stream_id)` to not include any new rows that have been committed
# since the start of the transaction (since those rows won't have been
# returned by the query above). Alternatively we could query the max
# stream ID at the start of the transaction and bound everything by
# that.
Comment on lines -931 to -936
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what was the reason for removing this comment, ooi?

Copy link
Member Author

@erikjohnston erikjohnston Jun 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're no longer reading the MAX(stream_id) and instead explicitly bound what we pull from the DB, so this comment is no longer true.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah duh yes. thanks.

txn.execute(
"""
UPDATE event_push_summary_last_receipt_stream_id
SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized)
"""

upper_limit = max_receipts_stream_id
if len(rows) >= limit:
# If we pulled out a limited number of rows we only update the
# position to the last receipt we processed, so we continue
# processing the rest next iteration.
upper_limit = rows[-1][0]

self.db_pool.simple_update_txn(
txn,
table="event_push_summary_last_receipt_stream_id",
keyvalues={},
updatevalues={"stream_id": upper_limit},
)

return len(rows) < limit
Expand Down
12 changes: 5 additions & 7 deletions tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,12 @@ def _mark_read(stream: int, depth: int) -> None:
last_read_stream_ordering[0] = stream

self.get_success(
self.store.db_pool.runInteraction(
"",
self.store._insert_linearized_receipt_txn,
self.store.insert_receipt(
room_id,
"m.read",
user_id,
f"$test{stream}:example.com",
{},
stream,
user_id=user_id,
event_ids=[f"$test{stream}:example.com"],
data={},
)
)

Expand All @@ -166,6 +163,7 @@ def _mark_read(stream: int, depth: int) -> None:

_inject_actions(6, PlAIN_NOTIF)
_rotate(7)
_assert_counts(1, 0)

self.get_success(
self.store.db_pool.simple_delete(
Expand Down