Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester: bridge OpenTracing spans to Otel #8242

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* [ENHANCEMENT] Query-frontend: include route name in query stats log lines. #8191
* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false` #7957
* [ENHANCEMENT] Ingester/Querier: Optimise regexps with long lists of alternates. #8221, #8234
* [ENHANCEMENT] Ingester: Include more detail in tracing of queries. #8242
* [EHNAHCEMENT] Distributor: add `insight=true` to remote-write and OTLP write handlers when the HTTP response status code is 4xx. #8294
* [BUGFIX] Distributor: make OTLP endpoint return marshalled proto bytes as response body for 4xx/5xx errors. #8227
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
Expand Down
2 changes: 2 additions & 0 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/sharding"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/tracing"
)

const activeSeriesMaxSizeBytes = 1 * 1024 * 1024
Expand All @@ -35,6 +36,7 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie

spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.ActiveSeries")
defer spanlog.Finish()
ctx = tracing.BridgeOpenTracingToOtel(ctx)

userID, err := tenant.TenantID(ctx)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/shutdownmarker"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/tracing"
"github.com/grafana/mimir/pkg/util/validation"
)

Expand Down Expand Up @@ -1633,6 +1634,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

spanlog, ctx := spanlogger.NewWithLogger(ctx, i.logger, "Ingester.QueryExemplars")
defer spanlog.Finish()
ctx = tracing.BridgeOpenTracingToOtel(ctx)

userID, err := tenant.TenantID(ctx)
if err != nil {
Expand Down Expand Up @@ -2079,6 +2081,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_

spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.QueryStream")
defer spanlog.Finish()
ctx = tracing.BridgeOpenTracingToOtel(ctx)

userID, err := tenant.TenantID(ctx)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import (
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/noauth"
"github.com/grafana/mimir/pkg/util/process"
"github.com/grafana/mimir/pkg/util/tracing"
"github.com/grafana/mimir/pkg/util/validation"
"github.com/grafana/mimir/pkg/util/validation/exporter"
"github.com/grafana/mimir/pkg/vault"
Expand Down Expand Up @@ -786,7 +787,7 @@ func New(cfg Config, reg prometheus.Registerer) (*Mimir, error) {
// We are passing the wrapped tracer to both opentracing and opentelemetry until after the ecosystem
// gets converged into the latter.
opentracing.SetGlobalTracer(tracer)
otel.SetTracerProvider(NewOpenTelemetryProviderBridge(tracer))
otel.SetTracerProvider(tracing.NewOpenTelemetryProviderBridge(tracer))

mimir.Cfg.Server.Router = mux.NewRouter()

Expand Down
271 changes: 0 additions & 271 deletions pkg/mimir/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,9 @@ package mimir

import (
"context"
"encoding/binary"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"github.com/uber/jaeger-client-go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/grpc"
)

Expand All @@ -43,267 +36,3 @@ type wrappedServerStream struct {
func (ss wrappedServerStream) Context() context.Context {
return ss.ctx
}

type OpenTelemetryProviderBridge struct {
// TracerProvider is the fallback trace.TracerProvider used for functions not implemented
// by our custom one (even if we aim to implement all of them). OpenTelemetry library
// requires one (compilation fails without it).
noop.TracerProvider

tracer opentracing.Tracer
}

func NewOpenTelemetryProviderBridge(tracer opentracing.Tracer) *OpenTelemetryProviderBridge {
return &OpenTelemetryProviderBridge{
tracer: tracer,
}
}

// Tracer creates an implementation of the Tracer interface.
// The instrumentationName must be the name of the library providing
// instrumentation. This name may be the same as the instrumented code
// only if that code provides built-in instrumentation. If the
// instrumentationName is empty, then a implementation defined default
// name will be used instead.
//
// This method must be concurrency safe.
func (p *OpenTelemetryProviderBridge) Tracer(_ string, _ ...trace.TracerOption) trace.Tracer {
return NewOpenTelemetryTracerBridge(p.tracer, p)
}

type OpenTelemetryTracerBridge struct {
// Tracer is the fallback trace.Tracer used for functions not implemented
// by our custom one (even if we aim to implement all of them). OpenTelemetry library
// requires one (compilation fails without it).
noop.Tracer

tracer opentracing.Tracer
provider trace.TracerProvider
}

func NewOpenTelemetryTracerBridge(tracer opentracing.Tracer, provider trace.TracerProvider) *OpenTelemetryTracerBridge {
return &OpenTelemetryTracerBridge{
tracer: tracer,
provider: provider,
}
}

// Start creates a span and a context.Context containing the newly-created span.
//
// If the context.Context provided in `ctx` contains a Span then the newly-created
// Span will be a child of that span, otherwise it will be a root span. This behavior
// can be overridden by providing `WithNewRoot()` as a SpanOption, causing the
// newly-created Span to be a root span even if `ctx` contains a Span.
//
// When creating a Span it is recommended to provide all known span attributes using
// the `WithAttributes()` SpanOption as samplers will only have access to the
// attributes provided when a Span is created.
//
// Any Span that is created MUST also be ended. This is the responsibility of the user.
// Implementations of this API may leak memory or other resources if Spans are not ended.
func (t *OpenTelemetryTracerBridge) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
var mappedOptions []opentracing.StartSpanOption

// Map supported options.
if len(opts) > 0 {
mappedOptions = make([]opentracing.StartSpanOption, 0, len(opts))
cfg := trace.NewSpanStartConfig(opts...)

if !cfg.Timestamp().IsZero() {
mappedOptions = append(mappedOptions, opentracing.StartTime(cfg.Timestamp()))
}
if len(cfg.Attributes()) > 0 {
tags := make(map[string]any, len(cfg.Attributes()))

for _, attr := range cfg.Attributes() {
if !attr.Valid() {
continue
}

tags[string(attr.Key)] = attr.Value.AsInterface()
}

mappedOptions = append(mappedOptions, opentracing.Tags(tags))
}
}

// If the context contains a valid *remote* OTel span, and there isn't yet an OpenTelemetry span there,
// we manually link this OTel span to the OT child we are about to start before.
// This allows preserving a minimal parent-child relationship with OTel spans, which were extracted
// via OTel propagators. Otherwise, the relation gets lost.
if sc := trace.SpanContextFromContext(ctx); sc.IsValid() && sc.IsRemote() {
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan == nil {
// Create a new Jaeger span context with minimum required options.
jsc := jaeger.NewSpanContext(
jaegerFromOTelTraceID(sc.TraceID()),
jaegerFromOTelSpanID(sc.SpanID()),
0,
sc.IsSampled(),
nil,
)
mappedOptions = append(mappedOptions, opentracing.ChildOf(jsc))
}
}

span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, t.tracer, spanName, mappedOptions...)
otelSpan := NewOpenTelemetrySpanBridge(span, t.provider)
return trace.ContextWithSpan(ctx, otelSpan), otelSpan
}

type OpenTelemetrySpanBridge struct {
// Span is the fallback trace.Span used for functions not implemented
// by our custom one (even if we aim to implement all of them). OpenTelemetry library
// requires one (compilation fails without it).
noop.Span

span opentracing.Span
provider trace.TracerProvider
}

func NewOpenTelemetrySpanBridge(span opentracing.Span, provider trace.TracerProvider) *OpenTelemetrySpanBridge {
return &OpenTelemetrySpanBridge{
span: span,
provider: provider,
}
}

// End completes the Span. The Span is considered complete and ready to be
// delivered through the rest of the telemetry pipeline after this method
// is called. Therefore, updates to the Span are not allowed after this
// method has been called.
func (s *OpenTelemetrySpanBridge) End(options ...trace.SpanEndOption) {
if len(options) == 0 {
s.span.Finish()
return
}

cfg := trace.NewSpanEndConfig(options...)
s.span.FinishWithOptions(opentracing.FinishOptions{
FinishTime: cfg.Timestamp(),
})
}

// AddEvent adds an event with the provided name and options.
func (s *OpenTelemetrySpanBridge) AddEvent(name string, options ...trace.EventOption) {
cfg := trace.NewEventConfig(options...)
s.addEvent(name, cfg.Attributes())
}

func (s *OpenTelemetrySpanBridge) addEvent(name string, attributes []attribute.KeyValue) {
s.logFieldWithAttributes(log.Event(name), attributes)
}

// IsRecording returns the recording state of the Span. It will return
// true if the Span is active and events can be recorded.
func (s *OpenTelemetrySpanBridge) IsRecording() bool {
return true
}

// RecordError will record err as an exception span event for this span. An
// additional call to SetStatus is required if the Status of the Span should
// be set to Error, as this method does not change the Span status. If this
// span is not being recorded or err is nil then this method does nothing.
func (s *OpenTelemetrySpanBridge) RecordError(err error, options ...trace.EventOption) {
cfg := trace.NewEventConfig(options...)
s.recordError(err, cfg.Attributes())
}

func (s *OpenTelemetrySpanBridge) recordError(err error, attributes []attribute.KeyValue) {
s.logFieldWithAttributes(log.Error(err), attributes)
}

// SpanContext returns the SpanContext of the Span. The returned SpanContext
// is usable even after the End method has been called for the Span.
func (s *OpenTelemetrySpanBridge) SpanContext() trace.SpanContext {
// We only support Jaeger span context.
sctx, ok := s.span.Context().(jaeger.SpanContext)
if !ok {
return trace.SpanContext{}
}

var flags trace.TraceFlags
flags = flags.WithSampled(sctx.IsSampled())

return trace.NewSpanContext(trace.SpanContextConfig{
TraceID: jaegerToOTelTraceID(sctx.TraceID()),
SpanID: jaegerToOTelSpanID(sctx.SpanID()),
TraceFlags: flags,

// Unsupported because we can't read it from the Jaeger span context.
Remote: false,
})
}

// SetStatus sets the status of the Span in the form of a code and a
// description, overriding previous values set. The description is only
// included in a status when the code is for an error.
func (s *OpenTelemetrySpanBridge) SetStatus(code codes.Code, description string) {
// We use a log instead of setting tags to have it more prominent in the tracing UI.
s.span.LogFields(log.Uint32("code", uint32(code)), log.String("description", description))
}

// SetName sets the Span name.
func (s *OpenTelemetrySpanBridge) SetName(name string) {
s.span.SetOperationName(name)
}

// SetAttributes sets kv as attributes of the Span. If a key from kv
// already exists for an attribute of the Span it will be overwritten with
// the value contained in kv.
func (s *OpenTelemetrySpanBridge) SetAttributes(kv ...attribute.KeyValue) {
for _, attr := range kv {
if !attr.Valid() {
continue
}

s.span.SetTag(string(attr.Key), attr.Value.AsInterface())
}
}

// TracerProvider returns a TracerProvider that can be used to generate
// additional Spans on the same telemetry pipeline as the current Span.
func (s *OpenTelemetrySpanBridge) TracerProvider() trace.TracerProvider {
return s.provider
}

func (s *OpenTelemetrySpanBridge) logFieldWithAttributes(field log.Field, attributes []attribute.KeyValue) {
if len(attributes) == 0 {
s.span.LogFields(field)
return
}

fields := make([]log.Field, 0, 1+len(attributes))
fields = append(fields, field)

for _, attr := range attributes {
if attr.Valid() {
fields = append(fields, log.Object(string(attr.Key), attr.Value.AsInterface()))
}
}

s.span.LogFields(fields...)
}

func jaegerToOTelTraceID(input jaeger.TraceID) trace.TraceID {
var traceID trace.TraceID
binary.BigEndian.PutUint64(traceID[0:8], input.High)
binary.BigEndian.PutUint64(traceID[8:16], input.Low)
return traceID
}

func jaegerToOTelSpanID(input jaeger.SpanID) trace.SpanID {
var spanID trace.SpanID
binary.BigEndian.PutUint64(spanID[0:8], uint64(input))
return spanID
}

func jaegerFromOTelTraceID(input trace.TraceID) jaeger.TraceID {
var traceID jaeger.TraceID
traceID.High = binary.BigEndian.Uint64(input[0:8])
traceID.Low = binary.BigEndian.Uint64(input[8:16])
return traceID
}

func jaegerFromOTelSpanID(input trace.SpanID) jaeger.SpanID {
return jaeger.SpanID(binary.BigEndian.Uint64(input[0:8]))
}
Loading
Loading