Skip to content

Commit

Permalink
refeactoring undo signal handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnaudBger committed Feb 1, 2024
1 parent 828d22b commit 6a02865
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
16 changes: 8 additions & 8 deletions data/psql.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ func (p *Psql) handleTransaction(dbBlockID int64, transactionHash string) (dbTra
return
}

func (p *Psql) HandleBlockUndo(blockId string) error {
_, err := p.tx.Exec("DELETE CASCADE FROM solana_tokens.blocks WHERE hash = $1", blockId)
if err != nil {
return fmt.Errorf("deleting block: %w", err)
}
return nil
}

func (p *Psql) HandleInitializedAccount(dbBlockID int64, initializedAccounts []*pb.InitializedAccount) (err error) {
for _, initializedAccount := range initializedAccounts {
dbTransactionID, err := p.handleTransaction(dbBlockID, initializedAccount.TrxHash)
Expand All @@ -111,6 +103,14 @@ func (p *Psql) HandleInitializedAccount(dbBlockID int64, initializedAccounts []*
return nil
}

func (p *Psql) HandleBlocksUndo(lastValidBlockNum uint64) error {
_, err := p.tx.Exec("DELETE CASCADE FROM solana_tokens.blocks WHERE num > $1", lastValidBlockNum)
if err != nil {
return fmt.Errorf("deleting block from %s: %w", lastValidBlockNum, err)

Check failure on line 109 in data/psql.go

View workflow job for this annotation

GitHub Actions / Test (1.20.x, ubuntu-latest)

fmt.Errorf format %s has arg lastValidBlockNum of wrong type uint64
}
return nil
}

var NotFound = errors.New("Not found")

func (p *Psql) resolveAddress(derivedAddress string) (string, error) {
Expand Down
8 changes: 4 additions & 4 deletions data/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp
}

func (s *Sinker) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) (err error) {
blockId := cursor.Block().ID()
lastValidBlockNum := undoSignal.LastValidBlock.Number

s.logger.Info("Handling undo block signal", zap.Stringer("block", cursor.Block()), zap.Stringer("cursor", cursor))

defer func() {
if err != nil {
if s.db.tx != nil {
e := s.db.RollbackTransaction()
err = fmt.Errorf("undo block: %s rollback transaction: %w: while handling err %w", blockId, e, err)
err = fmt.Errorf("undo blocks: %s rollback transaction: %w: while handling err %w", lastValidBlockNum, e, err)

Check failure on line 160 in data/sinker.go

View workflow job for this annotation

GitHub Actions / Test (1.20.x, ubuntu-latest)

fmt.Errorf format %s has arg lastValidBlockNum of wrong type uint64
}

return
Expand All @@ -172,9 +172,9 @@ func (s *Sinker) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstr
return fmt.Errorf("begin transaction: %w", err)
}

err = s.db.HandleBlockUndo(blockId)
err = s.db.HandleBlocksUndo(lastValidBlockNum)
if err != nil {
return fmt.Errorf("handle block %s undo: %w", blockId, err)
return fmt.Errorf("handle blocks undo from %d : %w", lastValidBlockNum, err)
}

err = s.db.StoreCursor(cursor)
Expand Down

0 comments on commit 6a02865

Please sign in to comment.