diff --git a/channel.go b/channel.go index d08f918..0deb01f 100644 --- a/channel.go +++ b/channel.go @@ -1492,7 +1492,7 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg /* PublishWithContext sends a Publishing from the client to an exchange on the server. -NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured. +NOTE: Context termination is not honoured. When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue name. This is @@ -1523,8 +1523,9 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. */ -func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { - return ch.Publish(exchange, key, mandatory, immediate, msg) +func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { + _, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) + return err } /* @@ -1583,11 +1584,18 @@ DeferredConfirmation, allowing the caller to wait on the publisher confirmation for this message. If the channel has not been put into confirm mode, the DeferredConfirmation will be nil. -NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed -to this function is not honoured. +NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context +variant. The termination of the context passed to this function is not +honoured. */ -func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { - return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) +func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + _, msg, errFn := spanForPublication(ctx, msg, exchange, key, immediate) + dc, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) + if err != nil { + errFn(err) + return nil, err + } + return dc, nil } /* diff --git a/delivery.go b/delivery.go index f692abb..b62825c 100644 --- a/delivery.go +++ b/delivery.go @@ -6,8 +6,12 @@ package amqp091 import ( - "errors" - "time" + "context" + "errors" + "fmt" + "time" + + "go.opentelemetry.io/otel/trace" ) var errDeliveryNotInitialized = errors.New("delivery not initialized") @@ -17,88 +21,106 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized") // // Applications can provide mock implementations in tests of Delivery handlers. type Acknowledger interface { - Ack(tag uint64, multiple bool) error - Nack(tag uint64, multiple, requeue bool) error - Reject(tag uint64, requeue bool) error + Ack(tag uint64, multiple bool) error + Nack(tag uint64, multiple, requeue bool) error + Reject(tag uint64, requeue bool) error } // Delivery captures the fields for a previously delivered message resident in // a queue to be delivered by the server to a consumer from Channel.Consume or // Channel.Get. type Delivery struct { - Acknowledger Acknowledger // the channel from which this delivery arrived - - Headers Table // Application or header exchange table - - // Properties - ContentType string // MIME content type - ContentEncoding string // MIME content encoding - DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) - Priority uint8 // queue implementation use - 0 to 9 - CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to reply to (ex: RPC) - Expiration string // implementation use - message expiration spec - MessageId string // application use - message identifier - Timestamp time.Time // application use - message timestamp - Type string // application use - message type name - UserId string // application use - creating user - should be authenticated user - AppId string // application use - creating application id - - // Valid only with Channel.Consume - ConsumerTag string - - // Valid only with Channel.Get - MessageCount uint32 - - DeliveryTag uint64 - Redelivered bool - Exchange string // basic.publish exchange - RoutingKey string // basic.publish routing key - - Body []byte + Acknowledger Acknowledger // the channel from which this delivery arrived + + Headers Table // Application or header exchange table + + // Properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) + Priority uint8 // queue implementation use - 0 to 9 + CorrelationId string // application use - correlation identifier + ReplyTo string // application use - address to reply to (ex: RPC) + Expiration string // implementation use - message expiration spec + MessageId string // application use - message identifier + Timestamp time.Time // application use - message timestamp + Type string // application use - message type name + UserId string // application use - creating user - should be authenticated user + AppId string // application use - creating application id + + // Valid only with Channel.Consume + ConsumerTag string + + // Valid only with Channel.Get + MessageCount uint32 + + DeliveryTag uint64 + Redelivered bool + Exchange string // basic.publish exchange + RoutingKey string // basic.publish routing key + + Body []byte +} + +// Span returns context and a span that for the delivery +// the resulting span is linked to the publication that created it, if it has +// the appropraite headers set. See [context-propagation] for more details +// +// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ +func (d Delivery) Span( + ctx context.Context, + options ...trace.SpanStartOption, +) (context.Context, trace.Span) { + return spanForDelivery(ctx, &d, options...) +} + +// Link returns a link for the delivery. The link points to the publication, if +// the appropriate headers are set. +func (d Delivery) Link(ctx context.Context) trace.Link { + return extractLinkFromDelivery(ctx, &d) } func newDelivery(channel *Channel, msg messageWithContent) *Delivery { - props, body := msg.getContent() - - delivery := Delivery{ - Acknowledger: channel, - - Headers: props.Headers, - ContentType: props.ContentType, - ContentEncoding: props.ContentEncoding, - DeliveryMode: props.DeliveryMode, - Priority: props.Priority, - CorrelationId: props.CorrelationId, - ReplyTo: props.ReplyTo, - Expiration: props.Expiration, - MessageId: props.MessageId, - Timestamp: props.Timestamp, - Type: props.Type, - UserId: props.UserId, - AppId: props.AppId, - - Body: body, - } - - // Properties for the delivery types - switch m := msg.(type) { - case *basicDeliver: - delivery.ConsumerTag = m.ConsumerTag - delivery.DeliveryTag = m.DeliveryTag - delivery.Redelivered = m.Redelivered - delivery.Exchange = m.Exchange - delivery.RoutingKey = m.RoutingKey - - case *basicGetOk: - delivery.MessageCount = m.MessageCount - delivery.DeliveryTag = m.DeliveryTag - delivery.Redelivered = m.Redelivered - delivery.Exchange = m.Exchange - delivery.RoutingKey = m.RoutingKey - } - - return &delivery + props, body := msg.getContent() + + delivery := Delivery{ + Acknowledger: channel, + + Headers: props.Headers, + ContentType: props.ContentType, + ContentEncoding: props.ContentEncoding, + DeliveryMode: props.DeliveryMode, + Priority: props.Priority, + CorrelationId: props.CorrelationId, + ReplyTo: props.ReplyTo, + Expiration: props.Expiration, + MessageId: props.MessageId, + Timestamp: props.Timestamp, + Type: props.Type, + UserId: props.UserId, + AppId: props.AppId, + + Body: body, + } + + // Properties for the delivery types + switch m := msg.(type) { + case *basicDeliver: + delivery.ConsumerTag = m.ConsumerTag + delivery.DeliveryTag = m.DeliveryTag + delivery.Redelivered = m.Redelivered + delivery.Exchange = m.Exchange + delivery.RoutingKey = m.RoutingKey + + case *basicGetOk: + delivery.MessageCount = m.MessageCount + delivery.DeliveryTag = m.DeliveryTag + delivery.Redelivered = m.Redelivered + delivery.Exchange = m.Exchange + delivery.RoutingKey = m.RoutingKey + } + + return &delivery } /* @@ -121,10 +143,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Ack(multiple bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Ack(d.DeliveryTag, multiple) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Ack(d.DeliveryTag, multiple) } /* @@ -141,10 +163,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Reject(requeue bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Reject(d.DeliveryTag, requeue) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Reject(d.DeliveryTag, requeue) } /* @@ -166,8 +188,43 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Nack(multiple, requeue bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) +} + +type DeliveryResponse uint8 + +const ( + Ack DeliveryResponse = iota + Reject + Nack +) + +func (r DeliveryResponse) Name() string { + switch r { + case Ack: + return "ack" + case Nack: + return "nack" + case Reject: + return "reject" + default: + return "unknown" + } +} + +func (d Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { + defer settleDelivery(ctx, &d, response, multiple, requeue) + switch response { + case Ack: + return d.Ack(multiple) + case Nack: + return d.Nack(multiple, requeue) + case Reject: + return d.Reject(requeue) + default: + return fmt.Errorf("unknown operation %s", response.Name()) + } } diff --git a/go.mod b/go.mod index d9b868c..d9a1589 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,17 @@ module github.com/rabbitmq/amqp091-go -go 1.20 +go 1.21 -require go.uber.org/goleak v1.3.0 +toolchain go1.22.0 + +require ( + go.opentelemetry.io/otel v1.27.0 + go.opentelemetry.io/otel/trace v1.27.0 + go.uber.org/goleak v1.3.0 +) + +require ( + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect +) diff --git a/go.sum b/go.sum index 6037995..b34a22c 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,23 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/opentelemetry.go b/opentelemetry.go new file mode 100644 index 0000000..5d0f51a --- /dev/null +++ b/opentelemetry.go @@ -0,0 +1,212 @@ +package amqp091 + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.opentelemetry.io/otel/trace" +) + +// tracer is the tracer used by the package +var tracer = otel.Tracer("amqp091") + +// amqpHeaderCarrier is a carrier for AMQP headers. +type amqpHeaderCarrier Table + +// Get returns the value associated with the passed key. +func (c amqpHeaderCarrier) Get(key string) string { + v, ok := c[key] + if !ok { + return "" + } + s, ok := v.(string) + if ok { + return s + } + return "" +} + +// Set stores the key-value pair. +func (c amqpHeaderCarrier) Set(key, value string) { + c[key] = value +} + +// Keys lists the keys stored in this carrier. +func (c amqpHeaderCarrier) Keys() []string { + var keys []string + for k, v := range c { + if _, ok := v.(string); !ok { + continue + } + keys = append(keys, k) + } + return keys +} + +// ensure amqpHeaderCarrier implements the TextMapCarrier interface +var _ propagation.TextMapCarrier = amqpHeaderCarrier{} + +// keys for conventions in this file +var ( + // settleResponseKey is the key for indicating how the message was settled + settleResponseKey = attribute.Key("messaging.settle.response_type") + // settleMultipleKey indicates whether multiple outstanding messages were settled at once. + settleMultipleKey = attribute.Key("messaging.settle.multiple") + // settleRequeueKey indicates whether the messages were requeued. + settleRequeueKey = attribute.Key("messaging.settle.multiple") + // publishImmediate key indicates whether the AMQP immediate flag was set on the publishing. + publishImmediateKy = attribute.Key("messaging.publish.immediate") + // returnOperation indicates an AMQP 091 return + returnOperation = semconv.MessagingOperationKey.String("return") +) + +// InjectSpan injects the span context into the AMQP headers. +// It returns the input headers with the span headers added. +func injectSpanFromContext(ctx context.Context, headers Table) Table { + carrier := amqpHeaderCarrier(headers) + if carrier == nil { + carrier = amqpHeaderCarrier{} + } + otel.GetTextMapPropagator().Inject(ctx, carrier) + return Table(carrier) +} + +// ExtractSpanContext extracts the span context from the AMQP headers. +func ExtractSpanContext(ctx context.Context, headers Table) context.Context { + carrier := amqpHeaderCarrier(headers) + if carrier == nil { + carrier = amqpHeaderCarrier{} + } + return otel.GetTextMapPropagator().Extract(ctx, carrier) +} + +// extractSpanFromReturn creates a span for a returned message +func extractSpanFromReturn( + ctx context.Context, + ret Return, + options ...trace.SpanStartOption, +) (context.Context, trace.Span) { + spctx := ExtractSpanContext(ctx, ret.Headers) + spanName := fmt.Sprintf("return %s %s", ret.Exchange, ret.RoutingKey) + + return tracer.Start(ctx, spanName, + append(options, + trace.WithLinks(trace.LinkFromContext(spctx, semconv.MessagingMessageID(ret.MessageId))), + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.MessagingRabbitmqDestinationRoutingKey(ret.RoutingKey), + semconv.MessagingDestinationPublishName(ret.Exchange), + returnOperation, + semconv.MessagingMessageID(ret.MessageId), + semconv.MessagingMessageConversationID(ret.CorrelationId), + semconv.MessagingSystemRabbitmq, + semconv.ErrorTypeKey.String(ret.ReplyText), + // semconv.NetPeerPort(5672 + // semconv.NetPeerIP("localhost") + // semconv.ServerAddress("localhost") + ), + trace.WithNewRoot(), + )..., + + ) +} + +// settleDelivery creates a span for the acking of a delivery +func settleDelivery( + ctx context.Context, + delivery *Delivery, + response DeliveryResponse, + multiple, requeue bool, +) (context.Context, trace.Span) { + return tracer.Start(ctx, + fmt.Sprintf("settle %s %s", delivery.Exchange, delivery.RoutingKey), + trace.WithAttributes( + semconv.MessagingOperationSettle, + settleResponseKey.String(response.Name()), + settleMultipleKey.Bool(multiple), + settleRequeueKey.Bool(requeue), + ), + ) +} + +// extractLinkFromDelivery creates a link for a delivered message +// +// The recommend way to link a consumer to the publisher is with a link, since +// the two operations can be quit far apart in time. If you have a usecase +// where you would like the spans to have a parent child relationship instead, use +// ExtractSpanContext +// +// The consumer span may containe 1 or more messages, which is why we don't +// manufacture the span in its entirety here. +func extractLinkFromDelivery(ctx context.Context, del *Delivery) trace.Link { + spctx := ExtractSpanContext(ctx, del.Headers) + return trace.LinkFromContext(spctx, + semconv.MessagingMessageConversationID(del.CorrelationId), + semconv.MessagingMessageID(del.MessageId), + semconv.MessagingRabbitmqMessageDeliveryTag(int(del.DeliveryTag))) +} + +// spanForDelivery creates a span for the delivered messages +// returns a new context with the span headers and the span. +func spanForDelivery(ctx context.Context, delivery *Delivery, options ...trace.SpanStartOption, ) (context.Context, trace.Span) { + spanName := fmt.Sprintf("consume %s %s", delivery.Exchange, delivery.RoutingKey) + var links []trace.Link + links = append(links, extractLinkFromDelivery(ctx, delivery)) + return tracer.Start( + ctx, + spanName, + append( + options, + trace.WithLinks(links...), + trace.WithSpanKind(trace.SpanKindConsumer), + )..., + ) +} + +// Publish creates a span for a publishing message returns a new context with +// the span headers, the mssage that was being published with span headers +// injected, and a function to be called with the result of the publish +func spanForPublication( + ctx context.Context, + publishing Publishing, + exchange, routinKey string, + immediate bool, +) (context.Context, Publishing, func(err error)) { + spanName := fmt.Sprintf("%s publish", routinKey) + ctx, span := tracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.MessagingRabbitmqDestinationRoutingKey(routinKey), + semconv.MessagingDestinationPublishName(exchange), + semconv.MessagingOperationPublish, + semconv.MessagingMessageID(publishing.MessageId), + semconv.MessagingMessageConversationID(publishing.CorrelationId), + semconv.MessagingSystemRabbitmq, + semconv.MessagingClientIDKey.String(publishing.AppId), + semconv.MessagingMessageBodySize(len(publishing.Body)), + semconv.MessageTypeSent, + ), + ) + headers := injectSpanFromContext(ctx, publishing.Headers) + publishing.Headers = headers + + return ctx, publishing, func(err error) { + if err != nil { + span.RecordError(err) + amqpErr := &Error{} + if errors.As(err, &amqpErr) { + span.SetAttributes( + semconv.ErrorTypeKey.String(amqpErr.Reason), + ) + } + span.SetStatus(codes.Error, err.Error()) + } + span.End() + } +} diff --git a/return.go b/return.go index cdc3875..356a844 100644 --- a/return.go +++ b/return.go @@ -6,59 +6,70 @@ package amqp091 import ( - "time" + "context" + "go.opentelemetry.io/otel/trace" + "time" ) // Return captures a flattened struct of fields returned by the server when a // Publishing is unable to be delivered either due to the `mandatory` flag set // and no route found, or `immediate` flag set and no free consumer. type Return struct { - ReplyCode uint16 // reason - ReplyText string // description - Exchange string // basic.publish exchange - RoutingKey string // basic.publish routing key + ReplyCode uint16 // reason + ReplyText string // description + Exchange string // basic.publish exchange + RoutingKey string // basic.publish routing key - // Properties - ContentType string // MIME content type - ContentEncoding string // MIME content encoding - Headers Table // Application or header exchange table - DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) - Priority uint8 // queue implementation use - 0 to 9 - CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to to reply to (ex: RPC) - Expiration string // implementation use - message expiration spec - MessageId string // application use - message identifier - Timestamp time.Time // application use - message timestamp - Type string // application use - message type name - UserId string // application use - creating user id - AppId string // application use - creating application + // Properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + Headers Table // Application or header exchange table + DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) + Priority uint8 // queue implementation use - 0 to 9 + CorrelationId string // application use - correlation identifier + ReplyTo string // application use - address to to reply to (ex: RPC) + Expiration string // implementation use - message expiration spec + MessageId string // application use - message identifier + Timestamp time.Time // application use - message timestamp + Type string // application use - message type name + UserId string // application use - creating user id + AppId string // application use - creating application - Body []byte + Body []byte } func newReturn(msg basicReturn) *Return { - props, body := msg.getContent() + props, body := msg.getContent() - return &Return{ - ReplyCode: msg.ReplyCode, - ReplyText: msg.ReplyText, - Exchange: msg.Exchange, - RoutingKey: msg.RoutingKey, + return &Return{ + ReplyCode: msg.ReplyCode, + ReplyText: msg.ReplyText, + Exchange: msg.Exchange, + RoutingKey: msg.RoutingKey, - Headers: props.Headers, - ContentType: props.ContentType, - ContentEncoding: props.ContentEncoding, - DeliveryMode: props.DeliveryMode, - Priority: props.Priority, - CorrelationId: props.CorrelationId, - ReplyTo: props.ReplyTo, - Expiration: props.Expiration, - MessageId: props.MessageId, - Timestamp: props.Timestamp, - Type: props.Type, - UserId: props.UserId, - AppId: props.AppId, + Headers: props.Headers, + ContentType: props.ContentType, + ContentEncoding: props.ContentEncoding, + DeliveryMode: props.DeliveryMode, + Priority: props.Priority, + CorrelationId: props.CorrelationId, + ReplyTo: props.ReplyTo, + Expiration: props.Expiration, + MessageId: props.MessageId, + Timestamp: props.Timestamp, + Type: props.Type, + UserId: props.UserId, + AppId: props.AppId, - Body: body, - } + Body: body, + } +} + +// Span returns context and a span that for the delivery +// the resulting span is linked to the publication that created it, if it has +// the appropraite headers set. See [context-propagation] for more details +// +// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ +func (r Return) Span(ctx context.Context, options ...trace.SpanStartOption) (context.Context, trace.Span) { + return extractSpanFromReturn(ctx, r, options...) }