Skip to content

Commit

Permalink
Add support for copy command
Browse files Browse the repository at this point in the history
  • Loading branch information
dz0ny committed Aug 11, 2023
1 parent b03af99 commit 152b231
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 10 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# pg-subsetter

`pg-subsetter` is a powerful and efficient tool designed to synchronize a fraction of a PostgreSQL database to another PostgreSQL database on the fly. Written in Go, this utility is tailored for the modern, scalable architecture that demands performance and robustness.
`pg-subsetter` is a powerful and efficient tool designed to synchronize a fraction of a PostgreSQL database to another PostgreSQL database on the fly, it does not copy the SCHEMA, this means that your target database has to have schema populated in some other way.

### Database Fraction Synchronization:
### Database Fraction Synchronization
`pg-subsetter` allows you to select and sync a specific subset of your database. Whether it's a fraction of a table or a particular dataset, you can have it replicated in another database without synchronizing the entire DB.

### Integrity Preservation with Foreign Keys:
### Integrity Preservation with Foreign Keys
Foreign keys play a vital role in maintaining the relationships between tables. `pg-subsetter` ensures that all foreign keys are handled correctly during the synchronization process, maintaining the integrity and relationships of the data.

### Efficient COPY Method:
### Efficient COPY Method
Utilizing the native PostgreSQL COPY command, pg-subsetter performs data transfer with high efficiency. This method significantly speeds up the synchronization process, minimizing downtime and resource consumption.

### Stateless Operation:
### Stateless Operation
`pg-subsetter` is built to be stateless, meaning it does not maintain any internal state between runs. This ensures that each synchronization process is independent, enhancing reliability and making it easier to manage and scale.


Expand Down
18 changes: 16 additions & 2 deletions subsetter/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package subsetter

import (
"context"
"fmt"
"os"

"github.com/jackc/pgx/v5"
Expand All @@ -23,19 +24,32 @@ func getTestConnection() *pgx.Conn {
func populateTests(conn *pgx.Conn) {
conn.Exec(context.Background(), `
CREATE TABLE simple (
id UUID PRIMARY KEY,
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
text TEXT
);
CREATE TABLE relation (
id UUID PRIMARY KEY,
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
simple_id UUID
);
ALTER TABLE relation ADD CONSTRAINT relation_simple_fk FOREIGN KEY (simple_id) REFERENCES simple(id);
`)
}

func populateTestsWithData(conn *pgx.Conn, table string, size int) {
for i := 0; i < size; i++ {
query := fmt.Sprintf("INSERT INTO %s (text) VALUES ('test%d') RETURNING id", table, i)
var row string
err := conn.QueryRow(context.Background(), query).Scan(&row)
fmt.Println(err)
query = fmt.Sprintf("INSERT INTO relation (simple_id) VALUES ('%v')", row)

conn.Exec(context.Background(), query)

}
}

func clearPopulateTests(conn *pgx.Conn) {
conn.Exec(context.Background(), `
ALTER TABLE relation DROP CONSTRAINT relation_simple_fk;
Expand Down
17 changes: 17 additions & 0 deletions subsetter/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package subsetter

import "math"

// GetTargetSet returns a subset of tables with the number of rows scaled by the fraction.
func GetTargetSet(fraction float64, tables []Table) []Table {
var subset []Table

for _, table := range tables {
subset = append(subset, Table{
Name: table.Name,
Rows: int(math.Pow(10, math.Log10(float64(table.Rows))*fraction)),
})
}

return subset
}
31 changes: 31 additions & 0 deletions subsetter/info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package subsetter

import (
"context"
"reflect"
"testing"
)

func TestGetTargetSet(t *testing.T) {
conn := getTestConnection()
populateTests(conn)
defer conn.Close(context.Background())
defer clearPopulateTests(conn)

tests := []struct {
name string
fraction float64
tables []Table
want []Table
}{
{"simple", 0.5, []Table{{"simple", 1000}}, []Table{{"simple", 31}}},
{"simple", 0.5, []Table{{"simple", 10}}, []Table{{"simple", 3}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetTargetSet(tt.fraction, tt.tables); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetTargetSet() = %v, want %v", got, tt.want)
}
})
}
}
19 changes: 19 additions & 0 deletions subsetter/query.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package subsetter

import (
"bytes"
"context"
"fmt"

"github.com/jackc/pgx/v5"
)
Expand Down Expand Up @@ -40,3 +42,20 @@ func GetTablesWithRows(conn *pgx.Conn) (tables []Table, err error) {

return
}

func CopyTableToString(table string, limit int, conn *pgx.Conn) (result string, err error) {
q := fmt.Sprintf(`copy (SELECT * FROM %s order by random() limit %d) to stdout`, table, limit)
var buff bytes.Buffer
conn.PgConn().CopyTo(context.Background(), &buff, q)
result = buff.String()
return
}

func CopyStringToTable(table string, data string, conn *pgx.Conn) (err error) {
q := fmt.Sprintf(`copy %s from stdout`, table)
var buff bytes.Buffer
buff.WriteString(data)
conn.PgConn().CopyFrom(context.Background(), &buff, q)

return
}
74 changes: 74 additions & 0 deletions subsetter/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package subsetter
import (
"context"
"reflect"
"strings"
"testing"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -55,3 +56,76 @@ func TestGetTablesWithRows(t *testing.T) {
})
}
}

func TestCopyRowToString(t *testing.T) {
conn := getTestConnection()
populateTests(conn)
defer conn.Close(context.Background())
defer clearPopulateTests(conn)
populateTestsWithData(conn, "simple", 10)

tests := []struct {
name string
table string
conn *pgx.Conn
wantResult bool
wantErr bool
}{
{"With tables", "simple", conn, true, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotResult, err := CopyTableToString(tt.table, 10, tt.conn)
if (err != nil) != tt.wantErr {
t.Errorf("CopyRowToString() error = %v, wantErr %v", err, tt.wantErr)
return
}
if strings.Contains(gotResult, "test") != tt.wantResult {
t.Errorf("CopyRowToString() = %v, want %v", gotResult, tt.wantResult)
}
})
}
}

func TestCopyStringToTable(t *testing.T) {
conn := getTestConnection()
populateTests(conn)
defer conn.Close(context.Background())
defer clearPopulateTests(conn)
populateTestsWithData(conn, "simple", 10)

tests := []struct {
name string
table string
data string
conn *pgx.Conn
wantResult int
wantErr bool
}{
{"With tables", "simple", "cccc5f58-44d3-4d7a-bf37-a97d4f081a63 test\n", conn, 1, false},
{"With more tables", "simple", "edcd63fe-303e-4d51-83ea-3fd00740ba2c test4\na170b0f5-3aec-469c-9589-cf25888a72e2 test7", conn, 2, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := CopyStringToTable(tt.table, tt.data, tt.conn)
if (err != nil) != tt.wantErr {
t.Errorf("CopyStringToTable() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.wantResult == insertedRows(tt.table, tt.conn) {
t.Errorf("CopyStringToTable() = %v, want %v", tt.wantResult, tt.wantResult)
}

})
}
}

func insertedRows(s string, conn *pgx.Conn) int {
tables, _ := GetTablesWithRows(conn)
for _, table := range tables {
if table.Name == s {
return table.Rows
}
}
return 0
}
13 changes: 10 additions & 3 deletions subsetter/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,22 @@ import (
// GetRelations returns a list of tables that have a foreign key for particular table.
func GetRelations(table string, conn *pgx.Conn) (relations []string, err error) {

q := `SELECT ccu.table_name::string AS foreign_table_name
q := `SELECT tc.table_name AS foreign_table_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name = $1;`
WHERE tc.constraint_type = 'FOREIGN KEY' AND ccu.table_name = $1;`

err = conn.QueryRow(context.Background(), q, table).Scan(&relations)
rows, err := conn.Query(context.Background(), q, table)
for rows.Next() {
var table string
rows.Scan(&table)

relations = append(relations, table)
}
rows.Close()
return
}

0 comments on commit 152b231

Please sign in to comment.