diff --git a/cmd/server/entrypoint.go b/cmd/server/entrypoint.go index 2959ec9..77cad47 100644 --- a/cmd/server/entrypoint.go +++ b/cmd/server/entrypoint.go @@ -13,9 +13,6 @@ import ( ) func StartProxySvr(cliCtx *cli.Context) error { - if err := server.CheckRequired(cliCtx); err != nil { - return err - } cfg := server.ReadCLIConfig(cliCtx) if err := cfg.Check(); err != nil { return err diff --git a/server/config.go b/server/config.go index ff1d084..f407301 100644 --- a/server/config.go +++ b/server/config.go @@ -38,6 +38,8 @@ const ( // memstore flags MemstoreFlagName = "memstore.enabled" MemstoreExpirationFlagName = "memstore.expiration" + MemstorePutLatencyFlagName = "memstore.put-latency" + MemstoreGetLatencyFlagName = "memstore.get-latency" // S3 client flags S3CredentialTypeFlagName = "s3.credential-type" // #nosec G101 @@ -90,6 +92,8 @@ type Config struct { // Memstore MemstoreEnabled bool MemstoreBlobExpiration time.Duration + MemstoreGetLatency time.Duration + MemstorePutLatency time.Duration // routing FallbackTargets []string @@ -179,6 +183,8 @@ func ReadConfig(ctx *cli.Context) Config { EthConfirmationDepth: ctx.Int64(EthConfirmationDepthFlagName), MemstoreEnabled: ctx.Bool(MemstoreFlagName), MemstoreBlobExpiration: ctx.Duration(MemstoreExpirationFlagName), + MemstoreGetLatency: ctx.Duration(MemstoreGetLatencyFlagName), + MemstorePutLatency: ctx.Duration(MemstorePutLatencyFlagName), FallbackTargets: ctx.StringSlice(FallbackTargets), CacheTargets: ctx.StringSlice(CacheTargets), } @@ -419,6 +425,18 @@ func CLIFlags() []cli.Flag { Value: 25 * time.Minute, EnvVars: []string{"MEMSTORE_EXPIRATION"}, }, + &cli.DurationFlag{ + Name: MemstorePutLatencyFlagName, + Usage: "Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.", + Value: 0, + EnvVars: []string{"MEMSTORE_PUT_LATENCY"}, + }, + &cli.DurationFlag{ + Name: MemstoreGetLatencyFlagName, + Usage: "Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.", + Value: 0, + EnvVars: []string{"MEMSTORE_GET_LATENCY"}, + }, &cli.StringSliceFlag{ Name: FallbackTargets, Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.", diff --git a/server/load_store.go b/server/load_store.go index bc144f1..f2af62c 100644 --- a/server/load_store.go +++ b/server/load_store.go @@ -47,7 +47,7 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store. var eigenda store.KeyGeneratedStore if cfg.EigenDAConfig.MemstoreEnabled { log.Info("Using mem-store backend for EigenDA") - eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration) + eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration, cfg.EigenDAConfig.MemstorePutLatency, cfg.EigenDAConfig.MemstoreGetLatency) } else { var client *clients.EigenDAClient log.Info("Using EigenDA backend") diff --git a/store/memory.go b/store/memory.go index 19d63c6..1f7e5e6 100644 --- a/store/memory.go +++ b/store/memory.go @@ -30,11 +30,13 @@ EigenDA operators. type MemStore struct { sync.RWMutex - l log.Logger - keyStarts map[string]time.Time - store map[string][]byte - verifier *verify.Verifier - codec codecs.BlobCodec + l log.Logger + keyStarts map[string]time.Time + store map[string][]byte + verifier *verify.Verifier + codec codecs.BlobCodec + putLatency time.Duration + getLatency time.Duration maxBlobSizeBytes uint64 blobExpiration time.Duration @@ -45,7 +47,8 @@ var _ KeyGeneratedStore = (*MemStore)(nil) // NewMemStore ... constructor func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger, - maxBlobSizeBytes uint64, blobExpiration time.Duration) (*MemStore, error) { + maxBlobSizeBytes uint64, blobExpiration time.Duration, + putLatency, getLatency time.Duration) (*MemStore, error) { store := &MemStore{ l: l, keyStarts: make(map[string]time.Time), @@ -54,6 +57,9 @@ func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger, codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()), maxBlobSizeBytes: maxBlobSizeBytes, blobExpiration: blobExpiration, + // artificial latency added for memstore backend to mimic eigenda's latency + putLatency: putLatency, + getLatency: getLatency, } if store.blobExpiration != 0 { @@ -97,6 +103,7 @@ func (e *MemStore) pruneExpired() { // Get fetches a value from the store. func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) { + time.Sleep(e.getLatency) e.reads++ e.RLock() defer e.RUnlock() @@ -124,6 +131,7 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) { // Put inserts a value into the store. func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) { + time.Sleep(e.putLatency) if uint64(len(value)) > e.maxBlobSizeBytes { return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.maxBlobSizeBytes) } diff --git a/store/memory_test.go b/store/memory_test.go index cbecc01..85e7ffd 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -43,6 +43,7 @@ func TestGetSet(t *testing.T) { log.New(), 1024*1024*2, time.Hour*1000, + 0, 0, ) require.NoError(t, err) @@ -85,6 +86,7 @@ func TestExpiration(t *testing.T) { log.New(), 1024*1024*2, time.Millisecond*10, + 0, 0, ) require.NoError(t, err) @@ -100,3 +102,56 @@ func TestExpiration(t *testing.T) { require.Error(t, err) } + +func TestLatency(t *testing.T) { + t.Parallel() + + putLatency := 1 * time.Second + getLatency := 1 * time.Second + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + kzgConfig := &kzg.KzgConfig{ + G1Path: "../resources/g1.point", + G2PowerOf2Path: "../resources/g2.point.powerOf2", + CacheDir: "../resources/SRSTables", + SRSOrder: 3000, + SRSNumberToLoad: 3000, + NumWorker: uint64(runtime.GOMAXPROCS(0)), + } + + cfg := &verify.Config{ + Verify: false, + KzgConfig: kzgConfig, + } + + verifier, err := verify.NewVerifier(cfg, nil) + require.NoError(t, err) + + ms, err := NewMemStore( + ctx, + verifier, + log.New(), + 1024*1024*2, + time.Millisecond*10, + putLatency, getLatency, + ) + + require.NoError(t, err) + + preimage := []byte(testPreimage) + timeBeforePut := time.Now() + key, err := ms.Put(ctx, preimage) + require.NoError(t, err) + require.GreaterOrEqual(t, time.Since(timeBeforePut), putLatency) + + // sleep 1 second and verify that older blob entries are removed + time.Sleep(time.Second * 1) + + timeBeforeGet := time.Now() + _, err = ms.Get(ctx, key) + require.Error(t, err) + require.GreaterOrEqual(t, time.Since(timeBeforeGet), getLatency) + +}