From 41cdbfd67c7604d580102cf95f133baf1ae0d4ef Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:13:58 -0700 Subject: [PATCH] Refactor custom type handling Add unique constraint on (batchID,BATCH#batchID) --- common/aws/dynamodb/client.go | 12 + .../batcher/batchstore/minibatch_store.go | 290 +++++++----------- .../batchstore/minibatch_store_test.go | 47 ++- disperser/batcher/minibatch_store.go | 12 +- 4 files changed, 161 insertions(+), 200 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index db31cb80e..2e309a86b 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -105,6 +105,18 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err return nil } +func (c *Client) PutItemWithCondition(ctx context.Context, tableName string, item Item, condition string) (err error) { + _, err = c.dynamoClient.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(tableName), Item: item, + ConditionExpression: aws.String(condition), + }) + if err != nil { + return err + } + + return nil +} + // PutItems puts items in batches of 25 items (which is a limit DynamoDB imposes) // It returns the items that failed to be put. func (c *Client) PutItems(ctx context.Context, tableName string, items []Item) ([]Item, error) { diff --git a/disperser/batcher/batchstore/minibatch_store.go b/disperser/batcher/batchstore/minibatch_store.go index 066aa98bf..cf69c7b24 100644 --- a/disperser/batcher/batchstore/minibatch_store.go +++ b/disperser/batcher/batchstore/minibatch_store.go @@ -14,54 +14,17 @@ import ( "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - gcommon "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" ) const ( - blobMetadataIndexName = "BlobMetadataIndex" - batchStatusIndexName = "BatchStatusIndex" - batchSK = "BATCH#" - minibatchSK = "MINIBATCH#" - dispersalRequestSK = "DISPERSAL_REQUEST#" - dispersalResponseSK = "DISPERSAL_RESPONSE#" + batchStatusIndexName = "BatchStatusIndex" + batchSKPrefix = "BATCH#" + minibatchSKPrefix = "MINIBATCH#" + dispersalRequestSKPrefix = "DISPERSAL_REQUEST#" + dispersalResponseSKPrefix = "DISPERSAL_RESPONSE#" ) -type DynamoBatchRecord struct { - BatchID string - CreatedAt time.Time - ReferenceBlockNumber uint - BatchStatus batcher.BatchStatus - HeaderHash [32]byte - AggregatePubKey *core.G2Point - AggregateSignature *core.Signature -} - -type DynamoMinibatchRecord struct { - BatchID string - MinibatchIndex uint - BlobHeaderHashes [][32]byte - BatchSize uint64 - ReferenceBlockNumber uint -} - -type DynamoDispersalRequest struct { - BatchID string - MinibatchIndex uint - OperatorID string - OperatorAddress string - NumBlobs uint - RequestedAt time.Time - BlobHash string - MetadataHash string -} - -type DynamoDispersalResponse struct { - DynamoDispersalRequest - Signatures []*core.Signature - RespondedAt time.Time - Error error -} type MinibatchStore struct { dynamoDBClient *commondynamodb.Client tableName string @@ -141,215 +104,188 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit } } -func ToDynamoBatchRecord(br batcher.BatchRecord) DynamoBatchRecord { - return DynamoBatchRecord{ - BatchID: br.ID.String(), - CreatedAt: br.CreatedAt, - ReferenceBlockNumber: br.ReferenceBlockNumber, - BatchStatus: br.Status, - HeaderHash: br.HeaderHash, - AggregatePubKey: br.AggregatePubKey, - AggregateSignature: br.AggregateSignature, - } -} - -func ToDynamoMinibatchRecord(br batcher.MinibatchRecord) DynamoMinibatchRecord { - return DynamoMinibatchRecord{ - BatchID: br.BatchID.String(), - MinibatchIndex: br.MinibatchIndex, - BlobHeaderHashes: br.BlobHeaderHashes, - BatchSize: br.BatchSize, - ReferenceBlockNumber: br.ReferenceBlockNumber, +func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*batch) + if err != nil { + return nil, err } + fields["BatchID"] = &types.AttributeValueMemberS{Value: batch.ID.String()} + fields["BatchStatus"] = &types.AttributeValueMemberN{Value: strconv.Itoa(int(batch.Status))} + fields["SK"] = &types.AttributeValueMemberS{Value: batchSKPrefix + batch.ID.String()} + fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} + return fields, nil } -func ToDynamoDispersalRequest(dr batcher.DispersalRequest) DynamoDispersalRequest { - return DynamoDispersalRequest{ - BatchID: dr.BatchID.String(), - MinibatchIndex: dr.MinibatchIndex, - OperatorID: dr.OperatorID.Hex(), - OperatorAddress: dr.OperatorAddress.Hex(), - NumBlobs: dr.NumBlobs, - RequestedAt: dr.RequestedAt, - BlobHash: dr.BlobHash, - MetadataHash: dr.MetadataHash, +func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*minibatch) + if err != nil { + return nil, err } + fields["BatchID"] = &types.AttributeValueMemberS{Value: minibatch.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSKPrefix + fmt.Sprintf("%d", minibatch.MinibatchIndex)} + return fields, nil } -func ToDynamoDispersalResponse(dr batcher.DispersalResponse) DynamoDispersalResponse { - return DynamoDispersalResponse{ - DynamoDispersalRequest: ToDynamoDispersalRequest(dr.DispersalRequest), - Signatures: dr.Signatures, - RespondedAt: dr.RespondedAt, - Error: dr.Error, +func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*request) + if err != nil { + return nil, err } + fields["BatchID"] = &types.AttributeValueMemberS{Value: request.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSKPrefix + fmt.Sprintf("%d#%s", request.MinibatchIndex, request.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: request.OperatorID.Hex()} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} + return fields, nil } -func FromDynamoBatchRecord(dbr DynamoBatchRecord) (batcher.BatchRecord, error) { - batchID, err := uuid.Parse(dbr.BatchID) +func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { + fields, err := attributevalue.MarshalMap(*response) if err != nil { - return batcher.BatchRecord{}, fmt.Errorf("failed to convert dynamo batch record batch ID %v from string: %v", dbr.BatchID, err) + return nil, err } - - return batcher.BatchRecord{ - ID: batchID, - CreatedAt: dbr.CreatedAt, - ReferenceBlockNumber: dbr.ReferenceBlockNumber, - Status: dbr.BatchStatus, - HeaderHash: dbr.HeaderHash, - AggregatePubKey: dbr.AggregatePubKey, - AggregateSignature: dbr.AggregateSignature, - }, nil + fields["BatchID"] = &types.AttributeValueMemberS{Value: response.BatchID.String()} + fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSKPrefix + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())} + fields["OperatorID"] = &types.AttributeValueMemberS{Value: response.OperatorID.Hex()} + fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} + fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} + return fields, nil } -func FromDynamoMinibatchRecord(dbr DynamoMinibatchRecord) (batcher.MinibatchRecord, error) { - batchID, err := uuid.Parse(dbr.BatchID) - if err != nil { - return batcher.MinibatchRecord{}, fmt.Errorf("failed to convert dynamo minibatch record batch ID %v from string: %v", dbr.BatchID, err) +func UnmarshalBatchID(item commondynamodb.Item) (*uuid.UUID, error) { + type BatchID struct { + BatchID string } - return batcher.MinibatchRecord{ - BatchID: batchID, - MinibatchIndex: dbr.MinibatchIndex, - BlobHeaderHashes: dbr.BlobHeaderHashes, - BatchSize: dbr.BatchSize, - ReferenceBlockNumber: dbr.ReferenceBlockNumber, - }, nil -} - -func FromDynamoDispersalRequest(ddr DynamoDispersalRequest) (batcher.DispersalRequest, error) { - batchID, err := uuid.Parse(ddr.BatchID) + batch := BatchID{} + err := attributevalue.UnmarshalMap(item, &batch) if err != nil { - return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request batch ID %v from string: %v", ddr.BatchID, err) + return nil, err } - operatorID, err := core.OperatorIDFromHex(ddr.OperatorID) + + batchID, err := uuid.Parse(batch.BatchID) if err != nil { - return batcher.DispersalRequest{}, fmt.Errorf("failed to convert dynamo dispersal request operator ID %v from hex: %v", ddr.OperatorID, err) + return nil, err } - return batcher.DispersalRequest{ - BatchID: batchID, - MinibatchIndex: ddr.MinibatchIndex, - OperatorID: operatorID, - OperatorAddress: gcommon.HexToAddress(ddr.OperatorAddress), - NumBlobs: ddr.NumBlobs, - RequestedAt: ddr.RequestedAt, - BlobHash: ddr.BlobHash, - MetadataHash: ddr.MetadataHash, - }, nil + return &batchID, nil } -func FromDynamoDispersalResponse(ddr DynamoDispersalResponse) (batcher.DispersalResponse, error) { - request, err := FromDynamoDispersalRequest(ddr.DynamoDispersalRequest) - if err != nil { - return batcher.DispersalResponse{}, err +func UnmarshalBatchStatus(item commondynamodb.Item) (*batcher.BatchStatus, error) { + type BatchStatus struct { + BatchStatus uint } - return batcher.DispersalResponse{ - DispersalRequest: request, - Signatures: ddr.Signatures, - RespondedAt: ddr.RespondedAt, - Error: ddr.Error, - }, nil -} - -func MarshalBatchRecord(batch *batcher.BatchRecord) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(ToDynamoBatchRecord(*batch)) + batch := BatchStatus{} + err := attributevalue.UnmarshalMap(item, &batch) if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: batchSK + batch.ID.String()} - fields["CreatedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", batch.CreatedAt.UTC().Unix())} - return fields, nil + + status := batcher.BatchStatus(batch.BatchStatus) + + return &status, nil } -func MarshalMinibatchRecord(minibatch *batcher.MinibatchRecord) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(ToDynamoMinibatchRecord(*minibatch)) +func UnmarshalOperatorID(item commondynamodb.Item) (*core.OperatorID, error) { + type OperatorID struct { + OperatorID string + } + + dispersal := OperatorID{} + err := attributevalue.UnmarshalMap(item, &dispersal) if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: minibatchSK + fmt.Sprintf("%d", minibatch.MinibatchIndex)} - return fields, nil -} -func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(ToDynamoDispersalRequest(*request)) + operatorID, err := core.OperatorIDFromHex(dispersal.OperatorID) if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d#%s", request.MinibatchIndex, request.OperatorID.Hex())} - fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())} - return fields, nil + + return &operatorID, nil } -func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]types.AttributeValue, error) { - fields, err := attributevalue.MarshalMap(ToDynamoDispersalResponse(*response)) +func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { + batch := batcher.BatchRecord{} + err := attributevalue.UnmarshalMap(item, &batch) if err != nil { return nil, err } - fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())} - fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())} - fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())} - return fields, nil -} -func UnmarshalBatchRecord(item commondynamodb.Item) (*batcher.BatchRecord, error) { - dbr := DynamoBatchRecord{} - err := attributevalue.UnmarshalMap(item, &dbr) + batchID, err := UnmarshalBatchID(item) if err != nil { return nil, err } + batch.ID = *batchID - batch, err := FromDynamoBatchRecord(dbr) + batchStatus, err := UnmarshalBatchStatus(item) if err != nil { return nil, err } + batch.Status = *batchStatus batch.CreatedAt = batch.CreatedAt.UTC() return &batch, nil } func UnmarshalMinibatchRecord(item commondynamodb.Item) (*batcher.MinibatchRecord, error) { - dbr := DynamoMinibatchRecord{} - err := attributevalue.UnmarshalMap(item, &dbr) + minibatch := batcher.MinibatchRecord{} + err := attributevalue.UnmarshalMap(item, &minibatch) if err != nil { return nil, err } - minibatch, err := FromDynamoMinibatchRecord(dbr) + batchID, err := UnmarshalBatchID(item) if err != nil { return nil, err } + minibatch.BatchID = *batchID + return &minibatch, nil } func UnmarshalDispersalRequest(item commondynamodb.Item) (*batcher.DispersalRequest, error) { - ddr := DynamoDispersalRequest{} - err := attributevalue.UnmarshalMap(item, &ddr) + request := batcher.DispersalRequest{} + err := attributevalue.UnmarshalMap(item, &request) if err != nil { return nil, fmt.Errorf("failed to unmarshal dispersal request from DynamoDB: %v", err) } - request, err := FromDynamoDispersalRequest(ddr) + batchID, err := UnmarshalBatchID(item) + if err != nil { + return nil, err + } + request.BatchID = *batchID + + operatorID, err := UnmarshalOperatorID(item) if err != nil { return nil, err } + request.OperatorID = *operatorID request.RequestedAt = request.RequestedAt.UTC() return &request, nil } func UnmarshalDispersalResponse(item commondynamodb.Item) (*batcher.DispersalResponse, error) { - ddr := DynamoDispersalResponse{} - err := attributevalue.UnmarshalMap(item, &ddr) + response := batcher.DispersalResponse{} + err := attributevalue.UnmarshalMap(item, &response) if err != nil { return nil, err } - response, err := FromDynamoDispersalResponse(ddr) + batchID, err := UnmarshalBatchID(item) if err != nil { return nil, err } + response.BatchID = *batchID + + operatorID, err := UnmarshalOperatorID(item) + if err != nil { + return nil, err + } + response.OperatorID = *operatorID + response.RespondedAt = response.RespondedAt.UTC() response.DispersalRequest.RequestedAt = response.DispersalRequest.RequestedAt.UTC() return &response, nil @@ -360,8 +296,8 @@ func (m *MinibatchStore) PutBatch(ctx context.Context, batch *batcher.BatchRecor if err != nil { return err } - - return m.dynamoDBClient.PutItem(ctx, m.tableName, item) + constraint := "attribute_not_exists(BatchID) AND attribute_not_exists(SK)" + return m.dynamoDBClient.PutItemWithCondition(ctx, m.tableName, item, constraint) } func (m *MinibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error { @@ -397,7 +333,7 @@ func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: batchSK + batchID.String(), + Value: batchSKPrefix + batchID.String(), }, }) if err != nil { @@ -481,7 +417,7 @@ func (m *MinibatchStore) UpdateBatchStatus(ctx context.Context, batchID uuid.UUI } _, err := m.dynamoDBClient.UpdateItem(ctx, m.tableName, map[string]types.AttributeValue{ "BatchID": &types.AttributeValueMemberS{Value: batchID.String()}, - "SK": &types.AttributeValueMemberS{Value: batchSK + batchID.String()}, + "SK": &types.AttributeValueMemberS{Value: batchSKPrefix + batchID.String()}, }, commondynamodb.Item{ "BatchStatus": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), @@ -501,7 +437,7 @@ func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: minibatchSK + fmt.Sprintf("%d", minibatchIndex), + Value: minibatchSKPrefix + fmt.Sprintf("%d", minibatchIndex), }, }) if err != nil { @@ -527,7 +463,7 @@ func (m *MinibatchStore) GetMinibatches(ctx context.Context, batchID uuid.UUID) Value: batchID.String(), }, ":prefix": &types.AttributeValueMemberS{ - Value: minibatchSK, + Value: minibatchSKPrefix, }, }) if err != nil { @@ -552,7 +488,7 @@ func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalRequestSK + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), + Value: dispersalRequestSKPrefix + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), }, }) if err != nil { @@ -578,7 +514,7 @@ func (m *MinibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid. Value: batchID.String(), }, ":prefix": &types.AttributeValueMemberS{ - Value: dispersalRequestSK, + Value: dispersalRequestSKPrefix, }, }) if err != nil { @@ -603,7 +539,7 @@ func (m *MinibatchStore) GetMinibatchDispersalRequests(ctx context.Context, batc Value: batchID.String(), }, ":sk": &types.AttributeValueMemberS{ - Value: dispersalRequestSK + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + Value: dispersalRequestSKPrefix + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), }, }) if err != nil { @@ -632,7 +568,7 @@ func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid. Value: batchID.String(), }, "SK": &types.AttributeValueMemberS{ - Value: dispersalResponseSK + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), + Value: dispersalResponseSKPrefix + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()), }, }) if err != nil { @@ -658,7 +594,7 @@ func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid Value: batchID.String(), }, ":prefix": &types.AttributeValueMemberS{ - Value: dispersalResponseSK, + Value: dispersalResponseSKPrefix, }, }) if err != nil { @@ -683,7 +619,7 @@ func (m *MinibatchStore) GetMinibatchDispersalResponses(ctx context.Context, bat Value: batchID.String(), }, ":sk": &types.AttributeValueMemberS{ - Value: dispersalResponseSK + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), + Value: dispersalResponseSKPrefix + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex), }, }) if err != nil { diff --git a/disperser/batcher/batchstore/minibatch_store_test.go b/disperser/batcher/batchstore/minibatch_store_test.go index 21aa5bcd5..5e3d4c05e 100644 --- a/disperser/batcher/batchstore/minibatch_store_test.go +++ b/disperser/batcher/batchstore/minibatch_store_test.go @@ -38,6 +38,12 @@ var ( ) func setup(m *testing.M) { + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + fmt.Printf("deployLocalStack: %v\n", deployLocalStack) + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + if deployLocalStack { var err error dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) @@ -99,17 +105,28 @@ func TestPutBatch(t *testing.T) { } err = minibatchStore.PutBatch(ctx, batch) assert.NoError(t, err) + err = minibatchStore.PutBatch(ctx, batch) + assert.Error(t, err) b, err := minibatchStore.GetBatch(ctx, batch.ID) assert.NoError(t, err) assert.Equal(t, batch, b) err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFormed) assert.NoError(t, err) - err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, 4) - assert.Error(t, err) u, err := minibatchStore.GetBatch(ctx, batch.ID) assert.NoError(t, err) - assert.Equal(t, u.Status, batcher.BatchStatusFormed) - assert.Equal(t, batch, b) + assert.Equal(t, batcher.BatchStatusFormed, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusAttested) + assert.NoError(t, err) + u, err = minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusAttested, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, batcher.BatchStatusFailed) + assert.NoError(t, err) + u, err = minibatchStore.GetBatch(ctx, batch.ID) + assert.NoError(t, err) + assert.Equal(t, batcher.BatchStatusFailed, u.Status) + err = minibatchStore.UpdateBatchStatus(ctx, batch.ID, 4) + assert.Error(t, err) } func TestGetBatchesByStatus(t *testing.T) { @@ -122,7 +139,7 @@ func TestGetBatchesByStatus(t *testing.T) { ID: id1, CreatedAt: ts, ReferenceBlockNumber: 1, - Status: batcher.BatchStatusAttested, + Status: batcher.BatchStatusFormed, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -132,7 +149,7 @@ func TestGetBatchesByStatus(t *testing.T) { ID: id2, CreatedAt: ts, ReferenceBlockNumber: 1, - Status: batcher.BatchStatusAttested, + Status: batcher.BatchStatusFormed, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -143,7 +160,7 @@ func TestGetBatchesByStatus(t *testing.T) { ID: id3, CreatedAt: ts, ReferenceBlockNumber: 1, - Status: batcher.BatchStatusAttested, + Status: batcher.BatchStatusFormed, HeaderHash: [32]byte{1}, AggregatePubKey: nil, AggregateSignature: nil, @@ -151,24 +168,20 @@ func TestGetBatchesByStatus(t *testing.T) { err = minibatchStore.PutBatch(ctx, batch3) assert.NoError(t, err) - pending, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) - assert.NoError(t, err) - assert.Equal(t, 3, len(pending)) - - err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusFormed) + attested, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) assert.NoError(t, err) + assert.Equal(t, 0, len(attested)) formed, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) assert.NoError(t, err) - assert.Equal(t, 2, len(formed)) + assert.Equal(t, 3, len(formed)) - pending, err = minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusAttested) + err = minibatchStore.UpdateBatchStatus(ctx, id1, batcher.BatchStatusAttested) assert.NoError(t, err) - assert.Equal(t, 2, len(pending)) - failed, err := minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFailed) + formed, err = minibatchStore.GetBatchesByStatus(ctx, batcher.BatchStatusFormed) assert.NoError(t, err) - assert.Equal(t, 0, len(failed)) + assert.Equal(t, 2, len(formed)) } func TestPutMinibatch(t *testing.T) { diff --git a/disperser/batcher/minibatch_store.go b/disperser/batcher/minibatch_store.go index 9c74ecdc2..953a574eb 100644 --- a/disperser/batcher/minibatch_store.go +++ b/disperser/batcher/minibatch_store.go @@ -30,17 +30,17 @@ const ( ) type BatchRecord struct { - ID uuid.UUID + ID uuid.UUID `dynamodbav:"-"` CreatedAt time.Time ReferenceBlockNumber uint - Status BatchStatus + Status BatchStatus `dynamodbav:"-"` HeaderHash [32]byte AggregatePubKey *core.G2Point AggregateSignature *core.Signature } type MinibatchRecord struct { - BatchID uuid.UUID + BatchID uuid.UUID `dynamodbav:"-"` MinibatchIndex uint BlobHeaderHashes [][32]byte BatchSize uint64 // in bytes @@ -48,9 +48,9 @@ type MinibatchRecord struct { } type DispersalRequest struct { - BatchID uuid.UUID - MinibatchIndex uint - core.OperatorID + BatchID uuid.UUID `dynamodbav:"-"` + MinibatchIndex uint + core.OperatorID `dynamodbav:"-"` OperatorAddress gcommon.Address Socket string NumBlobs uint