Skip to content

Commit

Permalink
Add an API to fetch blobs from a given batch header hash (#688)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored Aug 14, 2024
1 parent 9a43392 commit e7b916f
Show file tree
Hide file tree
Showing 13 changed files with 929 additions and 126 deletions.
92 changes: 92 additions & 0 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusCount(ctx context.Context, st

// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit
// along with items, also returns a pagination token that can be used to fetch the next set of items
//
// Note that this may not return all the metadata for the batch if dynamodb query limit is reached.
// e.g 1mb limit for a single query
func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) {

var attributeMap map[string]types.AttributeValue
Expand Down Expand Up @@ -203,6 +206,72 @@ func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batch
return metadatas, nil
}

// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit
// along with items, also returns a pagination token that can be used to fetch the next set of items
//
// Note that this may not return all the metadata for the batch if dynamodb query limit is reached.
// e.g 1mb limit for a single query
func (s *BlobMetadataStore) GetAllBlobMetadataByBatchWithPagination(
ctx context.Context,
batchHeaderHash [32]byte,
limit int32,
exclusiveStartKey *disperser.BatchIndexExclusiveStartKey,
) ([]*disperser.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) {
var attributeMap map[string]types.AttributeValue
var err error

// Convert the exclusive start key to a map of AttributeValue
if exclusiveStartKey != nil {
attributeMap, err = convertToAttribMapBatchIndex(exclusiveStartKey)
if err != nil {
return nil, nil, err
}
}

queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(
ctx,
s.tableName,
batchIndexName,
"BatchHeaderHash = :batch_header_hash",
commondynamodb.ExpresseionValues{
":batch_header_hash": &types.AttributeValueMemberB{
Value: batchHeaderHash[:],
},
},
limit,
attributeMap,
)
if err != nil {
return nil, nil, err
}

s.logger.Info("Query result", "items", len(queryResult.Items), "lastEvaluatedKey", queryResult.LastEvaluatedKey)
// When no more results to fetch, the LastEvaluatedKey is nil
if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil {
return nil, nil, nil
}

metadata := make([]*disperser.BlobMetadata, len(queryResult.Items))
for i, item := range queryResult.Items {
metadata[i], err = UnmarshalBlobMetadata(item)
if err != nil {
return nil, nil, err
}
}

lastEvaluatedKey := queryResult.LastEvaluatedKey
if lastEvaluatedKey == nil {
return metadata, nil, nil
}

// Convert the last evaluated key to a disperser.BatchIndexExclusiveStartKey
exclusiveStartKey, err = convertToExclusiveStartKeyBatchIndex(lastEvaluatedKey)
if err != nil {
return nil, nil, err
}
return metadata, exclusiveStartKey, nil
}

func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) {
items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash AND BlobIndex = :blob_index", commondynamodb.ExpresseionValues{
":batch_header_hash": &types.AttributeValueMemberB{
Expand Down Expand Up @@ -468,6 +537,16 @@ func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeV
return &blobStoreExclusiveStartKey, nil
}

func convertToExclusiveStartKeyBatchIndex(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BatchIndexExclusiveStartKey, error) {
blobStoreExclusiveStartKey := disperser.BatchIndexExclusiveStartKey{}
err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey)
if err != nil {
return nil, err
}

return &blobStoreExclusiveStartKey, nil
}

func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) {
if blobStoreExclusiveStartKey == nil {
// Return an empty map or nil
Expand All @@ -480,3 +559,16 @@ func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusive
}
return avMap, nil
}

func convertToAttribMapBatchIndex(blobStoreExclusiveStartKey *disperser.BatchIndexExclusiveStartKey) (map[string]types.AttributeValue, error) {
if blobStoreExclusiveStartKey == nil {
// Return an empty map or nil
return nil, nil
}

avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey)
if err != nil {
return nil, err
}
return avMap, nil
}
101 changes: 98 additions & 3 deletions disperser/common/blobstore/blob_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int32(1), finalizedCount)

confirmedMetadata := getConfirmedMetadata(t, blobKey1)
confirmedMetadata := getConfirmedMetadata(t, blobKey1, 1)
err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata)
assert.NoError(t, err)

Expand Down Expand Up @@ -188,6 +188,102 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) {
})
}

func TestGetAllBlobMetadataByBatchWithPagination(t *testing.T) {
ctx := context.Background()
blobKey1 := disperser.BlobKey{
BlobHash: blobHash,
MetadataHash: "hash",
}
metadata1 := &disperser.BlobMetadata{
MetadataHash: blobKey1.MetadataHash,
BlobHash: blobHash,
BlobStatus: disperser.Processing,
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
RequestedAt: 123,
},
}
blobKey2 := disperser.BlobKey{
BlobHash: "blob2",
MetadataHash: "hash2",
}
metadata2 := &disperser.BlobMetadata{
MetadataHash: blobKey2.MetadataHash,
BlobHash: blobKey2.BlobHash,
BlobStatus: disperser.Finalized,
Expiry: 0,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
RequestedAt: 123,
},
ConfirmationInfo: &disperser.ConfirmationInfo{},
}
err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1)
assert.NoError(t, err)
err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2)
assert.NoError(t, err)

confirmedMetadata1 := getConfirmedMetadata(t, blobKey1, 1)
err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata1)
assert.NoError(t, err)

confirmedMetadata2 := getConfirmedMetadata(t, blobKey2, 2)
err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey2, confirmedMetadata2)
assert.NoError(t, err)

// Fetch the blob metadata with limit 1
metadata, exclusiveStartKey, err := blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, nil)
assert.NoError(t, err)
assert.Equal(t, metadata[0], confirmedMetadata1)
assert.NotNil(t, exclusiveStartKey)
assert.Equal(t, confirmedMetadata1.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex)

// Get the next blob metadata with limit 1 and the exclusive start key
metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey)
assert.NoError(t, err)
assert.Equal(t, metadata[0], confirmedMetadata2)
assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex)

// Fetching the next blob metadata should return an empty list
metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey)
assert.NoError(t, err)
assert.Len(t, metadata, 0)
assert.Nil(t, exclusiveStartKey)

// Fetch the blob metadata with limit 2
metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 2, nil)
assert.NoError(t, err)
assert.Len(t, metadata, 2)
assert.Equal(t, metadata[0], confirmedMetadata1)
assert.Equal(t, metadata[1], confirmedMetadata2)
assert.NotNil(t, exclusiveStartKey)
assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex)

// Fetch the blob metadata with limit 3 should return only 2 items
metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 3, nil)
assert.NoError(t, err)
assert.Len(t, metadata, 2)
assert.Equal(t, metadata[0], confirmedMetadata1)
assert.Equal(t, metadata[1], confirmedMetadata2)
assert.Nil(t, exclusiveStartKey)

deleteItems(t, []commondynamodb.Key{
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash},
},
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash},
},
})
}

func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) {
ctx := context.Background()
// Query BlobMetadataStore for a blob that does not exist
Expand Down Expand Up @@ -255,9 +351,8 @@ func deleteItems(t *testing.T, keys []commondynamodb.Key) {
assert.NoError(t, err)
}

func getConfirmedMetadata(t *testing.T, metadataKey disperser.BlobKey) *disperser.BlobMetadata {
func getConfirmedMetadata(t *testing.T, metadataKey disperser.BlobKey, blobIndex uint32) *disperser.BlobMetadata {
batchHeaderHash := [32]byte{1, 2, 3}
blobIndex := uint32(1)
requestedAt := uint64(time.Now().Nanosecond())
var commitX, commitY fp.Element
_, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016")
Expand Down
4 changes: 4 additions & 0 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ func (s *SharedBlobStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHe
return s.blobMetadataStore.GetAllBlobMetadataByBatch(ctx, batchHeaderHash)
}

func (s *SharedBlobStore) GetAllBlobMetadataByBatchWithPagination(ctx context.Context, batchHeaderHash [32]byte, limit int32, exclusiveStartKey *disperser.BatchIndexExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) {
return s.blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, batchHeaderHash, limit, exclusiveStartKey)
}

// GetMetadata returns a blob metadata given a metadata key
func (s *SharedBlobStore) GetBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey) (*disperser.BlobMetadata, error) {
return s.blobMetadataStore.GetBlobMetadata(ctx, metadataKey)
Expand Down
Loading

0 comments on commit e7b916f

Please sign in to comment.