From 57b624bec3ed8f0ee6165fc23da0c90713afdafc Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 10 Sep 2024 13:53:27 -0700 Subject: [PATCH] Wip --- .../dataapi/queried_operators_handlers.go | 121 ++++++++++++++++-- tools/opscan/cmd/main.go | 17 ++- 2 files changed, 116 insertions(+), 22 deletions(-) diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index 6c2e684d5..8431134e4 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -2,9 +2,11 @@ package dataapi import ( "context" - "errors" + "fmt" "net" "sort" + "strings" + "sync" "time" "github.com/Layr-Labs/eigenda/api/grpc/node" @@ -170,10 +172,10 @@ func ValidOperatorIP(address string, logger logging.Logger) bool { } func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*OperatorPortCheckResponse, error) { - operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(context.Background(), operatorId) + operatorInfo, err := s.getOperatorInfo(ctx, operatorId) if err != nil { s.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err) - return &OperatorPortCheckResponse{}, errors.New("operator info not found") + return &OperatorPortCheckResponse{}, err } operatorSocket := core.OperatorSocket(operatorInfo.Socket) @@ -183,11 +185,6 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op dispersalSocket := operatorSocket.GetDispersalSocket() dispersalOnline := checkIsOperatorOnline(dispersalSocket, 3, s.logger) - if dispersalOnline { - // collect node info if online - getNodeInfo(ctx, dispersalSocket, operatorId, s.logger) - } - // Create the metadata regardless of online status portCheckResponse := &OperatorPortCheckResponse{ OperatorId: operatorId, @@ -204,22 +201,120 @@ func (s *server) probeOperatorPorts(ctx context.Context, operatorId string) (*Op return 
portCheckResponse, nil } +func (s *server) getOperatorInfo(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) { + operatorInfo, err := s.subgraphClient.QueryOperatorInfoByOperatorId(ctx, operatorId) + if err != nil { + s.logger.Warn("failed to fetch operator info", "operatorId", operatorId, "error", err) + return nil, fmt.Errorf("operator info not found for operatorId %s", operatorId) + } + return operatorInfo, nil +} + +func (s *server) scanOperatorsNodeInfo(ctx context.Context, operatorIds []string, logger logging.Logger) (map[string]int, error) { + registrations, err := s.subgraphClient.subgraphClient.api.QueryOperators(context.Background(), 10000) + if err != nil { + return nil, fmt.Errorf("failed to fetch indexed registered operator state - %s", err) + } + deregistrations, err := s.subgraphClient.QueryOperatorDeregistrations(context.Background(), 10000) + if err != nil { + return nil, fmt.Errorf("failed to fetch indexed deregistered operator state - %s", err) + } + + operators := make(map[string]int) + + // Add registrations + for _, registration := range registrations { + logger.Info("Operator", "operatorId", string(registration.OperatorId), "info", registration) + operators[string(registration.OperatorId)]++ + } + // Deduct deregistrations + for _, deregistration := range deregistrations { + operators[string(deregistration.OperatorId)]-- + } + + activeOperators := make([]string, 0) + for operatorId, count := range operators { + if count > 0 { + activeOperators = append(activeOperators, operatorId) + } + } + logger.Info("Active operators found", "count", len(activeOperators)) + + var wg sync.WaitGroup + var mu sync.Mutex + numWorkers := 5 + operatorChan := make(chan string, len(operatorIds)) + semvers := make(map[string]int) + worker := func() { + for operatorId := range operatorChan { + operatorInfo, err := s.getOperatorInfo(ctx, operatorId) + if err != nil { + mu.Lock() + semvers["not-found"]++ + mu.Unlock() + continue + } + 
operatorSocket := core.OperatorSocket(operatorInfo.Socket) + dispersalSocket := operatorSocket.GetDispersalSocket() + semver := getSemverInfo(context.Background(), dispersalSocket, operatorId, logger) + + mu.Lock() + semvers[semver]++ + mu.Unlock() + } + wg.Done() + } + + // Launch worker goroutines + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go worker() + } + + // Send operator IDs to the channel + for _, operatorId := range operatorIds { + operatorChan <- operatorId + } + close(operatorChan) + + // Wait for all workers to finish + wg.Wait() + return semvers, nil +} + // query operator host info endpoint if available -func getNodeInfo(ctx context.Context, socket string, operatorId string, logger logging.Logger) { +func getSemverInfo(ctx context.Context, socket string, operatorId string, logger logging.Logger) string { conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { logger.Error("Failed to dial grpc operator socket", "operatorId", operatorId, "socket", socket, "error", err) - return + return "unreachable" } defer conn.Close() client := node.NewDispersalClient(conn) reply, err := client.NodeInfo(ctx, &node.NodeInfoRequest{}) if err != nil { - logger.Info("NodeInfo", "operatorId", operatorId, "semver", "unknown") - return + var semver string + if strings.Contains(err.Error(), "Unimplemented") { + semver = "<0.8.0" + } else if strings.Contains(err.Error(), "DeadlineExceeded") { + semver = "timeout" + } else if strings.Contains(err.Error(), "Unavailable") { + semver = "refused" + } else { + semver = "error" + } + + logger.Warn("NodeInfo", "operatorId", operatorId, "semver", semver, "error", err) + return semver + } + + // local node source compiles without semver + if reply.Semver == "" { + reply.Semver = "src-compile" } - logger.Info("NodeInfo", "operatorId", operatorId, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes) + logger.Info("NodeInfo", 
"operatorId", operatorId, "socket", socket, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes) + return reply.Semver } // method to check if operator is online via socket dial diff --git a/tools/opscan/cmd/main.go b/tools/opscan/cmd/main.go index 5adc1677f..b1ff03afb 100644 --- a/tools/opscan/cmd/main.go +++ b/tools/opscan/cmd/main.go @@ -68,11 +68,11 @@ func RunScan(ctx *cli.Context) error { if err != nil { return fmt.Errorf("failed to fetch indexed operator state - %s", err) } - logger.Info("Scanning operators", "registrations", len(registrations), "deregistrations", len(deregistrations)) // Count registrations operators := make(map[string]int) for _, registration := range registrations { + logger.Info("Operator", "operatorId", string(registration.OperatorId), "info", registration) operators[string(registration.OperatorId)]++ } @@ -120,7 +120,7 @@ func scanOperators(subgraphClient dataapi.SubgraphClient, operatorIds []string, } operatorSocket := core.OperatorSocket(operatorInfo.Socket) retrievalSocket := operatorSocket.GetRetrievalSocket() - semver := getNodeInfo(context.Background(), retrievalSocket, config.Timeout, logger) + semver := getNodeInfo(context.Background(), operatorId, retrievalSocket, config.Timeout, logger) mu.Lock() semvers[semver]++ @@ -146,10 +146,10 @@ func scanOperators(subgraphClient dataapi.SubgraphClient, operatorIds []string, return semvers } -func getNodeInfo(ctx context.Context, socket string, timeout time.Duration, logger logging.Logger) string { +func getNodeInfo(ctx context.Context, operatorId string, socket string, timeout time.Duration, logger logging.Logger) string { conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - logger.Error("Failed to dial grpc operator socket", "socket", socket, "error", err) + logger.Error("Failed to dial grpc operator socket", "operatorId", operatorId, "socket", socket, "error", err) return 
"unreachable" } defer conn.Close() @@ -169,17 +169,16 @@ func getNodeInfo(ctx context.Context, socket string, timeout time.Duration, logg semver = "error" } - logger.Warn("NodeInfo", "semver", semver, "error", err) + logger.Warn("NodeInfo", "operatorId", operatorId, "semver", semver, "error", err) return semver } - // local mode compiles without semver + // local node source compiles without semver if reply.Semver == "" { - logger.Info("NodeInfo", "semver", "empty", "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes) - return "src-compile" + reply.Semver = "src-compile" } - logger.Info("NodeInfo", "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes) + logger.Info("NodeInfo", "operatorId", operatorId, "socket", socket, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes) return reply.Semver }