From 896efe12cdde1b7d0504ae57d859f3af4a78d67c Mon Sep 17 00:00:00 2001 From: Michele Masili Date: Sun, 18 Jul 2021 23:03:14 +0200 Subject: [PATCH] Small fixes --- core/commands/connectors/db_connector.go | 8 +++++++- core/commands/load_osm.go | 5 +++-- core/commands/load_wof.go | 3 ++- core/commands/osm/decode_tag.go | 9 +++++++-- core/commands/pipeline/api.go | 2 +- core/commands/pipeline/connector_processor.go | 9 +++++++-- 6 files changed, 27 insertions(+), 9 deletions(-) diff --git a/core/commands/connectors/db_connector.go b/core/commands/connectors/db_connector.go index 34d8979..f7d5112 100644 --- a/core/commands/connectors/db_connector.go +++ b/core/commands/connectors/db_connector.go @@ -8,6 +8,7 @@ import ( "github.com/godruoyi/go-snowflake" "github.com/lib/pq" "github.com/meekyphotos/experive-cli/core/utils" + "regexp" "strings" ) @@ -37,6 +38,8 @@ func (p *PgConnector) Close() { } } +var carriageReturn = regexp.MustCompile("[\n\r]") + func (p *PgConnector) Write(data []map[string]interface{}) error { if p.columns == nil { return errors.New("no columns found, call init before starting") @@ -71,7 +74,8 @@ func (p *PgConnector) Write(data []map[string]interface{}) error { value := row[c.Name] switch value.(type) { case string: // already marshalled - vals[i] = value + repairedString := string(carriageReturn.ReplaceAll([]byte(value.(string)), []byte{})) + vals[i] = repairedString default: marshal, err := json.Marshal(value) if err != nil { @@ -122,6 +126,8 @@ func (p *PgConnector) Init(columns []Column) error { stmt.WriteString(f.Name) stmt.WriteString(" ") switch f.Type { + case Snowflake: + stmt.WriteString("bigint") case Bigint: stmt.WriteString("bigint") case DoublePrecision: diff --git a/core/commands/load_osm.go b/core/commands/load_osm.go index 7acc859..8aee050 100644 --- a/core/commands/load_osm.go +++ b/core/commands/load_osm.go @@ -52,11 +52,12 @@ func (r OsmRunner) Run(c *utils.Config) error { if err != nil { return err } - requests := pipeline.BatchRequest(channel, 10000, time.Second) + requests := pipeline.BatchRequest(channel, 2500, time.Second) var pgWorkers sync.WaitGroup pgWorkers.Add(1) + beat := &pipeline.ProgressBarBeat{OperationName: "Writing"} go func() { - err := pipeline.ProcessChannel(requests, r.Connector) + err := pipeline.ProcessChannel(requests, r.Connector, beat) if err != nil { panic(err) } diff --git a/core/commands/load_wof.go b/core/commands/load_wof.go index 8fc4c7d..8bb2ce5 100644 --- a/core/commands/load_wof.go +++ b/core/commands/load_wof.go @@ -72,8 +72,9 @@ func (r WofRunner) Run(c *utils.Config) error { requests := pipeline.BatchRequest(channelOut, 10000, time.Second) var pgWorkers sync.WaitGroup pgWorkers.Add(1) + beat := &pipeline.ProgressBarBeat{OperationName: "Writing"} go func() { - err := pipeline.ProcessChannel(requests, r.Connector) + err := pipeline.ProcessChannel(requests, r.Connector, beat) if err != nil { panic(err) } diff --git a/core/commands/osm/decode_tag.go b/core/commands/osm/decode_tag.go index e668a12..f481dd2 100644 --- a/core/commands/osm/decode_tag.go +++ b/core/commands/osm/decode_tag.go @@ -2,6 +2,7 @@ package osm import ( "bytes" + "regexp" "strings" ) @@ -90,6 +91,8 @@ var keyVal = []byte(`":"`) var quotes = []byte(`"`) var endPar = []byte(`}`) var nameBytes = []byte(`name`) +var carriageReturn = regexp.MustCompile(`[\n\r"\\]`) +var escapeQuote = regexp.MustCompile(`"`) // Make tags map from stringtable and array of IDs (used in DenseNodes encoding). func (tu *tagUnpacker) next() (string, string, string, string) { @@ -135,7 +138,8 @@ keyLoop: } nameJson.Write(keyBytes) nameJson.Write(keyVal) - nameJson.Write(valBytes) + cleaned := carriageReturn.ReplaceAll(valBytes, []byte{}) + nameJson.Write(cleaned) nameJson.Write(quotes) } else { if !firstTag { @@ -146,7 +150,8 @@ keyLoop: } tagsJson.Write(keyBytes) tagsJson.Write(keyVal) - tagsJson.Write(valBytes) + cleaned := carriageReturn.ReplaceAll(valBytes, []byte{}) + tagsJson.Write(cleaned) tagsJson.Write(quotes) } diff --git a/core/commands/pipeline/api.go b/core/commands/pipeline/api.go index 14c30da..526c773 100644 --- a/core/commands/pipeline/api.go +++ b/core/commands/pipeline/api.go @@ -54,7 +54,7 @@ type ProgressBarBeat struct { } func (b *ProgressBarBeat) Start() { - b.bar = progressbar.NewOptions64(-1, progressbar.OptionSetDescription(b.OperationName)) + b.bar = progressbar.Default(-1, "Writing") } func (b *ProgressBarBeat) Beat(amount int) { diff --git a/core/commands/pipeline/connector_processor.go b/core/commands/pipeline/connector_processor.go index 30377e4..7dc4414 100644 --- a/core/commands/pipeline/connector_processor.go +++ b/core/commands/pipeline/connector_processor.go @@ -5,20 +5,25 @@ import ( "time" ) -func ProcessChannel(channel chan []map[string]interface{}, db connectors.Connector) error { +func ProcessChannel(channel chan []map[string]interface{}, db connectors.Connector, beat Heartbeat) error { + beat.Start() + defer beat.Done() for { select { case content := <-channel: - if len(content) == 0 { + i := len(content) + if i == 0 { return nil } err := db.Write(content) + beat.Beat(i) if err != nil { return err } default: } } + } func BatchRequest(values <-chan map[string]interface{}, maxItems int, maxTimeout time.Duration) chan []map[string]interface{} {