Skip to content

Commit

Permalink
Use badger as temporary storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Michele Masili committed Jul 20, 2021
1 parent 1654058 commit d6415a9
Show file tree
Hide file tree
Showing 12 changed files with 340 additions and 125 deletions.
2 changes: 2 additions & 0 deletions core/commands/connectors/db_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/godruoyi/go-snowflake"
"github.com/lib/pq"
"github.com/meekyphotos/experive-cli/core/utils"
"log"
"regexp"
"strings"
)
Expand Down Expand Up @@ -94,6 +95,7 @@ func (p *PgConnector) Write(data []map[string]interface{}) error {
}
_, err := stmt.Exec(vals...)
if err != nil {
log.Println(err)
return err
}
}
Expand Down
109 changes: 59 additions & 50 deletions core/commands/load_osm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package commands
import (
"github.com/meekyphotos/experive-cli/core/commands/connectors"
"github.com/meekyphotos/experive-cli/core/commands/pipeline"
"github.com/meekyphotos/experive-cli/core/dataproviders"
"github.com/meekyphotos/experive-cli/core/utils"
"github.com/urfave/cli/v2"
"github.com/valyala/fastjson"
"os"
"sync"
"time"
)

type OsmRunner struct {
NodeConnector connectors.Connector
WaysConnector connectors.Connector
RelationsConnector connectors.Connector
store dataproviders.Store
NodeConnector connectors.Connector
}

// ID int64
Expand All @@ -28,18 +30,6 @@ var osmFields = []connectors.Column{
{Name: "address", Type: connectors.Jsonb},
}

// wayFields is the column layout handed to WaysConnector.Init: an indexed
// bigint osm_id plus two JSONB columns, extratags and node_ids.
var wayFields = []connectors.Column{
{Name: "osm_id", Type: connectors.Bigint, Indexed: true},
{Name: "extratags", Type: connectors.Jsonb},
{Name: "node_ids", Type: connectors.Jsonb},
}

// relationFields is the column layout handed to RelationsConnector.Init: an
// indexed bigint osm_id plus two JSONB columns, extratags and members.
var relationFields = []connectors.Column{
{Name: "osm_id", Type: connectors.Bigint, Indexed: true},
{Name: "extratags", Type: connectors.Jsonb},
{Name: "members", Type: connectors.Jsonb},
}

func determineNodesCols(c *utils.Config) []connectors.Column {
cols := make([]connectors.Column, 0)
cols = append(cols, osmFields...)
Expand All @@ -63,69 +53,88 @@ func (r OsmRunner) Run(c *utils.Config) error {
if dbErr != nil {
return dbErr
}

r.WaysConnector = &connectors.PgConnector{
Config: c, TableName: c.TableName + "_ways", Db: pg.Db,
}
r.RelationsConnector = &connectors.PgConnector{
Config: c, TableName: c.TableName + "_relations", Db: pg.Db,
}
r.store = dataproviders.Store{}
os.RemoveAll("./db.tmp") // try to delete all to cleanup previous run
r.store.Open("./db.tmp")
defer func() {
os.RemoveAll("./db.tmp")
}()
defer r.store.Close()
// no need to close ways & relations
defer r.NodeConnector.Close()
dbErr = r.NodeConnector.Init(determineNodesCols(c))
if dbErr != nil {
return dbErr
}
dbErr = r.WaysConnector.Init(wayFields)
if dbErr != nil {
return dbErr
}
dbErr = r.RelationsConnector.Init(relationFields)
if dbErr != nil {
return dbErr
}

nodeChannel, wayChannel, relationChannel, err := pipeline.ReadFromPbf(c.File, &pipeline.NoopBeat{})
nodeChannel, wayChannel, err := pipeline.ReadFromPbf(c.File, &pipeline.NoopBeat{})
if err != nil {
return err
}
nodeRequests := pipeline.BatchRequest(nodeChannel, 10000, time.Second)
wayRequests := pipeline.BatchRequest(wayChannel, 10000, time.Second)
relationRequests := pipeline.BatchRequest(relationChannel, 10000, time.Second)
var pgWorkers sync.WaitGroup
nodeRequests := pipeline.BatchINodes(nodeChannel, 10000, time.Second)
var postProcessingWorkers sync.WaitGroup

nodeBeat := &pipeline.ProgressBarBeat{OperationName: "Nodes"}
relationBeat := &pipeline.ProgressBarBeat{OperationName: "Relations"}
waysBeat := &pipeline.ProgressBarBeat{OperationName: "Ways"}
actualBeat := &pipeline.ProgressBarBeat{OperationName: "Node written"}

pgWorkers.Add(1)
postProcessingWorkers.Add(1)
go func() {
err := pipeline.ProcessChannel(nodeRequests, r.NodeConnector, nodeBeat)
err := pipeline.ProcessINodes(nodeRequests, r.store, nodeBeat)
if err != nil {
panic(err)
}
pgWorkers.Done()
postProcessingWorkers.Done()
}()

pgWorkers.Add(1)
postProcessingWorkers.Add(1)
go func() {
err := pipeline.ProcessChannel(wayRequests, r.WaysConnector, waysBeat)
if err != nil {
panic(err)
}
pgWorkers.Done()
pipeline.ProcessNodeEnrichment(wayChannel, r.store, waysBeat)
postProcessingWorkers.Done()
}()

pgWorkers.Add(1)
postProcessingWorkers.Wait()

// I'm completely done with post processing.. now I should start writing stuff
storeChannel := r.store.Stream(func(value *fastjson.Value) map[string]interface{} {
baseObject := map[string]interface{}{
"osm_id": value.GetInt64("osm_id"),
"osm_type": string(value.GetStringBytes("osm_type")),
"class": string(value.GetStringBytes("class")),
"type": string(value.GetStringBytes("type")),
"latitude": value.GetFloat64("latitude"),
"longitude": value.GetFloat64("longitude"),
"name": "",
"address": "",
"extratags": "",
}
name := value.GetObject("name")
if name != nil {
baseObject["name"] = string(name.MarshalTo([]byte{}))
}
address := value.GetObject("address")
if address != nil {
baseObject["address"] = string(address.MarshalTo([]byte{}))
}

extratags := value.GetObject("extratags")
if extratags != nil {
baseObject["extratags"] = string(extratags.MarshalTo([]byte{}))
}
return baseObject
})
rowsChannel := pipeline.BatchRequest(storeChannel, 10000, time.Second)

var pgWorker sync.WaitGroup
pgWorker.Add(1)
go func() {
err := pipeline.ProcessChannel(relationRequests, r.RelationsConnector, relationBeat)
err := pipeline.ProcessChannel(rowsChannel, r.NodeConnector, actualBeat)
if err != nil {
panic(err)
}
pgWorkers.Done()
pgWorker.Done()
}()

pgWorkers.Wait()
pgWorker.Wait()
return r.NodeConnector.CreateIndexes()
}

Expand Down
11 changes: 8 additions & 3 deletions core/commands/osm/data.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package osm

import "time"
import (
"time"
)

type BoundingBox struct {
Left float64
Expand Down Expand Up @@ -30,11 +32,14 @@ type Info struct {
}

type Node struct {
Content map[string]interface{}
Id int64
Content []byte
}

type Way struct {
Content map[string]interface{}
Id int64
Tags map[string]string
NodeIds []int64
}

type MemberType int
Expand Down
35 changes: 26 additions & 9 deletions core/commands/osm/decode_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ keyLoop:
}
key := string(keyBytes)
val := string(stringTable[valueIDs[index]])

if strings.Contains(key, "name") {
names[key] = val
} else {
Expand All @@ -87,7 +88,9 @@ type tagUnpacker struct {

// Pre-allocated JSON punctuation used by the hand-rolled json writer, kept as
// byte slices so they can be passed straight to the buffer's Write.

// openPar opens the object; written before the first member.
var openPar = []byte(`{`)
// openWithComma separates a member from the previous one and opens the next key's quote.
var openWithComma = []byte(`,"`)
// comma is a bare member separator.
var comma = []byte(`,`)
// keyVal closes a key and opens a quoted (string) value.
var keyVal = []byte(`":"`)
// keyValPrimitive closes a key ahead of an unquoted value that is written verbatim.
var keyValPrimitive = []byte(`":`)
// quotes opens the first key and closes a string value.
var quotes = []byte(`"`)
// endPar is the closing brace of the object.
var endPar = []byte(`}`)
// nameBytes is the literal "name".
// NOTE(review): presumably used to recognise name-like tag keys — its use is not visible here, confirm.
var nameBytes = []byte(`name`)
Expand All @@ -114,24 +117,38 @@ func (js *json) close() {
// toString returns everything written to the underlying buffer so far as a
// string. It does not add the closing brace itself.
func (js *json) toString() string {
return js.buffer.String()
}

// add appends a `"key":"value"` member whose value is a JSON string.
//
// The value is first run through the package-level carriageReturn pattern
// (every match is removed) and then JSON-escaped, so that quotes, backslashes
// or control bytes coming from raw tag values cannot terminate the string
// early and corrupt the document. The key is written verbatim.
func (js *json) add(key []byte, val []byte) {
	js.prepareForKey()
	js.buffer.Write(key)
	js.buffer.Write(keyVal)
	cleaned := carriageReturn.ReplaceAll(val, []byte{})
	js.buffer.Write(escapeJSONStringBytes(cleaned))
	js.buffer.Write(quotes)
}

// escapeJSONStringBytes returns raw with `"` and `\` backslash-escaped and
// bytes below 0x20 rewritten as \u00XX. When nothing needs escaping the input
// slice is returned as-is, without allocating.
func escapeJSONStringBytes(raw []byte) []byte {
	const hexDigits = "0123456789abcdef"

	// Fast path: most values contain nothing that needs escaping.
	first := -1
	for i, c := range raw {
		if c == '"' || c == '\\' || c < 0x20 {
			first = i
			break
		}
	}
	if first < 0 {
		return raw
	}

	out := make([]byte, 0, len(raw)+8)
	out = append(out, raw[:first]...)
	for _, c := range raw[first:] {
		switch {
		case c == '"' || c == '\\':
			out = append(out, '\\', c)
		case c < 0x20:
			out = append(out, '\\', 'u', '0', '0', hexDigits[c>>4], hexDigits[c&0x0f])
		default:
			out = append(out, c)
		}
	}
	return out
}

// addPrimitive appends a `"key":value` member whose value is emitted verbatim,
// without surrounding quotes or any cleaning. An empty value writes nothing at
// all, so the member is simply absent from the document.
func (js *json) addPrimitive(key []byte, val []byte) {
	if len(val) == 0 {
		return
	}

	js.prepareForKey()
	js.buffer.Write(key)
	js.buffer.Write(keyValPrimitive)
	js.buffer.Write(val)
}

func (js *json) prepareForKey() {
if js.started {
js.buffer.Write(openWithComma)
} else {
js.started = true
js.buffer.Write(openPar)
js.buffer.Write(quotes)
}
js.buffer.Write(key)
js.buffer.Write(keyVal)
cleaned := carriageReturn.ReplaceAll(val, []byte{})
js.buffer.Write(cleaned)
js.buffer.Write(quotes)
}

// Make tags map from stringtable and array of IDs (used in DenseNodes encoding).
func (tu *tagUnpacker) next() (string, string, string, string, string) {
var class, osmType string
func (tu *tagUnpacker) next() (string, string, string, []byte, []byte) {
var class, osmType []byte
tagsJson := newJson()
nameJson := newJson()
addressJson := newJson()
Expand All @@ -155,8 +172,8 @@ keyLoop:
valBytes := tu.stringTable[valID]
for _, b := range classDefiningAttributes {
if bytes.Equal(b, keyBytes) {
class = string(b)
osmType = string(valBytes)
class = b
osmType = valBytes
break // add key anyway
}
}
Expand Down
68 changes: 34 additions & 34 deletions core/commands/osm/decoder_data.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package osm

import (
"fmt"
"github.com/meekyphotos/experive-cli/core/commands/pbf"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -48,6 +49,16 @@ func (dec *dataDecoder) parsePrimitiveGroup(pb *pbf.PrimitiveBlock, pg *pbf.Prim
//}
}

// Member names of the per-node JSON document, pre-converted to byte slices so
// they can be handed to the json writer without a conversion per node.

// osmIdBytes is the key of the numeric OSM identifier.
var osmIdBytes = []byte("osm_id")
// osmTypeBytes is the key of the element kind marker.
var osmTypeBytes = []byte("osm_type")
// classBytes is the key of the node's class.
var classBytes = []byte("class")
// typeBytes is the key of the node's type within its class.
var typeBytes = []byte("type")
// latitudeBytes is the key of the latitude.
var latitudeBytes = []byte("latitude")
// longitudeBytes is the key of the longitude.
var longitudeBytes = []byte("longitude")
// metadataBytes is the key of the remaining tags; note the emitted name is "extratags".
var metadataBytes = []byte("extratags")
// namesBytes is the key of the name object.
var namesBytes = []byte("name")
// addressesBytes is the key of the address object.
var addressesBytes = []byte("address")

func (dec *dataDecoder) parseNodes(pb *pbf.PrimitiveBlock, nodes []*pbf.Node) {
st := pb.GetStringtable().GetS()
granularity := int64(pb.GetGranularity())
Expand All @@ -64,20 +75,7 @@ func (dec *dataDecoder) parseNodes(pb *pbf.PrimitiveBlock, nodes []*pbf.Node) {
longitude := 1e-9 * float64(lonOffset+(granularity*lon))

tags, names, class, osmType := ExtractInfo(st, node.GetKeys(), node.GetVals())
if len(tags) != 2 || len(names) != 2 {

dec.q = append(dec.q, &Node{
Content: map[string]interface{}{
"osm_id": id,
"class": class,
"type": osmType,
"latitude": latitude,
"longitude": longitude,
"metadata": tags,
"names": names,
},
})
}
dec.addNodeQueue(tags, names, id, []byte(class), []byte(osmType), "", latitude, longitude)
}

}
Expand All @@ -100,21 +98,27 @@ func (dec *dataDecoder) parseDenseNodes(pb *pbf.PrimitiveBlock, dn *pbf.DenseNod
latitude := 1e-9 * float64(latOffset+(granularity*lat))
longitude := 1e-9 * float64(lonOffset+(granularity*lon))
tags, names, address, class, osmType := tu.next()
if len(tags) != 0 || len(names) != 0 {
dec.q = append(dec.q, &Node{
Content: map[string]interface{}{
"osm_id": id,
"osm_type": 'N',
"class": class,
"type": osmType,
"name": names,
"address": address,
"latitude": latitude,
"longitude": longitude,
"extratags": tags,
},
})
}
dec.addNodeQueue(tags, names, id, class, osmType, address, latitude, longitude)
}
}

// addNodeQueue serialises one node as a JSON document and appends it to the
// decoder queue as a Node. Nodes that have neither tags nor names are dropped.
//
// tags, names and address are embedded verbatim as member values, so they
// must already be JSON text; empty ones are left out of the document. class
// and osmType are written as JSON strings, while id, latitude and longitude
// are written as JSON numbers.
func (dec *dataDecoder) addNodeQueue(tags string, names string, id int64, class []byte, osmType []byte, address string, latitude float64, longitude float64) {
	if len(tags) == 0 && len(names) == 0 {
		return
	}

	// Named doc rather than json so the package's json type is not shadowed.
	doc := newJson()
	doc.addPrimitive(osmIdBytes, []byte(fmt.Sprintf("%d", id)))
	doc.add(osmTypeBytes, []byte("N"))
	doc.add(classBytes, class)
	doc.add(typeBytes, osmType)
	doc.addPrimitive(namesBytes, []byte(names))
	// Use the "address" key declared with the other member names in this file;
	// it is the name the loader looks up when it reads the document back.
	doc.addPrimitive(addressesBytes, []byte(address))
	doc.addPrimitive(metadataBytes, []byte(tags))
	// A bare %f keeps only 6 decimals. Coordinates are computed at 1e-9 degree
	// scale, so print 9 decimals to avoid truncating them.
	doc.addPrimitive(latitudeBytes, []byte(fmt.Sprintf("%.9f", latitude)))
	doc.addPrimitive(longitudeBytes, []byte(fmt.Sprintf("%.9f", longitude)))
	doc.close()

	dec.q = append(dec.q, &Node{
		Id:      id,
		Content: []byte(doc.toString()),
	})
}

Expand All @@ -135,11 +139,7 @@ func (dec *dataDecoder) parseWays(pb *pbf.PrimitiveBlock, ways []*pbf.Way) {
}

dec.q = append(dec.q, &Way{
Content: map[string]interface{}{
"osm_id": id,
"extratags": tags,
"node_ids": nodeIDs,
},
id, tags, nodeIDs,
})
}
}
Expand Down
Loading

0 comments on commit d6415a9

Please sign in to comment.