From f07e364d039c8a79af02ebbeab79e9d63ab54b22 Mon Sep 17 00:00:00 2001 From: Cody Littley <56973212+cody-littley@users.noreply.github.com> Date: Fri, 16 Aug 2024 12:06:00 -0500 Subject: [PATCH] Split blob writer code out of larger PR. (#685) Signed-off-by: Cody Littley --- tools/traffic/workers/blob_writer.go | 162 ++++++++++++++++++++++ tools/traffic/workers/blob_writer_test.go | 134 ++++++++++++++++++ tools/traffic/workers/key_handler.go | 7 + tools/traffic/workers/mock_disperser.go | 44 ++++++ tools/traffic/workers/mock_key_handler.go | 24 ++++ 5 files changed, 371 insertions(+) create mode 100644 tools/traffic/workers/blob_writer.go create mode 100644 tools/traffic/workers/blob_writer_test.go create mode 100644 tools/traffic/workers/key_handler.go create mode 100644 tools/traffic/workers/mock_disperser.go create mode 100644 tools/traffic/workers/mock_key_handler.go diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go new file mode 100644 index 000000000..a30a7e5bd --- /dev/null +++ b/tools/traffic/workers/blob_writer.go @@ -0,0 +1,162 @@ +package workers + +import ( + "context" + "crypto/md5" + "crypto/rand" + "fmt" + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/tools/traffic/config" + "github.com/Layr-Labs/eigenda/tools/traffic/metrics" + "github.com/Layr-Labs/eigensdk-go/logging" + "sync" + "time" +) + +// BlobWriter sends blobs to a disperser at a configured rate. +type BlobWriter struct { + // The context for the generator. All work should cease when this context is cancelled. + ctx *context.Context + + // Tracks the number of active goroutines within the generator. + waitGroup *sync.WaitGroup + + // All logs should be written using this logger. + logger logging.Logger + + // Config contains the configuration for the generator. + config *config.WorkerConfig + + // disperser is the client used to send blobs to the disperser. + disperser clients.DisperserClient + + // Unconfirmed keys are sent here. + unconfirmedKeyHandler KeyHandler + + // fixedRandomData contains random data for blobs if RandomizeBlobs is false, and nil otherwise. + fixedRandomData []byte + + // writeLatencyMetric is used to record latency for write requests. + writeLatencyMetric metrics.LatencyMetric + + // writeSuccessMetric is used to record the number of successful write requests. + writeSuccessMetric metrics.CountMetric + + // writeFailureMetric is used to record the number of failed write requests. + writeFailureMetric metrics.CountMetric +} + +// NewBlobWriter creates a new BlobWriter instance. +func NewBlobWriter( + ctx *context.Context, + waitGroup *sync.WaitGroup, + logger logging.Logger, + config *config.WorkerConfig, + disperser clients.DisperserClient, + unconfirmedKeyHandler KeyHandler, + generatorMetrics metrics.Metrics) BlobWriter { + + var fixedRandomData []byte + if config.RandomizeBlobs { + // New random data will be generated for each blob. + fixedRandomData = nil + } else { + // Use this random data for each blob. + fixedRandomData = make([]byte, config.DataSize) + _, err := rand.Read(fixedRandomData) + if err != nil { + panic(fmt.Sprintf("unable to read random data: %s", err)) + } + fixedRandomData = codec.ConvertByPaddingEmptyByte(fixedRandomData) + } + + return BlobWriter{ + ctx: ctx, + waitGroup: waitGroup, + logger: logger, + config: config, + disperser: disperser, + unconfirmedKeyHandler: unconfirmedKeyHandler, + fixedRandomData: fixedRandomData, + writeLatencyMetric: generatorMetrics.NewLatencyMetric("write"), + writeSuccessMetric: generatorMetrics.NewCountMetric("write_success"), + writeFailureMetric: generatorMetrics.NewCountMetric("write_failure"), + } +} + +// Start begins the blob writer goroutine. +func (writer *BlobWriter) Start() { + writer.waitGroup.Add(1) + ticker := time.NewTicker(writer.config.WriteRequestInterval) + + go func() { + defer writer.waitGroup.Done() + + for { + select { + case <-(*writer.ctx).Done(): + return + case <-ticker.C: + writer.writeNextBlob() + } + } + }() +} + +// writeNextBlob attempts to send a random blob to the disperser. +func (writer *BlobWriter) writeNextBlob() { + data, err := writer.getRandomData() + if err != nil { + writer.logger.Error("failed to get random data", "err", err) + return + } + key, err := metrics.InvokeAndReportLatency(writer.writeLatencyMetric, func() ([]byte, error) { + return writer.sendRequest(data) + }) + if err != nil { + writer.writeFailureMetric.Increment() + writer.logger.Error("failed to send blob request", "err", err) + return + } + + writer.writeSuccessMetric.Increment() + + checksum := md5.Sum(data) + writer.unconfirmedKeyHandler.AddUnconfirmedKey(key, checksum, uint(len(data))) +} + +// getRandomData returns a slice of random data to be used for a blob. +func (writer *BlobWriter) getRandomData() ([]byte, error) { + if writer.fixedRandomData != nil { + return writer.fixedRandomData, nil + } + + data := make([]byte, writer.config.DataSize) + _, err := rand.Read(data) + if err != nil { + return nil, fmt.Errorf("unable to read random data: %w", err) + } + data = codec.ConvertByPaddingEmptyByte(data) + + return data, nil +} + +// sendRequest sends a blob to a disperser. +func (writer *BlobWriter) sendRequest(data []byte) (key []byte, err error) { + ctxTimeout, cancel := context.WithTimeout(*writer.ctx, writer.config.WriteTimeout) + defer cancel() + + if writer.config.SignerPrivateKey != "" { + _, key, err = writer.disperser.DisperseBlobAuthenticated( + ctxTimeout, + data, + writer.config.CustomQuorums) + } else { + _, key, err = writer.disperser.DisperseBlob( + ctxTimeout, + data, + writer.config.CustomQuorums) + } + return +} diff --git a/tools/traffic/workers/blob_writer_test.go b/tools/traffic/workers/blob_writer_test.go new file mode 100644 index 000000000..723894490 --- /dev/null +++ b/tools/traffic/workers/blob_writer_test.go @@ -0,0 +1,134 @@ +package workers + +import ( + "context" + "crypto/md5" + "fmt" + "github.com/Layr-Labs/eigenda/common" + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/tools/traffic/config" + "github.com/Layr-Labs/eigenda/tools/traffic/metrics" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "golang.org/x/exp/rand" + "sync" + "testing" +) + +func TestBlobWriter(t *testing.T) { + tu.InitializeRandom() + + ctx, cancel := context.WithCancel(context.Background()) + waitGroup := sync.WaitGroup{} + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + assert.Nil(t, err) + + dataSize := rand.Uint64()%1024 + 64 + + authenticated := rand.Intn(2) == 0 + var signerPrivateKey string + if authenticated { + signerPrivateKey = "asdf" + } + var functionName string + if authenticated { + functionName = "DisperseBlobAuthenticated" + } else { + functionName = "DisperseBlob" + } + + randomizeBlobs := rand.Intn(2) == 0 + + useCustomQuorum := rand.Intn(2) == 0 + var customQuorum []uint8 + if useCustomQuorum { + customQuorum = []uint8{1, 2, 3} + } + + config := &config.WorkerConfig{ + DataSize: dataSize, + SignerPrivateKey: signerPrivateKey, + RandomizeBlobs: randomizeBlobs, + CustomQuorums: customQuorum, + } + + disperserClient := &MockDisperserClient{} + unconfirmedKeyHandler := &MockKeyHandler{} + unconfirmedKeyHandler.mock.On( + "AddUnconfirmedKey", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + generatorMetrics := metrics.NewMockMetrics() + + writer := NewBlobWriter( + &ctx, + &waitGroup, + logger, + config, + disperserClient, + unconfirmedKeyHandler, + generatorMetrics) + + errorCount := 0 + + var previousData []byte + + for i := 0; i < 100; i++ { + var errorToReturn error + if i%10 == 0 { + errorToReturn = fmt.Errorf("intentional error for testing purposes") + errorCount++ + } else { + errorToReturn = nil + } + + // This is the key that will be assigned to the next blob. + keyToReturn := make([]byte, 32) + _, err = rand.Read(keyToReturn) + assert.Nil(t, err) + + status := disperser.Processing + disperserClient.mock = mock.Mock{} // reset mock state + disperserClient.mock.On(functionName, mock.Anything, customQuorum).Return(&status, keyToReturn, errorToReturn) + + // Simulate the advancement of time (i.e. allow the writer to write the next blob). + writer.writeNextBlob() + + disperserClient.mock.AssertNumberOfCalls(t, functionName, 1) + unconfirmedKeyHandler.mock.AssertNumberOfCalls(t, "AddUnconfirmedKey", i+1-errorCount) + + if errorToReturn == nil { + + dataSentToDisperser := disperserClient.mock.Calls[0].Arguments.Get(0).([]byte) + assert.NotNil(t, dataSentToDisperser) + + // Strip away the extra encoding bytes. We should have data of the expected size. + decodedData := codec.RemoveEmptyByteFromPaddedBytes(dataSentToDisperser) + assert.Equal(t, dataSize, uint64(len(decodedData))) + + // Verify that the proper data was sent to the unconfirmed key handler. + checksum := md5.Sum(dataSentToDisperser) + + unconfirmedKeyHandler.mock.AssertCalled(t, "AddUnconfirmedKey", keyToReturn, checksum, uint(len(dataSentToDisperser))) + + // Verify that data has the proper amount of randomness. + if previousData != nil { + if randomizeBlobs { + // We expect each blob to be different. + assert.NotEqual(t, previousData, dataSentToDisperser) + } else { + // We expect each blob to be the same. + assert.Equal(t, previousData, dataSentToDisperser) + } + } + previousData = dataSentToDisperser + } + + // Verify metrics. + assert.Equal(t, float64(i+1-errorCount), generatorMetrics.GetCount("write_success")) + assert.Equal(t, float64(errorCount), generatorMetrics.GetCount("write_failure")) + } + + cancel() +} diff --git a/tools/traffic/workers/key_handler.go b/tools/traffic/workers/key_handler.go new file mode 100644 index 000000000..30c8b5ed9 --- /dev/null +++ b/tools/traffic/workers/key_handler.go @@ -0,0 +1,7 @@ +package workers + +// KeyHandler is an interface describing an object that can accept unconfirmed keys. +type KeyHandler interface { + // AddUnconfirmedKey accepts an unconfirmed blob key, the checksum of the blob, and the size of the blob in bytes. + AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) +} diff --git a/tools/traffic/workers/mock_disperser.go b/tools/traffic/workers/mock_disperser.go new file mode 100644 index 000000000..ba1b01388 --- /dev/null +++ b/tools/traffic/workers/mock_disperser.go @@ -0,0 +1,44 @@ +package workers + +import ( + "context" + "github.com/Layr-Labs/eigenda/api/clients" + disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/stretchr/testify/mock" +) + +var _ clients.DisperserClient = (*MockDisperserClient)(nil) + +type MockDisperserClient struct { + mock mock.Mock +} + +func (m *MockDisperserClient) DisperseBlob( + ctx context.Context, + data []byte, + customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { + + args := m.mock.Called(data, customQuorums) + + return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) +} + +func (m *MockDisperserClient) DisperseBlobAuthenticated( + ctx context.Context, + data []byte, + customQuorums []uint8) (*disperser.BlobStatus, []byte, error) { + + args := m.mock.Called(data, customQuorums) + return args.Get(0).(*disperser.BlobStatus), args.Get(1).([]byte), args.Error(2) +} + +func (m *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { + args := m.mock.Called(key) + return args.Get(0).(*disperser_rpc.BlobStatusReply), args.Error(1) +} + +func (m *MockDisperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) { + args := m.mock.Called(batchHeaderHash, blobIndex) + return args.Get(0).([]byte), args.Error(1) +} diff --git a/tools/traffic/workers/mock_key_handler.go b/tools/traffic/workers/mock_key_handler.go new file mode 100644 index 000000000..2c48de995 --- /dev/null +++ b/tools/traffic/workers/mock_key_handler.go @@ -0,0 +1,24 @@ +package workers + +import ( + "github.com/stretchr/testify/mock" +) + +var _ KeyHandler = (*MockKeyHandler)(nil) + +// MockKeyHandler is a stand-in for the blob verifier's UnconfirmedKeyHandler. +type MockKeyHandler struct { + mock mock.Mock + + ProvidedKey []byte + ProvidedChecksum [16]byte + ProvidedSize uint +} + +func (m *MockKeyHandler) AddUnconfirmedKey(key []byte, checksum [16]byte, size uint) { + m.mock.Called(key, checksum, size) + + m.ProvidedKey = key + m.ProvidedChecksum = checksum + m.ProvidedSize = size +}