From a488825cd24fd7cead769b5fc5f410f850e54d69 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Thu, 15 Aug 2024 12:42:22 -0700 Subject: [PATCH] Refactor batcher/minibatcher creation to use generic interface --- disperser/batcher/batcher.go | 54 +++++++++++-------- disperser/batcher/batcher_test.go | 12 +++-- .../batcher/batchstore/minibatch_store.go | 2 +- .../batchstore/minibatch_store_test.go | 2 +- disperser/cmd/batcher/config.go | 13 +++-- disperser/cmd/batcher/main.go | 47 +++++++--------- 6 files changed, 67 insertions(+), 63 deletions(-) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 466cdb72d..6ed437223 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -65,6 +65,7 @@ type Config struct { TargetNumChunks uint MaxBlobsToFetchFromStore int EnableMinibatch bool + MinibatcherConfig MinibatcherConfig } type Batcher struct { @@ -105,7 +106,8 @@ func NewBatcher( logger logging.Logger, metrics *Metrics, heartbeatChan chan time.Time, -) (*Batcher, error) { + minibatchStore MinibatchStore, +) (interface{}, error) { batchTrigger := NewEncodedSizeNotifier( make(chan struct{}, 1), uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes @@ -125,27 +127,35 @@ func NewBatcher( return nil, err } - return &Batcher{ - Config: config, - TimeoutConfig: timeoutConfig, - - Queue: queue, - Dispatcher: dispatcher, - EncoderClient: encoderClient, - - ChainState: chainState, - AssignmentCoordinator: assignmentCoordinator, - Aggregator: aggregator, - EncodingStreamer: encodingStreamer, - Transactor: transactor, - TransactionManager: txnManager, - Metrics: metrics, - - ethClient: ethClient, - finalizer: finalizer, - logger: logger.With("component", "Batcher"), - HeartbeatChan: heartbeatChan, - }, nil + if config.EnableMinibatch { + minibatcher, err := NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, encodingWorkerPool, logger) + if err != nil { + return nil, err + } + return minibatcher, nil + } else { + return &Batcher{ + Config: config, + TimeoutConfig: timeoutConfig, + + Queue: queue, + Dispatcher: dispatcher, + EncoderClient: encoderClient, + + ChainState: chainState, + AssignmentCoordinator: assignmentCoordinator, + Aggregator: aggregator, + EncodingStreamer: encodingStreamer, + Transactor: transactor, + TransactionManager: txnManager, + Metrics: metrics, + + ethClient: ethClient, + finalizer: finalizer, + logger: logger.With("component", "Batcher"), + HeartbeatChan: heartbeatChan, + }, nil + } } func (b *Batcher) RecoverState(ctx context.Context) error { diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 29786e920..a01e14b63 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "errors" + "log" "math/big" "runtime" "sync" @@ -17,6 +18,7 @@ import ( "github.com/Layr-Labs/eigenda/core" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/batcher" bat "github.com/Layr-Labs/eigenda/disperser/batcher" batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" batmock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" @@ -73,7 +75,7 @@ func makeTestBlob(securityParams []*core.SecurityParam) core.Blob { return blob } -func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.Time) { +func makeBatcher(t *testing.T) (*batcherComponents, *batcher.Batcher, func() []time.Time) { // Common Components // logger, err := common.NewLogger(common.DefaultLoggerConfig()) // assert.NoError(t, err) @@ -128,8 +130,12 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. ethClient := &cmock.MockEthClient{} txnManager := batmock.NewTxnManager() - b, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan) + batcherInterface, err := bat.NewBatcher(config, timeoutConfig, blobStore, dispatcher, cst, asgn, encoderClient, agg, ethClient, finalizer, transactor, txnManager, logger, metrics, handleBatchLivenessChan, nil) assert.NoError(t, err) + b, ok := batcherInterface.(batcher.Batcher) + if !ok { + log.Fatal("Failed to cast interface{} to batcher.Batcher") + } var mu sync.Mutex var heartbeatsReceived []time.Time @@ -158,7 +164,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time. ethClient: ethClient, dispatcher: dispatcher, chainData: cst, - }, b, func() []time.Time { + }, &b, func() []time.Time { close(doneListening) // Stop the goroutine listening to heartbeats mu.Lock() // Lock before reading the slice diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index c72b160ab..c66ca3f0d 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -41,7 +41,7 @@ type MinibatchStore struct { var _ batcher.MinibatchStore = (*MinibatchStore)(nil) -func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore { +func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) batcher.MinibatchStore { logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl) return &MinibatchStore{ dynamoDBClient: dynamoDBClient, diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index c17ce73d7..695245624 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -35,7 +35,7 @@ var ( localStackPort = "4566" dynamoClient *dynamodb.Client - minibatchStore *batchstore.MinibatchStore + minibatchStore batcher.MinibatchStore UUID = uuid.New() minibatchTableName = fmt.Sprintf("test-MinibatchStore-%v", UUID) diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index e67d2db8d..27196c10a 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -16,7 +16,6 @@ import ( type Config struct { BatcherConfig batcher.Config - MinibatcherConfig batcher.MinibatcherConfig TimeoutConfig batcher.TimeoutConfig BlobstoreConfig blobstore.Config MinibatchStoreConfig batchstore.Config @@ -73,6 +72,12 @@ func NewConfig(ctx *cli.Context) (Config, error) { MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name), FinalizationBlockDelay: ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name), EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name), + MinibatcherConfig: batcher.MinibatcherConfig{ + PullInterval: ctx.GlobalDuration(flags.MinibatcherPullIntervalFlag.Name), + MaxNumConnections: ctx.GlobalUint(flags.MaxNodeConnectionsFlag.Name), + MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), + MaxNumRetriesPerDispersal: ctx.GlobalUint(flags.MaxNumRetriesPerDispersalFlag.Name), + }, }, TimeoutConfig: batcher.TimeoutConfig{ EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name), @@ -94,12 +99,6 @@ func NewConfig(ctx *cli.Context) (Config, error) { IndexerConfig: indexer.ReadIndexerConfig(ctx), KMSKeyConfig: kmsConfig, EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name), - MinibatcherConfig: batcher.MinibatcherConfig{ - PullInterval: ctx.GlobalDuration(flags.MinibatcherPullIntervalFlag.Name), - MaxNumConnections: ctx.GlobalUint(flags.MaxNodeConnectionsFlag.Name), - MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), - MaxNumRetriesPerDispersal: ctx.GlobalUint(flags.MaxNumRetriesPerDispersalFlag.Name), - }, MinibatchStoreConfig: batchstore.Config{ TableName: ctx.GlobalString(flags.MinibatchStoreTableNameFlag.Name), }, diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 27217affe..a8fe471a9 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -29,7 +29,6 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rpc" - "github.com/gammazero/workerpool" "github.com/urfave/cli" ) @@ -241,41 +240,31 @@ func RunBatcher(ctx *cli.Context) error { logger.Info("Enabled metrics for Batcher", "socket", httpSocket) } + var minibatchStore batcher.MinibatchStore if config.BatcherConfig.EnableMinibatch { - minibatchStore := batchstore.NewMinibatchStore(dynamoClient, logger, config.MinibatchStoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) - streamerConfig := batcher.StreamerConfig{ - SRSOrder: config.BatcherConfig.SRSOrder, - EncodingRequestTimeout: config.BatcherConfig.PullInterval, - EncodingQueueLimit: config.BatcherConfig.EncodingRequestQueueSize, - TargetNumChunks: config.BatcherConfig.TargetNumChunks, - MaxBlobsToFetchFromStore: config.BatcherConfig.MaxBlobsToFetchFromStore, - FinalizationBlockDelay: config.BatcherConfig.FinalizationBlockDelay, - ChainStateTimeout: config.TimeoutConfig.ChainStateTimeout, - } - encodingWorkerPool := workerpool.New(config.BatcherConfig.NumConnections) - batchTrigger := batcher.NewEncodedSizeNotifier( - make(chan struct{}, 1), - uint64(config.BatcherConfig.BatchSizeMBLimit)*1024*1024, // convert to bytes - ) - encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, queue, ics, encoderClient, asgn, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) - if err != nil { - return err - } - pool := workerpool.New(int(config.MinibatcherConfig.MaxNumConnections)) - minibatcher, err := batcher.NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, ics, asgn, encodingStreamer, client, pool, logger) - if err != nil { - return err + minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, config.MinibatchStoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) + } + + batcherInterface, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan, minibatchStore) + if err != nil { + return err + } + + if config.BatcherConfig.EnableMinibatch { + b, ok := batcherInterface.(batcher.Minibatcher) + if !ok { + log.Fatal("Failed to cast interface{} to batcher.Minibatcher") } - err = minibatcher.Start(context.Background()) + err = b.Start(context.Background()) if err != nil { return err } } else { - batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan) - if err != nil { - return err + b, ok := batcherInterface.(batcher.Batcher) + if !ok { + log.Fatal("Failed to cast interface{} to batcher.Batcher") } - err = batcher.Start(context.Background()) + err = b.Start(context.Background()) if err != nil { return err }