diff --git a/disperser/cmd/dataapi/config.go b/disperser/cmd/dataapi/config.go index fea431782..23012436c 100644 --- a/disperser/cmd/dataapi/config.go +++ b/disperser/cmd/dataapi/config.go @@ -1,9 +1,6 @@ package main import ( - "errors" - "time" - "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/geth" @@ -18,7 +15,6 @@ type Config struct { AwsClientConfig aws.ClientConfig BlobstoreConfig blobstore.Config EthClientConfig geth.EthClientConfig - FireblocksConfig common.FireblocksConfig LoggerConfig common.LoggerConfig PrometheusConfig prometheus.Config MetricsConfig dataapi.MetricsConfig @@ -29,8 +25,6 @@ type Config struct { SubgraphApiOperatorStateAddr string ServerMode string AllowOrigins []string - EjectionToken string - NonsigningRateThreshold int BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string @@ -38,8 +32,6 @@ type Config struct { DisperserHostname string ChurnerHostname string BatcherHealthEndpt string - - TxnTimeout time.Duration } func NewConfig(ctx *cli.Context) (Config, error) { @@ -47,15 +39,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { if err != nil { return Config{}, err } - ejectionToken := ctx.GlobalString(flags.EjectionTokenFlag.Name) - if len(ejectionToken) < 20 { - return Config{}, errors.New("the ejection token length must be at least 20") - } ethClientConfig := geth.ReadEthClientConfig(ctx) - fireblocksConfig := common.ReadFireblocksCLIConfig(ctx, flags.FlagPrefix) - if !fireblocksConfig.Disable { - ethClientConfig = geth.ReadEthClientConfigRPCOnly(ctx) - } config := Config{ BlobstoreConfig: blobstore.Config{ BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), @@ -63,7 +47,6 @@ func NewConfig(ctx *cli.Context) (Config, error) { }, AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix), EthClientConfig: ethClientConfig, - FireblocksConfig: fireblocksConfig, LoggerConfig: *loggerConfig, SocketAddr: ctx.GlobalString(flags.SocketAddrFlag.Name), SubgraphApiBatchMetadataAddr: ctx.GlobalString(flags.SubgraphApiBatchMetadataAddrFlag.Name), @@ -77,9 +60,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { Secret: ctx.GlobalString(flags.PrometheusServerSecretFlag.Name), Cluster: ctx.GlobalString(flags.PrometheusMetricsClusterLabelFlag.Name), }, - AllowOrigins: ctx.GlobalStringSlice(flags.AllowOriginsFlag.Name), - EjectionToken: ejectionToken, - NonsigningRateThreshold: ctx.GlobalInt(flags.NonsigningRateThresholdFlag.Name), + AllowOrigins: ctx.GlobalStringSlice(flags.AllowOriginsFlag.Name), MetricsConfig: dataapi.MetricsConfig{ HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), @@ -88,8 +69,6 @@ func NewConfig(ctx *cli.Context) (Config, error) { DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name), - - TxnTimeout: ctx.GlobalDuration(flags.TxnTimeoutFlag.Name), } return config, nil } diff --git a/disperser/cmd/dataapi/flags/flags.go b/disperser/cmd/dataapi/flags/flags.go index 8a427ff8c..453b25a72 100644 --- a/disperser/cmd/dataapi/flags/flags.go +++ b/disperser/cmd/dataapi/flags/flags.go @@ -1,8 +1,6 @@ package flags import ( - "time" - "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/geth" @@ -97,12 +95,6 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "ALLOW_ORIGINS"), Required: true, } - EjectionTokenFlag = cli.StringFlag{ - Name: common.PrefixFlag(FlagPrefix, "ejection-token"), - Usage: "The token used for authorizing the ejection requests", - Required: true, - EnvVar: common.PrefixEnvVar(envVarPrefix, "EJECTION_TOKEN"), - } EnableMetricsFlag = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "enable-metrics"), Usage: "start metrics server", @@ -139,20 +131,6 @@ var ( Value: "9100", EnvVar: common.PrefixEnvVar(envVarPrefix, "METRICS_HTTP_PORT"), } - TxnTimeoutFlag = cli.DurationFlag{ - Name: common.PrefixFlag(FlagPrefix, "txn-timeout"), - Usage: "the timeout for the transaction", - Required: false, - Value: 6 * time.Minute, - EnvVar: common.PrefixEnvVar(envVarPrefix, "TRANSACTION_TIMEOUT"), - } - NonsigningRateThresholdFlag = cli.IntFlag{ - Name: common.PrefixFlag(FlagPrefix, "nonsigning-rate-threshold"), - Usage: "only operators with nonsigning rate >= this threshold are eligible for ejection, this value must be in range [10, 100], any value not in this range means disabling this flag", - Required: false, - Value: -1, - EnvVar: common.PrefixEnvVar(envVarPrefix, "NONSIGNING_RATE_THRESHOLD"), - } ) var requiredFlags = []cli.Flag{ @@ -168,18 +146,15 @@ var requiredFlags = []cli.Flag{ PrometheusServerSecretFlag, PrometheusMetricsClusterLabelFlag, AllowOriginsFlag, - EjectionTokenFlag, EnableMetricsFlag, DisperserHostnameFlag, ChurnerHostnameFlag, BatcherHealthEndptFlag, - TxnTimeoutFlag, } var optionalFlags = []cli.Flag{ ServerModeFlag, MetricsHTTPPort, - NonsigningRateThresholdFlag, } // Flags contains the list of configuration options available to the binary. @@ -189,6 +164,5 @@ func init() { Flags = append(requiredFlags, optionalFlags...) Flags = append(Flags, common.LoggerCLIFlags(envVarPrefix, FlagPrefix)...) Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...) - Flags = append(Flags, common.FireblocksCLIFlags(envVarPrefix, FlagPrefix)...) Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...) } diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index 7a693ad65..abddf5b1e 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "fmt" "log" "os" @@ -19,13 +18,8 @@ import ( "github.com/Layr-Labs/eigenda/disperser/dataapi" "github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus" "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" - "github.com/Layr-Labs/eigenda/operators/ejector" - walletsdk "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet" - "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/Layr-Labs/eigensdk-go/signerv2" gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" "github.com/urfave/cli" ) @@ -83,12 +77,7 @@ func RunDataApi(ctx *cli.Context) error { return err } - sender := gethcommon.Address{} - if !config.FireblocksConfig.Disable { - sender = gethcommon.HexToAddress(config.FireblocksConfig.WalletAddress) - } - - client, err := geth.NewMultiHomingClient(config.EthClientConfig, sender, logger) + client, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.Address{}, logger) if err != nil { return err } @@ -98,10 +87,6 @@ func RunDataApi(ctx *cli.Context) error { return err } - wallet, err := getWallet(config, client, logger) - if err != nil { - return err - } var ( promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster) blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0) @@ -115,7 +100,6 @@ func RunDataApi(ctx *cli.Context) error { ServerMode: config.ServerMode, SocketAddr: config.SocketAddr, AllowOrigins: config.AllowOrigins, - EjectionToken: config.EjectionToken, DisperserHostname: config.DisperserHostname, ChurnerHostname: config.ChurnerHostname, BatcherHealthEndpt: config.BatcherHealthEndpt, @@ -125,7 +109,6 @@ func RunDataApi(ctx *cli.Context) error { subgraphClient, tx, chainState, - ejector.NewEjector(wallet, client, logger, tx, metrics.EjectorMetrics, config.TxnTimeout, config.NonsigningRateThreshold), logger, metrics, nil, @@ -164,32 +147,3 @@ func RunDataApi(ctx *cli.Context) error { return err } - -func getWallet(config Config, ethClient common.EthClient, logger logging.Logger) (walletsdk.Wallet, error) { - var wallet walletsdk.Wallet - if !config.FireblocksConfig.Disable { - return common.NewFireblocksWallet(&config.FireblocksConfig, ethClient, logger) - } else if len(config.EthClientConfig.PrivateKeyString) > 0 { - privateKey, err := crypto.HexToECDSA(config.EthClientConfig.PrivateKeyString) - if err != nil { - return nil, fmt.Errorf("failed to parse private key: %w", err) - } - chainID, err := ethClient.ChainID(context.Background()) - if err != nil { - return nil, fmt.Errorf("failed to get chain ID: %w", err) - } - signerV2, address, err := signerv2.SignerFromConfig(signerv2.Config{PrivateKey: privateKey}, chainID) - if err != nil { - return nil, err - } - wallet, err = walletsdk.NewPrivateKeyWallet(ethClient, signerV2, address, logger.With("component", "PrivateKeyWallet")) - if err != nil { - return nil, err - } - logger.Info("Initialized PrivateKey wallet", "address", address.Hex()) - } else { - return nil, errors.New("no wallet is configured. Either Fireblocks or PrivateKey wallet should be configured") - } - - return wallet, nil -} diff --git a/disperser/dataapi/config.go b/disperser/dataapi/config.go index c23512dbe..b7027007d 100644 --- a/disperser/dataapi/config.go +++ b/disperser/dataapi/config.go @@ -7,5 +7,4 @@ type Config struct { DisperserHostname string ChurnerHostname string BatcherHealthEndpt string - EjectionToken string } diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 0f8b91c5b..8e8a2262d 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -15,63 +15,6 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { - "/ejector/operators": { - "post": { - "produces": [ - "application/json" - ], - "tags": [ - "Ejector" - ], - "summary": "Eject operators who violate the SLAs during the given time interval", - "parameters": [ - { - "type": "integer", - "description": "Lookback window for operator ejection [default: 86400]", - "name": "interval", - "in": "query" - }, - { - "type": "integer", - "description": "End time for evaluating operator ejection [default: now]", - "name": "end", - "in": "query" - }, - { - "type": "string", - "description": "Whether it's periodic or urgent ejection request [default: periodic]", - "name": "mode", - "in": "query" - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "$ref": "#/definitions/dataapi.EjectionResponse" - } - }, - "400": { - "description": "error: Bad request", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "404": { - "description": "error: Not found", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - }, - "500": { - "description": "error: Server error", - "schema": { - "$ref": "#/definitions/dataapi.ErrorResponse" - } - } - } - } - }, "/feed/blobs": { "get": { "produces": [ @@ -702,14 +645,6 @@ const docTemplate = `{ } } }, - "dataapi.EjectionResponse": { - "type": "object", - "properties": { - "transaction_hash": { - "type": "string" - } - } - }, "dataapi.ErrorResponse": { "type": "object", "properties": { diff --git a/disperser/dataapi/metrics.go b/disperser/dataapi/metrics.go index c1f709dea..c2c0c7de1 100644 --- a/disperser/dataapi/metrics.go +++ b/disperser/dataapi/metrics.go @@ -7,7 +7,6 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" - "github.com/Layr-Labs/eigenda/operators/ejector" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -23,9 +22,8 @@ type MetricsConfig struct { type Metrics struct { registry *prometheus.Registry - NumRequests *prometheus.CounterVec - Latency *prometheus.SummaryVec - EjectorMetrics *ejector.Metrics + NumRequests *prometheus.CounterVec + Latency *prometheus.SummaryVec httpPort string logger logging.Logger @@ -55,10 +53,9 @@ func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string, }, []string{"method"}, ), - EjectorMetrics: ejector.NewMetrics(reg, logger), - registry: reg, - httpPort: httpPort, - logger: logger.With("component", "DataAPIMetrics"), + registry: reg, + httpPort: httpPort, + logger: logger.With("component", "DataAPIMetrics"), } return metrics } diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 678aeb68b..6f09293a4 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -15,7 +15,6 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/operators/ejector" "github.com/Layr-Labs/eigensdk-go/logging" "google.golang.org/grpc/health/grpc_health_v1" @@ -33,8 +32,7 @@ const ( maxWorkerPoolLimit = 10 maxQueryBatchesLimit = 2 - ejectionTokenParam = "X-Ejection-Token" - cacheControlParam = "Cache-Control" + cacheControlParam = "Cache-Control" // Cache control for responses. // The time unit is second for max age. @@ -166,8 +164,6 @@ type ( subgraphClient SubgraphClient transactor core.Transactor chainState core.ChainState - ejector *ejector.Ejector - ejectionToken string metrics *Metrics disperserHostName string @@ -185,7 +181,6 @@ func NewServer( subgraphClient SubgraphClient, transactor core.Transactor, chainState core.ChainState, - ejector *ejector.Ejector, logger logging.Logger, metrics *Metrics, grpcConn GRPCConn, @@ -218,8 +213,6 @@ func NewServer( transactor: transactor, chainState: chainState, metrics: metrics, - ejector: ejector, - ejectionToken: config.EjectionToken, disperserHostName: config.DisperserHostname, churnerHostName: config.ChurnerHostname, batcherHealthEndpt: config.BatcherHealthEndpt, @@ -262,8 +255,6 @@ func (s *server) Start() error { metrics.GET("/churner-service-availability", s.FetchChurnerServiceAvailability) metrics.GET("/batcher-service-availability", s.FetchBatcherAvailability) } - ejection := v1.Group("/ejection") - ejection.POST("/operators", s.EjectOperatorsHandler) swagger := v1.Group("/swagger") { swagger.GET("/*any", ginswagger.WrapHandler(swaggerfiles.Handler)) @@ -315,77 +306,6 @@ func (s *server) Shutdown() error { return nil } -// EjectOperatorsHandler godoc -// -// @Summary Eject operators who violate the SLAs during the given time interval -// @Tags Ejector -// @Produce json -// @Param interval query int false "Lookback window for operator ejection [default: 86400]" -// @Param end query int false "End time for evaluating operator ejection [default: now]" -// @Param mode query string false "Whether it's periodic or urgent ejection request [default: periodic]" -// @Success 200 {object} EjectionResponse -// @Failure 400 {object} ErrorResponse "error: Bad request" -// @Failure 404 {object} ErrorResponse "error: Not found" -// @Failure 500 {object} ErrorResponse "error: Server error" -// @Router /ejector/operators [post] -func (s *server) EjectOperatorsHandler(c *gin.Context) { - timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { - s.metrics.ObserveLatency("EjectOperators", f*1000) // make milliseconds - })) - defer timer.ObserveDuration() - - token := c.GetHeader(ejectionTokenParam) - if token != s.ejectionToken { - c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Unauthorized"}) - return - } - - mode := ejector.PeriodicMode - if c.Query("mode") == "urgent" { - mode = ejector.UrgentMode - } - - endTime := time.Now() - if c.Query("end") != "" { - var err error - endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end")) - if err != nil { - s.metrics.IncrementFailedRequestNum("EjectOperators") - errorResponse(c, err) - return - } - } - - interval, err := strconv.ParseInt(c.DefaultQuery("interval", "86400"), 10, 64) - if err != nil || interval == 0 { - interval = 86400 - } - - nonSigningRate, err := s.getOperatorNonsigningRate(c.Request.Context(), endTime.Unix()-interval, endTime.Unix(), true) - var ejectionResponse *ejector.EjectionResponse - if err == nil { - nonSigningMetrics := make([]*ejector.NonSignerMetric, 0) - for _, metric := range nonSigningRate.Data { - nonSigningMetrics = append(nonSigningMetrics, &ejector.NonSignerMetric{ - OperatorId: metric.OperatorId, - OperatorAddress: metric.OperatorAddress, - QuorumId: metric.QuorumId, - TotalUnsignedBatches: metric.TotalUnsignedBatches, - Percentage: metric.Percentage, - StakePercentage: metric.StakePercentage, - }) - } - ejectionResponse, err = s.ejector.Eject(c.Request.Context(), nonSigningMetrics, mode) - } - if err != nil { - s.metrics.IncrementFailedRequestNum("EjectOperators") - errorResponse(c, err) - return - } - s.metrics.IncrementSuccessfulRequestNum("EjectOperators") - c.JSON(http.StatusOK, ejectionResponse) -} - // FetchBlobHandler godoc // // @Summary Fetch blob metadata by blob key diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index e7b09c5ea..6e600bb3c 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -15,7 +15,6 @@ import ( "testing" "time" - commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" @@ -25,19 +24,15 @@ import ( "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph" subgraphmock "github.com/Layr-Labs/eigenda/disperser/dataapi/subgraph/mock" "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/operators/ejector" - sdkmock "github.com/Layr-Labs/eigensdk-go/chainio/clients/mocks" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/ethereum/go-ethereum/common" gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/gin-gonic/gin" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "go.uber.org/goleak" - "go.uber.org/mock/gomock" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -56,7 +51,7 @@ var ( mockSubgraphApi = &subgraphmock.MockSubgraphApi{} subgraphClient = dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger) - config = dataapi.Config{ServerMode: "test", SocketAddr: ":8080", AllowOrigins: []string{"*"}, DisperserHostname: "localhost:32007", ChurnerHostname: "localhost:32009", EjectionToken: "deadbeef"} + config = dataapi.Config{ServerMode: "test", SocketAddr: ":8080", AllowOrigins: []string{"*"}, DisperserHostname: "localhost:32007", ChurnerHostname: "localhost:32009"} mockTx = &coremock.MockTransactor{} metrics = dataapi.NewMetrics(nil, "9001", mockLogger) @@ -72,7 +67,7 @@ var ( opId1: 3, }, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, nil, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) expectedBatchHeaderHash = [32]byte{1, 2, 3} expectedBlobIndex = uint32(1) expectedRequestedAt = uint64(5567830000000000000) @@ -329,71 +324,6 @@ func TestFetchMetricsThroughputHandler(t *testing.T) { assert.Equal(t, float64(3.503022666666651e+07), totalThroughput) } -func TestEjectOperatorHandler(t *testing.T) { - r := setUpRouter() - ejectorComponents := getEjector(t) - testDataApiServer := dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, ejectorComponents.ejector, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) - - stopTime := time.Now().UTC() - interval := 3600 - startTime := stopTime.Add(-time.Duration(interval) * time.Second) - mockSubgraphApi.On("QueryBatchNonSigningInfo", startTime.Unix(), stopTime.Unix()).Return(batchNonSigningInfo, nil) - addr1 := gethcommon.HexToAddress("0x00000000219ab540356cbb839cbe05303d7705fa") - addr2 := gethcommon.HexToAddress("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2") - mockTx.On("BatchOperatorIDToAddress").Return([]gethcommon.Address{addr1, addr2}, nil) - mockTx.On("GetQuorumBitmapForOperatorsAtBlockNumber").Return([]*big.Int{big.NewInt(3), big.NewInt(0)}, nil) - mockTx.On("BuildEjectOperatorsTxn", mock.Anything, mock.Anything).Return(types.NewTransaction(0, gethcommon.HexToAddress("0x1"), big.NewInt(0), 0, big.NewInt(0), []byte{}), nil) - mockTx.On("EjectOperators").Return(&types.Receipt{ - GasUsed: uint64(10), - }, nil) - mockSubgraphApi.On("QueryOperatorAddedToQuorum").Return(operatorAddedToQuorum, nil) - mockSubgraphApi.On("QueryOperatorRemovedFromQuorum").Return(operatorRemovedFromQuorum, nil) - ejectorComponents.ethClient.On("GetLatestGasCaps").Return(big.NewInt(0), big.NewInt(0), nil) - ejectorComponents.ethClient.On("UpdateGas").Return(types.NewTransaction(0, gethcommon.HexToAddress("0x1"), big.NewInt(0), 0, big.NewInt(0), []byte{}), nil) - txID := "1234" - receipt := &types.Receipt{ - BlockNumber: new(big.Int).SetUint64(1), - TxHash: gethcommon.HexToHash("0xdf9c2506b0dbb107d5a35e262e2e94fe9ce91440dfbba2e7a919bd2e83aee29e"), - } - gomock.InOrder( - ejectorComponents.wallet.EXPECT().SendTransaction(gomock.Any(), gomock.Any()).Return(txID, nil), - ejectorComponents.wallet.EXPECT().GetTransactionReceipt(gomock.Any(), gomock.Any()).Return(receipt, nil), - ) - - r.GET("/v1/ejector/operator", testDataApiServer.EjectOperatorsHandler) - - reqStr := fmt.Sprintf("/v1/ejector/operator?interval=%v&end=%s", interval, stopTime.Format("2006-01-02T15:04:05Z")) - - w := httptest.NewRecorder() - req := httptest.NewRequest(http.MethodGet, reqStr, nil) - ctxWithDeadline, cancel := context.WithTimeout(req.Context(), 500*time.Microsecond) - defer cancel() - req = req.WithContext(ctxWithDeadline) - r.ServeHTTP(w, req) - assert.Equal(t, w.Code, http.StatusUnauthorized) - - w2 := httptest.NewRecorder() - req2 := httptest.NewRequest(http.MethodGet, reqStr, nil) - req2.Header.Set("X-Ejection-Token", "deadbeef") - ctxWithDeadline2, cancel2 := context.WithTimeout(req2.Context(), 500*time.Microsecond) - defer cancel2() - req2 = req2.WithContext(ctxWithDeadline2) - r.ServeHTTP(w2, req2) - assert.Equal(t, w2.Code, http.StatusOK) - - res := w2.Result() - defer res.Body.Close() - - data, err := io.ReadAll(res.Body) - assert.NoError(t, err) - - var response ejector.EjectionResponse - err = json.Unmarshal(data, &response) - assert.NoError(t, err) - assert.NotNil(t, response) - assert.Equal(t, receipt.TxHash.Hex(), response.TransactionHash) -} - func TestFetchUnsignedBatchesHandler(t *testing.T) { r := setUpRouter() @@ -453,24 +383,6 @@ func TestFetchUnsignedBatchesHandler(t *testing.T) { assert.Equal(t, float64(25), responseData.StakePercentage) } -type ejectorComponents struct { - wallet *sdkmock.MockWallet - ethClient *commonmock.MockEthClient - ejector *ejector.Ejector -} - -func getEjector(t *testing.T) *ejectorComponents { - ctrl := gomock.NewController(t) - w := sdkmock.NewMockWallet(ctrl) - ethClient := &commonmock.MockEthClient{} - ejector := ejector.NewEjector(w, ethClient, mockLogger, mockTx, metrics.EjectorMetrics, 100*time.Millisecond, -1) - return &ejectorComponents{ - wallet: w, - ethClient: ethClient, - ejector: ejector, - } -} - func TestPortCheckIpValidation(t *testing.T) { assert.Equal(t, false, dataapi.ValidOperatorIP("", mockLogger)) assert.Equal(t, false, dataapi.ValidOperatorIP("0.0.0.0:32005", mockLogger)) @@ -523,7 +435,7 @@ func TestPortCheck(t *testing.T) { func TestCheckBatcherHealthExpectServing(t *testing.T) { r := setUpRouter() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) @@ -556,7 +468,7 @@ func TestCheckBatcherHealthExpectServing(t *testing.T) { func TestCheckBatcherHealthExpectNotServing(t *testing.T) { r := setUpRouter() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}) r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) @@ -594,7 +506,7 @@ func TestFetchDisperserServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/metrics/disperser-service-availability", testDataApiServer.FetchDisperserServiceAvailability) @@ -632,7 +544,7 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/metrics/churner-service-availability", testDataApiServer.FetchChurnerServiceAvailability) @@ -678,7 +590,7 @@ func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -726,7 +638,7 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -793,7 +705,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -837,7 +749,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampTwoOperatorsHandler(t *tes // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -892,7 +804,7 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -956,7 +868,7 @@ func TestFetchDeregisteredOperatorOffline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) @@ -1008,7 +920,7 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1065,7 +977,7 @@ func TestFetchDeregisteredOperatorInvalidDaysQueryParam(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1106,7 +1018,7 @@ func TestFetchDeregisteredOperatorQueryDaysGreaterThan30(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) @@ -1151,7 +1063,7 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1208,7 +1120,7 @@ func TestFetchDeregisteredOperatorOnline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil) @@ -1263,7 +1175,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1329,7 +1241,7 @@ func TestFetchDeregisteredOperatorsMultipleOnline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1403,7 +1315,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo3, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorStates, nil) @@ -1463,7 +1375,7 @@ func TestFetchRegisteredOperatorOnline(t *testing.T) { indexedOperatorState[core.OperatorID{0}] = subgraphDeregisteredOperatorInfo mockSubgraphApi.On("QueryRegisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorRegistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, nil, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, metrics, &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedOperatorsWithStateForTimeWindow").Return(indexedOperatorState, nil)