Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle inbound events from federation asynchronously #10272

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10272.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle inbound events from federation asynchronously.
98 changes: 96 additions & 2 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
SynapseError,
UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
Expand All @@ -57,10 +57,12 @@
)
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.storage.databases.main.lock import Lock
from synapse.types import JsonDict
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
Expand Down Expand Up @@ -96,6 +98,11 @@
)


# The name of the lock to use when process events in a room received over
# federation.
_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"


class FederationServer(FederationBase):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
Expand Down Expand Up @@ -834,7 +841,94 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
except SynapseError as e:
raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)

await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
# Add the event to our staging area
await self.store.insert_received_event_to_staging(origin, pdu)

# Try and acquire the processing lock for the room, if we get it start a
# background process for handling the events in the room.
lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
)
if lock:
self._process_incoming_pdus_in_room_inner(
pdu.room_id, room_version, lock, origin, pdu
)

@wrap_as_background_process("_process_incoming_pdus_in_room_inner")
async def _process_incoming_pdus_in_room_inner(
self,
room_id: str,
room_version: RoomVersion,
lock: Lock,
latest_origin: str,
latest_event: EventBase,
) -> None:
"""Process events in the staging area for the given room.

The latest_origin and latest_event args are the latest origin and event
received.
"""

# The common path is for the event we just received be the only event in
# the room, so instead of pulling the event out of the DB and parsing
# the event we just pull out the next event ID and check if that matches.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I don't know whether the extra complexity is worth saving fetching a single event from the DB, happy to delete this logic 🤷

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷

next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room(
room_id
)
if next_origin == latest_origin and next_event_id == latest_event.event_id:
origin = latest_origin
event = latest_event
else:
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)
if not next:
return

origin, event = next

# We loop round until there are no more events in the room in the
# staging area, or we fail to get the lock (which means another process
# has started processing).
while True:
async with lock:
try:
await self.handler.on_receive_pdu(
origin, event, sent_to_us_directly=True
)
except FederationError as e:
# XXX: Ideally we'd inform the remote we failed to process
# the event, but we can't return an error in the transaction
# response (as we've already responded).
logger.warning("Error handling PDU %s: %s", event.event_id, e)
except Exception:
f = failure.Failure()
logger.error(
"Failed to handle PDU %s",
event.event_id,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)

await self.store.remove_received_event_from_staging(
origin, event.event_id
)

# We need to do this check outside the lock to avoid a race between
# a new event being inserted by another instance and it attempting
# to acquire the lock.
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)
if not next:
break

origin, event = next

lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
)
if not lock:
return

def __str__(self) -> str:
return "<ReplicationLayer(%s)>" % self.server_name
Expand Down
109 changes: 106 additions & 3 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@
import itertools
import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Set, Tuple
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple

from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.events import EventBase
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
Expand Down Expand Up @@ -1044,6 +1046,107 @@ def _delete_old_forward_extrem_cache_txn(txn):
_delete_old_forward_extrem_cache_txn,
)

async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:
"""Insert a newly received event from federation into the staging area."""

# We use an upsert here to handle the case where we see the same event
# from the same server multiple times.
await self.db_pool.simple_upsert(
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event.event_id,
},
values={},
insertion_values={
"room_id": event.room_id,
"received_ts": self._clock.time_msec(),
"event_json": json_encoder.encode(event.get_dict()),
"internal_metadata": json_encoder.encode(
event.internal_metadata.get_dict()
),
},
desc="insert_received_event_to_staging",
)

async def remove_received_event_from_staging(
self,
origin: str,
event_id: str,
) -> None:
"""Remove the given event from the staging area"""
await self.db_pool.simple_delete(
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
desc="remove_received_event_from_staging",
)

async def get_next_staged_event_id_for_room(
self,
room_id: str,
) -> Optional[Tuple[str, str]]:
"""Get the next event ID in the staging area for the given room."""

def _get_next_staged_event_id_for_room_txn(txn):
sql = """
SELECT origin, event_id
FROM federation_inbound_events_staging
WHERE room_id = ?
ORDER BY received_ts ASC
LIMIT 1
"""

txn.execute(sql, (room_id,))

return txn.fetchone()

return await self.db_pool.runInteraction(
"get_next_staged_event_id_for_room", _get_next_staged_event_id_for_room_txn
)

async def get_next_staged_event_for_room(
self,
room_id: str,
room_version: RoomVersion,
) -> Optional[Tuple[str, EventBase]]:
"""Get the next event in the staging area for the given room."""

def _get_next_staged_event_for_room_txn(txn):
sql = """
SELECT event_json, internal_metadata, origin
FROM federation_inbound_events_staging
WHERE room_id = ?
ORDER BY received_ts ASC
LIMIT 1
"""
txn.execute(sql, (room_id,))

return txn.fetchone()

row = await self.db_pool.runInteraction(
"get_next_staged_event_for_room", _get_next_staged_event_for_room_txn
)

if not row:
return None

event_d = db_to_json(row[0])
internal_metadata_d = db_to_json(row[1])
origin = row[2]

event = make_event_from_dict(
event_dict=event_d,
room_version=room_version,
internal_metadata_dict=internal_metadata_d,
)

return origin, event


class EventFederationStore(EventFederationWorkerStore):
"""Responsible for storing and serving up the various graphs associated
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


-- A staging area for newly received events over federation.
--
-- Note we may store the same event multiple times if it comes from different
-- servers; this is to handle the case if we get a redacted and non-redacted
-- versions of the event.
CREATE TABLE federation_inbound_events_staging (
origin TEXT NOT NULL,
room_id TEXT NOT NULL,
event_id TEXT NOT NULL,
received_ts BIGINT NOT NULL,
event_json TEXT NOT NULL,
internal_metadata TEXT NOT NULL
);

CREATE INDEX federation_inbound_events_staging_room ON federation_inbound_events_staging(room_id, received_ts);
CREATE UNIQUE INDEX federation_inbound_events_staging_instance_event ON federation_inbound_events_staging(origin, event_id);
6 changes: 6 additions & 0 deletions sytest-blacklist
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ We can't peek into rooms with invited history_visibility
We can't peek into rooms with joined history_visibility
Local users can peek by room alias
Peeked rooms only turn up in the sync for the device who peeked them


# Blacklisted due to changes made in #10272
Outbound federation will ignore a missing event with bad JSON for room version 6
Backfilled events whose prev_events are in a different room do not allow cross-room back-pagination
Federation rejects inbound events where the prev_events cannot be found