This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make opentracing trace into event persistence #10134

Merged 4 commits on Jun 16, 2021
1 change: 1 addition & 0 deletions changelog.d/10134.misc
@@ -0,0 +1 @@
Improve OpenTracing for event persistence.
4 changes: 2 additions & 2 deletions synapse/api/auth.py
@@ -207,7 +207,7 @@ async def get_user_by_req(

request.requester = user_id
if user_id in self._force_tracing_for_users:
- opentracing.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)
+ opentracing.force_tracing()
opentracing.set_tag("authenticated_entity", user_id)
opentracing.set_tag("user_id", user_id)
opentracing.set_tag("appservice_id", app_service.id)
@@ -260,7 +260,7 @@ async def get_user_by_req(

request.requester = requester
if user_info.token_owner in self._force_tracing_for_users:
- opentracing.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)
+ opentracing.force_tracing()
opentracing.set_tag("authenticated_entity", user_info.token_owner)
opentracing.set_tag("user_id", user_info.user_id)
if device_id:
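The change above is not purely cosmetic: the new force_tracing() helper (added to synapse/logging/opentracing.py below) sets the same sampling-priority tag as before, but also records a baggage item so that the "tracing was forced" decision can be detected later, for example by the event-persistence queue. Roughly, in terms of the raw opentracing-python 2.x API (a sketch for illustration, not Synapse code; it assumes a tracer is configured and a span is active):

import opentracing
from opentracing.ext import tags

span = opentracing.tracer.active_span
if span is not None:
    # What the old auth.py code did: ask the sampler to keep this trace.
    span.set_tag(tags.SAMPLING_PRIORITY, 1)

    # What force_tracing() does in addition: stash a baggage item, which
    # propagates to descendant spans, so later code can tell that tracing
    # was forced (see is_context_forced_tracing below).
    span.set_baggage_item("synapse-force-tracing", "1")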
57 changes: 55 additions & 2 deletions synapse/logging/opentracing.py
@@ -168,7 +168,7 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
import logging
import re
from functools import wraps
- from typing import TYPE_CHECKING, Dict, List, Optional, Pattern, Type
+ from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Pattern, Type

import attr

@@ -278,13 +278,19 @@ class SynapseTags:
DB_TXN_ID = "db.txn_id"


class SynapseBaggage:
FORCE_TRACING = "synapse-force-tracing"


# Block everything by default
# A regex which matches the server_names to expose traces for.
# None means 'block everything'.
_homeserver_whitelist = None # type: Optional[Pattern[str]]

# Util methods

Sentinel = object()


def only_if_tracing(func):
"""Executes the function only if we're tracing. Otherwise returns None."""
@@ -447,12 +453,28 @@ def start_active_span(
)


- def start_active_span_follows_from(operation_name, contexts):
+ def start_active_span_follows_from(
+     operation_name: str, contexts: Collection, inherit_force_tracing=False
+ ):
"""Starts an active opentracing span, with additional references to previous spans

Args:
operation_name: name of the operation represented by the new span
contexts: the previous spans to inherit from
inherit_force_tracing: if set, and any of the previous contexts have had tracing
forced, the new span will also have tracing forced.
"""
if opentracing is None:
return noop_context_manager()

references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)

if inherit_force_tracing and any(
is_context_forced_tracing(ctx) for ctx in contexts
):
force_tracing(scope.span)

return scope


@@ -551,6 +573,10 @@ def start_active_span_from_edu(


# Opentracing setters for tags, logs, etc
@only_if_tracing
def active_span():
"""Get the currently active span, if any"""
return opentracing.tracer.active_span


@ensure_active_span("set a tag")
@@ -571,6 +597,33 @@ def set_operation_name(operation_name):
opentracing.tracer.active_span.set_operation_name(operation_name)


@only_if_tracing
def force_tracing(span=Sentinel) -> None:
"""Force sampling for the active/given span and its children.

Args:
span: span to force tracing for. By default, the active span.
"""
if span is Sentinel:
span = opentracing.tracer.active_span
if span is None:
logger.error("No active span in force_tracing")
return

span.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)

# also set a bit of baggage, so that we have a way of figuring out if
# it is enabled later
span.set_baggage_item(SynapseBaggage.FORCE_TRACING, "1")


def is_context_forced_tracing(span_context) -> bool:
"""Check if sampling has been force for the given span context."""
if span_context is None:
return False
return span_context.baggage.get(SynapseBaggage.FORCE_TRACING) is not None


# Injection and extraction


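Taken together, the new helpers cover the whole life cycle of a forced trace: force_tracing() marks the active span, is_context_forced_tracing() recovers that decision from a bare span context, and start_active_span_follows_from(..., inherit_force_tracing=True) carries it into a later, loosely linked span. A rough usage sketch, assuming jaeger-client/opentracing are installed and the tracer has been initialised (and ignoring Synapse's logcontext rules for brevity):

from synapse.logging import opentracing

# Somewhere in request handling: open a span and force sampling for it.
with opentracing.start_active_span("incoming_request") as scope:
    opentracing.force_tracing()          # defaults to the active span
    parent_context = scope.span.context  # keep the context for later

# The forcing decision rides along in the span's baggage, so it can be
# recovered from the context alone, even after inject/extract.
forced = opentracing.is_context_forced_tracing(parent_context)  # True here

# Later, e.g. on a background task: a span that merely *follows from* the
# request span, inheriting its forced sampling.
with opentracing.start_active_span_follows_from(
    "background_work", (parent_context,), inherit_force_tracing=True
):
    ...  # do the work under the linked span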
46 changes: 41 additions & 5 deletions synapse/storage/persist_events.py
@@ -18,6 +18,7 @@
import logging
from collections import deque
from typing import (
Any,
Awaitable,
Callable,
Collection,
@@ -40,6 +41,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging import opentracing
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases import Databases
@@ -103,12 +105,18 @@
)


- @attr.s(auto_attribs=True, frozen=True, slots=True)
+ @attr.s(auto_attribs=True, slots=True)
class _EventPersistQueueItem:
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
deferred: ObservableDeferred

parent_opentracing_span_contexts: List = attr.Factory(list)
"""The opentracing span contexts of the requests waiting for this batch"""

opentracing_span_context: Any = None
"""The opentracing span under which the persistence actually happened"""


_PersistResult = TypeVar("_PersistResult")

@@ -171,9 +179,27 @@ async def add_to_queue(
)
queue.append(end_item)

# add our events to the queue item
end_item.events_and_contexts.extend(events_and_contexts)

# also add our active opentracing span to the item so that we get a link back
span = opentracing.active_span()
if span:
end_item.parent_opentracing_span_contexts.append(span.context)

# start a processor for the queue, if there isn't one already
self._handle_queue(room_id)
- return await make_deferred_yieldable(end_item.deferred.observe())

# wait for the queue item to complete
res = await make_deferred_yieldable(end_item.deferred.observe())

# add another opentracing span which links to the persist trace.
with opentracing.start_active_span_follows_from(
"persist_event_batch_complete", (end_item.opentracing_span_context,)
):
pass

return res

def _handle_queue(self, room_id):
"""Attempts to handle the queue for a room if not already being handled.
@@ -200,9 +226,17 @@ async def handle_queue_loop():
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
- ret = await self._per_item_callback(
-     item.events_and_contexts, item.backfilled
- )
with opentracing.start_active_span_follows_from(
"persist_event_batch",
item.parent_opentracing_span_contexts,
inherit_force_tracing=True,
) as scope:
if scope:
item.opentracing_span_context = scope.span.context

ret = await self._per_item_callback(
item.events_and_contexts, item.backfilled
)
except Exception:
with PreserveLoggingContext():
item.deferred.errback()
@@ -252,6 +286,7 @@ def __init__(self, hs, stores: Databases):
self._event_persist_queue = _EventPeristenceQueue(self._persist_event_batch)
self._state_resolution_handler = hs.get_state_resolution_handler()

@opentracing.trace
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@@ -307,6 +342,7 @@ async def enqueue(item):
self.main_store.get_room_max_token(),
)

@opentracing.trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
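The queue changes above follow a pattern that can be shown without any of the persistence machinery: each producer that adds work to a shared batch records its active span context on the batch item; the worker that drains the batch opens a single span that follows from all of those contexts, forcing sampling if any of them had it forced; and once the batch completes, each producer opens a short span that follows from the worker's span, so the two traces link to each other in both directions. A simplified, hypothetical sketch of that pattern reusing the helpers above (BatchItem, producer and worker are illustrative names, not Synapse code):

import asyncio
from typing import Any, List

import attr

from synapse.logging import opentracing


@attr.s(auto_attribs=True, slots=True)
class BatchItem:
    """Hypothetical stand-in for _EventPersistQueueItem."""

    work: List[Any] = attr.Factory(list)
    parent_span_contexts: List[Any] = attr.Factory(list)
    batch_span_context: Any = None
    done: asyncio.Event = attr.Factory(asyncio.Event)


async def producer(item: BatchItem, payload: Any) -> None:
    item.work.append(payload)

    # Record our span context so the batch span links back to this request.
    span = opentracing.active_span()
    if span:
        item.parent_span_contexts.append(span.context)

    await item.done.wait()

    # Once the batch is done, add the reverse link: a short span in this
    # request's trace that follows from the batch span.
    with opentracing.start_active_span_follows_from(
        "batch_complete", (item.batch_span_context,)
    ):
        pass


async def worker(item: BatchItem) -> None:
    # One span covering the whole batch, following from every producer's
    # span and inheriting forced tracing from any of them.
    with opentracing.start_active_span_follows_from(
        "process_batch",
        item.parent_span_contexts,
        inherit_force_tracing=True,
    ) as scope:
        if scope:
            item.batch_span_context = scope.span.context
        ...  # actually process item.work here

    item.done.set()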