Skip to content

Commit

Permalink
Instrument Starlette background tasks (#355)
Browse files Browse the repository at this point in the history
Fixes #352.
  • Loading branch information
adamchainz authored Nov 5, 2019
1 parent 140451e commit 6f58e05
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 10 deletions.
57 changes: 51 additions & 6 deletions src/scout_apm/async_/starlette.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# coding=utf-8
from __future__ import absolute_import, division, print_function, unicode_literals

import wrapt
from starlette.background import BackgroundTask
from starlette.requests import Request

import scout_apm.core
Expand All @@ -18,6 +20,8 @@ def __init__(self, app):
self.app = app
installed = scout_apm.core.install()
self._do_nothing = not installed
if installed:
install_background_instrumentation()

async def __call__(self, scope, receive, send):
if self._do_nothing or scope["type"] != "http":
Expand Down Expand Up @@ -51,16 +55,57 @@ async def __call__(self, scope, receive, send):
amazon_queue_time = request.headers.get("x-amzn-trace-id", default="")
track_amazon_request_queue_time(amazon_queue_time, tracked_request)

try:
await self.app(scope, receive, send)
except Exception as exc:
tracked_request.tag("error", "true")
raise exc
finally:
def rename_controller_span_from_endpoint():
if "endpoint" in scope:
# Rename top span
endpoint = scope["endpoint"]
controller_span.operation = "Controller/{}.{}".format(
endpoint.__module__, endpoint.__qualname__
)
tracked_request.is_real_request = True

async def wrapped_send(data):
# Finish HTTP span when body finishes sending, not later (e.g.
# after background tasks)
if data.get("type", None) == "http.response.body" and not data.get(
"more_body", False
):
rename_controller_span_from_endpoint()
tracked_request.stop_span()
return await send(data)

try:
await self.app(scope, receive, wrapped_send)
except Exception as exc:
tracked_request.tag("error", "true")
raise exc
finally:
if tracked_request.end_time is None:
rename_controller_span_from_endpoint()
tracked_request.stop_span()


background_instrumentation_installed = False


def install_background_instrumentation():
global background_instrumentation_installed
if background_instrumentation_installed:
return
background_instrumentation_installed = True

@wrapt.decorator
async def wrapped_background_call(wrapped, instance, args, kwargs):
tracked_request = TrackedRequest.instance()
tracked_request.is_real_request = True
tracked_request.start_span(
operation="Job/{}.{}".format(
instance.func.__module__, instance.func.__qualname__
)
)
try:
return await wrapped(*args, **kwargs)
finally:
tracked_request.stop_span()

BackgroundTask.__call__ = wrapped_background_call(BackgroundTask.__call__)
54 changes: 50 additions & 4 deletions tests/integration/test_starlette_py36plus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
from asgiref.testing import ApplicationCommunicator
from starlette.applications import Starlette
from starlette.background import BackgroundTasks
from starlette.endpoints import HTTPEndpoint
from starlette.responses import PlainTextResponse

Expand Down Expand Up @@ -36,6 +37,11 @@ def app_with_scout(*, scout_config=None):

app = Starlette()

@app.exception_handler(500)
async def error(request, exc):
# Always raise exceptions
raise exc

@app.route("/")
async def home(request):
return PlainTextResponse("Welcome home.")
Expand All @@ -49,10 +55,19 @@ async def get(self, request):
async def crash(request):
raise ValueError("BØØM!") # non-ASCII

@app.exception_handler(500)
async def error(request, exc):
# Always raise exceptions
raise exc
@app.route("/background-jobs/")
async def background_jobs(request):
def sync_noop():
pass

async def async_noop():
pass

tasks = BackgroundTasks()
tasks.add_task(sync_noop)
tasks.add_task(async_noop)

return PlainTextResponse("Triggering background jobs", background=tasks)

# As per http://docs.scoutapm.com/#starlette
Config.set(**scout_config)
Expand Down Expand Up @@ -283,3 +298,34 @@ async def test_unknown_asgi_scope(tracked_requests):

assert response_start == {"type": "lifespan.startup.complete"}
assert tracked_requests == []


@async_test
async def test_background_jobs(tracked_requests):
with app_with_scout() as app:
communicator = ApplicationCommunicator(app, get_scope(path="/background-jobs/"))
await communicator.send_input({"type": "http.request"})
response_start = await communicator.receive_output()
response_body = await communicator.receive_output()
await communicator.wait()

assert response_start["type"] == "http.response.start"
assert response_start["status"] == 200
assert response_body["body"] == b"Triggering background jobs"
assert len(tracked_requests) == 3

sync_tracked_request = tracked_requests[1]
assert len(sync_tracked_request.complete_spans) == 1
sync_span = sync_tracked_request.complete_spans[0]
assert sync_span.operation == (
"Job/tests.integration.test_starlette_py36plus."
+ "app_with_scout.<locals>.background_jobs.<locals>.sync_noop"
)

async_tracked_request = tracked_requests[2]
assert len(async_tracked_request.complete_spans) == 1
async_span = async_tracked_request.complete_spans[0]
assert async_span.operation == (
"Job/tests.integration.test_starlette_py36plus."
+ "app_with_scout.<locals>.background_jobs.<locals>.async_noop"
)

0 comments on commit 6f58e05

Please sign in to comment.