Skip to content

Commit

Permalink
Wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Sep 13, 2024
1 parent 7615542 commit 57b624b
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 22 deletions.
121 changes: 108 additions & 13 deletions disperser/dataapi/queried_operators_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package dataapi

import (
"context"
"errors"
"fmt"
"net"
"sort"
"strings"
"sync"
"time"

"github.com/Layr-Labs/eigenda/api/grpc/node"
Expand Down Expand Up @@ -170,10 +172,10 @@ func ValidOperatorIP(address string, logger logging.Logger) bool {
}

func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) {
operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId)
operatorInfo, err := s.getOperatorInfo(ctx, operatorId)
if err != nil {
s.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err)
return &OperatorPortCheckResponse{}, errors.New("operator info not found")
return &OperatorPortCheckResponse{}, err
}

operatorSocket := core.OperatorSocket(operatorInfo.Socket)
Expand All @@ -183,11 +185,6 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op
dispersalSocket := operatorSocket.GetDispersalSocket()
dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger)

if dispersalOnline {
// collect node info if online
getNodeInfo(ctx, dispersalSocket, operatorId, s.logger)
}

// Create the metadata regardless of online status
portCheckResponse := &OperatorPortCheckResponse{
OperatorId: operatorId,
Expand All @@ -204,22 +201,120 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op
return portCheckResponse, nil
}

func (s *server) getOperatorInfo(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) {
operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId)
if err != nil {
s.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err)
return nil, fmt.Errorf("operator info not found for operatorId %s", operatorId)
}
return operatorInfo, nil
}

func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string, logger logging.Logger) (map[string]int, error) {
registrations, err := s.subgraphClient.subgraphClient.api.QueryOperators(context.Background(), 10000)
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed registered operator state - %s", err)
}
deregistrations, err := s.subgraphClient.QueryOperatorDeregistrations(context.Background(), 10000)
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed deregistered operator state - %s", err)
}

operators := make(map[string]int)

// Add registrations
for _, registration := range registrations {
logger.Info("Operator", "operatorId", string(registration.OperatorId), "info", registration)
operators[string(registration.OperatorId)]++
}
// Deduct deregistrations
for _, deregistration := range deregistrations {
operators[string(deregistration.OperatorId)]--
}

activeOperators := make([]string, 0)
for operatorId, count := range operators {
if count > 0 {
activeOperators = append(activeOperators, operatorId)
}
}
logger.Info("Active operators found", "count", len(activeOperators))

var wg sync.WaitGroup
var mu sync.Mutex
numWorkers := 5
operatorChan := make(chan string, len(operatorIds))
semvers := make(map[string]int)
worker := func() {
for operatorId := range operatorChan {
operatorInfo, err := s.getOperatorInfo(ctx, operatorId)
if err != nil {
mu.Lock()
semvers["not-found"]++
mu.Unlock()
continue
}
operatorSocket := core.OperatorSocket(operatorInfo.Socket)
dispersalSocket := operatorSocket.GetDispersalSocket()
semver := getSemverInfo(context.Background(), operatorId, dispersalSocket, logger)

mu.Lock()
semvers[semver]++
mu.Unlock()
}
wg.Done()
}

// Launch worker goroutines
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker()
}

// Send operator IDs to the channel
for _, operatorId := range operatorIds {
operatorChan <- operatorId
}
close(operatorChan)

// Wait for all workers to finish
wg.Wait()
return semvers, nil
}

// query operator host info endpoint if available
func getNodeInfo(ctx context.Context, socket string, operatorId string, logger logging.Logger) {
func getSemverInfo(ctx context.Context, socket string, operatorId string, logger logging.Logger) string {
conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Error("Failed to dial grpc operator socket", "operatorId", operatorId, "socket", socket, "error", err)
return
return "unreachable"
}
defer conn.Close()
client := node.NewDispersalClient(conn)
reply, err := client.NodeInfo(ctx, &node.NodeInfoRequest{})
if err != nil {
logger.Info("NodeInfo", "operatorId", operatorId, "semver", "unknown")
return
var semver string
if strings.Contains(err.Error(), "Unimplemented") {
semver = "<0.8.0"
} else if strings.Contains(err.Error(), "DeadlineExceeded") {
semver = "timeout"
} else if strings.Contains(err.Error(), "Unavailable") {
semver = "refused"
} else {
semver = "error"
}

logger.Warn("NodeInfo", "operatorId", operatorId, "semver", semver, "error", err)
return semver
}

// local node source compiles without semver
if reply.Semver == "" {
reply.Semver = "src-compile"
}

logger.Info("NodeInfo", "operatorId", operatorId, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
logger.Info("NodeInfo", "operatorId", operatorId, "socker", socket, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
return reply.Semver
}

// method to check if operator is online via socket dial
Expand Down
17 changes: 8 additions & 9 deletions tools/opscan/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func RunScan(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to fetch indexed operator state - %s", err)
}
logger.Info("Scanning operators", "registrations", len(registrations), "deregistrations", len(deregistrations))

// Count registrations
operators := make(map[string]int)
for _, registration := range registrations {
logger.Info("Operator", "operatorId", string(registration.OperatorId), "info", registration)
operators[string(registration.OperatorId)]++
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func scanOperators(subgraphClient dataapi.SubgraphClient, operatorIds []string,
}
operatorSocket := core.OperatorSocket(operatorInfo.Socket)
retrievalSocket := operatorSocket.GetRetrievalSocket()
semver := getNodeInfo(context.Background(), retrievalSocket, config.Timeout, logger)
semver := getNodeInfo(context.Background(), operatorId, retrievalSocket, config.Timeout, logger)

mu.Lock()
semvers[semver]++
Expand All @@ -146,10 +146,10 @@ func scanOperators(subgraphClient dataapi.SubgraphClient, operatorIds []string,
return semvers
}

func getNodeInfo(ctx context.Context, socket string, timeout time.Duration, logger logging.Logger) string {
func getNodeInfo(ctx context.Context, operatorId string, socket string, timeout time.Duration, logger logging.Logger) string {
conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
logger.Error("Failed to dial grpc operator socket", "socket", socket, "error", err)
logger.Error("Failed to dial grpc operator socket", "operatorId", operatorId, "socket", socket, "error", err)
return "unreachable"
}
defer conn.Close()
Expand All @@ -169,17 +169,16 @@ func getNodeInfo(ctx context.Context, socket string, timeout time.Duration, logg
semver = "error"
}

logger.Warn("NodeInfo", "semver", semver, "error", err)
logger.Warn("NodeInfo", "operatorId", operatorId, "semver", semver, "error", err)
return semver
}

// local mode compiles without semver
// local node source compiles without semver
if reply.Semver == "" {
logger.Info("NodeInfo", "semver", "empty", "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
return "src-compile"
reply.Semver = "src-compile"
}

logger.Info("NodeInfo", "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
logger.Info("NodeInfo", "operatorId", operatorId, "socker", socket, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
return reply.Semver
}

Expand Down

0 comments on commit 57b624b

Please sign in to comment.