From d537aee8312a2cb23251c5bec527e448f0d6f733 Mon Sep 17 00:00:00 2001 From: AndrewWinterman Date: Fri, 28 Jun 2024 15:11:17 -0700 Subject: [PATCH 1/4] feat(otel): add opentelemety utility functions This PR extracts opentelemetry utility functions from my private project and adds them to this project without calling them. It resolves https://github.com/rabbitmq/amqp091-go/issues/43 I'd like a broader discussion about whether these should be automatically called by the library where possible, or if they should simply be provided to clients to use if they so wish. I did my best to follow OpenTelemetry semantic conventions as described here https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/, but they are at times ambiguous for rabbitmq-- e.g. is the destination for a message the Queue or the Consumer Tag the message was delivered to. Given the channel based approaches of this library, it is impossible for the library to know the full execution of a consumer. Unless autoack=false, we cannot actually know when to end the span associated with a delivery, so at least in the consumer case, it's probably best to allow the client to manage spans for themselves. We *can* manage spans on the producer side, and at the very least extract span identifiers to include on published headers automatically, and provide utilities for pulling them back out again. My intention with putting this PR up is to move the conversation forward. Because the PR *only* provides private methods (if I left members public please call them out), it can be safely merged while these questions are worked out. --- go.mod | 17 +++- go.sum | 23 +++++- opentelemetry.go | 196 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 233 insertions(+), 3 deletions(-) create mode 100644 opentelemetry.go diff --git a/go.mod b/go.mod index d9b868c..43655d8 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,18 @@ module github.com/rabbitmq/amqp091-go -go 1.20 +go 1.21 -require go.uber.org/goleak v1.3.0 +toolchain go1.22.0 + +require ( + github.com/getoutreach/gobox v1.92.1 + go.opentelemetry.io/otel v1.27.0 + go.opentelemetry.io/otel/trace v1.27.0 + go.uber.org/goleak v1.3.0 +) + +require ( + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect +) diff --git a/go.sum b/go.sum index 6037995..041b8f4 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,27 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/getoutreach/gobox v1.92.1 h1:MBDedZCUN+ef/ljBHAOSyVisqvR5dPlSwso1JdMPbXw= +github.com/getoutreach/gobox v1.92.1/go.mod h1:IPy+RNuOYRMTizH6iTr33myGKcRhjEIIHS2VMqzZL0A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/opentelemetry.go b/opentelemetry.go new file mode 100644 index 0000000..a59c2e1 --- /dev/null +++ b/opentelemetry.go @@ -0,0 +1,196 @@ +package amqp091 + +import ( + "context" + "fmt" + + "github.com/getoutreach/gobox/pkg/app" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.opentelemetry.io/otel/trace" +) + +// tracer is the tracer used by the package +var tracer = otel.Tracer("amqp091") + +// amqpHeaderCarrier is a carrier for AMQP headers. +type amqpHeaderCarrier Table + +// Get returns the value associated with the passed key. +func (c amqpHeaderCarrier) Get(key string) string { + v, ok := c[key] + if !ok { + return "" + } + s, ok := v.(string) + if ok { + return s + } + return "" +} + +// Set stores the key-value pair. +func (c amqpHeaderCarrier) Set(key, value string) { + c[key] = value +} + +// Keys lists the keys stored in this carrier. +func (c amqpHeaderCarrier) Keys() []string { + keys := []string{} + for k, v := range c { + if _, ok := v.(string); !ok { + continue + } + keys = append(keys, k) + } + return keys +} + +// ensure amqpHeaderCarrier implements the TextMapCarrier interface +var _ propagation.TextMapCarrier = amqpHeaderCarrier{} + +// InjectSpan injects the span context into the AMQP headers. +// It returns the input headers with the span headers added. +func injectSpanFromContext(ctx context.Context, headers Table) Table { + carrier := amqpHeaderCarrier(headers) + if carrier == nil { + carrier = amqpHeaderCarrier{} + } + otel.GetTextMapPropagator().Inject(ctx, carrier) + return Table(carrier) +} + +// ExtractSpanContext extracts the span context from the AMQP headers. +func ExtractSpanContext(ctx context.Context, headers Table) context.Context { + carrier := amqpHeaderCarrier(headers) + if carrier == nil { + carrier = amqpHeaderCarrier{} + } + return otel.GetTextMapPropagator().Extract(ctx, carrier) +} + +// extractSpanFromReturn creates a span for a returned message +func extractSpanFromReturn( + ctx context.Context, + ret Return, +) (context.Context, trace.Span) { + spctx := ExtractSpanContext(ctx, ret.Headers) + spanName := fmt.Sprintf("%s return", ret.RoutingKey) + return tracer.Start(ctx, spanName, + trace.WithLinks(trace.LinkFromContext(spctx, semconv.MessagingMessageID(ret.MessageId))), + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.MessagingRabbitmqDestinationRoutingKey(ret.RoutingKey), + semconv.MessagingDestinationPublishName(ret.Exchange), + semconv.MessagingOperationKey.String("return"), + semconv.MessagingMessageID(ret.MessageId), + semconv.MessagingMessageConversationID(ret.CorrelationId), + semconv.MessagingSystemRabbitmq, + semconv.MessagingClientIDKey.String(app.Info().Name), + semconv.ErrorTypeKey.String(ret.ReplyText), + // semconv.NetPeerPort(5672 + // semconv.NetPeerIP("localhost") + // semconv.ServerAddress("localhost") + ), + trace.WithNewRoot(), + ) +} + +// settleDelivery creates a span for the acking of a delivery +func settleDelivery( + ctx context.Context, + consumerTag string, + multiple, requeue bool, +) (context.Context, trace.Span) { + return tracer.Start(ctx, + fmt.Sprintf("%s settle", consumerTag), + trace.WithAttributes( + attribute.Bool("multiple", multiple), + attribute.Bool("requeue", requeue))) +} + +// extractLinkFromDelivery creates a link for a delivered message +// +// The recommend way to link a consumer to the publisher is with a link, since +// the two operations can be quit far apart in time. If you have a usecase +// where you would like the spans to have a parent child relationship instead, use +// ExtractSpanContext +// +// The consumer span may containe 1 or more messages, which is why we don't +// manufacture the span in its entirety here. +func extractLinkFromDelivery(ctx context.Context, del Delivery) trace.Link { + spctx := ExtractSpanContext(ctx, del.Headers) + return trace.LinkFromContext(spctx, semconv.MessagingMessageID(del.MessageId)) +} + +// spanForDelivery creates a span for the delivered messages +// returns a new context with the span headers and the span +func spanForDelivery( + ctx context.Context, + consumerTag string, + delivery []Delivery, + options ...trace.SpanStartOption, +) (context.Context, trace.Span) { + spanName := fmt.Sprintf("%s consume", consumerTag) + links := []trace.Link{} + for _, del := range delivery { + links = append(links, extractLinkFromDelivery(ctx, del)) + } + return tracer.Start( + ctx, + spanName, + append( + options, + trace.WithLinks(links...), + trace.WithSpanKind(trace.SpanKindConsumer), + )..., + ) +} + +// Publish creates a span for a publishing message returns a new context with +// the span headers, the mssage that was being published with span headers +// injected, and a function to be called with the result of the publish +func spanForPublication( + ctx context.Context, + publishing Publishing, + exchange, routinKey string, + immediate bool, +) (context.Context, Publishing, func(err error, typ string)) { + spanName := fmt.Sprintf("%s publish", routinKey) + ctx, span := tracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.MessagingRabbitmqDestinationRoutingKey(routinKey), + semconv.MessagingDestinationPublishName(exchange), + semconv.MessagingOperationPublish, + semconv.MessagingMessageID(publishing.MessageId), + semconv.MessagingMessageConversationID(publishing.CorrelationId), + semconv.MessagingSystemRabbitmq, + semconv.MessagingClientIDKey.String(app.Info().Name), + semconv.MessagingMessageBodySize(len(publishing.Body)), + semconv.MessageTypeSent, + attribute.Bool("messaging.immediate", immediate), + + // TODO(AWinterman): Add these attributes + // semconv.NetPeerPort(5672) // nolint:gocritic // Why: see to do + // semconv.NetworkPeerAddress() // nolint:gocritic // Why: see to do + // semconv.NetPeerPort() // nolint:gocritic // Why: see to do + ), + ) + headers := injectSpanFromContext(ctx, publishing.Headers) + publishing.Headers = Table(headers) + + return ctx, publishing, func(err error, typ string) { + if err != nil { + span.RecordError(err) + span.SetAttributes( + semconv.ErrorTypeKey.String(typ), + ) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + } +} From 75a6aeb0617c1b7a2d9620f644e656f949ca3133 Mon Sep 17 00:00:00 2001 From: AndrewWinterman Date: Mon, 29 Jul 2024 22:20:17 -0700 Subject: [PATCH 2/4] feat(otel): take a stab at wiring otel up --- channel.go | 22 +++++++++++++------- delivery.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++++ opentelemetry.go | 37 +++++++++++++++++++-------------- 3 files changed, 91 insertions(+), 22 deletions(-) diff --git a/channel.go b/channel.go index d08f918..0deb01f 100644 --- a/channel.go +++ b/channel.go @@ -1492,7 +1492,7 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg /* PublishWithContext sends a Publishing from the client to an exchange on the server. -NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured. +NOTE: Context termination is not honoured. When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue name. This is @@ -1523,8 +1523,9 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. */ -func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { - return ch.Publish(exchange, key, mandatory, immediate, msg) +func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { + _, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) + return err } /* @@ -1583,11 +1584,18 @@ DeferredConfirmation, allowing the caller to wait on the publisher confirmation for this message. If the channel has not been put into confirm mode, the DeferredConfirmation will be nil. -NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed -to this function is not honoured. +NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context +variant. The termination of the context passed to this function is not +honoured. */ -func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { - return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) +func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + _, msg, errFn := spanForPublication(ctx, msg, exchange, key, immediate) + dc, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) + if err != nil { + errFn(err) + return nil, err + } + return dc, nil } /* diff --git a/delivery.go b/delivery.go index f692abb..6fa242b 100644 --- a/delivery.go +++ b/delivery.go @@ -6,8 +6,12 @@ package amqp091 import ( + "context" "errors" + "fmt" "time" + + "go.opentelemetry.io/otel/trace" ) var errDeliveryNotInitialized = errors.New("delivery not initialized") @@ -58,6 +62,21 @@ type Delivery struct { Body []byte } +// Span returns context and a span that for the delivery +// the resulting span is linked to the publication that created it, if it has +// the appropraite headers set. See [context-propagation] for more details +// +// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ +func (d *Delivery) Span(ctx context.Context, options ...trace.SpanStartOption) (context.Context, trace.Span) { + return spanForDelivery(ctx, d.ConsumerTag, d, options...) +} + +// Link returns a link for the delivery. The link points to the publication, if +// the appropriate headers are set. +func (d *Delivery) Link(ctx context.Context) trace.Link { + return extractLinkFromDelivery(ctx, d) +} + func newDelivery(channel *Channel, msg messageWithContent) *Delivery { props, body := msg.getContent() @@ -171,3 +190,38 @@ func (d Delivery) Nack(multiple, requeue bool) error { } return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) } + +type DeliveryResponse uint8 + +const ( + Ack DeliveryResponse = iota + Reject + Nack +) + +func (r DeliveryResponse) Name() string { + switch r { + case Ack: + return "ack" + case Nack: + return "nack" + case Reject: + return "reject" + default: + return "unknown" + } +} + +func (d *Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { + defer settleDelivery(ctx, d, response, multiple, requeue) + switch response { + case Ack: + return d.Ack(multiple) + case Nack: + return d.Nack(multiple, requeue) + case Reject: + return d.Reject(requeue) + default: + return fmt.Errorf("unknown operation %s", response.Name()) + } +} diff --git a/opentelemetry.go b/opentelemetry.go index a59c2e1..b561816 100644 --- a/opentelemetry.go +++ b/opentelemetry.go @@ -2,6 +2,7 @@ package amqp091 import ( "context" + "errors" "fmt" "github.com/getoutreach/gobox/pkg/app" @@ -102,12 +103,14 @@ func extractSpanFromReturn( // settleDelivery creates a span for the acking of a delivery func settleDelivery( ctx context.Context, - consumerTag string, + delivery *Delivery, + response DeliveryResponse, multiple, requeue bool, ) (context.Context, trace.Span) { return tracer.Start(ctx, - fmt.Sprintf("%s settle", consumerTag), + fmt.Sprintf("%s settle", delivery.RoutingKey), trace.WithAttributes( + attribute.String("messaging.operation.name", response.Name()), attribute.Bool("multiple", multiple), attribute.Bool("requeue", requeue))) } @@ -121,24 +124,25 @@ func settleDelivery( // // The consumer span may containe 1 or more messages, which is why we don't // manufacture the span in its entirety here. -func extractLinkFromDelivery(ctx context.Context, del Delivery) trace.Link { +func extractLinkFromDelivery(ctx context.Context, del *Delivery) trace.Link { spctx := ExtractSpanContext(ctx, del.Headers) - return trace.LinkFromContext(spctx, semconv.MessagingMessageID(del.MessageId)) + return trace.LinkFromContext(spctx, + semconv.MessagingMessageConversationID(del.CorrelationId), + semconv.MessagingMessageID(del.MessageId), + semconv.MessagingRabbitmqMessageDeliveryTag(int(del.DeliveryTag))) } // spanForDelivery creates a span for the delivered messages -// returns a new context with the span headers and the span +// returns a new context with the span headers and the span. func spanForDelivery( ctx context.Context, consumerTag string, - delivery []Delivery, + delivery *Delivery, options ...trace.SpanStartOption, ) (context.Context, trace.Span) { spanName := fmt.Sprintf("%s consume", consumerTag) links := []trace.Link{} - for _, del := range delivery { - links = append(links, extractLinkFromDelivery(ctx, del)) - } + links = append(links, extractLinkFromDelivery(ctx, delivery)) return tracer.Start( ctx, spanName, @@ -158,7 +162,7 @@ func spanForPublication( publishing Publishing, exchange, routinKey string, immediate bool, -) (context.Context, Publishing, func(err error, typ string)) { +) (context.Context, Publishing, func(err error)) { spanName := fmt.Sprintf("%s publish", routinKey) ctx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindProducer), @@ -169,7 +173,7 @@ func spanForPublication( semconv.MessagingMessageID(publishing.MessageId), semconv.MessagingMessageConversationID(publishing.CorrelationId), semconv.MessagingSystemRabbitmq, - semconv.MessagingClientIDKey.String(app.Info().Name), + semconv.MessagingClientIDKey.String(publishing.AppId), semconv.MessagingMessageBodySize(len(publishing.Body)), semconv.MessageTypeSent, attribute.Bool("messaging.immediate", immediate), @@ -183,12 +187,15 @@ func spanForPublication( headers := injectSpanFromContext(ctx, publishing.Headers) publishing.Headers = Table(headers) - return ctx, publishing, func(err error, typ string) { + return ctx, publishing, func(err error) { if err != nil { span.RecordError(err) - span.SetAttributes( - semconv.ErrorTypeKey.String(typ), - ) + amqpErr := &Error{} + if errors.As(err, &amqpErr) { + span.SetAttributes( + semconv.ErrorTypeKey.String(amqpErr.Reason), + ) + } span.SetStatus(codes.Error, err.Error()) } span.End() From e0fa7c65ce563e4646430afc577a5547742adaf9 Mon Sep 17 00:00:00 2001 From: AndrewWinterman Date: Mon, 29 Jul 2024 22:29:15 -0700 Subject: [PATCH 3/4] fix: remove reference to outreach gobox lib --- go.mod | 1 - go.sum | 4 ---- opentelemetry.go | 2 -- 3 files changed, 7 deletions(-) diff --git a/go.mod b/go.mod index 43655d8..d9a1589 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 toolchain go1.22.0 require ( - github.com/getoutreach/gobox v1.92.1 go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/goleak v1.3.0 diff --git a/go.sum b/go.sum index 041b8f4..b34a22c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/getoutreach/gobox v1.92.1 h1:MBDedZCUN+ef/ljBHAOSyVisqvR5dPlSwso1JdMPbXw= -github.com/getoutreach/gobox v1.92.1/go.mod h1:IPy+RNuOYRMTizH6iTr33myGKcRhjEIIHS2VMqzZL0A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -23,5 +21,3 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= -gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/opentelemetry.go b/opentelemetry.go index b561816..e485102 100644 --- a/opentelemetry.go +++ b/opentelemetry.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - "github.com/getoutreach/gobox/pkg/app" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -90,7 +89,6 @@ func extractSpanFromReturn( semconv.MessagingMessageID(ret.MessageId), semconv.MessagingMessageConversationID(ret.CorrelationId), semconv.MessagingSystemRabbitmq, - semconv.MessagingClientIDKey.String(app.Info().Name), semconv.ErrorTypeKey.String(ret.ReplyText), // semconv.NetPeerPort(5672 // semconv.NetPeerIP("localhost") From 47aa58b54cf20000c7827e4451f2d3bbd99fb505 Mon Sep 17 00:00:00 2001 From: AndrewWinterman Date: Mon, 19 Aug 2024 11:48:52 -0700 Subject: [PATCH 4/4] a smidge of polish --- delivery.go | 241 ++++++++++++++++++++-------------------- opentelemetry.go | 279 ++++++++++++++++++++++++----------------------- return.go | 93 +++++++++------- 3 files changed, 319 insertions(+), 294 deletions(-) diff --git a/delivery.go b/delivery.go index 6fa242b..b62825c 100644 --- a/delivery.go +++ b/delivery.go @@ -6,12 +6,12 @@ package amqp091 import ( - "context" - "errors" - "fmt" - "time" + "context" + "errors" + "fmt" + "time" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace" ) var errDeliveryNotInitialized = errors.New("delivery not initialized") @@ -21,45 +21,45 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized") // // Applications can provide mock implementations in tests of Delivery handlers. type Acknowledger interface { - Ack(tag uint64, multiple bool) error - Nack(tag uint64, multiple, requeue bool) error - Reject(tag uint64, requeue bool) error + Ack(tag uint64, multiple bool) error + Nack(tag uint64, multiple, requeue bool) error + Reject(tag uint64, requeue bool) error } // Delivery captures the fields for a previously delivered message resident in // a queue to be delivered by the server to a consumer from Channel.Consume or // Channel.Get. type Delivery struct { - Acknowledger Acknowledger // the channel from which this delivery arrived - - Headers Table // Application or header exchange table - - // Properties - ContentType string // MIME content type - ContentEncoding string // MIME content encoding - DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) - Priority uint8 // queue implementation use - 0 to 9 - CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to reply to (ex: RPC) - Expiration string // implementation use - message expiration spec - MessageId string // application use - message identifier - Timestamp time.Time // application use - message timestamp - Type string // application use - message type name - UserId string // application use - creating user - should be authenticated user - AppId string // application use - creating application id - - // Valid only with Channel.Consume - ConsumerTag string - - // Valid only with Channel.Get - MessageCount uint32 - - DeliveryTag uint64 - Redelivered bool - Exchange string // basic.publish exchange - RoutingKey string // basic.publish routing key - - Body []byte + Acknowledger Acknowledger // the channel from which this delivery arrived + + Headers Table // Application or header exchange table + + // Properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) + Priority uint8 // queue implementation use - 0 to 9 + CorrelationId string // application use - correlation identifier + ReplyTo string // application use - address to reply to (ex: RPC) + Expiration string // implementation use - message expiration spec + MessageId string // application use - message identifier + Timestamp time.Time // application use - message timestamp + Type string // application use - message type name + UserId string // application use - creating user - should be authenticated user + AppId string // application use - creating application id + + // Valid only with Channel.Consume + ConsumerTag string + + // Valid only with Channel.Get + MessageCount uint32 + + DeliveryTag uint64 + Redelivered bool + Exchange string // basic.publish exchange + RoutingKey string // basic.publish routing key + + Body []byte } // Span returns context and a span that for the delivery @@ -67,57 +67,60 @@ type Delivery struct { // the appropraite headers set. See [context-propagation] for more details // // [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ -func (d *Delivery) Span(ctx context.Context, options ...trace.SpanStartOption) (context.Context, trace.Span) { - return spanForDelivery(ctx, d.ConsumerTag, d, options...) +func (d Delivery) Span( + ctx context.Context, + options ...trace.SpanStartOption, +) (context.Context, trace.Span) { + return spanForDelivery(ctx, &d, options...) } // Link returns a link for the delivery. The link points to the publication, if // the appropriate headers are set. -func (d *Delivery) Link(ctx context.Context) trace.Link { - return extractLinkFromDelivery(ctx, d) +func (d Delivery) Link(ctx context.Context) trace.Link { + return extractLinkFromDelivery(ctx, &d) } func newDelivery(channel *Channel, msg messageWithContent) *Delivery { - props, body := msg.getContent() - - delivery := Delivery{ - Acknowledger: channel, - - Headers: props.Headers, - ContentType: props.ContentType, - ContentEncoding: props.ContentEncoding, - DeliveryMode: props.DeliveryMode, - Priority: props.Priority, - CorrelationId: props.CorrelationId, - ReplyTo: props.ReplyTo, - Expiration: props.Expiration, - MessageId: props.MessageId, - Timestamp: props.Timestamp, - Type: props.Type, - UserId: props.UserId, - AppId: props.AppId, - - Body: body, - } - - // Properties for the delivery types - switch m := msg.(type) { - case *basicDeliver: - delivery.ConsumerTag = m.ConsumerTag - delivery.DeliveryTag = m.DeliveryTag - delivery.Redelivered = m.Redelivered - delivery.Exchange = m.Exchange - delivery.RoutingKey = m.RoutingKey - - case *basicGetOk: - delivery.MessageCount = m.MessageCount - delivery.DeliveryTag = m.DeliveryTag - delivery.Redelivered = m.Redelivered - delivery.Exchange = m.Exchange - delivery.RoutingKey = m.RoutingKey - } - - return &delivery + props, body := msg.getContent() + + delivery := Delivery{ + Acknowledger: channel, + + Headers: props.Headers, + ContentType: props.ContentType, + ContentEncoding: props.ContentEncoding, + DeliveryMode: props.DeliveryMode, + Priority: props.Priority, + CorrelationId: props.CorrelationId, + ReplyTo: props.ReplyTo, + Expiration: props.Expiration, + MessageId: props.MessageId, + Timestamp: props.Timestamp, + Type: props.Type, + UserId: props.UserId, + AppId: props.AppId, + + Body: body, + } + + // Properties for the delivery types + switch m := msg.(type) { + case *basicDeliver: + delivery.ConsumerTag = m.ConsumerTag + delivery.DeliveryTag = m.DeliveryTag + delivery.Redelivered = m.Redelivered + delivery.Exchange = m.Exchange + delivery.RoutingKey = m.RoutingKey + + case *basicGetOk: + delivery.MessageCount = m.MessageCount + delivery.DeliveryTag = m.DeliveryTag + delivery.Redelivered = m.Redelivered + delivery.Exchange = m.Exchange + delivery.RoutingKey = m.RoutingKey + } + + return &delivery } /* @@ -140,10 +143,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Ack(multiple bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Ack(d.DeliveryTag, multiple) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Ack(d.DeliveryTag, multiple) } /* @@ -160,10 +163,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Reject(requeue bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Reject(d.DeliveryTag, requeue) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Reject(d.DeliveryTag, requeue) } /* @@ -185,43 +188,43 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Nack(multiple, requeue bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) } type DeliveryResponse uint8 const ( - Ack DeliveryResponse = iota - Reject - Nack + Ack DeliveryResponse = iota + Reject + Nack ) func (r DeliveryResponse) Name() string { - switch r { - case Ack: - return "ack" - case Nack: - return "nack" - case Reject: - return "reject" - default: - return "unknown" - } + switch r { + case Ack: + return "ack" + case Nack: + return "nack" + case Reject: + return "reject" + default: + return "unknown" + } } -func (d *Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { - defer settleDelivery(ctx, d, response, multiple, requeue) - switch response { - case Ack: - return d.Ack(multiple) - case Nack: - return d.Nack(multiple, requeue) - case Reject: - return d.Reject(requeue) - default: - return fmt.Errorf("unknown operation %s", response.Name()) - } +func (d Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { + defer settleDelivery(ctx, &d, response, multiple, requeue) + switch response { + case Ack: + return d.Ack(multiple) + case Nack: + return d.Nack(multiple, requeue) + case Reject: + return d.Reject(requeue) + default: + return fmt.Errorf("unknown operation %s", response.Name()) + } } diff --git a/opentelemetry.go b/opentelemetry.go index e485102..5d0f51a 100644 --- a/opentelemetry.go +++ b/opentelemetry.go @@ -1,16 +1,16 @@ package amqp091 import ( - "context" - "errors" - "fmt" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/propagation" - semconv "go.opentelemetry.io/otel/semconv/v1.25.0" - "go.opentelemetry.io/otel/trace" + "context" + "errors" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.opentelemetry.io/otel/trace" ) // tracer is the tracer used by the package @@ -21,96 +21,118 @@ type amqpHeaderCarrier Table // Get returns the value associated with the passed key. func (c amqpHeaderCarrier) Get(key string) string { - v, ok := c[key] - if !ok { - return "" - } - s, ok := v.(string) - if ok { - return s - } - return "" + v, ok := c[key] + if !ok { + return "" + } + s, ok := v.(string) + if ok { + return s + } + return "" } // Set stores the key-value pair. func (c amqpHeaderCarrier) Set(key, value string) { - c[key] = value + c[key] = value } // Keys lists the keys stored in this carrier. func (c amqpHeaderCarrier) Keys() []string { - keys := []string{} - for k, v := range c { - if _, ok := v.(string); !ok { - continue - } - keys = append(keys, k) - } - return keys + var keys []string + for k, v := range c { + if _, ok := v.(string); !ok { + continue + } + keys = append(keys, k) + } + return keys } // ensure amqpHeaderCarrier implements the TextMapCarrier interface var _ propagation.TextMapCarrier = amqpHeaderCarrier{} +// keys for conventions in this file +var ( + // settleResponseKey is the key for indicating how the message was settled + settleResponseKey = attribute.Key("messaging.settle.response_type") + // settleMultipleKey indicates whether multiple outstanding messages were settled at once. + settleMultipleKey = attribute.Key("messaging.settle.multiple") + // settleRequeueKey indicates whether the messages were requeued. + settleRequeueKey = attribute.Key("messaging.settle.multiple") + // publishImmediate key indicates whether the AMQP immediate flag was set on the publishing. + publishImmediateKy = attribute.Key("messaging.publish.immediate") + // returnOperation indicates an AMQP 091 return + returnOperation = semconv.MessagingOperationKey.String("return") +) + // InjectSpan injects the span context into the AMQP headers. // It returns the input headers with the span headers added. func injectSpanFromContext(ctx context.Context, headers Table) Table { - carrier := amqpHeaderCarrier(headers) - if carrier == nil { - carrier = amqpHeaderCarrier{} - } - otel.GetTextMapPropagator().Inject(ctx, carrier) - return Table(carrier) + carrier := amqpHeaderCarrier(headers) + if carrier == nil { + carrier = amqpHeaderCarrier{} + } + otel.GetTextMapPropagator().Inject(ctx, carrier) + return Table(carrier) } // ExtractSpanContext extracts the span context from the AMQP headers. func ExtractSpanContext(ctx context.Context, headers Table) context.Context { - carrier := amqpHeaderCarrier(headers) - if carrier == nil { - carrier = amqpHeaderCarrier{} - } - return otel.GetTextMapPropagator().Extract(ctx, carrier) + carrier := amqpHeaderCarrier(headers) + if carrier == nil { + carrier = amqpHeaderCarrier{} + } + return otel.GetTextMapPropagator().Extract(ctx, carrier) } // extractSpanFromReturn creates a span for a returned message func extractSpanFromReturn( - ctx context.Context, - ret Return, + ctx context.Context, + ret Return, + options ...trace.SpanStartOption, ) (context.Context, trace.Span) { - spctx := ExtractSpanContext(ctx, ret.Headers) - spanName := fmt.Sprintf("%s return", ret.RoutingKey) - return tracer.Start(ctx, spanName, - trace.WithLinks(trace.LinkFromContext(spctx, semconv.MessagingMessageID(ret.MessageId))), - trace.WithSpanKind(trace.SpanKindProducer), - trace.WithAttributes( - semconv.MessagingRabbitmqDestinationRoutingKey(ret.RoutingKey), - semconv.MessagingDestinationPublishName(ret.Exchange), - semconv.MessagingOperationKey.String("return"), - semconv.MessagingMessageID(ret.MessageId), - semconv.MessagingMessageConversationID(ret.CorrelationId), - semconv.MessagingSystemRabbitmq, - semconv.ErrorTypeKey.String(ret.ReplyText), - // semconv.NetPeerPort(5672 - // semconv.NetPeerIP("localhost") - // semconv.ServerAddress("localhost") - ), - trace.WithNewRoot(), - ) + spctx := ExtractSpanContext(ctx, ret.Headers) + spanName := fmt.Sprintf("return %s %s", ret.Exchange, ret.RoutingKey) + + return tracer.Start(ctx, spanName, + append(options, + trace.WithLinks(trace.LinkFromContext(spctx, semconv.MessagingMessageID(ret.MessageId))), + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.MessagingRabbitmqDestinationRoutingKey(ret.RoutingKey), + semconv.MessagingDestinationPublishName(ret.Exchange), + returnOperation, + semconv.MessagingMessageID(ret.MessageId), + semconv.MessagingMessageConversationID(ret.CorrelationId), + semconv.MessagingSystemRabbitmq, + semconv.ErrorTypeKey.String(ret.ReplyText), + // semconv.NetPeerPort(5672 + // semconv.NetPeerIP("localhost") + // semconv.ServerAddress("localhost") + ), + trace.WithNewRoot(), + )..., + + ) } // settleDelivery creates a span for the acking of a delivery func settleDelivery( - ctx context.Context, - delivery *Delivery, - response DeliveryResponse, - multiple, requeue bool, + ctx context.Context, + delivery *Delivery, + response DeliveryResponse, + multiple, requeue bool, ) (context.Context, trace.Span) { - return tracer.Start(ctx, - fmt.Sprintf("%s settle", delivery.RoutingKey), - trace.WithAttributes( - attribute.String("messaging.operation.name", response.Name()), - attribute.Bool("multiple", multiple), - attribute.Bool("requeue", requeue))) + return tracer.Start(ctx, + fmt.Sprintf("settle %s %s", delivery.Exchange, delivery.RoutingKey), + trace.WithAttributes( + semconv.MessagingOperationSettle, + settleResponseKey.String(response.Name()), + settleMultipleKey.Bool(multiple), + settleRequeueKey.Bool(requeue), + ), + ) } // extractLinkFromDelivery creates a link for a delivered message @@ -123,79 +145,68 @@ func settleDelivery( // The consumer span may containe 1 or more messages, which is why we don't // manufacture the span in its entirety here. func extractLinkFromDelivery(ctx context.Context, del *Delivery) trace.Link { - spctx := ExtractSpanContext(ctx, del.Headers) - return trace.LinkFromContext(spctx, - semconv.MessagingMessageConversationID(del.CorrelationId), - semconv.MessagingMessageID(del.MessageId), - semconv.MessagingRabbitmqMessageDeliveryTag(int(del.DeliveryTag))) + spctx := ExtractSpanContext(ctx, del.Headers) + return trace.LinkFromContext(spctx, + semconv.MessagingMessageConversationID(del.CorrelationId), + semconv.MessagingMessageID(del.MessageId), + semconv.MessagingRabbitmqMessageDeliveryTag(int(del.DeliveryTag))) } // spanForDelivery creates a span for the delivered messages // returns a new context with the span headers and the span. -func spanForDelivery( - ctx context.Context, - consumerTag string, - delivery *Delivery, - options ...trace.SpanStartOption, -) (context.Context, trace.Span) { - spanName := fmt.Sprintf("%s consume", consumerTag) - links := []trace.Link{} - links = append(links, extractLinkFromDelivery(ctx, delivery)) - return tracer.Start( - ctx, - spanName, - append( - options, - trace.WithLinks(links...), - trace.WithSpanKind(trace.SpanKindConsumer), - )..., - ) +func spanForDelivery(ctx context.Context, delivery *Delivery, options ...trace.SpanStartOption, ) (context.Context, trace.Span) { + spanName := fmt.Sprintf("consume %s %s", delivery.Exchange, delivery.RoutingKey) + var links []trace.Link + links = append(links, extractLinkFromDelivery(ctx, delivery)) + return tracer.Start( + ctx, + spanName, + append( + options, + trace.WithLinks(links...), + trace.WithSpanKind(trace.SpanKindConsumer), + )..., + ) } // Publish creates a span for a publishing message returns a new context with // the span headers, the mssage that was being published with span headers // injected, and a function to be called with the result of the publish func spanForPublication( - ctx context.Context, - publishing Publishing, - exchange, routinKey string, - immediate bool, + ctx context.Context, + publishing Publishing, + exchange, routinKey string, + immediate bool, ) (context.Context, Publishing, func(err error)) { - spanName := fmt.Sprintf("%s publish", routinKey) - ctx, span := tracer.Start(ctx, spanName, - trace.WithSpanKind(trace.SpanKindProducer), - trace.WithAttributes( - semconv.MessagingRabbitmqDestinationRoutingKey(routinKey), - semconv.MessagingDestinationPublishName(exchange), - semconv.MessagingOperationPublish, - semconv.MessagingMessageID(publishing.MessageId), - semconv.MessagingMessageConversationID(publishing.CorrelationId), - semconv.MessagingSystemRabbitmq, - semconv.MessagingClientIDKey.String(publishing.AppId), - semconv.MessagingMessageBodySize(len(publishing.Body)), - semconv.MessageTypeSent, - attribute.Bool("messaging.immediate", immediate), - - // TODO(AWinterman): Add these attributes - // semconv.NetPeerPort(5672) // nolint:gocritic // Why: see to do - // semconv.NetworkPeerAddress() // nolint:gocritic // Why: see to do - // semconv.NetPeerPort() // nolint:gocritic // Why: see to do - ), - ) - headers := injectSpanFromContext(ctx, publishing.Headers) - publishing.Headers = Table(headers) - - return ctx, publishing, func(err error) { - if err != nil { - span.RecordError(err) - amqpErr := &Error{} - if errors.As(err, &amqpErr) { - span.SetAttributes( - semconv.ErrorTypeKey.String(amqpErr.Reason), - ) - } - span.SetStatus(codes.Error, err.Error()) - } - span.End() - } + spanName := fmt.Sprintf("%s publish", routinKey) + ctx, span := tracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.MessagingRabbitmqDestinationRoutingKey(routinKey), + semconv.MessagingDestinationPublishName(exchange), + semconv.MessagingOperationPublish, + semconv.MessagingMessageID(publishing.MessageId), + semconv.MessagingMessageConversationID(publishing.CorrelationId), + semconv.MessagingSystemRabbitmq, + semconv.MessagingClientIDKey.String(publishing.AppId), + semconv.MessagingMessageBodySize(len(publishing.Body)), + semconv.MessageTypeSent, + ), + ) + headers := injectSpanFromContext(ctx, publishing.Headers) + publishing.Headers = headers + + return ctx, publishing, func(err error) { + if err != nil { + span.RecordError(err) + amqpErr := &Error{} + if errors.As(err, &amqpErr) { + span.SetAttributes( + semconv.ErrorTypeKey.String(amqpErr.Reason), + ) + } + span.SetStatus(codes.Error, err.Error()) + } + span.End() + } } diff --git a/return.go b/return.go index cdc3875..356a844 100644 --- a/return.go +++ b/return.go @@ -6,59 +6,70 @@ package amqp091 import ( - "time" + "context" + "go.opentelemetry.io/otel/trace" + "time" ) // Return captures a flattened struct of fields returned by the server when a // Publishing is unable to be delivered either due to the `mandatory` flag set // and no route found, or `immediate` flag set and no free consumer. type Return struct { - ReplyCode uint16 // reason - ReplyText string // description - Exchange string // basic.publish exchange - RoutingKey string // basic.publish routing key + ReplyCode uint16 // reason + ReplyText string // description + Exchange string // basic.publish exchange + RoutingKey string // basic.publish routing key - // Properties - ContentType string // MIME content type - ContentEncoding string // MIME content encoding - Headers Table // Application or header exchange table - DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) - Priority uint8 // queue implementation use - 0 to 9 - CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to to reply to (ex: RPC) - Expiration string // implementation use - message expiration spec - MessageId string // application use - message identifier - Timestamp time.Time // application use - message timestamp - Type string // application use - message type name - UserId string // application use - creating user id - AppId string // application use - creating application + // Properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + Headers Table // Application or header exchange table + DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) + Priority uint8 // queue implementation use - 0 to 9 + CorrelationId string // application use - correlation identifier + ReplyTo string // application use - address to to reply to (ex: RPC) + Expiration string // implementation use - message expiration spec + MessageId string // application use - message identifier + Timestamp time.Time // application use - message timestamp + Type string // application use - message type name + UserId string // application use - creating user id + AppId string // application use - creating application - Body []byte + Body []byte } func newReturn(msg basicReturn) *Return { - props, body := msg.getContent() + props, body := msg.getContent() - return &Return{ - ReplyCode: msg.ReplyCode, - ReplyText: msg.ReplyText, - Exchange: msg.Exchange, - RoutingKey: msg.RoutingKey, + return &Return{ + ReplyCode: msg.ReplyCode, + ReplyText: msg.ReplyText, + Exchange: msg.Exchange, + RoutingKey: msg.RoutingKey, - Headers: props.Headers, - ContentType: props.ContentType, - ContentEncoding: props.ContentEncoding, - DeliveryMode: props.DeliveryMode, - Priority: props.Priority, - CorrelationId: props.CorrelationId, - ReplyTo: props.ReplyTo, - Expiration: props.Expiration, - MessageId: props.MessageId, - Timestamp: props.Timestamp, - Type: props.Type, - UserId: props.UserId, - AppId: props.AppId, + Headers: props.Headers, + ContentType: props.ContentType, + ContentEncoding: props.ContentEncoding, + DeliveryMode: props.DeliveryMode, + Priority: props.Priority, + CorrelationId: props.CorrelationId, + ReplyTo: props.ReplyTo, + Expiration: props.Expiration, + MessageId: props.MessageId, + Timestamp: props.Timestamp, + Type: props.Type, + UserId: props.UserId, + AppId: props.AppId, - Body: body, - } + Body: body, + } +} + +// Span returns context and a span that for the delivery +// the resulting span is linked to the publication that created it, if it has +// the appropraite headers set. See [context-propagation] for more details +// +// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ +func (r Return) Span(ctx context.Context, options ...trace.SpanStartOption) (context.Context, trace.Span) { + return extractSpanFromReturn(ctx, r, options...) }