Skip to content

Commit

Permalink
Refactor dynamodb client
Browse files Browse the repository at this point in the history
Convert PK to BatchID
Adds custom minibatch struct conversion for batch, minibatch, dispersalRequest, dispersalResponse
  • Loading branch information
pschork committed Jul 24, 2024
1 parent 4e82420 commit f6980b9
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 44 deletions.
236 changes: 197 additions & 39 deletions disperser/batcher/batchstore/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,56 @@ import (
"time"

commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
gcommon "github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
)

const (
batchKey = "BATCH#"
minibatchKey = "MINIBATCH#"
dispersalRequestKey = "DISPERSAL_REQUEST#"
dispersalResponseKey = "DISPERSAL_RESPONSE#"
batchSK = "BATCH#"
minibatchSK = "MINIBATCH#"
dispersalRequestSK = "DISPERSAL_REQUEST#"
dispersalResponseSK = "DISPERSAL_RESPONSE#"
)

type DynamoBatchRecord struct {
BatchID string
CreatedAt time.Time
ReferenceBlockNumber uint
HeaderHash [32]byte
AggregatePubKey *core.G2Point
AggregateSignature *core.Signature
}

type DynamoMinibatchRecord struct {
BatchID string
MinibatchIndex uint
BlobHeaderHashes [][32]byte
BatchSize uint64
ReferenceBlockNumber uint
}

type DynamoDispersalRequest struct {
BatchID string
MinibatchIndex uint
OperatorID string
OperatorAddress string
NumBlobs uint
RequestedAt time.Time
}

type DynamoDispersalResponse struct {
DynamoDispersalRequest
Signatures []*core.Signature
RespondedAt time.Time
Error error
}
type MinibatchStore struct {
dynamoDBClient *commondynamodb.Client
tableName string
Expand All @@ -43,7 +77,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
return &dynamodb.CreateTableInput{
AttributeDefinitions: []types.AttributeDefinition{
{
AttributeName: aws.String("PK"),
AttributeName: aws.String("BatchID"),
AttributeType: types.ScalarAttributeTypeS,
},
{
Expand All @@ -52,7 +86,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
},
{
AttributeName: aws.String("OperatorID"),
AttributeType: types.ScalarAttributeTypeB,
AttributeType: types.ScalarAttributeTypeS,
},
{
AttributeName: aws.String("RequestedAt"),
Expand All @@ -61,7 +95,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
},
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("PK"),
AttributeName: aws.String("BatchID"),
KeyType: types.KeyTypeHash,
},
{
Expand Down Expand Up @@ -99,82 +133,206 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit
}
}

func ToDynamoBatchRecord(br batcher.BatchRecord) DynamoBatchRecord {
return DynamoBatchRecord{
BatchID: br.ID.String(),
CreatedAt: br.CreatedAt,
ReferenceBlockNumber: br.ReferenceBlockNumber,
HeaderHash: br.HeaderHash,
AggregatePubKey: br.AggregatePubKey,
AggregateSignature: br.AggregateSignature,
}
}

func ToDynamoMinibatchRecord(br batcher.MinibatchRecord) DynamoMinibatchRecord {
return DynamoMinibatchRecord{
BatchID: br.BatchID.String(),
MinibatchIndex: br.MinibatchIndex,
BlobHeaderHashes: br.BlobHeaderHashes,
BatchSize: br.BatchSize,
ReferenceBlockNumber: br.ReferenceBlockNumber,
}
}

func ToDynamoDispersalRequest(dr batcher.DispersalRequest) DynamoDispersalRequest {
return DynamoDispersalRequest{
BatchID: dr.BatchID.String(),
MinibatchIndex: dr.MinibatchIndex,
OperatorID: dr.OperatorID.Hex(),
OperatorAddress: dr.OperatorAddress.Hex(),
NumBlobs: dr.NumBlobs,
RequestedAt: dr.RequestedAt,
}
}

func ToDynamoDispersalResponse(dr batcher.DispersalResponse) DynamoDispersalResponse {
return DynamoDispersalResponse{
DynamoDispersalRequest: ToDynamoDispersalRequest(dr.DispersalRequest),
Signatures: dr.Signatures,
RespondedAt: dr.RespondedAt,
Error: dr.Error,
}
}

func FromDynamoBatchRecord(dbr DynamoBatchRecord) (batcher.BatchRecord, error) {
batchID, err := uuid.Parse(dbr.BatchID)
if err != nil {
return batcher.BatchRecord{}, fmt.Errorf("failed to convert dynamo batch record batch ID %v from string: %v", dbr.BatchID, err)
}

return batcher.BatchRecord{
ID: batchID,
CreatedAt: dbr.CreatedAt,
ReferenceBlockNumber: dbr.ReferenceBlockNumber,
HeaderHash: dbr.HeaderHash,
AggregatePubKey: dbr.AggregatePubKey,
AggregateSignature: dbr.AggregateSignature,
}, nil
}

func FromDynamoMinibatchRecord(dbr DynamoMinibatchRecord) (batcher.MinibatchRecord, error) {
batchID, err := uuid.Parse(dbr.BatchID)
if err != nil {
return batcher.MinibatchRecord{}, fmt.Errorf("failed to convert dynamo minibatch record batch ID %v from string: %v", dbr.BatchID, err)
}

return batcher.MinibatchRecord{
BatchID: batchID,
MinibatchIndex: dbr.MinibatchIndex,
BlobHeaderHashes: dbr.BlobHeaderHashes,
BatchSize: dbr.BatchSize,
ReferenceBlockNumber: dbr.ReferenceBlockNumber,
}, nil
}

func FromDynamoDispersalRequest(ddr DynamoDispersalRequest) (batcher.DispersalRequest, error) {
batchID, err := uuid.Parse(ddr.BatchID)
if err != nil {
return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request batch ID %v from string: %v", ddr.BatchID, err)
}
operatorID, err := core.OperatorIDFromHex(ddr.OperatorID)
if err != nil {
return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request operator ID %v from hex: %v", ddr.OperatorID, err)
}

return batcher.DispersalRequest{
BatchID: batchID,
MinibatchIndex: ddr.MinibatchIndex,
OperatorID: operatorID,
OperatorAddress: gcommon.HexToAddress(ddr.OperatorAddress),
NumBlobs: ddr.NumBlobs,
RequestedAt: ddr.RequestedAt,
}, nil
}

func FromDynamoDispersalResponse(ddr DynamoDispersalResponse) (batcher.DispersalResponse, error) {
request, err := FromDynamoDispersalRequest(ddr.DynamoDispersalRequest)
if err != nil {
return batcher.DispersalResponse{}, err
}

return batcher.DispersalResponse{
DispersalRequest: request,
Signatures: ddr.Signatures,
RespondedAt: ddr.RespondedAt,
Error: ddr.Error,
}, nil
}

func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) {
fields, err := attributevalue.MarshalMap(batch)
fields, err := attributevalue.MarshalMap(ToDynamoBatchRecord(*batch))
if err != nil {
return nil, err
}
fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + batch.ID.String()}
fields["SK"] = &types.AttributeValueMemberS{Value: batchKey + batch.ID.String()}
fields["SK"] = &types.AttributeValueMemberS{Value: batchSK + batch.ID.String()}
fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())}
return fields, nil
}

func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) {
fields, err := attributevalue.MarshalMap(minibatch)
fields, err := attributevalue.MarshalMap(ToDynamoMinibatchRecord(*minibatch))
if err != nil {
return nil, err
}
fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + minibatch.BatchID.String()}
fields["SK"] = &types.AttributeValueMemberS{Value: minibatchKey + fmt.Sprintf("%d", minibatch.MinibatchIndex)}
fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSK + fmt.Sprintf("%d", minibatch.MinibatchIndex)}
return fields, nil
}

func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) {
fields, err := attributevalue.MarshalMap(request)
fields, err := attributevalue.MarshalMap(ToDynamoDispersalRequest(*request))
if err != nil {
return nil, err
}
fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + request.BatchID.String()}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestKey + fmt.Sprintf("%d", request.MinibatchIndex)}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d", request.MinibatchIndex)}
fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())}
return fields, nil
}

func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) {
fields, err := attributevalue.MarshalMap(response)
fields, err := attributevalue.MarshalMap(ToDynamoDispersalResponse(*response))
if err != nil {
return nil, err
}
fields["PK"] = &types.AttributeValueMemberS{Value: batchKey + response.BatchID.String()}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseKey + fmt.Sprintf("%d", response.MinibatchIndex)}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d", response.MinibatchIndex)}
fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())}
fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())}
return fields, nil
}

func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) {
batch := batcher.BatchRecord{}
err := attributevalue.UnmarshalMap(item, &batch)
dbr := DynamoBatchRecord{}
err := attributevalue.UnmarshalMap(item, &dbr)
if err != nil {
return nil, err
}

batch, err := FromDynamoBatchRecord(dbr)
if err != nil {
return nil, err
}

batch.CreatedAt = batch.CreatedAt.UTC()
return &batch, nil
}

func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecord, error) {
minibatch := batcher.MinibatchRecord{}
err := attributevalue.UnmarshalMap(item, &minibatch)
dbr := DynamoMinibatchRecord{}
err := attributevalue.UnmarshalMap(item, &dbr)
if err != nil {
return nil, err
}

minibatch, err := FromDynamoMinibatchRecord(dbr)
if err != nil {
return nil, err
}
return &minibatch, nil
}

func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequest, error) {
request := batcher.DispersalRequest{}
err := attributevalue.UnmarshalMap(item, &request)
ddr := DynamoDispersalRequest{}
err := attributevalue.UnmarshalMap(item, &ddr)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err)
}

request, err := FromDynamoDispersalRequest(ddr)
if err != nil {
return nil, err
}

request.RequestedAt = request.RequestedAt.UTC()
return &request, nil
}

func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalResponse, error) {
response := batcher.DispersalResponse{}
err := attributevalue.UnmarshalMap(item, &response)
ddr := DynamoDispersalResponse{}
err := attributevalue.UnmarshalMap(item, &ddr)
if err != nil {
return nil, err
}

response, err := FromDynamoDispersalResponse(ddr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -222,11 +380,11 @@ func (m *MinibatchStore) PutDispersalResponse(ctx context.Context, response *bat

func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batcher.BatchRecord, error) {
item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: batchKey + batchID.String(),
"BatchID": &types.AttributeValueMemberS{
Value: batchID.String(),
},
"SK": &types.AttributeValueMemberS{
Value: batchKey + batchID.String(),
Value: batchSK + batchID.String(),
},
})
if err != nil {
Expand All @@ -248,11 +406,11 @@ func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc

func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) {
item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: batchKey + batchID.String(),
"BatchID": &types.AttributeValueMemberS{
Value: batchID.String(),
},
"SK": &types.AttributeValueMemberS{
Value: minibatchKey + fmt.Sprintf("%d", minibatchIndex),
Value: minibatchSK + fmt.Sprintf("%d", minibatchIndex),
},
})
if err != nil {
Expand All @@ -274,11 +432,11 @@ func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi

func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) {
item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: batchKey + batchID.String(),
"BatchID": &types.AttributeValueMemberS{
Value: batchID.String(),
},
"SK": &types.AttributeValueMemberS{
Value: dispersalRequestKey + fmt.Sprintf("%d", minibatchIndex),
Value: dispersalRequestSK + fmt.Sprintf("%d", minibatchIndex),
},
})
if err != nil {
Expand All @@ -300,11 +458,11 @@ func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U

func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) {
item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: batchKey + batchID.String(),
"BatchID": &types.AttributeValueMemberS{
Value: batchID.String(),
},
"SK": &types.AttributeValueMemberS{
Value: dispersalResponseKey + fmt.Sprintf("%d", minibatchIndex),
Value: dispersalResponseSK + fmt.Sprintf("%d", minibatchIndex),
},
})
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions disperser/batcher/batchstore/minibatch_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
dockertestResource *dockertest.Resource

deployLocalStack bool
localStackPort = "4570"
localStackPort = "4566"

dynamoClient *dynamodb.Client
minibatchStore *batchstore.MinibatchStore
Expand Down Expand Up @@ -165,5 +165,4 @@ func TestPutDispersalResponse(t *testing.T) {
r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex)
assert.NoError(t, err)
assert.Equal(t, response, r)
assert.Error(t, err)
}
Loading

0 comments on commit f6980b9

Please sign in to comment.