Skip to content

Commit

Permalink
put transIntColumn to member of clickhouse plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
childe committed Mar 24, 2023
1 parent 5335850 commit 0db1a3e
Showing 1 changed file with 27 additions and 32 deletions.
59 changes: 27 additions & 32 deletions output/clickhouse_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ const (
CLICKHOUSE_DEFAULT_FLUSH_INTERVAL = 30
)

var transIntColumn = make(map[string]string)

var transArrayColumn = make(map[string]string)

var transFloatColumn = make(map[string]string)

type ClickhouseOutput struct {
config map[interface{}]interface{}

Expand All @@ -54,6 +48,10 @@ type ClickhouseOutput struct {
mux sync.Mutex
wg sync.WaitGroup
closeChan chan bool

transIntColumn []string
transFloatColumn []string
transIntArrayColumn []string
}

type rowDesc struct {
Expand Down Expand Up @@ -136,11 +134,11 @@ func (c *ClickhouseOutput) setTableDesc() {
for key1, value1 := range c.desc {
switch value1.Type {
case "Int64", "UInt64", "Int32", "UInt32", "Int16", "UInt16", "Int8", "UInt8", "Nullable(Int64)", "Nullable(Int32)", "Nullable(Int16)", "Nullable(Int8)":
transIntColumn[key1] = value1.Type
case "Array(String)", "Array(Int64)", "Array(Int32)", "Array(Int16)", "Array(Int8)":
transArrayColumn[key1] = value1.Type
c.transIntColumn = append(c.transIntColumn, key1)
case "Array(Int64)", "Array(Int32)", "Array(Int16)", "Array(Int8)":
c.transIntArrayColumn = append(c.transIntArrayColumn, key1)
case "Float64", "Float32", "Nullable(Float32)", "Nullable(Float64)":
transFloatColumn[key1] = value1.Type
c.transFloatColumn = append(c.transFloatColumn, key1)
}
}

Expand Down Expand Up @@ -428,44 +426,41 @@ func (c *ClickhouseOutput) innerFlush(events []map[string]interface{}) {

for _, event := range events {

for keyInt := range transIntColumn {
if keyIntValue, ok := event[keyInt]; ok {
for _, key := range c.transIntColumn {
if keyIntValue, ok := event[key]; ok {
if intConverterValue, err := cast.ToInt64E(keyIntValue); err == nil {
event[keyInt] = intConverterValue
event[key] = intConverterValue
} else {
glog.V(10).Infof("ch_output convert intType error: %s", err)
event[keyInt] = nil
event[key] = nil
}
}
}

for keyArray, columnArrayType := range transArrayColumn {
if keyArrayValue, ok := event[keyArray]; ok {
switch columnArrayType {
case "Array(Int64)", "Array(Int32)", "Array(Int16)", "Array(Int8)":
arrayIntValue := keyArrayValue.([]interface{})
ints := make([]int64, len(arrayIntValue))
for i, v := range arrayIntValue {
if v, err := cast.ToInt64E(v); err == nil {
ints[i] = v
} else {
glog.V(10).Infof("ch_output convert arrayIntType error: %s", err)
ints[i] = 0
}
for _, key := range c.transIntArrayColumn {
if keyArrayValue, ok := event[key]; ok {
arrayIntValue := keyArrayValue.([]interface{})
ints := make([]int64, len(arrayIntValue))
for i, v := range arrayIntValue {
if v, err := cast.ToInt64E(v); err == nil {
ints[i] = v
} else {
glog.V(10).Infof("ch_output convert arrayIntType error: %s", err)
ints[i] = 0
}
event[keyArray] = ints
event[key] = ints
}
}
}

for keyFloat, _ := range transFloatColumn {
if keyFloatValue, ok := event[keyFloat]; ok {
for _, key := range c.transFloatColumn {
if keyFloatValue, ok := event[key]; ok {
floatConverterValue, err := cast.ToFloat64E(keyFloatValue)
if err == nil {
event[keyFloat] = floatConverterValue
event[key] = floatConverterValue
} else {
glog.V(10).Infof("ch_output convert floatType error: %s", err)
event[keyFloat] = nil
event[key] = nil
}
}
}
Expand Down

0 comments on commit 0db1a3e

Please sign in to comment.