Skip to content

Commit

Permalink
feat: put/get artificial latency for memstore backend
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf committed Sep 6, 2024
1 parent 40448db commit 701c4b6
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 10 deletions.
3 changes: 0 additions & 3 deletions cmd/server/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
)

func StartProxySvr(cliCtx *cli.Context) error {
if err := server.CheckRequired(cliCtx); err != nil {
return err
}
cfg := server.ReadCLIConfig(cliCtx)
if err := cfg.Check(); err != nil {
return err
Expand Down
18 changes: 18 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
// memstore flags
MemstoreFlagName = "memstore.enabled"
MemstoreExpirationFlagName = "memstore.expiration"
MemstorePutLatencyFlagName = "memstore.put-latency"
MemstoreGetLatencyFlagName = "memstore.get-latency"

// S3 client flags
S3CredentialTypeFlagName = "s3.credential-type" // #nosec G101
Expand Down Expand Up @@ -90,6 +92,8 @@ type Config struct {
// Memstore
MemstoreEnabled bool
MemstoreBlobExpiration time.Duration
MemstoreGetLatency time.Duration
MemstorePutLatency time.Duration

// routing
FallbackTargets []string
Expand Down Expand Up @@ -179,6 +183,8 @@ func ReadConfig(ctx *cli.Context) Config {
EthConfirmationDepth: ctx.Int64(EthConfirmationDepthFlagName),
MemstoreEnabled: ctx.Bool(MemstoreFlagName),
MemstoreBlobExpiration: ctx.Duration(MemstoreExpirationFlagName),
MemstoreGetLatency: ctx.Duration(MemstoreGetLatencyFlagName),
MemstorePutLatency: ctx.Duration(MemstorePutLatencyFlagName),
FallbackTargets: ctx.StringSlice(FallbackTargets),
CacheTargets: ctx.StringSlice(CacheTargets),
}
Expand Down Expand Up @@ -419,6 +425,18 @@ func CLIFlags() []cli.Flag {
Value: 25 * time.Minute,
EnvVars: []string{"MEMSTORE_EXPIRATION"},
},
&cli.DurationFlag{
Name: MemstorePutLatencyFlagName,
Usage: "Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.",
Value: 0,
EnvVars: []string{"MEMSTORE_PUT_LATENCY"},
},
&cli.DurationFlag{
Name: MemstoreGetLatencyFlagName,
Usage: "Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.",
Value: 0,
EnvVars: []string{"MEMSTORE_GET_LATENCY"},
},
&cli.StringSliceFlag{
Name: FallbackTargets,
Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.",
Expand Down
2 changes: 1 addition & 1 deletion server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
var eigenda store.KeyGeneratedStore
if cfg.EigenDAConfig.MemstoreEnabled {
log.Info("Using mem-store backend for EigenDA")
eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration)
eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration, cfg.EigenDAConfig.MemstorePutLatency, cfg.EigenDAConfig.MemstoreGetLatency)
} else {
var client *clients.EigenDAClient
log.Info("Using EigenDA backend")
Expand Down
20 changes: 14 additions & 6 deletions store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ EigenDA operators.
type MemStore struct {
sync.RWMutex

l log.Logger
keyStarts map[string]time.Time
store map[string][]byte
verifier *verify.Verifier
codec codecs.BlobCodec
l log.Logger
keyStarts map[string]time.Time
store map[string][]byte
verifier *verify.Verifier
codec codecs.BlobCodec
putLatency time.Duration
getLatency time.Duration

maxBlobSizeBytes uint64
blobExpiration time.Duration
Expand All @@ -45,7 +47,8 @@ var _ KeyGeneratedStore = (*MemStore)(nil)

// NewMemStore ... constructor
func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger,
maxBlobSizeBytes uint64, blobExpiration time.Duration) (*MemStore, error) {
maxBlobSizeBytes uint64, blobExpiration time.Duration,
putLatency, getLatency time.Duration) (*MemStore, error) {
store := &MemStore{
l: l,
keyStarts: make(map[string]time.Time),
Expand All @@ -54,6 +57,9 @@ func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger,
codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
maxBlobSizeBytes: maxBlobSizeBytes,
blobExpiration: blobExpiration,
// artificial latency added for memstore backend to mimic eigenda's latency
putLatency: putLatency,
getLatency: getLatency,
}

if store.blobExpiration != 0 {
Expand Down Expand Up @@ -97,6 +103,7 @@ func (e *MemStore) pruneExpired() {

// Get fetches a value from the store.
func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
time.Sleep(e.getLatency)
e.reads++
e.RLock()
defer e.RUnlock()
Expand Down Expand Up @@ -124,6 +131,7 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {

// Put inserts a value into the store.
func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
time.Sleep(e.putLatency)
if uint64(len(value)) > e.maxBlobSizeBytes {
return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.maxBlobSizeBytes)
}
Expand Down
55 changes: 55 additions & 0 deletions store/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestGetSet(t *testing.T) {
log.New(),
1024*1024*2,
time.Hour*1000,
0, 0,
)

require.NoError(t, err)
Expand Down Expand Up @@ -85,6 +86,7 @@ func TestExpiration(t *testing.T) {
log.New(),
1024*1024*2,
time.Millisecond*10,
0, 0,
)

require.NoError(t, err)
Expand All @@ -100,3 +102,56 @@ func TestExpiration(t *testing.T) {
require.Error(t, err)

}

func TestLatency(t *testing.T) {
t.Parallel()

putLatency := 1 * time.Second
getLatency := 1 * time.Second

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kzgConfig := &kzg.KzgConfig{
G1Path: "../resources/g1.point",
G2PowerOf2Path: "../resources/g2.point.powerOf2",
CacheDir: "../resources/SRSTables",
SRSOrder: 3000,
SRSNumberToLoad: 3000,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
}

cfg := &verify.Config{
Verify: false,
KzgConfig: kzgConfig,
}

verifier, err := verify.NewVerifier(cfg, nil)
require.NoError(t, err)

ms, err := NewMemStore(
ctx,
verifier,
log.New(),
1024*1024*2,
time.Millisecond*10,
putLatency, getLatency,
)

require.NoError(t, err)

preimage := []byte(testPreimage)
timeBeforePut := time.Now()
key, err := ms.Put(ctx, preimage)
require.NoError(t, err)
require.GreaterOrEqual(t, time.Since(timeBeforePut), putLatency)

// sleep 1 second and verify that older blob entries are removed
time.Sleep(time.Second * 1)

timeBeforeGet := time.Now()
_, err = ms.Get(ctx, key)
require.Error(t, err)
require.GreaterOrEqual(t, time.Since(timeBeforeGet), getLatency)

}

0 comments on commit 701c4b6

Please sign in to comment.