Skip to content

Commit

Permalink
Return error when hitting maxMessageSize (#38)
Browse files Browse the repository at this point in the history
* Return error on message size limit

* Set local max message size

* Simplify test

* Set max message bytes to 1MB
  • Loading branch information
helder-junior authored Nov 12, 2024
1 parent 76488af commit eeeb708
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 4 deletions.
2 changes: 1 addition & 1 deletion server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (a *App) configureEventsForwarder() error {
if err != nil {
return err
}
kafkaSender := sender.NewKafkaSender(k, a.log)
kafkaSender := sender.NewKafkaSender(k, a.log, a.config)
a.Server = NewServer(kafkaSender, a.log)
return nil
}
Expand Down
53 changes: 52 additions & 1 deletion server/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package app_test
import (
"bytes"
"context"
"fmt"
"github.com/spf13/viper"
"strings"
"time"

"github.com/golang/mock/gomock"
Expand All @@ -17,6 +20,21 @@ import (
pb "github.com/topfreegames/protos/eventsgateway/grpc/generated"
)

func initConfig() *viper.Viper {
config = viper.New()
config.SetConfigFile("../config/test.yaml")
config.SetConfigType("yaml")
config.SetEnvPrefix("eventsgateway")
config.AddConfigPath(".")
config.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
config.AutomaticEnv()

if err := config.ReadInConfig(); err != nil {
fmt.Printf("Error loading config file: %s\n", config.ConfigFileUsed())
}
return config
}

var _ = Describe("Client", func() {
var (
s *app.Server
Expand All @@ -25,7 +43,7 @@ var _ = Describe("Client", func() {

BeforeEach(func() {
nowMs = time.Now().UnixNano() / 1000000
sender := sender.NewKafkaSender(mockForwarder, log)
sender := sender.NewKafkaSender(mockForwarder, log, initConfig())
s = app.NewServer(sender, log)
Expect(s).NotTo(BeNil())
})
Expand Down Expand Up @@ -147,5 +165,38 @@ var _ = Describe("Client", func() {
Expect(res).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
})
It("should fail send exceeds message size", func() {
msg := "a"
for _ = range 30000 {
msg += "a"
}

ctx := context.Background()
e := &pb.Event{
Id: "someid",
Name: "someName",
Topic: "sv-uploads-sometopic",
Props: map[string]string{
"bigmessage": msg,
},
Timestamp: nowMs,
}

mockForwarder.EXPECT().Produce(gomock.Eq("sv-uploads-sometopic"), gomock.Any()).Do(
func(topic string, aevent []byte) {
r := bytes.NewReader(aevent)
ev, err := avro.DeserializeEvent(r)
Expect(err).NotTo(HaveOccurred())
Expect(ev.Id).To(Equal(e.GetId()))
Expect(ev.Name).To(Equal(e.GetName()))
Expect(ev.ClientTimestamp).To(Equal(e.GetTimestamp()))
Expect(ev.ServerTimestamp).To(BeNumerically("~", nowMs, 10))
})

res, err := s.SendEvent(ctx, e)
Expect(res).To(BeNil())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("Event size exceeds kafka.producer.maxMessageBytes 30000 bytes. Got 30068 bytes"))
})
})
})
2 changes: 1 addition & 1 deletion server/config/test.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
kafka:
producer:
brokers: kafka:9092
maxMessageBytes: 3000000
maxMessageBytes: 30000
otlp:
enabled: false
prometheus:
Expand Down
14 changes: 13 additions & 1 deletion server/sender/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ package sender
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/spf13/viper"
avro "github.com/topfreegames/avro/go/eventsgateway/generated"
"github.com/topfreegames/eventsgateway/v4/server/forwarder"
"github.com/topfreegames/eventsgateway/v4/server/logger"
Expand All @@ -25,13 +28,15 @@ import (
type KafkaSender struct {
logger logger.Logger
producer forwarder.Forwarder
config *viper.Viper
}

func NewKafkaSender(
producer forwarder.Forwarder,
logger logger.Logger,
config *viper.Viper,
) *KafkaSender {
k := &KafkaSender{producer: producer, logger: logger}
k := &KafkaSender{producer: producer, logger: logger, config: config}
return k
}

Expand Down Expand Up @@ -68,6 +73,13 @@ func (k *KafkaSender) SendEvent(
event *pb.Event,
) error {
startTime := time.Now()
maxMessageBytes := k.config.GetInt("kafka.producer.maxMessageBytes")

if event.XXX_Size() >= maxMessageBytes {
err := errors.New(fmt.Sprintf("Event size exceeds kafka.producer.maxMessageBytes %d bytes. Got %d bytes", maxMessageBytes, event.XXX_Size()))
k.logger.WithError(err).Error("Failed to send event")
return err
}

l := k.logger.WithFields(map[string]interface{}{
"topic": event.GetTopic(),
Expand Down

0 comments on commit eeeb708

Please sign in to comment.