diff --git a/subsetter/query.go b/subsetter/query.go index 24709fb..6507620 100644 --- a/subsetter/query.go +++ b/subsetter/query.go @@ -36,6 +36,13 @@ func (t *Table) IsSelfRelated() bool { return false } +// IsSelfRelated returns true if a table is self related. +func TableByName(tables []Table, name string) Table { + return lo.Filter(tables, func(table Table, _ int) bool { + return table.Name == name + })[0] +} + // GetTablesWithRows returns a list of tables with the number of rows in each table. // Warning reltuples used to dermine size is an estimate of the number of rows in the table and can be zero for small tables. func GetTablesWithRows(conn *pgxpool.Pool) (tables []Table, err error) { diff --git a/subsetter/sync.go b/subsetter/sync.go index 4f509fb..353ff76 100644 --- a/subsetter/sync.go +++ b/subsetter/sync.go @@ -12,6 +12,14 @@ import ( "github.com/samber/lo" ) +type SyncError struct { + Retry bool +} + +func (se *SyncError) Error() string { + return fmt.Sprintf("Sync error: retry=%t", se.Retry) +} + type Sync struct { source *pgxpool.Pool destination *pgxpool.Pool @@ -38,7 +46,7 @@ func (r *Rule) Query() string { } func (r *Rule) Copy(s *Sync) (err error) { - log.Debug().Str("query", r.Where).Msgf("Copying forced rows for table %s", r.Table) + log.Debug().Str("query", r.Where).Msgf("Transfering forced rows for table %s", r.Table) var data string if data, err = CopyQueryToString(r.Query(), s.source); err != nil { return errors.Wrapf(err, "Error copying forced rows for table %s", r.Table) @@ -100,11 +108,11 @@ func copyTableData(table Table, relatedQueries []string, withLimit bool, source var data string if data, err = CopyTableToString(table.Name, limit, subselectQeury, source); err != nil { - log.Error().Err(err).Msgf("Error getting table data for %s", table.Name) + //log.Error().Err(err).Str("table", table.Name).Msg("Error getting table data") return } if err = CopyStringToTable(table.Name, data, destination); err != nil { - log.Error().Err(err).Msgf("Error pushing table data for %s", table.Name) + //log.Error().Err(err).Str("table", table.Name).Msg("Error pushing table data") return } return @@ -131,17 +139,20 @@ retry: return err } else { if len(primaryKeys) == 0 { - log.Warn().Int("depth", *depth).Msgf("No keys found for %s", relation.ForeignTable) - missingTable := lo.Filter(tables, func(table Table, _ int) bool { - return table.Name == relation.ForeignTable - })[0] - RelationalCopy(depth, tables, missingTable, visitedTables, source, destination) + + missingTable := TableByName(tables, relation.ForeignTable) + if err = RelationalCopy(depth, tables, missingTable, visitedTables, source, destination); err != nil { + return errors.Wrapf(err, "Error copying table %s", missingTable.Name) + } + + // Retry short circuit *depth++ + log.Debug().Int("depth", *depth).Msgf("Retrying keys for %s", relation.ForeignTable) if *depth < 1 { goto retry } else { - log.Warn().Int("depth", *depth).Msgf("Max depth reached for %s", relation.ForeignTable) + log.Warn().Str("table", relation.ForeignTable).Msgf("No keys found at this time") return errors.New("Max depth reached") } @@ -178,9 +189,8 @@ func RelationalCopy( if lo.Contains(*visitedTables, tableName) { continue } - relatedTable := lo.Filter(tables, func(table Table, _ int) bool { - return table.Name == tableName - })[0] + + relatedTable := TableByName(tables, tableName) *visitedTables = append(*visitedTables, relatedTable.Name) // Use realized query to get priamry keys that are already in the destination for all related tables @@ -192,13 +202,16 @@ func RelationalCopy( } if len(relatedQueries) > 0 { - log.Debug().Str("table", relatedTable.Name).Strs("relatedQueries", relatedQueries).Msg("Copying with RelationalCopy") + log.Debug().Str("table", relatedTable.Name).Strs("relatedQueries", relatedQueries).Msg("Transfering with RelationalCopy") } if err = copyTableData(relatedTable, relatedQueries, false, source, destination); err != nil { if condition, ok := err.(*pgconn.PgError); ok && condition.Code == "23503" { // foreign key violation - RelationalCopy(depth, tables, relatedTable, visitedTables, source, destination) + if err := RelationalCopy(depth, tables, relatedTable, visitedTables, source, destination); err != nil { + return errors.Wrapf(err, "Error copying table %s", relatedTable.Name) + } } + return errors.Wrapf(err, "Error copying table %s", relatedTable.Name) } } @@ -214,7 +227,7 @@ func (s *Sync) CopyTables(tables []Table) (err error) { for _, table := range lo.Filter(tables, func(table Table, _ int) bool { return len(table.Relations) == 0 }) { - log.Info().Str("table", table.Name).Msg("Copying") + log.Info().Str("table", table.Name).Msg("Transfering") if err = copyTableData(table, []string{}, true, s.source, s.destination); err != nil { return errors.Wrapf(err, "Error copying table %s", table.Name) } @@ -231,20 +244,36 @@ func (s *Sync) CopyTables(tables []Table) (err error) { // Prevent infinite loop, by setting max depth depth := 0 // Copy tables with relations + maybeRetry := []Table{} + for _, complexTable := range lo.Filter(tables, func(table Table, _ int) bool { return len(table.Relations) > 0 }) { - log.Info().Str("table", complexTable.Name).Msg("Copying") - RelationalCopy(&depth, tables, complexTable, &visitedTables, s.source, s.destination) + log.Info().Str("table", complexTable.Name).Msg("Transfering") + if err := RelationalCopy(&depth, tables, complexTable, &visitedTables, s.source, s.destination); err != nil { + log.Info().Str("table", complexTable.Name).Msgf("Transfering failed, retrying later") + maybeRetry = append(maybeRetry, complexTable) + } for _, include := range s.include { if include.Table == complexTable.Name { - log.Warn().Str("table", complexTable.Name).Msgf("Copying forced rows for relational table is not supported.") + log.Warn().Str("table", complexTable.Name).Msgf("Transfering forced rows for relational table is not supported.") } } } + // Retry tables with relations + visitedRetriedTables := []string{} + for _, retiredTable := range maybeRetry { + log.Info().Str("table", retiredTable.Name).Msg("Transfering") + if err := RelationalCopy(&depth, tables, retiredTable, &visitedRetriedTables, s.source, s.destination); err != nil { + log.Warn().Str("table", retiredTable.Name).Msgf("Transfering failed") + } + } + // Remove excluded rows and print reports + fmt.Println() + fmt.Println("Report:") for _, table := range tables { // to ensure no data is in excluded tables for _, exclude := range s.exclude {