Skip to content

Commit

Permalink
refactor dispatcher timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jul 16, 2024
1 parent e22b250 commit 3bf5495
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 25 deletions.
2 changes: 1 addition & 1 deletion disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
// Dispatch encoded batch
log.Debug("Dispatching encoded batch...")
stageTimer = time.Now()
update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader)
update := b.Dispatcher.DisperseBatch(ctx, batch.State, batch.EncodedBlobs, batch.BatchHeader, b.AttestationTimeout)
log.Debug("DisperseBatch took", "duration", time.Since(stageTimer))
h, err := batch.State.OperatorState.Hash()
if err != nil {
Expand Down
23 changes: 8 additions & 15 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,30 @@ import (
"google.golang.org/protobuf/proto"
)

type Config struct {
Timeout time.Duration
}

type dispatcher struct {
*Config

logger logging.Logger
metrics *batcher.DispatcherMetrics
}

func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher {
func NewDispatcher(logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher {
return &dispatcher{
Config: cfg,
logger: logger.With("component", "Dispatcher"),
metrics: metrics,
}
}

var _ disperser.Dispatcher = (*dispatcher)(nil)

func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SigningMessage {
func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, timeout time.Duration) chan core.SigningMessage {
update := make(chan core.SigningMessage, len(state.IndexedOperators))

// Disperse
c.sendAllChunks(ctx, state, blobs, batchHeader, update)
c.sendAllChunks(ctx, state, blobs, batchHeader, timeout, update)

return update
}

func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) {
func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, timeout time.Duration, update chan core.SigningMessage) {
for id, op := range state.IndexedOperators {
go func(op core.IndexedOperatorInfo, id core.OperatorID) {
blobMessages := make([]*core.BlobMessage, 0)
Expand Down Expand Up @@ -79,7 +72,9 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera
}

requestedAt := time.Now()
sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
sig, err := c.SendChunksToOperator(ctx, blobMessages, batchHeader, &op)
latencyMs := float64(time.Since(requestedAt).Milliseconds())
if err != nil {
update <- core.SigningMessage{
Expand Down Expand Up @@ -109,7 +104,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera
}
}

func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
func (c *dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
// TODO Add secure Grpc

conn, err := grpc.Dial(
Expand All @@ -123,8 +118,6 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
defer conn.Close()

gc := node.NewDispersalClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
start := time.Now()
request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ func RunBatcher(ctx *cli.Context) error {

metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger)

dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{
Timeout: config.TimeoutConfig.AttestationTimeout,
}, logger, metrics.DispatcherMetrics)
dispatcher := dispatcher.NewDispatcher(logger, metrics.DispatcherMetrics)
asgn := &core.StdAssignmentCoordinator{}

var wallet walletsdk.Wallet
Expand Down
8 changes: 7 additions & 1 deletion disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -176,7 +177,12 @@ type BlobStore interface {
}

type Dispatcher interface {
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
// DisperseBatch sends the blobs to the operators in the state and returns a channel to receive the signing messages
// Attestation timeout needs to be configured in the parameter to set the correct timeout for each dispersal request
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader, time.Duration) chan core.SigningMessage
// SendChunksToOperator sends the blobs to the operator and returns the signature and error
// It uses the context in the parameter to send the dispersal requests
SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error)
}

// GenerateReverseIndexKey returns the key used to store the blob key in the reverse index
Expand Down
8 changes: 7 additions & 1 deletion disperser/mock/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mock
import (
"context"
"errors"
"time"

"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
Expand All @@ -23,7 +24,7 @@ func NewDispatcher(state *coremock.PrivateOperatorState) *Dispatcher {
}
}

func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SigningMessage {
func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader, timeout time.Duration) chan core.SigningMessage {
args := d.Called()
var nonSigners map[core.OperatorID]struct{}
if args.Get(0) != nil {
Expand Down Expand Up @@ -64,3 +65,8 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera

return update
}

func (c *Dispatcher) SendChunksToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) {
args := c.Called(ctx, blobs, batchHeader, op)
return args.Get(0).(*core.Signature), args.Error(1)
}
5 changes: 1 addition & 4 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,8 @@ type TestDisperser struct {
}

func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger logging.Logger) TestDisperser {
dispatcherConfig := &dispatcher.Config{
Timeout: time.Second,
}
batcherMetrics := batcher.NewMetrics("9100", logger)
dispatcher := dispatcher.NewDispatcher(dispatcherConfig, logger, batcherMetrics.DispatcherMetrics)
dispatcher := dispatcher.NewDispatcher(logger, batcherMetrics.DispatcherMetrics)

transactor := &coremock.MockTransactor{}
transactor.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil)
Expand Down

0 comments on commit 3bf5495

Please sign in to comment.