Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Re-implement unread counts (again) #8059

Merged
merged 17 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __nonzero__(self) -> bool:
# if there are updates for it, which we check after the instance has been created.
# This should not be a big deal because we update the notification counts afterwards as
# well anyway.
@attr.s(slots=True, frozen=False)
@attr.s(slots=True)
class JoinedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
Expand Down Expand Up @@ -936,7 +936,7 @@ async def compute_state_delta(

async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
) -> Optional[Dict[str, int]]:
) -> Dict[str, int]:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
Expand Down Expand Up @@ -1901,11 +1901,10 @@ async def _generate_room_entry(
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)

if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]

room_sync.unread_count = notifs["unread_count"]
room_sync.unread_count = notifs["unread_count"]

sync_result_builder.joined.append(room_sync)

Expand Down
20 changes: 14 additions & 6 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,12 @@ async def _get_power_levels_and_sender_level(self, event, context):
return pl_event.content if pl_event else {}, sender_level

async def action_for_event_by_user(self, event, context) -> None:
"""Given an event and context, evaluate the push rules and insert the
results into the event_push_actions_staging table.
"""Given an event and context, evaluate the push rules, check if the message
should increment the unread count, and insert the results into the
event_push_actions_staging table.
"""
count_as_unread = _should_count_as_unread(event, context)

rules_by_user = await self._get_rules_for_event(event, context)
actions_by_user = {}

Expand Down Expand Up @@ -237,7 +240,12 @@ async def action_for_event_by_user(self, event, context) -> None:
# The push rules endpoint on the CS API checks the actions on new push rules
# and limit them to spec'd ones, so we shouldn't have to worry about users
# changing their push rules to include this action.
if _should_count_as_unread(event, context):
# An alternative way to do this would be to pass count_as_unread directly to
# add_push_actions_to_staging, but that wouldn't work as that function relies
# on actions_by_user to determine the list of users to insert a row for,
# therefore it wouldn't add a row for any user that's in the room but doesn't
# get notified by the event.
babolivier marked this conversation as resolved.
Show resolved Hide resolved
if count_as_unread:
if uid in actions_by_user:
actions_by_user[uid].append("mark_unread")
else:
Expand Down Expand Up @@ -426,8 +434,8 @@ async def _update_rules_with_member_event_ids(
Args:
ret_rules_by_user (dict): Partially filled dict of push rules. Gets
updated with any new rules.
member_event_ids (dict): List of event ids for membership events that
have happened since the last time we filled rules_by_user
member_event_ids (dict): Dict of user id to event id for membership events
that have happened since the last time we filled rules_by_user
state_group: The state group we are currently computing push rules
for. Used when updating the cache.
"""
Expand Down Expand Up @@ -455,7 +463,7 @@ async def _update_rules_with_member_event_ids(

logger.debug("Joined: %r", user_ids)

# Previously we only considered users with pushers and read receipts in that
# Previously we only considered users with pushers or read receipts in that
# room. We can't do this anymore because we use push actions to calculate unread
# counts, which don't rely on the user having pushers or sent a read receipt into
# the room. Therefore we just need to filter for local users here.
Expand Down
86 changes: 47 additions & 39 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import attr

from synapse.api.constants import Membership
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
Expand Down Expand Up @@ -99,31 +98,28 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._rotate_delay = 3
self._rotate_count = 10000

def _stream_ordering_from_event_id_and_room_id_txn(
self, txn: LoggingTransaction, event_id: str, room_id: str,
) -> int:
"""Retrieve the stream ordering for the given event.
@cached(num_args=3, tree=True, max_entries=5000)
async def get_unread_event_push_actions_by_room_for_user(
self, room_id: str, user_id: str, last_read_event_id: Optional[str],
) -> Dict[str, int]:
"""Get the notification count, the highlight count and the unread message count
for a given user in a given room after the given read receipt.

Args:
event_id: The ID of the event to retrieve the stream ordering of.
room_id: The room the event was sent into.
Note that this function assumes the user to be a current member of the room,
since it's either called by the sync handler to handle joined room entries, or by
babolivier marked this conversation as resolved.
Show resolved Hide resolved
the HTTP pusher to calculate the badge of unread joined rooms.

Returns:
The stream ordering for this event.
We should always have a stream ordering to return, because the event ID
should come from a local read receipt.
Args:
room_id: The room to retrieve the counts in.
user_id: The user to retrieve the counts for.
last_read_event_id: The event associated with the latest read receipt for
this user in this room. None if no receipt for this user in this room.

Returns:
A dict containing the counts mentioned earlier in this docstring,
respectively under the keys "notify_count", "highlight_count" and
"unread_count".
"""
return self.db_pool.simple_select_one_onecol_txn(
txn=txn,
table="events",
keyvalues={"room_id": room_id, "event_id": event_id},
retcol="stream_ordering",
)

@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
):
return await self.db_pool.runInteraction(
"get_unread_event_push_actions_by_room",
self._get_unread_counts_by_receipt_txn,
Expand All @@ -135,17 +131,26 @@ def get_unread_event_push_actions_by_room_for_user(
def _get_unread_counts_by_receipt_txn(
self, txn, room_id, user_id, last_read_event_id,
):
if last_read_event_id is None:
stream_ordering = self.get_stream_ordering_for_local_membership_txn(
txn, user_id, room_id, Membership.JOIN,
)
else:
stream_ordering = self._stream_ordering_from_event_id_and_room_id_txn(
txn, last_read_event_id, room_id,
stream_ordering = None

if last_read_event_id is not None:
stream_ordering = self.get_stream_id_for_event_txn(
txn, last_read_event_id, allow_none=True,
)

if stream_ordering is None:
babolivier marked this conversation as resolved.
Show resolved Hide resolved
return {"notify_count": 0, "unread_count": 0, "highlight_count": 0}
# Either last_read_event_id is None, or it's an event we don't have (e.g.
# because it's been purged), in which case retrieve the stream ordering for
# the latest membership event from this user in this room (which we assume is
# a join).
event_id = self.db_pool.simple_select_one_onecol_txn(
txn=txn,
table="local_current_membership",
keyvalues={"room_id": room_id, "user_id": user_id},
retcol="event_id",
)

stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)

return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, stream_ordering
Expand Down Expand Up @@ -211,9 +216,8 @@ def _get_count_from_push_actions_txn(
push_actions_column: The column to filter by when querying from
event_push_actions. The filtering will be done on the condition
"[column] = 1".
push_summary_column: The count in event_push_summary to add the results from
the first query to. None if there is no count in the event_push_summary
table to add the results to.
push_summary_column: The count in event_push_summary to retrieve and add to
the results from the first query. None if there's no such count.

Returns:
The desired count.
Expand Down Expand Up @@ -515,7 +519,6 @@ async def add_push_actions_to_staging(self, event_id, user_id_actions):
Returns:
Deferred
"""

if not user_id_actions:
return

Expand Down Expand Up @@ -921,14 +924,17 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""

# First get the count of unread messages.
txn.execute(
sql % ("unread_count", "unread"),
(old_rotate_stream_ordering, rotate_to_stream_ordering),
)

# We need to merge both lists into a single object because we might not have the
# same amount of rows in each of them. In this case we use a dict indexed on the
# user ID and room ID to make it easier to populate.
# We need to merge results from the two requests (the one that retrieves the
# unread count and the one that retrieves the notifications count) into a single
# object because we might not have the same amount of rows in each of them. To do
# this, we use a dict indexed on the user ID and room ID to make it easier to
# populate.
summaries = {} # type: Dict[Tuple[str, str], EventPushSummary]
for row in txn:
summaries[(row[0], row[1])] = EventPushSummary(
Expand All @@ -949,7 +955,9 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
summaries[(row[0], row[1])].notif_count = row[2]
else:
# Because the rules on notifying are different than the rules on marking
# a message unread, we might end up with messages that notify and
# a message unread, we might end up with messages that notify but aren't
# marked unread, so we might not have a summary for this (user, room)
# tuple to complete.
summaries[(row[0], row[1])] = EventPushSummary(
unread_count=0,
stream_ordering=row[3],
Expand Down
34 changes: 0 additions & 34 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,40 +814,6 @@ def _is_local_host_in_room_ignoring_users_txn(txn):
_is_local_host_in_room_ignoring_users_txn,
)

def get_stream_ordering_for_local_membership_txn(
    self,
    txn: LoggingTransaction,
    user_id: str,
    room_id: str,
    membership: Membership,
) -> Optional[int]:
    """Get the stream ordering for a given local room membership.

    Looks up the user's row in local_current_membership and joins against
    the events table to resolve the membership event's stream ordering.

    Args:
        txn: The database transaction to run the query in.
        user_id: The user ID to retrieve the membership for.
        room_id: The room ID to retrieve the membership for.
        membership: The membership to retrieve the stream ordering for.

    Returns:
        The stream ordering, or None if the membership wasn't found.
    """
    # local_current_membership only holds the *current* membership row per
    # (user, room), so the WHERE clause on `membership` filters rather than
    # selects among history: if the current membership differs from the one
    # requested, no row matches and we return None.
    txn.execute(
        """
        SELECT stream_ordering FROM local_current_membership
        LEFT JOIN events USING (event_id, room_id)
        WHERE membership = ?
        AND user_id = ?
        AND room_id = ?
        """,
        (membership, user_id, room_id),
    )
    row = txn.fetchone()

    if row is None:
        return None

    return row[0]


class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
Expand Down
21 changes: 18 additions & 3 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.types import RoomStreamToken
Expand Down Expand Up @@ -590,8 +594,19 @@ async def get_stream_id_for_event(self, event_id: str) -> int:
Returns:
A stream ID.
"""
return await self.db_pool.simple_select_one_onecol(
table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering"
return await self.db_pool.runInteraction(
"get_stream_id_for_event", self.get_stream_id_for_event_txn, event_id,
)

def get_stream_id_for_event_txn(
    self, txn: LoggingTransaction, event_id: str, allow_none: bool = False,
) -> Optional[int]:
    """Fetch the stream ordering of an event, within an existing transaction.

    Args:
        txn: The database transaction to run the query in.
        event_id: The ID of the event to look up.
        allow_none: Passed through to simple_select_one_onecol_txn —
            presumably, when True, a missing event yields None instead of
            raising; confirm against the DatabasePool helper's contract.

    Returns:
        The event's stream ordering, or None if the event is not found and
        allow_none is True.
    """
    return self.db_pool.simple_select_one_onecol_txn(
        txn=txn,
        table="events",
        keyvalues={"event_id": event_id},
        retcol="stream_ordering",
        allow_none=allow_none,
    )

async def get_stream_token_for_event(self, event_id: str) -> str:
Expand Down
1 change: 0 additions & 1 deletion tests/rest/client/v2_alpha/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ def prepare(self, reactor, clock, hs):
tok=self.tok,
)

# @unittest.DEBUG
def test_unread_counts(self):
"""Tests that /sync returns the right value for the unread count (MSC2654)."""

Expand Down