Skip to content

Commit

Permalink
Add retry mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
dz0ny committed Sep 12, 2023
1 parent 8d8eaa2 commit a91c11f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 18 deletions.
7 changes: 7 additions & 0 deletions subsetter/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func (t *Table) IsSelfRelated() bool {
return false
}

// IsSelfRelated returns true if a table is self related.
func TableByName(tables []Table, name string) Table {
return lo.Filter(tables, func(table Table, _ int) bool {
return table.Name == name
})[0]
}

// GetTablesWithRows returns a list of tables with the number of rows in each table.
// Warning reltuples used to dermine size is an estimate of the number of rows in the table and can be zero for small tables.
func GetTablesWithRows(conn *pgxpool.Pool) (tables []Table, err error) {
Expand Down
65 changes: 47 additions & 18 deletions subsetter/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
"github.com/samber/lo"
)

type SyncError struct {
Retry bool
}

func (se *SyncError) Error() string {
return fmt.Sprintf("Sync error: retry=%t", se.Retry)
}

type Sync struct {
source *pgxpool.Pool
destination *pgxpool.Pool
Expand All @@ -38,7 +46,7 @@ func (r *Rule) Query() string {
}

func (r *Rule) Copy(s *Sync) (err error) {
log.Debug().Str("query", r.Where).Msgf("Copying forced rows for table %s", r.Table)
log.Debug().Str("query", r.Where).Msgf("Transfering forced rows for table %s", r.Table)
var data string
if data, err = CopyQueryToString(r.Query(), s.source); err != nil {
return errors.Wrapf(err, "Error copying forced rows for table %s", r.Table)
Expand Down Expand Up @@ -100,11 +108,11 @@ func copyTableData(table Table, relatedQueries []string, withLimit bool, source

var data string
if data, err = CopyTableToString(table.Name, limit, subselectQeury, source); err != nil {
log.Error().Err(err).Msgf("Error getting table data for %s", table.Name)
//log.Error().Err(err).Str("table", table.Name).Msg("Error getting table data")
return
}
if err = CopyStringToTable(table.Name, data, destination); err != nil {
log.Error().Err(err).Msgf("Error pushing table data for %s", table.Name)
//log.Error().Err(err).Str("table", table.Name).Msg("Error pushing table data")
return
}
return
Expand All @@ -131,17 +139,20 @@ retry:
return err
} else {
if len(primaryKeys) == 0 {
log.Warn().Int("depth", *depth).Msgf("No keys found for %s", relation.ForeignTable)
missingTable := lo.Filter(tables, func(table Table, _ int) bool {
return table.Name == relation.ForeignTable
})[0]
RelationalCopy(depth, tables, missingTable, visitedTables, source, destination)

missingTable := TableByName(tables, relation.ForeignTable)
if err = RelationalCopy(depth, tables, missingTable, visitedTables, source, destination); err != nil {
return errors.Wrapf(err, "Error copying table %s", missingTable.Name)
}

// Retry short circuit
*depth++

log.Debug().Int("depth", *depth).Msgf("Retrying keys for %s", relation.ForeignTable)
if *depth < 1 {
goto retry
} else {
log.Warn().Int("depth", *depth).Msgf("Max depth reached for %s", relation.ForeignTable)
log.Warn().Str("table", relation.ForeignTable).Msgf("No keys found at this time")
return errors.New("Max depth reached")
}

Expand Down Expand Up @@ -178,9 +189,8 @@ func RelationalCopy(
if lo.Contains(*visitedTables, tableName) {
continue
}
relatedTable := lo.Filter(tables, func(table Table, _ int) bool {
return table.Name == tableName
})[0]

relatedTable := TableByName(tables, tableName)
*visitedTables = append(*visitedTables, relatedTable.Name)
// Use realized query to get priamry keys that are already in the destination for all related tables

Expand All @@ -192,13 +202,16 @@ func RelationalCopy(
}

if len(relatedQueries) > 0 {
log.Debug().Str("table", relatedTable.Name).Strs("relatedQueries", relatedQueries).Msg("Copying with RelationalCopy")
log.Debug().Str("table", relatedTable.Name).Strs("relatedQueries", relatedQueries).Msg("Transfering with RelationalCopy")
}

if err = copyTableData(relatedTable, relatedQueries, false, source, destination); err != nil {
if condition, ok := err.(*pgconn.PgError); ok && condition.Code == "23503" { // foreign key violation
RelationalCopy(depth, tables, relatedTable, visitedTables, source, destination)
if err := RelationalCopy(depth, tables, relatedTable, visitedTables, source, destination); err != nil {
return errors.Wrapf(err, "Error copying table %s", relatedTable.Name)
}
}
return errors.Wrapf(err, "Error copying table %s", relatedTable.Name)
}

}
Expand All @@ -214,7 +227,7 @@ func (s *Sync) CopyTables(tables []Table) (err error) {
for _, table := range lo.Filter(tables, func(table Table, _ int) bool {
return len(table.Relations) == 0
}) {
log.Info().Str("table", table.Name).Msg("Copying")
log.Info().Str("table", table.Name).Msg("Transfering")
if err = copyTableData(table, []string{}, true, s.source, s.destination); err != nil {
return errors.Wrapf(err, "Error copying table %s", table.Name)
}
Expand All @@ -231,20 +244,36 @@ func (s *Sync) CopyTables(tables []Table) (err error) {
// Prevent infinite loop, by setting max depth
depth := 0
// Copy tables with relations
maybeRetry := []Table{}

for _, complexTable := range lo.Filter(tables, func(table Table, _ int) bool {
return len(table.Relations) > 0
}) {
log.Info().Str("table", complexTable.Name).Msg("Copying")
RelationalCopy(&depth, tables, complexTable, &visitedTables, s.source, s.destination)
log.Info().Str("table", complexTable.Name).Msg("Transfering")
if err := RelationalCopy(&depth, tables, complexTable, &visitedTables, s.source, s.destination); err != nil {
log.Info().Str("table", complexTable.Name).Msgf("Transfering failed, retrying later")
maybeRetry = append(maybeRetry, complexTable)
}

for _, include := range s.include {
if include.Table == complexTable.Name {
log.Warn().Str("table", complexTable.Name).Msgf("Copying forced rows for relational table is not supported.")
log.Warn().Str("table", complexTable.Name).Msgf("Transfering forced rows for relational table is not supported.")
}
}
}

// Retry tables with relations
visitedRetriedTables := []string{}
for _, retiredTable := range maybeRetry {
log.Info().Str("table", retiredTable.Name).Msg("Transfering")
if err := RelationalCopy(&depth, tables, retiredTable, &visitedRetriedTables, s.source, s.destination); err != nil {
log.Warn().Str("table", retiredTable.Name).Msgf("Transfering failed")
}
}

// Remove excluded rows and print reports
fmt.Println()
fmt.Println("Report:")
for _, table := range tables {
// to ensure no data is in excluded tables
for _, exclude := range s.exclude {
Expand Down

0 comments on commit a91c11f

Please sign in to comment.