From f6980b93a7cae1cdbea5c73208736b726bbe48f0 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 23 Jul 2024 16:55:44 -0700 Subject: [PATCH] Refactor dynamodb client Convert PK to BatchID Adds custom minibatch struct conversion for batch, minibatch, dispersalRequest, dispersalResponse --- .../batcher/batchstore/minibatch_store.go | 236 +++++++++++++++--- .../batchstore/minibatch_store_test.go | 3 +- disperser/batcher/minibatch_store.go | 6 +- 3 files changed, 201 insertions(+), 44 deletions(-) diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index 82e3bd861..d4ba8d8a6 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -6,22 +6,56 @@ import ( "time" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + gcommon "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" ) const ( - batchKey = "BATCH#" - minibatchKey = "MINIBATCH#" - dispersalRequestKey = "DISPERSAL_REQUEST#" - dispersalResponseKey = "DISPERSAL_RESPONSE#" + batchSK = "BATCH#" + minibatchSK = "MINIBATCH#" + dispersalRequestSK = "DISPERSAL_REQUEST#" + dispersalResponseSK = "DISPERSAL_RESPONSE#" ) +type DynamoBatchRecord struct { + BatchID string + CreatedAt time.Time + ReferenceBlockNumber uint + HeaderHash [32]byte + AggregatePubKey *core.G2Point + AggregateSignature *core.Signature +} + +type DynamoMinibatchRecord struct { + BatchID string + MinibatchIndex uint + BlobHeaderHashes [][32]byte + BatchSize uint64 + ReferenceBlockNumber uint +} + +type DynamoDispersalRequest struct { + BatchID string + MinibatchIndex uint + OperatorID string + OperatorAddress string + NumBlobs uint + RequestedAt time.Time +} + +type DynamoDispersalResponse struct { + DynamoDispersalRequest + Signatures []*core.Signature + RespondedAt time.Time + Error error +} type MinibatchStore struct { dynamoDBClient *commondynamodb.Client tableName string @@ -43,7 +77,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit return &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ { - AttributeName: aws.String("PK"), + AttributeName: aws.String("BatchID"), AttributeType: types.ScalarAttributeTypeS, }, { @@ -52,7 +86,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit }, { AttributeName: aws.String("OperatorID"), - AttributeType: types.ScalarAttributeTypeB, + AttributeType: types.ScalarAttributeTypeS, }, { AttributeName: aws.String("RequestedAt"), @@ -61,7 +95,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit }, KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String("PK"), + AttributeName: aws.String("BatchID"), KeyType: types.KeyTypeHash, }, { @@ -99,62 +133,176 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit } } +func ToDynamoBatchRecord(br batcher.BatchRecord) DynamoBatchRecord { + return DynamoBatchRecord{ + BatchID: br.ID.String(), + CreatedAt: br.CreatedAt, + ReferenceBlockNumber: br.ReferenceBlockNumber, + HeaderHash: br.HeaderHash, + AggregatePubKey: br.AggregatePubKey, + AggregateSignature: br.AggregateSignature, + } +} + +func ToDynamoMinibatchRecord(br batcher.MinibatchRecord) DynamoMinibatchRecord { + return DynamoMinibatchRecord{ + BatchID: br.BatchID.String(), + MinibatchIndex: br.MinibatchIndex, + BlobHeaderHashes: br.BlobHeaderHashes, + BatchSize: br.BatchSize, + ReferenceBlockNumber: br.ReferenceBlockNumber, + } +} + +func ToDynamoDispersalRequest(dr batcher.DispersalRequest) DynamoDispersalRequest { + return DynamoDispersalRequest{ + BatchID: dr.BatchID.String(), + MinibatchIndex: dr.MinibatchIndex, + OperatorID: dr.OperatorID.Hex(), + OperatorAddress: dr.OperatorAddress.Hex(), + NumBlobs: dr.NumBlobs, + RequestedAt: dr.RequestedAt, + } +} + +func ToDynamoDispersalResponse(dr batcher.DispersalResponse) DynamoDispersalResponse { + return DynamoDispersalResponse{ + DynamoDispersalRequest: ToDynamoDispersalRequest(dr.DispersalRequest), + Signatures: dr.Signatures, + RespondedAt: dr.RespondedAt, + Error: dr.Error, + } +} + +func FromDynamoBatchRecord(dbr DynamoBatchRecord) (batcher.BatchRecord, error) { + batchID, err := uuid.Parse(dbr.BatchID) + if err != nil { + return batcher.BatchRecord{}, fmt.Errorf("failed to convert dynamo batch record batch ID %v from string: %v", dbr.BatchID, err) + } + + return batcher.BatchRecord{ + ID: batchID, + CreatedAt: dbr.CreatedAt, + ReferenceBlockNumber: dbr.ReferenceBlockNumber, + HeaderHash: dbr.HeaderHash, + AggregatePubKey: dbr.AggregatePubKey, + AggregateSignature: dbr.AggregateSignature, + }, nil +} + +func FromDynamoMinibatchRecord(dbr DynamoMinibatchRecord) (batcher.MinibatchRecord, error) { + batchID, err := uuid.Parse(dbr.BatchID) + if err != nil { + return batcher.MinibatchRecord{}, fmt.Errorf("failed to convert dynamo minibatch record batch ID %v from string: %v", dbr.BatchID, err) + } + + return batcher.MinibatchRecord{ + BatchID: batchID, + MinibatchIndex: dbr.MinibatchIndex, + BlobHeaderHashes: dbr.BlobHeaderHashes, + BatchSize: dbr.BatchSize, + ReferenceBlockNumber: dbr.ReferenceBlockNumber, + }, nil +} + +func FromDynamoDispersalRequest(ddr DynamoDispersalRequest) (batcher.DispersalRequest, error) { + batchID, err := uuid.Parse(ddr.BatchID) + if err != nil { + return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request batch ID %v from string: %v", ddr.BatchID, err) + } + operatorID, err := core.OperatorIDFromHex(ddr.OperatorID) + if err != nil { + return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request operator ID %v from hex: %v", ddr.OperatorID, err) + } + + return batcher.DispersalRequest{ + BatchID: batchID, + MinibatchIndex: ddr.MinibatchIndex, + OperatorID: operatorID, + OperatorAddress: gcommon.HexToAddress(ddr.OperatorAddress), + NumBlobs: ddr.NumBlobs, + RequestedAt: ddr.RequestedAt, + }, nil +} + +func FromDynamoDispersalResponse(ddr DynamoDispersalResponse) (batcher.DispersalResponse, error) { + request, err := FromDynamoDispersalRequest(ddr.DynamoDispersalRequest) + if err != nil { + return batcher.DispersalResponse{}, err + } + + return batcher.DispersalResponse{ + DispersalRequest: request, + Signatures: ddr.Signatures, + RespondedAt: ddr.RespondedAt, + Error: ddr.Error, + }, nil +} + func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(batch) + fields, err := attributevalue.MarshalMap(ToDynamoBatchRecord(*batch)) if err != nil { return nil, err } - fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + batch.ID.String()} - fields["SK"] = &types.AttributeValueMemberS{Value: batchKey + batch.ID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: batchSK + batch.ID.String()} fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} return fields, nil } func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(minibatch) + fields, err := attributevalue.MarshalMap(ToDynamoMinibatchRecord(*minibatch)) if err != nil { return nil, err } - fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + minibatch.BatchID.String()} - fields["SK"] = &types.AttributeValueMemberS{Value: minibatchKey + fmt.Sprintf("%d", minibatch.MinibatchIndex)} + fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSK + fmt.Sprintf("%d", minibatch.MinibatchIndex)} return fields, nil } func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(request) + fields, err := attributevalue.MarshalMap(ToDynamoDispersalRequest(*request)) if err != nil { return nil, err } - fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + request.BatchID.String()} - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestKey + fmt.Sprintf("%d", request.MinibatchIndex)} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d", request.MinibatchIndex)} fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} return fields, nil } func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(response) + fields, err := attributevalue.MarshalMap(ToDynamoDispersalResponse(*response)) if err != nil { return nil, err } - fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + response.BatchID.String()} - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseKey + fmt.Sprintf("%d", response.MinibatchIndex)} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d", response.MinibatchIndex)} fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} return fields, nil } + func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { - batch := batcher.BatchRecord{} - err := attributevalue.UnmarshalMap(item, &batch) + dbr := DynamoBatchRecord{} + err := attributevalue.UnmarshalMap(item, &dbr) + if err != nil { + return nil, err + } + + batch, err := FromDynamoBatchRecord(dbr) if err != nil { return nil, err } + batch.CreatedAt = batch.CreatedAt.UTC() return &batch, nil } func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecord, error) { - minibatch := batcher.MinibatchRecord{} - err := attributevalue.UnmarshalMap(item, &minibatch) + dbr := DynamoMinibatchRecord{} + err := attributevalue.UnmarshalMap(item, &dbr) + if err != nil { + return nil, err + } + + minibatch, err := FromDynamoMinibatchRecord(dbr) if err != nil { return nil, err } @@ -162,19 +310,29 @@ func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecor } func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequest, error) { - request := batcher.DispersalRequest{} - err := attributevalue.UnmarshalMap(item, &request) + ddr := DynamoDispersalRequest{} + err := attributevalue.UnmarshalMap(item, &ddr) if err != nil { return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) } + request, err := FromDynamoDispersalRequest(ddr) + if err != nil { + return nil, err + } + request.RequestedAt = request.RequestedAt.UTC() return &request, nil } func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalResponse, error) { - response := batcher.DispersalResponse{} - err := attributevalue.UnmarshalMap(item, &response) + ddr := DynamoDispersalResponse{} + err := attributevalue.UnmarshalMap(item, &ddr) + if err != nil { + return nil, err + } + + response, err := FromDynamoDispersalResponse(ddr) if err != nil { return nil, err } @@ -222,11 +380,11 @@ func (m *MinibatchStore) PutDispersalResponse(ctx context.Context, response *bat func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + Value: batchSK + batchID.String(), }, }) if err != nil { @@ -248,11 +406,11 @@ func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: minibatchKey + fmt.Sprintf("%d", minibatchIndex), + Value: minibatchSK + fmt.Sprintf("%d", minibatchIndex), }, }) if err != nil { @@ -274,11 +432,11 @@ func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalRequestKey + fmt.Sprintf("%d", minibatchIndex), + Value: dispersalRequestSK + fmt.Sprintf("%d", minibatchIndex), }, }) if err != nil { @@ -300,11 +458,11 @@ func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) { item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: batchKey + batchID.String(), + "BatchID": &types.AttributeValueMemberS{ + Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalResponseKey + fmt.Sprintf("%d", minibatchIndex), + Value: dispersalResponseSK + fmt.Sprintf("%d", minibatchIndex), }, }) if err != nil { diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 75f02dd68..2f072b9f3 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -28,7 +28,7 @@ var ( dockertestResource *dockertest.Resource deployLocalStack bool - localStackPort = "4570" + localStackPort = "4566" dynamoClient *dynamodb.Client minibatchStore *batchstore.MinibatchStore @@ -165,5 +165,4 @@ func TestPutDispersalResponse(t *testing.T) { r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex) assert.NoError(t, err) assert.Equal(t, response, r) - assert.Error(t, err) } diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index 4fd573694..61d1cfd3f 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -27,9 +27,9 @@ type MinibatchRecord struct { } type DispersalRequest struct { - BatchID uuid.UUID - MinibatchIndex uint - OperatorID [32]byte + BatchID uuid.UUID + MinibatchIndex uint + core.OperatorID OperatorAddress gcommon.Address Socket string NumBlobs uint