Skip to content

Commit

Permalink
get kafka client options
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Jul 4, 2024
1 parent 39df717 commit 36de039
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 151 deletions.
113 changes: 113 additions & 0 deletions cfg/kafka_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package cfg

import (
"crypto/tls"
"os"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kzap"
"github.com/twmb/tlscfg"
"go.uber.org/zap"
)

type KafkaClientConfig interface {
GetBrokers() []string
GetClientID() string

IsSaslEnabled() bool
GetSaslConfig() KafkaClientSaslConfig

IsSslEnabled() bool
GetSslConfig() KafkaClientSslConfig
}

type KafkaClientSaslConfig struct {
SaslMechanism string
SaslUsername string
SaslPassword string
}

type KafkaClientSslConfig struct {
CACert string
ClientCert string
ClientKey string
SslSkipVerify bool
}

func GetKafkaClientOptions(c KafkaClientConfig, l *zap.SugaredLogger) []kgo.Opt {
opts := []kgo.Opt{
kgo.SeedBrokers(c.GetBrokers()...),
kgo.ClientID(c.GetClientID()),
kgo.WithLogger(kzap.New(l.Desugar())),
}

if c.IsSaslEnabled() {
saslConfig := c.GetSaslConfig()
switch saslConfig.SaslMechanism {
case "PLAIN":
opts = append(opts, kgo.SASL(plain.Auth{
User: saslConfig.SaslUsername,
Pass: saslConfig.SaslPassword,
}.AsMechanism()))
case "SCRAM-SHA-256":
opts = append(opts, kgo.SASL(scram.Auth{
User: saslConfig.SaslUsername,
Pass: saslConfig.SaslPassword,
}.AsSha256Mechanism()))
case "SCRAM-SHA-512":
opts = append(opts, kgo.SASL(scram.Auth{
User: saslConfig.SaslUsername,
Pass: saslConfig.SaslPassword,
}.AsSha512Mechanism()))
case "AWS_MSK_IAM":
opts = append(opts, kgo.SASL(aws.Auth{
AccessKey: saslConfig.SaslUsername,
SecretKey: saslConfig.SaslPassword,
}.AsManagedStreamingIAMMechanism()))
}
opts = append(opts, kgo.DialTLSConfig(new(tls.Config)))
}

if c.IsSslEnabled() {
sslConfig := c.GetSslConfig()
tlsOpts := []tlscfg.Opt{}
if sslConfig.CACert != "" || sslConfig.ClientCert != "" || sslConfig.ClientKey != "" {
if sslConfig.CACert != "" {
if _, err := os.Stat(sslConfig.CACert); err != nil {
tlsOpts = append(tlsOpts,
tlscfg.WithCA(
[]byte(sslConfig.CACert), tlscfg.ForClient,
),
)
} else {
tlsOpts = append(tlsOpts,
tlscfg.MaybeWithDiskCA(sslConfig.CACert, tlscfg.ForClient),
)
}
}

if _, err := os.Stat(sslConfig.ClientCert); err != nil {
tlsOpts = append(tlsOpts,
tlscfg.WithKeyPair(
[]byte(sslConfig.ClientCert), []byte(sslConfig.ClientKey),
),
)
} else {
tlsOpts = append(tlsOpts,
tlscfg.MaybeWithDiskKeyPair(sslConfig.ClientCert, sslConfig.ClientKey),
)
}
}
tc, err := tlscfg.New(tlsOpts...)
if err != nil {
l.Fatalf("unable to create tls config: %v", err)
}
tc.InsecureSkipVerify = sslConfig.SslSkipVerify
opts = append(opts, kgo.DialTLSConfig(tc))
}

return opts
}
79 changes: 4 additions & 75 deletions plugin/input/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,26 @@ package kafka

import (
"context"
"crypto/tls"
"os"

"github.com/ozontech/file.d/cfg"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kzap"
"github.com/twmb/tlscfg"
"go.uber.org/zap"
)

func NewClient(c *Config, l *zap.SugaredLogger) *kgo.Client {
opts := []kgo.Opt{
kgo.SeedBrokers(c.Brokers...),
opts := cfg.GetKafkaClientOptions(c, l)
opts = append(opts, []kgo.Opt{
kgo.ConsumerGroup(c.ConsumerGroup),
kgo.ConsumeTopics(c.Topics...),
kgo.ClientID(c.ClientID),
kgo.FetchMaxWait(c.ConsumerMaxWaitTime_),
kgo.AutoCommitMarks(),
kgo.WithLogger(kzap.New(l.Desugar())),
kgo.MaxConcurrentFetches(c.MaxConcurrentFetches),
kgo.FetchMaxBytes(c.FetchMaxBytes_),
kgo.FetchMinBytes(c.FetchMinBytes_),
kgo.AutoCommitInterval(c.AutoCommitInterval_),
kgo.SessionTimeout(c.SessionTimeout_),
kgo.HeartbeatInterval(c.HeartbeatInterval_),
}
}...)

offset := kgo.NewOffset()
switch c.Offset_ {
Expand All @@ -40,69 +32,6 @@ func NewClient(c *Config, l *zap.SugaredLogger) *kgo.Client {
}
opts = append(opts, kgo.ConsumeResetOffset(offset))

if c.SaslEnabled {
switch c.SaslMechanism {
case "PLAIN":
opts = append(opts, kgo.SASL(plain.Auth{
User: c.SaslUsername,
Pass: c.SaslPassword,
}.AsMechanism()))
case "SCRAM-SHA-256":
opts = append(opts, kgo.SASL(scram.Auth{
User: c.SaslUsername,
Pass: c.SaslPassword,
}.AsSha256Mechanism()))
case "SCRAM-SHA-512":
opts = append(opts, kgo.SASL(scram.Auth{
User: c.SaslUsername,
Pass: c.SaslPassword,
}.AsSha512Mechanism()))
case "AWS_MSK_IAM":
opts = append(opts, kgo.SASL(aws.Auth{
AccessKey: c.SaslUsername,
SecretKey: c.SaslPassword,
}.AsManagedStreamingIAMMechanism()))
}
opts = append(opts, kgo.DialTLSConfig(new(tls.Config)))
}

if c.SslEnabled {
tlsOpts := []tlscfg.Opt{}
if c.CACert != "" || c.ClientCert != "" || c.ClientKey != "" {
if c.CACert != "" {
if _, err := os.Stat(c.CACert); err != nil {
tlsOpts = append(tlsOpts,
tlscfg.WithCA(
[]byte(c.CACert), tlscfg.ForClient,
),
)
} else {
tlsOpts = append(tlsOpts,
tlscfg.MaybeWithDiskCA(c.CACert, tlscfg.ForClient),
)
}
}

if _, err := os.Stat(c.ClientCert); err != nil {
tlsOpts = append(tlsOpts,
tlscfg.WithKeyPair(
[]byte(c.ClientCert), []byte(c.ClientKey),
),
)
} else {
tlsOpts = append(tlsOpts,
tlscfg.MaybeWithDiskKeyPair(c.ClientCert, c.ClientKey),
)
}
}
tc, err := tlscfg.New(tlsOpts...)
if err != nil {
l.Fatalf("unable to create tls config: %v", err)
}
tc.InsecureSkipVerify = c.SslSkipVerify
opts = append(opts, kgo.DialTLSConfig(tc))
}

switch c.BalancerPlan {
case "round-robin":
opts = append(opts, kgo.Balancers(kgo.RoundRobinBalancer()))
Expand Down
35 changes: 34 additions & 1 deletion plugin/input/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type Config struct {

// > @3@4@5@6
// >
// > The maximum amount of time the consumer expects a message takes to process for the user.
// > The maximum amount of time the consumer expects a message takes to process for the user. (Not used anymore!)
ConsumerMaxProcessingTime cfg.Duration `json:"consumer_max_processing_time" default:"200ms" parse:"duration"` // *
ConsumerMaxProcessingTime_ time.Duration

Expand Down Expand Up @@ -214,6 +214,39 @@ type Config struct {
Meta cfg.MetaTemplates `json:"meta"` // *
}

func (c *Config) GetBrokers() []string {
return c.Brokers
}

func (c *Config) GetClientID() string {
return c.ClientID
}

func (c *Config) IsSaslEnabled() bool {
return c.SaslEnabled
}

func (c *Config) GetSaslConfig() cfg.KafkaClientSaslConfig {
return cfg.KafkaClientSaslConfig{
SaslMechanism: c.SaslMechanism,
SaslUsername: c.SaslUsername,
SaslPassword: c.SaslPassword,
}
}

func (c *Config) IsSslEnabled() bool {
return c.SslEnabled
}

func (c *Config) GetSslConfig() cfg.KafkaClientSslConfig {
return cfg.KafkaClientSslConfig{
CACert: c.CACert,
ClientCert: c.ClientCert,
ClientKey: c.ClientKey,
SslSkipVerify: c.SslSkipVerify,
}
}

func init() {
fd.DefaultPluginRegistry.RegisterInput(&pipeline.PluginStaticInfo{
Type: "kafka",
Expand Down
79 changes: 4 additions & 75 deletions plugin/output/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,10 @@ package kafka

import (
"context"
"crypto/tls"
"os"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kzap"
"github.com/twmb/tlscfg"
"go.uber.org/zap"
)

Expand All @@ -21,78 +15,13 @@ type KafkaClient interface {
}

func NewClient(c *Config, l *zap.SugaredLogger) *kgo.Client {
opts := []kgo.Opt{
kgo.SeedBrokers(c.Brokers...),
kgo.ClientID(c.ClientID),
opts := cfg.GetKafkaClientOptions(c, l)
opts = append(opts, []kgo.Opt{
kgo.DefaultProduceTopic(c.DefaultTopic),
kgo.WithLogger(kzap.New(l.Desugar())),
kgo.MaxBufferedRecords(c.BatchSize_),
kgo.ProducerBatchMaxBytes(int32(c.MaxMessageBytes_)),
kgo.ProducerLinger(1 * time.Millisecond),
}

if c.SaslEnabled {
switch c.SaslMechanism {
case "PLAIN":
opts = append(opts, kgo.SASL(plain.Auth{
User: c.SaslUsername,
Pass: c.SaslPassword,
}.AsMechanism()))
case "SCRAM-SHA-256":
opts = append(opts, kgo.SASL(scram.Auth{
User: c.SaslUsername,
Pass: c.SaslPassword,
}.AsSha256Mechanism()))
case "SCRAM-SHA-512":
opts = append(opts, kgo.SASL(scram.Auth{
User: c.SaslUsername,
Pass: c.SaslPassword,
}.AsSha512Mechanism()))
case "AWS_MSK_IAM":
opts = append(opts, kgo.SASL(aws.Auth{
AccessKey: c.SaslUsername,
SecretKey: c.SaslPassword,
}.AsManagedStreamingIAMMechanism()))
}
opts = append(opts, kgo.DialTLSConfig(new(tls.Config)))
}

if c.SslEnabled {
tlsOpts := []tlscfg.Opt{}
if c.CACert != "" || c.ClientCert != "" || c.ClientKey != "" {
if c.CACert != "" {
if _, err := os.Stat(c.CACert); err != nil {
tlsOpts = append(tlsOpts,
tlscfg.WithCA(
[]byte(c.CACert), tlscfg.ForClient,
),
)
} else {
tlsOpts = append(tlsOpts,
tlscfg.MaybeWithDiskCA(c.CACert, tlscfg.ForClient),
)
}
}

if _, err := os.Stat(c.ClientCert); err != nil {
tlsOpts = append(tlsOpts,
tlscfg.WithKeyPair(
[]byte(c.ClientCert), []byte(c.ClientKey),
),
)
} else {
tlsOpts = append(tlsOpts,
tlscfg.MaybeWithDiskKeyPair(c.ClientCert, c.ClientKey),
)
}
}
tc, err := tlscfg.New(tlsOpts...)
if err != nil {
l.Fatalf("unable to create tls config: %v", err)
}
tc.InsecureSkipVerify = c.SslSkipVerify
opts = append(opts, kgo.DialTLSConfig(tc))
}
}...)

switch c.Compression {
case "none":
Expand Down
Loading

0 comments on commit 36de039

Please sign in to comment.