Skip to content

Commit

Permalink
Add retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dz0ny committed Sep 12, 2023
1 parent 8d8eaa2 commit a50ef28
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 31 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ Usage of subsetter:
-dst string
Destination database DSN
-exclude value
Query to ignore tables, can be used multiple times; 'users: id = 123' for a specific user, 'users: 1=1' for all users
Query to ignore tables 'users: all', can be used multiple times
-f float
Fraction of rows to copy (default 0.05)
-include value
Query to copy required tables, can be used multiple times; 'users: id = 123' for a specific user, 'users: 1=1' for all users
Query to copy required rows 'users: id = 1', can be used multiple times
-src string
Source database DSN
-v Release information
-verbose
Show more information during sync (default true)
Show more information during sync
```


Expand All @@ -60,7 +60,7 @@ pg_subsetter \
-f 0.5
-include "user: id=1"
-include "group: id=1"
-exclude "domains: domain_name ilike '%.si'"
-exclude "domains: all"
```

Expand Down
4 changes: 2 additions & 2 deletions cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func main() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack

flag.Var(&extraInclude, "include", "Query to copy required tables 'users: id = 1', can be used multiple times")
flag.Var(&extraExclude, "exclude", "Query to ignore tables 'users: id = 1', can be used multiple times")
flag.Var(&extraInclude, "include", "Query to copy required rows 'users: id = 1', can be used multiple times")
flag.Var(&extraExclude, "exclude", "Query to ignore tables 'users: all', can be used multiple times")
flag.Parse()

if *ver {
Expand Down
7 changes: 5 additions & 2 deletions subsetter/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (
"github.com/stevenle/topsort"
)

func TableGraph(primary string, relations []Relation) (l []string, e error) {
func TableGraph(primary string, relations []Relation) (l []string, err error) {
graph := topsort.NewGraph() // Create a new graph

for _, r := range relations {
if !r.IsSelfRelated() {
graph.AddEdge(r.PrimaryTable, r.ForeignTable)

Check failure on line 14 in subsetter/graph.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `graph.AddEdge` is not checked (errcheck)
}
}
l, e = graph.TopSort(primary)
l, err = graph.TopSort(primary)
if err != nil {
return
}
slices.Reverse(l)
return
}
4 changes: 2 additions & 2 deletions subsetter/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func TestTableGraph(t *testing.T) {

got, _ := TableGraph("users", relations)

if want, _ := lo.Last(got); want != "users" {
t.Fatalf("TableGraph() = %v, want %v", got, "users")
if want, _ := lo.Nth(got, 1); want != "collaborator_api_keys" {
t.Fatalf("TableGraph() = %v, want %v", got, want)
}

}
Expand Down
7 changes: 7 additions & 0 deletions subsetter/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func (t *Table) IsSelfRelated() bool {
return false
}

// IsSelfRelated returns true if a table is self related.
func TableByName(tables []Table, name string) Table {
return lo.Filter(tables, func(table Table, _ int) bool {
return table.Name == name
})[0]
}

// GetTablesWithRows returns a list of tables with the number of rows in each table.
// Warning reltuples used to dermine size is an estimate of the number of rows in the table and can be zero for small tables.
func GetTablesWithRows(conn *pgxpool.Pool) (tables []Table, err error) {
Expand Down
2 changes: 1 addition & 1 deletion subsetter/relations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestGetRelations(t *testing.T) {
conn *pgxpool.Pool
wantRelations []Relation
}{
{"With relation", "simple", conn, []Relation{{"relation", "simple_id", "simple", "id"}}},
{"With relation", "relation", conn, []Relation{{"relation", "simple_id", "simple", "id"}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
75 changes: 55 additions & 20 deletions subsetter/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
"github.com/samber/lo"
)

type SyncError struct {
Retry bool
}

func (se *SyncError) Error() string {
return fmt.Sprintf("Sync error: retry=%t", se.Retry)
}

type Sync struct {
source *pgxpool.Pool
destination *pgxpool.Pool
Expand All @@ -38,7 +46,7 @@ func (r *Rule) Query() string {
}

func (r *Rule) Copy(s *Sync) (err error) {
log.Debug().Str("query", r.Where).Msgf("Copying forced rows for table %s", r.Table)
log.Debug().Str("query", r.Where).Msgf("Transfering forced rows for table %s", r.Table)
var data string
if data, err = CopyQueryToString(r.Query(), s.source); err != nil {
return errors.Wrapf(err, "Error copying forced rows for table %s", r.Table)
Expand Down Expand Up @@ -100,11 +108,11 @@ func copyTableData(table Table, relatedQueries []string, withLimit bool, source

var data string
if data, err = CopyTableToString(table.Name, limit, subselectQeury, source); err != nil {
log.Error().Err(err).Msgf("Error getting table data for %s", table.Name)
//log.Error().Err(err).Str("table", table.Name).Msg("Error getting table data")
return
}
if err = CopyStringToTable(table.Name, data, destination); err != nil {
log.Error().Err(err).Msgf("Error pushing table data for %s", table.Name)
//log.Error().Err(err).Str("table", table.Name).Msg("Error pushing table data")
return
}
return
Expand All @@ -131,17 +139,20 @@ retry:
return err
} else {
if len(primaryKeys) == 0 {
log.Warn().Int("depth", *depth).Msgf("No keys found for %s", relation.ForeignTable)
missingTable := lo.Filter(tables, func(table Table, _ int) bool {
return table.Name == relation.ForeignTable
})[0]
RelationalCopy(depth, tables, missingTable, visitedTables, source, destination)

missingTable := TableByName(tables, relation.ForeignTable)
if err = RelationalCopy(depth, tables, missingTable, visitedTables, source, destination); err != nil {
return errors.Wrapf(err, "Error copying table %s", missingTable.Name)
}

// Retry short circuit
*depth++

log.Debug().Int("depth", *depth).Msgf("Retrying keys for %s", relation.ForeignTable)
if *depth < 1 {
goto retry
} else {
log.Warn().Int("depth", *depth).Msgf("Max depth reached for %s", relation.ForeignTable)
log.Warn().Str("table", relation.ForeignTable).Str("primary", relation.PrimaryTable).Msgf("No keys found at this time")
return errors.New("Max depth reached")
}

Expand Down Expand Up @@ -178,27 +189,32 @@ func RelationalCopy(
if lo.Contains(*visitedTables, tableName) {
continue
}
relatedTable := lo.Filter(tables, func(table Table, _ int) bool {
return table.Name == tableName
})[0]

relatedTable := TableByName(tables, tableName)
*visitedTables = append(*visitedTables, relatedTable.Name)
// Use realized query to get priamry keys that are already in the destination for all related tables

// Selection query for this table
relatedQueries := []string{}

for _, relation := range relatedTable.Relations {
relatedQueriesBuilder(depth, tables, relation, relatedTable, source, destination, visitedTables, &relatedQueries)
err := relatedQueriesBuilder(depth, tables, relation, relatedTable, source, destination, visitedTables, &relatedQueries)
if err != nil {
return err
}
}

if len(relatedQueries) > 0 {
log.Debug().Str("table", relatedTable.Name).Strs("relatedQueries", relatedQueries).Msg("Copying with RelationalCopy")
log.Debug().Str("table", relatedTable.Name).Strs("relatedQueries", relatedQueries).Msg("Transfering with RelationalCopy")
}

if err = copyTableData(relatedTable, relatedQueries, false, source, destination); err != nil {
if condition, ok := err.(*pgconn.PgError); ok && condition.Code == "23503" { // foreign key violation
RelationalCopy(depth, tables, relatedTable, visitedTables, source, destination)
if err := RelationalCopy(depth, tables, relatedTable, visitedTables, source, destination); err != nil {
return errors.Wrapf(err, "Error copying table %s", relatedTable.Name)
}
}
return errors.Wrapf(err, "Error copying table %s", relatedTable.Name)
}

}
Expand All @@ -214,14 +230,17 @@ func (s *Sync) CopyTables(tables []Table) (err error) {
for _, table := range lo.Filter(tables, func(table Table, _ int) bool {
return len(table.Relations) == 0
}) {
log.Info().Str("table", table.Name).Msg("Copying")
log.Info().Str("table", table.Name).Msg("Transfering")
if err = copyTableData(table, []string{}, true, s.source, s.destination); err != nil {
return errors.Wrapf(err, "Error copying table %s", table.Name)
}

for _, include := range s.include {
if include.Table == table.Name {
include.Copy(s)
err = include.Copy(s)
if err != nil {
return errors.Wrapf(err, "Error copying forced rows for table %s", table.Name)
}
}
}

Expand All @@ -231,20 +250,36 @@ func (s *Sync) CopyTables(tables []Table) (err error) {
// Prevent infinite loop, by setting max depth
depth := 0
// Copy tables with relations
maybeRetry := []Table{}

for _, complexTable := range lo.Filter(tables, func(table Table, _ int) bool {
return len(table.Relations) > 0
}) {
log.Info().Str("table", complexTable.Name).Msg("Copying")
RelationalCopy(&depth, tables, complexTable, &visitedTables, s.source, s.destination)
log.Info().Str("table", complexTable.Name).Msg("Transfering")
if err := RelationalCopy(&depth, tables, complexTable, &visitedTables, s.source, s.destination); err != nil {
log.Info().Str("table", complexTable.Name).Msgf("Transfering failed, retrying later")
maybeRetry = append(maybeRetry, complexTable)
}

for _, include := range s.include {
if include.Table == complexTable.Name {
log.Warn().Str("table", complexTable.Name).Msgf("Copying forced rows for relational table is not supported.")
log.Warn().Str("table", complexTable.Name).Msgf("Transfering forced rows for relational table is not supported.")
}
}
}

// Retry tables with relations
visitedRetriedTables := []string{}
for _, retiredTable := range maybeRetry {
log.Info().Str("table", retiredTable.Name).Msg("Transfering")
if err := RelationalCopy(&depth, tables, retiredTable, &visitedRetriedTables, s.source, s.destination); err != nil {
log.Warn().Str("table", retiredTable.Name).Msgf("Transfering failed, try increasing fraction index")
}
}

// Remove excluded rows and print reports
fmt.Println()
fmt.Println("Report:")
for _, table := range tables {
// to ensure no data is in excluded tables
for _, exclude := range s.exclude {
Expand Down

0 comments on commit a50ef28

Please sign in to comment.