diff --git a/Dockerfile b/Dockerfile index 215bfac..fd6979f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,4 +32,4 @@ COPY --from=builder /app/resources/ /app/resources/ EXPOSE 4242 7300 # Run app -CMD ["./eigenda-proxy"] \ No newline at end of file +ENTRYPOINT ["./eigenda-proxy"] \ No newline at end of file diff --git a/Makefile b/Makefile index 0a7a477..34dbab7 100644 --- a/Makefile +++ b/Makefile @@ -39,8 +39,8 @@ stop-redis: docker stop redis && docker rm redis; \ fi -run-server: - ./bin/eigenda-proxy +run-memstore-server: + ./bin/eigenda-proxy --memstore.enabled clean: rm bin/eigenda-proxy diff --git a/README.md b/README.md index ba6f17f..9c0f0ce 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,8 @@ In order to disperse to the EigenDA network in production, or at high throughput | `--log.pid` | `false` | `$EIGENDA_PROXY_LOG_PID` | Show pid in the log. | | `--memstore.enabled` | `false` | `$MEMSTORE_ENABLED` | Whether to use mem-store for DA logic. | | `--memstore.expiration` | `25m0s` | `$MEMSTORE_EXPIRATION` | Duration that a mem-store blob/commitment pair are allowed to live. | +| `--memstore.put-latency` | `0` | `$MEMSTORE_PUT_LATENCY` | Artificial latency added for memstore backend to mimic EigenDA's dispersal latency. | +| `--memstore.get-latency` | `0` | `$MEMSTORE_GET_LATENCY` | Artificial latency added for memstore backend to mimic EigenDA's retrieval latency. | | `--metrics.addr` | `"0.0.0.0"` | `$EIGENDA_PROXY_METRICS_ADDR` | Metrics listening address. | | `--metrics.enabled` | `false` | `$EIGENDA_PROXY_METRICS_ENABLED` | Enable the metrics server. | | `--metrics.port` | `7300` | `$EIGENDA_PROXY_METRICS_PORT` | Metrics listening port. | diff --git a/cmd/server/entrypoint.go b/cmd/server/entrypoint.go index 2959ec9..77cad47 100644 --- a/cmd/server/entrypoint.go +++ b/cmd/server/entrypoint.go @@ -13,9 +13,6 @@ import ( ) func StartProxySvr(cliCtx *cli.Context) error { - if err := server.CheckRequired(cliCtx); err != nil { - return err - } cfg := server.ReadCLIConfig(cliCtx) if err := cfg.Check(); err != nil { return err diff --git a/server/config.go b/server/config.go index 583b21b..bd4279a 100644 --- a/server/config.go +++ b/server/config.go @@ -38,6 +38,8 @@ const ( // memstore flags MemstoreFlagName = "memstore.enabled" MemstoreExpirationFlagName = "memstore.expiration" + MemstorePutLatencyFlagName = "memstore.put-latency" + MemstoreGetLatencyFlagName = "memstore.get-latency" // redis client flags RedisEndpointFlagName = "redis.endpoint" @@ -95,6 +97,8 @@ type Config struct { // memstore MemstoreEnabled bool MemstoreBlobExpiration time.Duration + MemstoreGetLatency time.Duration + MemstorePutLatency time.Duration // routing FallbackTargets []string @@ -194,6 +198,8 @@ func ReadConfig(ctx *cli.Context) Config { EthConfirmationDepth: ctx.Int64(EthConfirmationDepthFlagName), MemstoreEnabled: ctx.Bool(MemstoreFlagName), MemstoreBlobExpiration: ctx.Duration(MemstoreExpirationFlagName), + MemstoreGetLatency: ctx.Duration(MemstoreGetLatencyFlagName), + MemstorePutLatency: ctx.Duration(MemstorePutLatencyFlagName), FallbackTargets: ctx.StringSlice(FallbackTargets), CacheTargets: ctx.StringSlice(CacheTargets), } @@ -466,6 +472,18 @@ func CLIFlags() []cli.Flag { Value: 25 * time.Minute, EnvVars: []string{"MEMSTORE_EXPIRATION"}, }, + &cli.DurationFlag{ + Name: MemstorePutLatencyFlagName, + Usage: "Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.", + Value: 0, + EnvVars: []string{"MEMSTORE_PUT_LATENCY"}, + }, + &cli.DurationFlag{ + Name: MemstoreGetLatencyFlagName, + Usage: "Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.", + Value: 0, + EnvVars: []string{"MEMSTORE_GET_LATENCY"}, + }, &cli.StringSliceFlag{ Name: FallbackTargets, Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.", diff --git a/server/flags.go b/server/flags.go index da67281..9046e3b 100644 --- a/server/flags.go +++ b/server/flags.go @@ -1,8 +1,6 @@ package server import ( - "fmt" - "github.com/Layr-Labs/eigenda-proxy/store" "github.com/urfave/cli/v2" @@ -22,38 +20,28 @@ func prefixEnvVars(name string) []string { return opservice.PrefixEnvVar(EnvVarPrefix, name) } -var ( - ListenAddrFlag = &cli.StringFlag{ +// Flags contains the list of configuration options available to the binary. +var Flags = []cli.Flag{ + &cli.StringFlag{ Name: ListenAddrFlagName, Usage: "server listening address", - Value: "127.0.0.1", + Value: "0.0.0.0", EnvVars: prefixEnvVars("ADDR"), - } - PortFlag = &cli.IntFlag{ + }, + &cli.IntFlag{ Name: PortFlagName, Usage: "server listening port", Value: 3100, EnvVars: prefixEnvVars("PORT"), - } -) - -var requiredFlags = []cli.Flag{ - ListenAddrFlag, - PortFlag, + }, } -var optionalFlags = []cli.Flag{} - func init() { - optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...) - optionalFlags = append(optionalFlags, CLIFlags()...) - optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...) - Flags = append(requiredFlags, optionalFlags...) //nolint:gocritic // this is a global variable + Flags = append(Flags, oplog.CLIFlags(EnvVarPrefix)...) + Flags = append(Flags, CLIFlags()...) + Flags = append(Flags, opmetrics.CLIFlags(EnvVarPrefix)...) } -// Flags contains the list of configuration options available to the binary. -var Flags []cli.Flag - type CLIConfig struct { RedisCfg store.RedisConfig S3Config store.S3Config @@ -78,12 +66,3 @@ func (c CLIConfig) Check() error { } return nil } - -func CheckRequired(ctx *cli.Context) error { - for _, f := range requiredFlags { - if !ctx.IsSet(f.Names()[0]) { - return fmt.Errorf("flag %s is required", f.Names()[0]) - } - } - return nil -} diff --git a/server/load_store.go b/server/load_store.go index d3e8a92..e73f09d 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -86,7 +86,12 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store. var eigenda store.KeyGeneratedStore if cfg.EigenDAConfig.MemstoreEnabled { log.Info("Using mem-store backend for EigenDA") - eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration) + eigenda, err = store.NewMemStore(ctx, verifier, log, store.MemStoreConfig{ + MaxBlobSizeBytes: maxBlobLength, + BlobExpiration: cfg.EigenDAConfig.MemstoreBlobExpiration, + PutLatency: cfg.EigenDAConfig.MemstorePutLatency, + GetLatency: cfg.EigenDAConfig.MemstoreGetLatency, + }) } else { var client *clients.EigenDAClient log.Info("Using EigenDA backend") diff --git a/store/memory.go b/store/memory.go index 19d63c6..76dd562 100644 --- a/store/memory.go +++ b/store/memory.go @@ -22,6 +22,14 @@ const ( DefaultPruneInterval = 500 * time.Millisecond ) +type MemStoreConfig struct { + MaxBlobSizeBytes uint64 + BlobExpiration time.Duration + // artificial latency added for memstore backend to mimic eigenda's latency + PutLatency time.Duration + GetLatency time.Duration +} + /* MemStore is a simple in-memory store for blobs which uses an expiration time to evict blobs to best emulate the ephemeral nature of blobs dispersed to @@ -30,42 +38,41 @@ EigenDA operators. type MemStore struct { sync.RWMutex + config MemStoreConfig l log.Logger keyStarts map[string]time.Time store map[string][]byte verifier *verify.Verifier codec codecs.BlobCodec - maxBlobSizeBytes uint64 - blobExpiration time.Duration - reads int + reads int } var _ KeyGeneratedStore = (*MemStore)(nil) // NewMemStore ... constructor -func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger, - maxBlobSizeBytes uint64, blobExpiration time.Duration) (*MemStore, error) { +func NewMemStore( + ctx context.Context, verifier *verify.Verifier, l log.Logger, config MemStoreConfig, +) (*MemStore, error) { store := &MemStore{ - l: l, - keyStarts: make(map[string]time.Time), - store: make(map[string][]byte), - verifier: verifier, - codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()), - maxBlobSizeBytes: maxBlobSizeBytes, - blobExpiration: blobExpiration, + l: l, + config: config, + keyStarts: make(map[string]time.Time), + store: make(map[string][]byte), + verifier: verifier, + codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()), } - if store.blobExpiration != 0 { - l.Info("memstore expiration enabled", "time", store.blobExpiration) - go store.EventLoop(ctx) + if store.config.BlobExpiration != 0 { + l.Info("memstore expiration enabled", "time", store.config.BlobExpiration) + go store.pruningLoop(ctx) } return store, nil } -// EventLoop ... runs a background goroutine to prune expired blobs from the store on a regular interval. -func (e *MemStore) EventLoop(ctx context.Context) { +// pruningLoop ... runs a background goroutine to prune expired blobs from the store on a regular interval. +func (e *MemStore) pruningLoop(ctx context.Context) { timer := time.NewTicker(DefaultPruneInterval) for { @@ -86,7 +93,7 @@ func (e *MemStore) pruneExpired() { defer e.Unlock() for commit, dur := range e.keyStarts { - if time.Since(dur) >= e.blobExpiration { + if time.Since(dur) >= e.config.BlobExpiration { delete(e.keyStarts, commit) delete(e.store, commit) @@ -97,6 +104,7 @@ func (e *MemStore) pruneExpired() { // Get fetches a value from the store. func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) { + time.Sleep(e.config.GetLatency) e.reads++ e.RLock() defer e.RUnlock() @@ -124,8 +132,9 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) { // Put inserts a value into the store. func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) { - if uint64(len(value)) > e.maxBlobSizeBytes { - return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.maxBlobSizeBytes) + time.Sleep(e.config.PutLatency) + if uint64(len(value)) > e.config.MaxBlobSizeBytes { + return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.config.MaxBlobSizeBytes) } e.Lock() diff --git a/store/memory_test.go b/store/memory_test.go index cbecc01..52965a0 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -16,33 +16,41 @@ const ( testPreimage = "Four score and seven years ago" ) -func TestGetSet(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - kzgConfig := &kzg.KzgConfig{ - G1Path: "../resources/g1.point", - G2PowerOf2Path: "../resources/g2.point.powerOf2", - CacheDir: "../resources/SRSTables", - SRSOrder: 3000, - SRSNumberToLoad: 3000, - NumWorker: uint64(runtime.GOMAXPROCS(0)), +func getDefaultMemStoreTestConfig() MemStoreConfig { + return MemStoreConfig{ + MaxBlobSizeBytes: 1024 * 1024, + BlobExpiration: 0, + PutLatency: 0, + GetLatency: 0, } +} - cfg := &verify.Config{ - Verify: false, - KzgConfig: kzgConfig, +func getDefaultVerifierTestConfig() *verify.Config { + return &verify.Config{ + Verify: false, + KzgConfig: &kzg.KzgConfig{ + G1Path: "../resources/g1.point", + G2PowerOf2Path: "../resources/g2.point.powerOf2", + CacheDir: "../resources/SRSTables", + SRSOrder: 3000, + SRSNumberToLoad: 3000, + NumWorker: uint64(runtime.GOMAXPROCS(0)), + }, } +} - verifier, err := verify.NewVerifier(cfg, nil) +func TestGetSet(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil) require.NoError(t, err) ms, err := NewMemStore( ctx, verifier, log.New(), - 1024*1024*2, - time.Hour*1000, + getDefaultMemStoreTestConfig(), ) require.NoError(t, err) @@ -62,29 +70,16 @@ func TestExpiration(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - kzgConfig := &kzg.KzgConfig{ - G1Path: "../resources/g1.point", - G2PowerOf2Path: "../resources/g2.point.powerOf2", - CacheDir: "../resources/SRSTables", - SRSOrder: 3000, - SRSNumberToLoad: 3000, - NumWorker: uint64(runtime.GOMAXPROCS(0)), - } - - cfg := &verify.Config{ - Verify: false, - KzgConfig: kzgConfig, - } - - verifier, err := verify.NewVerifier(cfg, nil) + verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil) require.NoError(t, err) + memstoreConfig := getDefaultMemStoreTestConfig() + memstoreConfig.BlobExpiration = 10 * time.Millisecond ms, err := NewMemStore( ctx, verifier, log.New(), - 1024*1024*2, - time.Millisecond*10, + memstoreConfig, ) require.NoError(t, err) @@ -100,3 +95,35 @@ func TestExpiration(t *testing.T) { require.Error(t, err) } + +func TestLatency(t *testing.T) { + t.Parallel() + + putLatency := 1 * time.Second + getLatency := 1 * time.Second + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil) + require.NoError(t, err) + + config := getDefaultMemStoreTestConfig() + config.PutLatency = putLatency + config.GetLatency = getLatency + ms, err := NewMemStore(ctx, verifier, log.New(), config) + + require.NoError(t, err) + + preimage := []byte(testPreimage) + timeBeforePut := time.Now() + key, err := ms.Put(ctx, preimage) + require.NoError(t, err) + require.GreaterOrEqual(t, time.Since(timeBeforePut), putLatency) + + timeBeforeGet := time.Now() + _, err = ms.Get(ctx, key) + require.NoError(t, err) + require.GreaterOrEqual(t, time.Since(timeBeforeGet), getLatency) + +}