Skip to content

Commit

Permalink
Extract tracing metadata from event
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Aug 29, 2024
1 parent c0a5a42 commit ac15c5c
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void onNext(@NotNull StreamsOuterClass.ReadResp readResp) {
channel,
client.getSettings(),
options.getCredentials(),
resolvedEvent.getOriginalEvent());
resolvedEvent.getEvent());
} catch (Exception e) {
onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) {
args.getChannel(),
client.getSettings(),
options.getCredentials(),
resolvedEvent.getOriginalEvent());
resolvedEvent.getEvent());
} catch (Exception e) {
onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,18 @@ private static SpanContext tryExtractTracingContext(byte[] userMetadataBytes) {
if (!TraceId.isValid(traceId) || !SpanId.isValid(spanId))
return null;

return SpanContext.createFromRemoteParent(traceId, spanId, TraceFlags.getSampled(), TraceState.getDefault());
return SpanContext.createFromRemoteParent(traceId, spanId, TraceFlags.getSampled(),
TraceState.getDefault());
} catch (Throwable t) {
return null;
}
}

static CompletableFuture<WriteResult> traceAppend(BiFunction<ManagedChannel, List<EventData>, CompletableFuture<WriteResult>> appendOperation, ManagedChannel channel,
List<EventData> events, String streamId, EventStoreDBClientSettings settings, UserCredentials optionalCallCredentials) {
static CompletableFuture<WriteResult> traceAppend(
BiFunction<ManagedChannel, List<EventData>, CompletableFuture<WriteResult>> appendOperation,
ManagedChannel channel,
List<EventData> events, String streamId, EventStoreDBClientSettings settings,
UserCredentials optionalCallCredentials) {
Span span = createSpan(
ClientTelemetryConstants.Operations.APPEND,
SpanKind.CLIENT,
Expand Down Expand Up @@ -115,9 +119,13 @@ static CompletableFuture<WriteResult> traceAppend(BiFunction<ManagedChannel, Lis
}
}

static void traceSubscribe(Runnable tracedOperation, String subscriptionId, ManagedChannel channel, EventStoreDBClientSettings settings,
static void traceSubscribe(Runnable tracedOperation, String subscriptionId, ManagedChannel channel,
EventStoreDBClientSettings settings,
UserCredentials optionalCallCredentials, RecordedEvent event) {
SpanContext remoteParentContext = tryExtractTracingContext(event.getUserMetadata());

if (remoteParentContext == null) return;

Span span = createSpan(
ClientTelemetryConstants.Operations.SUBSCRIBE,
SpanKind.CONSUMER,
Expand Down Expand Up @@ -145,7 +153,8 @@ static void traceSubscribe(Runnable tracedOperation, String subscriptionId, Mana
}
}

static Span createSpan(String operationName, SpanKind spanKind, SpanContext parentContext, ClientTelemetryTags customAttributes) {
static Span createSpan(String operationName, SpanKind spanKind, SpanContext parentContext,
ClientTelemetryTags customAttributes) {
SpanBuilder spanBuilder = getTracer().spanBuilder(operationName).setSpanKind(spanKind);

if (parentContext != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,62 @@ default void testTracingContextIsInjectedAsExpectedWhenUserMetadataIsJsonObject(
}

@Test
default void testTracingContextInjectionIsIgnoredAsExpectedWhenUserMetadataIsNonNullAndNotAJsonObject() throws Throwable {
@Timeout(value = 2, unit = TimeUnit.MINUTES)
default void testTracingContextInjectionIsIgnoredAsExpectedWhenUserMetadataIsNonNullAndNotAJsonObject()
throws Throwable {
EventStoreDBClient client = getDefaultClient();
String streamName = generateName();
byte[] userMetadata = mapper.writeValueAsBytes("clearlynotvalidjson");

EventData eventWithValidMetadata = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
.eventId(UUID.randomUUID())
.build();

EventData eventWithInvalidMetadata = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
.metadataAsBytes(userMetadata)
.eventId(UUID.randomUUID())
.build();

client.appendToStream(
streamName,
AppendToStreamOptions.get().expectedRevision(ExpectedRevision.noStream()),
EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
.metadataAsBytes(userMetadata)
.eventId(UUID.randomUUID())
.build())
eventWithValidMetadata,
eventWithInvalidMetadata)
.get();

ReadResult readResult = client.readStream(streamName, ReadStreamOptions.get()).get();

ResolvedEvent resolvedEvent = readResult.getEvents().get(0);
Assertions.assertNotNull(resolvedEvent);
List<ResolvedEvent> resolvedEvent = readResult.getEvents();
Assertions.assertEquals(2, resolvedEvent.size());

// Assert unchanged
Assertions.assertArrayEquals(userMetadata, resolvedEvent.getEvent().getUserMetadata());
Assertions.assertArrayEquals(userMetadata, resolvedEvent.get(1).getEvent().getUserMetadata());

CountDownLatch subscribeSpansLatch = new CountDownLatch(1);
onOperationSpanEnded(ClientTelemetryConstants.Operations.SUBSCRIBE, span -> subscribeSpansLatch.countDown());

Subscription subscription = client.subscribeToStream(
streamName,
new SubscriptionListener() {
}
).get();

List<ReadableSpan> appendSpans = this.getSpansForOperation(ClientTelemetryConstants.Operations.APPEND);
Assertions.assertEquals(1, appendSpans.size());

subscribeSpansLatch.await();
subscription.stop();

List<ReadableSpan> subscribeSpans = this.getSpansForOperation(ClientTelemetryConstants.Operations.SUBSCRIBE);

Assertions.assertEquals(1, subscribeSpans.size());

assertSubscriptionActivityHasExpectedAttributes(
subscribeSpans.get(0),
streamName,
subscription.getSubscriptionId(),
eventWithValidMetadata.getEventId().toString(),
eventWithValidMetadata.getEventType());
}

@Test
Expand Down

0 comments on commit ac15c5c

Please sign in to comment.