From d7a8fdff5a528de463570401c1a22c430660b269 Mon Sep 17 00:00:00 2001 From: Michele Masili Date: Mon, 19 Jul 2021 21:44:52 +0200 Subject: [PATCH] Added relations and way parsing --- core/commands/connectors/db_connector.go | 27 ++--- core/commands/load_osm.go | 94 +++++++++++++++--- core/commands/load_wof.go | 2 +- core/commands/osm/data.go | 18 +--- core/commands/osm/decode_tag.go | 82 +++++++++------- core/commands/osm/decoder_data.go | 98 ++++++++++--------- core/commands/pipeline/api.go | 2 +- core/commands/pipeline/connector_processor.go | 4 +- core/commands/pipeline/pbf_reader.go | 30 +++--- 9 files changed, 214 insertions(+), 143 deletions(-) diff --git a/core/commands/connectors/db_connector.go b/core/commands/connectors/db_connector.go index 760fd1f..acc7a76 100644 --- a/core/commands/connectors/db_connector.go +++ b/core/commands/connectors/db_connector.go @@ -15,7 +15,7 @@ import ( type PgConnector struct { Config *utils.Config TableName string - db *sql.DB + Db *sql.DB columns []Column } @@ -27,12 +27,12 @@ func (p *PgConnector) Connect() error { if err != nil { return err } - p.db = conn + p.Db = conn return nil } func (p *PgConnector) Close() { - err := p.db.Close() + err := p.Db.Close() if err != nil { panic(err) } @@ -44,7 +44,7 @@ func (p *PgConnector) Write(data []map[string]interface{}) error { if p.columns == nil { return errors.New("no columns found, call init before starting") } - txn, err := p.db.Begin() + txn, err := p.Db.Begin() if err != nil { return err } @@ -74,7 +74,9 @@ func (p *PgConnector) Write(data []map[string]interface{}) error { value := row[c.Name] switch value.(type) { case string: // already marshalled - vals[i] = value + if value != "" { + vals[i] = value + } default: marshal, err := json.Marshal(value) if err != nil { @@ -104,13 +106,16 @@ func (p *PgConnector) Write(data []map[string]interface{}) error { func (p *PgConnector) Init(columns []Column) error { p.columns = columns - txn, err := p.db.Begin() + if p.TableName == "" { + p.TableName = p.Config.TableName + } + txn, err := p.Db.Begin() if err != nil { return err } if p.Config.Create { - _, err := txn.Exec("DROP TABLE IF EXISTS " + p.Config.Schema + "." + p.Config.TableName) + _, err := txn.Exec("DROP TABLE IF EXISTS " + p.Config.Schema + "." + p.TableName) if err != nil { return err } @@ -119,7 +124,7 @@ func (p *PgConnector) Init(columns []Column) error { stmt.WriteString("CREATE TABLE IF NOT EXISTS ") stmt.WriteString(p.Config.Schema) stmt.WriteString(".") - stmt.WriteString(p.Config.TableName) + stmt.WriteString(p.TableName) stmt.WriteString("\n(") for i, f := range columns { stmt.WriteString(f.Name) @@ -159,20 +164,20 @@ func (p *PgConnector) CreateIndexes() error { if p.columns == nil { return errors.New("no columns found, call init before starting") } - txn, err := p.db.Begin() + txn, err := p.Db.Begin() if err != nil { return err } for _, c := range p.columns { if c.Indexed { if c.Type == Point { - _, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.Config.TableName + " using BRIN (" + c.Name + ")") + _, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.TableName + " using BRIN (" + c.Name + ")") if err != nil { _ = txn.Rollback() return err } } else { - _, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.Config.TableName + " (" + c.Name + ")") + _, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.TableName + " (" + c.Name + ")") if err != nil { _ = txn.Rollback() return err diff --git a/core/commands/load_osm.go b/core/commands/load_osm.go index 47ec0a4..f8ff240 100644 --- a/core/commands/load_osm.go +++ b/core/commands/load_osm.go @@ -10,22 +10,41 @@ import ( ) type OsmRunner struct { - Connector connectors.Connector + NodeConnector connectors.Connector + WaysConnector connectors.Connector + RelationsConnector connectors.Connector } +// ID int64 +// Tags map[string]string +// NodeIDs []int64 + var osmFields = []connectors.Column{ - {Name: "id", Type: connectors.Snowflake}, {Name: "osm_id", Type: connectors.Bigint, Indexed: true}, + {Name: "osm_type", Type: connectors.Text}, {Name: "class", Type: connectors.Text}, {Name: "type", Type: connectors.Text}, - {Name: "names", Type: connectors.Jsonb}, + {Name: "name", Type: connectors.Jsonb}, + {Name: "address", Type: connectors.Jsonb}, } -func determineOsmCols(c *utils.Config) []connectors.Column { +var wayFields = []connectors.Column{ + {Name: "osm_id", Type: connectors.Bigint, Indexed: true}, + {Name: "extratags", Type: connectors.Jsonb}, + {Name: "node_ids", Type: connectors.Jsonb}, +} + +var relationFields = []connectors.Column{ + {Name: "osm_id", Type: connectors.Bigint, Indexed: true}, + {Name: "extratags", Type: connectors.Jsonb}, + {Name: "members", Type: connectors.Jsonb}, +} + +func determineNodesCols(c *utils.Config) []connectors.Column { cols := make([]connectors.Column, 0) cols = append(cols, osmFields...) if c.InclKeyValues { - cols = append(cols, connectors.Column{Name: "metadata", Type: connectors.Jsonb}) + cols = append(cols, connectors.Column{Name: "extratags", Type: connectors.Jsonb}) } if c.UseGeom { cols = append(cols, geomFields...) @@ -36,35 +55,78 @@ func determineOsmCols(c *utils.Config) []connectors.Column { } func (r OsmRunner) Run(c *utils.Config) error { - r.Connector = &connectors.PgConnector{ - Config: c, TableName: c.TableName, + pg := &connectors.PgConnector{ + Config: c, TableName: c.TableName + "_node", + } + r.NodeConnector = pg + dbErr := r.NodeConnector.Connect() + if dbErr != nil { + return dbErr + } + + r.WaysConnector = &connectors.PgConnector{ + Config: c, TableName: c.TableName + "_ways", Db: pg.Db, } - dbErr := r.Connector.Connect() + r.RelationsConnector = &connectors.PgConnector{ + Config: c, TableName: c.TableName + "_relations", Db: pg.Db, + } + // no need to close ways & relations + defer r.NodeConnector.Close() + dbErr = r.NodeConnector.Init(determineNodesCols(c)) if dbErr != nil { return dbErr } - defer r.Connector.Close() - dbErr = r.Connector.Init(determineOsmCols(c)) + dbErr = r.WaysConnector.Init(wayFields) if dbErr != nil { return dbErr } - channel, err := pipeline.ReadFromPbf(c.File, &pipeline.NoopBeat{}) + dbErr = r.RelationsConnector.Init(relationFields) + if dbErr != nil { + return dbErr + } + + nodeChannel, wayChannel, relationChannel, err := pipeline.ReadFromPbf(c.File, &pipeline.NoopBeat{}) if err != nil { return err } - requests := pipeline.BatchRequest(channel, 10000, time.Second) + nodeRequests := pipeline.BatchRequest(nodeChannel, 10000, time.Second) + wayRequests := pipeline.BatchRequest(wayChannel, 10000, time.Second) + relationRequests := pipeline.BatchRequest(relationChannel, 10000, time.Second) var pgWorkers sync.WaitGroup + + nodeBeat := &pipeline.ProgressBarBeat{OperationName: "Nodes"} + relationBeat := &pipeline.ProgressBarBeat{OperationName: "Relations"} + waysBeat := &pipeline.ProgressBarBeat{OperationName: "Ways"} + + pgWorkers.Add(1) + go func() { + err := pipeline.ProcessChannel(nodeRequests, r.NodeConnector, nodeBeat) + if err != nil { + panic(err) + } + pgWorkers.Done() + }() + pgWorkers.Add(1) - beat := &pipeline.ProgressBarBeat{OperationName: "Writing"} go func() { - err := pipeline.ProcessChannel(requests, r.Connector, beat) + err := pipeline.ProcessChannel(wayRequests, r.WaysConnector, waysBeat) if err != nil { panic(err) } pgWorkers.Done() }() + + pgWorkers.Add(1) + go func() { + err := pipeline.ProcessChannel(relationRequests, r.RelationsConnector, relationBeat) + if err != nil { + panic(err) + } + pgWorkers.Done() + }() + pgWorkers.Wait() - return r.Connector.CreateIndexes() + return r.NodeConnector.CreateIndexes() } func LoadOsmMeta() *cli.Command { @@ -90,7 +152,7 @@ func LoadOsmMeta() *cli.Command { // OUTPUT FORMAT &cli.BoolFlag{Name: "latlong", Value: false, Usage: "Store coordinates in degrees of latitude & longitude."}, - &cli.StringFlag{Name: "t", Aliases: []string{"table"}, Value: "planet_data", Usage: "Output table name"}, + &cli.StringFlag{Name: "t", Aliases: []string{"table"}, Value: "planet_osm", Usage: "Prefix of table"}, &cli.BoolFlag{Name: "j", Aliases: []string{"json"}, Value: true, Usage: "Add tags without column to an additional json (key/value) column in the database tables."}, diff --git a/core/commands/load_wof.go b/core/commands/load_wof.go index 8bb2ce5..d49ddf1 100644 --- a/core/commands/load_wof.go +++ b/core/commands/load_wof.go @@ -33,7 +33,7 @@ var latLngFields = []connectors.Column{ } var geomFields = []connectors.Column{ - {Name: "latlng", Type: connectors.Point, Indexed: true}, + {Name: "geometry", Type: connectors.Point, Indexed: true}, } func determineCols(c *utils.Config) []connectors.Column { diff --git a/core/commands/osm/data.go b/core/commands/osm/data.go index f62679a..8e98523 100644 --- a/core/commands/osm/data.go +++ b/core/commands/osm/data.go @@ -30,20 +30,11 @@ type Info struct { } type Node struct { - OsmId int64 - Class string - Type string - Lat float64 - Lon float64 - Tags string - Names string + Content map[string]interface{} } type Way struct { - ID int64 - Tags map[string]string - NodeIDs []int64 - Info Info + Content map[string]interface{} } type MemberType int @@ -61,8 +52,5 @@ type Member struct { } type Relation struct { - ID int64 - Tags map[string]string - Members []Member - Info Info + Content map[string]interface{} } diff --git a/core/commands/osm/decode_tag.go b/core/commands/osm/decode_tag.go index 3807ce4..87acab0 100644 --- a/core/commands/osm/decode_tag.go +++ b/core/commands/osm/decode_tag.go @@ -91,18 +91,50 @@ var keyVal = []byte(`":"`) var quotes = []byte(`"`) var endPar = []byte(`}`) var nameBytes = []byte(`name`) +var addressBytes = []byte(`addr:`) var carriageReturn = regexp.MustCompile(`[\n\r\t"\\]`) var escapeQuote = regexp.MustCompile(`"`) +type json struct { + started bool + buffer *strings.Builder +} + +func newJson() json { + return json{ + buffer: &strings.Builder{}, + } +} +func (js *json) close() { + if js.started { + js.buffer.Write(endPar) + } +} + +func (js *json) toString() string { + return js.buffer.String() +} +func (js *json) add(key []byte, val []byte) { + if js.started { + js.buffer.Write(openWithComma) + } else { + js.started = true + js.buffer.Write(openPar) + js.buffer.Write(quotes) + } + js.buffer.Write(key) + js.buffer.Write(keyVal) + cleaned := carriageReturn.ReplaceAll(val, []byte{}) + js.buffer.Write(cleaned) + js.buffer.Write(quotes) +} + // Make tags map from stringtable and array of IDs (used in DenseNodes encoding). -func (tu *tagUnpacker) next() (string, string, string, string) { +func (tu *tagUnpacker) next() (string, string, string, string, string) { var class, osmType string - tagsJson := strings.Builder{} - nameJson := strings.Builder{} - tagsJson.Write(openPar) - nameJson.Write(openPar) - firstName := true - firstTag := true + tagsJson := newJson() + nameJson := newJson() + addressJson := newJson() keyLoop: for tu.index < len(tu.keysVals) { keyID := tu.keysVals[tu.index] @@ -125,38 +157,20 @@ keyLoop: if bytes.Equal(b, keyBytes) { class = string(b) osmType = string(valBytes) - continue keyLoop + break // add key anyway } } if bytes.Contains(keyBytes, nameBytes) { - if !firstName { - nameJson.Write(openWithComma) - } else { - firstName = false - nameJson.Write(quotes) - } - nameJson.Write(keyBytes) - nameJson.Write(keyVal) - cleaned := carriageReturn.ReplaceAll(valBytes, []byte{}) - nameJson.Write(cleaned) - nameJson.Write(quotes) + nameJson.add(keyBytes, valBytes) + } else if bytes.HasPrefix(keyBytes, addressBytes) { + addressJson.add(keyBytes, valBytes) } else { - if !firstTag { - tagsJson.Write(openWithComma) - } else { - firstTag = false - tagsJson.Write(quotes) - } - tagsJson.Write(keyBytes) - tagsJson.Write(keyVal) - cleaned := carriageReturn.ReplaceAll(valBytes, []byte{}) - tagsJson.Write(cleaned) - tagsJson.Write(quotes) - + tagsJson.add(keyBytes, valBytes) } } - tagsJson.Write(endPar) - nameJson.Write(endPar) - return tagsJson.String(), nameJson.String(), class, osmType + tagsJson.close() + nameJson.close() + addressJson.close() + return tagsJson.toString(), nameJson.toString(), addressJson.toString(), class, osmType } diff --git a/core/commands/osm/decoder_data.go b/core/commands/osm/decoder_data.go index 4dd7fc4..c9c7268 100644 --- a/core/commands/osm/decoder_data.go +++ b/core/commands/osm/decoder_data.go @@ -3,7 +3,6 @@ package osm import ( "github.com/meekyphotos/experive-cli/core/commands/pbf" "google.golang.org/protobuf/proto" - "time" ) type dataDecoder struct { @@ -37,16 +36,16 @@ func (dec *dataDecoder) parsePrimitiveBlock(pb *pbf.PrimitiveBlock) { } func (dec *dataDecoder) parsePrimitiveGroup(pb *pbf.PrimitiveBlock, pg *pbf.PrimitiveGroup) { - if !dec.skipNodes { - dec.parseNodes(pb, pg.GetNodes()) - dec.parseDenseNodes(pb, pg.GetDense()) - } - if !dec.skipWays { - dec.parseWays(pb, pg.GetWays()) - } - if !dec.skipRelations { - dec.parseRelations(pb, pg.GetRelations()) - } + //if !dec.skipNodes { + dec.parseNodes(pb, pg.GetNodes()) + dec.parseDenseNodes(pb, pg.GetDense()) + //} + //if !dec.skipWays { + dec.parseWays(pb, pg.GetWays()) + //} + //if !dec.skipRelations { + dec.parseRelations(pb, pg.GetRelations()) + //} } func (dec *dataDecoder) parseNodes(pb *pbf.PrimitiveBlock, nodes []*pbf.Node) { @@ -66,14 +65,17 @@ func (dec *dataDecoder) parseNodes(pb *pbf.PrimitiveBlock, nodes []*pbf.Node) { tags, names, class, osmType := ExtractInfo(st, node.GetKeys(), node.GetVals()) if len(tags) != 2 || len(names) != 2 { + dec.q = append(dec.q, &Node{ - OsmId: id, - Lat: latitude, - Lon: longitude, - Tags: tags, - Names: names, - Class: class, - Type: osmType, + Content: map[string]interface{}{ + "osm_id": id, + "class": class, + "type": osmType, + "latitude": latitude, + "longitude": longitude, + "metadata": tags, + "names": names, + }, }) } } @@ -97,9 +99,21 @@ func (dec *dataDecoder) parseDenseNodes(pb *pbf.PrimitiveBlock, dn *pbf.DenseNod lon = lons[index] + lon latitude := 1e-9 * float64(latOffset+(granularity*lat)) longitude := 1e-9 * float64(lonOffset+(granularity*lon)) - tags, names, class, osmType := tu.next() - if len(tags) != 2 || len(names) != 2 { - dec.q = append(dec.q, &Node{id, class, osmType, latitude, longitude, tags, names}) + tags, names, address, class, osmType := tu.next() + if len(tags) != 0 || len(names) != 0 { + dec.q = append(dec.q, &Node{ + Content: map[string]interface{}{ + "osm_id": id, + "osm_type": 'N', + "class": class, + "type": osmType, + "name": names, + "address": address, + "latitude": latitude, + "longitude": longitude, + "extratags": tags, + }, + }) } } } @@ -119,7 +133,14 @@ func (dec *dataDecoder) parseWays(pb *pbf.PrimitiveBlock, ways []*pbf.Way) { nodeID = refs[index] + nodeID // delta encoding nodeIDs[index] = nodeID } - dec.q = append(dec.q, &Way{ID: id, Tags: tags, NodeIDs: nodeIDs}) + + dec.q = append(dec.q, &Way{ + Content: map[string]interface{}{ + "osm_id": id, + "extratags": tags, + "node_ids": nodeIDs, + }, + }) } } @@ -154,37 +175,18 @@ func extractMembers(stringTable [][]byte, rel *pbf.Relation) []Member { func (dec *dataDecoder) parseRelations(pb *pbf.PrimitiveBlock, relations []*pbf.Relation) { st := pb.GetStringtable().GetS() - dateGranularity := int64(pb.GetDateGranularity()) for _, rel := range relations { id := rel.GetId() tags := extractTags(st, rel.GetKeys(), rel.GetVals()) members := extractMembers(st, rel) - info := extractInfo(st, rel.GetInfo(), dateGranularity) - dec.q = append(dec.q, &Relation{id, tags, members, info}) + dec.q = append(dec.q, &Relation{ + Content: map[string]interface{}{ + "osm_id": id, + "extratags": tags, + "members": members, + }, + }) } } - -func extractInfo(stringTable [][]byte, i *pbf.Info, dateGranularity int64) Info { - info := Info{Visible: true} - - if i != nil { - info.Version = i.GetVersion() - - millisec := time.Duration(i.GetTimestamp()*dateGranularity) * time.Millisecond - info.Timestamp = time.Unix(0, millisec.Nanoseconds()).UTC() - - info.Changeset = i.GetChangeset() - - info.Uid = i.GetUid() - - info.User = string(stringTable[i.GetUserSid()]) - - if i.Visible != nil { - info.Visible = i.GetVisible() - } - } - - return info -} diff --git a/core/commands/pipeline/api.go b/core/commands/pipeline/api.go index 526c773..a7a987b 100644 --- a/core/commands/pipeline/api.go +++ b/core/commands/pipeline/api.go @@ -54,7 +54,7 @@ type ProgressBarBeat struct { } func (b *ProgressBarBeat) Start() { - b.bar = progressbar.Default(-1, "Writing") + b.bar = progressbar.Default(-1, b.OperationName) } func (b *ProgressBarBeat) Beat(amount int) { diff --git a/core/commands/pipeline/connector_processor.go b/core/commands/pipeline/connector_processor.go index 7dc4414..620c110 100644 --- a/core/commands/pipeline/connector_processor.go +++ b/core/commands/pipeline/connector_processor.go @@ -5,7 +5,7 @@ import ( "time" ) -func ProcessChannel(channel chan []map[string]interface{}, db connectors.Connector, beat Heartbeat) error { +func ProcessChannel(channel chan []map[string]interface{}, nodes connectors.Connector, beat Heartbeat) error { beat.Start() defer beat.Done() for { @@ -15,7 +15,7 @@ func ProcessChannel(channel chan []map[string]interface{}, db connectors.Connect if i == 0 { return nil } - err := db.Write(content) + err := nodes.Write(content) beat.Beat(i) if err != nil { return err diff --git a/core/commands/pipeline/pbf_reader.go b/core/commands/pipeline/pbf_reader.go index f56e044..59704a8 100644 --- a/core/commands/pipeline/pbf_reader.go +++ b/core/commands/pipeline/pbf_reader.go @@ -7,12 +7,14 @@ import ( "runtime" ) -func ReadFromPbf(path string, heartbeat Heartbeat) (chan map[string]interface{}, error) { +func ReadFromPbf(path string, heartbeat Heartbeat) (chan map[string]interface{}, chan map[string]interface{}, chan map[string]interface{}, error) { f, err := os.Open(path) if err != nil { - return nil, err + return nil, nil, nil, err } - out := make(chan map[string]interface{}, 100000) + outNodes := make(chan map[string]interface{}, 100000) + outWays := make(chan map[string]interface{}, 100000) + outRelations := make(chan map[string]interface{}, 100000) heartbeat.Start() go func() { defer f.Close() @@ -23,7 +25,6 @@ func ReadFromPbf(path string, heartbeat Heartbeat) (chan map[string]interface{}, } d := osm.NewDecoder(open) d.SetBufferSize(osm.MaxBlobSize) - d.Skip(false, true, true) if err := d.Start(runtime.GOMAXPROCS(-1)); err != nil { panic(err) } @@ -38,22 +39,21 @@ func ReadFromPbf(path string, heartbeat Heartbeat) (chan map[string]interface{}, switch v.(type) { case *osm.Node: - m := make(map[string]interface{}, 7) node := v.(*osm.Node) - m["osm_id"] = node.OsmId - m["class"] = node.Class - m["type"] = node.Type - m["latitude"] = node.Lat - m["longitude"] = node.Lon - m["metadata"] = node.Tags - m["names"] = node.Names - out <- m + outNodes <- node.Content case *osm.Way: + node := v.(*osm.Way) + outWays <- node.Content case *osm.Relation: + node := v.(*osm.Relation) + outRelations <- node.Content + default: } } - close(out) + close(outNodes) + close(outWays) + close(outRelations) }() - return out, nil + return outNodes, outWays, outRelations, nil }