Skip to content

Commit

Permalink
Re-worked how the app lifecycle is wired, should fixed inconsistent t…
Browse files Browse the repository at this point in the history
…ermination cleanup leaving `.tmp` file

Fixes #6
  • Loading branch information
maoueh committed Nov 25, 2024
1 parent 2c2e482 commit 9849e46
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 36 deletions.
42 changes: 7 additions & 35 deletions cmd/substreams-sink-files/run.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -14,9 +13,7 @@ import (
"github.com/streamingfast/cli"
. "github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/derr"
"github.com/streamingfast/dstore"
"github.com/streamingfast/shutter"
sink "github.com/streamingfast/substreams-sink"
substreamsfile "github.com/streamingfast/substreams-sink-files"
"github.com/streamingfast/substreams-sink-files/bundler"
Expand Down Expand Up @@ -114,12 +111,7 @@ var SyncRunCmd = Command(syncRunE,
)

func syncRunE(cmd *cobra.Command, args []string) error {
app := shutter.New()

ctx, cancelApp := context.WithCancel(cmd.Context())
app.OnTerminating(func(_ error) {
cancelApp()
})
app := cli.NewApplication(cmd.Context())

sink.RegisterMetrics()

Expand Down Expand Up @@ -225,34 +217,14 @@ func syncRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new bundler: %w", err)
}

fileSinker := substreamsfile.NewFileSinker(sinker, bundler, sinkEncoder, zlog, tracer)
fileSinker.OnTerminating(app.Shutdown)
app.OnTerminating(func(err error) {
zlog.Info("application terminating shutting down file sinker")
fileSinker.Shutdown(err)
})

go func() {
fileSinker.Shutdown(fileSinker.Run(ctx))
}()

signalHandler := derr.SetupSignalHandler(0 * time.Second)
zlog.Info("ready, waiting for signal to quit")
select {
case <-signalHandler:
zlog.Info("received termination signal, quitting application")
go app.Shutdown(nil)
case <-app.Terminating():
NoError(app.Err(), "application shutdown unexpectedly, quitting")
}
app.SuperviseAndStart(substreamsfile.NewFileSinker(sinker, bundler, sinkEncoder, zlog, tracer))

zlog.Info("waiting for app termination")
select {
case <-app.Terminated():
case <-ctx.Done():
case <-time.After(30 * time.Second):
zlog.Error("application did not terminated within 30s, forcing exit")
if err := app.WaitForTermination(zlog, 0*time.Second, 30*time.Second); err != nil {
zlog.Info("app termination error", zap.Error(err))
return err
}

zlog.Info("app terminated")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewFileSinker(sinker *sink.Sinker, bundler *bundler.Bundler, encoder encode
func (fs *FileSinker) Run(ctx context.Context) error {
cursor, err := fs.bundler.GetCursor()
if err != nil {
return fmt.Errorf("faile to read cursor: %w", err)
return fmt.Errorf("failed to read cursor: %w", err)
}

fs.Sinker.OnTerminating(fs.Shutdown)
Expand Down

0 comments on commit 9849e46

Please sign in to comment.