Skip to content

Commit

Permalink
Refactor batcher/minibatcher creation to use generic interface
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Aug 15, 2024
1 parent 5296b08 commit a488825
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 63 deletions.
54 changes: 32 additions & 22 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Config struct {
TargetNumChunks uint
MaxBlobsToFetchFromStore int
EnableMinibatch bool
MinibatcherConfig MinibatcherConfig
}

type Batcher struct {
Expand Down Expand Up @@ -105,7 +106,8 @@ func NewBatcher(
logger logging.Logger,
metrics *Metrics,
heartbeatChan chan time.Time,
) (*Batcher, error) {
minibatchStore MinibatchStore,
) (interface{}, error) {
batchTrigger := NewEncodedSizeNotifier(
make(chan struct{}, 1),
uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
Expand All @@ -125,27 +127,35 @@ func NewBatcher(
return nil, err
}

return &Batcher{
Config: config,
TimeoutConfig: timeoutConfig,

Queue: queue,
Dispatcher: dispatcher,
EncoderClient: encoderClient,

ChainState: chainState,
AssignmentCoordinator: assignmentCoordinator,
Aggregator: aggregator,
EncodingStreamer: encodingStreamer,
Transactor: transactor,
TransactionManager: txnManager,
Metrics: metrics,

ethClient: ethClient,
finalizer: finalizer,
logger: logger.With("component", "Batcher"),
HeartbeatChan: heartbeatChan,
}, nil
if config.EnableMinibatch {
minibatcher, err := NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, encodingWorkerPool, logger)
if err != nil {
return nil, err
}
return minibatcher, nil
} else {
return &Batcher{
Config: config,
TimeoutConfig: timeoutConfig,

Queue: queue,
Dispatcher: dispatcher,
EncoderClient: encoderClient,

ChainState: chainState,
AssignmentCoordinator: assignmentCoordinator,
Aggregator: aggregator,
EncodingStreamer: encodingStreamer,
Transactor: transactor,
TransactionManager: txnManager,
Metrics: metrics,

ethClient: ethClient,
finalizer: finalizer,
logger: logger.With("component", "Batcher"),
HeartbeatChan: heartbeatChan,
}, nil
}
}

func (b *Batcher) RecoverState(ctx context.Context) error {
Expand Down
12 changes: 9 additions & 3 deletions disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"errors"
"log"
"math/big"
"runtime"
"sync"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
bat "github.com/Layr-Labs/eigenda/disperser/batcher"
batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock"
Expand Down Expand Up @@ -73,7 +75,7 @@ func makeTestBlob(securityParams []*core.SecurityParam) core.Blob {
return blob
}

func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.Time) {
func makeBatcher(t *testing.T) (*batcherComponents, *batcher.Batcher, func() []time.Time) {
// Common Components
// logger, err := common.NewLogger(common.DefaultLoggerConfig())
// assert.NoError(t, err)
Expand Down Expand Up @@ -128,8 +130,12 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
ethClient := &cmock.MockEthClient{}
txnManager := batmock.NewTxnManager()

b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan)
batcherInterface, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan, nil)
assert.NoError(t, err)
b, ok := batcherInterface.(batcher.Batcher)
if !ok {
log.Fatal("Failed to cast interface{} to batcher.Batcher")
}

var mu sync.Mutex
var heartbeatsReceived []time.Time
Expand Down Expand Up @@ -158,7 +164,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.
ethClient: ethClient,
dispatcher: dispatcher,
chainData: cst,
}, b, func() []time.Time {
}, &b, func() []time.Time {
close(doneListening) // Stop the goroutine listening to heartbeats

mu.Lock() // Lock before reading the slice
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/batchstore/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type MinibatchStore struct {

var _ batcher.MinibatchStore = (*MinibatchStore)(nil)

func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore {
func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) batcher.MinibatchStore {
logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl)
return &MinibatchStore{
dynamoDBClient: dynamoDBClient,
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/batchstore/minibatch_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
localStackPort = "4566"

dynamoClient *dynamodb.Client
minibatchStore *batchstore.MinibatchStore
minibatchStore batcher.MinibatchStore

UUID = uuid.New()
minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID)
Expand Down
13 changes: 6 additions & 7 deletions disperser/cmd/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

type Config struct {
BatcherConfig batcher.Config
MinibatcherConfig batcher.MinibatcherConfig
TimeoutConfig batcher.TimeoutConfig
BlobstoreConfig blobstore.Config
MinibatchStoreConfig batchstore.Config
Expand Down Expand Up @@ -73,6 +72,12 @@ func NewConfig(ctx *cli.Context) (Config, error) {
MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
FinalizationBlockDelay: ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name),
EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name),
MinibatcherConfig: batcher.MinibatcherConfig{
PullInterval: ctx.GlobalDuration(flags.MinibatcherPullIntervalFlag.Name),
MaxNumConnections: ctx.GlobalUint(flags.MaxNodeConnectionsFlag.Name),
MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
MaxNumRetriesPerDispersal: ctx.GlobalUint(flags.MaxNumRetriesPerDispersalFlag.Name),
},
},
TimeoutConfig: batcher.TimeoutConfig{
EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name),
Expand All @@ -94,12 +99,6 @@ func NewConfig(ctx *cli.Context) (Config, error) {
IndexerConfig: indexer.ReadIndexerConfig(ctx),
KMSKeyConfig: kmsConfig,
EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
MinibatcherConfig: batcher.MinibatcherConfig{
PullInterval: ctx.GlobalDuration(flags.MinibatcherPullIntervalFlag.Name),
MaxNumConnections: ctx.GlobalUint(flags.MaxNodeConnectionsFlag.Name),
MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
MaxNumRetriesPerDispersal: ctx.GlobalUint(flags.MaxNumRetriesPerDispersalFlag.Name),
},
MinibatchStoreConfig: batchstore.Config{
TableName: ctx.GlobalString(flags.MinibatchStoreTableNameFlag.Name),
},
Expand Down
47 changes: 18 additions & 29 deletions disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gammazero/workerpool"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -241,41 +240,31 @@ func RunBatcher(ctx *cli.Context) error {
logger.Info("Enabled metrics for Batcher", "socket", httpSocket)
}

var minibatchStore batcher.MinibatchStore
if config.BatcherConfig.EnableMinibatch {
minibatchStore := batchstore.NewMinibatchStore(dynamoClient, logger, config.MinibatchStoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
streamerConfig := batcher.StreamerConfig{
SRSOrder: config.BatcherConfig.SRSOrder,
EncodingRequestTimeout: config.BatcherConfig.PullInterval,
EncodingQueueLimit: config.BatcherConfig.EncodingRequestQueueSize,
TargetNumChunks: config.BatcherConfig.TargetNumChunks,
MaxBlobsToFetchFromStore: config.BatcherConfig.MaxBlobsToFetchFromStore,
FinalizationBlockDelay: config.BatcherConfig.FinalizationBlockDelay,
ChainStateTimeout: config.TimeoutConfig.ChainStateTimeout,
}
encodingWorkerPool := workerpool.New(config.BatcherConfig.NumConnections)
batchTrigger := batcher.NewEncodedSizeNotifier(
make(chan struct{}, 1),
uint64(config.BatcherConfig.BatchSizeMBLimit)*1024*1024, // convert to bytes
)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, queue, ics, encoderClient, asgn, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
if err != nil {
return err
}
pool := workerpool.New(int(config.MinibatcherConfig.MaxNumConnections))
minibatcher, err := batcher.NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, ics, asgn, encodingStreamer, client, pool, logger)
if err != nil {
return err
minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, config.MinibatchStoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
}

batcherInterface, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan, minibatchStore)
if err != nil {
return err
}

if config.BatcherConfig.EnableMinibatch {
b, ok := batcherInterface.(batcher.Minibatcher)
if !ok {
log.Fatal("Failed to cast interface{} to batcher.Minibatcher")
}
err = minibatcher.Start(context.Background())
err = b.Start(context.Background())
if err != nil {
return err
}
} else {
batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
if err != nil {
return err
b, ok := batcherInterface.(batcher.Batcher)
if !ok {
log.Fatal("Failed to cast interface{} to batcher.Batcher")
}
err = batcher.Start(context.Background())
err = b.Start(context.Background())
if err != nil {
return err
}
Expand Down

0 comments on commit a488825

Please sign in to comment.