diff --git a/cmd/scanner/main.go b/cmd/scanner/main.go index d3a2fc68..c3b48e4f 100644 --- a/cmd/scanner/main.go +++ b/cmd/scanner/main.go @@ -120,7 +120,7 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e } registryService := registry.New(cfg, key.Address, msgClient) - agentPool := agentpool.NewAgentPool(cfg.Scan, msgClient) + agentPool := agentpool.NewAgentPool(ctx, cfg.Scan, msgClient) txAnalyzer, err := initTxAnalyzer(ctx, cfg, as, txStream, agentPool, msgClient) if err != nil { return nil, err diff --git a/services/scanner/agentpool/agent_pool.go b/services/scanner/agentpool/agent_pool.go index 297e1634..f5e79fe9 100644 --- a/services/scanner/agentpool/agent_pool.go +++ b/services/scanner/agentpool/agent_pool.go @@ -1,6 +1,7 @@ package agentpool import ( + "context" "strconv" "sync" "time" @@ -27,6 +28,7 @@ const ( // AgentPool maintains the pool of agents that the scanner should // interact with. type AgentPool struct { + ctx context.Context agents []*poolagent.Agent txResults chan *scanner.TxResult blockResults chan *scanner.BlockResult @@ -36,8 +38,9 @@ type AgentPool struct { } // NewAgentPool creates a new agent pool. -func NewAgentPool(cfg config.ScannerConfig, msgClient clients.MessageClient) *AgentPool { +func NewAgentPool(ctx context.Context, cfg config.ScannerConfig, msgClient clients.MessageClient) *AgentPool { agentPool := &AgentPool{ + ctx: ctx, txResults: make(chan *scanner.TxResult, DefaultBufferSize), blockResults: make(chan *scanner.BlockResult, DefaultBufferSize), msgClient: msgClient, @@ -246,7 +249,7 @@ func (ap *AgentPool) handleAgentVersionsUpdate(payload messaging.AgentPayload) e found = found || (agent.Config().ContainerName() == agentCfg.ContainerName()) } if !found { - newAgents = append(newAgents, poolagent.New(agentCfg, ap.msgClient, ap.txResults, ap.blockResults)) + newAgents = append(newAgents, poolagent.New(ap.ctx, agentCfg, ap.msgClient, ap.txResults, ap.blockResults)) agentsToRun = append(agentsToRun, agentCfg) log.WithField("agent", agentCfg.ID).Info("will trigger start") } diff --git a/services/scanner/agentpool/agent_pool_test.go b/services/scanner/agentpool/agent_pool_test.go index b01de46f..e0f216cc 100644 --- a/services/scanner/agentpool/agent_pool_test.go +++ b/services/scanner/agentpool/agent_pool_test.go @@ -1,6 +1,7 @@ package agentpool import ( + "context" "testing" "github.com/forta-protocol/forta-node/clients" @@ -44,6 +45,7 @@ func (s *Suite) SetupTest() { s.msgClient = mock_clients.NewMockMessageClient(gomock.NewController(s.T())) s.agentClient = mock_clients.NewMockAgentClient(gomock.NewController(s.T())) s.ap = &AgentPool{ + ctx: context.Background(), txResults: make(chan *scanner.TxResult, DefaultBufferSize), blockResults: make(chan *scanner.BlockResult, DefaultBufferSize), msgClient: s.msgClient, diff --git a/services/scanner/agentpool/poolagent/agent.go b/services/scanner/agentpool/poolagent/agent.go index 9f13631d..94a58b4a 100644 --- a/services/scanner/agentpool/poolagent/agent.go +++ b/services/scanner/agentpool/poolagent/agent.go @@ -2,7 +2,6 @@ package poolagent import ( "context" - "strings" "sync" "time" @@ -14,7 +13,6 @@ import ( "github.com/forta-protocol/forta-node/config" "github.com/forta-protocol/forta-node/protocol" "github.com/forta-protocol/forta-node/services/scanner" - "google.golang.org/grpc/codes" log "github.com/sirupsen/logrus" ) @@ -27,6 +25,7 @@ const ( // Agent receives blocks and transactions, and produces results. type Agent struct { + ctx context.Context config config.AgentConfig txRequests chan *protocol.EvaluateTxRequest // never closed - deallocated when agent is discarded @@ -45,8 +44,9 @@ type Agent struct { } // New creates a new agent. -func New(agentCfg config.AgentConfig, msgClient clients.MessageClient, txResults chan<- *scanner.TxResult, blockResults chan<- *scanner.BlockResult) *Agent { +func New(ctx context.Context, agentCfg config.AgentConfig, msgClient clients.MessageClient, txResults chan<- *scanner.TxResult, blockResults chan<- *scanner.BlockResult) *Agent { return &Agent{ + ctx: ctx, config: agentCfg, txRequests: make(chan *protocol.EvaluateTxRequest, DefaultBufferSize), txResults: txResults, @@ -60,9 +60,10 @@ func New(agentCfg config.AgentConfig, msgClient clients.MessageClient, txResults } func isCriticalErr(err error) bool { - errStr := err.Error() - return strings.Contains(errStr, codes.DeadlineExceeded.String()) || - strings.Contains(errStr, codes.Unavailable.String()) + return false + // errStr := err.Error() + // return strings.Contains(errStr, codes.DeadlineExceeded.String()) || + // strings.Contains(errStr, codes.Unavailable.String()) } // LogStatus logs the status of the agent. @@ -166,7 +167,7 @@ func (agent *Agent) processTransactions() { if agent.IsClosed() { return } - ctx, cancel := context.WithTimeout(context.Background(), AgentTimeout) + ctx, cancel := context.WithTimeout(agent.ctx, AgentTimeout) lg.WithField("duration", time.Since(startTime)).Debugf("sending request") resp, err := agent.client.EvaluateTx(ctx, request) cancel() @@ -213,7 +214,7 @@ func (agent *Agent) processBlocks() { return } - ctx, cancel := context.WithTimeout(context.Background(), AgentTimeout) + ctx, cancel := context.WithTimeout(agent.ctx, AgentTimeout) lg.WithField("duration", time.Since(startTime)).Debugf("sending request") resp, err := agent.client.EvaluateBlock(ctx, request) cancel()