From 144c5f27fb73c2ce04c24f3c30e07b536de35c47 Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Sat, 21 Jan 2023 17:57:25 +0100 Subject: [PATCH 1/9] Getting started with binary producer --- consumer.go | 2 +- internal/backoff.go | 39 +++ internal/client_consumer_test.go | 4 +- internal/client_test.go | 2 +- internal/connection.go | 333 ++++++++++++++++++++++ internal/models.go | 11 +- internal/serialization/producer/models.go | 109 +++++++ internal/serialization/serialization.go | 2 +- 8 files changed, 492 insertions(+), 10 deletions(-) create mode 100644 internal/backoff.go create mode 100644 internal/connection.go create mode 100644 internal/serialization/producer/models.go diff --git a/consumer.go b/consumer.go index bc724e3..fd4f4a8 100644 --- a/consumer.go +++ b/consumer.go @@ -4,9 +4,9 @@ import ( "fmt" "os" + "github.com/google/uuid" . "github.com/polarstreams/go-client/internal" . "github.com/polarstreams/go-client/types" - "github.com/google/uuid" ) // Represents a PolarStreams client that reads records from a cluster. diff --git a/internal/backoff.go b/internal/backoff.go new file mode 100644 index 0000000..35ed7c5 --- /dev/null +++ b/internal/backoff.go @@ -0,0 +1,39 @@ +package internal + +import ( + "math" + "time" +) + +const ( + baseReconnectionDelayMs = 20 + maxReconnectionDelayMs = 30_000 +) + +// Represents a default reconnection strategy +type exponentialBackoff struct { + index int +} + +func newExponentialBackoff() *exponentialBackoff { + return &exponentialBackoff{} +} + +func (p *exponentialBackoff) reset() { + p.index = 0 +} + +// Returns an exponential delay +func (p *exponentialBackoff) next() time.Duration { + p.index++ + + delayMs := maxReconnectionDelayMs + if p.index < 53 { + delayMs = int(math.Pow(2, float64(p.index))) * baseReconnectionDelayMs + if delayMs > maxReconnectionDelayMs { + delayMs = maxReconnectionDelayMs + } + } + + return time.Duration(delayMs) * time.Millisecond +} diff --git a/internal/client_consumer_test.go b/internal/client_consumer_test.go index 9c7262c..efcb684 100644 --- a/internal/client_consumer_test.go +++ b/internal/client_consumer_test.go @@ -8,10 +8,10 @@ import ( "net/http/httptest" "time" - "github.com/polarstreams/go-client/internal/serialization" - . "github.com/polarstreams/go-client/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/polarstreams/go-client/internal/serialization" + . "github.com/polarstreams/go-client/types" ) var _ = Describe("Client", func() { diff --git a/internal/client_test.go b/internal/client_test.go index 61fb62d..2fad5f1 100644 --- a/internal/client_test.go +++ b/internal/client_test.go @@ -13,9 +13,9 @@ import ( "testing" "time" - "github.com/polarstreams/go-client/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/polarstreams/go-client/types" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" ) diff --git a/internal/connection.go b/internal/connection.go new file mode 100644 index 0000000..b3c460b --- /dev/null +++ b/internal/connection.go @@ -0,0 +1,333 @@ +package internal + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "time" + + . "github.com/polarstreams/go-client/internal/serialization/producer" + . 
"github.com/polarstreams/go-client/types" +) + +const maxStreamIds = 1024 + +// Represents TCP connection to a producer +type connection struct { + conn net.Conn + disconnectHandler disconnectHandler + closeOnce sync.Once + handlers sync.Map + streamIds chan StreamId + requests chan BinaryRequest + logger Logger + flushThreshold int +} + +func newConnection(address string, h disconnectHandler, flushThreshold int, logger Logger) (*connection, error) { + conn, err := net.Dial("tcp", address) + if err != nil { + return nil, err + } + + w := bytes.NewBuffer(make([]byte, HeaderSize)) + if err := WriteHeader(w, &BinaryHeader{ + Version: 1, + StreamId: 0, + Op: StartupOp, + }); err != nil { + _ = conn.Close() + return nil, err + } + + _, err = conn.Write(w.Bytes()) + if err != nil { + _ = conn.Close() + return nil, err + } + + c := &connection{ + conn: conn, + disconnectHandler: h, + closeOnce: sync.Once{}, + handlers: sync.Map{}, + logger: logger, + streamIds: make(chan StreamId, maxStreamIds), + flushThreshold: flushThreshold, + requests: make(chan BinaryRequest, 512), + } + + // Reserve StreamId(0) for the Startup message + for i := StreamId(1); i < maxStreamIds; i++ { + c.streamIds <- i + } + + go c.receiveResponses() + + response := make(chan BinaryResponse, 1) + c.handlers.Store(StreamId(0), func(r BinaryResponse) { + response <- r + }) + + r := <-response + if r.Op() != ReadyOp { + _ = conn.Close() + if r.Op() == ErrorOp { + return nil, r.(*ErrorResponse).ToError() + } + return nil, fmt.Errorf("Invalid response from server, expected READY, obtained: %d", r.Op()) + } + + go c.sendRequests() + + return c, nil +} + +func (c *connection) close() { + c.closeOnce.Do(func() { + _ = c.conn.Close() + c.disconnectHandler.OnConnectionClose(c) + }) +} + +func (c *connection) receiveResponses() { + header := &BinaryHeader{} + headerBuffer := make([]byte, HeaderSize) + headerReader := bytes.NewReader(headerBuffer) + bodyBuffer := make([]byte, ResponseBodyMaxLength) + + for { + _, err := io.ReadFull(c.conn, headerBuffer) + if err != nil { + c.logger.Debug("Error reading from binary server: %s", err) + break + } + headerReader.Reset(headerBuffer) + err = binary.Read(headerReader, Endianness, &header) + if err != nil { + c.logger.Debug("Error reading header from binary server: %s", err) + break + } + + h, loaded := c.handlers.LoadAndDelete(header.StreamId) + if !loaded { + panic(fmt.Sprintf("No handler for stream id %d", header.StreamId)) + } + handler := h.(streamHandler) + var response BinaryResponse + if header.BodyLength == 0 { + response = NewEmptyResponse(header.Op) + } else if header.Op == ErrorOp { + buf := bodyBuffer[:header.BodyLength] + _, err := io.ReadFull(c.conn, buf) + if err != nil { + c.logger.Debug("Error reading body from binary server: %s", err) + break + } + response = &ErrorResponse{ + Code: ErrorCode(buf[0]), + Message: string(buf[1:]), + } + } + + handler(response) + } + c.close() +} + +func totalRequestSize(r BinaryRequest) int { + return r.BodyLength() + HeaderSize +} + +func (c *connection) sendRequests() { + w := bytes.NewBuffer(make([]byte, c.flushThreshold)) + + shouldExit := false + var item BinaryRequest + for !shouldExit { + w.Reset() + groupSize := 0 + group := make([]BinaryRequest, 0) + canAddNext := true + + if item == nil { + // Block for the first item + var ok bool + item, ok = <-c.requests + if !ok { + break + } + } + + group = append(group, item) + groupSize += totalRequestSize(item) + item = nil + + // Coalesce requests w/ Nagle disabled + for canAddNext && 
!shouldExit { + select { + case request, ok := <-c.requests: + if !ok { + shouldExit = true + break + } + responseSize := totalRequestSize(request) + if responseSize+w.Len() > c.flushThreshold { + canAddNext = false + item = request + break + } + group = append(group, request) + groupSize += responseSize + + default: + canAddNext = false + } + } + + for _, request := range group { + if err := request.Marshal(w); err != nil { + c.logger.Warn("There was an error while marshaling a request, closing connection: %s", err) + shouldExit = true + break + } + } + + if w.Len() > 0 { + if _, err := c.conn.Write(w.Bytes()); err != nil { + c.logger.Warn("There was an error while writing to a producer server, closing connection: %s", err) + break + } + } + } + c.close() +} + +type connectionSet map[*connection]bool + +type disconnectHandler interface { + OnConnectionClose(c *connection) +} + +type streamHandler func(r BinaryResponse) + +// Represents a group of connections to a single producer +type producerConnectionPool struct { + address string + length int // The amount of expected connections + connections atomic.Value // Copy-on-write connections + mu sync.Mutex + isConnecting int32 + isClosed int32 + logger Logger + connectionFlushThreshold int +} + +func newProducerConnectionPool(length int, connectionFlushThreshold int, logger Logger) *producerConnectionPool { + p := &producerConnectionPool{ + length: length, + connections: atomic.Value{}, + mu: sync.Mutex{}, + isConnecting: 0, + isClosed: 0, + logger: logger, + connectionFlushThreshold: connectionFlushThreshold, + } + + go p.startConnecting() + return p +} + +func (p *producerConnectionPool) Close() { + atomic.StoreInt32(&p.isClosed, 1) + p.mu.Lock() + defer p.mu.Unlock() + + pool := p.getConnections() + for c := range pool { + c.close() + } +} + +func (p *producerConnectionPool) OnConnectionClose(c *connection) { + // Use a different go routine as we might be holding the lock + go func() { + p.mu.Lock() + defer p.mu.Unlock() + if p.isPoolClosed() { + p.addOrDelete(c, true) + } + }() +} + +// Determines whether the pool is closed, we should hold the lock in case the check is important +func (p *producerConnectionPool) isPoolClosed() bool { + return atomic.LoadInt32(&p.isClosed) == 1 +} + +func (p *producerConnectionPool) startConnecting() { + if !atomic.CompareAndSwapInt32(&p.isConnecting, 0, 1) { + // It's already connecting + return + } + + defer atomic.StoreInt32(&p.isConnecting, 0) + + backoff := newExponentialBackoff() + existing := p.getConnections() + for len(existing) < p.length && p.isPoolClosed() { + c, err := newConnection(p.address, p, p.connectionFlushThreshold, p.logger) + if err != nil { + if p.isPoolClosed() { + return + } + + time.Sleep(backoff.next()) + continue + } + + backoff.reset() + p.mu.Lock() + + if p.isPoolClosed() { + p.mu.Unlock() + c.close() + return + } + + existing = p.addOrDelete(c, false) + p.mu.Unlock() + } +} + +func (p *producerConnectionPool) getConnections() connectionSet { + value := p.connections.Load() + if value == nil { + return nil + } + return value.(connectionSet) +} + +// Adds or deletes a connection, callers MUST hold the lock +func (p *producerConnectionPool) addOrDelete(c *connection, deleteConnection bool) connectionSet { + existingMap := p.getConnections() + + // Shallow copy existing + newMap := make(connectionSet, len(existingMap)+1) + for k, v := range existingMap { + newMap[k] = v + } + + if deleteConnection { + delete(newMap, c) + } else { + newMap[c] = true + } + + 
p.connections.Store(newMap) + return newMap +} diff --git a/internal/models.go b/internal/models.go index 717e119..2ba91e2 100644 --- a/internal/models.go +++ b/internal/models.go @@ -5,11 +5,12 @@ import "fmt" const contentType = "application/json" type Topology struct { - BaseName string `json:"baseName,omitempty"` // When defined, it represents the base name to build the broker names, e.g. "polar-" - Length int `json:"length"` // The ring size - BrokerNames []string `json:"names,omitempty"` - ProducerPort int `json:"producerPort"` - ConsumerPort int `json:"consumerPort"` + BaseName string `json:"baseName,omitempty"` // When defined, it represents the base name to build the broker names, e.g. "polar-" + Length int `json:"length"` // The ring size + BrokerNames []string `json:"names,omitempty"` + ProducerPort int `json:"producerPort"` + ProducerBinaryPort int `json:"producerBinaryPort"` + ConsumerPort int `json:"consumerPort"` } func (t *Topology) ProducerUrl(topic string, ordinal int, partitionKey string) string { diff --git a/internal/serialization/producer/models.go b/internal/serialization/producer/models.go new file mode 100644 index 0000000..9eea46c --- /dev/null +++ b/internal/serialization/producer/models.go @@ -0,0 +1,109 @@ +package producer + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" +) + +type OpCode uint8 +type StreamId uint16 +type Flags uint8 +type ErrorCode uint8 + +// The only responses with body are errors, leave 511 for the error message +const ResponseBodyMaxLength = 512 + +var Endianness = binary.BigEndian + +// Operation codes. +// Use fixed numbers (not iota) to make it harder to break the protocol by moving stuff around. +const ( + StartupOp OpCode = 1 + ReadyOp OpCode = 2 + ErrorOp OpCode = 3 + ProduceOp OpCode = 4 + ProduceResponseOp OpCode = 5 + HeartbeatOp OpCode = 6 +) + +// Flags. +// Use fixed numbers (not iota) to make it harder to break the protocol by moving stuff around. +const ( + WithTimestamp Flags = 0b00000001 +) + +const ( + ServerError ErrorCode = 0 + RoutingError ErrorCode = 1 + LeaderNotFoundError ErrorCode = 2 +) + +// Header for producer messages. Order of fields defines the serialization format. 
+type BinaryHeader struct { + Version uint8 + Flags Flags + StreamId StreamId + Op OpCode + BodyLength uint32 + Crc uint32 +} + +var HeaderSize = binarySize(BinaryHeader{}) + +type BinaryRequest interface { + Marshal(w *bytes.Buffer) error + BodyLength() int +} + +type BinaryResponse interface { + Op() OpCode +} + +// Represents a response without body +func NewEmptyResponse(op OpCode) BinaryResponse { + return &emptyResponse{op} +} + +type emptyResponse struct { + op OpCode +} + +func (r *emptyResponse) Op() OpCode { + return r.op +} + +type ErrorResponse struct { + Code ErrorCode + Message string +} + +func (r *ErrorResponse) Op() OpCode { + return ErrorOp +} + +func (r *ErrorResponse) ToError() error { + return fmt.Errorf("Error response %d: %s", r.Code, r.Message) +} + +func binarySize(v interface{}) int { + size := binary.Size(v) + if size <= 0 { + panic(fmt.Sprintf("Size of type %v could not be determined", v)) + } + return size +} + +func WriteHeader(w *bytes.Buffer, header *BinaryHeader) error { + if err := binary.Write(w, Endianness, header); err != nil { + return err + } + + const crcByteSize = 4 + buf := w.Bytes() + headerBuf := buf[len(buf)-HeaderSize:] + crc := crc32.ChecksumIEEE(headerBuf[:len(headerBuf)-crcByteSize]) + Endianness.PutUint32(headerBuf[len(headerBuf)-crcByteSize:], crc) + return nil +} diff --git a/internal/serialization/serialization.go b/internal/serialization/serialization.go index 427a4e1..accb21b 100644 --- a/internal/serialization/serialization.go +++ b/internal/serialization/serialization.go @@ -7,8 +7,8 @@ import ( "net/http" "time" - . "github.com/polarstreams/go-client/types" "github.com/klauspost/compress/zstd" + . "github.com/polarstreams/go-client/types" ) var endianness = binary.BigEndian From e6ad2cdb70c02aa53420cace2667dad9c98cc402 Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Sun, 29 Jan 2023 18:33:56 +0100 Subject: [PATCH 2/9] Add connection implementation --- internal/connection.go | 81 ++++++++++++++++--- internal/connection_test.go | 96 +++++++++++++++++++++++ internal/serialization/producer/models.go | 36 +++++++++ internal/test/fakes/connection.go | 43 ++++++++++ 4 files changed, 244 insertions(+), 12 deletions(-) create mode 100644 internal/connection_test.go create mode 100644 internal/test/fakes/connection.go diff --git a/internal/connection.go b/internal/connection.go index b3c460b..25d3975 100644 --- a/internal/connection.go +++ b/internal/connection.go @@ -58,7 +58,7 @@ func newConnection(address string, h disconnectHandler, flushThreshold int, logg logger: logger, streamIds: make(chan StreamId, maxStreamIds), flushThreshold: flushThreshold, - requests: make(chan BinaryRequest, 512), + requests: make(chan BinaryRequest, 128), } // Reserve StreamId(0) for the Startup message @@ -87,11 +87,48 @@ func newConnection(address string, h disconnectHandler, flushThreshold int, logg return c, nil } -func (c *connection) close() { +// Creates the response handler, appends the request and waits for the response +func (c *connection) Send(req BinaryRequest) (resp BinaryResponse) { + defer func() { + if r := recover(); r != nil { + resp = NewClientErrorResponse("Request could not be sent: connection closed") + } + }() + response := make(chan BinaryResponse, 1) + streamId := <- c.streamIds + c.handlers.Store(streamId, func(r BinaryResponse) { + response <- r + }) + req.SetStreamId(streamId) + + // Append the request, it might panic when requests channel is closed + c.requests <- req + // Wait for the response + resp = <-response + return resp +} 
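Note on the Send() path above: it multiplexes many in-flight requests over a single TCP connection by checking a stream id out of a buffered channel (blocking when all ids are in use), parking a per-request handler in the sync.Map, and waiting until receiveResponses routes the reply carrying the matching stream id back to it. Below is a minimal, self-contained sketch of that correlation pattern; the correlator/frame names are illustrative and not part of the client, and the panic recovery for sends on a closed channel is omitted for brevity.

package main

import (
	"fmt"
	"sync"
)

type streamID uint16

type frame struct {
	id   streamID
	body string
}

// correlator pairs each outbound request with the response that carries
// the same stream id, mirroring connection.Send/receiveResponses above.
type correlator struct {
	ids      chan streamID // pool of free stream ids; blocks when exhausted
	handlers sync.Map      // streamID -> chan string, one pending handler per id
}

func newCorrelator(size int) *correlator {
	c := &correlator{ids: make(chan streamID, size)}
	for i := 0; i < size; i++ {
		c.ids <- streamID(i)
	}
	return c
}

// send checks out a stream id, registers a response handler, writes the
// frame and blocks until deliver routes the matching response back.
func (c *correlator) send(body string, wire chan<- frame) string {
	id := <-c.ids
	resp := make(chan string, 1)
	c.handlers.Store(id, resp)
	wire <- frame{id: id, body: body}
	r := <-resp
	c.ids <- id // hand the id back for reuse (simplified in this sketch)
	return r
}

// deliver plays the role of receiveResponses: it dequeues the pending
// handler for the stream id and hands it the response.
func (c *correlator) deliver(id streamID, resp string) {
	if h, ok := c.handlers.LoadAndDelete(id); ok {
		h.(chan string) <- resp
	}
}

func main() {
	c := newCorrelator(4)
	wire := make(chan frame, 4)
	go func() { // fake peer: echo each request on its own stream id
		for f := range wire {
			c.deliver(f.id, "echo:"+f.body)
		}
	}()
	fmt.Println(c.send("hello", wire)) // prints "echo:hello"
}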
+ +func (c *connection) Close() { c.closeOnce.Do(func() { + close(c.requests) _ = c.conn.Close() c.disconnectHandler.OnConnectionClose(c) }) + + toDelete := make([]StreamId, 0) + c.handlers.Range(func (key, value interface{}) bool { + toDelete = append(toDelete, key.(StreamId)) + return true + }) + + for _, streamId := range toDelete { + h, loaded := c.handlers.LoadAndDelete(streamId) + if !loaded { + continue + } + handler := h.(func(BinaryResponse)) + handler(NewClientErrorResponse("Request could not be sent: connection closed")) + } } func (c *connection) receiveResponses() { @@ -113,11 +150,7 @@ func (c *connection) receiveResponses() { break } - h, loaded := c.handlers.LoadAndDelete(header.StreamId) - if !loaded { - panic(fmt.Sprintf("No handler for stream id %d", header.StreamId)) - } - handler := h.(streamHandler) + handler := c.getHandler(header.StreamId) var response BinaryResponse if header.BodyLength == 0 { response = NewEmptyResponse(header.Op) @@ -125,6 +158,8 @@ func (c *connection) receiveResponses() { buf := bodyBuffer[:header.BodyLength] _, err := io.ReadFull(c.conn, buf) if err != nil { + // The handler was dequeued, surface the error + handler(NewClientErrorResponse("Error reading body from server")) c.logger.Debug("Error reading body from binary server: %s", err) break } @@ -136,7 +171,17 @@ func (c *connection) receiveResponses() { handler(response) } - c.close() + + c.Close() +} + +// Gets and deletes the handler from the pending handlers +func (c *connection) getHandler(id StreamId) streamHandler { + h, loaded := c.handlers.LoadAndDelete(id) + if !loaded { + panic(fmt.Sprintf("No handler for stream id %d", id)) + } + return h.(func(BinaryResponse)) } func totalRequestSize(r BinaryRequest) int { @@ -148,10 +193,11 @@ func (c *connection) sendRequests() { shouldExit := false var item BinaryRequest + var group []BinaryRequest for !shouldExit { w.Reset() groupSize := 0 - group := make([]BinaryRequest, 0) + group = make([]BinaryRequest, 0) canAddNext := true if item == nil { @@ -204,7 +250,18 @@ func (c *connection) sendRequests() { } } } - c.close() + + // Close in-flight group + for _, request := range group { + streamId := request.StreamId() + if streamId == nil { + panic("Invalid nil stream id") + } + handler := c.getHandler(*streamId) + handler(NewClientErrorResponse("Error while sending request")) + } + + c.Close() } type connectionSet map[*connection]bool @@ -249,7 +306,7 @@ func (p *producerConnectionPool) Close() { pool := p.getConnections() for c := range pool { - c.close() + c.Close() } } @@ -295,7 +352,7 @@ func (p *producerConnectionPool) startConnecting() { if p.isPoolClosed() { p.mu.Unlock() - c.close() + c.Close() return } diff --git a/internal/connection_test.go b/internal/connection_test.go new file mode 100644 index 0000000..23c6e26 --- /dev/null +++ b/internal/connection_test.go @@ -0,0 +1,96 @@ +package internal + +import ( + "sync" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + . "github.com/polarstreams/go-client/internal/serialization/producer" + fakes "github.com/polarstreams/go-client/internal/test/fakes" + . 
"github.com/polarstreams/go-client/types" +) + +var _ = Describe("connection", func() { + Describe("Send()", func() { + It("should recover from closed request channel", func () { + c := &connection{ + streamIds: make(chan StreamId, 10), + requests: make(chan BinaryRequest, 10), + logger: StdLogger, + } + + c.streamIds <- 0 + + close(c.requests) + + resp := c.Send(&ProduceRequest{}) + Expect(resp).To(Equal(NewClientErrorResponse("Request could not be sent: connection closed"))) + }) + + It("should return when handler is invoked", func () { + c := &connection{ + streamIds: make(chan StreamId, 10), + requests: make(chan BinaryRequest, 10), + logger: StdLogger, + } + + c.streamIds <- 0 + + r := make(chan BinaryResponse, 1) + var wg sync.WaitGroup + wg.Add(1) + go func() { + wg.Done() + r <- c.Send(&ProduceRequest{}) + }() + wg.Wait() + handler := c.getHandler(0) + handler(NewEmptyResponse(ProduceResponseOp)) + + select { + case resp := <-r: + Expect(resp).To(Equal(NewEmptyResponse(ProduceResponseOp))) + case <-time.After(1 * time.Second): + panic("Test timeout") + } + }) + }) + + Describe("Close()", func () { + It("should invoke pending handlers", func () { + c := &connection{ + streamIds: make(chan StreamId, 10), + requests: make(chan BinaryRequest, 10), + logger: StdLogger, + conn: &fakes.Connection{}, + disconnectHandler: &producerConnectionPool{}, + } + + const pending = 3 + r := make(chan BinaryResponse, pending) + var wg sync.WaitGroup + for i := 0; i < pending; i++ { + c.streamIds <- StreamId(i) + wg.Add(1) + go func() { + wg.Done() + r <- c.Send(&ProduceRequest{}) + }() + } + + wg.Wait() + + go c.Close() + + for i := 0; i < pending; i++ { + select { + case resp := <-r: + Expect(resp).To(Equal(NewClientErrorResponse("Request could not be sent: connection closed"))) + case <-time.After(1 * time.Second): + panic("Test timeout") + } + } + }) + }) +}) \ No newline at end of file diff --git a/internal/serialization/producer/models.go b/internal/serialization/producer/models.go index 9eea46c..2fb2924 100644 --- a/internal/serialization/producer/models.go +++ b/internal/serialization/producer/models.go @@ -38,6 +38,7 @@ const ( ServerError ErrorCode = 0 RoutingError ErrorCode = 1 LeaderNotFoundError ErrorCode = 2 + ClientError ErrorCode = 255 ) // Header for producer messages. Order of fields defines the serialization format. 
@@ -55,6 +56,12 @@ var HeaderSize = binarySize(BinaryHeader{}) type BinaryRequest interface { Marshal(w *bytes.Buffer) error BodyLength() int + + // Sets the stream id in a thread safe manner + SetStreamId(id StreamId) + + // Gets the stream id in a thread safe manner + StreamId() *StreamId } type BinaryResponse interface { @@ -79,6 +86,13 @@ type ErrorResponse struct { Message string } +func NewClientErrorResponse(message string) BinaryResponse { + return &ErrorResponse{ + Code: ClientError, + Message: message, + } +} + func (r *ErrorResponse) Op() OpCode { return ErrorOp } @@ -107,3 +121,25 @@ func WriteHeader(w *bytes.Buffer, header *BinaryHeader) error { Endianness.PutUint32(headerBuf[len(headerBuf)-crcByteSize:], crc) return nil } + +type ProduceRequest struct { +} + +func (r *ProduceRequest) Marshal(w *bytes.Buffer) error { + // TODO: IMPLEMENT + return nil +} + +func (r *ProduceRequest) BodyLength() int { + // TODO: IMPLEMENT + return 0 +} + +func (r *ProduceRequest) SetStreamId(id StreamId) { + // TODO: IMPLEMENT +} + +func (r *ProduceRequest) StreamId() *StreamId { + // TODO: IMPLEMENT + return nil +} \ No newline at end of file diff --git a/internal/test/fakes/connection.go b/internal/test/fakes/connection.go new file mode 100644 index 0000000..f325b41 --- /dev/null +++ b/internal/test/fakes/connection.go @@ -0,0 +1,43 @@ +package fakes + +import ( + "net" + "time" +) + +type Connection struct { + WriteBuffers [][]byte +} + +func (c *Connection) Read(b []byte) (n int, err error) { + return len(b), nil +} + +func (c *Connection) Write(b []byte) (n int, err error) { + c.WriteBuffers = append(c.WriteBuffers, b) + return len(b), nil +} + +func (c *Connection) Close() error { + return nil +} + +func (c *Connection) LocalAddr() net.Addr { + return nil +} + +func (c *Connection) RemoteAddr() net.Addr { + return nil +} + +func (c *Connection) SetDeadline(t time.Time) error { + return nil +} + +func (c *Connection) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *Connection) SetWriteDeadline(t time.Time) error { + return nil +} From ac0f033c85e35d2ca6ebeb17ba49d298dabf5acf Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Sat, 4 Feb 2023 18:01:24 +0100 Subject: [PATCH 3/9] Adapt integration tests to use binary producer --- build/compose/docker-compose-cluster.yml | 6 +- build/compose/docker-compose-dev.yml | 2 +- consumer.go | 4 - internal/backoff.go | 23 +- internal/client.go | 126 ++++---- internal/client_consumer_test.go | 6 +- internal/client_test.go | 361 ++++++++++++++++------ internal/connection.go | 156 +++++++--- internal/connection_test.go | 35 ++- internal/models.go | 8 - internal/producer_client.go | 118 +++++++ internal/serialization/producer/models.go | 72 +++-- internal/test/fakes/connection.go | 4 +- internal/types/types.go | 10 + internal/utils/utils.go | 6 +- producer.go | 25 +- types/models.go | 7 + 17 files changed, 694 insertions(+), 275 deletions(-) create mode 100644 internal/producer_client.go create mode 100644 internal/types/types.go diff --git a/build/compose/docker-compose-cluster.yml b/build/compose/docker-compose-cluster.yml index 5feacc3..13f440e 100644 --- a/build/compose/docker-compose-cluster.yml +++ b/build/compose/docker-compose-cluster.yml @@ -10,7 +10,7 @@ services: depends_on: - polar_0 polar_0: - image: "polarstreams/polar:dev3" + image: "polarstreams/polar:dev4" env_file: "../env/test.env" environment: - POLAR_BROKER_NAMES=polar_0,polar_1,polar_2 @@ -20,13 +20,13 @@ services: interval: 1s start_period: 10s polar_1: - image: 
"polarstreams/polar:dev3" + image: "polarstreams/polar:dev4" env_file: "../env/test.env" environment: - POLAR_BROKER_NAMES=polar_0,polar_1,polar_2 - POLAR_ORDINAL=1 polar_2: - image: "polarstreams/polar:dev3" + image: "polarstreams/polar:dev4" env_file: "../env/test.env" environment: - POLAR_BROKER_NAMES=polar_0,polar_1,polar_2 diff --git a/build/compose/docker-compose-dev.yml b/build/compose/docker-compose-dev.yml index 2e3fba3..52aa7e6 100644 --- a/build/compose/docker-compose-dev.yml +++ b/build/compose/docker-compose-dev.yml @@ -7,7 +7,7 @@ services: depends_on: - polar polar: - image: "polarstreams/polar:dev3" + image: "polarstreams/polar:dev4" env_file: "../env/test.env" environment: - POLAR_DEV_MODE=true diff --git a/consumer.go b/consumer.go index fd4f4a8..ac4f4cc 100644 --- a/consumer.go +++ b/consumer.go @@ -65,10 +65,6 @@ func NewConsumerWithOpts(serviceUrl string, options ConsumerOptions) (Consumer, return nil, err } - if err := client.Connect(); err != nil { - return nil, err - } - client.RegisterAsConsumer(options) c := &consumer{ diff --git a/internal/backoff.go b/internal/backoff.go index 35ed7c5..5b206b7 100644 --- a/internal/backoff.go +++ b/internal/backoff.go @@ -10,6 +10,13 @@ const ( maxReconnectionDelayMs = 30_000 ) +type BackoffPolicy interface { + Reset() + + // Returns the next delay + Next() time.Duration +} + // Represents a default reconnection strategy type exponentialBackoff struct { index int @@ -19,12 +26,12 @@ func newExponentialBackoff() *exponentialBackoff { return &exponentialBackoff{} } -func (p *exponentialBackoff) reset() { +func (p *exponentialBackoff) Reset() { p.index = 0 } // Returns an exponential delay -func (p *exponentialBackoff) next() time.Duration { +func (p *exponentialBackoff) Next() time.Duration { p.index++ delayMs := maxReconnectionDelayMs @@ -37,3 +44,15 @@ func (p *exponentialBackoff) next() time.Duration { return time.Duration(delayMs) * time.Millisecond } + +type fixedBackoff struct { + delay time.Duration +} + +func (p *fixedBackoff) Reset() { +} + +// Returns an exponential delay +func (p *fixedBackoff) Next() time.Duration { + return p.delay +} diff --git a/internal/client.go b/internal/client.go index d208e43..2bb33cd 100644 --- a/internal/client.go +++ b/internal/client.go @@ -11,12 +11,14 @@ import ( "net" "net/http" "net/url" + "reflect" "strings" "sync" "sync/atomic" "time" "github.com/polarstreams/go-client/internal/serialization" + "github.com/polarstreams/go-client/internal/serialization/producer" "github.com/polarstreams/go-client/internal/utils" . 
"github.com/polarstreams/go-client/types" "golang.org/x/net/context" @@ -27,7 +29,7 @@ const DefaultTopologyPollInterval = 10 * time.Second const defaultPollReqInterval = 5 * time.Second const baseReconnectionDelay = 100 * time.Millisecond const maxReconnectionDelay = 2 * time.Minute -const maxOrdinal = 1 << 31 +const maxAtomicIncrement = 1 << 31 const defaultDiscoveryPort = 9250 const producerMaxConnsPerHost = 1 @@ -40,7 +42,7 @@ const ( type Client struct { discoveryClient *http.Client - producerClient *http.Client + producerClient *producerClient consumerClient *http.Client discoveryUrl string // The full discovery url, like http://host:port/path discoveryHost string // The host and port @@ -59,9 +61,12 @@ type Client struct { } type ClientOptions struct { - Logger Logger - TopologyPollInterval time.Duration - FixedReconnectionDelay time.Duration + Logger Logger + TopologyPollInterval time.Duration + FixedReconnectionDelay time.Duration + ProducerInitialize bool + ProducerFlushThresholdBytes int + ProducerConnectionsPerHost int } var jitterRng = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -111,15 +116,27 @@ func NewClient(serviceUrl string, options *ClientOptions) (*Client, error) { fixedReconnectionDelay: options.FixedReconnectionDelay, } - client.producerClient = &http.Client{ - Transport: &http.Transport{ - MaxConnsPerHost: producerMaxConnsPerHost, - MaxIdleConnsPerHost: producerMaxConnsPerHost, - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return client.dial(network, addr, client.getProducerStatus(addr)) - }, - }, - Timeout: 1 * time.Second, + topology, err := client.queryTopology() + if err != nil { + return nil, err + } + options.Logger.Info("Discovered cluster composed of %d brokers", topology.Length) + + if topology.ProducerBinaryPort == 0 { + return nil, fmt.Errorf("Invalid server version, make sure you use the latest polar version") + } + + client.topology.Store(topology) + go client.pollTopology() + + if options.ProducerInitialize { + // The producer client eagerly creates connections + client.producerClient = newProducerClient( + topology, + options.ProducerConnectionsPerHost, + options.ProducerFlushThresholdBytes, + options.FixedReconnectionDelay, + options.Logger) } client.consumerClient = &http.Client{ @@ -143,7 +160,7 @@ func NewClient(serviceUrl string, options *ClientOptions) (*Client, error) { } func (c *Client) dial(network string, addr string, brokerStatus *BrokerStatusInfo) (*utils.TrackedConnection, error) { - c.logger.Info("Creating new connection to %s", addr) + c.logger.Info("Creating new consumer connection to %s", addr) conn, err := net.Dial(network, addr) if err != nil { c.logger.Warn("Connection to %s could not be established", addr) @@ -167,22 +184,8 @@ func (c *Client) dial(network string, addr string, brokerStatus *BrokerStatusInf return tc, nil } -// Gets the topology the first time and starts the loop for polling for changes. -// -// Close() should be called to stop polling. 
-func (c *Client) Connect() error { - topology, err := c.queryTopology() - if err != nil { - return err - } - - c.topology.Store(topology) - go c.pollTopology() - return nil -} - -func (c *Client) isProducerUp(ordinal int, t *Topology) bool { - return c.getProducerStatusByOrdinal(ordinal, t).IsUp() +func (c *Client) isProducerUp(ordinal int) bool { + return c.producerClient.IsProducerUp(ordinal) } func (c *Client) getProducerStatusByOrdinal(ordinal int, t *Topology) *BrokerStatusInfo { @@ -296,11 +299,17 @@ func (c *Client) pollTopology() { time.Sleep(jitter(c.topologyPollInterval)) newTopology, err := c.queryTopology() if err != nil { - // TODO: Use logging + c.logger.Warn("Error while trying to get the topology: %s", err) continue } - - c.topology.Store(newTopology) + currentTopology := c.Topology() + if !reflect.DeepEqual(*newTopology, *currentTopology) { + c.topology.Store(newTopology) + c.logger.Info("Topology changed: %d brokers", newTopology.Length) + if c.producerClient != nil { + c.producerClient.OnNewTopology(newTopology) + } + } } } @@ -336,14 +345,15 @@ func (c *Client) useDiscoveryHostForBroker(t *Topology) Topology { } return Topology{ - Length: 1, - BrokerNames: []string{name}, - ProducerPort: t.ProducerPort, - ConsumerPort: t.ConsumerPort, + Length: 1, + BrokerNames: []string{name}, + ProducerPort: t.ProducerPort, + ConsumerPort: t.ConsumerPort, + ProducerBinaryPort: t.ProducerBinaryPort, } } -func (c *Client) ProduceJson(topic string, message io.Reader, partitionKey string) (*http.Response, error) { +func (c *Client) ProduceJson(topic string, message io.Reader, partitionKey string) error { t := c.Topology() ordinal := 0 if partitionKey == "" { @@ -356,36 +366,38 @@ func (c *Client) ProduceJson(topic string, message io.Reader, partitionKey strin initialPosition, err := bufferedMessage.Seek(0, io.SeekCurrent) if err != nil { // Seeking current position should be a safe operation, in any case, error out - return nil, err + return err } maxAttempts := int(math.Min(float64(t.Length), 4)) + var lastErr error for i := 0; i < maxAttempts; i++ { if i > 0 { // Rewind the reader _, err := bufferedMessage.Seek(initialPosition, io.SeekStart) if err != nil { - return nil, err + return err } } brokerOrdinal := (ordinal + i) % t.Length - if !c.isProducerUp(brokerOrdinal, t) { + if !c.producerClient.IsProducerUp(brokerOrdinal) { c.logger.Debug("B%d is down, moving to next host", brokerOrdinal) + lastErr = fmt.Errorf("Broker B%d is down", brokerOrdinal) } - url := t.ProducerUrl(topic, brokerOrdinal, partitionKey) - req, err := http.NewRequest(http.MethodPost, url, bufferedMessage) - if err != nil { - return nil, err + resp := c.producerClient.Send(brokerOrdinal, topic, bufferedMessage, partitionKey) + if resp.Op() == producer.ProduceResponseOp { + // Success + return nil } - req.Header.Set("Content-Type", contentType) - resp, err := c.producerClient.Do(req) - if err == nil { - return resp, nil + if resp.Op() == producer.ErrorOp { + lastErr = resp.(*producer.ErrorResponse).ToError() + continue } + lastErr = fmt.Errorf("Response op is not valid: %d", resp.Op()) } - return nil, fmt.Errorf("No broker available: attempted %d brokers", maxAttempts) + return fmt.Errorf("No broker available: %d attempted, last error: %s", maxAttempts, lastErr) } func (c *Client) RegisterAsConsumer(options ConsumerOptions) { @@ -574,8 +586,10 @@ func (c *Client) Close() { c.logger.Info("PolarStreams client closing") atomic.StoreInt64(&c.isClosing, 1) c.discoveryClient.CloseIdleConnections() - 
c.producerClient.CloseIdleConnections() c.consumerClient.CloseIdleConnections() + if c.producerClient != nil { + c.producerClient.Close() + } } func bodyClose(r *http.Response) { @@ -586,7 +600,7 @@ func bodyClose(r *http.Response) { func (c *Client) getNextOrdinal(index *uint32, t *Topology) int { value := atomic.AddUint32(index, 1) - if value >= maxOrdinal { + if value >= maxAtomicIncrement { // Atomic inc operations don't wrap around. // Not exactly fair when value >= maxOrdinal, but in practical terms is good enough atomic.StoreUint32(index, 0) @@ -617,4 +631,10 @@ func setDefaultOptions(options *ClientOptions) { if options.TopologyPollInterval == 0 { options.TopologyPollInterval = DefaultTopologyPollInterval } + if options.ProducerConnectionsPerHost == 0 { + options.ProducerConnectionsPerHost = 1 + } + if options.ProducerFlushThresholdBytes == 0 { + options.ProducerFlushThresholdBytes = 16 * 1024 + } } diff --git a/internal/client_consumer_test.go b/internal/client_consumer_test.go index efcb684..a2f9d9b 100644 --- a/internal/client_consumer_test.go +++ b/internal/client_consumer_test.go @@ -39,7 +39,7 @@ var _ = Describe("Client", func() { }) It("should subscribe to each discovered server", func() { - client := newTestClient(discoveryAddress) + client := newTestClient(discoveryAddress, true) defer client.Close() options := ConsumerOptions{ Group: "a", @@ -56,7 +56,7 @@ var _ = Describe("Client", func() { s0.Shutdown(context.Background()) s0 = nil - client := newTestClient(discoveryAddress) + client := newTestClient(discoveryAddress, true) defer client.Close() options := ConsumerOptions{ Group: "a", @@ -118,7 +118,7 @@ var _ = Describe("Client", func() { }) It("should query brokers until it times out", func() { - client := newTestClient(discoveryAddress) + client := newTestClient(discoveryAddress, true) defer client.Close() client.RegisterAsConsumer(consumerOptions) diff --git a/internal/client_test.go b/internal/client_test.go index 2fad5f1..5b360eb 100644 --- a/internal/client_test.go +++ b/internal/client_test.go @@ -1,11 +1,13 @@ package internal import ( - "context" + "bytes" + "encoding/binary" "encoding/json" "fmt" "io" "log" + "net" "net/http" "net/http/httptest" "strings" @@ -15,6 +17,7 @@ import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + "github.com/polarstreams/go-client/internal/serialization/producer" "github.com/polarstreams/go-client/types" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" @@ -25,6 +28,7 @@ const ( partitionKeyT1Range = "567" partitionKeyT2Range = "234" ) +const topicName = "abc" const reconnectionDelay = 20 * time.Millisecond const additionalTestDelay = 500 * time.Millisecond @@ -38,12 +42,6 @@ var _ = Describe("Client", func() { // Note that on macos you need to manually create the alias for the loopback addresses, for example // for i in {2..3}; do sudo ifconfig lo0 alias 127.0.0.$i up; done Describe("NewClient()", func() { - It("should parse the url and set the http client", func() { - client, err := NewClient("polar://my-host:1234/", nil) - Expect(err).NotTo(HaveOccurred()) - Expect(client.discoveryUrl).To(Equal("http://my-host:1234/v1/brokers")) - }) - It("should return an error when service url is invalid", func() { _, err := NewClient("zzz://my-host:1234/", nil) Expect(err).To(HaveOccurred()) @@ -63,10 +61,11 @@ var _ = Describe("Client", func() { server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") topology := Topology{ - BaseName: baseName, - Length: 10 + int(atomic.AddInt64(&counter, 1)), - ProducerPort: 8091, - ConsumerPort: 8092, + BaseName: baseName, + Length: 10 + int(atomic.AddInt64(&counter, 1)), + ProducerPort: 8091, + ConsumerPort: 8092, + ProducerBinaryPort: 8093, } Expect(json.NewEncoder(w).Encode(topology)).NotTo(HaveOccurred()) })) @@ -84,13 +83,13 @@ var _ = Describe("Client", func() { It("should retrieve and store the topology", func() { client, err := NewClient(fmt.Sprintf("polar://%s", discoveryAddress), nil) Expect(err).NotTo(HaveOccurred()) - Expect(client.Connect()).NotTo(HaveOccurred()) defer client.Close() Expect(client.Topology()).To(Equal(&Topology{ - BaseName: baseName, - Length: 11, - ProducerPort: 8091, - ConsumerPort: 8092, + BaseName: baseName, + Length: 11, + ProducerPort: 8091, + ConsumerPort: 8092, + ProducerBinaryPort: 8093, })) }) @@ -99,7 +98,6 @@ var _ = Describe("Client", func() { client, err := NewClient(fmt.Sprintf("polar://%s", discoveryAddress), nil) Expect(err).NotTo(HaveOccurred()) client.topologyPollInterval = pollInterval - Expect(client.Connect()).NotTo(HaveOccurred()) defer client.Close() time.Sleep(pollInterval * 4) Expect(client.Topology().BaseName).To(Equal(baseName)) @@ -110,41 +108,41 @@ var _ = Describe("Client", func() { Context("With a healthy cluster", func() { Describe("ProduceJson()", func() { var discoveryServer *httptest.Server - var s0, s1, s2 *http.Server - var c0, c1, c2 chan string + var shutdown0, shutdown1, shutdown2 func() + var c0, c1, c2 chan produceRequest topology := newTestTopology() discoveryAddress := "" BeforeEach(func() { discoveryServer = NewDiscoveryServer(topology) discoveryAddress = discoveryServer.URL[7:] // Remove http:// - s0, c0 = NewProducerServerWithChannel("127.0.0.1:8091") - s1, c1 = NewProducerServerWithChannel("127.0.0.2:8091") - s2, c2 = NewProducerServerWithChannel("127.0.0.3:8091") + shutdown0, c0 = NewProducerServerWithChannel("127.0.0.1:8093") + shutdown1, c1 = NewProducerServerWithChannel("127.0.0.2:8093") + shutdown2, c2 = NewProducerServerWithChannel("127.0.0.3:8093") }) AfterEach(func() { discoveryServer.Close() - s0.Shutdown(context.Background()) - s1.Shutdown(context.Background()) - s2.Shutdown(context.Background()) + shutdown0() + shutdown1() + shutdown2() }) It("should 
send a request to each host in round robin", func() { - client := newTestClient(discoveryAddress) + client := newTestClient(discoveryAddress, true) defer client.Close() produceJson(client, `{"key0": "value0"}`, "") produceJson(client, `{"key1": "value1"}`, "") produceJson(client, `{"key2": "value2"}`, "") - Expect(drainChan(c0)).To(Equal([]string{`{"key0": "value0"}`})) - Expect(drainChan(c1)).To(Equal([]string{`{"key1": "value1"}`})) - Expect(drainChan(c2)).To(Equal([]string{`{"key2": "value2"}`})) + Expect(drainChan(c0)).To(Equal([]produceRequest{{topic: topicName, message: `{"key0": "value0"}`}})) + Expect(drainChan(c1)).To(Equal([]produceRequest{{topic: topicName, message: `{"key1": "value1"}`}})) + Expect(drainChan(c2)).To(Equal([]produceRequest{{topic: topicName, message: `{"key2": "value2"}`}})) }) It("should send a request to each host according to the partition key", func() { - client := newTestClient(discoveryAddress) + client := newTestClient(discoveryAddress, true) defer client.Close() produceJson(client, `{"key0": "value0_0"}`, partitionKeyT0Range) @@ -153,9 +151,29 @@ var _ = Describe("Client", func() { produceJson(client, `{"key2": "value2"}`, partitionKeyT2Range) produceJson(client, `{"key1": "value1"}`, partitionKeyT1Range) - Expect(drainChan(c0)).To(Equal([]string{`{"key0": "value0_0"}`, `{"key0": "value0_1"}`, `{"key0": "value0_2"}`})) - Expect(drainChan(c1)).To(Equal([]string{`{"key1": "value1"}`})) - Expect(drainChan(c2)).To(Equal([]string{`{"key2": "value2"}`})) + Expect(drainChan(c0)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key0": "value0_0"}`, + partitionKey: partitionKeyT0Range, + }, { + topic: topicName, + message: `{"key0": "value0_1"}`, + partitionKey: partitionKeyT0Range, + }, { + topic: topicName, + message: `{"key0": "value0_2"}`, + partitionKey: partitionKeyT0Range, + }})) + Expect(drainChan(c1)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key1": "value1"}`, + partitionKey: partitionKeyT1Range, + }})) + Expect(drainChan(c2)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key2": "value2"}`, + partitionKey: partitionKeyT2Range, + }})) }) }) }) @@ -165,86 +183,120 @@ var _ = Describe("Client", func() { Context("With a partial online cluster", func() { var discoveryServer *httptest.Server - var s0, s1, s2 *http.Server - var c0, c1, c2 chan string + var shutdown0, shutdown1, shutdown2 func() + var c0, c1, c2 chan produceRequest topology := Topology{ - Length: 3, - BrokerNames: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, - ProducerPort: 8091, - ConsumerPort: 8092, + Length: 3, + BrokerNames: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, + ProducerPort: 8091, + ConsumerPort: 8092, + ProducerBinaryPort: 8093, } discoveryAddress := "" BeforeEach(func() { discoveryServer = NewDiscoveryServer(topology) discoveryAddress = discoveryServer.URL[7:] // Remove http:// - s1, c1 = NewProducerServerWithChannel("127.0.0.2:8091") - s2, c2 = NewProducerServerWithChannel("127.0.0.3:8091") + shutdown1, c1 = NewProducerServerWithChannel("127.0.0.2:8093") + shutdown2, c2 = NewProducerServerWithChannel("127.0.0.3:8093") }) AfterEach(func() { discoveryServer.Close() - s1.Shutdown(context.Background()) - s2.Shutdown(context.Background()) - if s0 != nil { - s0.Shutdown(context.Background()) + shutdown1() + shutdown2() + if shutdown0 != nil { + shutdown0() } }) It("should route request according to the partition key or use the next host", func() { - client := newTestClient(discoveryAddress) + client := 
newTestClient(discoveryAddress, true) defer client.Close() // Host 0 is offline produceJson(client, `{"key0": "value0_0"}`, partitionKeyT0Range) produceJson(client, `{"key1": "value1"}`, partitionKeyT1Range) - Expect(drainChan(c0)).To(Equal([]string{})) + Expect(drainChan(c0)).To(Equal([]produceRequest{})) // The first message was rerouted to B1 - Expect(drainChan(c1)).To(Equal([]string{`{"key0": "value0_0"}`, `{"key1": "value1"}`})) - Expect(drainChan(c2)).To(Equal([]string{})) - t := client.Topology() - Expect(client.isProducerUp(0, t)).To(BeFalse()) - Expect(client.isProducerUp(1, t)).To(BeTrue()) - Expect(client.isProducerUp(2, t)).To(BeTrue()) + Expect(drainChan(c1)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key0": "value0_0"}`, + partitionKey: partitionKeyT0Range, + }, { + topic: topicName, + message: `{"key1": "value1"}`, + partitionKey: partitionKeyT1Range, + }})) + Expect(drainChan(c2)).To(Equal([]produceRequest{})) + Expect(client.isProducerUp(0)).To(BeFalse()) + Expect(client.isProducerUp(1)).To(BeTrue()) + Expect(client.isProducerUp(2)).To(BeTrue()) time.Sleep(reconnectionDelay * 2) - s0, c0 = NewProducerServerWithChannel("127.0.0.1:8091") + shutdown0, c0 = NewProducerServerWithChannel("127.0.0.1:8093") time.Sleep(reconnectionDelay + additionalTestDelay) - Expect(client.isProducerUp(0, t)).To(BeTrue()) + Expect(client.isProducerUp(0)).To(BeTrue()) produceJson(client, `{"key0": "value0_1"}`, partitionKeyT0Range) - Expect(drainChan(c0)).To(Equal([]string{`{"key0": "value0_1"}`})) + Expect(drainChan(c0)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key0": "value0_1"}`, + partitionKey: partitionKeyT0Range, + }})) }) It("should reconnect after successful initial connection", func() { - s0, c0 = NewProducerServerWithChannel("127.0.0.1:8091") - client := newTestClient(discoveryAddress) + shutdown0, c0 = NewProducerServerWithChannel("127.0.0.1:8093") + client := newTestClient(discoveryAddress, true) defer client.Close() - // Host 0 is offline + // Host 0 is online produceJson(client, `{"key0": "value0_0"}`, partitionKeyT0Range) produceJson(client, `{"key1": "value1"}`, partitionKeyT1Range) - Expect(drainChan(c0)).To(Equal([]string{`{"key0": "value0_0"}`})) - Expect(drainChan(c1)).To(Equal([]string{`{"key1": "value1"}`})) - Expect(drainChan(c2)).To(Equal([]string{})) + Expect(drainChan(c0)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key0": "value0_0"}`, + partitionKey: partitionKeyT0Range, + }})) + Expect(drainChan(c1)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key1": "value1"}`, + partitionKey: partitionKeyT1Range, + }})) + Expect(drainChan(c2)).To(Equal([]produceRequest{})) - s0.Shutdown(context.Background()) - produceJson(client, `{"key0": "value0_1"}`, partitionKeyT0Range) - Expect(drainChan(c1)).To(Equal([]string{`{"key0": "value0_1"}`})) - - t := client.Topology() - Expect(client.isProducerUp(0, t)).To(BeFalse()) - Expect(client.isProducerUp(1, t)).To(BeTrue()) - Expect(client.isProducerUp(2, t)).To(BeTrue()) + // Shutdown B0 + shutdown0() + time.Sleep(additionalTestDelay) - s0, c0 = NewProducerServerWithChannel("127.0.0.1:8091") - time.Sleep(reconnectionDelay + additionalTestDelay) - Expect(client.isProducerUp(0, t)).To(BeTrue()) + fmt.Println("--Shutdown B0") produceJson(client, `{"key0": "value0_1"}`, partitionKeyT0Range) - Expect(drainChan(c0)).To(Equal([]string{`{"key0": "value0_1"}`})) + Expect(drainChan(c1)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key0": "value0_1"}`, + 
partitionKey: partitionKeyT0Range, + }})) + + Expect(client.isProducerUp(0)).To(BeFalse()) + Expect(client.isProducerUp(1)).To(BeTrue()) + Expect(client.isProducerUp(2)).To(BeTrue()) + + fmt.Println("--Creating server") + shutdown0, c0 = NewProducerServerWithChannel("127.0.0.1:8093") + time.Sleep(reconnectionDelay + additionalTestDelay) + fmt.Println("--Checking back online") + Expect(client.isProducerUp(0)).To(BeTrue()) + + produceJson(client, `{"key0": "value0_2"}`, partitionKeyT0Range) + Expect(drainChan(c0)).To(Equal([]produceRequest{{ + topic: topicName, + message: `{"key0": "value0_2"}`, + partitionKey: partitionKeyT0Range, + }})) }) }) }) @@ -267,20 +319,136 @@ func NewTestServer(address string, handler http.Handler) *http.Server { return server } -func NewProducerServerWithChannel(address string) (*http.Server, chan string) { - c := make(chan string, 100) - server := NewTestServer(address, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.Write([]byte("OK")) - if r.URL.Path == "/status" { - log.Printf("Broker with address %s received status request", address) - return +func NewProducerServerWithChannel(address string) (func(), chan produceRequest) { + requests := make(chan produceRequest, 100) + l, err := net.Listen("tcp", address) + Expect(err).NotTo(HaveOccurred()) + go func() { + connections := make([]net.Conn, 0) + for { + conn, err := l.Accept() + if err != nil { + log.Printf("Server at %s stopped accepting connections", address) + break + } + + connections = append(connections, conn) + handleProducerConnection(conn, requests) } - body := reqBody(r) - c <- body - })) - return server, c + for _, c := range connections { + _ = c.Close() + } + }() + + closeHandler := func() { + _ = l.Close() + } + + return closeHandler, requests +} + +func handleProducerConnection(conn net.Conn, requests chan produceRequest) { + responses := make(chan []byte, 100) + go receiveRequests(conn, requests, responses) + go sendResponses(conn, responses) +} + +func receiveRequests(conn net.Conn, requests chan produceRequest, responses chan []byte) { + // First request must be a Startup message + initialized := false + header := &producer.BinaryHeader{} + + for { + err := binary.Read(conn, producer.Endianness, header) + if err != nil { + if err != io.EOF { + log.Printf("Reading header from client failed: %s", err) + } + break + } + + if !initialized { + if header.Op == producer.StartupOp && header.StreamId == 0 && header.BodyLength == 0 { + initialized = true + responses <- createResponse(header, producer.ReadyOp) + continue + } + + log.Printf("Invalid first header: %#v", header) + break + } + + if header.Op == producer.ProduceOp { + r, err := unmarshalRequest(header, conn) + if err != nil { + log.Printf("Reading body from client failed: %s", err) + break + } + requests <- *r + responses <- createResponse(header, producer.ProduceResponseOp) + continue + } + + panic("Received invalid operation in test server") + } + + _ = conn.Close() +} + +func sendResponses(conn net.Conn, responses chan []byte) { + for res := range responses { + _, err := conn.Write(res) + if err != nil { + log.Printf("Write to client failed: %s", err) + break + } + } + + _ = conn.Close() +} + +func createResponse(req *producer.BinaryHeader, op producer.OpCode) []byte { + buf := new(bytes.Buffer) + producer.WriteHeader(buf, &producer.BinaryHeader{ + Version: 1, + Flags: 0, + StreamId: req.StreamId, + Op: op, + BodyLength: 0, + Crc: 0, + }) + return buf.Bytes() 
+} + +func unmarshalRequest(header *producer.BinaryHeader, conn net.Conn) (*produceRequest, error) { + bodyBuf := make([]byte, header.BodyLength) + _, err := io.ReadFull(conn, bodyBuf) + if err != nil { + return nil, err + } + partitionKey, index := readString(0, bodyBuf) + topic, index := readString(index, bodyBuf) + messageLength := int(producer.Endianness.Uint32(bodyBuf[index:])) + index += 4 + + return &produceRequest{ + topic: topic, + message: string(bodyBuf[index : index+messageLength]), + partitionKey: partitionKey, + }, nil +} + +func readString(index int, buf []byte) (string, int) { + length := int(buf[index]) + end := index + 1 + length + return string(buf[index+1 : end]), end +} + +type produceRequest struct { + topic string + message string + partitionKey string } func NewDiscoveryServer(topology Topology) *httptest.Server { @@ -291,10 +459,8 @@ func NewDiscoveryServer(topology Topology) *httptest.Server { } func produceJson(client *Client, message string, partitionKey string) { - resp, err := client.ProduceJson("abc", strings.NewReader(message), partitionKey) + err := client.ProduceJson(topicName, strings.NewReader(message), partitionKey) Expect(err).NotTo(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - Expect(respBody(resp)).To(Equal("OK")) } func respBody(resp *http.Response) string { @@ -310,8 +476,8 @@ func reqBody(req *http.Request) string { return string(body) } -func drainChan(c chan string) []string { - result := make([]string, 0) +func drainChan(c chan produceRequest) []produceRequest { + result := make([]produceRequest, 0) hasData := true for hasData { select { @@ -325,22 +491,23 @@ func drainChan(c chan string) []string { } // Returns a connected client -func newTestClient(discoveryAddress string) *Client { +func newTestClient(discoveryAddress string, withProducer bool) *Client { options := ClientOptions{ Logger: types.StdLogger, FixedReconnectionDelay: reconnectionDelay, + ProducerInitialize: withProducer, } client, err := NewClient(fmt.Sprintf("polar://%s", discoveryAddress), &options) Expect(err).NotTo(HaveOccurred()) - Expect(client.Connect()).NotTo(HaveOccurred()) return client } func newTestTopology() Topology { return Topology{ - Length: 3, - BrokerNames: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, - ProducerPort: 8091, - ConsumerPort: 8092, + Length: 3, + BrokerNames: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, + ProducerPort: 8091, + ConsumerPort: 8092, + ProducerBinaryPort: 8093, } } diff --git a/internal/connection.go b/internal/connection.go index 25d3975..d391cdd 100644 --- a/internal/connection.go +++ b/internal/connection.go @@ -10,7 +10,9 @@ import ( "sync/atomic" "time" + "github.com/polarstreams/go-client/internal/serialization/producer" . "github.com/polarstreams/go-client/internal/serialization/producer" + . "github.com/polarstreams/go-client/internal/types" . 
"github.com/polarstreams/go-client/types" ) @@ -34,7 +36,7 @@ func newConnection(address string, h disconnectHandler, flushThreshold int, logg return nil, err } - w := bytes.NewBuffer(make([]byte, HeaderSize)) + w := bytes.NewBuffer(make([]byte, 0, HeaderSize)) if err := WriteHeader(w, &BinaryHeader{ Version: 1, StreamId: 0, @@ -88,18 +90,19 @@ func newConnection(address string, h disconnectHandler, flushThreshold int, logg } // Creates the response handler, appends the request and waits for the response -func (c *connection) Send(req BinaryRequest) (resp BinaryResponse) { +func (c *connection) Send(topic string, message FixedLengthReader, partitionKey string) (resp BinaryResponse) { defer func() { - if r := recover(); r != nil { + if r := recover(); r != nil { resp = NewClientErrorResponse("Request could not be sent: connection closed") - } - }() + } + }() + + streamId := <-c.streamIds + req := producer.NewProduceRequest(streamId, topic, message, partitionKey) response := make(chan BinaryResponse, 1) - streamId := <- c.streamIds c.handlers.Store(streamId, func(r BinaryResponse) { response <- r }) - req.SetStreamId(streamId) // Append the request, it might panic when requests channel is closed c.requests <- req @@ -110,13 +113,14 @@ func (c *connection) Send(req BinaryRequest) (resp BinaryResponse) { func (c *connection) Close() { c.closeOnce.Do(func() { + c.logger.Debug("Connection to %s closed", c.conn.RemoteAddr().String()) close(c.requests) _ = c.conn.Close() c.disconnectHandler.OnConnectionClose(c) }) toDelete := make([]StreamId, 0) - c.handlers.Range(func (key, value interface{}) bool { + c.handlers.Range(func(key, value interface{}) bool { toDelete = append(toDelete, key.(StreamId)) return true }) @@ -133,20 +137,13 @@ func (c *connection) Close() { func (c *connection) receiveResponses() { header := &BinaryHeader{} - headerBuffer := make([]byte, HeaderSize) - headerReader := bytes.NewReader(headerBuffer) bodyBuffer := make([]byte, ResponseBodyMaxLength) for { - _, err := io.ReadFull(c.conn, headerBuffer) - if err != nil { - c.logger.Debug("Error reading from binary server: %s", err) - break - } - headerReader.Reset(headerBuffer) - err = binary.Read(headerReader, Endianness, &header) - if err != nil { - c.logger.Debug("Error reading header from binary server: %s", err) + if err := binary.Read(c.conn, Endianness, header); err != nil { + if err != io.EOF { + c.logger.Debug("Error reading header from binary server: %s", err) + } break } @@ -190,6 +187,7 @@ func totalRequestSize(r BinaryRequest) int { func (c *connection) sendRequests() { w := bytes.NewBuffer(make([]byte, c.flushThreshold)) + header := &BinaryHeader{Version: 1, Flags: 0} // Reuse allocation shouldExit := false var item BinaryRequest @@ -221,14 +219,14 @@ func (c *connection) sendRequests() { shouldExit = true break } - responseSize := totalRequestSize(request) - if responseSize+w.Len() > c.flushThreshold { + requestSize := totalRequestSize(request) + if groupSize+requestSize > c.flushThreshold { canAddNext = false item = request break } group = append(group, request) - groupSize += responseSize + groupSize += requestSize default: canAddNext = false @@ -236,8 +234,8 @@ func (c *connection) sendRequests() { } for _, request := range group { - if err := request.Marshal(w); err != nil { - c.logger.Warn("There was an error while marshaling a request, closing connection: %s", err) + if err := request.Marshal(w, header); err != nil { + c.logger.Error("Error marshaling a request, closing connection: %s", err) shouldExit = true 
break } @@ -254,10 +252,7 @@ func (c *connection) sendRequests() { // Close in-flight group for _, request := range group { streamId := request.StreamId() - if streamId == nil { - panic("Invalid nil stream id") - } - handler := c.getHandler(*streamId) + handler := c.getHandler(streamId) handler(NewClientErrorResponse("Error while sending request")) } @@ -275,24 +270,47 @@ type streamHandler func(r BinaryResponse) // Represents a group of connections to a single producer type producerConnectionPool struct { address string - length int // The amount of expected connections - connections atomic.Value // Copy-on-write connections + length int // The amount of expected connections + connectionsSnapshot atomic.Value // Connections collection with snapshot semantics ([]*connection) + connections connectionSet // Copy-on-write connections, must be accessed taking the lock mu sync.Mutex isConnecting int32 isClosed int32 logger Logger connectionFlushThreshold int + index uint32 // Used for round robin through connections + initHandler func() // Invoked once after the first connection attempt is made (success or error) + initOnce sync.Once // Use to control the init handler call + reconnectionBackoff BackoffPolicy // Reconnection backoff } -func newProducerConnectionPool(length int, connectionFlushThreshold int, logger Logger) *producerConnectionPool { +func newProducerConnectionPool( + address string, + length int, + connectionFlushThreshold int, + logger Logger, + fixedReconnectionDelay time.Duration, // To simplify test + initHandler func(), +) *producerConnectionPool { + var reconnectionBackoff BackoffPolicy = newExponentialBackoff() + if fixedReconnectionDelay > 0 { + reconnectionBackoff = &fixedBackoff{delay: fixedReconnectionDelay} + } + p := &producerConnectionPool{ + address: address, length: length, - connections: atomic.Value{}, + connectionsSnapshot: atomic.Value{}, + connections: map[*connection]bool{}, mu: sync.Mutex{}, isConnecting: 0, isClosed: 0, logger: logger, connectionFlushThreshold: connectionFlushThreshold, + index: 0, + initHandler: initHandler, + initOnce: sync.Once{}, + reconnectionBackoff: reconnectionBackoff, } go p.startConnecting() @@ -304,7 +322,7 @@ func (p *producerConnectionPool) Close() { p.mu.Lock() defer p.mu.Unlock() - pool := p.getConnections() + pool := p.connections for c := range pool { c.Close() } @@ -313,14 +331,40 @@ func (p *producerConnectionPool) Close() { func (p *producerConnectionPool) OnConnectionClose(c *connection) { // Use a different go routine as we might be holding the lock go func() { - p.mu.Lock() - defer p.mu.Unlock() if p.isPoolClosed() { - p.addOrDelete(c, true) + return } + + // Remove the existing connection from the pool + p.mu.Lock() + p.addOrDelete(c, true) + p.mu.Unlock() + + // Start reconnecting + p.startConnecting() }() } +// Gets the next available connection or nil +func (p *producerConnectionPool) NextConnection() *connection { + connections := p.getConnections() + length := len(connections) + if length == 0 { + return nil + } + if length == 1 { + return connections[0] + } + + index := atomic.AddUint32(&p.index, 1) + if index >= maxAtomicIncrement { + // Atomic inc operations don't wrap around, reset it (good-enough fairness) + atomic.StoreUint32(&p.index, 0) + } + return connections[(int(index)-1)%length] + +} + // Determines whether the pool is closed, we should hold the lock in case the check is important func (p *producerConnectionPool) isPoolClosed() bool { return atomic.LoadInt32(&p.isClosed) == 1 @@ -334,20 +378,27 @@ 
func (p *producerConnectionPool) startConnecting() { defer atomic.StoreInt32(&p.isConnecting, 0) - backoff := newExponentialBackoff() + p.reconnectionBackoff.Reset() existing := p.getConnections() - for len(existing) < p.length && p.isPoolClosed() { + + for len(existing) < p.length && !p.isPoolClosed() { + p.logger.Info("Creating new producer connection to %s", p.address) c, err := newConnection(p.address, p, p.connectionFlushThreshold, p.logger) + + // Mark as initialized, even on error + p.initOnce.Do(p.initHandler) + if err != nil { if p.isPoolClosed() { return } - time.Sleep(backoff.next()) + p.logger.Info("Error while opening a new connection to %s: %s", p.address, err.Error()) + time.Sleep(p.reconnectionBackoff.Next()) continue } - backoff.reset() + p.reconnectionBackoff.Reset() p.mu.Lock() if p.isPoolClosed() { @@ -358,20 +409,25 @@ func (p *producerConnectionPool) startConnecting() { existing = p.addOrDelete(c, false) p.mu.Unlock() + p.logger.Info("Created connection to %s (%d total)", p.address, len(existing)) } } -func (p *producerConnectionPool) getConnections() connectionSet { - value := p.connections.Load() +func (p *producerConnectionPool) getConnections() []*connection { + value := p.connectionsSnapshot.Load() if value == nil { return nil } - return value.(connectionSet) + return value.([]*connection) +} + +func (p *producerConnectionPool) IsConnected() bool { + return len(p.getConnections()) > 0 } // Adds or deletes a connection, callers MUST hold the lock -func (p *producerConnectionPool) addOrDelete(c *connection, deleteConnection bool) connectionSet { - existingMap := p.getConnections() +func (p *producerConnectionPool) addOrDelete(c *connection, deleteConnection bool) []*connection { + existingMap := p.connections // Shallow copy existing newMap := make(connectionSet, len(existingMap)+1) @@ -385,6 +441,12 @@ func (p *producerConnectionPool) addOrDelete(c *connection, deleteConnection boo newMap[c] = true } - p.connections.Store(newMap) - return newMap + snapshot := make([]*connection, 0, len(newMap)) + for c := range newMap { + snapshot = append(snapshot, c) + } + + p.connections = newMap + p.connectionsSnapshot.Store(snapshot) + return snapshot } diff --git a/internal/connection_test.go b/internal/connection_test.go index 23c6e26..aece56a 100644 --- a/internal/connection_test.go +++ b/internal/connection_test.go @@ -1,6 +1,7 @@ package internal import ( + "bytes" "sync" "time" @@ -13,26 +14,26 @@ import ( var _ = Describe("connection", func() { Describe("Send()", func() { - It("should recover from closed request channel", func () { + It("should recover from closed request channel", func() { c := &connection{ - streamIds: make(chan StreamId, 10), - requests: make(chan BinaryRequest, 10), - logger: StdLogger, + streamIds: make(chan StreamId, 10), + requests: make(chan BinaryRequest, 10), + logger: StdLogger, } c.streamIds <- 0 close(c.requests) - resp := c.Send(&ProduceRequest{}) + resp := c.Send("topic1", bytes.NewReader([]byte("abc")), "key1") Expect(resp).To(Equal(NewClientErrorResponse("Request could not be sent: connection closed"))) }) - It("should return when handler is invoked", func () { + It("should return when handler is invoked", func() { c := &connection{ - streamIds: make(chan StreamId, 10), - requests: make(chan BinaryRequest, 10), - logger: StdLogger, + streamIds: make(chan StreamId, 10), + requests: make(chan BinaryRequest, 10), + logger: StdLogger, } c.streamIds <- 0 @@ -42,7 +43,7 @@ var _ = Describe("connection", func() { wg.Add(1) go func() { 
wg.Done() - r <- c.Send(&ProduceRequest{}) + r <- c.Send("topic1", bytes.NewReader([]byte("abc")), "key1") }() wg.Wait() handler := c.getHandler(0) @@ -57,14 +58,14 @@ var _ = Describe("connection", func() { }) }) - Describe("Close()", func () { - It("should invoke pending handlers", func () { + Describe("Close()", func() { + It("should invoke pending handlers", func() { c := &connection{ streamIds: make(chan StreamId, 10), requests: make(chan BinaryRequest, 10), logger: StdLogger, conn: &fakes.Connection{}, - disconnectHandler: &producerConnectionPool{}, + disconnectHandler: &fakeDisconnectHandler{}, } const pending = 3 @@ -75,7 +76,7 @@ var _ = Describe("connection", func() { wg.Add(1) go func() { wg.Done() - r <- c.Send(&ProduceRequest{}) + r <- c.Send("topic1", bytes.NewReader([]byte("abc")), "key1") }() } @@ -93,4 +94,8 @@ var _ = Describe("connection", func() { } }) }) -}) \ No newline at end of file +}) + +type fakeDisconnectHandler struct{} + +func (f *fakeDisconnectHandler) OnConnectionClose(c *connection) {} diff --git a/internal/models.go b/internal/models.go index 2ba91e2..04b9425 100644 --- a/internal/models.go +++ b/internal/models.go @@ -13,14 +13,6 @@ type Topology struct { ConsumerPort int `json:"consumerPort"` } -func (t *Topology) ProducerUrl(topic string, ordinal int, partitionKey string) string { - querystring := "" - if partitionKey != "" { - querystring = fmt.Sprintf("?partitionKey=%s", partitionKey) - } - return fmt.Sprintf("http://%s:%d/v1/topic/%s/messages%s", t.hostName(ordinal), t.ProducerPort, topic, querystring) -} - func (t *Topology) hostName(ordinal int) string { if len(t.BrokerNames) > 0 { return t.BrokerNames[ordinal] diff --git a/internal/producer_client.go b/internal/producer_client.go new file mode 100644 index 0000000..31d83be --- /dev/null +++ b/internal/producer_client.go @@ -0,0 +1,118 @@ +package internal + +import ( + "fmt" + "sync" + "sync/atomic" + "time" + + . "github.com/polarstreams/go-client/internal/serialization/producer" + . "github.com/polarstreams/go-client/internal/types" + . 
"github.com/polarstreams/go-client/types" +) + +// Represents a set of connections per host that can send requests to a producer server +type producerClient struct { + topology atomic.Value + connectionsPerHost int + + // Copy on write semantics for connection pools + pools atomic.Value + mu sync.Mutex +} + +type poolMap map[int]*producerConnectionPool + +func newProducerClient( + t *Topology, + connectionsPerHost int, + flushThresholdBytes int, + fixedReconnectionDelay time.Duration, + logger Logger, +) *producerClient { + topology := atomic.Value{} + topology.Store(t) + + pools := atomic.Value{} + m := make(poolMap, t.Length) + wgInit := sync.WaitGroup{} + for i := 0; i < t.Length; i++ { + address := fmt.Sprintf("%s:%d", t.hostName(i), t.ProducerBinaryPort) + wgInit.Add(1) + m[i] = newProducerConnectionPool( + address, + connectionsPerHost, + flushThresholdBytes, + logger, + fixedReconnectionDelay, + func() { + wgInit.Done() + }, + ) + } + wgInit.Wait() + pools.Store(m) + + logger.Debug("Created producer client to target %d brokers and %d connections each", t.Length, connectionsPerHost) + + return &producerClient{ + topology: topology, + connectionsPerHost: connectionsPerHost, + pools: pools, + mu: sync.Mutex{}, + } +} + +// Return the pool or nil +func (c *producerClient) getPool(ordinal int) *producerConnectionPool { + value := c.pools.Load() + if value == nil { + return nil + } + m := value.(poolMap) + return m[ordinal] +} + +// Gets the current snapshot of the topology +func (c *producerClient) Topology() *Topology { + return c.topology.Load().(*Topology) +} + +func (c *producerClient) IsProducerUp(ordinal int) bool { + pool := c.getPool(ordinal) + fmt.Println("--Producer client pool for B", ordinal, len(pool.connections)) + return pool != nil && pool.IsConnected() +} + +func (c *producerClient) Close() { + defer c.mu.Unlock() + c.mu.Lock() + + pools := c.pools.Swap(make(poolMap)) + for _, pool := range pools.(poolMap) { + pool.Close() + } +} + +func (c *producerClient) OnNewTopology(newTopology *Topology) { + c.topology.Store(newTopology) +} + +// Gets an available connection and sends the request +func (c *producerClient) Send( + ordinal int, + topic string, + message FixedLengthReader, + partitionKey string, +) BinaryResponse { + pool := c.getPool(ordinal) + if pool == nil { + return NewClientErrorResponse("No connection available") + } + conn := pool.NextConnection() + if conn == nil { + return NewClientErrorResponse("No connection available") + } + + return conn.Send(topic, message, partitionKey) +} diff --git a/internal/serialization/producer/models.go b/internal/serialization/producer/models.go index 2fb2924..e3d938a 100644 --- a/internal/serialization/producer/models.go +++ b/internal/serialization/producer/models.go @@ -5,6 +5,9 @@ import ( "encoding/binary" "fmt" "hash/crc32" + "io" + + . 
"github.com/polarstreams/go-client/internal/types" ) type OpCode uint8 @@ -14,6 +17,7 @@ type ErrorCode uint8 // The only responses with body are errors, leave 511 for the error message const ResponseBodyMaxLength = 512 +const noStreamId = 0xFFFFFFFF var Endianness = binary.BigEndian @@ -54,14 +58,11 @@ type BinaryHeader struct { var HeaderSize = binarySize(BinaryHeader{}) type BinaryRequest interface { - Marshal(w *bytes.Buffer) error - BodyLength() int + Marshal(w *bytes.Buffer, header *BinaryHeader) error - // Sets the stream id in a thread safe manner - SetStreamId(id StreamId) + BodyLength() int - // Gets the stream id in a thread safe manner - StreamId() *StreamId + StreamId() StreamId } type BinaryResponse interface { @@ -122,24 +123,59 @@ func WriteHeader(w *bytes.Buffer, header *BinaryHeader) error { return nil } +func WriteString(w *bytes.Buffer, value string) error { + if err := w.WriteByte(byte(len(value))); err != nil { + return err + } + _, err := w.Write([]byte(value)) + return err +} + type ProduceRequest struct { + topic string + message FixedLengthReader + partitionKey string + streamId StreamId +} + +func NewProduceRequest(streamId StreamId, topic string, message FixedLengthReader, partitionKey string) BinaryRequest { + return &ProduceRequest{ + topic: topic, + message: message, + partitionKey: partitionKey, + streamId: streamId, + } } -func (r *ProduceRequest) Marshal(w *bytes.Buffer) error { - // TODO: IMPLEMENT - return nil +func (r *ProduceRequest) Marshal(w *bytes.Buffer, header *BinaryHeader) error { + header.StreamId = r.streamId + header.Op = ProduceOp + header.BodyLength = uint32(r.BodyLength()) + if err := WriteHeader(w, header); err != nil { + return err + } + if err := WriteString(w, r.partitionKey); err != nil { + return err + } + if err := WriteString(w, r.topic); err != nil { + return err + } + if err := binary.Write(w, Endianness, uint32(r.message.Len())); err != nil { + return err + } + + // Reader.WriteTo() should kick in + _, err := io.Copy(w, r.message) + return err } func (r *ProduceRequest) BodyLength() int { - // TODO: IMPLEMENT - return 0 + // optional timestamp μs (int64) | partition key length (uint8) | partition key (bytes) + // topic length (uint8) | topic name (bytes) + // message 0 length (uint32) | message 0 (bytes) + return 1 + len(r.partitionKey) + 1 + len(r.topic) + 4 + r.message.Len() } -func (r *ProduceRequest) SetStreamId(id StreamId) { - // TODO: IMPLEMENT +func (r *ProduceRequest) StreamId() StreamId { + return r.streamId } - -func (r *ProduceRequest) StreamId() *StreamId { - // TODO: IMPLEMENT - return nil -} \ No newline at end of file diff --git a/internal/test/fakes/connection.go b/internal/test/fakes/connection.go index f325b41..1b262b9 100644 --- a/internal/test/fakes/connection.go +++ b/internal/test/fakes/connection.go @@ -23,11 +23,11 @@ func (c *Connection) Close() error { } func (c *Connection) LocalAddr() net.Addr { - return nil + return &net.TCPAddr{} } func (c *Connection) RemoteAddr() net.Addr { - return nil + return &net.TCPAddr{} } func (c *Connection) SetDeadline(t time.Time) error { diff --git a/internal/types/types.go b/internal/types/types.go new file mode 100644 index 0000000..090043b --- /dev/null +++ b/internal/types/types.go @@ -0,0 +1,10 @@ +package types + +import "io" + +type FixedLengthReader interface { + io.ReadSeeker + + // Len returns the number of bytes of the unread portion of the reader. 
+ Len() int +} diff --git a/internal/utils/utils.go b/internal/utils/utils.go index f719da6..74917f7 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -4,6 +4,8 @@ import ( "bytes" "io" "net/http" + + "github.com/polarstreams/go-client/internal/types" ) func ReadBody(resp *http.Response) (string, error) { @@ -20,8 +22,8 @@ func PanicIfErr(err error) { // Tries to reuse a buffer-backed reader. // Otherwise, it creates a new Reader from the buffered data. -func ToReadSeeker(r io.Reader) io.ReadSeeker { - readSeeker, ok := r.(io.ReadSeeker) +func ToReadSeeker(r io.Reader) types.FixedLengthReader { + readSeeker, ok := r.(types.FixedLengthReader) if ok { return readSeeker } diff --git a/producer.go b/producer.go index 80ed5bd..b3746fc 100644 --- a/producer.go +++ b/producer.go @@ -1,12 +1,9 @@ package polar import ( - "fmt" "io" - "net/http" . "github.com/polarstreams/go-client/internal" - "github.com/polarstreams/go-client/internal/utils" "github.com/polarstreams/go-client/types" ) @@ -36,7 +33,10 @@ func NewProducer(serviceUrl string) (Producer, error) { func fromProducerOptions(o *types.ProducerOptions) *ClientOptions { return &ClientOptions{ - Logger: o.Logger, + Logger: o.Logger, + ProducerInitialize: true, + ProducerFlushThresholdBytes: o.FlushThresholdBytes, + ProducerConnectionsPerHost: o.ConnectionsPerBroker, } } @@ -49,10 +49,6 @@ func NewProducerWithOpts(serviceUrl string, options types.ProducerOptions) (Prod return nil, err } - if err := client.Connect(); err != nil { - return nil, err - } - return &producer{ client: client, }, nil @@ -63,18 +59,7 @@ type producer struct { } func (p *producer) Send(topic string, message io.Reader, partitionKey string) error { - resp, err := p.client.ProduceJson(topic, message, partitionKey) - if err != nil { - return err - } - body, err := utils.ReadBody(resp) - if err != nil { - return err - } - if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices { - return nil - } - return fmt.Errorf(body) + return p.client.ProduceJson(topic, message, partitionKey) } func (p *producer) BrokersLength() int { diff --git a/types/models.go b/types/models.go index 21957e0..8955202 100644 --- a/types/models.go +++ b/types/models.go @@ -79,7 +79,14 @@ type ConsumerOptions struct { // Represents the additional options to set when creating a Producer. 
type ProducerOptions struct { + // The logger for the producer to output internal log messages Logger Logger + + // The amount of bytes to coalesce internally before flushing the frames to the TCP connection (defaults to 16 KiB) + FlushThresholdBytes int + + // The amount of persistent connections to maintain per broker (defaults to 1) + ConnectionsPerBroker int } // An error that includes the origin of the error From da000b67fff7f2f7f5d3571c566753b9ed830a71 Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Fri, 17 Feb 2023 11:45:24 +0100 Subject: [PATCH 4/9] fix: Return the stream id --- internal/connection.go | 3 +++ internal/producer_client.go | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/connection.go b/internal/connection.go index d391cdd..545ac2f 100644 --- a/internal/connection.go +++ b/internal/connection.go @@ -167,6 +167,9 @@ func (c *connection) receiveResponses() { } handler(response) + + // StreamId can be reused + c.streamIds <- header.StreamId } c.Close() diff --git a/internal/producer_client.go b/internal/producer_client.go index 31d83be..183b79f 100644 --- a/internal/producer_client.go +++ b/internal/producer_client.go @@ -80,7 +80,6 @@ func (c *producerClient) Topology() *Topology { func (c *producerClient) IsProducerUp(ordinal int) bool { pool := c.getPool(ordinal) - fmt.Println("--Producer client pool for B", ordinal, len(pool.connections)) return pool != nil && pool.IsConnected() } From b073c1ec312987049a7855d48b1b7079481d1b2d Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Mon, 20 Feb 2023 11:14:58 +0100 Subject: [PATCH 5/9] Coalesce multiple parts into a single request --- internal/client_test.go | 4 - internal/connection.go | 92 +++++++++++++++-------- internal/connection_test.go | 13 ++-- internal/serialization/producer/models.go | 80 ++++++++++++++------ 4 files changed, 127 insertions(+), 62 deletions(-) diff --git a/internal/client_test.go b/internal/client_test.go index 5b360eb..f67016f 100644 --- a/internal/client_test.go +++ b/internal/client_test.go @@ -272,8 +272,6 @@ var _ = Describe("Client", func() { shutdown0() time.Sleep(additionalTestDelay) - fmt.Println("--Shutdown B0") - produceJson(client, `{"key0": "value0_1"}`, partitionKeyT0Range) Expect(drainChan(c1)).To(Equal([]produceRequest{{ topic: topicName, @@ -285,10 +283,8 @@ var _ = Describe("Client", func() { Expect(client.isProducerUp(1)).To(BeTrue()) Expect(client.isProducerUp(2)).To(BeTrue()) - fmt.Println("--Creating server") shutdown0, c0 = NewProducerServerWithChannel("127.0.0.1:8093") time.Sleep(reconnectionDelay + additionalTestDelay) - fmt.Println("--Checking back online") Expect(client.isProducerUp(0)).To(BeTrue()) produceJson(client, `{"key0": "value0_2"}`, partitionKeyT0Range) diff --git a/internal/connection.go b/internal/connection.go index 545ac2f..b932e70 100644 --- a/internal/connection.go +++ b/internal/connection.go @@ -25,7 +25,7 @@ type connection struct { closeOnce sync.Once handlers sync.Map streamIds chan StreamId - requests chan BinaryRequest + requests chan *ProduceRequestPart logger Logger flushThreshold int } @@ -60,7 +60,7 @@ func newConnection(address string, h disconnectHandler, flushThreshold int, logg logger: logger, streamIds: make(chan StreamId, maxStreamIds), flushThreshold: flushThreshold, - requests: make(chan BinaryRequest, 128), + requests: make(chan *ProduceRequestPart, 512), } // Reserve StreamId(0) for the Startup message @@ -97,17 +97,12 @@ func (c *connection) Send(topic string, message FixedLengthReader, partitionKey } }() - 
streamId := <-c.streamIds - req := producer.NewProduceRequest(streamId, topic, message, partitionKey) - response := make(chan BinaryResponse, 1) - c.handlers.Store(streamId, func(r BinaryResponse) { - response <- r - }) + reqPart := producer.NewProduceRequestPart(topic, message, partitionKey) // Append the request, it might panic when requests channel is closed - c.requests <- req + c.requests <- reqPart // Wait for the response - resp = <-response + resp = <-reqPart.Response return resp } @@ -119,6 +114,13 @@ func (c *connection) Close() { c.disconnectHandler.OnConnectionClose(c) }) + notSentErr := NewClientErrorResponse("Request could not be sent: connection closed") + + // Dequeue remaining request parts + for p := range c.requests { + p.Response <- notSentErr + } + toDelete := make([]StreamId, 0) c.handlers.Range(func(key, value interface{}) bool { toDelete = append(toDelete, key.(StreamId)) @@ -131,7 +133,7 @@ func (c *connection) Close() { continue } handler := h.(func(BinaryResponse)) - handler(NewClientErrorResponse("Request could not be sent: connection closed")) + handler(notSentErr) } } @@ -184,21 +186,19 @@ func (c *connection) getHandler(id StreamId) streamHandler { return h.(func(BinaryResponse)) } -func totalRequestSize(r BinaryRequest) int { - return r.BodyLength() + HeaderSize -} - func (c *connection) sendRequests() { w := bytes.NewBuffer(make([]byte, c.flushThreshold)) header := &BinaryHeader{Version: 1, Flags: 0} // Reuse allocation shouldExit := false - var item BinaryRequest - var group []BinaryRequest + var item *ProduceRequestPart + var group [][]*ProduceRequestPart + handledGroupIndex := -1 for !shouldExit { w.Reset() + handledGroupIndex = 0 groupSize := 0 - group = make([]BinaryRequest, 0) + group = make([][]*ProduceRequestPart, 0) canAddNext := true if item == nil { @@ -210,33 +210,44 @@ func (c *connection) sendRequests() { } } - group = append(group, item) - groupSize += totalRequestSize(item) + group = appendToGroup(group, item) + groupSize += item.Message.Len() item = nil // Coalesce requests w/ Nagle disabled for canAddNext && !shouldExit { select { - case request, ok := <-c.requests: + case reqPart, ok := <-c.requests: if !ok { shouldExit = true break } - requestSize := totalRequestSize(request) - if groupSize+requestSize > c.flushThreshold { + partSize := reqPart.Message.Len() + if groupSize+partSize >= c.flushThreshold { canAddNext = false - item = request + item = reqPart break } - group = append(group, request) - groupSize += requestSize + group = appendToGroup(group, reqPart) + groupSize += partSize default: canAddNext = false } } - for _, request := range group { + for i, parts := range group { + handledGroupIndex = i + streamId := <-c.streamIds + request := NewProduceRequest(streamId, parts) + // Capture only the channels + channels := request.ResponseChannels() + c.handlers.Store(streamId, func(r BinaryResponse) { + for _, responseChan := range channels { + responseChan <- r + } + }) + if err := request.Marshal(w, header); err != nil { c.logger.Error("Error marshaling a request, closing connection: %s", err) shouldExit = true @@ -247,21 +258,40 @@ func (c *connection) sendRequests() { if w.Len() > 0 { if _, err := c.conn.Write(w.Bytes()); err != nil { c.logger.Warn("There was an error while writing to a producer server, closing connection: %s", err) + shouldExit = true break } } } // Close in-flight group - for _, request := range group { - streamId := request.StreamId() - handler := c.getHandler(streamId) - handler(NewClientErrorResponse("Error 
while sending request")) + for i := handledGroupIndex + 1; i < len(group); i++ { + parts := group[i] + for _, p := range parts { + p.Response <- NewClientErrorResponse("Error while sending request") + } } c.Close() } +// TODO: TEST INDEPENDENTLY +func appendToGroup(group [][]*ProduceRequestPart, part *ProduceRequestPart) [][]*ProduceRequestPart { + // This could benefit from a linked list + length := len(group) + if length == 0 { + return append(group, []*ProduceRequestPart{part}) + } + + // Check the last element + lastPart := group[length-1][0] + if lastPart.Topic == part.Topic && lastPart.PartitionKey == part.PartitionKey { + group[length-1] = append(group[length-1], part) + return group + } + return append(group, []*ProduceRequestPart{part}) +} + type connectionSet map[*connection]bool type disconnectHandler interface { diff --git a/internal/connection_test.go b/internal/connection_test.go index aece56a..ded20de 100644 --- a/internal/connection_test.go +++ b/internal/connection_test.go @@ -17,7 +17,7 @@ var _ = Describe("connection", func() { It("should recover from closed request channel", func() { c := &connection{ streamIds: make(chan StreamId, 10), - requests: make(chan BinaryRequest, 10), + requests: make(chan *ProduceRequestPart, 10), logger: StdLogger, } @@ -29,10 +29,10 @@ var _ = Describe("connection", func() { Expect(resp).To(Equal(NewClientErrorResponse("Request could not be sent: connection closed"))) }) - It("should return when handler is invoked", func() { + It("should return when channel has data", func() { c := &connection{ streamIds: make(chan StreamId, 10), - requests: make(chan BinaryRequest, 10), + requests: make(chan *ProduceRequestPart, 10), logger: StdLogger, } @@ -46,8 +46,9 @@ var _ = Describe("connection", func() { r <- c.Send("topic1", bytes.NewReader([]byte("abc")), "key1") }() wg.Wait() - handler := c.getHandler(0) - handler(NewEmptyResponse(ProduceResponseOp)) + + part := <-c.requests + part.Response <- NewEmptyResponse(ProduceResponseOp) select { case resp := <-r: @@ -62,7 +63,7 @@ var _ = Describe("connection", func() { It("should invoke pending handlers", func() { c := &connection{ streamIds: make(chan StreamId, 10), - requests: make(chan BinaryRequest, 10), + requests: make(chan *ProduceRequestPart, 10), logger: StdLogger, conn: &fakes.Connection{}, disconnectHandler: &fakeDisconnectHandler{}, diff --git a/internal/serialization/producer/models.go b/internal/serialization/producer/models.go index e3d938a..5c163f4 100644 --- a/internal/serialization/producer/models.go +++ b/internal/serialization/producer/models.go @@ -60,7 +60,7 @@ var HeaderSize = binarySize(BinaryHeader{}) type BinaryRequest interface { Marshal(w *bytes.Buffer, header *BinaryHeader) error - BodyLength() int + ResponseChannels() []chan<- BinaryResponse StreamId() StreamId } @@ -131,49 +131,87 @@ func WriteString(w *bytes.Buffer, value string) error { return err } +// Represents a part of a potential produce request +type ProduceRequestPart struct { + Topic string + Message FixedLengthReader + PartitionKey string + Response chan BinaryResponse +} + +func NewProduceRequestPart( + topic string, + message FixedLengthReader, + partitionKey string, +) *ProduceRequestPart { + return &ProduceRequestPart{ + Topic: topic, + Message: message, + PartitionKey: partitionKey, + Response: make(chan BinaryResponse, 1), + } +} + type ProduceRequest struct { - topic string - message FixedLengthReader - partitionKey string - streamId StreamId + streamId StreamId + parts []*ProduceRequestPart } -func 
NewProduceRequest(streamId StreamId, topic string, message FixedLengthReader, partitionKey string) BinaryRequest { +func NewProduceRequest(streamId StreamId, parts []*ProduceRequestPart) BinaryRequest { return &ProduceRequest{ - topic: topic, - message: message, - partitionKey: partitionKey, - streamId: streamId, + streamId: streamId, + parts: parts, } } func (r *ProduceRequest) Marshal(w *bytes.Buffer, header *BinaryHeader) error { header.StreamId = r.streamId header.Op = ProduceOp - header.BodyLength = uint32(r.BodyLength()) + + firstPart := r.parts[0] + header.BodyLength = uint32(r.bodyLength()) if err := WriteHeader(w, header); err != nil { return err } - if err := WriteString(w, r.partitionKey); err != nil { + if err := WriteString(w, firstPart.PartitionKey); err != nil { return err } - if err := WriteString(w, r.topic); err != nil { - return err - } - if err := binary.Write(w, Endianness, uint32(r.message.Len())); err != nil { + if err := WriteString(w, firstPart.Topic); err != nil { return err } - // Reader.WriteTo() should kick in - _, err := io.Copy(w, r.message) - return err + for _, part := range r.parts { + if err := binary.Write(w, Endianness, uint32(part.Message.Len())); err != nil { + return err + } + + // Reader.WriteTo() should kick in + if _, err := io.Copy(w, part.Message); err != nil { + return err + } + } + return nil } -func (r *ProduceRequest) BodyLength() int { +func (r *ProduceRequest) bodyLength() int { // optional timestamp μs (int64) | partition key length (uint8) | partition key (bytes) // topic length (uint8) | topic name (bytes) // message 0 length (uint32) | message 0 (bytes) - return 1 + len(r.partitionKey) + 1 + len(r.topic) + 4 + r.message.Len() + + firstPart := r.parts[0] + total := 1 + len(firstPart.PartitionKey) + 1 + len(firstPart.Topic) + for _, p := range r.parts { + total += 4 + p.Message.Len() + } + return total +} + +func (r *ProduceRequest) ResponseChannels() []chan<- BinaryResponse { + channels := make([]chan<- BinaryResponse, len(r.parts)) + for i := 0; i < len(r.parts); i++ { + channels[i] = r.parts[i].Response + } + return channels } func (r *ProduceRequest) StreamId() StreamId { From 051b496e6a6c28a749584bea44b123fef59279de Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Mon, 20 Feb 2023 13:04:50 +0100 Subject: [PATCH 6/9] fix: Receiving before sending on startup --- internal/connection.go | 14 ++++++++------ internal/serialization/producer/models.go | 3 ++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/internal/connection.go b/internal/connection.go index b932e70..54f2dcb 100644 --- a/internal/connection.go +++ b/internal/connection.go @@ -68,13 +68,15 @@ func newConnection(address string, h disconnectHandler, flushThreshold int, logg c.streamIds <- i } - go c.receiveResponses() - response := make(chan BinaryResponse, 1) c.handlers.Store(StreamId(0), func(r BinaryResponse) { response <- r }) + // Start receiving + go c.receiveResponses() + + // Wait for the startup response r := <-response if r.Op() != ReadyOp { _ = conn.Close() @@ -149,7 +151,7 @@ func (c *connection) receiveResponses() { break } - handler := c.getHandler(header.StreamId) + handler := c.getHandler(header) var response BinaryResponse if header.BodyLength == 0 { response = NewEmptyResponse(header.Op) @@ -178,10 +180,10 @@ func (c *connection) receiveResponses() { } // Gets and deletes the handler from the pending handlers -func (c *connection) getHandler(id StreamId) streamHandler { - h, loaded := c.handlers.LoadAndDelete(id) +func (c *connection) 
getHandler(header *BinaryHeader) streamHandler { + h, loaded := c.handlers.LoadAndDelete(header.StreamId) if !loaded { - panic(fmt.Sprintf("No handler for stream id %d", id)) + panic(fmt.Sprintf("No handler for stream id %d (v %d, op %d)", header.StreamId, header.Version, header.Op)) } return h.(func(BinaryResponse)) } diff --git a/internal/serialization/producer/models.go b/internal/serialization/producer/models.go index 5c163f4..9669e46 100644 --- a/internal/serialization/producer/models.go +++ b/internal/serialization/producer/models.go @@ -148,7 +148,7 @@ func NewProduceRequestPart( Topic: topic, Message: message, PartitionKey: partitionKey, - Response: make(chan BinaryResponse, 1), + Response: make(chan BinaryResponse, 2), } } @@ -197,6 +197,7 @@ func (r *ProduceRequest) bodyLength() int { // optional timestamp μs (int64) | partition key length (uint8) | partition key (bytes) // topic length (uint8) | topic name (bytes) // message 0 length (uint32) | message 0 (bytes) + // message n length (uint32) | message n (bytes) firstPart := r.parts[0] total := 1 + len(firstPart.PartitionKey) + 1 + len(firstPart.Topic) From 1229f3f6fc362f8908b6a7c8e23410961723ed6d Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Mon, 6 Mar 2023 10:41:32 +0100 Subject: [PATCH 7/9] More tests and fixes --- build/compose/docker-compose-cluster.yml | 6 ++--- build/compose/docker-compose-dev.yml | 2 +- internal/connection.go | 1 - internal/connection_test.go | 27 ++++++++++++++++++++++ internal/test/integration/producer_test.go | 18 +++++++++++++++ 5 files changed, 49 insertions(+), 5 deletions(-) diff --git a/build/compose/docker-compose-cluster.yml b/build/compose/docker-compose-cluster.yml index 13f440e..f1a6121 100644 --- a/build/compose/docker-compose-cluster.yml +++ b/build/compose/docker-compose-cluster.yml @@ -10,7 +10,7 @@ services: depends_on: - polar_0 polar_0: - image: "polarstreams/polar:dev4" + image: "polarstreams/polar:dev5" env_file: "../env/test.env" environment: - POLAR_BROKER_NAMES=polar_0,polar_1,polar_2 @@ -20,13 +20,13 @@ services: interval: 1s start_period: 10s polar_1: - image: "polarstreams/polar:dev4" + image: "polarstreams/polar:dev5" env_file: "../env/test.env" environment: - POLAR_BROKER_NAMES=polar_0,polar_1,polar_2 - POLAR_ORDINAL=1 polar_2: - image: "polarstreams/polar:dev4" + image: "polarstreams/polar:dev5" env_file: "../env/test.env" environment: - POLAR_BROKER_NAMES=polar_0,polar_1,polar_2 diff --git a/build/compose/docker-compose-dev.yml b/build/compose/docker-compose-dev.yml index 52aa7e6..772a330 100644 --- a/build/compose/docker-compose-dev.yml +++ b/build/compose/docker-compose-dev.yml @@ -7,7 +7,7 @@ services: depends_on: - polar polar: - image: "polarstreams/polar:dev4" + image: "polarstreams/polar:dev5" env_file: "../env/test.env" environment: - POLAR_DEV_MODE=true diff --git a/internal/connection.go b/internal/connection.go index 54f2dcb..863c464 100644 --- a/internal/connection.go +++ b/internal/connection.go @@ -277,7 +277,6 @@ func (c *connection) sendRequests() { c.Close() } -// TODO: TEST INDEPENDENTLY func appendToGroup(group [][]*ProduceRequestPart, part *ProduceRequestPart) [][]*ProduceRequestPart { // This could benefit from a linked list length := len(group) diff --git a/internal/connection_test.go b/internal/connection_test.go index ded20de..138f50b 100644 --- a/internal/connection_test.go +++ b/internal/connection_test.go @@ -95,6 +95,33 @@ var _ = Describe("connection", func() { } }) }) + + Describe("appendToGroup", func() { + It("should append to last group", 
func() { + part1 := NewProduceRequestPart("t1", nil, "k1") + part2 := NewProduceRequestPart("t1", nil, "k1") + part3 := NewProduceRequestPart("t1", nil, "k1") + part4 := NewProduceRequestPart("t2", nil, "k1") + part5 := NewProduceRequestPart("t1", nil, "k2") + part6 := NewProduceRequestPart("t1", nil, "") + part7 := NewProduceRequestPart("t1", nil, "") + + group := appendToGroup([][]*ProduceRequestPart{}, part1) + group = appendToGroup(group, part2) + group = appendToGroup(group, part3) + Expect(group).To(HaveLen(1)) + Expect(group[0]).To(HaveLen(3)) + group = appendToGroup(group, part4) + Expect(group).To(HaveLen(2)) + Expect(group[1]).To(HaveLen(1)) + group = appendToGroup(group, part5) + Expect(group).To(HaveLen(3)) + group = appendToGroup(group, part6) + group = appendToGroup(group, part7) + Expect(group).To(HaveLen(4)) + Expect(group[3]).To(HaveLen(2)) + }) + }) }) type fakeDisconnectHandler struct{} diff --git a/internal/test/integration/producer_test.go b/internal/test/integration/producer_test.go index 935bae7..2741048 100644 --- a/internal/test/integration/producer_test.go +++ b/internal/test/integration/producer_test.go @@ -54,6 +54,24 @@ var _ = Describe("Producer", func () { Expect(err).NotTo(HaveOccurred()) Expect(producer.BrokersLength()).To(Equal(expectedBrokers)) }) + + It("should send multiple requests in parallel", func () { + producer := newTestProducer(fmt.Sprintf("polar://%s", host)) + defer producer.Close() + + const total = 64 + responses := make(chan error, total) + + for i := 0; i < total; i++ { + go func(v int) { + responses <- producer.Send(topic, strings.NewReader(fmt.Sprintf(`{"hello": %03d}`, v)), partitionKeyT0Range) + }(i) + } + + for i := 0; i < total; i++ { + Expect(<-responses).NotTo(HaveOccurred()) + } + }) }) func newTestProducer(serviceUrl string) Producer { From 0109f1fdb9bff897a17e97fd54e57de4b7e1bafa Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Fri, 17 Mar 2023 11:09:05 +0100 Subject: [PATCH 8/9] fix: Default producer flush threshold --- internal/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/client.go b/internal/client.go index 2bb33cd..26d2c06 100644 --- a/internal/client.go +++ b/internal/client.go @@ -635,6 +635,6 @@ func setDefaultOptions(options *ClientOptions) { options.ProducerConnectionsPerHost = 1 } if options.ProducerFlushThresholdBytes == 0 { - options.ProducerFlushThresholdBytes = 16 * 1024 + options.ProducerFlushThresholdBytes = 64 * 1024 } } From 7756d07778af4535f764110df414b771416d09fc Mon Sep 17 00:00:00 2001 From: Jorge Bay Date: Sun, 2 Apr 2023 19:26:51 +0200 Subject: [PATCH 9/9] fix: Wait for a small amount of time while coalescing --- internal/connection.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/connection.go b/internal/connection.go index 863c464..55c2602 100644 --- a/internal/connection.go +++ b/internal/connection.go @@ -216,7 +216,9 @@ func (c *connection) sendRequests() { groupSize += item.Message.Len() item = nil - // Coalesce requests w/ Nagle disabled + // Coalesce waiting for new data for a brief period + timeout := time.After(500 * time.Microsecond) + for canAddNext && !shouldExit { select { case reqPart, ok := <-c.requests: @@ -232,8 +234,7 @@ func (c *connection) sendRequests() { } group = appendToGroup(group, reqPart) groupSize += partSize - - default: + case <-timeout: canAddNext = false } }
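
Usage sketch (illustrative, not part of the patch series): driving the binary producer through the public API added above. The service URL, topic name and partition key are placeholders; the option values mirror the defaults set in the last two patches.

package main

import (
	"fmt"
	"strings"

	polar "github.com/polarstreams/go-client"
	"github.com/polarstreams/go-client/types"
)

func main() {
	// Zero-valued options fall back to the client defaults
	// (64 KiB flush threshold, one connection per broker).
	producer, err := polar.NewProducerWithOpts("polar://broker.example", types.ProducerOptions{
		FlushThresholdBytes:  64 * 1024,
		ConnectionsPerBroker: 1,
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Send blocks until the coalesced produce request is acknowledged by the broker.
	if err := producer.Send("my-topic", strings.NewReader(`{"hello": "world"}`), "key1"); err != nil {
		fmt.Println("produce failed:", err)
	}
}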