diff --git a/delivery.go b/delivery.go index 6fa242b..b62825c 100644 --- a/delivery.go +++ b/delivery.go @@ -6,12 +6,12 @@ package amqp091 import ( - "context" - "errors" - "fmt" - "time" + "context" + "errors" + "fmt" + "time" - "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace" ) var errDeliveryNotInitialized = errors.New("delivery not initialized") @@ -21,45 +21,45 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized") // // Applications can provide mock implementations in tests of Delivery handlers. type Acknowledger interface { - Ack(tag uint64, multiple bool) error - Nack(tag uint64, multiple, requeue bool) error - Reject(tag uint64, requeue bool) error + Ack(tag uint64, multiple bool) error + Nack(tag uint64, multiple, requeue bool) error + Reject(tag uint64, requeue bool) error } // Delivery captures the fields for a previously delivered message resident in // a queue to be delivered by the server to a consumer from Channel.Consume or // Channel.Get. type Delivery struct { - Acknowledger Acknowledger // the channel from which this delivery arrived - - Headers Table // Application or header exchange table - - // Properties - ContentType string // MIME content type - ContentEncoding string // MIME content encoding - DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) - Priority uint8 // queue implementation use - 0 to 9 - CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to reply to (ex: RPC) - Expiration string // implementation use - message expiration spec - MessageId string // application use - message identifier - Timestamp time.Time // application use - message timestamp - Type string // application use - message type name - UserId string // application use - creating user - should be authenticated user - AppId string // application use - creating application id - - // Valid only with Channel.Consume - ConsumerTag string - - // Valid only with Channel.Get - MessageCount uint32 - - DeliveryTag uint64 - Redelivered bool - Exchange string // basic.publish exchange - RoutingKey string // basic.publish routing key - - Body []byte + Acknowledger Acknowledger // the channel from which this delivery arrived + + Headers Table // Application or header exchange table + + // Properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) + Priority uint8 // queue implementation use - 0 to 9 + CorrelationId string // application use - correlation identifier + ReplyTo string // application use - address to reply to (ex: RPC) + Expiration string // implementation use - message expiration spec + MessageId string // application use - message identifier + Timestamp time.Time // application use - message timestamp + Type string // application use - message type name + UserId string // application use - creating user - should be authenticated user + AppId string // application use - creating application id + + // Valid only with Channel.Consume + ConsumerTag string + + // Valid only with Channel.Get + MessageCount uint32 + + DeliveryTag uint64 + Redelivered bool + Exchange string // basic.publish exchange + RoutingKey string // basic.publish routing key + + Body []byte } // Span returns context and a span that for the delivery @@ -67,57 +67,60 @@ type Delivery struct { // the appropraite headers set. See [context-propagation] for more details // // [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ -func (d *Delivery) Span(ctx context.Context, options ...trace.SpanStartOption) (context.Context, trace.Span) { - return spanForDelivery(ctx, d.ConsumerTag, d, options...) +func (d Delivery) Span( + ctx context.Context, + options ...trace.SpanStartOption, +) (context.Context, trace.Span) { + return spanForDelivery(ctx, &d, options...) } // Link returns a link for the delivery. The link points to the publication, if // the appropriate headers are set. -func (d *Delivery) Link(ctx context.Context) trace.Link { - return extractLinkFromDelivery(ctx, d) +func (d Delivery) Link(ctx context.Context) trace.Link { + return extractLinkFromDelivery(ctx, &d) } func newDelivery(channel *Channel, msg messageWithContent) *Delivery { - props, body := msg.getContent() - - delivery := Delivery{ - Acknowledger: channel, - - Headers: props.Headers, - ContentType: props.ContentType, - ContentEncoding: props.ContentEncoding, - DeliveryMode: props.DeliveryMode, - Priority: props.Priority, - CorrelationId: props.CorrelationId, - ReplyTo: props.ReplyTo, - Expiration: props.Expiration, - MessageId: props.MessageId, - Timestamp: props.Timestamp, - Type: props.Type, - UserId: props.UserId, - AppId: props.AppId, - - Body: body, - } - - // Properties for the delivery types - switch m := msg.(type) { - case *basicDeliver: - delivery.ConsumerTag = m.ConsumerTag - delivery.DeliveryTag = m.DeliveryTag - delivery.Redelivered = m.Redelivered - delivery.Exchange = m.Exchange - delivery.RoutingKey = m.RoutingKey - - case *basicGetOk: - delivery.MessageCount = m.MessageCount - delivery.DeliveryTag = m.DeliveryTag - delivery.Redelivered = m.Redelivered - delivery.Exchange = m.Exchange - delivery.RoutingKey = m.RoutingKey - } - - return &delivery + props, body := msg.getContent() + + delivery := Delivery{ + Acknowledger: channel, + + Headers: props.Headers, + ContentType: props.ContentType, + ContentEncoding: props.ContentEncoding, + DeliveryMode: props.DeliveryMode, + Priority: props.Priority, + CorrelationId: props.CorrelationId, + ReplyTo: props.ReplyTo, + Expiration: props.Expiration, + MessageId: props.MessageId, + Timestamp: props.Timestamp, + Type: props.Type, + UserId: props.UserId, + AppId: props.AppId, + + Body: body, + } + + // Properties for the delivery types + switch m := msg.(type) { + case *basicDeliver: + delivery.ConsumerTag = m.ConsumerTag + delivery.DeliveryTag = m.DeliveryTag + delivery.Redelivered = m.Redelivered + delivery.Exchange = m.Exchange + delivery.RoutingKey = m.RoutingKey + + case *basicGetOk: + delivery.MessageCount = m.MessageCount + delivery.DeliveryTag = m.DeliveryTag + delivery.Redelivered = m.Redelivered + delivery.Exchange = m.Exchange + delivery.RoutingKey = m.RoutingKey + } + + return &delivery } /* @@ -140,10 +143,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Ack(multiple bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Ack(d.DeliveryTag, multiple) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Ack(d.DeliveryTag, multiple) } /* @@ -160,10 +163,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Reject(requeue bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Reject(d.DeliveryTag, requeue) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Reject(d.DeliveryTag, requeue) } /* @@ -185,43 +188,43 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged. */ func (d Delivery) Nack(multiple, requeue bool) error { - if d.Acknowledger == nil { - return errDeliveryNotInitialized - } - return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) + if d.Acknowledger == nil { + return errDeliveryNotInitialized + } + return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) } type DeliveryResponse uint8 const ( - Ack DeliveryResponse = iota - Reject - Nack + Ack DeliveryResponse = iota + Reject + Nack ) func (r DeliveryResponse) Name() string { - switch r { - case Ack: - return "ack" - case Nack: - return "nack" - case Reject: - return "reject" - default: - return "unknown" - } + switch r { + case Ack: + return "ack" + case Nack: + return "nack" + case Reject: + return "reject" + default: + return "unknown" + } } -func (d *Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { - defer settleDelivery(ctx, d, response, multiple, requeue) - switch response { - case Ack: - return d.Ack(multiple) - case Nack: - return d.Nack(multiple, requeue) - case Reject: - return d.Reject(requeue) - default: - return fmt.Errorf("unknown operation %s", response.Name()) - } +func (d Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { + defer settleDelivery(ctx, &d, response, multiple, requeue) + switch response { + case Ack: + return d.Ack(multiple) + case Nack: + return d.Nack(multiple, requeue) + case Reject: + return d.Reject(requeue) + default: + return fmt.Errorf("unknown operation %s", response.Name()) + } } diff --git a/opentelemetry.go b/opentelemetry.go index e485102..5d0f51a 100644 --- a/opentelemetry.go +++ b/opentelemetry.go @@ -1,16 +1,16 @@ package amqp091 import ( - "context" - "errors" - "fmt" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/propagation" - semconv "go.opentelemetry.io/otel/semconv/v1.25.0" - "go.opentelemetry.io/otel/trace" + "context" + "errors" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.opentelemetry.io/otel/trace" ) // tracer is the tracer used by the package @@ -21,96 +21,118 @@ type amqpHeaderCarrier Table // Get returns the value associated with the passed key. func (c amqpHeaderCarrier) Get(key string) string { - v, ok := c[key] - if !ok { - return "" - } - s, ok := v.(string) - if ok { - return s - } - return "" + v, ok := c[key] + if !ok { + return "" + } + s, ok := v.(string) + if ok { + return s + } + return "" } // Set stores the key-value pair. func (c amqpHeaderCarrier) Set(key, value string) { - c[key] = value + c[key] = value } // Keys lists the keys stored in this carrier. func (c amqpHeaderCarrier) Keys() []string { - keys := []string{} - for k, v := range c { - if _, ok := v.(string); !ok { - continue - } - keys = append(keys, k) - } - return keys + var keys []string + for k, v := range c { + if _, ok := v.(string); !ok { + continue + } + keys = append(keys, k) + } + return keys } // ensure amqpHeaderCarrier implements the TextMapCarrier interface var _ propagation.TextMapCarrier = amqpHeaderCarrier{} +// keys for conventions in this file +var ( + // settleResponseKey is the key for indicating how the message was settled + settleResponseKey = attribute.Key("messaging.settle.response_type") + // settleMultipleKey indicates whether multiple outstanding messages were settled at once. + settleMultipleKey = attribute.Key("messaging.settle.multiple") + // settleRequeueKey indicates whether the messages were requeued. + settleRequeueKey = attribute.Key("messaging.settle.multiple") + // publishImmediate key indicates whether the AMQP immediate flag was set on the publishing. + publishImmediateKy = attribute.Key("messaging.publish.immediate") + // returnOperation indicates an AMQP 091 return + returnOperation = semconv.MessagingOperationKey.String("return") +) + // InjectSpan injects the span context into the AMQP headers. // It returns the input headers with the span headers added. func injectSpanFromContext(ctx context.Context, headers Table) Table { - carrier := amqpHeaderCarrier(headers) - if carrier == nil { - carrier = amqpHeaderCarrier{} - } - otel.GetTextMapPropagator().Inject(ctx, carrier) - return Table(carrier) + carrier := amqpHeaderCarrier(headers) + if carrier == nil { + carrier = amqpHeaderCarrier{} + } + otel.GetTextMapPropagator().Inject(ctx, carrier) + return Table(carrier) } // ExtractSpanContext extracts the span context from the AMQP headers. func ExtractSpanContext(ctx context.Context, headers Table) context.Context { - carrier := amqpHeaderCarrier(headers) - if carrier == nil { - carrier = amqpHeaderCarrier{} - } - return otel.GetTextMapPropagator().Extract(ctx, carrier) + carrier := amqpHeaderCarrier(headers) + if carrier == nil { + carrier = amqpHeaderCarrier{} + } + return otel.GetTextMapPropagator().Extract(ctx, carrier) } // extractSpanFromReturn creates a span for a returned message func extractSpanFromReturn( - ctx context.Context, - ret Return, + ctx context.Context, + ret Return, + options ...trace.SpanStartOption, ) (context.Context, trace.Span) { - spctx := ExtractSpanContext(ctx, ret.Headers) - spanName := fmt.Sprintf("%s return", ret.RoutingKey) - return tracer.Start(ctx, spanName, - trace.WithLinks(trace.LinkFromContext(spctx, semconv.MessagingMessageID(ret.MessageId))), - trace.WithSpanKind(trace.SpanKindProducer), - trace.WithAttributes( - semconv.MessagingRabbitmqDestinationRoutingKey(ret.RoutingKey), - semconv.MessagingDestinationPublishName(ret.Exchange), - semconv.MessagingOperationKey.String("return"), - semconv.MessagingMessageID(ret.MessageId), - semconv.MessagingMessageConversationID(ret.CorrelationId), - semconv.MessagingSystemRabbitmq, - semconv.ErrorTypeKey.String(ret.ReplyText), - // semconv.NetPeerPort(5672 - // semconv.NetPeerIP("localhost") - // semconv.ServerAddress("localhost") - ), - trace.WithNewRoot(), - ) + spctx := ExtractSpanContext(ctx, ret.Headers) + spanName := fmt.Sprintf("return %s %s", ret.Exchange, ret.RoutingKey) + + return tracer.Start(ctx, spanName, + append(options, + trace.WithLinks(trace.LinkFromContext(spctx, semconv.MessagingMessageID(ret.MessageId))), + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.MessagingRabbitmqDestinationRoutingKey(ret.RoutingKey), + semconv.MessagingDestinationPublishName(ret.Exchange), + returnOperation, + semconv.MessagingMessageID(ret.MessageId), + semconv.MessagingMessageConversationID(ret.CorrelationId), + semconv.MessagingSystemRabbitmq, + semconv.ErrorTypeKey.String(ret.ReplyText), + // semconv.NetPeerPort(5672 + // semconv.NetPeerIP("localhost") + // semconv.ServerAddress("localhost") + ), + trace.WithNewRoot(), + )..., + + ) } // settleDelivery creates a span for the acking of a delivery func settleDelivery( - ctx context.Context, - delivery *Delivery, - response DeliveryResponse, - multiple, requeue bool, + ctx context.Context, + delivery *Delivery, + response DeliveryResponse, + multiple, requeue bool, ) (context.Context, trace.Span) { - return tracer.Start(ctx, - fmt.Sprintf("%s settle", delivery.RoutingKey), - trace.WithAttributes( - attribute.String("messaging.operation.name", response.Name()), - attribute.Bool("multiple", multiple), - attribute.Bool("requeue", requeue))) + return tracer.Start(ctx, + fmt.Sprintf("settle %s %s", delivery.Exchange, delivery.RoutingKey), + trace.WithAttributes( + semconv.MessagingOperationSettle, + settleResponseKey.String(response.Name()), + settleMultipleKey.Bool(multiple), + settleRequeueKey.Bool(requeue), + ), + ) } // extractLinkFromDelivery creates a link for a delivered message @@ -123,79 +145,68 @@ func settleDelivery( // The consumer span may containe 1 or more messages, which is why we don't // manufacture the span in its entirety here. func extractLinkFromDelivery(ctx context.Context, del *Delivery) trace.Link { - spctx := ExtractSpanContext(ctx, del.Headers) - return trace.LinkFromContext(spctx, - semconv.MessagingMessageConversationID(del.CorrelationId), - semconv.MessagingMessageID(del.MessageId), - semconv.MessagingRabbitmqMessageDeliveryTag(int(del.DeliveryTag))) + spctx := ExtractSpanContext(ctx, del.Headers) + return trace.LinkFromContext(spctx, + semconv.MessagingMessageConversationID(del.CorrelationId), + semconv.MessagingMessageID(del.MessageId), + semconv.MessagingRabbitmqMessageDeliveryTag(int(del.DeliveryTag))) } // spanForDelivery creates a span for the delivered messages // returns a new context with the span headers and the span. -func spanForDelivery( - ctx context.Context, - consumerTag string, - delivery *Delivery, - options ...trace.SpanStartOption, -) (context.Context, trace.Span) { - spanName := fmt.Sprintf("%s consume", consumerTag) - links := []trace.Link{} - links = append(links, extractLinkFromDelivery(ctx, delivery)) - return tracer.Start( - ctx, - spanName, - append( - options, - trace.WithLinks(links...), - trace.WithSpanKind(trace.SpanKindConsumer), - )..., - ) +func spanForDelivery(ctx context.Context, delivery *Delivery, options ...trace.SpanStartOption, ) (context.Context, trace.Span) { + spanName := fmt.Sprintf("consume %s %s", delivery.Exchange, delivery.RoutingKey) + var links []trace.Link + links = append(links, extractLinkFromDelivery(ctx, delivery)) + return tracer.Start( + ctx, + spanName, + append( + options, + trace.WithLinks(links...), + trace.WithSpanKind(trace.SpanKindConsumer), + )..., + ) } // Publish creates a span for a publishing message returns a new context with // the span headers, the mssage that was being published with span headers // injected, and a function to be called with the result of the publish func spanForPublication( - ctx context.Context, - publishing Publishing, - exchange, routinKey string, - immediate bool, + ctx context.Context, + publishing Publishing, + exchange, routinKey string, + immediate bool, ) (context.Context, Publishing, func(err error)) { - spanName := fmt.Sprintf("%s publish", routinKey) - ctx, span := tracer.Start(ctx, spanName, - trace.WithSpanKind(trace.SpanKindProducer), - trace.WithAttributes( - semconv.MessagingRabbitmqDestinationRoutingKey(routinKey), - semconv.MessagingDestinationPublishName(exchange), - semconv.MessagingOperationPublish, - semconv.MessagingMessageID(publishing.MessageId), - semconv.MessagingMessageConversationID(publishing.CorrelationId), - semconv.MessagingSystemRabbitmq, - semconv.MessagingClientIDKey.String(publishing.AppId), - semconv.MessagingMessageBodySize(len(publishing.Body)), - semconv.MessageTypeSent, - attribute.Bool("messaging.immediate", immediate), - - // TODO(AWinterman): Add these attributes - // semconv.NetPeerPort(5672) // nolint:gocritic // Why: see to do - // semconv.NetworkPeerAddress() // nolint:gocritic // Why: see to do - // semconv.NetPeerPort() // nolint:gocritic // Why: see to do - ), - ) - headers := injectSpanFromContext(ctx, publishing.Headers) - publishing.Headers = Table(headers) - - return ctx, publishing, func(err error) { - if err != nil { - span.RecordError(err) - amqpErr := &Error{} - if errors.As(err, &amqpErr) { - span.SetAttributes( - semconv.ErrorTypeKey.String(amqpErr.Reason), - ) - } - span.SetStatus(codes.Error, err.Error()) - } - span.End() - } + spanName := fmt.Sprintf("%s publish", routinKey) + ctx, span := tracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindProducer), + trace.WithAttributes( + semconv.MessagingRabbitmqDestinationRoutingKey(routinKey), + semconv.MessagingDestinationPublishName(exchange), + semconv.MessagingOperationPublish, + semconv.MessagingMessageID(publishing.MessageId), + semconv.MessagingMessageConversationID(publishing.CorrelationId), + semconv.MessagingSystemRabbitmq, + semconv.MessagingClientIDKey.String(publishing.AppId), + semconv.MessagingMessageBodySize(len(publishing.Body)), + semconv.MessageTypeSent, + ), + ) + headers := injectSpanFromContext(ctx, publishing.Headers) + publishing.Headers = headers + + return ctx, publishing, func(err error) { + if err != nil { + span.RecordError(err) + amqpErr := &Error{} + if errors.As(err, &amqpErr) { + span.SetAttributes( + semconv.ErrorTypeKey.String(amqpErr.Reason), + ) + } + span.SetStatus(codes.Error, err.Error()) + } + span.End() + } } diff --git a/return.go b/return.go index cdc3875..356a844 100644 --- a/return.go +++ b/return.go @@ -6,59 +6,70 @@ package amqp091 import ( - "time" + "context" + "go.opentelemetry.io/otel/trace" + "time" ) // Return captures a flattened struct of fields returned by the server when a // Publishing is unable to be delivered either due to the `mandatory` flag set // and no route found, or `immediate` flag set and no free consumer. type Return struct { - ReplyCode uint16 // reason - ReplyText string // description - Exchange string // basic.publish exchange - RoutingKey string // basic.publish routing key + ReplyCode uint16 // reason + ReplyText string // description + Exchange string // basic.publish exchange + RoutingKey string // basic.publish routing key - // Properties - ContentType string // MIME content type - ContentEncoding string // MIME content encoding - Headers Table // Application or header exchange table - DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) - Priority uint8 // queue implementation use - 0 to 9 - CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to to reply to (ex: RPC) - Expiration string // implementation use - message expiration spec - MessageId string // application use - message identifier - Timestamp time.Time // application use - message timestamp - Type string // application use - message type name - UserId string // application use - creating user id - AppId string // application use - creating application + // Properties + ContentType string // MIME content type + ContentEncoding string // MIME content encoding + Headers Table // Application or header exchange table + DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) + Priority uint8 // queue implementation use - 0 to 9 + CorrelationId string // application use - correlation identifier + ReplyTo string // application use - address to to reply to (ex: RPC) + Expiration string // implementation use - message expiration spec + MessageId string // application use - message identifier + Timestamp time.Time // application use - message timestamp + Type string // application use - message type name + UserId string // application use - creating user id + AppId string // application use - creating application - Body []byte + Body []byte } func newReturn(msg basicReturn) *Return { - props, body := msg.getContent() + props, body := msg.getContent() - return &Return{ - ReplyCode: msg.ReplyCode, - ReplyText: msg.ReplyText, - Exchange: msg.Exchange, - RoutingKey: msg.RoutingKey, + return &Return{ + ReplyCode: msg.ReplyCode, + ReplyText: msg.ReplyText, + Exchange: msg.Exchange, + RoutingKey: msg.RoutingKey, - Headers: props.Headers, - ContentType: props.ContentType, - ContentEncoding: props.ContentEncoding, - DeliveryMode: props.DeliveryMode, - Priority: props.Priority, - CorrelationId: props.CorrelationId, - ReplyTo: props.ReplyTo, - Expiration: props.Expiration, - MessageId: props.MessageId, - Timestamp: props.Timestamp, - Type: props.Type, - UserId: props.UserId, - AppId: props.AppId, + Headers: props.Headers, + ContentType: props.ContentType, + ContentEncoding: props.ContentEncoding, + DeliveryMode: props.DeliveryMode, + Priority: props.Priority, + CorrelationId: props.CorrelationId, + ReplyTo: props.ReplyTo, + Expiration: props.Expiration, + MessageId: props.MessageId, + Timestamp: props.Timestamp, + Type: props.Type, + UserId: props.UserId, + AppId: props.AppId, - Body: body, - } + Body: body, + } +} + +// Span returns context and a span that for the delivery +// the resulting span is linked to the publication that created it, if it has +// the appropraite headers set. See [context-propagation] for more details +// +// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ +func (r Return) Span(ctx context.Context, options ...trace.SpanStartOption) (context.Context, trace.Span) { + return extractSpanFromReturn(ctx, r, options...) }