Skip to content

Commit

Permalink
Added tracing (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmorelli92 authored Oct 1, 2023
1 parent 9e50d07 commit 756e4cd
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 16 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ Bunnify is a library for publishing and consuming events for AMQP.

**Built-in event metadata handling:** The library automatically handles event metadata, including correlation IDs and other important details.

**Minimal dependencies:** The intention of the library is to avoid being a vector of attack due to lots of unneeded dependencies. I will always try to curate the dependencies and I compromise only to use:
**Tracing out of the box**: Automatically injects and extracts traces when publishing and consuming. Minimal setup is required and shown on the tracer test.

**Minimal dependencies:** The intention of the library is to avoid being a vector of attack due to lots of unneeded dependencies. I will always try to triple check the dependencies and use the least quantity of libraries to achieve the functionality required.

- `github.com/rabbitmq/amqp091-go`: Handles the connection with AMQP protocol.
- `github.com/google/uuid`: Generates UUID for events ID and correlation ID.
- `go.uber.org/goleak`: Used on tests to verify that there are no leaks of routines on the handling of channels.
- `go.opentelemetry.io/otel`: Handles the injection and extraction of the traces on the events.

## Motivation

Expand Down
8 changes: 4 additions & 4 deletions bunnify/consumableEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type DeliveryInfo struct {
// The type parameter T specifies the type of the event's payload.
type ConsumableEvent[T any] struct {
Metadata
DeliveryInfo
Payload T
DeliveryInfo DeliveryInfo
Payload T
}

// unmarshalEvent is used internally to unmarshal a PublishableEvent
// this way the payload ends up being a json.RawMessage instead of map[string]interface{}
// so that later the json.RawMessage can be unmarshal to ConsumableEvent[T].Payload.
type unmarshalEvent struct {
Metadata
DeliveryInfo
Payload json.RawMessage `json:"payload"`
DeliveryInfo DeliveryInfo
Payload json.RawMessage `json:"payload"`
}
5 changes: 2 additions & 3 deletions bunnify/consumer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package bunnify

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -163,12 +162,12 @@ func (c Consumer) Consume() error {

uevt := unmarshalEvent{DeliveryInfo: deliveryInfo}
if err := json.Unmarshal(delivery.Body, &uevt); err != nil {
fmt.Println(err)
_ = delivery.Nack(false, false)
continue
}

if err := handler(context.TODO(), uevt); err != nil {
tracingCtx := extractToContext(delivery.Headers)
if err := handler(tracingCtx, uevt); err != nil {
notifyEventHandlerFailed(c.options.notificationCh, deliveryInfo.RoutingKey, err)
_ = delivery.Nack(false, false)
continue
Expand Down
7 changes: 5 additions & 2 deletions bunnify/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ func (p *Publisher) Publish(
return fmt.Errorf("could not marshal event: %w", err)
}

return p.inUseChannel.PublishWithContext(ctx, exchange, routingKey, true, false, amqp.Publishing{
publishing := amqp.Publishing{
ContentEncoding: "application/json",
CorrelationId: event.CorrelationID,
MessageId: event.ID,
Timestamp: event.Timestamp,
Body: b,
})
Headers: injectToHeaders(ctx),
}

return p.inUseChannel.PublishWithContext(ctx, exchange, routingKey, true, false, publishing)
}
34 changes: 34 additions & 0 deletions bunnify/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package bunnify

import (
"context"

amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

// inject the span context to amqp table
func injectToHeaders(ctx context.Context) amqp.Table {
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)

header := amqp.Table{}
for k, v := range carrier {
header[k] = v
}
return header
}

// extract the amqp table to a span context
func extractToContext(headers amqp.Table) context.Context {
carrier := propagation.MapCarrier{}
for k, v := range headers {
value, ok := v.(string)
if ok {
carrier[k] = value
}
}

return otel.GetTextMapPropagator().Extract(context.TODO(), carrier)
}
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,15 @@ go 1.20
require (
github.com/google/uuid v1.3.1
github.com/rabbitmq/amqp091-go v1.8.1
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/sdk v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
go.uber.org/goleak v1.2.1
)

require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
golang.org/x/sys v0.12.0 // indirect
)
22 changes: 17 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM=
github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o=
go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
6 changes: 5 additions & 1 deletion tests/consumer_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ func TestConsumerPublisher(t *testing.T) {
ID: orderCreatedID,
})

err := publisher.Publish(context.TODO(), exchangeName, routingKey, eventToPublish)
err := publisher.Publish(
context.TODO(),
exchangeName,
routingKey,
eventToPublish)
if err != nil {
t.Fatal(err)
}
Expand Down
71 changes: 71 additions & 0 deletions tests/consumer_publish_tracer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package tests

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/pmorelli92/bunnify/bunnify"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/goleak"
)

func TestConsumerPublisherTracing(t *testing.T) {
// Setup tracing
otel.SetTracerProvider(tracesdk.NewTracerProvider())
otel.SetTextMapPropagator(propagation.TraceContext{})

// Setup amqp
queueName := uuid.NewString()
exchangeName := uuid.NewString()
routingKey := uuid.NewString()

connection := bunnify.NewConnection()
connection.Start()

// Exercise consuming
var actualTraceID trace.TraceID
eventHandler := func(ctx context.Context, _ bunnify.ConsumableEvent[any]) error {
actualTraceID = trace.SpanFromContext(ctx).SpanContext().TraceID()
return nil
}

consumer := connection.NewConsumer(
queueName,
bunnify.WithBindingToExchange(exchangeName),
bunnify.WithHandler(routingKey, eventHandler))
if err := consumer.Consume(); err != nil {
t.Fatal(err)
}

// Exercise publishing
publisher := connection.NewPublisher()
publishingContext, _ := otel.Tracer("amqp").Start(context.Background(), "publish-test")

err := publisher.Publish(
publishingContext,
exchangeName,
routingKey,
bunnify.NewPublishableEvent(struct{}{}))
if err != nil {
t.Fatal(err)
}

time.Sleep(50 * time.Millisecond)

if err := connection.Close(); err != nil {
t.Fatal(err)
}

// Assert
publishingTraceID := trace.SpanFromContext(publishingContext).SpanContext().TraceID()
if actualTraceID != publishingTraceID {
t.Fatalf("expected traceID %s, got %s", publishingTraceID, actualTraceID)
}

goleak.VerifyNone(t)
}

0 comments on commit 756e4cd

Please sign in to comment.