Skip to content

Commit

Permalink
imported download / list and upload of wof
Browse files Browse the repository at this point in the history
  • Loading branch information
Michele Masili committed Jul 16, 2021
1 parent 82f0585 commit 8caab03
Show file tree
Hide file tree
Showing 23 changed files with 9,316 additions and 9 deletions.
12 changes: 11 additions & 1 deletion cmd/cli/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package main

func main() {
import (
"github.com/meekyphotos/experive-cli/core/initializers"
"log"
"os"
)

func main() {
app := initializers.InitApp()
err := app.Run(os.Args)
if err != nil {
log.Fatal(err)
}
}
30 changes: 30 additions & 0 deletions core/commands/connectors/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package connectors

type DbType int

const (
Snowflake DbType = iota
Bigint DbType = iota
DoublePrecision DbType = iota
Text DbType = iota
Jsonb DbType = iota
Point DbType = iota
)

type Column struct {
Name string
Type DbType
Indexed bool
}

type DataConverter interface {
Convert(data interface{}) (string, error)
}

type Connector interface {
Connect() error
Close()
Init(columns []Column) error
Write(data []map[string]interface{}) error
CreateIndexes() error
}
174 changes: 174 additions & 0 deletions core/commands/connectors/db_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package connectors

import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/godruoyi/go-snowflake"
"github.com/lib/pq"
"github.com/meekyphotos/experive-cli/core/utils"
"strings"
)

type PgConnector struct {
Config *utils.Config
TableName string
db *sql.DB
columns []Column
}

func (p *PgConnector) Connect() error {
config := p.Config
connStr := fmt.Sprintf("user=%s dbname=%s password=%s host=%s sslmode=disable",
config.UserName, config.DbName, config.Password, config.Host)
conn, err := sql.Open("postgres", connStr)
if err != nil {
return err
}
p.db = conn
return nil
}

func (p *PgConnector) Close() {
err := p.db.Close()
if err != nil {
panic(err)
}
}

func (p *PgConnector) Write(data []map[string]interface{}) error {
if p.columns == nil {
return errors.New("no columns found, call init before starting")
}
txn, err := p.db.Begin()
if err != nil {
return err
}
columnNames := make([]string, len(p.columns))
for i, c := range p.columns {
columnNames[i] = c.Name
}
stmt, err := txn.Prepare(pq.CopyIn(p.TableName,
columnNames...,
))
if err != nil {
return err
}
for _, row := range data {
vals := make([]interface{}, len(p.columns))
for i, c := range p.columns {
switch c.Type {
case Snowflake:
vals[i] = snowflake.ID()
case Bigint:
vals[i] = row[c.Name]
case DoublePrecision:
vals[i] = row[c.Name]
case Text:
vals[i] = row[c.Name]
case Jsonb:
marshal, err := json.Marshal(row[c.Name])
if err != nil {
return err
}
vals[i] = string(marshal)
case Point:
if lat, ok := row["latitude"]; ok {
if lng, ok := row["longitude"]; ok {
vals[i] = fmt.Sprintf("SRID=4326;POINT(%f %f)", lat, lng)
}
}
}
}
_, err := stmt.Exec(vals...)
if err != nil {
return err
}
}
_, err = stmt.Exec()
if err != nil {
return err
}
return txn.Commit()
}

func (p *PgConnector) Init(columns []Column) error {
p.columns = columns
txn, err := p.db.Begin()
if err != nil {
return err
}

if p.Config.Create {
_, err := txn.Exec("DROP TABLE IF EXISTS " + p.Config.Schema + "." + p.Config.TableName)
if err != nil {
return err
}
}
stmt := strings.Builder{}
stmt.WriteString("CREATE TABLE IF NOT EXISTS ")
stmt.WriteString(p.Config.Schema)
stmt.WriteString(".")
stmt.WriteString(p.Config.TableName)
stmt.WriteString("\n(")
for i, f := range columns {
stmt.WriteString(f.Name)
stmt.WriteString(" ")
switch f.Type {
case Bigint:
stmt.WriteString("bigint")
case DoublePrecision:
stmt.WriteString("double precision")
case Text:
stmt.WriteString("text")
case Jsonb:
stmt.WriteString("jsonb")
case Point:
stmt.WriteString("geography(POINT)")
}

if f.Name == "id" {
stmt.WriteString(" primary key")
}
if i < len(columns)-1 {
stmt.WriteString(", \n")
}
}
stmt.WriteString("\n)")
sqlStatement := stmt.String()
_, err = txn.Exec(sqlStatement)
if err != nil {
return txn.Rollback()
}
return txn.Commit()
}

func (p *PgConnector) CreateIndexes() error {
if p.columns == nil {
return errors.New("no columns found, call init before starting")
}
txn, err := p.db.Begin()
if err != nil {
return err
}
for _, c := range p.columns {
if c.Indexed {
if c.Type == Point {
_, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.Config.TableName + " using BRIN (" + c.Name + ")")
if err != nil {
_ = txn.Rollback()
return err
}
} else {
_, err := txn.Exec("CREATE INDEX ON " + p.Config.Schema + "." + p.Config.TableName + " (" + c.Name + ")")
if err != nil {
_ = txn.Rollback()
return err
}
}

}
}
return txn.Commit()
}
21 changes: 19 additions & 2 deletions core/commands/download.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
package commands

import "github.com/urfave/cli/v2"
import (
"github.com/urfave/cli/v2"
)

func DownloadMeta() *cli.Command {
return nil
return &cli.Command{
Name: "download",
Usage: "Download an extract from geofabrik",
ArgsUsage: "region to download",
Flags: []cli.Flag{
&cli.StringFlag{Name: "f", Aliases: []string{"format"}, Value: "pbf", Usage: "Specify format"},
},
Action: func(context *cli.Context) error {
downloader := OsmDownloader{}
err := downloader.Init()
if err != nil {
panic(err)
}
return downloader.OsmDownload(context)
},
}
}
Loading

0 comments on commit 8caab03

Please sign in to comment.