I have many IPFIX records inserted into Kafka, and I have created a consumer for them in Go with this code:
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"flag"
	"log"
	"sync"
	"time"

	"github.com/ClickHouse/clickhouse-go"
	"github.com/segmentio/kafka-go"
)
type options struct {
	Broker  string
	Topic   string
	Debug   bool
	Workers int
}

type dataField struct {
	I int
	V interface{}
}

type Header struct {
	Version    int
	Length     int
	ExportTime int64
	SequenceNo int
	DomainID   int
}

type ipfix struct {
	AgentID  string
	Header   Header
	DataSets [][]dataField
}

type dIPFIXSample struct {
	device                          string
	sourceIPv4Address               string
	sourceTransportPort             uint64
	postNATSourceIPv4Address        string
	postNATSourceTransportPort      uint64
	destinationIPv4Address          string
	postNATDestinationIPv4Address   string
	postNATDestinationTransportPort uint64
	dstport                         uint64
	timestamp                       string
	postNATSourceIPv6Address        string
	postNATDestinationIPv6Address   string
	sourceIPv6Address               string
	destinationIPv6Address          string
	proto                           uint8
	login                           string
	sessionid                       uint64
}
var opts options

func init() {
	flag.StringVar(&opts.Broker, "broker", "172.18.0.4:9092", "broker ipaddress:port")
	flag.StringVar(&opts.Topic, "topic", "vflow.ipfix", "kafka topic")
	flag.BoolVar(&opts.Debug, "debug", true, "enabled/disabled debug")
	flag.IntVar(&opts.Workers, "workers", 16, "workers number / partition number")
	flag.Parse()
}
func main() {
	var (
		wg sync.WaitGroup
		ch = make(chan ipfix, 10000)
	)

	for i := 0; i < 5; i++ {
		go ingestClickHouse(ch)
	}

	wg.Add(opts.Workers)
	for i := 0; i < opts.Workers; i++ {
		go func(ti int) {
			defer wg.Done()
			// create a new kafka reader with the broker and topic
			r := kafka.NewReader(kafka.ReaderConfig{
				Brokers: []string{opts.Broker},
				Topic:   opts.Topic,
				GroupID: "mygroup",
				// start consuming from the earliest message
				StartOffset: kafka.FirstOffset,
			})
			pCount := 0
			count := 0
			tik := time.Tick(10 * time.Second)
			for {
				select {
				case <-tik:
					if opts.Debug {
						log.Printf("partition GroupId#%d, rate=%d\n", ti, (count-pCount)/10)
					}
					pCount = count
				default:
					// read the next message from kafka
					m, err := r.ReadMessage(context.Background())
					if err != nil {
						if err == kafka.ErrGenerationEnded {
							log.Println("generation ended")
							return
						}
						log.Println(err)
						continue
					}
					// unmarshal the message into an ipfix struct
					objmap := ipfix{}
					if err := json.Unmarshal(m.Value, &objmap); err != nil {
						log.Println(err)
						continue
					}
					// send the ipfix struct to the ingestClickHouse goroutines
					ch <- objmap
					// mark the message as processed
					if err := r.CommitMessages(context.Background(), m); err != nil {
						log.Println(err)
						continue
					}
					count++
				}
			}
		}(i)
	}
	wg.Wait()
}
func ingestClickHouse(ch chan ipfix) {
	connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&username=default&password=wawa123")
	if err != nil {
		log.Fatal(err)
	}
	if err := connect.Ping(); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Printf("[%d] %s\n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			log.Println(err)
		}
		return
	}
	defer connect.Close()

	for {
		tx, err := connect.Begin()
		if err != nil {
			log.Fatal(err)
		}
		// 16 columns need 16 placeholders
		stmt, err := tx.Prepare("INSERT INTO natdb.natlogs (timestamp, router_ip, sourceIPv4Address, sourceTransportPort, postNATSourceIPv4Address, postNATSourceTransportPort, destinationIPv4Address, dstport, postNATDestinationIPv4Address, postNATDestinationTransportPort, postNATSourceIPv6Address, postNATDestinationIPv6Address, sourceIPv6Address, destinationIPv6Address, proto, login) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
		if err != nil {
			log.Fatal(err)
		}
		for i := 0; i < 10000; i++ {
			sample := <-ch
			for _, data := range sample.DataSets {
				s := dIPFIXSample{}
				for _, dd := range data {
					switch dd.I {
					case 4:
						s.proto = uint8(dd.V.(float64))
					case 7:
						s.sourceTransportPort = uint64(dd.V.(float64))
					case 8:
						s.sourceIPv4Address = dd.V.(string)
					case 11:
						s.dstport = uint64(dd.V.(float64))
					case 12:
						s.destinationIPv4Address = dd.V.(string)
					case 27:
						s.sourceIPv6Address = dd.V.(string)
					case 28:
						s.destinationIPv6Address = dd.V.(string)
					case 225:
						s.postNATSourceIPv4Address = dd.V.(string)
					case 226:
						s.postNATDestinationIPv4Address = dd.V.(string)
					case 227:
						s.postNATSourceTransportPort = uint64(dd.V.(float64))
					case 228:
						s.postNATDestinationTransportPort = uint64(dd.V.(float64))
					case 281:
						s.postNATSourceIPv6Address = dd.V.(string)
					case 282:
						s.postNATDestinationIPv6Address = dd.V.(string)
					case 2003:
						s.login = dd.V.(string)
						log.Println(dd.V.(string))
					}
				}
				timestamp := time.Unix(sample.Header.ExportTime, 0).Format("2006-01-02 15:04:05")
				if _, err := stmt.Exec(
					timestamp,
					sample.AgentID,
					s.sourceIPv4Address,
					s.sourceTransportPort,
					s.postNATSourceIPv4Address,
					s.postNATSourceTransportPort,
					s.destinationIPv4Address,
					s.dstport,
					s.postNATDestinationIPv4Address,
					s.postNATDestinationTransportPort,
					s.postNATSourceIPv6Address,
					s.postNATDestinationIPv6Address,
					s.sourceIPv6Address,
					s.destinationIPv6Address,
					s.proto,
					s.login,
				); err != nil {
					log.Fatal(err)
				}
			}
		}
		// commit synchronously: committing in a separate goroutine races
		// with the next connect.Begin and hides commit errors
		if err := tx.Commit(); err != nil {
			log.Fatal(err)
		}
	}
}
The code works and I am able to insert data into ClickHouse, but because of the high traffic and the huge amount of data written to Kafka there is a delay between Kafka and ClickHouse, and it grows as the traffic increases; right now I have more than 20 hours of delay. Can you please recommend any way to make it faster? This is my ClickHouse table:
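One thing I suspect on the consuming side is that each worker commits offsets after every single message, which is a synchronous round trip per record. As far as I understand, kafka-go's ReaderConfig can batch offset commits via CommitInterval, and MinBytes/MaxBytes control the fetch batch size. This is only an untested sketch of the reader configuration I am considering (the byte sizes are guesses):

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{opts.Broker},
	Topic:   opts.Topic,
	GroupID: "mygroup",
	// start consuming from the earliest committed offset
	StartOffset: kafka.FirstOffset,
	// fetch larger batches per request instead of single messages
	MinBytes: 1e6,  // 1 MB
	MaxBytes: 10e6, // 10 MB
	// flush offset commits once per second; CommitMessages then only
	// queues offsets instead of blocking on a broker round trip
	CommitInterval: time.Second,
})

If my reading of the docs is right, this removes the per-message commit latency without changing the processing loop.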
I want a faster way to insert data into ClickHouse.
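Concretely, what I have in mind for the insert side is a flush-by-count-or-timeout loop with a synchronous commit (right now the commit runs in a goroutine, and a batch can stall indefinitely waiting on the channel). This is only an untested sketch; writeSample is a hypothetical stand-in for the per-dataset decode and stmt.Exec logic already shown in ingestClickHouse above:

// insertQuery is the same statement as in ingestClickHouse, with one
// placeholder per column.
const insertQuery = `INSERT INTO natdb.natlogs
	(timestamp, router_ip, sourceIPv4Address, sourceTransportPort,
	 postNATSourceIPv4Address, postNATSourceTransportPort,
	 destinationIPv4Address, dstport, postNATDestinationIPv4Address,
	 postNATDestinationTransportPort, postNATSourceIPv6Address,
	 postNATDestinationIPv6Address, sourceIPv6Address,
	 destinationIPv6Address, proto, login)
	VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

// writeSample stands in for the per-dataset decode (the switch over dd.I)
// and the stmt.Exec call from ingestClickHouse.
func writeSample(stmt *sql.Stmt, sample ipfix) error {
	// ... decode sample.DataSets and call stmt.Exec exactly as above ...
	return nil
}

func ingestBatched(ch chan ipfix, connect *sql.DB) {
	const batchSize = 10000
	const flushInterval = 5 * time.Second

	for {
		tx, err := connect.Begin()
		if err != nil {
			log.Fatal(err)
		}
		stmt, err := tx.Prepare(insertQuery)
		if err != nil {
			log.Fatal(err)
		}

		timeout := time.After(flushInterval)
		rows := 0
	fill:
		for rows < batchSize {
			select {
			case sample := <-ch:
				if err := writeSample(stmt, sample); err != nil {
					log.Println(err)
					continue
				}
				rows++
			case <-timeout:
				// flush a partial batch instead of blocking on the channel
				break fill
			}
		}

		if rows == 0 {
			// nothing arrived during the interval; discard the empty tx
			if err := tx.Rollback(); err != nil {
				log.Println(err)
			}
			continue
		}
		// commit synchronously so errors surface before the next Begin
		if err := tx.Commit(); err != nil {
			log.Fatal(err)
		}
	}
}

The idea is that ClickHouse prefers fewer, larger inserts, so the timeout only bounds latency while the common path still flushes full 10000-row batches.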
Thanks in advance.