From 5726d7ef4b23b2e99ce0e76f2fcebe40c0beca8d Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Sep 2022 10:10:25 -0700 Subject: [PATCH 01/10] send events as a batch to be persisted --- synapse/handlers/room.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 09a1a82e6c05..a4e865724731 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1057,8 +1057,10 @@ async def _send_events_for_new_room( creator_id = creator.user.to_string() event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""} depth = 1 + # the last event sent/persisted to the db last_sent_event_id: Optional[str] = None + # the most recently created event prev_event: List[str] = [] # a map of event types, state keys -> event_ids. We collect these mappings this as events are @@ -1152,7 +1154,7 @@ async def send( prev_event_ids=[last_sent_event_id], depth=depth, ) - last_sent_event_id = member_event_id + # last_sent_event_id = member_event_id prev_event = [member_event_id] # update the depth and state map here as the membership event has been created @@ -1168,7 +1170,7 @@ async def send( EventTypes.PowerLevels, pl_content, False ) current_state_group = power_context._state_group - last_sent_stream_id = await send(power_event, power_context, creator) + await send(power_event, power_context, creator) else: power_level_content: JsonDict = { "users": {creator_id: 100}, @@ -1217,7 +1219,7 @@ async def send( False, ) current_state_group = pl_context._state_group - last_sent_stream_id = await send(pl_event, pl_context, creator) + await send(pl_event, pl_context, creator) events_to_send = [] if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state: @@ -1271,9 +1273,11 @@ async def send( ) events_to_send.append((encryption_event, encryption_context)) - for event, context in events_to_send: - last_sent_stream_id = await send(event, context, creator) - return 
last_sent_stream_id, last_sent_event_id, depth + last_event = await self.event_creation_handler.handle_create_room_events( + creator, events_to_send + ) + assert last_event.internal_metadata.stream_ordering is not None + return last_event.internal_metadata.stream_ordering, last_event.event_id, depth def _generate_room_id(self) -> str: """Generates a random room ID. From 2f81005cff5b898a3363deed8b947ef6e71e4a46 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Tue, 13 Sep 2022 10:11:11 -0700 Subject: [PATCH 02/10] add functions to persist events as a batch, encapsulate some logic in a helper function --- synapse/handlers/message.py | 218 ++++++++++++++++++++++++++++-------- 1 file changed, 174 insertions(+), 44 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 062f93bc676f..506dc2d62571 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -61,6 +61,7 @@ from synapse.storage.state import StateFilter from synapse.types import ( MutableStateMap, + PersistedEventPosition, Requester, RoomAlias, StateMap, @@ -1289,6 +1290,124 @@ async def _validate_event_relation(self, event: EventBase) -> None: 400, "Cannot start threads from an event with a relation" ) + async def handle_create_room_events( + self, + requester: Requester, + events_and_ctx: List[Tuple[EventBase, EventContext]], + ratelimit: bool = True, + ) -> EventBase: + """ + Process a batch of room creation events. For each event in the list it checks + the authorization and that the event can be serialized. Returns the last event in the + list once it has been persisted. 
+ Args: + requester: the room creator + events_and_ctx: a set of events and their associated contexts to persist + ratelimit: whether to ratelimit this request + """ + for event, context in events_and_ctx: + try: + validate_event_for_room_version(event) + await self._event_auth_handler.check_auth_rules_from_context( + event, context + ) + except AuthError as err: + logger.warning("Denying new event %r because %s", event, err) + raise err + + # Ensure that we can round trip before trying to persist in db + try: + dump = json_encoder.encode(event.content) + json_decoder.decode(dump) + except Exception: + logger.exception("Failed to encode content: %r", event.content) + raise + + # We now persist the events + try: + result = await self._persist_events_batch( + requester, events_and_ctx, ratelimit + ) + except Exception as e: + logger.info(f"Encountered an error persisting events: {e}") + + return result + + async def _persist_events_batch( + self, + requestor: Requester, + events_and_ctx: List[Tuple[EventBase, EventContext]], + ratelimit: bool = True, + ) -> EventBase: + """ + Processes the push actions and adds them to the push staging area before attempting to + persist the batch of events. + See handle_create_room_events for arguments + Returns the last event in the list if persisted successfully + """ + for event, context in events_and_ctx: + with opentracing.start_active_span("calculate_push_actions"): + await self._bulk_push_rule_evaluator.action_for_event_by_user( + event, context + ) + try: + last_event = await self.persist_and_notify_batched_events( + requestor, events_and_ctx, ratelimit + ) + except Exception: + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. 
+ for event, _ in events_and_ctx: + await self.store.remove_push_actions_from_staging(event.event_id) + raise + + return last_event + + async def persist_and_notify_batched_events( + self, + requester: Requester, + events_and_ctx: List[Tuple[EventBase, EventContext]], + ratelimit: bool = True, + ) -> EventBase: + """ + Handles the actual persisting of a batch of events to the DB, and sends the appropriate + notifications when this is done. + Args: + requester: the room creator + events_and_ctx: list of events and their associated contexts to persist + ratelimit: whether to apply ratelimiting to this request + """ + if ratelimit: + await self.request_ratelimiter.ratelimit(requester) + + for event, context in events_and_ctx: + await self._actions_by_event_type(event, context) + + assert self._storage_controllers.persistence is not None + ( + persisted_events, + max_stream_token, + ) = await self._storage_controllers.persistence.persist_events(events_and_ctx) + + stream_ordering = persisted_events[-1].internal_metadata.stream_ordering + assert stream_ordering is not None + pos = PersistedEventPosition(self._instance_name, stream_ordering) + + async def _notify() -> None: + try: + await self.notifier.on_new_room_event( + persisted_events[-1], pos, max_stream_token + ) + except Exception: + logger.exception( + "Error notifying about new room event %s", + event.event_id, + ) + + run_in_background(_notify) + + return persisted_events[-1] + @measure_func("handle_new_client_event") async def handle_new_client_event( self, @@ -1623,6 +1742,55 @@ async def persist_and_notify_client_event( requester, is_admin_redaction=is_admin_redaction ) + # run checks/actions on event based on type + await self._actions_by_event_type(event, context) + + # Mark any `m.historical` messages as backfilled so they don't appear + # in `/sync` and have the proper decrementing `stream_ordering` as we import + backfilled = False + if event.internal_metadata.is_historical(): + backfilled = True + + # 
Note that this returns the event that was persisted, which may not be + # the same as we passed in if it was deduplicated due transaction IDs. + ( + event, + event_pos, + max_stream_token, + ) = await self._storage_controllers.persistence.persist_event( + event, context=context, backfilled=backfilled + ) + + if self._ephemeral_events_enabled: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_expiry(event) + + async def _notify() -> None: + try: + await self.notifier.on_new_room_event( + event, event_pos, max_stream_token, extra_users=extra_users + ) + except Exception: + logger.exception( + "Error notifying about new room event %s", + event.event_id, + ) + + run_in_background(_notify) + + if event.type == EventTypes.Message: + # We don't want to block sending messages on any presence code. This + # matters as sometimes presence code can take a while. + run_in_background(self._bump_active_time, requester.user) + + return event + + async def _actions_by_event_type( + self, event: EventBase, context: EventContext + ) -> None: + """ + Helper function to execute actions/checks based on the event type + """ if event.type == EventTypes.Member and event.membership == Membership.JOIN: ( current_membership, @@ -1643,11 +1811,13 @@ async def persist_and_notify_client_event( original_event_id = event.unsigned.get("replaces_state") if original_event_id: - original_event = await self.store.get_event(original_event_id) + original_alias_event = await self.store.get_event(original_event_id) - if original_event: - original_alias = original_event.content.get("alias", None) - original_alt_aliases = original_event.content.get("alt_aliases", []) + if original_alias_event: + original_alias = original_alias_event.content.get("alias", None) + original_alt_aliases = original_alias_event.content.get( + "alt_aliases", [] + ) # Check the alias is currently valid (if it has changed). 
room_alias_str = event.content.get("alias", None) @@ -1825,46 +1995,6 @@ async def persist_and_notify_client_event( errcode=Codes.INVALID_PARAM, ) - # Mark any `m.historical` messages as backfilled so they don't appear - # in `/sync` and have the proper decrementing `stream_ordering` as we import - backfilled = False - if event.internal_metadata.is_historical(): - backfilled = True - - # Note that this returns the event that was persisted, which may not be - # the same as we passed in if it was deduplicated due transaction IDs. - ( - event, - event_pos, - max_stream_token, - ) = await self._storage_controllers.persistence.persist_event( - event, context=context, backfilled=backfilled - ) - - if self._ephemeral_events_enabled: - # If there's an expiry timestamp on the event, schedule its expiry. - self._message_handler.maybe_schedule_expiry(event) - - async def _notify() -> None: - try: - await self.notifier.on_new_room_event( - event, event_pos, max_stream_token, extra_users=extra_users - ) - except Exception: - logger.exception( - "Error notifying about new room event %s", - event.event_id, - ) - - run_in_background(_notify) - - if event.type == EventTypes.Message: - # We don't want to block sending messages on any presence code. This - # matters as sometimes presence code can take a while. - run_in_background(self._bump_active_time, requester.user) - - return event - async def _maybe_kick_guest_users( self, event: EventBase, context: EventContext ) -> None: From c1a8a9e28904ca52f41fa85101122e49d5495d1a Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Fri, 16 Sep 2022 13:36:41 -0700 Subject: [PATCH 03/10] add support for persisting batched events over replication --- synapse/handlers/message.py | 29 ++++- synapse/replication/http/__init__.py | 2 + synapse/replication/http/send_events.py | 165 ++++++++++++++++++++++++ 3 files changed, 194 insertions(+), 2 deletions(-) create mode 100644 synapse/replication/http/send_events.py diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 506dc2d62571..ffb0caf7fd89 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -56,6 +56,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet +from synapse.replication.http.send_events import ReplicationSendEventsRestServlet from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter @@ -494,6 +495,7 @@ def __init__(self, hs: "HomeServer"): self.membership_types_to_include_profile_data_in.add(Membership.INVITE) self.send_event = ReplicationSendEventRestServlet.make_client(hs) + self.send_events = ReplicationSendEventsRestServlet.make_client(hs) self.request_ratelimiter = hs.get_request_ratelimiter() @@ -1335,7 +1337,7 @@ async def handle_create_room_events( async def _persist_events_batch( self, - requestor: Requester, + requester: Requester, events_and_ctx: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, ) -> EventBase: @@ -1351,8 +1353,31 @@ async def _persist_events_batch( event, context ) try: + # If we're a worker we need to hit out to the master. 
+ writer_instance = self._events_shard_config.get_instance(event.room_id) + if writer_instance != self._instance_name: + try: + result = await self.send_events( + instance_name=writer_instance, + store=self.store, + requester=requester, + events_and_ctx=events_and_ctx, + ratelimit=ratelimit, + ) + except SynapseError as e: + if e.code == HTTPStatus.CONFLICT: + raise PartialStateConflictError() + raise + stream_id = result["stream_id"] + + # If we newly persisted the event then we need to update its + # stream_ordering entry manually (as it was persisted on + # another worker). + event.internal_metadata.stream_ordering = stream_id + return event + last_event = await self.persist_and_notify_batched_events( - requestor, events_and_ctx, ratelimit + requester, events_and_ctx, ratelimit ) except Exception: # Ensure that we actually remove the entries in the push actions diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 53aa7fa4c6bd..ac9a92240af2 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -25,6 +25,7 @@ push, register, send_event, + send_events, state, streams, ) @@ -43,6 +44,7 @@ def __init__(self, hs: "HomeServer"): def register_servlets(self, hs: "HomeServer") -> None: send_event.register_servlets(hs, self) + send_events.register_servlets(hs, self) federation.register_servlets(hs, self) presence.register_servlets(hs, self) membership.register_servlets(hs, self) diff --git a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py new file mode 100644 index 000000000000..5fad398b7946 --- /dev/null +++ b/synapse/replication/http/send_events.py @@ -0,0 +1,165 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, List, Tuple + +from twisted.web.server import Request + +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.events import EventBase, make_event_from_dict +from synapse.events.snapshot import EventContext +from synapse.http.server import HttpServer +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import JsonDict, Requester +from synapse.util.metrics import Measure + +if TYPE_CHECKING: + from synapse.server import HomeServer + from synapse.storage.databases.main import DataStore + +logger = logging.getLogger(__name__) + + +class ReplicationSendEventsRestServlet(ReplicationEndpoint): + """Handles batches of newly created events on workers, including persisting and + notifying. + + The API looks like: + + POST /_synapse/replication/send_events/:txn_id + + { + "events": [{ + "event": { .. serialized event .. }, + "room_version": .., // "1", "2", "3", etc: the version of the room + // containing the event + "event_format_version": .., // 1,2,3 etc: the event format version + "internal_metadata": { .. serialized internal_metadata .. }, + "outlier": true|false, + "rejected_reason": .., // The event.rejected_reason field + "context": { .. serialized event context .. }, + "requester": { .. serialized requester .. }, + "ratelimit": true, + }] + } + + 200 OK + + { "stream_id": 12345, "event_id": "$abcdef..." 
} + + Responds with a 409 when a `PartialStateConflictError` is raised due to an event + context that needs to be recomputed due to the un-partial stating of a room. + + """ + + NAME = "send_events" + PATH_ARGS = () + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.event_creation_handler = hs.get_event_creation_handler() + self.store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() + self.clock = hs.get_clock() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + store: "DataStore", + events_and_ctx: List[Tuple[EventBase, EventContext]], + requester: Requester, + ratelimit: bool, + ) -> JsonDict: + """ + Args: + store + requester + events_and_ctx + ratelimit + """ + serialized_events = [] + + for event, context in events_and_ctx: + serialized_context = await context.serialize(event, store) + serialized_event = { + "event": event.get_pdu_json(), + "room_version": event.room_version.identifier, + "event_format_version": event.format_version, + "internal_metadata": event.internal_metadata.get_dict(), + "outlier": event.internal_metadata.is_outlier(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + "requester": requester.serialize(), + "ratelimit": ratelimit, + } + serialized_events.append(serialized_event) + + payload = {"events": serialized_events} + + return payload + + async def _handle_request( # type: ignore[override] + self, request: Request + ) -> Tuple[int, JsonDict]: + with Measure(self.clock, "repl_send_events_parse"): + payload = parse_json_object_from_request(request) + events_and_ctx = [] + events = payload["events"] + + for event_payload in events: + event_dict = event_payload["event"] + room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]] + internal_metadata = event_payload["internal_metadata"] + rejected_reason = event_payload["rejected_reason"] + + event = make_event_from_dict( + event_dict, room_ver, internal_metadata, 
rejected_reason + ) + event.internal_metadata.outlier = event_payload["outlier"] + + requester = Requester.deserialize( + self.store, event_payload["requester"] + ) + context = EventContext.deserialize( + self._storage_controllers, event_payload["context"] + ) + + ratelimit = event_payload["ratelimit"] + events_and_ctx.append((event, context)) + + logger.info( + "Got batch of events to send, last ID of batch is: %s, sending into room: %s", + event.event_id, + event.room_id, + ) + + last_event = ( + await self.event_creation_handler.persist_and_notify_batched_events( + requester, events_and_ctx, ratelimit + ) + ) + + return ( + 200, + { + "stream_id": last_event.internal_metadata.stream_ordering, + "event_id": last_event.event_id, + }, + ) + + +def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: + ReplicationSendEventsRestServlet(hs).register(http_server) From c0aa1937b88a903e1001b39aef3180b99958d6a7 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Wed, 21 Sep 2022 18:45:40 -0700 Subject: [PATCH 04/10] newsfragment --- changelog.d/13800.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13800.misc diff --git a/changelog.d/13800.misc b/changelog.d/13800.misc new file mode 100644 index 000000000000..eda34841808c --- /dev/null +++ b/changelog.d/13800.misc @@ -0,0 +1 @@ +Add support for persisting initial room creation events to the DB in a batch. From 75a802169c5d5983c43772047d2489f90672c0ed Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Fri, 23 Sep 2022 11:46:28 -0700 Subject: [PATCH 05/10] modify handle_new_client_event to take a list of [event,context] tuples --- synapse/handlers/message.py | 209 ++++++++++++++------------------ synapse/handlers/room.py | 11 +- synapse/handlers/room_batch.py | 3 +- synapse/handlers/room_member.py | 11 +- 4 files changed, 99 insertions(+), 135 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ffb0caf7fd89..6fdbfa679c6c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1019,8 +1019,7 @@ async def create_and_send_nonmember_event( ev = await self.handle_new_client_event( requester=requester, - event=event, - context=context, + events_and_context=[(event, context)], ratelimit=ratelimit, ignore_shadow_ban=ignore_shadow_ban, ) @@ -1292,49 +1291,6 @@ async def _validate_event_relation(self, event: EventBase) -> None: 400, "Cannot start threads from an event with a relation" ) - async def handle_create_room_events( - self, - requester: Requester, - events_and_ctx: List[Tuple[EventBase, EventContext]], - ratelimit: bool = True, - ) -> EventBase: - """ - Process a batch of room creation events. For each event in the list it checks - the authorization and that the event can be serialized. Returns the last event in the - list once it has been persisted. 
- Args: - requester: the room creator - events_and_ctx: a set of events and their associated contexts to persist - ratelimit: whether to ratelimit this request - """ - for event, context in events_and_ctx: - try: - validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context( - event, context - ) - except AuthError as err: - logger.warning("Denying new event %r because %s", event, err) - raise err - - # Ensure that we can round trip before trying to persist in db - try: - dump = json_encoder.encode(event.content) - json_decoder.decode(dump) - except Exception: - logger.exception("Failed to encode content: %r", event.content) - raise - - # We now persist the events - try: - result = await self._persist_events_batch( - requester, events_and_ctx, ratelimit - ) - except Exception as e: - logger.info(f"Encountered an error persisting events: {e}") - - return result - async def _persist_events_batch( self, requester: Requester, @@ -1437,13 +1393,12 @@ async def _notify() -> None: async def handle_new_client_event( self, requester: Requester, - event: EventBase, - context: EventContext, + events_and_context: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ignore_shadow_ban: bool = False, ) -> EventBase: - """Processes a new event. + """Processes new events. This includes deduplicating, checking auth, persisting, notifying users, sending to remote servers, etc. @@ -1453,8 +1408,7 @@ async def handle_new_client_event( Args: requester - event - context + events_and_context: A list of one or more tuples of event, context to be persisted ratelimit extra_users: Any extra users to notify about event @@ -1472,84 +1426,98 @@ async def handle_new_client_event( """ extra_users = extra_users or [] - # we don't apply shadow-banning to membership events here. Invites are blocked - # higher up the stack, and we allow shadow-banned users to send join and leave - # events as normal. 
- if ( - event.type != EventTypes.Member - and not ignore_shadow_ban - and requester.shadow_banned - ): - # We randomly sleep a bit just to annoy the requester. - await self.clock.sleep(random.randint(1, 10)) - raise ShadowBanError() + for event, context in events_and_context: + # we don't apply shadow-banning to membership events here. Invites are blocked + # higher up the stack, and we allow shadow-banned users to send join and leave + # events as normal. + if ( + event.type != EventTypes.Member + and not ignore_shadow_ban + and requester.shadow_banned + ): + # We randomly sleep a bit just to annoy the requester. + await self.clock.sleep(random.randint(1, 10)) + raise ShadowBanError() - if event.is_state(): - prev_event = await self.deduplicate_state_event(event, context) - if prev_event is not None: - logger.info( - "Not bothering to persist state event %s duplicated by %s", - event.event_id, - prev_event.event_id, - ) - return prev_event + if event.is_state(): + prev_event = await self.deduplicate_state_event(event, context) + if prev_event is not None: + logger.info( + "Not bothering to persist state event %s duplicated by %s", + event.event_id, + prev_event.event_id, + ) + return prev_event - if event.internal_metadata.is_out_of_band_membership(): - # the only sort of out-of-band-membership events we expect to see here are - # invite rejections and rescinded knocks that we have generated ourselves. - assert event.type == EventTypes.Member - assert event.content["membership"] == Membership.LEAVE - else: + if event.internal_metadata.is_out_of_band_membership(): + # the only sort of out-of-band-membership events we expect to see here are + # invite rejections and rescinded knocks that we have generated ourselves. 
+ assert event.type == EventTypes.Member + assert event.content["membership"] == Membership.LEAVE + else: + try: + validate_event_for_room_version(event) + await self._event_auth_handler.check_auth_rules_from_context( + event, context + ) + except AuthError as err: + logger.warning("Denying new event %r because %s", event, err) + raise err + + # Ensure that we can round trip before trying to persist in db try: - validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context( - event, context + dump = json_encoder.encode(event.content) + json_decoder.decode(dump) + except Exception: + logger.exception("Failed to encode content: %r", event.content) + raise + + if len(events_and_context) > 1: + try: + result = await self._persist_events_batch( + requester, events_and_context, ratelimit ) - except AuthError as err: - logger.warning("Denying new event %r because %s", event, err) - raise err - # Ensure that we can round trip before trying to persist in db - try: - dump = json_encoder.encode(event.content) - json_decoder.decode(dump) - except Exception: - logger.exception("Failed to encode content: %r", event.content) - raise + except Exception as e: + logger.info(f"Encountered an error persisting events: {e}") - # We now persist the event (and update the cache in parallel, since we - # don't want to block on it). - try: - result, _ = await make_deferred_yieldable( - gather_results( - ( - run_in_background( - self._persist_event, - requester=requester, - event=event, - context=context, - ratelimit=ratelimit, - extra_users=extra_users, - ), - run_in_background( - self.cache_joined_hosts_for_event, event, context - ).addErrback( - log_failure, "cache_joined_hosts_for_event failed" + return result + + else: + # We now persist the event (and update the cache in parallel, since we + # don't want to block on it). 
+ event, context = events_and_context[0] + try: + result, _ = await make_deferred_yieldable( + gather_results( + ( + run_in_background( + self._persist_event, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ), + run_in_background( + self.cache_joined_hosts_for_event, event, context + ).addErrback( + log_failure, "cache_joined_hosts_for_event failed" + ), ), - ), - consumeErrors=True, + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) + except PartialStateConflictError as e: + # The event context needs to be recomputed. + # Turn the error into a 429, as a hint to the client to try again. + logger.info( + "Room %s was un-partial stated while persisting client event.", + event.room_id, ) - ).addErrback(unwrapFirstError) - except PartialStateConflictError as e: - # The event context needs to be recomputed. - # Turn the error into a 429, as a hint to the client to try again. - logger.info( - "Room %s was un-partial stated while persisting client event.", - event.room_id, - ) - raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) + raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) - return result + return result async def _persist_event( self, @@ -2107,8 +2075,7 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool: # shadow-banned user. 
await self.handle_new_client_event( requester, - event, - context, + events_and_context=[(event, context)], ratelimit=False, ignore_shadow_ban=True, ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a4e865724731..b220238e5597 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -301,8 +301,7 @@ async def _upgrade_room( # now send the tombstone await self.event_creation_handler.handle_new_client_event( requester=requester, - event=tombstone_event, - context=tombstone_context, + events_and_context=[(tombstone_event, tombstone_context)], ) state_filter = StateFilter.from_types( @@ -1114,8 +1113,7 @@ async def send( ev = await self.event_creation_handler.handle_new_client_event( requester=creator, - event=event, - context=context, + events_and_context=[(event, context)], ratelimit=False, ignore_shadow_ban=True, ) @@ -1154,7 +1152,6 @@ async def send( prev_event_ids=[last_sent_event_id], depth=depth, ) - # last_sent_event_id = member_event_id prev_event = [member_event_id] # update the depth and state map here as the membership event has been created @@ -1273,8 +1270,8 @@ async def send( ) events_to_send.append((encryption_event, encryption_context)) - last_event = await self.event_creation_handler.handle_create_room_events( - creator, events_to_send + last_event = await self.event_creation_handler.handle_new_client_event( + creator, events_to_send, ignore_shadow_ban=True ) assert last_event.internal_metadata.stream_ordering is not None return last_event.internal_metadata.stream_ordering, last_event.event_id, depth diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 1414e575d6fc..411a6fb22fdb 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -379,8 +379,7 @@ async def persist_historical_events( await self.create_requester_for_user_id_from_app_service( event.sender, app_service_requester.app_service ), - event=event, - context=context, + events_and_context=[(event, 
context)], ) return event_ids diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 8d01f4bf2be5..88158822e009 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -432,8 +432,7 @@ async def _local_membership_update( with opentracing.start_active_span("handle_new_client_event"): result_event = await self.event_creation_handler.handle_new_client_event( requester, - event, - context, + events_and_context=[(event, context)], extra_users=[target], ratelimit=ratelimit, ) @@ -1252,7 +1251,10 @@ async def send_membership_event( raise SynapseError(403, "This room has been blocked on this server") event = await self.event_creation_handler.handle_new_client_event( - requester, event, context, extra_users=[target_user], ratelimit=ratelimit + requester, + events_and_context=[(event, context)], + extra_users=[target_user], + ratelimit=ratelimit, ) prev_member_event_id = prev_state_ids.get( @@ -1860,8 +1862,7 @@ async def _generate_local_out_of_band_leave( result_event = await self.event_creation_handler.handle_new_client_event( requester, - event, - context, + events_and_context=[(event, context)], extra_users=[UserID.from_string(target_user)], ) # we know it was persisted, so must have a stream ordering From aac9a4a16bbb52548d1a93429806b3d565ebfd10 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Fri, 23 Sep 2022 11:46:49 -0700 Subject: [PATCH 06/10] update tests to reflect new calling convention --- tests/handlers/test_message.py | 10 ++++++++-- tests/handlers/test_register.py | 4 +++- tests/storage/test_event_chain.py | 8 ++++++-- tests/unittest.py | 4 +++- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py index 986b50ce0c8b..99384837d05c 100644 --- a/tests/handlers/test_message.py +++ b/tests/handlers/test_message.py @@ -105,7 +105,10 @@ def test_duplicated_txn_id(self): event1, context = self._create_duplicate_event(txn_id) ret_event1 = self.get_success( - self.handler.handle_new_client_event(self.requester, event1, context) + self.handler.handle_new_client_event( + self.requester, + events_and_context=[(event1, context)], + ) ) stream_id1 = ret_event1.internal_metadata.stream_ordering @@ -118,7 +121,10 @@ def test_duplicated_txn_id(self): self.assertNotEqual(event1.event_id, event2.event_id) ret_event2 = self.get_success( - self.handler.handle_new_client_event(self.requester, event2, context) + self.handler.handle_new_client_event( + self.requester, + events_and_context=[(event2, context)], + ) ) stream_id2 = ret_event2.internal_metadata.stream_ordering diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py index 86b3d5197547..765df75d914f 100644 --- a/tests/handlers/test_register.py +++ b/tests/handlers/test_register.py @@ -497,7 +497,9 @@ def test_auto_create_auto_join_room_preset_invalid_permissions(self): ) ) self.get_success( - event_creation_handler.handle_new_client_event(requester, event, context) + event_creation_handler.handle_new_client_event( + requester, events_and_context=[(event, context)] + ) ) # Register a second user, which won't be be in the room (or even have an invite) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index a0ce077a9957..de9f4af2de90 100644 --- 
a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -531,7 +531,9 @@ def _generate_room(self) -> Tuple[str, List[Set[str]]]: ) ) self.get_success( - event_handler.handle_new_client_event(self.requester, event, context) + event_handler.handle_new_client_event( + self.requester, events_and_context=[(event, context)] + ) ) state1 = set(self.get_success(context.get_current_state_ids()).values()) @@ -549,7 +551,9 @@ def _generate_room(self) -> Tuple[str, List[Set[str]]]: ) ) self.get_success( - event_handler.handle_new_client_event(self.requester, event, context) + event_handler.handle_new_client_event( + self.requester, events_and_context=[(event, context)] + ) ) state2 = set(self.get_success(context.get_current_state_ids()).values()) diff --git a/tests/unittest.py b/tests/unittest.py index 00cb023198b5..5116be338ee0 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -734,7 +734,9 @@ def create_and_send_event( event.internal_metadata.soft_failed = True self.get_success( - event_creator.handle_new_client_event(requester, event, context) + event_creator.handle_new_client_event( + requester, events_and_context=[(event, context)] + ) ) return event.event_id From 0f50b9b9125d20f7f5d1ba10572ea0291cfe3cb3 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Mon, 26 Sep 2022 20:41:33 -0700 Subject: [PATCH 07/10] use bulk path to persist all events --- synapse/handlers/message.py | 367 +++++++++--------------- synapse/replication/http/send_event.py | 4 +- synapse/replication/http/send_events.py | 20 +- 3 files changed, 150 insertions(+), 241 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6fdbfa679c6c..b634761ea7af 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1291,104 +1291,6 @@ async def _validate_event_relation(self, event: EventBase) -> None: 400, "Cannot start threads from an event with a relation" ) - async def _persist_events_batch( - self, - requester: Requester, - events_and_ctx: List[Tuple[EventBase, EventContext]], - ratelimit: bool = True, - ) -> EventBase: - """ - Processes the push actions and adds them to the push staging area before attempting to - persist the batch of events. - See handle_create_room_events for arguments - Returns the last event in the list if persisted successfully - """ - for event, context in events_and_ctx: - with opentracing.start_active_span("calculate_push_actions"): - await self._bulk_push_rule_evaluator.action_for_event_by_user( - event, context - ) - try: - # If we're a worker we need to hit out to the master. - writer_instance = self._events_shard_config.get_instance(event.room_id) - if writer_instance != self._instance_name: - try: - result = await self.send_events( - instance_name=writer_instance, - store=self.store, - requester=requester, - events_and_ctx=events_and_ctx, - ratelimit=ratelimit, - ) - except SynapseError as e: - if e.code == HTTPStatus.CONFLICT: - raise PartialStateConflictError() - raise - stream_id = result["stream_id"] - - # If we newly persisted the event then we need to update its - # stream_ordering entry manually (as it was persisted on - # another worker). 
- event.internal_metadata.stream_ordering = stream_id - return event - - last_event = await self.persist_and_notify_batched_events( - requester, events_and_ctx, ratelimit - ) - except Exception: - # Ensure that we actually remove the entries in the push actions - # staging area, if we calculated them. - for event, _ in events_and_ctx: - await self.store.remove_push_actions_from_staging(event.event_id) - raise - - return last_event - - async def persist_and_notify_batched_events( - self, - requester: Requester, - events_and_ctx: List[Tuple[EventBase, EventContext]], - ratelimit: bool = True, - ) -> EventBase: - """ - Handles the actual persisting of a batch of events to the DB, and sends the appropriate - notifications when this is done. - Args: - requester: the room creator - events_and_ctx: list of events and their associated contexts to persist - ratelimit: whether to apply ratelimiting to this request - """ - if ratelimit: - await self.request_ratelimiter.ratelimit(requester) - - for event, context in events_and_ctx: - await self._actions_by_event_type(event, context) - - assert self._storage_controllers.persistence is not None - ( - persisted_events, - max_stream_token, - ) = await self._storage_controllers.persistence.persist_events(events_and_ctx) - - stream_ordering = persisted_events[-1].internal_metadata.stream_ordering - assert stream_ordering is not None - pos = PersistedEventPosition(self._instance_name, stream_ordering) - - async def _notify() -> None: - try: - await self.notifier.on_new_room_event( - persisted_events[-1], pos, max_stream_token - ) - except Exception: - logger.exception( - "Error notifying about new room event %s", - event.event_id, - ) - - run_in_background(_notify) - - return persisted_events[-1] - @measure_func("handle_new_client_event") async def handle_new_client_event( self, @@ -1472,62 +1374,48 @@ async def handle_new_client_event( logger.exception("Failed to encode content: %r", event.content) raise - if len(events_and_context) 
> 1: - try: - result = await self._persist_events_batch( - requester, events_and_context, ratelimit - ) - - except Exception as e: - logger.info(f"Encountered an error persisting events: {e}") - - return result - - else: - # We now persist the event (and update the cache in parallel, since we - # don't want to block on it). - event, context = events_and_context[0] - try: - result, _ = await make_deferred_yieldable( - gather_results( - ( - run_in_background( - self._persist_event, - requester=requester, - event=event, - context=context, - ratelimit=ratelimit, - extra_users=extra_users, - ), - run_in_background( - self.cache_joined_hosts_for_event, event, context - ).addErrback( - log_failure, "cache_joined_hosts_for_event failed" - ), + # We now persist the event (and update the cache in parallel, since we + # don't want to block on it). + event, context = events_and_context[0] + try: + result, _ = await make_deferred_yieldable( + gather_results( + ( + run_in_background( + self._persist_events, + requester=requester, + events_and_context=events_and_context, + ratelimit=ratelimit, + extra_users=extra_users, ), - consumeErrors=True, - ) - ).addErrback(unwrapFirstError) - except PartialStateConflictError as e: - # The event context needs to be recomputed. - # Turn the error into a 429, as a hint to the client to try again. - logger.info( - "Room %s was un-partial stated while persisting client event.", - event.room_id, + run_in_background( + self.cache_joined_hosts_for_event, event, context + ).addErrback( + log_failure, "cache_joined_hosts_for_event failed" + ), + ), + consumeErrors=True, ) - raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) + ).addErrback(unwrapFirstError) + except PartialStateConflictError as e: + # The event context needs to be recomputed. + # Turn the error into a 429, as a hint to the client to try again. 
+ logger.info( + "Room %s was un-partial stated while persisting client event.", + event.room_id, + ) + raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) - return result + return result - async def _persist_event( + async def _persist_events( self, requester: Requester, - event: EventBase, - context: EventContext, + events_and_context: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ) -> EventBase: - """Actually persists the event. Should only be called by + """Actually persists new events. Should only be called by `handle_new_client_event`, and see its docstring for documentation of the arguments. @@ -1535,29 +1423,31 @@ async def _persist_event( a room that has been un-partial stated. """ - # Skip push notification actions for historical messages - # because we don't want to notify people about old history back in time. - # The historical messages also do not have the proper `context.current_state_ids` - # and `state_groups` because they have `prev_events` that aren't persisted yet - # (historical messages persisted in reverse-chronological order). - if not event.internal_metadata.is_historical(): - with opentracing.start_active_span("calculate_push_actions"): - await self._bulk_push_rule_evaluator.action_for_event_by_user( - event, context - ) + for event, context in events_and_context: + # Skip push notification actions for historical messages + # because we don't want to notify people about old history back in time. + # The historical messages also do not have the proper `context.current_state_ids` + # and `state_groups` because they have `prev_events` that aren't persisted yet + # (historical messages persisted in reverse-chronological order). 
+ if not event.internal_metadata.is_historical(): + with opentracing.start_active_span("calculate_push_actions"): + await self._bulk_push_rule_evaluator.action_for_event_by_user( + event, context + ) try: # If we're a worker we need to hit out to the master. - writer_instance = self._events_shard_config.get_instance(event.room_id) + first_event, _ = events_and_context[0] + writer_instance = self._events_shard_config.get_instance( + first_event.room_id + ) if writer_instance != self._instance_name: try: - result = await self.send_event( + result = await self.send_events( instance_name=writer_instance, - event_id=event.event_id, + events_and_context=events_and_context, store=self.store, requester=requester, - event=event, - context=context, ratelimit=ratelimit, extra_users=extra_users, ) @@ -1567,7 +1457,13 @@ async def _persist_event( raise stream_id = result["stream_id"] event_id = result["event_id"] - if event_id != event.event_id: + + # If we batch persisted events we return the last persisted event, otherwise + # we return the one event that was persisted + event, _ = events_and_context[-1] + + # We don't worry about de-duplicating batch persisted events + if event_id != event.event_id and len(events_and_context) == 1: # If we get a different event back then it means that its # been de-duplicated, so we replace the given event with the # one already persisted. @@ -1579,15 +1475,19 @@ async def _persist_event( event.internal_metadata.stream_ordering = stream_id return event - event = await self.persist_and_notify_client_event( - requester, event, context, ratelimit=ratelimit, extra_users=extra_users + event = await self.persist_and_notify_client_events( + requester, + events_and_context, + ratelimit=ratelimit, + extra_users=extra_users, ) return event except Exception: - # Ensure that we actually remove the entries in the push actions - # staging area, if we calculated them. 
- await self.store.remove_push_actions_from_staging(event.event_id) + for event, _ in events_and_context: + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. + await self.store.remove_push_actions_from_staging(event.event_id) raise async def cache_joined_hosts_for_event( @@ -1681,23 +1581,23 @@ async def _validate_canonical_alias( Codes.BAD_ALIAS, ) - async def persist_and_notify_client_event( + async def persist_and_notify_client_events( self, requester: Requester, - event: EventBase, - context: EventContext, + events_and_context: List[Tuple[EventBase, EventContext]], ratelimit: bool = True, extra_users: Optional[List[UserID]] = None, ) -> EventBase: - """Called when we have fully built the event, have already - calculated the push actions for the event, and checked auth. + """Called when we have fully built the events, have already + calculated the push actions for the events, and checked auth. This should only be run on the instance in charge of persisting events. Returns: - The persisted event. This may be different than the given event if - it was de-duplicated (e.g. because we had already persisted an - event with the same transaction ID.) + The persisted event, if one event is passed in, or the last event in the + list in the case of batch persisting. If only one event was persisted, the + returned event may be different than the given event if it was de-duplicated + (e.g. because we had already persisted an event with the same transaction ID.) 
Raises: PartialStateConflictError: if attempting to persist a partial state event in @@ -1705,78 +1605,81 @@ async def persist_and_notify_client_event( """ extra_users = extra_users or [] - assert self._storage_controllers.persistence is not None - assert self._events_shard_config.should_handle( - self._instance_name, event.room_id - ) + for event, context in events_and_context: + assert self._events_shard_config.should_handle( + self._instance_name, event.room_id + ) - if ratelimit: - # We check if this is a room admin redacting an event so that we - # can apply different ratelimiting. We do this by simply checking - # it's not a self-redaction (to avoid having to look up whether the - # user is actually admin or not). - is_admin_redaction = False - if event.type == EventTypes.Redaction: - assert event.redacts is not None - - original_event = await self.store.get_event( - event.redacts, - redact_behaviour=EventRedactBehaviour.as_is, - get_prev_content=False, - allow_rejected=False, - allow_none=True, - ) + if ratelimit: + # We check if this is a room admin redacting an event so that we + # can apply different ratelimiting. We do this by simply checking + # it's not a self-redaction (to avoid having to look up whether the + # user is actually admin or not). 
+ is_admin_redaction = False + if event.type == EventTypes.Redaction: + assert event.redacts is not None + + original_event = await self.store.get_event( + event.redacts, + redact_behaviour=EventRedactBehaviour.as_is, + get_prev_content=False, + allow_rejected=False, + allow_none=True, + ) - is_admin_redaction = bool( - original_event and event.sender != original_event.sender - ) + is_admin_redaction = bool( + original_event and event.sender != original_event.sender + ) - await self.request_ratelimiter.ratelimit( - requester, is_admin_redaction=is_admin_redaction - ) + await self.request_ratelimiter.ratelimit( + requester, is_admin_redaction=is_admin_redaction + ) - # run checks/actions on event based on type - await self._actions_by_event_type(event, context) + # run checks/actions on event based on type + await self._actions_by_event_type(event, context) - # Mark any `m.historical` messages as backfilled so they don't appear - # in `/sync` and have the proper decrementing `stream_ordering` as we import - backfilled = False - if event.internal_metadata.is_historical(): - backfilled = True + # Mark any `m.historical` messages as backfilled so they don't appear + # in `/sync` and have the proper decrementing `stream_ordering` as we import + backfilled = False + if event.internal_metadata.is_historical(): + backfilled = True - # Note that this returns the event that was persisted, which may not be - # the same as we passed in if it was deduplicated due transaction IDs. + assert self._storage_controllers.persistence is not None ( - event, - event_pos, + persisted_events, max_stream_token, - ) = await self._storage_controllers.persistence.persist_event( - event, context=context, backfilled=backfilled + ) = await self._storage_controllers.persistence.persist_events( + events_and_context, backfilled=backfilled ) - if self._ephemeral_events_enabled: - # If there's an expiry timestamp on the event, schedule its expiry. 
- self._message_handler.maybe_schedule_expiry(event) + for event in persisted_events: + if self._ephemeral_events_enabled: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_expiry(event) - async def _notify() -> None: - try: - await self.notifier.on_new_room_event( - event, event_pos, max_stream_token, extra_users=extra_users - ) - except Exception: - logger.exception( - "Error notifying about new room event %s", - event.event_id, - ) + stream_ordering = event.internal_metadata.stream_ordering + assert stream_ordering is not None + pos = PersistedEventPosition(self._instance_name, stream_ordering) - run_in_background(_notify) + async def _notify() -> None: + try: + await self.notifier.on_new_room_event( + event, pos, max_stream_token, extra_users=extra_users + ) + except Exception: + logger.exception( + "Error notifying about new room event %s", + event.event_id, + ) + + run_in_background(_notify) - if event.type == EventTypes.Message: - # We don't want to block sending messages on any presence code. This - # matters as sometimes presence code can take a while. - run_in_background(self._bump_active_time, requester.user) + if event.type == EventTypes.Message: + # We don't want to block sending messages on any presence code. This + # matters as sometimes presence code can take a while. 
+ run_in_background(self._bump_active_time, requester.user) - return event + return persisted_events[-1] async def _actions_by_event_type( self, event: EventBase, context: EventContext diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 486f04723c89..4215a1c1bc41 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -141,8 +141,8 @@ async def _handle_request( # type: ignore[override] "Got event to send with ID: %s into room: %s", event.event_id, event.room_id ) - event = await self.event_creation_handler.persist_and_notify_client_event( - requester, event, context, ratelimit=ratelimit, extra_users=extra_users + event = await self.event_creation_handler.persist_and_notify_client_events( + requester, [(event, context)], ratelimit=ratelimit, extra_users=extra_users ) return ( diff --git a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py index 5fad398b7946..8889bbb644e1 100644 --- a/synapse/replication/http/send_events.py +++ b/synapse/replication/http/send_events.py @@ -23,7 +23,7 @@ from synapse.http.server import HttpServer from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint -from synapse.types import JsonDict, Requester +from synapse.types import JsonDict, Requester, UserID from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -78,10 +78,11 @@ def __init__(self, hs: "HomeServer"): @staticmethod async def _serialize_payload( # type: ignore[override] + events_and_context: List[Tuple[EventBase, EventContext]], store: "DataStore", - events_and_ctx: List[Tuple[EventBase, EventContext]], requester: Requester, ratelimit: bool, + extra_users: List[UserID], ) -> JsonDict: """ Args: @@ -92,7 +93,7 @@ async def _serialize_payload( # type: ignore[override] """ serialized_events = [] - for event, context in events_and_ctx: + for event, context in events_and_context: 
serialized_context = await context.serialize(event, store) serialized_event = { "event": event.get_pdu_json(), @@ -104,6 +105,7 @@ async def _serialize_payload( # type: ignore[override] "context": serialized_context, "requester": requester.serialize(), "ratelimit": ratelimit, + "extra_users": [u.to_string() for u in extra_users], } serialized_events.append(serialized_event) @@ -116,7 +118,7 @@ async def _handle_request( # type: ignore[override] ) -> Tuple[int, JsonDict]: with Measure(self.clock, "repl_send_events_parse"): payload = parse_json_object_from_request(request) - events_and_ctx = [] + events_and_context = [] events = payload["events"] for event_payload in events: @@ -138,7 +140,11 @@ async def _handle_request( # type: ignore[override] ) ratelimit = event_payload["ratelimit"] - events_and_ctx.append((event, context)) + events_and_context.append((event, context)) + + extra_users = [ + UserID.from_string(u) for u in event_payload["extra_users"] + ] logger.info( "Got batch of events to send, last ID of batch is: %s, sending into room: %s", @@ -147,8 +153,8 @@ async def _handle_request( # type: ignore[override] ) last_event = ( - await self.event_creation_handler.persist_and_notify_batched_events( - requester, events_and_ctx, ratelimit + await self.event_creation_handler.persist_and_notify_client_events( + requester, events_and_context, ratelimit, extra_users ) ) From bdc15b22fb0694285a264272ee311ec11cfa55a0 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Mon, 26 Sep 2022 20:50:05 -0700 Subject: [PATCH 08/10] fix the diff --- synapse/handlers/message.py | 428 ++++++++++++++++++------------------ 1 file changed, 217 insertions(+), 211 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b634761ea7af..23467634c380 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1636,7 +1636,223 @@ async def persist_and_notify_client_events( ) # run checks/actions on event based on type - await self._actions_by_event_type(event, context) + if event.type == EventTypes.Member and event.membership == Membership.JOIN: + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + event.state_key, event.room_id + ) + if current_membership != Membership.JOIN: + self._notifier.notify_user_joined_room( + event.event_id, event.room_id + ) + + await self._maybe_kick_guest_users(event, context) + + if event.type == EventTypes.CanonicalAlias: + # Validate a newly added alias or newly added alt_aliases. + + original_alias = None + original_alt_aliases: object = [] + + original_event_id = event.unsigned.get("replaces_state") + if original_event_id: + original_alias_event = await self.store.get_event(original_event_id) + + if original_alias_event: + original_alias = original_alias_event.content.get("alias", None) + original_alt_aliases = original_alias_event.content.get( + "alt_aliases", [] + ) + + # Check the alias is currently valid (if it has changed). + room_alias_str = event.content.get("alias", None) + directory_handler = self.hs.get_directory_handler() + if room_alias_str and room_alias_str != original_alias: + await self._validate_canonical_alias( + directory_handler, room_alias_str, event.room_id + ) + + # Check that alt_aliases is the proper form. 
+ alt_aliases = event.content.get("alt_aliases", []) + if not isinstance(alt_aliases, (list, tuple)): + raise SynapseError( + 400, + "The alt_aliases property must be a list.", + Codes.INVALID_PARAM, + ) + + # If the old version of alt_aliases is of an unknown form, + # completely replace it. + if not isinstance(original_alt_aliases, (list, tuple)): + # TODO: check that the original_alt_aliases' entries are all strings + original_alt_aliases = [] + + # Check that each alias is currently valid. + new_alt_aliases = set(alt_aliases) - set(original_alt_aliases) + if new_alt_aliases: + for alias_str in new_alt_aliases: + await self._validate_canonical_alias( + directory_handler, alias_str, event.room_id + ) + + federation_handler = self.hs.get_federation_handler() + + if event.type == EventTypes.Member: + if event.content["membership"] == Membership.INVITE: + event.unsigned[ + "invite_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, + membership_user_id=event.sender, + ) + + invitee = UserID.from_string(event.state_key) + if not self.hs.is_mine(invitee): + # TODO: Can we add signature from remote server in a nicer + # way? If we have been invited by a remote server, we need + # to get them to sign the event. + + returned_invite = await federation_handler.send_invite( + invitee.domain, event + ) + event.unsigned.pop("room_state", None) + + # TODO: Make sure the signatures actually are correct. 
+ event.signatures.update(returned_invite.signatures) + + if event.content["membership"] == Membership.KNOCK: + event.unsigned[ + "knock_room_state" + ] = await self.store.get_stripped_room_state_from_event_context( + context, + self.room_prejoin_state_types, + ) + + if event.type == EventTypes.Redaction: + assert event.redacts is not None + + original_event = await self.store.get_event( + event.redacts, + redact_behaviour=EventRedactBehaviour.as_is, + get_prev_content=False, + allow_rejected=False, + allow_none=True, + ) + + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + # we can make some additional checks now if we have the original event. + if original_event: + if original_event.type == EventTypes.Create: + raise AuthError(403, "Redacting create events is not permitted") + + if original_event.room_id != event.room_id: + raise SynapseError( + 400, "Cannot redact event from a different room" + ) + + if original_event.type == EventTypes.ServerACL: + raise AuthError( + 403, "Redacting server ACL events is not permitted" + ) + + # Add a little safety stop-gap to prevent people from trying to + # redact MSC2716 related events when they're in a room version + # which does not support it yet. We allow people to use MSC2716 + # events in existing room versions but only from the room + # creator since it does not require any changes to the auth + # rules and in effect, the redaction algorithm . In the + # supported room version, we add the `historical` power level to + # auth the MSC2716 related events and adjust the redaction + # algorthim to keep the `historical` field around (redacting an + # event should only strip fields which don't affect the + # structural protocol level). 
+ is_msc2716_event = ( + original_event.type == EventTypes.MSC2716_INSERTION + or original_event.type == EventTypes.MSC2716_BATCH + or original_event.type == EventTypes.MSC2716_MARKER + ) + if not room_version_obj.msc2716_historical and is_msc2716_event: + raise AuthError( + 403, + "Redacting MSC2716 events is not supported in this room version", + ) + + event_types = event_auth.auth_types_for_event(event.room_version, event) + prev_state_ids = await context.get_prev_state_ids( + StateFilter.from_types(event_types) + ) + + auth_events_ids = self._event_auth_handler.compute_auth_events( + event, prev_state_ids, for_verification=True + ) + auth_events_map = await self.store.get_events(auth_events_ids) + auth_events = { + (e.type, e.state_key): e for e in auth_events_map.values() + } + + if event_auth.check_redaction( + room_version_obj, event, auth_events=auth_events + ): + # this user doesn't have 'redact' rights, so we need to do some more + # checks on the original event. Let's start by checking the original + # event exists. + if not original_event: + raise NotFoundError( + "Could not find event %s" % (event.redacts,) + ) + + if event.user_id != original_event.user_id: + raise AuthError( + 403, "You don't have permission to redact events" + ) + + # all the checks are done. + event.internal_metadata.recheck_redaction = False + + if event.type == EventTypes.Create: + prev_state_ids = await context.get_prev_state_ids() + if prev_state_ids: + raise AuthError(403, "Changing the room create event is forbidden") + + if event.type == EventTypes.MSC2716_INSERTION: + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + create_event = await self.store.get_create_event_for_room(event.room_id) + room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) + + # Only check an insertion event if the room version + # supports it or the event is from the room creator. 
+ if room_version_obj.msc2716_historical or ( + self.config.experimental.msc2716_enabled + and event.sender == room_creator + ): + next_batch_id = event.content.get( + EventContentFields.MSC2716_NEXT_BATCH_ID + ) + conflicting_insertion_event_id = None + if next_batch_id: + conflicting_insertion_event_id = ( + await self.store.get_insertion_event_id_by_batch_id( + event.room_id, next_batch_id + ) + ) + if conflicting_insertion_event_id is not None: + # The current insertion event that we're processing is invalid + # because an insertion event already exists in the room with the + # same next_batch_id. We can't allow multiple because the batch + # pointing will get weird, e.g. we can't determine which insertion + # event the batch event is pointing to. + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Another insertion event already exists with the same next_batch_id", + errcode=Codes.INVALID_PARAM, + ) # Mark any `m.historical` messages as backfilled so they don't appear # in `/sync` and have the proper decrementing `stream_ordering` as we import @@ -1681,216 +1897,6 @@ async def _notify() -> None: return persisted_events[-1] - async def _actions_by_event_type( - self, event: EventBase, context: EventContext - ) -> None: - """ - Helper function to execute actions/checks based on the event type - """ - if event.type == EventTypes.Member and event.membership == Membership.JOIN: - ( - current_membership, - _, - ) = await self.store.get_local_current_membership_for_user_in_room( - event.state_key, event.room_id - ) - if current_membership != Membership.JOIN: - self._notifier.notify_user_joined_room(event.event_id, event.room_id) - - await self._maybe_kick_guest_users(event, context) - - if event.type == EventTypes.CanonicalAlias: - # Validate a newly added alias or newly added alt_aliases. 
- - original_alias = None - original_alt_aliases: object = [] - - original_event_id = event.unsigned.get("replaces_state") - if original_event_id: - original_alias_event = await self.store.get_event(original_event_id) - - if original_alias_event: - original_alias = original_alias_event.content.get("alias", None) - original_alt_aliases = original_alias_event.content.get( - "alt_aliases", [] - ) - - # Check the alias is currently valid (if it has changed). - room_alias_str = event.content.get("alias", None) - directory_handler = self.hs.get_directory_handler() - if room_alias_str and room_alias_str != original_alias: - await self._validate_canonical_alias( - directory_handler, room_alias_str, event.room_id - ) - - # Check that alt_aliases is the proper form. - alt_aliases = event.content.get("alt_aliases", []) - if not isinstance(alt_aliases, (list, tuple)): - raise SynapseError( - 400, "The alt_aliases property must be a list.", Codes.INVALID_PARAM - ) - - # If the old version of alt_aliases is of an unknown form, - # completely replace it. - if not isinstance(original_alt_aliases, (list, tuple)): - # TODO: check that the original_alt_aliases' entries are all strings - original_alt_aliases = [] - - # Check that each alias is currently valid. - new_alt_aliases = set(alt_aliases) - set(original_alt_aliases) - if new_alt_aliases: - for alias_str in new_alt_aliases: - await self._validate_canonical_alias( - directory_handler, alias_str, event.room_id - ) - - federation_handler = self.hs.get_federation_handler() - - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.INVITE: - event.unsigned[ - "invite_room_state" - ] = await self.store.get_stripped_room_state_from_event_context( - context, - self.room_prejoin_state_types, - membership_user_id=event.sender, - ) - - invitee = UserID.from_string(event.state_key) - if not self.hs.is_mine(invitee): - # TODO: Can we add signature from remote server in a nicer - # way? 
If we have been invited by a remote server, we need - # to get them to sign the event. - - returned_invite = await federation_handler.send_invite( - invitee.domain, event - ) - event.unsigned.pop("room_state", None) - - # TODO: Make sure the signatures actually are correct. - event.signatures.update(returned_invite.signatures) - - if event.content["membership"] == Membership.KNOCK: - event.unsigned[ - "knock_room_state" - ] = await self.store.get_stripped_room_state_from_event_context( - context, - self.room_prejoin_state_types, - ) - - if event.type == EventTypes.Redaction: - assert event.redacts is not None - - original_event = await self.store.get_event( - event.redacts, - redact_behaviour=EventRedactBehaviour.as_is, - get_prev_content=False, - allow_rejected=False, - allow_none=True, - ) - - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - - # we can make some additional checks now if we have the original event. - if original_event: - if original_event.type == EventTypes.Create: - raise AuthError(403, "Redacting create events is not permitted") - - if original_event.room_id != event.room_id: - raise SynapseError(400, "Cannot redact event from a different room") - - if original_event.type == EventTypes.ServerACL: - raise AuthError(403, "Redacting server ACL events is not permitted") - - # Add a little safety stop-gap to prevent people from trying to - # redact MSC2716 related events when they're in a room version - # which does not support it yet. We allow people to use MSC2716 - # events in existing room versions but only from the room - # creator since it does not require any changes to the auth - # rules and in effect, the redaction algorithm . 
In the - # supported room version, we add the `historical` power level to - # auth the MSC2716 related events and adjust the redaction - # algorthim to keep the `historical` field around (redacting an - # event should only strip fields which don't affect the - # structural protocol level). - is_msc2716_event = ( - original_event.type == EventTypes.MSC2716_INSERTION - or original_event.type == EventTypes.MSC2716_BATCH - or original_event.type == EventTypes.MSC2716_MARKER - ) - if not room_version_obj.msc2716_historical and is_msc2716_event: - raise AuthError( - 403, - "Redacting MSC2716 events is not supported in this room version", - ) - - event_types = event_auth.auth_types_for_event(event.room_version, event) - prev_state_ids = await context.get_prev_state_ids( - StateFilter.from_types(event_types) - ) - - auth_events_ids = self._event_auth_handler.compute_auth_events( - event, prev_state_ids, for_verification=True - ) - auth_events_map = await self.store.get_events(auth_events_ids) - auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()} - - if event_auth.check_redaction( - room_version_obj, event, auth_events=auth_events - ): - # this user doesn't have 'redact' rights, so we need to do some more - # checks on the original event. Let's start by checking the original - # event exists. - if not original_event: - raise NotFoundError("Could not find event %s" % (event.redacts,)) - - if event.user_id != original_event.user_id: - raise AuthError(403, "You don't have permission to redact events") - - # all the checks are done. 
- event.internal_metadata.recheck_redaction = False - - if event.type == EventTypes.Create: - prev_state_ids = await context.get_prev_state_ids() - if prev_state_ids: - raise AuthError(403, "Changing the room create event is forbidden") - - if event.type == EventTypes.MSC2716_INSERTION: - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - - create_event = await self.store.get_create_event_for_room(event.room_id) - room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) - - # Only check an insertion event if the room version - # supports it or the event is from the room creator. - if room_version_obj.msc2716_historical or ( - self.config.experimental.msc2716_enabled - and event.sender == room_creator - ): - next_batch_id = event.content.get( - EventContentFields.MSC2716_NEXT_BATCH_ID - ) - conflicting_insertion_event_id = None - if next_batch_id: - conflicting_insertion_event_id = ( - await self.store.get_insertion_event_id_by_batch_id( - event.room_id, next_batch_id - ) - ) - if conflicting_insertion_event_id is not None: - # The current insertion event that we're processing is invalid - # because an insertion event already exists in the room with the - # same next_batch_id. We can't allow multiple because the batch - # pointing will get weird, e.g. we can't determine which insertion - # event the batch event is pointing to. - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Another insertion event already exists with the same next_batch_id", - errcode=Codes.INVALID_PARAM, - ) - async def _maybe_kick_guest_users( self, event: EventBase, context: EventContext ) -> None: From 103b1067a9e2129751137fe461774e303fb215a4 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Tue, 27 Sep 2022 10:03:47 -0700 Subject: [PATCH 09/10] requested changes --- changelog.d/13800.misc | 3 ++- synapse/handlers/message.py | 12 ++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/changelog.d/13800.misc b/changelog.d/13800.misc index eda34841808c..c70f94eda584 100644 --- a/changelog.d/13800.misc +++ b/changelog.d/13800.misc @@ -1 +1,2 @@ -Add support for persisting initial room creation events to the DB in a batch. +Speed up creation of DM rooms. + diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 23467634c380..00e7645ba5cc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1300,7 +1300,8 @@ async def handle_new_client_event( extra_users: Optional[List[UserID]] = None, ignore_shadow_ban: bool = False, ) -> EventBase: - """Processes new events. + """Processes new events. Please note that if batch persisting events, an error in + handling any one of these events will result in all of the events being dropped. This includes deduplicating, checking auth, persisting, notifying users, sending to remote servers, etc. @@ -1417,7 +1418,8 @@ async def _persist_events( ) -> EventBase: """Actually persists new events. Should only be called by `handle_new_client_event`, and see its docstring for documentation of - the arguments. + the arguments. Please note that if batch persisting events, an error in + handling any one of these events will result in all of the events being dropped. PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. 
@@ -1462,8 +1464,7 @@ async def _persist_events( # we return the one event that was persisted event, _ = events_and_context[-1] - # We don't worry about de-duplicating batch persisted events - if event_id != event.event_id and len(events_and_context) == 1: + if event_id != event.event_id: # If we get a different event back then it means that its # been de-duplicated, so we replace the given event with the # one already persisted. @@ -1593,6 +1594,9 @@ async def persist_and_notify_client_events( This should only be run on the instance in charge of persisting events. + Please note that if batch persisting events, an error in + handling any one of these events will result in all of the events being dropped. + Returns: The persisted event, if one event is passed in, or the last event in the list in the case of batch persisting. If only one event was persisted, the From 3d3f0eab441cfe931b0b7714401656b27222846d Mon Sep 17 00:00:00 2001 From: Shay Date: Tue, 27 Sep 2022 19:39:53 -0700 Subject: [PATCH 10/10] Update changelog.d/13800.misc Co-authored-by: Erik Johnston --- changelog.d/13800.misc | 1 - 1 file changed, 1 deletion(-) diff --git a/changelog.d/13800.misc b/changelog.d/13800.misc index c70f94eda584..761adc8b058e 100644 --- a/changelog.d/13800.misc +++ b/changelog.d/13800.misc @@ -1,2 +1 @@ Speed up creation of DM rooms. -