Skip to content

Commit

Permalink
feat: Refactor CLI code to handle force sync option
Browse files Browse the repository at this point in the history
- Modified `String` method of `arrayForce` to join strings with a comma
- Added import statements for packages "os" and "github.com/rs/zerolog"
- Initialized logger with ConsoleWriter for standard error output
- Added command line flag for "force" option
- Logged message with tables to force sync if length of "forceSync" array is greater than 0
  • Loading branch information
dz0ny committed Aug 12, 2023
1 parent 8ae0c5e commit 603dbdf
Show file tree
Hide file tree
Showing 16 changed files with 366 additions and 92 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ on:
branches: ["main"]

jobs:
vuln:
uses: cristalhq/.github/.github/workflows/[email protected]

build:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -48,7 +46,7 @@ jobs:
run: make build

- name: Test
run: go test -v ./...
run: go test -timeout 30s -v ./...

- name: Upload assets
uses: actions/upload-artifact@v3
Expand Down
11 changes: 11 additions & 0 deletions .github/workflows/vuln.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: vuln

on:
push:
branches: ["main"]
pull_request:
branches: ["main"]

jobs:
vuln:
uses: cristalhq/.github/.github/workflows/[email protected]
23 changes: 16 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# pg-subsetter

[![lint](https://github.com/teamniteo/pg-subsetter/actions/workflows/lint.yml/badge.svg)](https://github.com/teamniteo/pg-subsetter/actions/workflows/lint.yml) [![build](https://github.com/teamniteo/pg-subsetter/actions/workflows/go.yml/badge.svg)](https://github.com/teamniteo/pg-subsetter/actions/workflows/go.yml)
[![lint](https://github.com/teamniteo/pg-subsetter/actions/workflows/lint.yml/badge.svg)](https://github.com/teamniteo/pg-subsetter/actions/workflows/lint.yml) [![build](https://github.com/teamniteo/pg-subsetter/actions/workflows/go.yml/badge.svg)](https://github.com/teamniteo/pg-subsetter/actions/workflows/go.yml) [![vuln](https://github.com/teamniteo/pg-subsetter/actions/workflows/vuln.yml/badge.svg)](https://github.com/teamniteo/pg-subsetter/actions/workflows/vuln.yml)


`pg-subsetter` is a powerful and efficient tool designed to synchronize a fraction of a PostgreSQL database to another PostgreSQL database on the fly, it does not copy the SCHEMA, this means that your target database has to have schema populated in some other way.
`pg-subsetter` is a tool designed to synchronize a fraction of a PostgreSQL database to another PostgreSQL database on the fly, it does not copy the SCHEMA. This means that your target database has to have schema populated in some other way.

### Database Fraction Synchronization
`pg-subsetter` allows you to select and sync a specific subset of your database. Whether it's a fraction of a table or a particular dataset, you can have it replicated in another database without synchronizing the entire DB.
Expand All @@ -12,7 +12,7 @@
Foreign keys play a vital role in maintaining the relationships between tables. `pg-subsetter` ensures that all foreign keys are handled correctly during the synchronization process, maintaining the integrity and relationships of the data.

### Efficient COPY Method
Utilizing the native PostgreSQL COPY command, pg-subsetter performs data transfer with high efficiency. This method significantly speeds up the synchronization process, minimizing downtime and resource consumption.
Utilizing the native PostgreSQL COPY command, `pg-subsetter` performs data transfer with high efficiency. This method significantly speeds up the synchronization process, minimizing downtime and resource consumption.

### Stateless Operation
`pg-subsetter` is built to be stateless, meaning it does not maintain any internal state between runs. This ensures that each synchronization process is independent, enhancing reliability and making it easier to manage and scale.
Expand All @@ -23,17 +23,26 @@ Utilizing the native PostgreSQL COPY command, pg-subsetter performs data transfe
```
Usage of subsetter:
-dst string
Destination DSN
Destination database DSN
-f float
Fraction of rows to copy (default 0.05)
-force value
Query to copy required tables (users: id = 1)
-src string
Source DSN
Source database DSN
```


Example:
### Example

```pg-subsetter -src postgresql://:@/bigdb -dst postgresql://:@/littledb -f 0.05```
Copy a fraction of the database and force certain rows to be also copied over.

```
pg-subsetter \
-src "postgres://test_source@localhost:5432/test_source?sslmode=disable" \
-dst "postgres://test_target@localhost:5432/test_target?sslmode=disable" \
-f 0.05
```

# Installing

Expand Down
24 changes: 24 additions & 0 deletions cli/force.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"fmt"
"strings"

"niteo.co/subsetter/subsetter"
)

type arrayForce []subsetter.Force

func (i *arrayForce) String() string {
return fmt.Sprintf("%v", *i)
}

func (i *arrayForce) Set(value string) error {
q := strings.SplitAfter(strings.TrimSpace(value), ":")

*i = append(*i, subsetter.Force{
Table: q[0],
Where: q[1],
})
return nil
}
27 changes: 26 additions & 1 deletion cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@ package main

import (
"flag"
"os"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog/pkgerrors"
"niteo.co/subsetter/subsetter"
)

var src = flag.String("src", "", "Source database DSN")
var dst = flag.String("dst", "", "Destination database DSN")
var fraction = flag.Float64("f", 0.05, "Fraction of rows to copy")
var verbose = flag.Bool("verbose", true, "Show more information during sync")
var forceSync arrayForce

func main() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack

flag.Var(&forceSync, "force", "Query to copy required tables (users: id = 1)")
flag.Parse()
log.Info().Msg("Starting")

if *src == "" || *dst == "" {
log.Fatal().Msg("Source and destination DSNs are required")
Expand All @@ -22,4 +31,20 @@ func main() {
log.Fatal().Msg("Fraction must be between 0 and 1")
}

if len(forceSync) > 0 {
log.Info().Str("forced", forceSync.String()).Msg("Forcing sync for tables")
}

s, err := subsetter.NewSync(*src, *dst, *fraction, forceSync, *verbose)
if err != nil {
log.Fatal().Stack().Err(err).Msg("Failed to configure sync")
}

defer s.Close()

err = s.Sync()
if err != nil {
log.Fatal().Stack().Err(err).Msg("Failed to sync")
}

}
1 change: 0 additions & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
golangci-lint
postgresql
process-compose
shellcheck
nixpkgs-fmt
pgweb
];
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ require github.com/rs/zerolog v1.30.0
require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sync v0.1.0 // indirect
)

require (
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -28,6 +31,8 @@ golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
Expand Down
46 changes: 37 additions & 9 deletions subsetter/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,52 @@ import (
"context"
"fmt"
"os"
"sync"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

func getTestConnection() *pgx.Conn {
var testConnSrc *pgxpool.Pool
var onceTestSrc sync.Once
var testConnTrg *pgxpool.Pool
var onceTestTrg sync.Once

func getTestConnection() *pgxpool.Pool {
DATABASE_URL := os.Getenv("DATABASE_URL")
if DATABASE_URL == "" {
DATABASE_URL = "postgres://test_source@localhost:5432/test_source?sslmode=disable"
}

conn, err := pgx.Connect(context.Background(), DATABASE_URL)
if err != nil {
panic(err)
onceTestSrc.Do(func() {
c, err := pgxpool.New(context.Background(), DATABASE_URL)
if err != nil {
panic(err)
}
testConnSrc = c
})

return testConnSrc
}

func getTestConnectionDst() *pgxpool.Pool {
DATABASE_URL := os.Getenv("DATABASE_URL")
if DATABASE_URL == "" {
DATABASE_URL = "postgres://test_target@localhost:5432/test_target?sslmode=disable"
}
return conn

onceTestTrg.Do(func() {
c, err := pgxpool.New(context.Background(), DATABASE_URL)
if err != nil {
panic(err)
}
testConnTrg = c
})

return testConnTrg
}

func populateTests(conn *pgx.Conn) {
func initSchema(conn *pgxpool.Pool) {

_, err := conn.Exec(context.Background(), `
CREATE TABLE simple (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
Expand All @@ -41,7 +69,7 @@ func populateTests(conn *pgx.Conn) {
}
}

func populateTestsWithData(conn *pgx.Conn, table string, size int) {
func populateTestsWithData(conn *pgxpool.Pool, table string, size int) {
for i := 0; i < size; i++ {
query := fmt.Sprintf("INSERT INTO %s (text) VALUES ('test%d') RETURNING id", table, i)
var row string
Expand All @@ -58,7 +86,7 @@ func populateTestsWithData(conn *pgx.Conn, table string, size int) {
}
}

func clearPopulateTests(conn *pgx.Conn) {
func clearSchema(conn *pgxpool.Pool) {
_, err := conn.Exec(context.Background(), `
ALTER TABLE relation DROP CONSTRAINT relation_simple_fk;
DROP TABLE simple;
Expand Down
10 changes: 4 additions & 6 deletions subsetter/info_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
package subsetter

import (
"context"
"testing"
)

func TestGetTargetSet(t *testing.T) {
conn := getTestConnection()
populateTests(conn)
defer conn.Close(context.Background())
defer clearPopulateTests(conn)
initSchema(conn)
defer clearSchema(conn)

tests := []struct {
name string
fraction float64
tables []Table
want []Table
}{
{"simple", 0.5, []Table{{"simple", 1000, []string{}}}, []Table{{"simple", 31, []string{}}}},
{"simple", 0.5, []Table{{"simple", 10, []string{}}}, []Table{{"simple", 3, []string{}}}},
{"simple", 0.5, []Table{{"simple", 1000, []Relation{}}}, []Table{{"simple", 31, []Relation{}}}},
{"simple", 0.5, []Table{{"simple", 10, []Relation{}}}, []Table{{"simple", 3, []Relation{}}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
36 changes: 27 additions & 9 deletions subsetter/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

type Table struct {
Name string
Rows int
Relations []string
Relations []Relation
}

func GetTables(conn *pgx.Conn) (tables []string, err error) {
func GetTables(conn *pgxpool.Pool) (tables []string, err error) {
q := `SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';`
rows, err := conn.Query(context.Background(), q)
for rows.Next() {
Expand All @@ -27,7 +27,7 @@ func GetTables(conn *pgx.Conn) (tables []string, err error) {
return
}

func GetTablesWithRows(conn *pgx.Conn) (tables []Table, err error) {
func GetTablesWithRows(conn *pgxpool.Pool) (tables []Table, err error) {
q := `SELECT relname, reltuples::int FROM pg_class,information_schema.tables WHERE table_schema = 'public' AND relname = table_name;`
rows, err := conn.Query(context.Background(), q)
for rows.Next() {
Expand All @@ -37,6 +37,11 @@ func GetTablesWithRows(conn *pgx.Conn) (tables []Table, err error) {
// fix for tables with no rows
if table.Rows == -1 {
table.Rows = 0
} else {
table.Relations, err = GetRelations(table.Name, conn)
if err != nil {
return nil, err
}
}
tables = append(tables, table)
}
Expand All @@ -47,21 +52,34 @@ func GetTablesWithRows(conn *pgx.Conn) (tables []Table, err error) {
return
}

func CopyTableToString(table string, limit int, conn *pgx.Conn) (result string, err error) {
q := fmt.Sprintf(`copy (SELECT * FROM %s order by random() limit %d) to stdout`, table, limit)
func CopyQueryToString(query string, conn *pgxpool.Pool) (result string, err error) {
q := fmt.Sprintf(`copy (%s) to stdout`, query)
var buff bytes.Buffer
if _, err = conn.PgConn().CopyFrom(context.Background(), &buff, q); err != nil {
c, err := conn.Acquire(context.Background())
if err != nil {
return
}
if _, err = c.Conn().PgConn().CopyTo(context.Background(), &buff, q); err != nil {
return
}
result = buff.String()
return
}

func CopyStringToTable(table string, data string, conn *pgx.Conn) (err error) {
func CopyTableToString(table string, limit int, conn *pgxpool.Pool) (result string, err error) {
q := fmt.Sprintf(`SELECT * FROM %s order by random() limit %d`, table, limit)
return CopyQueryToString(q, conn)
}

func CopyStringToTable(table string, data string, conn *pgxpool.Pool) (err error) {
q := fmt.Sprintf(`copy %s from stdin`, table)
var buff bytes.Buffer
buff.WriteString(data)
if _, err = conn.PgConn().CopyFrom(context.Background(), &buff, q); err != nil {
c, err := conn.Acquire(context.Background())
if err != nil {
return
}
if _, err = c.Conn().PgConn().CopyFrom(context.Background(), &buff, q); err != nil {
return
}

Expand Down
Loading

0 comments on commit 603dbdf

Please sign in to comment.