Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use ijson to parse the response to /send_join, reducing memory usage. #9958

Merged
merged 7 commits into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9958.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce memory usage when joining very large rooms over federation.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,6 @@ ignore_missing_imports = True

[mypy-pympler.*]
ignore_missing_imports = True

[mypy-ijson.*]
ignore_missing_imports = True
28 changes: 10 additions & 18 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
)
from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.transport.client import SendJoinResponse
from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.logging.utils import log_function
from synapse.types import JsonDict, get_domain_from_id
Expand Down Expand Up @@ -665,19 +666,10 @@ async def send_join(
"""

async def send_request(destination) -> Dict[str, Any]:
content = await self._do_send_join(destination, pdu)
response = await self._do_send_join(room_version, destination, pdu)

logger.debug("Got content: %s", content)

state = [
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("state", [])
]

auth_chain = [
event_from_pdu_json(p, room_version, outlier=True)
for p in content.get("auth_chain", [])
]
state = response.state
auth_chain = response.auth_events

pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}

Expand Down Expand Up @@ -752,11 +744,14 @@ async def send_request(destination) -> Dict[str, Any]:

return await self._try_destination_list("send_join", destinations, send_request)

async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict:
async def _do_send_join(
self, room_version: RoomVersion, destination: str, pdu: EventBase
) -> SendJoinResponse:
time_now = self._clock.time_msec()

try:
return await self.transport_layer.send_join_v2(
room_version=room_version,
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
Expand All @@ -771,17 +766,14 @@ async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict:

logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")

resp = await self.transport_layer.send_join_v1(
return await self.transport_layer.send_join_v1(
room_version=room_version,
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)

# We expect the v1 API to respond with [200, content], so we only return the
# content.
return resp[1]

async def send_invite(
self,
destination: str,
Expand Down
85 changes: 81 additions & 4 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@
import urllib
from typing import Any, Dict, List, Optional

import attr
import ijson

from synapse.api.constants import Membership
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.api.urls import (
FEDERATION_UNSTABLE_PREFIX,
FEDERATION_V1_PREFIX,
FEDERATION_V2_PREFIX,
)
from synapse.events import EventBase, make_event_from_dict
from synapse.http.matrixfederationclient import ByteParser
from synapse.logging.utils import log_function
from synapse.types import JsonDict

Expand Down Expand Up @@ -240,21 +246,36 @@ async def make_membership_event(
return content

@log_function
async def send_join_v1(self, destination, room_id, event_id, content):
async def send_join_v1(
self,
room_version,
destination,
room_id,
event_id,
content,
) -> "SendJoinResponse":
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)

response = await self.client.put_json(
destination=destination, path=path, data=content
destination=destination,
path=path,
data=content,
parser=SendJoinParser(room_version, v1_api=True),
)

return response

@log_function
async def send_join_v2(self, destination, room_id, event_id, content):
async def send_join_v2(
self, room_version, destination, room_id, event_id, content
) -> "SendJoinResponse":
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)

response = await self.client.put_json(
destination=destination, path=path, data=content
destination=destination,
path=path,
data=content,
parser=SendJoinParser(room_version, v1_api=False),
)

return response
Expand Down Expand Up @@ -1052,3 +1073,59 @@ def _create_v2_path(path, *args):
str
"""
return _create_path(FEDERATION_V2_PREFIX, path, *args)


@attr.s(auto_attribs=True, slots=True)
class SendJoinResponse:
    """Container for the events extracted from a `/send_join` response body."""

    # Events gathered from the response's `auth_chain` array.
    auth_events: List[EventBase]
    # Events gathered from the response's `state` array.
    state: List[EventBase]


@ijson.coroutine
def _event_list_parser(room_version: RoomVersion, events: List[EventBase]):
    """Coroutine sink for `ijson.items_coro` that collects parsed events.

    Each object sent into the coroutine is converted with
    `make_event_from_dict` for the given room version and appended to
    `events`.

    Args:
        room_version: The room version handed to `make_event_from_dict`.
        events: The list that receives every event, in the order received.
    """
    while True:
        raw_event = yield
        events.append(make_event_from_dict(raw_event, room_version))


class SendJoinParser(ByteParser[SendJoinResponse]):
    """An incremental parser for the response to `/send_join` requests.

    Bytes handed to `write` are fed to two ijson coroutines, one extracting
    the items of the `state` array and one extracting the items of the
    `auth_chain` array. Each item is converted into an event and appended to
    the `SendJoinResponse` returned by `finish`.

    Args:
        room_version: The version of the room.
        v1_api: Whether the response is in the v1 format.
    """

    CONTENT_TYPE = "application/json"

    def __init__(self, room_version: RoomVersion, v1_api: bool):
        self._response = SendJoinResponse([], [])

        # The V1 API has the shape of `[200, {...}]`, so the arrays we want
        # sit one level deeper; we reach them by prefixing paths with `item.`.
        prefix = "item." if v1_api else ""

        self._coro_state = ijson.items_coro(
            _event_list_parser(room_version, self._response.state),
            prefix + "state.item",
        )
        self._coro_auth = ijson.items_coro(
            _event_list_parser(room_version, self._response.auth_events),
            prefix + "auth_chain.item",
        )

    def write(self, data: bytes) -> int:
        """Feed a chunk of the response body to both ijson coroutines.

        Args:
            data: The next chunk of raw response bytes.

        Returns:
            The number of bytes consumed, which is always `len(data)`.
        """
        self._coro_state.send(data)
        self._coro_auth.send(data)

        return len(data)

    def finish(self) -> SendJoinResponse:
        """Signal the end of the body and return the parsed response.

        ijson's push interface expects `close()` to be called once all data
        has been sent; that is what lets it notice a document that ended
        early. Without it a truncated body would be returned as a partial
        (and therefore wrong) set of events.

        Returns:
            The response holding every event parsed from the body.

        Raises:
            Whatever ijson raises when a coroutine is closed on an incomplete
            document (documented by ijson as `ijson.IncompleteJSONError`).
        """
        # Use try/finally so that the second coroutine is always closed, even
        # if closing the first one reports an error.
        try:
            self._coro_state.close()
        finally:
            self._coro_auth.close()

        return self._response
7 changes: 6 additions & 1 deletion synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,12 @@ def dataReceived(self, data: bytes) -> None:
if self.deferred.called:
return

self.stream.write(data)
try:
self.stream.write(data)
except Exception:
self.deferred.errback()
return

self.length += len(data)
# The first time the maximum size is exceeded, error and cancel the
# connection. dataReceived might be called again if data was received
Expand Down
Loading