Skip to content

Commit

Permalink
Merge pull request #412 from forta-protocol/change-agent-stop-condition
Browse files Browse the repository at this point in the history
Bypass agent stop conditions
  • Loading branch information
canercidam authored Feb 9, 2022
2 parents 429d4f8 + 12c9a2d commit abeb5f5
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/scanner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e
}

registryService := registry.New(cfg, key.Address, msgClient)
agentPool := agentpool.NewAgentPool(cfg.Scan, msgClient)
agentPool := agentpool.NewAgentPool(ctx, cfg.Scan, msgClient)
txAnalyzer, err := initTxAnalyzer(ctx, cfg, as, txStream, agentPool, msgClient)
if err != nil {
return nil, err
Expand Down
7 changes: 5 additions & 2 deletions services/scanner/agentpool/agent_pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agentpool

import (
"context"
"strconv"
"sync"
"time"
Expand All @@ -27,6 +28,7 @@ const (
// AgentPool maintains the pool of agents that the scanner should
// interact with.
type AgentPool struct {
ctx context.Context
agents []*poolagent.Agent
txResults chan *scanner.TxResult
blockResults chan *scanner.BlockResult
Expand All @@ -36,8 +38,9 @@ type AgentPool struct {
}

// NewAgentPool creates a new agent pool.
func NewAgentPool(cfg config.ScannerConfig, msgClient clients.MessageClient) *AgentPool {
func NewAgentPool(ctx context.Context, cfg config.ScannerConfig, msgClient clients.MessageClient) *AgentPool {
agentPool := &AgentPool{
ctx: ctx,
txResults: make(chan *scanner.TxResult, DefaultBufferSize),
blockResults: make(chan *scanner.BlockResult, DefaultBufferSize),
msgClient: msgClient,
Expand Down Expand Up @@ -246,7 +249,7 @@ func (ap *AgentPool) handleAgentVersionsUpdate(payload messaging.AgentPayload) e
found = found || (agent.Config().ContainerName() == agentCfg.ContainerName())
}
if !found {
newAgents = append(newAgents, poolagent.New(agentCfg, ap.msgClient, ap.txResults, ap.blockResults))
newAgents = append(newAgents, poolagent.New(ap.ctx, agentCfg, ap.msgClient, ap.txResults, ap.blockResults))
agentsToRun = append(agentsToRun, agentCfg)
log.WithField("agent", agentCfg.ID).Info("will trigger start")
}
Expand Down
2 changes: 2 additions & 0 deletions services/scanner/agentpool/agent_pool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agentpool

import (
"context"
"testing"

"github.com/forta-protocol/forta-node/clients"
Expand Down Expand Up @@ -44,6 +45,7 @@ func (s *Suite) SetupTest() {
s.msgClient = mock_clients.NewMockMessageClient(gomock.NewController(s.T()))
s.agentClient = mock_clients.NewMockAgentClient(gomock.NewController(s.T()))
s.ap = &AgentPool{
ctx: context.Background(),
txResults: make(chan *scanner.TxResult, DefaultBufferSize),
blockResults: make(chan *scanner.BlockResult, DefaultBufferSize),
msgClient: s.msgClient,
Expand Down
17 changes: 9 additions & 8 deletions services/scanner/agentpool/poolagent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package poolagent

import (
"context"
"strings"
"sync"
"time"

Expand All @@ -14,7 +13,6 @@ import (
"github.com/forta-protocol/forta-node/config"
"github.com/forta-protocol/forta-node/protocol"
"github.com/forta-protocol/forta-node/services/scanner"
"google.golang.org/grpc/codes"

log "github.com/sirupsen/logrus"
)
Expand All @@ -27,6 +25,7 @@ const (

// Agent receives blocks and transactions, and produces results.
type Agent struct {
ctx context.Context
config config.AgentConfig

txRequests chan *protocol.EvaluateTxRequest // never closed - deallocated when agent is discarded
Expand All @@ -45,8 +44,9 @@ type Agent struct {
}

// New creates a new agent.
func New(agentCfg config.AgentConfig, msgClient clients.MessageClient, txResults chan<- *scanner.TxResult, blockResults chan<- *scanner.BlockResult) *Agent {
func New(ctx context.Context, agentCfg config.AgentConfig, msgClient clients.MessageClient, txResults chan<- *scanner.TxResult, blockResults chan<- *scanner.BlockResult) *Agent {
return &Agent{
ctx: ctx,
config: agentCfg,
txRequests: make(chan *protocol.EvaluateTxRequest, DefaultBufferSize),
txResults: txResults,
Expand All @@ -60,9 +60,10 @@ func New(agentCfg config.AgentConfig, msgClient clients.MessageClient, txResults
}

func isCriticalErr(err error) bool {
errStr := err.Error()
return strings.Contains(errStr, codes.DeadlineExceeded.String()) ||
strings.Contains(errStr, codes.Unavailable.String())
return false
// errStr := err.Error()
// return strings.Contains(errStr, codes.DeadlineExceeded.String()) ||
// strings.Contains(errStr, codes.Unavailable.String())
}

// LogStatus logs the status of the agent.
Expand Down Expand Up @@ -166,7 +167,7 @@ func (agent *Agent) processTransactions() {
if agent.IsClosed() {
return
}
ctx, cancel := context.WithTimeout(context.Background(), AgentTimeout)
ctx, cancel := context.WithTimeout(agent.ctx, AgentTimeout)
lg.WithField("duration", time.Since(startTime)).Debugf("sending request")
resp, err := agent.client.EvaluateTx(ctx, request)
cancel()
Expand Down Expand Up @@ -213,7 +214,7 @@ func (agent *Agent) processBlocks() {
return
}

ctx, cancel := context.WithTimeout(context.Background(), AgentTimeout)
ctx, cancel := context.WithTimeout(agent.ctx, AgentTimeout)
lg.WithField("duration", time.Since(startTime)).Debugf("sending request")
resp, err := agent.client.EvaluateBlock(ctx, request)
cancel()
Expand Down

0 comments on commit abeb5f5

Please sign in to comment.