Skip to content

Commit

Permalink
Merge pull request #28 from polarstreams/binary-producer
Browse files Browse the repository at this point in the history
Binary producer client
  • Loading branch information
jorgebay authored Apr 14, 2023
2 parents c7d5190 + 7756d07 commit cf09018
Show file tree
Hide file tree
Showing 19 changed files with 1,448 additions and 200 deletions.
6 changes: 3 additions & 3 deletions build/compose/docker-compose-cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
depends_on:
- polar_0
polar_0:
image: "polarstreams/polar:dev3"
image: "polarstreams/polar:dev5"
env_file: "../env/test.env"
environment:
- POLAR_BROKER_NAMES=polar_0,polar_1,polar_2
Expand All @@ -20,13 +20,13 @@ services:
interval: 1s
start_period: 10s
polar_1:
image: "polarstreams/polar:dev3"
image: "polarstreams/polar:dev5"
env_file: "../env/test.env"
environment:
- POLAR_BROKER_NAMES=polar_0,polar_1,polar_2
- POLAR_ORDINAL=1
polar_2:
image: "polarstreams/polar:dev3"
image: "polarstreams/polar:dev5"
env_file: "../env/test.env"
environment:
- POLAR_BROKER_NAMES=polar_0,polar_1,polar_2
Expand Down
2 changes: 1 addition & 1 deletion build/compose/docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
depends_on:
- polar
polar:
image: "polarstreams/polar:dev3"
image: "polarstreams/polar:dev5"
env_file: "../env/test.env"
environment:
- POLAR_DEV_MODE=true
Expand Down
6 changes: 1 addition & 5 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
"os"

"github.com/google/uuid"
. "github.com/polarstreams/go-client/internal"
. "github.com/polarstreams/go-client/types"
"github.com/google/uuid"
)

// Represents a PolarStreams client that reads records from a cluster.
Expand Down Expand Up @@ -65,10 +65,6 @@ func NewConsumerWithOpts(serviceUrl string, options ConsumerOptions) (Consumer,
return nil, err
}

if err := client.Connect(); err != nil {
return nil, err
}

client.RegisterAsConsumer(options)

c := &consumer{
Expand Down
58 changes: 58 additions & 0 deletions internal/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package internal

import (
"math"
"time"
)

const (
baseReconnectionDelayMs = 20
maxReconnectionDelayMs = 30_000
)

type BackoffPolicy interface {
Reset()

// Returns the next delay
Next() time.Duration
}

// Represents a default reconnection strategy
type exponentialBackoff struct {
index int
}

func newExponentialBackoff() *exponentialBackoff {
return &exponentialBackoff{}
}

func (p *exponentialBackoff) Reset() {
p.index = 0
}

// Returns an exponential delay
func (p *exponentialBackoff) Next() time.Duration {
p.index++

delayMs := maxReconnectionDelayMs
if p.index < 53 {
delayMs = int(math.Pow(2, float64(p.index))) * baseReconnectionDelayMs
if delayMs > maxReconnectionDelayMs {
delayMs = maxReconnectionDelayMs
}
}

return time.Duration(delayMs) * time.Millisecond
}

type fixedBackoff struct {
delay time.Duration
}

func (p *fixedBackoff) Reset() {
}

// Returns an exponential delay
func (p *fixedBackoff) Next() time.Duration {
return p.delay
}
126 changes: 73 additions & 53 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"net"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/polarstreams/go-client/internal/serialization"
"github.com/polarstreams/go-client/internal/serialization/producer"
"github.com/polarstreams/go-client/internal/utils"
. "github.com/polarstreams/go-client/types"
"golang.org/x/net/context"
Expand All @@ -27,7 +29,7 @@ const DefaultTopologyPollInterval = 10 * time.Second
const defaultPollReqInterval = 5 * time.Second
const baseReconnectionDelay = 100 * time.Millisecond
const maxReconnectionDelay = 2 * time.Minute
const maxOrdinal = 1 << 31
const maxAtomicIncrement = 1 << 31
const defaultDiscoveryPort = 9250
const producerMaxConnsPerHost = 1

Expand All @@ -40,7 +42,7 @@ const (

type Client struct {
discoveryClient *http.Client
producerClient *http.Client
producerClient *producerClient
consumerClient *http.Client
discoveryUrl string // The full discovery url, like http://host:port/path
discoveryHost string // The host and port
Expand All @@ -59,9 +61,12 @@ type Client struct {
}

type ClientOptions struct {
Logger Logger
TopologyPollInterval time.Duration
FixedReconnectionDelay time.Duration
Logger Logger
TopologyPollInterval time.Duration
FixedReconnectionDelay time.Duration
ProducerInitialize bool
ProducerFlushThresholdBytes int
ProducerConnectionsPerHost int
}

var jitterRng = rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down Expand Up @@ -111,15 +116,27 @@ func NewClient(serviceUrl string, options *ClientOptions) (*Client, error) {
fixedReconnectionDelay: options.FixedReconnectionDelay,
}

client.producerClient = &http.Client{
Transport: &http.Transport{
MaxConnsPerHost: producerMaxConnsPerHost,
MaxIdleConnsPerHost: producerMaxConnsPerHost,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return client.dial(network, addr, client.getProducerStatus(addr))
},
},
Timeout: 1 * time.Second,
topology, err := client.queryTopology()
if err != nil {
return nil, err
}
options.Logger.Info("Discovered cluster composed of %d brokers", topology.Length)

if topology.ProducerBinaryPort == 0 {
return nil, fmt.Errorf("Invalid server version, make sure you use the latest polar version")
}

client.topology.Store(topology)
go client.pollTopology()

if options.ProducerInitialize {
// The producer client eagerly creates connections
client.producerClient = newProducerClient(
topology,
options.ProducerConnectionsPerHost,
options.ProducerFlushThresholdBytes,
options.FixedReconnectionDelay,
options.Logger)
}

client.consumerClient = &http.Client{
Expand All @@ -143,7 +160,7 @@ func NewClient(serviceUrl string, options *ClientOptions) (*Client, error) {
}

func (c *Client) dial(network string, addr string, brokerStatus *BrokerStatusInfo) (*utils.TrackedConnection, error) {
c.logger.Info("Creating new connection to %s", addr)
c.logger.Info("Creating new consumer connection to %s", addr)
conn, err := net.Dial(network, addr)
if err != nil {
c.logger.Warn("Connection to %s could not be established", addr)
Expand All @@ -167,22 +184,8 @@ func (c *Client) dial(network string, addr string, brokerStatus *BrokerStatusInf
return tc, nil
}

// Gets the topology the first time and starts the loop for polling for changes.
//
// Close() should be called to stop polling.
func (c *Client) Connect() error {
topology, err := c.queryTopology()
if err != nil {
return err
}

c.topology.Store(topology)
go c.pollTopology()
return nil
}

func (c *Client) isProducerUp(ordinal int, t *Topology) bool {
return c.getProducerStatusByOrdinal(ordinal, t).IsUp()
func (c *Client) isProducerUp(ordinal int) bool {
return c.producerClient.IsProducerUp(ordinal)
}

func (c *Client) getProducerStatusByOrdinal(ordinal int, t *Topology) *BrokerStatusInfo {
Expand Down Expand Up @@ -296,11 +299,17 @@ func (c *Client) pollTopology() {
time.Sleep(jitter(c.topologyPollInterval))
newTopology, err := c.queryTopology()
if err != nil {
// TODO: Use logging
c.logger.Warn("Error while trying to get the topology: %s", err)
continue
}

c.topology.Store(newTopology)
currentTopology := c.Topology()
if !reflect.DeepEqual(*newTopology, *currentTopology) {
c.topology.Store(newTopology)
c.logger.Info("Topology changed: %d brokers", newTopology.Length)
if c.producerClient != nil {
c.producerClient.OnNewTopology(newTopology)
}
}
}
}

Expand Down Expand Up @@ -336,14 +345,15 @@ func (c *Client) useDiscoveryHostForBroker(t *Topology) Topology {
}

return Topology{
Length: 1,
BrokerNames: []string{name},
ProducerPort: t.ProducerPort,
ConsumerPort: t.ConsumerPort,
Length: 1,
BrokerNames: []string{name},
ProducerPort: t.ProducerPort,
ConsumerPort: t.ConsumerPort,
ProducerBinaryPort: t.ProducerBinaryPort,
}
}

func (c *Client) ProduceJson(topic string, message io.Reader, partitionKey string) (*http.Response, error) {
func (c *Client) ProduceJson(topic string, message io.Reader, partitionKey string) error {
t := c.Topology()
ordinal := 0
if partitionKey == "" {
Expand All @@ -356,36 +366,38 @@ func (c *Client) ProduceJson(topic string, message io.Reader, partitionKey strin
initialPosition, err := bufferedMessage.Seek(0, io.SeekCurrent)
if err != nil {
// Seeking current position should be a safe operation, in any case, error out
return nil, err
return err
}

maxAttempts := int(math.Min(float64(t.Length), 4))
var lastErr error
for i := 0; i < maxAttempts; i++ {
if i > 0 {
// Rewind the reader
_, err := bufferedMessage.Seek(initialPosition, io.SeekStart)
if err != nil {
return nil, err
return err
}
}
brokerOrdinal := (ordinal + i) % t.Length

if !c.isProducerUp(brokerOrdinal, t) {
if !c.producerClient.IsProducerUp(brokerOrdinal) {
c.logger.Debug("B%d is down, moving to next host", brokerOrdinal)
lastErr = fmt.Errorf("Broker B%d is down", brokerOrdinal)
}

url := t.ProducerUrl(topic, brokerOrdinal, partitionKey)
req, err := http.NewRequest(http.MethodPost, url, bufferedMessage)
if err != nil {
return nil, err
resp := c.producerClient.Send(brokerOrdinal, topic, bufferedMessage, partitionKey)
if resp.Op() == producer.ProduceResponseOp {
// Success
return nil
}
req.Header.Set("Content-Type", contentType)
resp, err := c.producerClient.Do(req)
if err == nil {
return resp, nil
if resp.Op() == producer.ErrorOp {
lastErr = resp.(*producer.ErrorResponse).ToError()
continue
}
lastErr = fmt.Errorf("Response op is not valid: %d", resp.Op())
}
return nil, fmt.Errorf("No broker available: attempted %d brokers", maxAttempts)
return fmt.Errorf("No broker available: %d attempted, last error: %s", maxAttempts, lastErr)
}

func (c *Client) RegisterAsConsumer(options ConsumerOptions) {
Expand Down Expand Up @@ -574,8 +586,10 @@ func (c *Client) Close() {
c.logger.Info("PolarStreams client closing")
atomic.StoreInt64(&c.isClosing, 1)
c.discoveryClient.CloseIdleConnections()
c.producerClient.CloseIdleConnections()
c.consumerClient.CloseIdleConnections()
if c.producerClient != nil {
c.producerClient.Close()
}
}

func bodyClose(r *http.Response) {
Expand All @@ -586,7 +600,7 @@ func bodyClose(r *http.Response) {

func (c *Client) getNextOrdinal(index *uint32, t *Topology) int {
value := atomic.AddUint32(index, 1)
if value >= maxOrdinal {
if value >= maxAtomicIncrement {
// Atomic inc operations don't wrap around.
// Not exactly fair when value >= maxOrdinal, but in practical terms is good enough
atomic.StoreUint32(index, 0)
Expand Down Expand Up @@ -617,4 +631,10 @@ func setDefaultOptions(options *ClientOptions) {
if options.TopologyPollInterval == 0 {
options.TopologyPollInterval = DefaultTopologyPollInterval
}
if options.ProducerConnectionsPerHost == 0 {
options.ProducerConnectionsPerHost = 1
}
if options.ProducerFlushThresholdBytes == 0 {
options.ProducerFlushThresholdBytes = 64 * 1024
}
}
10 changes: 5 additions & 5 deletions internal/client_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"net/http/httptest"
"time"

"github.com/polarstreams/go-client/internal/serialization"
. "github.com/polarstreams/go-client/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/polarstreams/go-client/internal/serialization"
. "github.com/polarstreams/go-client/types"
)

var _ = Describe("Client", func() {
Expand Down Expand Up @@ -39,7 +39,7 @@ var _ = Describe("Client", func() {
})

It("should subscribe to each discovered server", func() {
client := newTestClient(discoveryAddress)
client := newTestClient(discoveryAddress, true)
defer client.Close()
options := ConsumerOptions{
Group: "a",
Expand All @@ -56,7 +56,7 @@ var _ = Describe("Client", func() {
s0.Shutdown(context.Background())
s0 = nil

client := newTestClient(discoveryAddress)
client := newTestClient(discoveryAddress, true)
defer client.Close()
options := ConsumerOptions{
Group: "a",
Expand Down Expand Up @@ -118,7 +118,7 @@ var _ = Describe("Client", func() {
})

It("should query brokers until it times out", func() {
client := newTestClient(discoveryAddress)
client := newTestClient(discoveryAddress, true)
defer client.Close()

client.RegisterAsConsumer(consumerOptions)
Expand Down
Loading

0 comments on commit cf09018

Please sign in to comment.