Added relations and way parsing
Michele Masili committed Jul 19, 2021
1 parent cb42f3b commit d7a8fdf
Showing 9 changed files with 214 additions and 143 deletions.
27 changes: 16 additions & 11 deletions core/commands/connectors/db_connector.go
@@ -15,7 +15,7 @@ import (
 type PgConnector struct {
     Config *utils.Config
     TableName string
-    db *sql.DB
+    Db *sql.DB
     columns []Column
 }

@@ -27,12 +27,12 @@ func (p *PgConnector) Connect() error {
     if err != nil {
         return err
     }
-    p.db = conn
+    p.Db = conn
     return nil
 }
 
 func (p *PgConnector) Close() {
-    err := p.db.Close()
+    err := p.Db.Close()
     if err != nil {
         panic(err)
     }
@@ -44,7 +44,7 @@ func (p *PgConnector) Write(data []map[string]interface{}) error {
     if p.columns == nil {
         return errors.New("no columns found, call init before starting")
     }
-    txn, err := p.db.Begin()
+    txn, err := p.Db.Begin()
     if err != nil {
         return err
     }
@@ -74,7 +74,9 @@ func (p *PgConnector) Write(data []map[string]interface{}) error {
             value := row[c.Name]
             switch value.(type) {
             case string: // already marshalled
-                vals[i] = value
+                if value != "" {
+                    vals[i] = value
+                }
             default:
                 marshal, err := json.Marshal(value)
                 if err != nil {
@@ -104,13 +106,16 @@ func (p *PgConnector) Write(data []map[string]interface{}) error {
 
 func (p *PgConnector) Init(columns []Column) error {
     p.columns = columns
-    txn, err := p.db.Begin()
+    if p.TableName == "" {
+        p.TableName = p.Config.TableName
+    }
+    txn, err := p.Db.Begin()
     if err != nil {
         return err
     }
 
     if p.Config.Create {
-        _, err := txn.Exec("DROP TABLE IF EXISTS " + p.Config.Schema + "." + p.Config.TableName)
+        _, err := txn.Exec("DROP TABLE IF EXISTS " + p.Config.Schema + "." + p.TableName)
         if err != nil {
             return err
         }
@@ -119,7 +124,7 @@ func (p *PgConnector) Init(columns []Column) error {
         stmt.WriteString("CREATE TABLE IF NOT EXISTS ")
         stmt.WriteString(p.Config.Schema)
         stmt.WriteString(".")
-        stmt.WriteString(p.Config.TableName)
+        stmt.WriteString(p.TableName)
         stmt.WriteString("\n(")
         for i, f := range columns {
             stmt.WriteString(f.Name)
@@ -159,20 +164,20 @@ func (p *PgConnector) CreateIndexes() error {
     if p.columns == nil {
         return errors.New("no columns found, call init before starting")
     }
-    txn, err := p.db.Begin()
+    txn, err := p.Db.Begin()
     if err != nil {
         return err
     }
     for _, c := range p.columns {
         if c.Indexed {
             if c.Type == Point {
-                _, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.Config.TableName + " using BRIN (" + c.Name + ")")
+                _, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.TableName + " using BRIN (" + c.Name + ")")
                 if err != nil {
                     _ = txn.Rollback()
                     return err
                 }
             } else {
-                _, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.Config.TableName + " (" + c.Name + ")")
+                _, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.TableName + " (" + c.Name + ")")
                 if err != nil {
                     _ = txn.Rollback()
                     return err
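The change that threads through this file is the export of the private db field as Db: once the handle is public, several PgConnector values can share a single open connection instead of each dialing Postgres on its own. A minimal sketch of that sharing, using only the API visible in this diff (cfg stands in for a populated *utils.Config; error handling is trimmed to the minimum):

    nodes := &connectors.PgConnector{Config: cfg, TableName: cfg.TableName + "_node"}
    if err := nodes.Connect(); err != nil {
        return err
    }
    defer nodes.Close() // closing the owning connector closes the shared *sql.DB

    // The second connector borrows the same *sql.DB and never calls Connect or Close.
    ways := &connectors.PgConnector{Config: cfg, TableName: cfg.TableName + "_ways", Db: nodes.Db}
    if err := ways.Init(wayFields); err != nil {
        return err
    }

The new guard in Init (falling back to Config.TableName when TableName is empty) keeps the previous single-table behaviour as the default.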
94 changes: 78 additions & 16 deletions core/commands/load_osm.go
@@ -10,22 +10,41 @@ import (
 )
 
 type OsmRunner struct {
-    Connector connectors.Connector
+    NodeConnector connectors.Connector
+    WaysConnector connectors.Connector
+    RelationsConnector connectors.Connector
 }
 
+// ID int64
+// Tags map[string]string
+// NodeIDs []int64
+
 var osmFields = []connectors.Column{
     {Name: "id", Type: connectors.Snowflake},
     {Name: "osm_id", Type: connectors.Bigint, Indexed: true},
     {Name: "osm_type", Type: connectors.Text},
     {Name: "class", Type: connectors.Text},
     {Name: "type", Type: connectors.Text},
-    {Name: "names", Type: connectors.Jsonb},
+    {Name: "name", Type: connectors.Jsonb},
     {Name: "address", Type: connectors.Jsonb},
 }
 
-func determineOsmCols(c *utils.Config) []connectors.Column {
+var wayFields = []connectors.Column{
+    {Name: "osm_id", Type: connectors.Bigint, Indexed: true},
+    {Name: "extratags", Type: connectors.Jsonb},
+    {Name: "node_ids", Type: connectors.Jsonb},
+}
+
+var relationFields = []connectors.Column{
+    {Name: "osm_id", Type: connectors.Bigint, Indexed: true},
+    {Name: "extratags", Type: connectors.Jsonb},
+    {Name: "members", Type: connectors.Jsonb},
+}
+
+func determineNodesCols(c *utils.Config) []connectors.Column {
     cols := make([]connectors.Column, 0)
     cols = append(cols, osmFields...)
     if c.InclKeyValues {
-        cols = append(cols, connectors.Column{Name: "metadata", Type: connectors.Jsonb})
+        cols = append(cols, connectors.Column{Name: "extratags", Type: connectors.Jsonb})
     }
     if c.UseGeom {
         cols = append(cols, geomFields...)
Expand All @@ -36,35 +55,78 @@ func determineOsmCols(c *utils.Config) []connectors.Column {
}

func (r OsmRunner) Run(c *utils.Config) error {
r.Connector = &connectors.PgConnector{
Config: c, TableName: c.TableName,
pg := &connectors.PgConnector{
Config: c, TableName: c.TableName + "_node",
}
r.NodeConnector = pg
dbErr := r.NodeConnector.Connect()
if dbErr != nil {
return dbErr
}

r.WaysConnector = &connectors.PgConnector{
Config: c, TableName: c.TableName + "_ways", Db: pg.Db,
}
dbErr := r.Connector.Connect()
r.RelationsConnector = &connectors.PgConnector{
Config: c, TableName: c.TableName + "_relations", Db: pg.Db,
}
// no need to close ways & relations
defer r.NodeConnector.Close()
dbErr = r.NodeConnector.Init(determineNodesCols(c))
if dbErr != nil {
return dbErr
}
defer r.Connector.Close()
dbErr = r.Connector.Init(determineOsmCols(c))
dbErr = r.WaysConnector.Init(wayFields)
if dbErr != nil {
return dbErr
}
channel, err := pipeline.ReadFromPbf(c.File, &pipeline.NoopBeat{})
dbErr = r.RelationsConnector.Init(relationFields)
if dbErr != nil {
return dbErr
}

nodeChannel, wayChannel, relationChannel, err := pipeline.ReadFromPbf(c.File, &pipeline.NoopBeat{})
if err != nil {
return err
}
requests := pipeline.BatchRequest(channel, 10000, time.Second)
nodeRequests := pipeline.BatchRequest(nodeChannel, 10000, time.Second)
wayRequests := pipeline.BatchRequest(wayChannel, 10000, time.Second)
relationRequests := pipeline.BatchRequest(relationChannel, 10000, time.Second)
var pgWorkers sync.WaitGroup

nodeBeat := &pipeline.ProgressBarBeat{OperationName: "Nodes"}
relationBeat := &pipeline.ProgressBarBeat{OperationName: "Relations"}
waysBeat := &pipeline.ProgressBarBeat{OperationName: "Ways"}

pgWorkers.Add(1)
go func() {
err := pipeline.ProcessChannel(nodeRequests, r.NodeConnector, nodeBeat)
if err != nil {
panic(err)
}
pgWorkers.Done()
}()

pgWorkers.Add(1)
beat := &pipeline.ProgressBarBeat{OperationName: "Writing"}
go func() {
err := pipeline.ProcessChannel(requests, r.Connector, beat)
err := pipeline.ProcessChannel(wayRequests, r.WaysConnector, waysBeat)
if err != nil {
panic(err)
}
pgWorkers.Done()
}()

pgWorkers.Add(1)
go func() {
err := pipeline.ProcessChannel(relationRequests, r.RelationsConnector, relationBeat)
if err != nil {
panic(err)
}
pgWorkers.Done()
}()

pgWorkers.Wait()
return r.Connector.CreateIndexes()
return r.NodeConnector.CreateIndexes()
}

func LoadOsmMeta() *cli.Command {
@@ -90,7 +152,7 @@
 
         // OUTPUT FORMAT
         &cli.BoolFlag{Name: "latlong", Value: false, Usage: "Store coordinates in degrees of latitude & longitude."},
-        &cli.StringFlag{Name: "t", Aliases: []string{"table"}, Value: "planet_data", Usage: "Output table name"},
+        &cli.StringFlag{Name: "t", Aliases: []string{"table"}, Value: "planet_osm", Usage: "Prefix of table"},
 
         &cli.BoolFlag{Name: "j", Aliases: []string{"json"}, Value: true, Usage: "Add tags without column to an additional json (key/value) column in the database tables."},
 
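Run now reads the PBF file once and fans the decoded elements out into three streams (nodes, ways, relations); each stream is batched at 10,000 items or one second, whichever comes first, and drained by its own writer goroutine while a WaitGroup holds Run open until all three finish. The same batch-and-fan-out shape, reduced to a self-contained sketch with stand-in types (every name below is hypothetical; the repo's BatchRequest and ProcessChannel helpers are not reproduced here):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // batch collects items from in into slices of up to size, flushing a partial
    // batch when interval elapses first, the same contract BatchRequest provides.
    func batch(in <-chan int, size int, interval time.Duration) <-chan []int {
        out := make(chan []int)
        go func() {
            defer close(out)
            buf := make([]int, 0, size)
            tick := time.NewTicker(interval)
            defer tick.Stop()
            flush := func() {
                if len(buf) > 0 {
                    out <- buf
                    buf = make([]int, 0, size)
                }
            }
            for {
                select {
                case v, ok := <-in:
                    if !ok {
                        flush() // drain the final partial batch on close
                        return
                    }
                    if buf = append(buf, v); len(buf) == size {
                        flush()
                    }
                case <-tick.C:
                    flush()
                }
            }
        }()
        return out
    }

    func main() {
        nodes, ways := make(chan int), make(chan int)
        go func() { // producer: one reader feeding several typed channels
            for i := 0; i < 25; i++ {
                nodes <- i
                ways <- i * 10
            }
            close(nodes)
            close(ways)
        }()

        var workers sync.WaitGroup
        for name, ch := range map[string]<-chan int{"nodes": nodes, "ways": ways} {
            workers.Add(1)
            go func(name string, batches <-chan []int) {
                defer workers.Done()
                for b := range batches { // one writer goroutine per stream
                    fmt.Println(name, "batch of", len(b))
                }
            }(name, batch(ch, 10, time.Second))
        }
        workers.Wait()
    }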
2 changes: 1 addition & 1 deletion core/commands/load_wof.go
@@ -33,7 +33,7 @@ var latLngFields = []connectors.Column{
 }
 
 var geomFields = []connectors.Column{
-    {Name: "latlng", Type: connectors.Point, Indexed: true},
+    {Name: "geometry", Type: connectors.Point, Indexed: true},
 }
 
 func determineCols(c *utils.Config) []connectors.Column {
18 changes: 3 additions & 15 deletions core/commands/osm/data.go
@@ -30,20 +30,11 @@ type Info struct {
 }
 
 type Node struct {
     OsmId int64
-    Class string
-    Type string
     Lat float64
     Lon float64
-    Tags string
-    Names string
+    Content map[string]interface{}
 }
 
 type Way struct {
-    ID int64
-    Tags map[string]string
-    NodeIDs []int64
-    Info Info
+    Content map[string]interface{}
 }
 
 type MemberType int
@@ -61,8 +52,5 @@
 }
 
 type Relation struct {
-    ID int64
-    Tags map[string]string
-    Members []Member
-    Info Info
+    Content map[string]interface{}
 }
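With the typed tag fields gone, each element carries one generic payload. Since PgConnector.Write already consumes []map[string]interface{} (see db_connector.go above), a decoded element's Content can go straight into a write batch with no per-type mapping step. A hypothetical illustration (the field values and the waysConnector variable are invented for the example; this diff does not show how Content is populated):

    w := Way{Content: map[string]interface{}{
        "osm_id":    int64(42),                   // hypothetical values
        "extratags": `{"highway":"residential"}`, // pre-marshalled JSON string
        "node_ids":  `[1001,1002,1003]`,
    }}
    // One Write call per batch, whatever the element type.
    err := waysConnector.Write([]map[string]interface{}{w.Content})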
82 changes: 48 additions & 34 deletions core/commands/osm/decode_tag.go
@@ -91,18 +91,50 @@ var keyVal = []byte(`":"`)
 var quotes = []byte(`"`)
 var endPar = []byte(`}`)
 var nameBytes = []byte(`name`)
+var addressBytes = []byte(`addr:`)
 var carriageReturn = regexp.MustCompile(`[\n\r\t"\\]`)
 var escapeQuote = regexp.MustCompile(`"`)
 
+type json struct {
+    started bool
+    buffer *strings.Builder
+}
+
+func newJson() json {
+    return json{
+        buffer: &strings.Builder{},
+    }
+}
+func (js *json) close() {
+    if js.started {
+        js.buffer.Write(endPar)
+    }
+}
+
+func (js *json) toString() string {
+    return js.buffer.String()
+}
+func (js *json) add(key []byte, val []byte) {
+    if js.started {
+        js.buffer.Write(openWithComma)
+    } else {
+        js.started = true
+        js.buffer.Write(openPar)
+        js.buffer.Write(quotes)
+    }
+    js.buffer.Write(key)
+    js.buffer.Write(keyVal)
+    cleaned := carriageReturn.ReplaceAll(val, []byte{})
+    js.buffer.Write(cleaned)
+    js.buffer.Write(quotes)
+}
+
 // Make tags map from stringtable and array of IDs (used in DenseNodes encoding).
-func (tu *tagUnpacker) next() (string, string, string, string) {
+func (tu *tagUnpacker) next() (string, string, string, string, string) {
     var class, osmType string
-    tagsJson := strings.Builder{}
-    nameJson := strings.Builder{}
-    tagsJson.Write(openPar)
-    nameJson.Write(openPar)
-    firstName := true
-    firstTag := true
+    tagsJson := newJson()
+    nameJson := newJson()
+    addressJson := newJson()
 keyLoop:
     for tu.index < len(tu.keysVals) {
         keyID := tu.keysVals[tu.index]
@@ -125,38 +157,20 @@
             if bytes.Equal(b, keyBytes) {
                 class = string(b)
                 osmType = string(valBytes)
-                continue keyLoop
+                break // add key anyway
             }
         }
 
         if bytes.Contains(keyBytes, nameBytes) {
-            if !firstName {
-                nameJson.Write(openWithComma)
-            } else {
-                firstName = false
-                nameJson.Write(quotes)
-            }
-            nameJson.Write(keyBytes)
-            nameJson.Write(keyVal)
-            cleaned := carriageReturn.ReplaceAll(valBytes, []byte{})
-            nameJson.Write(cleaned)
-            nameJson.Write(quotes)
+            nameJson.add(keyBytes, valBytes)
+        } else if bytes.HasPrefix(keyBytes, addressBytes) {
+            addressJson.add(keyBytes, valBytes)
        } else {
-            if !firstTag {
-                tagsJson.Write(openWithComma)
-            } else {
-                firstTag = false
-                tagsJson.Write(quotes)
-            }
-            tagsJson.Write(keyBytes)
-            tagsJson.Write(keyVal)
-            cleaned := carriageReturn.ReplaceAll(valBytes, []byte{})
-            tagsJson.Write(cleaned)
-            tagsJson.Write(quotes)
-
+            tagsJson.add(keyBytes, valBytes)
        }
     }
-    tagsJson.Write(endPar)
-    nameJson.Write(endPar)
-    return tagsJson.String(), nameJson.String(), class, osmType
+    tagsJson.close()
+    nameJson.close()
+    addressJson.close()
+    return tagsJson.toString(), nameJson.toString(), addressJson.toString(), class, osmType
 }
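The new json helper replaces the duplicated first-element bookkeeping (firstName/firstTag) with a single started flag, and a third builder gives addr:* keys their own output alongside tags and names. One behavioural detail: close() writes the terminating brace only when something was added, so an element with no matching keys yields an empty string rather than {}. A hypothetical usage sketch, assuming openPar and openWithComma (defined earlier in this file, outside the visible hunk) expand to `{` and `,"`:

    js := newJson()
    js.add([]byte("name"), []byte("Roma"))
    js.add([]byte("name:en"), []byte("Rome"))
    js.close()
    fmt.Println(js.toString()) // {"name":"Roma","name:en":"Rome"}

    empty := newJson()
    empty.close()
    fmt.Println(empty.toString()) // prints an empty line: no-tag elements produce no JSON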