Skip to content

Commit

Permalink
Adds dataapi endpoint for operator host info scan/report
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Sep 13, 2024
1 parent 57b624b commit 0b8ad49
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 8 deletions.
22 changes: 14 additions & 8 deletions disperser/dataapi/queried_operators_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ func (s *server) getOperatorInfo(ctx context.Context, operatorId string) (*core.
return operatorInfo, nil
}

func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string, logger logging.Logger) (map[string]int, error) {
registrations, err := s.subgraphClient.subgraphClient.api.QueryOperators(context.Background(), 10000)
func (s *server) scanOperatorsHostInfo(ctx context.Context, logger logging.Logger) (*HostInfoReportResponse, error) {
registrations, err := s.subgraphClient.QueryOperatorsWithLimit(context.Background(), 10000)
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed registered operator state - %s", err)
}
Expand Down Expand Up @@ -243,14 +243,14 @@ func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string
var wg sync.WaitGroup
var mu sync.Mutex
numWorkers := 5
operatorChan := make(chan string, len(operatorIds))
semvers := make(map[string]int)
operatorChan := make(chan string, len(activeOperators))
hostInfo := make(map[string]int)
worker := func() {
for operatorId := range operatorChan {
operatorInfo, err := s.getOperatorInfo(ctx, operatorId)
if err != nil {
mu.Lock()
semvers["not-found"]++
hostInfo["not-found"]++
mu.Unlock()
continue
}
Expand All @@ -259,7 +259,7 @@ func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string
semver := getSemverInfo(context.Background(), operatorId, dispersalSocket, logger)

mu.Lock()
semvers[semver]++
hostInfo[semver]++
mu.Unlock()
}
wg.Done()
Expand All @@ -272,14 +272,20 @@ func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string
}

// Send operator IDs to the channel
for _, operatorId := range operatorIds {
for _, operatorId := range activeOperators {
operatorChan <- operatorId
}
close(operatorChan)

// Wait for all workers to finish
wg.Wait()
return semvers, nil

// Create HostInfoReportResponse instance
hostInfoReport := &HostInfoReportResponse{
HostInfo: hostInfo,
}

return hostInfoReport, nil
}

// query operator host info endpoint if available
Expand Down
28 changes: 28 additions & 0 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ type (
DispersalOnline bool `json:"dispersal_online"`
RetrievalOnline bool `json:"retrieval_online"`
}
HostInfoReportResponse struct {
HostInfo map[string]int `json:"hostinfo"`
}

ErrorResponse struct {
Error string `json:"error"`
}
Expand Down Expand Up @@ -803,6 +807,30 @@ func (s *server) OperatorPortCheck(c *gin.Context) {
c.JSON(http.StatusOK, portCheckResponse)
}

// HostInfo report godoc
//
// @Summary Active operator hostinfo report
// @Tags OperatorsInfo
// @Produce json
// @Success 200 {object} OperatorPortCheckResponse
// @Failure 500 {object} ErrorResponse "error: Server error"
// @Router /operators-info/hostinfo-scan [get]
func (s *server) HostInfoReport(c *gin.Context) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("HostInfoReport", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

hostInfo, err := s.scanOperatorsHostInfo(c.Request.Context(), s.logger)
if err != nil {
s.logger.Error("failed to scan operators host info", "error", err)
s.metrics.IncrementFailedRequestNum("HostInfoReport")
errorResponse(c, err)
}
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxOperatorPortCheckAge))
c.JSON(http.StatusOK, hostInfo)
}

// FetchDisperserServiceAvailability godoc
//
// @Summary Get status of EigenDA Disperser service.
Expand Down
1 change: 1 addition & 0 deletions disperser/dataapi/subgraph/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type (
QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, operatorId string, blockNumber uint32) (*IndexedOperatorInfo, error)
QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error)
QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error)
QueryOperatorDeregistrations(ctx context.Context, first int) ([]*Operator, error)
}

api struct {
Expand Down
15 changes: 15 additions & 0 deletions disperser/dataapi/subgraph/mock/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ func (m *MockSubgraphApi) QueryOperators(ctx context.Context, first int) ([]*sub
return value, args.Error(1)
}

func (m *MockSubgraphApi) QueryOperatorDeregistrations(ctx context.Context, first int) ([]*subgraph.Operator, error) {
args := m.Called()

var value []*subgraph.Operator
if args.Get(0) != nil {
value = args.Get(0).([]*subgraph.Operator)

if len(value) > first {
value = value[:first]
}
}

return value, args.Error(1)
}

func (m *MockSubgraphApi) QueryBatchNonSigningInfo(ctx context.Context, startTime, endTime int64) ([]*subgraph.BatchNonSigningInfo, error) {
args := m.Called(startTime, endTime)

Expand Down
18 changes: 18 additions & 0 deletions disperser/dataapi/subgraph_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type (
QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error)
QueryIndexedOperatorsWithStateForTimeWindow(ctx context.Context, days int32, state OperatorState) (*IndexedQueriedOperatorInfo, error)
QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error)
QueryOperatorDeregistrations(ctx context.Context, limit int) ([]*Operator, error)
}
Batch struct {
Id []byte
Expand Down Expand Up @@ -107,6 +108,23 @@ func NewSubgraphClient(api subgraph.Api, logger logging.Logger) *subgraphClient
return &subgraphClient{api: api, logger: logger.With("component", "SubgraphClient")}
}

func (sc *subgraphClient) QueryOperatorDeregistrations(ctx context.Context, limit int) ([]*Operator, error) {
// Implement the logic to query operator deregistrations
operatorsGql, err := sc.api.QueryOperatorDeregistrations(ctx, limit)
if err != nil {
return nil, err
}
operators := make([]*Operator, len(operatorsGql))
for i, operatorGql := range operatorsGql {
operator, err := convertOperator(operatorGql)
if err != nil {
return nil, err
}
operators[i] = operator
}
return operators, nil
}

func (sc *subgraphClient) QueryBatchesWithLimit(ctx context.Context, limit, skip int) ([]*Batch, error) {
subgraphBatches, err := sc.api.QueryBatches(ctx, true, "blockTimestamp", limit, skip)
if err != nil {
Expand Down

0 comments on commit 0b8ad49

Please sign in to comment.