Skip to content

Commit

Permalink
a smidge of polish
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewWinterman committed Aug 19, 2024
1 parent 1aeb2d0 commit 47aa58b
Show file tree
Hide file tree
Showing 3 changed files with 319 additions and 294 deletions.
241 changes: 122 additions & 119 deletions delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
package amqp091

import (
"context"
"errors"
"fmt"
"time"
"context"
"errors"
"fmt"
"time"

"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace"
)

var errDeliveryNotInitialized = errors.New("delivery not initialized")
Expand All @@ -21,103 +21,106 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized")
//
// Applications can provide mock implementations in tests of Delivery handlers.
type Acknowledger interface {
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple, requeue bool) error
Reject(tag uint64, requeue bool) error
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple, requeue bool) error
Reject(tag uint64, requeue bool) error
}

// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
type Delivery struct {
Acknowledger Acknowledger // the channel from which this delivery arrived

Headers Table // Application or header exchange table

// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
Priority uint8 // queue implementation use - 0 to 9
CorrelationId string // application use - correlation identifier
ReplyTo string // application use - address to reply to (ex: RPC)
Expiration string // implementation use - message expiration spec
MessageId string // application use - message identifier
Timestamp time.Time // application use - message timestamp
Type string // application use - message type name
UserId string // application use - creating user - should be authenticated user
AppId string // application use - creating application id

// Valid only with Channel.Consume
ConsumerTag string

// Valid only with Channel.Get
MessageCount uint32

DeliveryTag uint64
Redelivered bool
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key

Body []byte
Acknowledger Acknowledger // the channel from which this delivery arrived

Headers Table // Application or header exchange table

// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
Priority uint8 // queue implementation use - 0 to 9
CorrelationId string // application use - correlation identifier
ReplyTo string // application use - address to reply to (ex: RPC)
Expiration string // implementation use - message expiration spec
MessageId string // application use - message identifier
Timestamp time.Time // application use - message timestamp
Type string // application use - message type name
UserId string // application use - creating user - should be authenticated user
AppId string // application use - creating application id

// Valid only with Channel.Consume
ConsumerTag string

// Valid only with Channel.Get
MessageCount uint32

DeliveryTag uint64
Redelivered bool
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key

Body []byte
}

// Span returns context and a span that for the delivery
// the resulting span is linked to the publication that created it, if it has
// the appropraite headers set. See [context-propagation] for more details
//
// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/
func (d *Delivery) Span(ctx context.Context, options ...trace.SpanStartOption) (context.Context, trace.Span) {
return spanForDelivery(ctx, d.ConsumerTag, d, options...)
func (d Delivery) Span(
ctx context.Context,
options ...trace.SpanStartOption,
) (context.Context, trace.Span) {
return spanForDelivery(ctx, &d, options...)
}

// Link returns a link for the delivery. The link points to the publication, if
// the appropriate headers are set.
func (d *Delivery) Link(ctx context.Context) trace.Link {
return extractLinkFromDelivery(ctx, d)
func (d Delivery) Link(ctx context.Context) trace.Link {
return extractLinkFromDelivery(ctx, &d)
}

func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
props, body := msg.getContent()

delivery := Delivery{
Acknowledger: channel,

Headers: props.Headers,
ContentType: props.ContentType,
ContentEncoding: props.ContentEncoding,
DeliveryMode: props.DeliveryMode,
Priority: props.Priority,
CorrelationId: props.CorrelationId,
ReplyTo: props.ReplyTo,
Expiration: props.Expiration,
MessageId: props.MessageId,
Timestamp: props.Timestamp,
Type: props.Type,
UserId: props.UserId,
AppId: props.AppId,

Body: body,
}

// Properties for the delivery types
switch m := msg.(type) {
case *basicDeliver:
delivery.ConsumerTag = m.ConsumerTag
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey

case *basicGetOk:
delivery.MessageCount = m.MessageCount
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey
}

return &delivery
props, body := msg.getContent()

delivery := Delivery{
Acknowledger: channel,

Headers: props.Headers,
ContentType: props.ContentType,
ContentEncoding: props.ContentEncoding,
DeliveryMode: props.DeliveryMode,
Priority: props.Priority,
CorrelationId: props.CorrelationId,
ReplyTo: props.ReplyTo,
Expiration: props.Expiration,
MessageId: props.MessageId,
Timestamp: props.Timestamp,
Type: props.Type,
UserId: props.UserId,
AppId: props.AppId,

Body: body,
}

// Properties for the delivery types
switch m := msg.(type) {
case *basicDeliver:
delivery.ConsumerTag = m.ConsumerTag
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey

case *basicGetOk:
delivery.MessageCount = m.MessageCount
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey
}

return &delivery
}

/*
Expand All @@ -140,10 +143,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Ack(multiple bool) error {
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Ack(d.DeliveryTag, multiple)
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Ack(d.DeliveryTag, multiple)
}

/*
Expand All @@ -160,10 +163,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Reject(requeue bool) error {
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Reject(d.DeliveryTag, requeue)
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Reject(d.DeliveryTag, requeue)
}

/*
Expand All @@ -185,43 +188,43 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Nack(multiple, requeue bool) error {
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)
}

type DeliveryResponse uint8

const (
Ack DeliveryResponse = iota
Reject
Nack
Ack DeliveryResponse = iota
Reject
Nack
)

func (r DeliveryResponse) Name() string {
switch r {
case Ack:
return "ack"
case Nack:
return "nack"
case Reject:
return "reject"
default:
return "unknown"
}
switch r {
case Ack:
return "ack"
case Nack:
return "nack"
case Reject:
return "reject"
default:
return "unknown"
}
}

func (d *Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error {
defer settleDelivery(ctx, d, response, multiple, requeue)
switch response {
case Ack:
return d.Ack(multiple)
case Nack:
return d.Nack(multiple, requeue)
case Reject:
return d.Reject(requeue)
default:
return fmt.Errorf("unknown operation %s", response.Name())
}
func (d Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error {
defer settleDelivery(ctx, &d, response, multiple, requeue)
switch response {
case Ack:
return d.Ack(multiple)
case Nack:
return d.Nack(multiple, requeue)
case Reject:
return d.Reject(requeue)
default:
return fmt.Errorf("unknown operation %s", response.Name())
}
}
Loading

0 comments on commit 47aa58b

Please sign in to comment.