diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 40a7a6747..3806a96c1 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -41,8 +41,9 @@ import ( ) var ( - queue disperser.BlobStore - dispersalServer *apiserver.DispersalServer + queue disperser.BlobStore + dispersalServer *apiserver.DispersalServer + dispersalServerV2 *apiserver.DispersalServerV2 dockertestPool *dockertest.Pool dockertestResource *dockertest.Resource @@ -605,6 +606,7 @@ func setup() { transactor.On("GetRequiredQuorumNumbers", tmock.Anything).Return([]uint8{}, nil) dispersalServer = newTestServer(transactor) + dispersalServerV2 = newTestServerV2(transactor) } func teardown() { diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go new file mode 100644 index 000000000..5ecebadfb --- /dev/null +++ b/disperser/apiserver/server_v2.go @@ -0,0 +1,76 @@ +package apiserver + +import ( + "context" + "errors" + "fmt" + "net" + + "github.com/Layr-Labs/eigenda/api" + pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" + healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigensdk-go/logging" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/reflection" +) + +type DispersalServerV2 struct { + pb.UnimplementedDisperserServer + + serverConfig disperser.ServerConfig + logger logging.Logger +} + +// NewDispersalServerV2 creates a new Server struct with the provided parameters. +func NewDispersalServerV2( + serverConfig disperser.ServerConfig, + _logger logging.Logger, +) *DispersalServerV2 { + logger := _logger.With("component", "DispersalServerV2") + + return &DispersalServerV2{ + serverConfig: serverConfig, + logger: logger, + } +} + +func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) { + return &pb.DisperseBlobReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented") +} + +func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) { + return &pb.BlobStatusReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented") +} + +func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobCommitmentRequest) (*pb.BlobCommitmentReply, error) { + return &pb.BlobCommitmentReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented") +} + +func (s *DispersalServerV2) Start(ctx context.Context) error { + // Serve grpc requests + addr := fmt.Sprintf("%s:%s", disperser.Localhost, s.serverConfig.GrpcPort) + listener, err := net.Listen("tcp", addr) + if err != nil { + return errors.New("could not start tcp listener") + } + + opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB + + gs := grpc.NewServer(opt) + reflection.Register(gs) + pb.RegisterDisperserServer(gs, s) + + // Register Server for Health Checks + name := pb.Disperser_ServiceDesc.ServiceName + healthcheck.RegisterHealthServer(name, gs) + + s.logger.Info("GRPC Listening", "port", s.serverConfig.GrpcPort, "address", listener.Addr().String()) + + if err := gs.Serve(listener); err != nil { + return errors.New("could not start GRPC server") + } + + return nil +} diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go new file mode 100644 index 000000000..20e6ac4a3 --- /dev/null +++ b/disperser/apiserver/server_v2_test.go @@ -0,0 +1,55 @@ +package apiserver_test + +import ( + "context" + "crypto/rand" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/disperser/apiserver" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigensdk-go/logging" + + pbv2 "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/stretchr/testify/assert" +) + +func TestV2DisperseBlob(t *testing.T) { + data := make([]byte, 3*1024) + _, err := rand.Read(data) + assert.NoError(t, err) + + data = codec.ConvertByPaddingEmptyByte(data) + reply, err := dispersalServerV2.DisperseBlob(context.Background(), &pbv2.DisperseBlobRequest{ + Data: data, + BlobHeader: &pbv2.BlobHeader{}, + }) + assert.Nil(t, reply) + assert.ErrorContains(t, err, "not implemented") +} + +func TestV2GetBlobStatus(t *testing.T) { + reply, err := dispersalServerV2.GetBlobStatus(context.Background(), &pbv2.BlobStatusRequest{ + BlobKey: []byte{1}, + }) + assert.Nil(t, reply) + assert.ErrorContains(t, err, "not implemented") +} + +func TestV2GetBlobCommitment(t *testing.T) { + reply, err := dispersalServerV2.GetBlobCommitment(context.Background(), &pbv2.BlobCommitmentRequest{ + Data: []byte{1}, + }) + assert.Nil(t, reply) + assert.ErrorContains(t, err, "not implemented") +} + +func newTestServerV2(transactor core.Transactor) *apiserver.DispersalServerV2 { + logger := logging.NewNoopLogger() + return apiserver.NewDispersalServerV2(disperser.ServerConfig{ + GrpcPort: "51002", + GrpcTimeout: 1 * time.Second, + }, logger) +} diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index db5740c34..480a0fb4a 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/common/geth" @@ -12,7 +14,15 @@ import ( "github.com/urfave/cli" ) +type DisperserVersion uint + +const ( + V1 DisperserVersion = 1 + V2 DisperserVersion = 2 +) + type Config struct { + DisperserVersion DisperserVersion AwsClientConfig aws.ClientConfig BlobstoreConfig blobstore.Config ServerConfig disperser.ServerConfig @@ -32,6 +42,10 @@ type Config struct { } func NewConfig(ctx *cli.Context) (Config, error) { + version := ctx.GlobalUint(flags.DisperserVersionFlag.Name) + if version != uint(V1) && version != uint(V2) { + return Config{}, fmt.Errorf("unknown disperser version %d", version) + } ratelimiterConfig, err := ratelimit.ReadCLIConfig(ctx, flags.FlagPrefix) if err != nil { @@ -49,7 +63,8 @@ func NewConfig(ctx *cli.Context) (Config, error) { } config := Config{ - AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix), + DisperserVersion: DisperserVersion(version), + AwsClientConfig: aws.ReadClientConfig(ctx, flags.FlagPrefix), ServerConfig: disperser.ServerConfig{ GrpcPort: ctx.GlobalString(flags.GrpcPortFlag.Name), GrpcTimeout: ctx.GlobalDuration(flags.GrpcTimeoutFlag.Name), diff --git a/disperser/cmd/apiserver/flags/flags.go b/disperser/cmd/apiserver/flags/flags.go index 1211af62c..9402f7f9f 100644 --- a/disperser/cmd/apiserver/flags/flags.go +++ b/disperser/cmd/apiserver/flags/flags.go @@ -63,6 +63,13 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_SERVICE_MANAGER"), } /* Optional Flags*/ + DisperserVersionFlag = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "disperser-version"), + Usage: "Disperser version. Options are 1 and 2.", + Required: false, + Value: 1, + EnvVar: common.PrefixEnvVar(envVarPrefix, "DISPERSER_VERSION"), + } MetricsHTTPPort = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"), Usage: "the http port which the metrics prometheus server is listening", @@ -113,6 +120,7 @@ var requiredFlags = []cli.Flag{ } var optionalFlags = []cli.Flag{ + DisperserVersionFlag, MetricsHTTPPort, EnableMetrics, EnableRatelimiter, diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 5394ac1d8..d205e44fc 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -60,6 +60,11 @@ func RunDisperserServer(ctx *cli.Context) error { return err } + if config.DisperserVersion == V2 { + server := apiserver.NewDispersalServerV2(config.ServerConfig, logger) + return server.Start(context.Background()) + } + client, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.Address{}, logger) if err != nil { logger.Error("Cannot create chain.Client", "err", err)