Skip to content

Commit

Permalink
Small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Michele Masili committed Jul 18, 2021
1 parent ff3cfb1 commit 896efe1
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 9 deletions.
8 changes: 7 additions & 1 deletion core/commands/connectors/db_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/godruoyi/go-snowflake"
"github.com/lib/pq"
"github.com/meekyphotos/experive-cli/core/utils"
"regexp"
"strings"
)

Expand Down Expand Up @@ -37,6 +38,8 @@ func (p *PgConnector) Close() {
}
}

var carriageReturn = regexp.MustCompile("[\n\r]")

func (p *PgConnector) Write(data []map[string]interface{}) error {
if p.columns == nil {
return errors.New("no columns found, call init before starting")
Expand Down Expand Up @@ -71,7 +74,8 @@ func (p *PgConnector) Write(data []map[string]interface{}) error {
value := row[c.Name]
switch value.(type) {
case string: // already marshalled
vals[i] = value
repairedString := string(carriageReturn.ReplaceAll([]byte(value.(string)), []byte{}))
vals[i] = repairedString
default:
marshal, err := json.Marshal(value)
if err != nil {
Expand Down Expand Up @@ -122,6 +126,8 @@ func (p *PgConnector) Init(columns []Column) error {
stmt.WriteString(f.Name)
stmt.WriteString(" ")
switch f.Type {
case Snowflake:
stmt.WriteString("bigint")
case Bigint:
stmt.WriteString("bigint")
case DoublePrecision:
Expand Down
5 changes: 3 additions & 2 deletions core/commands/load_osm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ func (r OsmRunner) Run(c *utils.Config) error {
if err != nil {
return err
}
requests := pipeline.BatchRequest(channel, 10000, time.Second)
requests := pipeline.BatchRequest(channel, 2500, time.Second)
var pgWorkers sync.WaitGroup
pgWorkers.Add(1)
beat := &pipeline.ProgressBarBeat{OperationName: "Writing"}
go func() {
err := pipeline.ProcessChannel(requests, r.Connector)
err := pipeline.ProcessChannel(requests, r.Connector, beat)
if err != nil {
panic(err)
}
Expand Down
3 changes: 2 additions & 1 deletion core/commands/load_wof.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func (r WofRunner) Run(c *utils.Config) error {
requests := pipeline.BatchRequest(channelOut, 10000, time.Second)
var pgWorkers sync.WaitGroup
pgWorkers.Add(1)
beat := &pipeline.ProgressBarBeat{OperationName: "Writing"}
go func() {
err := pipeline.ProcessChannel(requests, r.Connector)
err := pipeline.ProcessChannel(requests, r.Connector, beat)
if err != nil {
panic(err)
}
Expand Down
9 changes: 7 additions & 2 deletions core/commands/osm/decode_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package osm

import (
"bytes"
"regexp"
"strings"
)

Expand Down Expand Up @@ -90,6 +91,8 @@ var keyVal = []byte(`":"`)
var quotes = []byte(`"`)
var endPar = []byte(`}`)
var nameBytes = []byte(`name`)
var carriageReturn = regexp.MustCompile(`[\n\r"\\]`)
var escapeQuote = regexp.MustCompile(`"`)

// Make tags map from stringtable and array of IDs (used in DenseNodes encoding).
func (tu *tagUnpacker) next() (string, string, string, string) {
Expand Down Expand Up @@ -135,7 +138,8 @@ keyLoop:
}
nameJson.Write(keyBytes)
nameJson.Write(keyVal)
nameJson.Write(valBytes)
cleaned := carriageReturn.ReplaceAll(valBytes, []byte{})
nameJson.Write(cleaned)
nameJson.Write(quotes)
} else {
if !firstTag {
Expand All @@ -146,7 +150,8 @@ keyLoop:
}
tagsJson.Write(keyBytes)
tagsJson.Write(keyVal)
tagsJson.Write(valBytes)
cleaned := carriageReturn.ReplaceAll(valBytes, []byte{})
tagsJson.Write(cleaned)
tagsJson.Write(quotes)

}
Expand Down
2 changes: 1 addition & 1 deletion core/commands/pipeline/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type ProgressBarBeat struct {
}

func (b *ProgressBarBeat) Start() {
b.bar = progressbar.NewOptions64(-1, progressbar.OptionSetDescription(b.OperationName))
b.bar = progressbar.Default(-1, "Writing")

}
func (b *ProgressBarBeat) Beat(amount int) {
Expand Down
9 changes: 7 additions & 2 deletions core/commands/pipeline/connector_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,25 @@ import (
"time"
)

func ProcessChannel(channel chan []map[string]interface{}, db connectors.Connector) error {
func ProcessChannel(channel chan []map[string]interface{}, db connectors.Connector, beat Heartbeat) error {
beat.Start()
defer beat.Done()
for {
select {
case content := <-channel:
if len(content) == 0 {
i := len(content)
if i == 0 {
return nil
}
err := db.Write(content)
beat.Beat(i)
if err != nil {
return err
}
default:
}
}

}

func BatchRequest(values <-chan map[string]interface{}, maxItems int, maxTimeout time.Duration) chan []map[string]interface{} {
Expand Down

0 comments on commit 896efe1

Please sign in to comment.