Skip to content

Commit

Permalink
Feat memstore artificial latency (#114)
Browse files Browse the repository at this point in the history
* chore: make server flags with default value optional

* chore: fix run-server command (renamed to run-memstore-server)

* feat: put/get artificial latency for memstore backend

* doc: add memstore put/get latency flags into README options table

* chore: clean flags

* refactor: use config for memstore

* fix: memstore tests

* refactor: add default verifier config creation function for memstore tests

* fix(lint): remove unused var
  • Loading branch information
samlaf authored Sep 6, 2024
1 parent 7959e8a commit 1258e26
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 88 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ stop-minio:
docker stop minio && docker rm minio; \
fi

run-server:
./bin/eigenda-proxy
run-memstore-server:
./bin/eigenda-proxy --memstore.enabled

clean:
rm bin/eigenda-proxy
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ In order to disperse to the EigenDA network in production, or at high throughput
| `--log.pid` | `false` | `$EIGENDA_PROXY_LOG_PID` | Show pid in the log. |
| `--memstore.enabled` | `false` | `$MEMSTORE_ENABLED` | Whether to use mem-store for DA logic. |
| `--memstore.expiration` | `25m0s` | `$MEMSTORE_EXPIRATION` | Duration that a mem-store blob/commitment pair are allowed to live. |
| `--memstore.put-latency` | `0` | `$MEMSTORE_PUT_LATENCY` | Artificial latency added for memstore backend to mimic EigenDA's dispersal latency. |
| `--memstore.get-latency` | `0` | `$MEMSTORE_GET_LATENCY` | Artificial latency added for memstore backend to mimic EigenDA's retrieval latency. |
| `--metrics.addr` | `"0.0.0.0"` | `$EIGENDA_PROXY_METRICS_ADDR` | Metrics listening address. |
| `--metrics.enabled` | `false` | `$EIGENDA_PROXY_METRICS_ENABLED` | Enable the metrics server. |
| `--metrics.port` | `7300` | `$EIGENDA_PROXY_METRICS_PORT` | Metrics listening port. |
Expand Down
3 changes: 0 additions & 3 deletions cmd/server/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
)

func StartProxySvr(cliCtx *cli.Context) error {
if err := server.CheckRequired(cliCtx); err != nil {
return err
}
cfg := server.ReadCLIConfig(cliCtx)
if err := cfg.Check(); err != nil {
return err
Expand Down
18 changes: 18 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
// memstore flags
MemstoreFlagName = "memstore.enabled"
MemstoreExpirationFlagName = "memstore.expiration"
MemstorePutLatencyFlagName = "memstore.put-latency"
MemstoreGetLatencyFlagName = "memstore.get-latency"

// S3 client flags
S3CredentialTypeFlagName = "s3.credential-type" // #nosec G101
Expand Down Expand Up @@ -90,6 +92,8 @@ type Config struct {
// Memstore
MemstoreEnabled bool
MemstoreBlobExpiration time.Duration
MemstoreGetLatency time.Duration
MemstorePutLatency time.Duration

// routing
FallbackTargets []string
Expand Down Expand Up @@ -179,6 +183,8 @@ func ReadConfig(ctx *cli.Context) Config {
EthConfirmationDepth: ctx.Int64(EthConfirmationDepthFlagName),
MemstoreEnabled: ctx.Bool(MemstoreFlagName),
MemstoreBlobExpiration: ctx.Duration(MemstoreExpirationFlagName),
MemstoreGetLatency: ctx.Duration(MemstoreGetLatencyFlagName),
MemstorePutLatency: ctx.Duration(MemstorePutLatencyFlagName),
FallbackTargets: ctx.StringSlice(FallbackTargets),
CacheTargets: ctx.StringSlice(CacheTargets),
}
Expand Down Expand Up @@ -419,6 +425,18 @@ func CLIFlags() []cli.Flag {
Value: 25 * time.Minute,
EnvVars: []string{"MEMSTORE_EXPIRATION"},
},
&cli.DurationFlag{
Name: MemstorePutLatencyFlagName,
Usage: "Artificial latency added for memstore backend to mimic EigenDA's dispersal latency.",
Value: 0,
EnvVars: []string{"MEMSTORE_PUT_LATENCY"},
},
&cli.DurationFlag{
Name: MemstoreGetLatencyFlagName,
Usage: "Artificial latency added for memstore backend to mimic EigenDA's retrieval latency.",
Value: 0,
EnvVars: []string{"MEMSTORE_GET_LATENCY"},
},
&cli.StringSliceFlag{
Name: FallbackTargets,
Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.",
Expand Down
41 changes: 10 additions & 31 deletions server/flags.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package server

import (
"fmt"

"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/urfave/cli/v2"

Expand All @@ -22,38 +20,28 @@ func prefixEnvVars(name string) []string {
return opservice.PrefixEnvVar(EnvVarPrefix, name)
}

var (
ListenAddrFlag = &cli.StringFlag{
// Flags contains the list of configuration options available to the binary.
var Flags = []cli.Flag{
&cli.StringFlag{
Name: ListenAddrFlagName,
Usage: "server listening address",
Value: "127.0.0.1",
Value: "0.0.0.0",
EnvVars: prefixEnvVars("ADDR"),
}
PortFlag = &cli.IntFlag{
},
&cli.IntFlag{
Name: PortFlagName,
Usage: "server listening port",
Value: 3100,
EnvVars: prefixEnvVars("PORT"),
}
)

var requiredFlags = []cli.Flag{
ListenAddrFlag,
PortFlag,
},
}

var optionalFlags = []cli.Flag{}

func init() {
optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, CLIFlags()...)
optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...) //nolint:gocritic // this is a global variable
Flags = append(Flags, oplog.CLIFlags(EnvVarPrefix)...)
Flags = append(Flags, CLIFlags()...)
Flags = append(Flags, opmetrics.CLIFlags(EnvVarPrefix)...)
}

// Flags contains the list of configuration options available to the binary.
var Flags []cli.Flag

type CLIConfig struct {
S3Config store.S3Config
EigenDAConfig Config
Expand All @@ -76,12 +64,3 @@ func (c CLIConfig) Check() error {
}
return nil
}

func CheckRequired(ctx *cli.Context) error {
for _, f := range requiredFlags {
if !ctx.IsSet(f.Names()[0]) {
return fmt.Errorf("flag %s is required", f.Names()[0])
}
}
return nil
}
7 changes: 6 additions & 1 deletion server/load_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func LoadStoreRouter(ctx context.Context, cfg CLIConfig, log log.Logger) (store.
var eigenda store.KeyGeneratedStore
if cfg.EigenDAConfig.MemstoreEnabled {
log.Info("Using mem-store backend for EigenDA")
eigenda, err = store.NewMemStore(ctx, verifier, log, maxBlobLength, cfg.EigenDAConfig.MemstoreBlobExpiration)
eigenda, err = store.NewMemStore(ctx, verifier, log, store.MemStoreConfig{
MaxBlobSizeBytes: maxBlobLength,
BlobExpiration: cfg.EigenDAConfig.MemstoreBlobExpiration,
PutLatency: cfg.EigenDAConfig.MemstorePutLatency,
GetLatency: cfg.EigenDAConfig.MemstoreGetLatency,
})
} else {
var client *clients.EigenDAClient
log.Info("Using EigenDA backend")
Expand Down
43 changes: 26 additions & 17 deletions store/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ const (
DefaultPruneInterval = 500 * time.Millisecond
)

type MemStoreConfig struct {
MaxBlobSizeBytes uint64
BlobExpiration time.Duration
// artificial latency added for memstore backend to mimic eigenda's latency
PutLatency time.Duration
GetLatency time.Duration
}

/*
MemStore is a simple in-memory store for blobs which uses an expiration
time to evict blobs to best emulate the ephemeral nature of blobs dispersed to
Expand All @@ -30,34 +38,33 @@ EigenDA operators.
type MemStore struct {
sync.RWMutex

config MemStoreConfig
l log.Logger
keyStarts map[string]time.Time
store map[string][]byte
verifier *verify.Verifier
codec codecs.BlobCodec

maxBlobSizeBytes uint64
blobExpiration time.Duration
reads int
reads int
}

var _ KeyGeneratedStore = (*MemStore)(nil)

// NewMemStore ... constructor
func NewMemStore(ctx context.Context, verifier *verify.Verifier, l log.Logger,
maxBlobSizeBytes uint64, blobExpiration time.Duration) (*MemStore, error) {
func NewMemStore(
ctx context.Context, verifier *verify.Verifier, l log.Logger, config MemStoreConfig,
) (*MemStore, error) {
store := &MemStore{
l: l,
keyStarts: make(map[string]time.Time),
store: make(map[string][]byte),
verifier: verifier,
codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
maxBlobSizeBytes: maxBlobSizeBytes,
blobExpiration: blobExpiration,
l: l,
config: config,
keyStarts: make(map[string]time.Time),
store: make(map[string][]byte),
verifier: verifier,
codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
}

if store.blobExpiration != 0 {
l.Info("memstore expiration enabled", "time", store.blobExpiration)
if store.config.BlobExpiration != 0 {
l.Info("memstore expiration enabled", "time", store.config.BlobExpiration)
go store.EventLoop(ctx)
}

Expand Down Expand Up @@ -86,7 +93,7 @@ func (e *MemStore) pruneExpired() {
defer e.Unlock()

for commit, dur := range e.keyStarts {
if time.Since(dur) >= e.blobExpiration {
if time.Since(dur) >= e.config.BlobExpiration {
delete(e.keyStarts, commit)
delete(e.store, commit)

Expand All @@ -97,6 +104,7 @@ func (e *MemStore) pruneExpired() {

// Get fetches a value from the store.
func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {
time.Sleep(e.config.GetLatency)
e.reads++
e.RLock()
defer e.RUnlock()
Expand Down Expand Up @@ -124,8 +132,9 @@ func (e *MemStore) Get(_ context.Context, commit []byte) ([]byte, error) {

// Put inserts a value into the store.
func (e *MemStore) Put(_ context.Context, value []byte) ([]byte, error) {
if uint64(len(value)) > e.maxBlobSizeBytes {
return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.maxBlobSizeBytes)
time.Sleep(e.config.PutLatency)
if uint64(len(value)) > e.config.MaxBlobSizeBytes {
return nil, fmt.Errorf("blob is larger than max blob size: blob length %d, max blob size %d", len(value), e.config.MaxBlobSizeBytes)
}

e.Lock()
Expand Down
95 changes: 61 additions & 34 deletions store/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,41 @@ const (
testPreimage = "Four score and seven years ago"
)

func TestGetSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kzgConfig := &kzg.KzgConfig{
G1Path: "../resources/g1.point",
G2PowerOf2Path: "../resources/g2.point.powerOf2",
CacheDir: "../resources/SRSTables",
SRSOrder: 3000,
SRSNumberToLoad: 3000,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
func getDefaultMemStoreTestConfig() MemStoreConfig {
return MemStoreConfig{
MaxBlobSizeBytes: 1024 * 1024,
BlobExpiration: 0,
PutLatency: 0,
GetLatency: 0,
}
}

cfg := &verify.Config{
Verify: false,
KzgConfig: kzgConfig,
func getDefaultVerifierTestConfig() *verify.Config {
return &verify.Config{
Verify: false,
KzgConfig: &kzg.KzgConfig{
G1Path: "../resources/g1.point",
G2PowerOf2Path: "../resources/g2.point.powerOf2",
CacheDir: "../resources/SRSTables",
SRSOrder: 3000,
SRSNumberToLoad: 3000,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
},
}
}

verifier, err := verify.NewVerifier(cfg, nil)
func TestGetSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil)
require.NoError(t, err)

ms, err := NewMemStore(
ctx,
verifier,
log.New(),
1024*1024*2,
time.Hour*1000,
getDefaultMemStoreTestConfig(),
)

require.NoError(t, err)
Expand All @@ -62,29 +70,16 @@ func TestExpiration(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

kzgConfig := &kzg.KzgConfig{
G1Path: "../resources/g1.point",
G2PowerOf2Path: "../resources/g2.point.powerOf2",
CacheDir: "../resources/SRSTables",
SRSOrder: 3000,
SRSNumberToLoad: 3000,
NumWorker: uint64(runtime.GOMAXPROCS(0)),
}

cfg := &verify.Config{
Verify: false,
KzgConfig: kzgConfig,
}

verifier, err := verify.NewVerifier(cfg, nil)
verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil)
require.NoError(t, err)

memstoreConfig := getDefaultMemStoreTestConfig()
memstoreConfig.BlobExpiration = 10 * time.Millisecond
ms, err := NewMemStore(
ctx,
verifier,
log.New(),
1024*1024*2,
time.Millisecond*10,
memstoreConfig,
)

require.NoError(t, err)
Expand All @@ -100,3 +95,35 @@ func TestExpiration(t *testing.T) {
require.Error(t, err)

}

func TestLatency(t *testing.T) {
t.Parallel()

putLatency := 1 * time.Second
getLatency := 1 * time.Second

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

verifier, err := verify.NewVerifier(getDefaultVerifierTestConfig(), nil)
require.NoError(t, err)

config := getDefaultMemStoreTestConfig()
config.PutLatency = putLatency
config.GetLatency = getLatency
ms, err := NewMemStore(ctx, verifier, log.New(), config)

require.NoError(t, err)

preimage := []byte(testPreimage)
timeBeforePut := time.Now()
key, err := ms.Put(ctx, preimage)
require.NoError(t, err)
require.GreaterOrEqual(t, time.Since(timeBeforePut), putLatency)

timeBeforeGet := time.Now()
_, err = ms.Get(ctx, key)
require.NoError(t, err)
require.GreaterOrEqual(t, time.Since(timeBeforeGet), getLatency)

}

0 comments on commit 1258e26

Please sign in to comment.