Skip to content

Commit

Permalink
[Separate Reporter Part 5] Implement deviation reporter (#1900)
Browse files Browse the repository at this point in the history
* update last submission for deviation job

* deviation reporter implemented

* refactor data processing logic

* remove retrier from deviation job

* update feedHash and proof conversion

* fix feedhash and proof conversion issue

* clean up

* reflect feedback

* comment out app test
  • Loading branch information
Intizar-T authored Jul 25, 2024
1 parent 60c625e commit 367b2b7
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 371 deletions.
87 changes: 30 additions & 57 deletions node/pkg/reporter/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func (a *App) Run(ctx context.Context) error {
return err
}

return a.startReporters(ctx)
a.startReporters(ctx)

return nil
}

func (a *App) setReporters(ctx context.Context) error {
Expand All @@ -44,15 +46,13 @@ func (a *App) setReporters(ctx context.Context) error {
return errorSentinel.ErrReporterSubmissionProxyContractNotFound
}

// TODO: probably can assign the chainHelper to app here and not reuse it in startReporters instead of closing
tmpChainHelper, err := helper.NewChainHelper(ctx)
chainHelper, err := helper.NewChainHelper(ctx)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to create chain helper")
return err
}
defer tmpChainHelper.Close()

cachedWhitelist, err := ReadOnchainWhitelist(ctx, tmpChainHelper, contractAddress, GET_ONCHAIN_WHITELIST)
cachedWhitelist, err := ReadOnchainWhitelist(ctx, chainHelper, contractAddress, GET_ONCHAIN_WHITELIST)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to get whitelist, starting with empty whitelist")
cachedWhitelist = []common.Address{}
Expand All @@ -78,78 +78,46 @@ func (a *App) setReporters(ctx context.Context) error {
WithInterval(groupInterval),
WithContractAddress(contractAddress),
WithCachedWhitelist(cachedWhitelist),
WithKaiaHelper(chainHelper),
WithLatestData(&a.LatestData),
)
if errNewReporter != nil {
log.Error().Str("Player", "Reporter").Err(errNewReporter).Msg("failed to set reporter")
return errNewReporter
}
reporter.LatestData = &a.LatestData
a.Reporters = append(a.Reporters, reporter)
}
if len(a.Reporters) == 0 {
log.Error().Str("Player", "Reporter").Msg("no reporters set")
return errorSentinel.ErrReporterNotFound
}

// deviationReporter, errNewDeviationReporter := NewReporter(
// ctx,
// WithConfigs(configs),
// WithInterval(DEVIATION_INTERVAL),
// WithContractAddress(contractAddress),
// WithCachedWhitelist(cachedWhitelist),
// WithJobType(DeviationJob),
// )
// if errNewDeviationReporter != nil {
// log.Error().Str("Player", "Reporter").Err(errNewDeviationReporter).Msg("failed to set deviation reporter")
// return errNewDeviationReporter
// }
// a.Reporters = append(a.Reporters, deviationReporter)
deviationReporter, errNewDeviationReporter := NewReporter(
ctx,
WithConfigs(configs),
WithInterval(DEVIATION_INTERVAL),
WithContractAddress(contractAddress),
WithCachedWhitelist(cachedWhitelist),
WithJobType(DeviationJob),
WithKaiaHelper(chainHelper),
WithLatestData(&a.LatestData),
)
if errNewDeviationReporter != nil {
log.Error().Str("Player", "Reporter").Err(errNewDeviationReporter).Msg("failed to set deviation reporter")
return errNewDeviationReporter
}
a.Reporters = append(a.Reporters, deviationReporter)

log.Info().Str("Player", "Reporter").Msgf("%d reporters set", len(a.Reporters))
return nil
}

func (a *App) startReporters(ctx context.Context) error {
var errs []string

func (a *App) startReporters(ctx context.Context) {
go a.WsHelper.Run(ctx, a.handleWsMessage)

chainHelper, chainHelperErr := helper.NewChainHelper(ctx)
if chainHelperErr != nil {
return chainHelperErr
}
a.chainHelper = chainHelper

for _, reporter := range a.Reporters {
err := a.startReporter(ctx, reporter)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to start reporter")
errs = append(errs, err.Error())
}
}

if len(errs) > 0 {
return errorSentinel.ErrReporterStart
go reporter.Run(ctx)
}

return nil
}

func (a *App) startReporter(ctx context.Context, reporter *Reporter) error {
if reporter.isRunning {
log.Debug().Str("Player", "Reporter").Msg("reporter already running")
return errorSentinel.ErrReporterAlreadyRunning
}

reporter.KaiaHelper = a.chainHelper

nodeCtx, cancel := context.WithCancel(ctx)
reporter.nodeCtx = nodeCtx
reporter.nodeCancel = cancel
reporter.isRunning = true

go reporter.Run(nodeCtx)
return nil
}

func getConfigs(ctx context.Context) ([]Config, error) {
Expand All @@ -174,7 +142,12 @@ func groupConfigsBySubmitIntervals(reporterConfigs []Config) map[int][]Config {
}

func (a *App) handleWsMessage(ctx context.Context, data map[string]interface{}) error {
a.LatestData.Store(data["symbol"], data)
submissionData, err := ProcessDalWsRawData(data)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("failed to process dal ws raw data")
return err
}
a.LatestData.Store(data["symbol"], submissionData)
return nil
}

Expand Down
Loading

0 comments on commit 367b2b7

Please sign in to comment.