diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index 8431134e4..8b70bcf60 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -210,8 +210,8 @@ func (s *server) getOperatorInfo(ctx context.Context, operatorId string) (*core. return operatorInfo, nil } -func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string, logger logging.Logger) (map[string]int, error) { - registrations, err := s.subgraphClient.subgraphClient.api.QueryOperators(context.Background(), 10000) +func (s *server) scanOperatorsHostInfo(ctx context.Context, logger logging.Logger) (*HostInfoReportResponse, error) { + registrations, err := s.subgraphClient.QueryOperatorsWithLimit(context.Background(), 10000) if err != nil { return nil, fmt.Errorf("failed to fetch indexed registered operator state - %s", err) } @@ -243,14 +243,14 @@ func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string var wg sync.WaitGroup var mu sync.Mutex numWorkers := 5 - operatorChan := make(chan string, len(operatorIds)) - semvers := make(map[string]int) + operatorChan := make(chan string, len(activeOperators)) + hostInfo := make(map[string]int) worker := func() { for operatorId := range operatorChan { operatorInfo, err := s.getOperatorInfo(ctx, operatorId) if err != nil { mu.Lock() - semvers["not-found"]++ + hostInfo["not-found"]++ mu.Unlock() continue } @@ -259,7 +259,7 @@ func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string semver := getSemverInfo(context.Background(), operatorId, dispersalSocket, logger) mu.Lock() - semvers[semver]++ + hostInfo[semver]++ mu.Unlock() } wg.Done() @@ -272,14 +272,20 @@ func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string } // Send operator IDs to the channel - for _, operatorId := range operatorIds { + for _, operatorId := range activeOperators { operatorChan <- operatorId } close(operatorChan) // Wait for all workers to finish wg.Wait() - return semvers, nil + + // Create HostInfoReportResponse instance + hostInfoReport := &HostInfoReportResponse{ + HostInfo: hostInfo, + } + + return hostInfoReport, nil } // query operator host info endpoint if available diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index b0c327ef6..b9a9d8946 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -153,6 +153,10 @@ type ( DispersalOnline bool `json:"dispersal_online"` RetrievalOnline bool `json:"retrieval_online"` } + HostInfoReportResponse struct { + HostInfo map[string]int `json:"hostinfo"` + } + ErrorResponse struct { Error string `json:"error"` } @@ -803,6 +807,30 @@ func (s *server) OperatorPortCheck(c *gin.Context) { c.JSON(http.StatusOK, portCheckResponse) } +// HostInfo report godoc +// +// @Summary Active operator hostinfo report +// @Tags OperatorsInfo +// @Produce json +// @Success 200 {object} OperatorPortCheckResponse +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators-info/hostinfo-scan [get] +func (s *server) HostInfoReport(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("HostInfoReport", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + hostInfo, err := s.scanOperatorsHostInfo(c.Request.Context(), s.logger) + if err != nil { + s.logger.Error("failed to scan operators host info", "error", err) + s.metrics.IncrementFailedRequestNum("HostInfoReport") + errorResponse(c, err) + } + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge)) + c.JSON(http.StatusOK, hostInfo) +} + // FetchDisperserServiceAvailability godoc // // @Summary Get status of EigenDA Disperser service. diff --git a/disperser/dataapi/subgraph/api.go b/disperser/dataapi/subgraph/api.go index fac5905d8..1663883a3 100644 --- a/disperser/dataapi/subgraph/api.go +++ b/disperser/dataapi/subgraph/api.go @@ -27,6 +27,7 @@ type ( QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, operatorId string, blockNumber uint32) (*IndexedOperatorInfo, error) QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error) QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error) + QueryOperatorDeregistrations(ctx context.Context, first int) ([]*Operator, error) } api struct { diff --git a/disperser/dataapi/subgraph/mock/api.go b/disperser/dataapi/subgraph/mock/api.go index 465c198ef..30001e6a3 100644 --- a/disperser/dataapi/subgraph/mock/api.go +++ b/disperser/dataapi/subgraph/mock/api.go @@ -66,6 +66,21 @@ func (m *MockSubgraphApi) QueryOperators(ctx context.Context, first int) ([]*sub return value, args.Error(1) } +func (m *MockSubgraphApi) QueryOperatorDeregistrations(ctx context.Context, first int) ([]*subgraph.Operator, error) { + args := m.Called() + + var value []*subgraph.Operator + if args.Get(0) != nil { + value = args.Get(0).([]*subgraph.Operator) + + if len(value) > first { + value = value[:first] + } + } + + return value, args.Error(1) +} + func (m *MockSubgraphApi) QueryBatchNonSigningInfo(ctx context.Context, startTime, endTime int64) ([]*subgraph.BatchNonSigningInfo, error) { args := m.Called(startTime, endTime) diff --git a/disperser/dataapi/subgraph_client.go b/disperser/dataapi/subgraph_client.go index 3a8480314..79090d4b9 100644 --- a/disperser/dataapi/subgraph_client.go +++ b/disperser/dataapi/subgraph_client.go @@ -36,6 +36,7 @@ type ( QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error) QueryIndexedOperatorsWithStateForTimeWindow(ctx context.Context, days int32, state OperatorState) (*IndexedQueriedOperatorInfo, error) QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) + QueryOperatorDeregistrations(ctx context.Context, limit int) ([]*Operator, error) } Batch struct { Id []byte @@ -107,6 +108,23 @@ func NewSubgraphClient(api subgraph.Api, logger logging.Logger) *subgraphClient return &subgraphClient{api: api, logger: logger.With("component", "SubgraphClient")} } +func (sc *subgraphClient) QueryOperatorDeregistrations(ctx context.Context, limit int) ([]*Operator, error) { + // Implement the logic to query operator deregistrations + operatorsGql, err := sc.api.QueryOperatorDeregistrations(ctx, limit) + if err != nil { + return nil, err + } + operators := make([]*Operator, len(operatorsGql)) + for i, operatorGql := range operatorsGql { + operator, err := convertOperator(operatorGql) + if err != nil { + return nil, err + } + operators[i] = operator + } + return operators, nil +} + func (sc *subgraphClient) QueryBatchesWithLimit(ctx context.Context, limit, skip int) ([]*Batch, error) { subgraphBatches, err := sc.api.QueryBatches(ctx, true, "blockTimestamp", limit, skip) if err != nil {