Skip to content

Commit

Permalink
Split blob writer code out of larger PR. (#685)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Aug 16, 2024
1 parent 12f1a6f commit f07e364
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 0 deletions.
162 changes: 162 additions & 0 deletions tools/traffic/workers/blob_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package workers

import (
"context"
"crypto/md5"
"crypto/rand"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
"github.com/Layr-Labs/eigensdk-go/logging"
"sync"
"time"
)

// BlobWriter sends blobs to a disperser at a configured rate.
type BlobWriter struct {
// The context for the generator. All work should cease when this context is cancelled.
ctx *context.Context

// Tracks the number of active goroutines within the generator.
waitGroup *sync.WaitGroup

// All logs should be written using this logger.
logger logging.Logger

// Config contains the configuration for the generator.
config *config.WorkerConfig

// disperser is the client used to send blobs to the disperser.
disperser clients.DisperserClient

// Unconfirmed keys are sent here.
unconfirmedKeyHandler KeyHandler

// fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise.
fixedRandomData []byte

// writeLatencyMetric is used to record latency for write requests.
writeLatencyMetric metrics.LatencyMetric

// writeSuccessMetric is used to record the number of successful write requests.
writeSuccessMetric metrics.CountMetric

// writeFailureMetric is used to record the number of failed write requests.
writeFailureMetric metrics.CountMetric
}

// NewBlobWriter creates a new BlobWriter instance.
func NewBlobWriter(
ctx *context.Context,
waitGroup *sync.WaitGroup,
logger logging.Logger,
config *config.WorkerConfig,
disperser clients.DisperserClient,
unconfirmedKeyHandler KeyHandler,
generatorMetrics metrics.Metrics) BlobWriter {

var fixedRandomData []byte
if config.RandomizeBlobs {
// New random data will be generated for each blob.
fixedRandomData = nil
} else {
// Use this random data for each blob.
fixedRandomData = make([]byte, config.DataSize)
_, err := rand.Read(fixedRandomData)
if err != nil {
panic(fmt.Sprintf("unable to read random data: %s", err))
}
fixedRandomData = codec.ConvertByPaddingEmptyByte(fixedRandomData)
}

return BlobWriter{
ctx: ctx,
waitGroup: waitGroup,
logger: logger,
config: config,
disperser: disperser,
unconfirmedKeyHandler: unconfirmedKeyHandler,
fixedRandomData: fixedRandomData,
writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"),
writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"),
writeFailureMetric: generatorMetrics.NewCountMetric("write_failure"),
}
}

// Start begins the blob writer goroutine.
func (writer *BlobWriter) Start() {
writer.waitGroup.Add(1)
ticker := time.NewTicker(writer.config.WriteRequestInterval)

go func() {
defer writer.waitGroup.Done()

for {
select {
case <-(*writer.ctx).Done():
return
case <-ticker.C:
writer.writeNextBlob()
}
}
}()
}

// writeNextBlob attempts to send a random blob to the disperser.
func (writer *BlobWriter) writeNextBlob() {
data, err := writer.getRandomData()
if err != nil {
writer.logger.Error("failed to get random data", "err", err)
return
}
key, err := metrics.InvokeAndReportLatency(writer.writeLatencyMetric, func() ([]byte, error) {
return writer.sendRequest(data)
})
if err != nil {
writer.writeFailureMetric.Increment()
writer.logger.Error("failed to send blob request", "err", err)
return
}

writer.writeSuccessMetric.Increment()

checksum := md5.Sum(data)
writer.unconfirmedKeyHandler.AddUnconfirmedKey(key, checksum, uint(len(data)))
}

// getRandomData returns a slice of random data to be used for a blob.
func (writer *BlobWriter) getRandomData() ([]byte, error) {
if writer.fixedRandomData != nil {
return writer.fixedRandomData, nil
}

data := make([]byte, writer.config.DataSize)
_, err := rand.Read(data)
if err != nil {
return nil, fmt.Errorf("unable to read random data: %w", err)
}
data = codec.ConvertByPaddingEmptyByte(data)

return data, nil
}

// sendRequest sends a blob to a disperser.
func (writer *BlobWriter) sendRequest(data []byte) (key []byte, err error) {
ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.config.WriteTimeout)
defer cancel()

if writer.config.SignerPrivateKey != "" {
_, key, err = writer.disperser.DisperseBlobAuthenticated(
ctxTimeout,
data,
writer.config.CustomQuorums)
} else {
_, key, err = writer.disperser.DisperseBlob(
ctxTimeout,
data,
writer.config.CustomQuorums)
}
return
}
134 changes: 134 additions & 0 deletions tools/traffic/workers/blob_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package workers

import (
"context"
"crypto/md5"
"fmt"
"github.com/Layr-Labs/eigenda/common"
tu "github.com/Layr-Labs/eigenda/common/testutils"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/exp/rand"
"sync"
"testing"
)

func TestBlobWriter(t *testing.T) {
tu.InitializeRandom()

ctx, cancel := context.WithCancel(context.Background())
waitGroup := sync.WaitGroup{}
logger, err := common.NewLogger(common.DefaultLoggerConfig())
assert.Nil(t, err)

dataSize := rand.Uint64()%1024 + 64

authenticated := rand.Intn(2) == 0
var signerPrivateKey string
if authenticated {
signerPrivateKey = "asdf"
}
var functionName string
if authenticated {
functionName = "DisperseBlobAuthenticated"
} else {
functionName = "DisperseBlob"
}

randomizeBlobs := rand.Intn(2) == 0

useCustomQuorum := rand.Intn(2) == 0
var customQuorum []uint8
if useCustomQuorum {
customQuorum = []uint8{1, 2, 3}
}

config := &config.WorkerConfig{
DataSize: dataSize,
SignerPrivateKey: signerPrivateKey,
RandomizeBlobs: randomizeBlobs,
CustomQuorums: customQuorum,
}

disperserClient := &MockDisperserClient{}
unconfirmedKeyHandler := &MockKeyHandler{}
unconfirmedKeyHandler.mock.On(
"AddUnconfirmedKey", mock.Anything, mock.Anything, mock.Anything).Return(nil)

generatorMetrics := metrics.NewMockMetrics()

writer := NewBlobWriter(
&ctx,
&waitGroup,
logger,
config,
disperserClient,
unconfirmedKeyHandler,
generatorMetrics)

errorCount := 0

var previousData []byte

for i := 0; i < 100; i++ {
var errorToReturn error
if i%10 == 0 {
errorToReturn = fmt.Errorf("intentional error for testing purposes")
errorCount++
} else {
errorToReturn = nil
}

// This is the key that will be assigned to the next blob.
keyToReturn := make([]byte, 32)
_, err = rand.Read(keyToReturn)
assert.Nil(t, err)

status := disperser.Processing
disperserClient.mock = mock.Mock{} // reset mock state
disperserClient.mock.On(functionName, mock.Anything, customQuorum).Return(&status, keyToReturn, errorToReturn)

// Simulate the advancement of time (i.e. allow the writer to write the next blob).
writer.writeNextBlob()

disperserClient.mock.AssertNumberOfCalls(t, functionName, 1)
unconfirmedKeyHandler.mock.AssertNumberOfCalls(t, "AddUnconfirmedKey", i+1-errorCount)

if errorToReturn == nil {

dataSentToDisperser := disperserClient.mock.Calls[0].Arguments.Get(0).([]byte)
assert.NotNil(t, dataSentToDisperser)

// Strip away the extra encoding bytes. We should have data of the expected size.
decodedData := codec.RemoveEmptyByteFromPaddedBytes(dataSentToDisperser)
assert.Equal(t, dataSize, uint64(len(decodedData)))

// Verify that the proper data was sent to the unconfirmed key handler.
checksum := md5.Sum(dataSentToDisperser)

unconfirmedKeyHandler.mock.AssertCalled(t, "AddUnconfirmedKey", keyToReturn, checksum, uint(len(dataSentToDisperser)))

// Verify that data has the proper amount of randomness.
if previousData != nil {
if randomizeBlobs {
// We expect each blob to be different.
assert.NotEqual(t, previousData, dataSentToDisperser)
} else {
// We expect each blob to be the same.
assert.Equal(t, previousData, dataSentToDisperser)
}
}
previousData = dataSentToDisperser
}

// Verify metrics.
assert.Equal(t, float64(i+1-errorCount), generatorMetrics.GetCount("write_success"))
assert.Equal(t, float64(errorCount), generatorMetrics.GetCount("write_failure"))
}

cancel()
}
7 changes: 7 additions & 0 deletions tools/traffic/workers/key_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package workers

// KeyHandler is an interface describing an object that can accept unconfirmed keys.
type KeyHandler interface {
// AddUnconfirmedKey accepts an unconfirmed blob key, the checksum of the blob, and the size of the blob in bytes.
AddUnconfirmedKey(key []byte, checksum [16]byte, size uint)
}
44 changes: 44 additions & 0 deletions tools/traffic/workers/mock_disperser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package workers

import (
"context"
"github.com/Layr-Labs/eigenda/api/clients"
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/stretchr/testify/mock"
)

var _ clients.DisperserClient = (*MockDisperserClient)(nil)

type MockDisperserClient struct {
mock mock.Mock
}

func (m *MockDisperserClient) DisperseBlob(
ctx context.Context,
data []byte,
customQuorums []uint8) (*disperser.BlobStatus, []byte, error) {

args := m.mock.Called(data, customQuorums)

return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2)
}

func (m *MockDisperserClient) DisperseBlobAuthenticated(
ctx context.Context,
data []byte,
customQuorums []uint8) (*disperser.BlobStatus, []byte, error) {

args := m.mock.Called(data, customQuorums)
return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2)
}

func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) {
args := m.mock.Called(key)
return args.Get(0).(*disperser_rpc.BlobStatusReply), args.Error(1)
}

func (m *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) {
args := m.mock.Called(batchHeaderHash, blobIndex)
return args.Get(0).([]byte), args.Error(1)
}
24 changes: 24 additions & 0 deletions tools/traffic/workers/mock_key_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package workers

import (
"github.com/stretchr/testify/mock"
)

var _ KeyHandler = (*MockKeyHandler)(nil)

// MockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler.
type MockKeyHandler struct {
mock mock.Mock

ProvidedKey []byte
ProvidedChecksum [16]byte
ProvidedSize uint
}

func (m *MockKeyHandler) AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) {
m.mock.Called(key, checksum, size)

m.ProvidedKey = key
m.ProvidedChecksum = checksum
m.ProvidedSize = size
}

0 comments on commit f07e364

Please sign in to comment.