Improve Server metrics (#34)
* Fix default latency and payload bucket sizes

* Refactored metricsReporterInterceptor

* Remove unused metrics. Add KafkaLatencyMetric

* Fix tests

* Fix var -> const
helder-junior authored Oct 30, 2024
1 parent 1814be1 commit 7bbe786
Showing 4 changed files with 70 additions and 101 deletions.
73 changes: 20 additions & 53 deletions server/app/app.go
@@ -177,7 +177,6 @@ func (a *App) metricsReporterInterceptor(
handler grpc.UnaryHandler,
) (interface{}, error) {
events := []*pb.Event{}
retry := "0"
payloadSize := 0
switch t := req.(type) {
case *pb.Event:
@@ -187,70 +186,38 @@ func (a *App) metricsReporterInterceptor(
case *pb.SendEventsRequest:
request := req.(*pb.SendEventsRequest)
events = append(events, request.Events...)
retry = fmt.Sprintf("%d", request.Retry)
payloadSize = proto.Size(request)
default:
a.log.WithField("route", info.FullMethod).Infof("Unexpected request type %T", t)
}

topic := events[0].Topic
l := a.log.
WithField("route", info.FullMethod).
WithField("topic", topic)

metrics.APIPayloadSize.WithLabelValues(
info.FullMethod,
topic,
).Observe(float64(payloadSize))
topic).Observe(float64(payloadSize))
metrics.APIIncomingEvents.WithLabelValues(
topic).Add(float64(len(events)))

defer func(startTime time.Time) {
elapsedTime := float64(time.Since(startTime).Nanoseconds() / (1000 * 1000))
for _, e := range events {
metrics.APIResponseTime.WithLabelValues(
info.FullMethod,
e.Topic,
retry,
).Observe(elapsedTime)
}
l.WithField("elapsedTime", elapsedTime).Debug("request processed")
}(time.Now())

reportedFailures := false
startTime := time.Now()
res, err := handler(ctx, req)
responseStatus := "OK"
if err != nil {
l.WithError(err).Error("error processing request")
for _, e := range events {
metrics.APIRequestsFailureCounter.WithLabelValues(
info.FullMethod,
e.Topic,
retry,
"error processing request",
).Inc()
}
reportedFailures = true
return res, err
}
failureIndexes := []int64{}
if _, ok := res.(*pb.SendEventsResponse); ok {
failureIndexes = res.(*pb.SendEventsResponse).FailureIndexes
}
fC := 0
for i, e := range events {
if !reportedFailures && len(failureIndexes) > fC && int64(i) == failureIndexes[fC] {
metrics.APIRequestsFailureCounter.WithLabelValues(
info.FullMethod,
e.Topic,
retry,
"couldn't produce event",
).Inc()
fC++
}
		metrics.APIRequestsSuccessCounter.WithLabelValues(
			info.FullMethod,
			e.Topic,
			retry,
		).Inc()
		responseStatus = "ERROR"
		metrics.APIResponseTime.WithLabelValues(
			info.FullMethod,
			responseStatus,
			topic,
		).Observe(float64(time.Since(startTime).Milliseconds()))
a.log.
WithField("route", info.FullMethod).
WithField("topic", topic).
WithError(err).Error("error processing request")
return res, err
}
metrics.APIResponseTime.WithLabelValues(
info.FullMethod,
responseStatus,
topic,
).Observe(float64(time.Since(startTime).Milliseconds()))
return res, nil
}
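
The refactored interceptor above drops the per-event success/failure counters and the deferred timing block in favor of a single APIResponseTime observation per request, labeled by route, status and topic. The following is a self-contained sketch of that pattern, not code from this commit: the topic value is hard-coded, and only the grpc and prometheus client APIs are assumed.

package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc"
)

// apiResponseTime mirrors the metric shape used by the commit: a histogram
// labeled by route, status and topic, with the new default latency buckets.
var apiResponseTime = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "eventsgateway",
		Subsystem: "api",
		Name:      "response_time_ms",
		Help:      "the response time in ms of api routes",
		Buckets:   []float64{10, 30, 50, 100, 500},
	},
	[]string{"route", "status", "topic"},
)

// metricsInterceptor times the handler call and records one observation per
// request, flipping the status label to ERROR when the handler fails.
func metricsInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	startTime := time.Now()
	res, err := handler(ctx, req)
	responseStatus := "OK"
	if err != nil {
		responseStatus = "ERROR"
	}
	// the real interceptor extracts the topic from the incoming request;
	// a placeholder is used here to keep the sketch self-contained
	topic := "sometopic"
	apiResponseTime.WithLabelValues(info.FullMethod, responseStatus, topic).
		Observe(float64(time.Since(startTime).Milliseconds()))
	return res, err
}

func main() {
	prometheus.MustRegister(apiResponseTime)
	// every unary RPC served by this server passes through the interceptor
	_ = grpc.NewServer(grpc.UnaryInterceptor(metricsInterceptor))
}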

9 changes: 4 additions & 5 deletions server/app/server_suite_test.go
@@ -30,13 +30,12 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/spf13/viper"

"strings"
"testing"

"github.com/topfreegames/eventsgateway/v4/server/logger"
"github.com/topfreegames/eventsgateway/v4/server/metrics"
"github.com/topfreegames/eventsgateway/v4/server/mocks"
mockpb "github.com/topfreegames/protos/eventsgateway/grpc/mock"
"strings"
"testing"
)

// GetDefaultConfig returns the configuration at ./config/test.yaml
@@ -77,7 +76,7 @@ var (
var _ = BeforeEach(func() {
log = &logger.NullLogger{}
config, _ = GetDefaultConfig()

metrics.StartServer(config)
mockCtrl = gomock.NewController(GinkgoT())
mockGRPCServer = mockpb.NewMockGRPCForwarderServer(mockCtrl)
mockForwarder = mocks.NewMockForwarder(mockCtrl)
77 changes: 39 additions & 38 deletions server/metrics/metrics.go
@@ -35,58 +35,40 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
// LabelRoute is the GRPC route the request is reaching
LabelRoute = "route"
// LabelTopic is the Kafka topic the event refers to
LabelTopic = "topic"
// LabelStatus is the status of the request. OK if success or ERROR if fail
LabelStatus = "status"
)

var (
// APIResponseTime summary, observes the API response time as perceived by the server
APIResponseTime *prometheus.HistogramVec

// APIPayloadSize summary, observes the payload size of requests arriving at the server
APIPayloadSize *prometheus.HistogramVec

// APIRequestsSuccessCounter counter
APIRequestsSuccessCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "requests_success_counter",
Help: "A counter of succeeded api requests",
},
[]string{"route", "topic", "retry"},
)
// KafkaRequestLatency summary, observes the kafka request latency per topic and status
KafkaRequestLatency *prometheus.HistogramVec

// APIRequestsFailureCounter counter
APIRequestsFailureCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "requests_failure_counter",
Help: "A counter of failed api requests",
},
[]string{"route", "topic", "retry", "reason"},
)

APITopicsSubmission = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "topics_submission_total",
Help: "Topic submissions sent to kafka",
},
[]string{"topic", "success"},
)
// APIIncomingEvents count of all events the API is receiving (unpacking the array of input events)
APIIncomingEvents *prometheus.CounterVec
)

func defaultLatencyBuckets(config *viper.Viper) []float64 {
// in milliseconds
const configKey = "prometheus.buckets.latency"
config.SetDefault(configKey, []float64{3, 5, 10, 50, 100, 300, 500, 1000, 5000})

config.SetDefault(configKey, []float64{10, 30, 50, 100, 500})
return config.Get(configKey).([]float64)
}

func defaultPayloadSizeBuckets(config *viper.Viper) []float64 {
// in bytes
configKey := "prometheus.buckets.payloadSize"
config.SetDefault(configKey, []float64{100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000})

config.SetDefault(configKey, []float64{10000, 50000, 100000, 500000, 1000000, 5000000})
return config.Get(configKey).([]float64)
}
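
Both helpers rely on viper's SetDefault, so the hard-coded buckets act only as a fallback: any value loaded into the prometheus.buckets.* keys before StartServer runs takes precedence. A small illustration of that behavior (the override values here are made up; only the viper API is assumed):

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	config := viper.New()

	// an explicitly set value wins over the default registered for the same key
	config.Set("prometheus.buckets.latency", []float64{5, 25, 250})
	config.SetDefault("prometheus.buckets.latency", []float64{10, 30, 50, 100, 500})
	fmt.Println(config.Get("prometheus.buckets.latency")) // [5 25 250]

	// with no explicit value, Get falls back to the default
	config.SetDefault("prometheus.buckets.payloadSize", []float64{10000, 50000, 100000})
	fmt.Println(config.Get("prometheus.buckets.payloadSize")) // [10000 50000 100000]
}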

@@ -119,7 +101,7 @@ func StartServer(config *viper.Viper) {
Help: "payload size of API routes, in bytes",
Buckets: defaultPayloadSizeBuckets(config),
},
[]string{"route", "topic"},
[]string{LabelTopic},
)

APIResponseTime = prometheus.NewHistogramVec(
@@ -130,15 +112,34 @@ func StartServer(config *viper.Viper) {
Help: "the response time in ms of api routes",
Buckets: defaultLatencyBuckets(config),
},
[]string{"route", "topic", "retry"},
[]string{LabelRoute, LabelStatus, LabelTopic},
)

APIIncomingEvents = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "eventsgateway",
Subsystem: "api",
Name: "incoming_events",
Help: "A counter of succeeded api requests",
},
[]string{LabelTopic},
)
KafkaRequestLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "eventsgateway",
Subsystem: "kafka",
Name: "response_time_ms",
Help: "the response time in ms of Kafka",
Buckets: defaultLatencyBuckets(config),
},
[]string{LabelStatus, LabelTopic},
)

collectors := []prometheus.Collector{
APIResponseTime,
APIPayloadSize,
APIRequestsFailureCounter,
APIRequestsSuccessCounter,
APITopicsSubmission,
APIIncomingEvents,
KafkaRequestLatency,
}

err := RegisterMetrics(collectors)
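
RegisterMetrics itself is not part of this diff. Assuming it simply registers each collector with the default Prometheus registry and surfaces the first error, a minimal sketch of such a helper could be:

package metrics

import "github.com/prometheus/client_golang/prometheus"

// RegisterMetrics registers every collector and returns the first error it
// hits (for example a prometheus.AlreadyRegisteredError on double registration).
func RegisterMetrics(collectors []prometheus.Collector) error {
	for _, collector := range collectors {
		if err := prometheus.Register(collector); err != nil {
			return err
		}
	}
	return nil
}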
12 changes: 7 additions & 5 deletions server/sender/kafka.go
@@ -67,6 +67,8 @@ func (k *KafkaSender) SendEvent(
ctx context.Context,
event *pb.Event,
) error {
startTime := time.Now()

l := k.logger.WithFields(map[string]interface{}{
"topic": event.GetTopic(),
"event": event,
@@ -103,16 +105,16 @@ func (k *KafkaSender) SendEvent(
}

topic := event.GetTopic()

partition, offset, err := k.producer.Produce(ctx, topic, buf.Bytes())

kafkaStatus := "OK"
if err != nil {
l.WithError(err).
Error("error producing event to kafka")
metrics.APITopicsSubmission.WithLabelValues(topic, "false").Inc()
kafkaStatus = "ERROR"
l.WithError(err).Error("error producing event to kafka")
metrics.KafkaRequestLatency.WithLabelValues(kafkaStatus, topic).Observe(float64(time.Since(startTime).Milliseconds()))
return err
}
metrics.APITopicsSubmission.WithLabelValues(topic, "true").Inc()
metrics.KafkaRequestLatency.WithLabelValues(kafkaStatus, topic).Observe(float64(time.Since(startTime).Milliseconds()))
l.WithFields(map[string]interface{}{
"partition": partition,
"offset": offset,
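
SendEvent now measures the full Produce call and observes KafkaRequestLatency on both the error and the success path, with the status label set accordingly. The same pattern can be written with a single deferred observation; the sketch below is illustrative only, with the actual Kafka producer call stubbed out:

package main

import (
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// kafkaRequestLatency mirrors the histogram added by this commit.
var kafkaRequestLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "eventsgateway",
		Subsystem: "kafka",
		Name:      "response_time_ms",
		Help:      "the response time in ms of Kafka",
		Buckets:   []float64{10, 30, 50, 100, 500},
	},
	[]string{"status", "topic"},
)

// produce records exactly one latency observation per call, regardless of outcome.
func produce(topic string, payload []byte) (err error) {
	startTime := time.Now()
	kafkaStatus := "OK"
	defer func() {
		if err != nil {
			kafkaStatus = "ERROR"
		}
		kafkaRequestLatency.WithLabelValues(kafkaStatus, topic).
			Observe(float64(time.Since(startTime).Milliseconds()))
	}()

	// placeholder for the real producer.Produce call
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	prometheus.MustRegister(kafkaRequestLatency)
	_ = produce("sometopic", []byte("event"))
}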
