diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 4a1ea381d..033814bb7 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -130,6 +130,9 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusCount(ctx context.Context, st // GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit // along with items, also returns a pagination token that can be used to fetch the next set of items +// +// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. +// e.g 1mb limit for a single query func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { var attributeMap map[string]types.AttributeValue @@ -203,6 +206,72 @@ func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batch return metadatas, nil } +// GetAllBlobMetadataByBatchWithPagination returns all the metadata for the given batch header hash up to the specified limit +// along with items, also returns a pagination token that can be used to fetch the next set of items +// +// Note that this may not return all the metadata for the batch if dynamodb query limit is reached.
+// e.g 1mb limit for a single query +func (s *BlobMetadataStore) GetAllBlobMetadataByBatchWithPagination( + ctx context.Context, + batchHeaderHash [32]byte, + limit int32, + exclusiveStartKey *disperser.BatchIndexExclusiveStartKey, +) ([]*disperser.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) { + var attributeMap map[string]types.AttributeValue + var err error + + // Convert the exclusive start key to a map of AttributeValue + if exclusiveStartKey != nil { + attributeMap, err = convertToAttribMapBatchIndex(exclusiveStartKey) + if err != nil { + return nil, nil, err + } + } + + queryResult, err := s.dynamoDBClient.QueryIndexWithPagination( + ctx, + s.tableName, + batchIndexName, + "BatchHeaderHash = :batch_header_hash", + commondynamodb.ExpresseionValues{ + ":batch_header_hash": &types.AttributeValueMemberB{ + Value: batchHeaderHash[:], + }, + }, + limit, + attributeMap, + ) + if err != nil { + return nil, nil, err + } + + s.logger.Info("Query result", "items", len(queryResult.Items), "lastEvaluatedKey", queryResult.LastEvaluatedKey) + // When no more results to fetch, the LastEvaluatedKey is nil + if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { + return nil, nil, nil + } + + metadata := make([]*disperser.BlobMetadata, len(queryResult.Items)) + for i, item := range queryResult.Items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, nil, err + } + } + + lastEvaluatedKey := queryResult.LastEvaluatedKey + if lastEvaluatedKey == nil { + return metadata, nil, nil + } + + // Convert the last evaluated key to a disperser.BatchIndexExclusiveStartKey + exclusiveStartKey, err = convertToExclusiveStartKeyBatchIndex(lastEvaluatedKey) + if err != nil { + return nil, nil, err + } + return metadata, exclusiveStartKey, nil +} + func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) { items, err := 
s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash AND BlobIndex = :blob_index", commondynamodb.ExpresseionValues{ ":batch_header_hash": &types.AttributeValueMemberB{ @@ -468,6 +537,16 @@ func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeV return &blobStoreExclusiveStartKey, nil } +func convertToExclusiveStartKeyBatchIndex(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BatchIndexExclusiveStartKey, error) { + blobStoreExclusiveStartKey := disperser.BatchIndexExclusiveStartKey{} + err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + + return &blobStoreExclusiveStartKey, nil +} + func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { if blobStoreExclusiveStartKey == nil { // Return an empty map or nil @@ -480,3 +559,16 @@ func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusive } return avMap, nil } + +func convertToAttribMapBatchIndex(blobStoreExclusiveStartKey *disperser.BatchIndexExclusiveStartKey) (map[string]types.AttributeValue, error) { + if blobStoreExclusiveStartKey == nil { + // Return an empty map or nil + return nil, nil + } + + avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + return avMap, nil +} diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index 462461946..ab00338b3 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -88,7 +88,7 @@ func TestBlobMetadataStoreOperations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, int32(1), finalizedCount) - confirmedMetadata := getConfirmedMetadata(t, blobKey1) + confirmedMetadata := getConfirmedMetadata(t, blobKey1, 1) err = 
blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata) assert.NoError(t, err) @@ -188,6 +188,102 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { }) } +func TestGetAllBlobMetadataByBatchWithPagination(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: 0, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + Expiry: 0, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + confirmedMetadata1 := getConfirmedMetadata(t, blobKey1, 1) + err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata1) + assert.NoError(t, err) + + confirmedMetadata2 := getConfirmedMetadata(t, blobKey2, 2) + err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey2, confirmedMetadata2) + assert.NoError(t, err) + + // Fetch the blob metadata with limit 1 + metadata, exclusiveStartKey, err := blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, nil) + assert.NoError(t, err) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.NotNil(t, exclusiveStartKey) + assert.Equal(t, 
confirmedMetadata1.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Get the next blob metadata with limit 1 and the exclusive start key + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) + assert.NoError(t, err) + assert.Equal(t, metadata[0], confirmedMetadata2) + assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Fetching the next blob metadata should return an empty list + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) + assert.NoError(t, err) + assert.Len(t, metadata, 0) + assert.Nil(t, exclusiveStartKey) + + // Fetch the blob metadata with limit 2 + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 2, nil) + assert.NoError(t, err) + assert.Len(t, metadata, 2) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.Equal(t, metadata[1], confirmedMetadata2) + assert.NotNil(t, exclusiveStartKey) + assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Fetch the blob metadata with limit 3 should return only 2 items + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 3, nil) + assert.NoError(t, err) + assert.Len(t, metadata, 2) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.Equal(t, metadata[1], confirmedMetadata2) + assert.Nil(t, exclusiveStartKey) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: 
blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) +} + func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { ctx := context.Background() // Query BlobMetadataStore for a blob that does not exist @@ -255,9 +351,8 @@ func deleteItems(t *testing.T, keys []commondynamodb.Key) { assert.NoError(t, err) } -func getConfirmedMetadata(t *testing.T, metadataKey disperser.BlobKey) *disperser.BlobMetadata { +func getConfirmedMetadata(t *testing.T, metadataKey disperser.BlobKey, blobIndex uint32) *disperser.BlobMetadata { batchHeaderHash := [32]byte{1, 2, 3} - blobIndex := uint32(1) requestedAt := uint64(time.Now().Nanosecond()) var commitX, commitY fp.Element _, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 1e8b7a45f..456818a64 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -242,6 +242,10 @@ func (s *SharedBlobStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHe return s.blobMetadataStore.GetAllBlobMetadataByBatch(ctx, batchHeaderHash) } +func (s *SharedBlobStore) GetAllBlobMetadataByBatchWithPagination(ctx context.Context, batchHeaderHash [32]byte, limit int32, exclusiveStartKey *disperser.BatchIndexExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) { + return s.blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, batchHeaderHash, limit, exclusiveStartKey) +} + // GetMetadata returns a blob metadata given a metadata key func (s *SharedBlobStore) GetBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey) (*disperser.BlobMetadata, error) { return s.blobMetadataStore.GetBlobMetadata(ctx, metadataKey) diff --git a/disperser/common/blobstore/shared_storage_test.go 
b/disperser/common/blobstore/shared_storage_test.go index 6c0b43894..f423967d9 100644 --- a/disperser/common/blobstore/shared_storage_test.go +++ b/disperser/common/blobstore/shared_storage_test.go @@ -8,9 +8,11 @@ import ( "testing" "time" + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/stretchr/testify/assert" "github.com/ethereum/go-ethereum/common" @@ -174,6 +176,232 @@ func TestSharedBlobStore(t *testing.T) { assert.NotNil(t, blob2Metadata) assertMetadata(t, blobKey, blobSize, requestedAt, disperser.Finalized, blob1Metadata) assertMetadata(t, blobKey2, blobSize2, requestedAt, disperser.InsufficientSignatures, blob2Metadata) + + // Cleanup: Delete test items + t.Cleanup(func() { + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) + }) +} + +func TestSharedBlobStoreBlobMetadataStoreOperationsWithPagination(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: 0, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + 
Expiry: 0, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + + // Setup: Queue new blob metadata + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + // Test: Fetch individual blob metadata + fetchedMetadata, err := sharedStorage.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = sharedStorage.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + // Test: Fetch blob metadata by status with pagination + t.Run("Fetch Processing Blobs", func(t *testing.T) { + processing, lastEvaluatedKey, err := sharedStorage.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) + assert.NoError(t, err) + assert.Len(t, processing, 1) + assert.Equal(t, metadata1, processing[0]) + assert.NotNil(t, lastEvaluatedKey) + + // Fetch next page (should be empty) + nextProcessing, nextLastEvaluatedKey, err := sharedStorage.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, nextProcessing, 0) + assert.Nil(t, nextLastEvaluatedKey) + }) + + t.Run("Fetch Finalized Blobs", func(t *testing.T) { + finalized, lastEvaluatedKey, err := sharedStorage.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, nil) + assert.NoError(t, err) + assert.Len(t, finalized, 1) + assert.Equal(t, metadata2, finalized[0]) + assert.NotNil(t, lastEvaluatedKey) + + // Fetch next page (should be empty) + nextFinalized, nextLastEvaluatedKey, err := sharedStorage.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, nextFinalized, 0) + 
assert.Nil(t, nextLastEvaluatedKey) + }) + + // Cleanup: Delete test items + t.Cleanup(func() { + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) + }) +} + +func TestSharedBlobStoreGetAllBlobMetadataByBatchWithPagination(t *testing.T) { + ctx := context.Background() + batchHeaderHash := [32]byte{1, 2, 3} + + // Create and store multiple blob metadata for the same batch + numBlobs := 5 + blobKeys := make([]disperser.BlobKey, numBlobs) + for i := 0; i < numBlobs; i++ { + blobKey := disperser.BlobKey{ + BlobHash: fmt.Sprintf("blob%d", i), + MetadataHash: fmt.Sprintf("hash%d", i), + } + blobKeys[i] = blobKey + + metadata := &disperser.BlobMetadata{ + BlobHash: blobKey.BlobHash, + MetadataHash: blobKey.MetadataHash, + BlobStatus: disperser.Confirmed, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(time.Now().UnixNano()), + }, + ConfirmationInfo: &disperser.ConfirmationInfo{ + BatchHeaderHash: batchHeaderHash, + BlobIndex: uint32(i), + }, + } + + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata) + assert.NoError(t, err) + } + + // Test pagination with a page size of 2 + t.Run("Fetch All Blobs with Pagination", func(t *testing.T) { + var allFetchedMetadata []*disperser.BlobMetadata + var lastEvaluatedKey *disperser.BatchIndexExclusiveStartKey + pageSize := int32(2) + + for { + fetchedMetadata, newLastEvaluatedKey, err := sharedStorage.GetAllBlobMetadataByBatchWithPagination(ctx, batchHeaderHash, pageSize, lastEvaluatedKey) + assert.NoError(t, err) + + allFetchedMetadata = append(allFetchedMetadata, fetchedMetadata...) 
+ + if newLastEvaluatedKey == nil { + assert.Len(t, fetchedMetadata, numBlobs%int(pageSize)) + break + } else { + assert.Len(t, fetchedMetadata, int(pageSize)) + } + lastEvaluatedKey = newLastEvaluatedKey + } + + assert.Len(t, allFetchedMetadata, numBlobs) + + // Verify that all blob metadata is fetched and in the correct order + for i, metadata := range allFetchedMetadata { + assert.Equal(t, fmt.Sprintf("blob%d", i), metadata.BlobHash) + assert.Equal(t, fmt.Sprintf("hash%d", i), metadata.MetadataHash) + assert.Equal(t, uint32(i), metadata.ConfirmationInfo.BlobIndex) + } + }) + + // Test pagination with a page size of 10 + t.Run("Fetch All Blobs with Pagination (Page Size > Num Blobs)", func(t *testing.T) { + var allFetchedMetadata []*disperser.BlobMetadata + var lastEvaluatedKey *disperser.BatchIndexExclusiveStartKey + pageSize := int32(10) + + for { + fetchedMetadata, newLastEvaluatedKey, err := sharedStorage.GetAllBlobMetadataByBatchWithPagination(ctx, batchHeaderHash, pageSize, lastEvaluatedKey) + assert.NoError(t, err) + + allFetchedMetadata = append(allFetchedMetadata, fetchedMetadata...) 
+ + if newLastEvaluatedKey == nil { + assert.Len(t, fetchedMetadata, numBlobs) + break + } else { + assert.Len(t, fetchedMetadata, int(pageSize)) + } + + lastEvaluatedKey = newLastEvaluatedKey + } + + assert.Len(t, allFetchedMetadata, numBlobs) + + // Verify that all blob metadata is fetched and in the correct order + for i, metadata := range allFetchedMetadata { + assert.Equal(t, fmt.Sprintf("blob%d", i), metadata.BlobHash) + assert.Equal(t, fmt.Sprintf("hash%d", i), metadata.MetadataHash) + assert.Equal(t, uint32(i), metadata.ConfirmationInfo.BlobIndex) + } + }) + + // Test invalid batch header hash + t.Run("Fetch All Blobs with Invalid Batch Header Hash", func(t *testing.T) { + invalidBatchHeaderHash := [32]byte{4, 5, 6} + allFetchedMetadata, lastEvaluatedKey, err := sharedStorage.GetAllBlobMetadataByBatchWithPagination(ctx, invalidBatchHeaderHash, 10, nil) + assert.NoError(t, err) + assert.Len(t, allFetchedMetadata, 0) + assert.Nil(t, lastEvaluatedKey) + }) + + // Cleanup: Delete test items + t.Cleanup(func() { + var keys []commondynamodb.Key + for _, blobKey := range blobKeys { + keys = append(keys, commondynamodb.Key{ + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, + }) + } + deleteItems(t, keys) + }) } func assertMetadata(t *testing.T, blobKey disperser.BlobKey, expectedBlobSize uint, expectedRequestedAt uint64, expectedStatus disperser.BlobStatus, actualMetadata *disperser.BlobMetadata) { diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 8fbab3607..5142c3f7e 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -279,6 +279,48 @@ func (q *BlobStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHa return metas, nil } +func (q *BlobStore) GetAllBlobMetadataByBatchWithPagination(ctx context.Context, batchHeaderHash [32]byte, limit int32, exclusiveStartKey 
*disperser.BatchIndexExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) { + q.mu.RLock() + defer q.mu.RUnlock() + metas := make([]*disperser.BlobMetadata, 0) + foundStart := exclusiveStartKey == nil + + keys := make([]disperser.BlobKey, 0, len(q.Metadata)) + for k, v := range q.Metadata { + if v.ConfirmationInfo != nil && v.ConfirmationInfo.BatchHeaderHash == batchHeaderHash { + keys = append(keys, k) + } + } + sort.Slice(keys, func(i, j int) bool { + return q.Metadata[keys[i]].ConfirmationInfo.BlobIndex < q.Metadata[keys[j]].ConfirmationInfo.BlobIndex + }) + + for _, key := range keys { + meta := q.Metadata[key] + if foundStart { + metas = append(metas, meta) + if len(metas) == int(limit) { + return metas, &disperser.BatchIndexExclusiveStartKey{ + BatchHeaderHash: meta.ConfirmationInfo.BatchHeaderHash[:], + BlobIndex: meta.ConfirmationInfo.BlobIndex, + }, nil + } + } else if exclusiveStartKey != nil && meta.ConfirmationInfo.BlobIndex > uint32(exclusiveStartKey.BlobIndex) { + foundStart = true + metas = append(metas, meta) + if len(metas) == int(limit) { + return metas, &disperser.BatchIndexExclusiveStartKey{ + BatchHeaderHash: meta.ConfirmationInfo.BatchHeaderHash[:], + BlobIndex: meta.ConfirmationInfo.BlobIndex, + }, nil + } + } + } + + // Return all the metas if limit is not reached + return metas, nil, nil +} + func (q *BlobStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobKey) (*disperser.BlobMetadata, error) { if meta, ok := q.Metadata[blobKey]; ok { return meta, nil diff --git a/disperser/dataapi/blobs_handlers.go b/disperser/dataapi/blobs_handlers.go index 21d477300..a77aa2885 100644 --- a/disperser/dataapi/blobs_handlers.go +++ b/disperser/dataapi/blobs_handlers.go @@ -35,6 +35,23 @@ func (s *server) getBlobs(ctx context.Context, limit int) ([]*BlobMetadataRespon return s.convertBlobMetadatasToBlobMetadataResponse(ctx, blobMetadatas) } +func (s *server) getBlobsFromBatchHeaderHash(ctx 
context.Context, batcherHeaderHash [32]byte, limit int, exclusiveStartKey *disperser.BatchIndexExclusiveStartKey) ([]*BlobMetadataResponse, *disperser.BatchIndexExclusiveStartKey, error) { + blobMetadatas, newExclusiveStartKey, err := s.getBlobMetadataByBatchHeaderHashWithLimit(ctx, batcherHeaderHash, int32(limit), exclusiveStartKey) + if err != nil { + return nil, nil, err + } + if len(blobMetadatas) == 0 { + return nil, nil, errNotFound + } + + responses, err := s.convertBlobMetadatasToBlobMetadataResponse(ctx, blobMetadatas) + if err != nil { + return nil, nil, err + } + + return responses, newExclusiveStartKey, nil +} + func (s *server) convertBlobMetadatasToBlobMetadataResponse(ctx context.Context, metadatas []*disperser.BlobMetadata) ([]*BlobMetadataResponse, error) { var ( err error @@ -96,3 +113,105 @@ func convertMetadataToBlobMetadataResponse(metadata *disperser.BlobMetadata) (*B BlobStatus: metadata.BlobStatus, }, nil } + +func (s *server) getBlobMetadataByBatchesWithLimit(ctx context.Context, limit int) ([]*Batch, []*disperser.BlobMetadata, error) { + var ( + blobMetadatas = make([]*disperser.BlobMetadata, 0) + batches = make([]*Batch, 0) + blobKeyPresence = make(map[string]struct{}) + batchPresence = make(map[string]struct{}) + ) + + for skip := 0; len(blobMetadatas) < limit && skip < limit; skip += maxQueryBatchesLimit { + batchesWithLimit, err := s.subgraphClient.QueryBatchesWithLimit(ctx, maxQueryBatchesLimit, skip) + if err != nil { + s.logger.Error("Failed to query batches", "error", err) + return nil, nil, err + } + + if len(batchesWithLimit) == 0 { + break + } + + for i := range batchesWithLimit { + s.logger.Debug("Getting blob metadata", "batchHeaderHash", batchesWithLimit[i].BatchHeaderHash) + var ( + batch = batchesWithLimit[i] + ) + if batch == nil { + continue + } + batchHeaderHash, err := ConvertHexadecimalToBytes(batch.BatchHeaderHash) + if err != nil { + s.logger.Error("Failed to convert batch header hash to hex string", "error", err) + 
continue + } + batchKey := string(batchHeaderHash[:]) + if _, found := batchPresence[batchKey]; !found { + batchPresence[batchKey] = struct{}{} + } else { + // The batch has processed, skip it. + s.logger.Error("Getting duplicate batch from the graph", "batch header hash", batchKey) + continue + } + + metadatas, err := s.blobstore.GetAllBlobMetadataByBatch(ctx, batchHeaderHash) + if err != nil { + s.logger.Error("Failed to get blob metadata", "error", err) + continue + } + for _, bm := range metadatas { + blobKey := bm.GetBlobKey().String() + if _, found := blobKeyPresence[blobKey]; !found { + blobKeyPresence[blobKey] = struct{}{} + blobMetadatas = append(blobMetadatas, bm) + } else { + s.logger.Error("Getting duplicate blob key from the blobstore", "blobkey", blobKey) + } + } + batches = append(batches, batch) + if len(blobMetadatas) >= limit { + break + } + } + } + + if len(blobMetadatas) >= limit { + blobMetadatas = blobMetadatas[:limit] + } + + return batches, blobMetadatas, nil +} + +func (s *server) getBlobMetadataByBatchHeaderHashWithLimit(ctx context.Context, batchHeaderHash [32]byte, limit int32, exclusiveStartKey *disperser.BatchIndexExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) { + var allMetadata []*disperser.BlobMetadata + var nextKey *disperser.BatchIndexExclusiveStartKey = exclusiveStartKey + + const maxLimit int32 = 1000 + remainingLimit := min(limit, maxLimit) + + s.logger.Debug("Getting blob metadata by batch header hash", "batchHeaderHash", batchHeaderHash, "remainingLimit", remainingLimit, "nextKey", nextKey) + for int32(len(allMetadata)) < remainingLimit { + metadatas, newNextKey, err := s.blobstore.GetAllBlobMetadataByBatchWithPagination(ctx, batchHeaderHash, remainingLimit-int32(len(allMetadata)), nextKey) + if err != nil { + s.logger.Error("Failed to get blob metadata", "error", err) + return nil, nil, err + } + + allMetadata = append(allMetadata, metadatas...) 
+ + if newNextKey == nil { + // No more data to fetch + return allMetadata, nil, nil + } + + nextKey = newNextKey + + if int32(len(allMetadata)) == remainingLimit { + // We've reached the limit + break + } + } + + return allMetadata, nextKey, nil +} diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 8e8a2262d..9beee4052 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -15,6 +15,64 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { + "/feed/batches/{batch_header_hash}/blobs": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Feed" + ], + "summary": "Fetch blob metadata by batch header hash", + "parameters": [ + { + "type": "string", + "description": "Batch Header Hash", + "name": "batch_header_hash", + "in": "path", + "required": true + }, + { + "type": "integer", + "description": "Limit [default: 10]", + "name": "limit", + "in": "query" + }, + { + "type": "string", + "description": "Next page token", + "name": "next_token", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.BlobsResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/feed/blobs": { "get": { "produces": [ @@ -656,6 +714,9 @@ const docTemplate = `{ "dataapi.Meta": { "type": "object", "properties": { + "next_token": { + "type": "string" + }, "size": { "type": "integer" } diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index e19fd43a6..a106eec62 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ 
-11,32 +11,33 @@ "version": "1" }, "paths": { - "/ejector/operators": { - "post": { + "/feed/batches/{batch_header_hash}/blobs": { + "get": { "produces": [ "application/json" ], "tags": [ - "Ejector" + "Feed" ], - "summary": "Eject operators who violate the SLAs during the given time interval", + "summary": "Fetch blob metadata by batch header hash", "parameters": [ { - "type": "integer", - "description": "Lookback window for operator ejection [default: 86400]", - "name": "interval", - "in": "query" + "type": "string", + "description": "Batch Header Hash", + "name": "batch_header_hash", + "in": "path", + "required": true }, { "type": "integer", - "description": "End time for evaluating operator ejection [default: now]", - "name": "end", + "description": "Limit [default: 10]", + "name": "limit", "in": "query" }, { "type": "string", - "description": "Whether it's periodic or urgent ejection request [default: periodic]", - "name": "mode", + "description": "Next page token", + "name": "next_token", "in": "query" } ], @@ -44,7 +45,7 @@ "200": { "description": "OK", "schema": { - "$ref": "#/definitions/dataapi.EjectionResponse" + "$ref": "#/definitions/dataapi.BlobsResponse" } }, "400": { @@ -698,14 +699,6 @@ } } }, - "dataapi.EjectionResponse": { - "type": "object", - "properties": { - "transaction_hash": { - "type": "string" - } - } - }, "dataapi.ErrorResponse": { "type": "object", "properties": { @@ -717,6 +710,9 @@ "dataapi.Meta": { "type": "object", "properties": { + "next_token": { + "type": "string" + }, "size": { "type": "integer" } diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 935033e60..ccf985794 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -66,11 +66,6 @@ definitions: meta: $ref: '#/definitions/dataapi.Meta' type: object - dataapi.EjectionResponse: - properties: - transaction_hash: - type: string - type: object dataapi.ErrorResponse: properties: error: @@ -78,6 
+73,8 @@ definitions: type: object dataapi.Meta: properties: + next_token: + type: string size: type: integer type: object @@ -245,21 +242,21 @@ info: title: EigenDA Data Access API version: "1" paths: - /ejector/operators: - post: + /feed/batches/{batch_header_hash}/blobs: + get: parameters: - - description: 'Lookback window for operator ejection [default: 86400]' - in: query - name: interval - type: integer - - description: 'End time for evaluating operator ejection [default: now]' + - description: Batch Header Hash + in: path + name: batch_header_hash + required: true + type: string + - description: 'Limit [default: 10]' in: query - name: end + name: limit type: integer - - description: 'Whether it''s periodic or urgent ejection request [default: - periodic]' + - description: Next page token in: query - name: mode + name: next_token type: string produces: - application/json @@ -267,7 +264,7 @@ paths: "200": description: OK schema: - $ref: '#/definitions/dataapi.EjectionResponse' + $ref: '#/definitions/dataapi.BlobsResponse' "400": description: 'error: Bad request' schema: @@ -280,9 +277,9 @@ paths: description: 'error: Server error' schema: $ref: '#/definitions/dataapi.ErrorResponse' - summary: Eject operators who violate the SLAs during the given time interval + summary: Fetch blob metadata by batch header hash tags: - - Ejector + - Feed /feed/blobs: get: parameters: diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 6f09293a4..b0c327ef6 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -2,6 +2,8 @@ package dataapi import ( "context" + "encoding/base64" + "encoding/json" "errors" "fmt" "math/big" @@ -43,7 +45,7 @@ const ( maxThroughputAge = 10 maxMetricAage = 10 maxFeedBlobsAge = 10 - maxFeedBlobAage = 300 // this is completely static + maxFeedBlobAge = 300 // this is completely static maxDisperserAvailabilityAge = 3 maxChurnerAvailabilityAge = 3 maxBatcherAvailabilityAge = 3 @@ -93,7 +95,8 @@ type ( } Meta 
struct { - Size int `json:"size"` + Size int `json:"size"` + NextToken string `json:"next_token,omitempty"` } BlobsResponse struct { @@ -194,7 +197,6 @@ func NewServer( } if eigenDAGRPCServiceChecker == nil { - eigenDAGRPCServiceChecker = NewEigenDAServiceHealthCheck(grpcConn, config.DisperserHostname, config.ChurnerHostname) } @@ -231,13 +233,13 @@ func (s *server) Start() error { basePath := "/api/v1" docs.SwaggerInfo.BasePath = basePath docs.SwaggerInfo.Host = os.Getenv("SWAGGER_HOST") - v1 := router.Group(basePath) { feed := v1.Group("/feed") { feed.GET("/blobs", s.FetchBlobsHandler) feed.GET("/blobs/:blob_key", s.FetchBlobHandler) + feed.GET("/batches/:batch_header_hash/blobs", s.FetchBlobsFromBatchHeaderHash) } operatorsInfo := v1.Group("/operators-info") { @@ -333,10 +335,118 @@ func (s *server) FetchBlobHandler(c *gin.Context) { } s.metrics.IncrementSuccessfulRequestNum("FetchBlob") - c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAage)) + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge)) c.JSON(http.StatusOK, metadata) } +// FetchBlobsFromBatchHeaderHash godoc +// +// @Summary Fetch blob metadata by batch header hash +// @Tags Feed +// @Produce json +// @Param batch_header_hash path string true "Batch Header Hash" +// @Param limit query int false "Limit [default: 10]" +// @Param next_token query string false "Next page token" +// @Success 200 {object} BlobsResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /feed/batches/{batch_header_hash}/blobs [get] +func (s *server) FetchBlobsFromBatchHeaderHash(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchBlobsFromBatchHeaderHash", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + batchHeaderHash := 
c.Param("batch_header_hash") + batchHeaderHashBytes, err := ConvertHexadecimalToBytes([]byte(batchHeaderHash)) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBlobsFromBatchHeaderHash") + errorResponse(c, fmt.Errorf("invalid batch header hash")) + return + } + + limit, err := strconv.Atoi(c.DefaultQuery("limit", "10")) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBlobsFromBatchHeaderHash") + errorResponse(c, fmt.Errorf("invalid limit parameter")) + return + } + if limit <= 0 || limit > 1000 { + s.metrics.IncrementFailedRequestNum("FetchBlobsFromBatchHeaderHash") + errorResponse(c, fmt.Errorf("limit must be greater than 0 and less than or equal to 1000")) + return + } + + var exclusiveStartKey *disperser.BatchIndexExclusiveStartKey + nextToken := c.Query("next_token") + if nextToken != "" { + exclusiveStartKey, err = decodeNextToken(nextToken) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBlobsFromBatchHeaderHash") + errorResponse(c, fmt.Errorf("invalid next_token")) + return + } + } + + metadatas, newExclusiveStartKey, err := s.getBlobsFromBatchHeaderHash(c.Request.Context(), batchHeaderHashBytes, limit, exclusiveStartKey) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBlobsFromBatchHeaderHash") + errorResponse(c, err) + return + } + + var nextPageToken string + if newExclusiveStartKey != nil { + nextPageToken, err = encodeNextToken(newExclusiveStartKey) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBlobsFromBatchHeaderHash") + errorResponse(c, fmt.Errorf("failed to generate next page token")) + return + } + } + + s.metrics.IncrementSuccessfulRequestNum("FetchBlobsFromBatchHeaderHash") + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge)) + c.JSON(http.StatusOK, BlobsResponse{ + Meta: Meta{ + Size: len(metadatas), + NextToken: nextPageToken, + }, + Data: metadatas, + }) +} + +func decodeNextToken(token string) (*disperser.BatchIndexExclusiveStartKey, error) { + // Decode the base64
string + decodedBytes, err := base64.URLEncoding.DecodeString(token) + if err != nil { + return nil, fmt.Errorf("failed to decode token: %w", err) + } + + // Unmarshal the JSON into a BatchIndexExclusiveStartKey + var key disperser.BatchIndexExclusiveStartKey + err = json.Unmarshal(decodedBytes, &key) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal token: %w", err) + } + + return &key, nil +} + +func encodeNextToken(key *disperser.BatchIndexExclusiveStartKey) (string, error) { + // Marshal the key to JSON + jsonBytes, err := json.Marshal(key) + if err != nil { + return "", fmt.Errorf("failed to marshal key: %w", err) + } + + // Encode the JSON as a base64 string + token := base64.URLEncoding.EncodeToString(jsonBytes) + + return token, nil +} + // FetchBlobsHandler godoc // // @Summary Fetch blobs metadata list @@ -356,7 +466,14 @@ func (s *server) FetchBlobsHandler(c *gin.Context) { limit, err := strconv.Atoi(c.DefaultQuery("limit", "10")) if err != nil { - limit = 10 + s.metrics.IncrementFailedRequestNum("FetchBlobs") + errorResponse(c, fmt.Errorf("invalid limit parameter")) + return + } + if limit <= 0 { + s.metrics.IncrementFailedRequestNum("FetchBlobs") + errorResponse(c, fmt.Errorf("limit must be greater than 0")) + return } metadatas, err := s.getBlobs(c.Request.Context(), limit) @@ -848,75 +965,6 @@ func (s *server) FetchBatcherAvailability(c *gin.Context) { }) } -func (s *server) getBlobMetadataByBatchesWithLimit(ctx context.Context, limit int) ([]*Batch, []*disperser.BlobMetadata, error) { - var ( - blobMetadatas = make([]*disperser.BlobMetadata, 0) - batches = make([]*Batch, 0) - blobKeyPresence = make(map[string]struct{}) - batchPresence = make(map[string]struct{}) - ) - - for skip := 0; len(blobMetadatas) < limit && skip < limit; skip += maxQueryBatchesLimit { - batchesWithLimit, err := s.subgraphClient.QueryBatchesWithLimit(ctx, maxQueryBatchesLimit, skip) - if err != nil { - s.logger.Error("Failed
to query batches", "error", err) - return nil, nil, err - } - - if len(batchesWithLimit) == 0 { - break - } - - for i := range batchesWithLimit { - s.logger.Debug("Getting blob metadata", "batchHeaderHash", batchesWithLimit[i].BatchHeaderHash) - var ( - batch = batchesWithLimit[i] - ) - if batch == nil { - continue - } - batchHeaderHash, err := ConvertHexadecimalToBytes(batch.BatchHeaderHash) - if err != nil { - s.logger.Error("Failed to convert batch header hash to hex string", "error", err) - continue - } - batchKey := string(batchHeaderHash[:]) - if _, found := batchPresence[batchKey]; !found { - batchPresence[batchKey] = struct{}{} - } else { - // The batch has processed, skip it. - s.logger.Error("Getting duplicate batch from the graph", "batch header hash", batchKey) - continue - } - - metadatas, err := s.blobstore.GetAllBlobMetadataByBatch(ctx, batchHeaderHash) - if err != nil { - s.logger.Error("Failed to get blob metadata", "error", err) - continue - } - for _, bm := range metadatas { - blobKey := bm.GetBlobKey().String() - if _, found := blobKeyPresence[blobKey]; !found { - blobKeyPresence[blobKey] = struct{}{} - blobMetadatas = append(blobMetadatas, bm) - } else { - s.logger.Error("Getting duplicate blob key from the blobstore", "blobkey", blobKey) - } - } - batches = append(batches, batch) - if len(blobMetadatas) >= limit { - break - } - } - } - - if len(blobMetadatas) >= limit { - blobMetadatas = blobMetadatas[:limit] - } - - return batches, blobMetadatas, nil -} - func errorResponse(c *gin.Context, err error) { _ = c.Error(err) var code int diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 6e600bb3c..db39168f5 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -68,8 +68,6 @@ var ( }, }) testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, 
nil, nil) - expectedBatchHeaderHash = [32]byte{1, 2, 3} - expectedBlobIndex = uint32(1) expectedRequestedAt = uint64(5567830000000000000) expectedDataLength = 32 expectedBatchId = uint32(99) @@ -153,7 +151,9 @@ func TestFetchBlobHandler(t *testing.T) { blob := makeTestBlob(0, 80) key := queueBlob(t, &blob, blobstore) - markBlobConfirmed(t, &blob, key, expectedBatchHeaderHash, blobstore) + expectedBatchHeaderHash := [32]byte{1, 2, 3} + expectedBlobIndex := uint32(1) + markBlobConfirmed(t, &blob, key, expectedBlobIndex, expectedBatchHeaderHash, blobstore) blobKey := key.String() r.GET("/v1/feed/blobs/:blob_key", testDataApiServer.FetchBlobHandler) @@ -202,7 +202,7 @@ func TestFetchBlobsHandler(t *testing.T) { batchHeaderHashBytes := []byte(batch.BatchHeaderHash) batchHeaderHash, err := dataapi.ConvertHexadecimalToBytes(batchHeaderHashBytes) assert.NoError(t, err) - markBlobConfirmed(t, &blob, key, batchHeaderHash, blobstore) + markBlobConfirmed(t, &blob, key, 1, batchHeaderHash, blobstore) } mockSubgraphApi.On("QueryBatches").Return(subgraphBatches, nil) @@ -229,6 +229,118 @@ func TestFetchBlobsHandler(t *testing.T) { assert.Equal(t, 2, len(response.Data)) } +func TestFetchBlobsFromBatchHeaderHash(t *testing.T) { + r := setUpRouter() + + batchHeaderHash := "6E2EFA6EB7AE40CE7A65B465679DE5649F994296D18C075CF2C490564BBF7CA5" + batchHeaderHashBytes, err := dataapi.ConvertHexadecimalToBytes([]byte(batchHeaderHash)) + assert.NoError(t, err) + + blob1 := makeTestBlob(0, 80) + key1 := queueBlob(t, &blob1, blobstore) + + blob2 := makeTestBlob(0, 80) + key2 := queueBlob(t, &blob2, blobstore) + + markBlobConfirmed(t, &blob1, key1, 1, batchHeaderHashBytes, blobstore) + markBlobConfirmed(t, &blob2, key2, 2, batchHeaderHashBytes, blobstore) + + r.GET("/v1/feed/batches/:batch_header_hash/blobs", testDataApiServer.FetchBlobsFromBatchHeaderHash) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/feed/batches/"+batchHeaderHash+"/blobs?limit=1", nil) + 
r.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.BlobsResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 1, response.Meta.Size) + assert.Equal(t, hex.EncodeToString(batchHeaderHashBytes[:]), response.Data[0].BatchHeaderHash) + assert.Equal(t, uint32(1), uint32(response.Data[0].BlobIndex)) + + // With the next_token query parameter set, the response should contain the next token + w = httptest.NewRecorder() + req = httptest.NewRequest(http.MethodGet, "/v1/feed/batches/"+batchHeaderHash+"/blobs?limit=1&next_token="+response.Meta.NextToken, nil) + r.ServeHTTP(w, req) + + res = w.Result() + defer res.Body.Close() + + data, err = io.ReadAll(res.Body) + assert.NoError(t, err) + + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 1, response.Meta.Size) + assert.Equal(t, hex.EncodeToString(batchHeaderHashBytes[:]), response.Data[0].BatchHeaderHash) + assert.Equal(t, uint32(2), uint32(response.Data[0].BlobIndex)) + + // With the next_token query parameter set to an invalid value, the response should contain an error + w = httptest.NewRecorder() + req = httptest.NewRequest(http.MethodGet, "/v1/feed/batches/"+batchHeaderHash+"/blobs?limit=1&next_token=invalid", nil) + r.ServeHTTP(w, req) + + res = w.Result() + defer res.Body.Close() + + data, err = io.ReadAll(res.Body) + assert.NoError(t, err) + + var errorResponse dataapi.ErrorResponse + err = json.Unmarshal(data, &errorResponse) + assert.NoError(t, err) + + assert.Equal(t, http.StatusInternalServerError, res.StatusCode) + assert.Equal(t, "invalid next_token", errorResponse.Error) + + // Fetch both blobs when no limit is set + w = httptest.NewRecorder() + req = 
httptest.NewRequest(http.MethodGet, "/v1/feed/batches/"+batchHeaderHash+"/blobs", nil) + r.ServeHTTP(w, req) + + res = w.Result() + defer res.Body.Close() + + data, err = io.ReadAll(res.Body) + assert.NoError(t, err) + + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 2, response.Meta.Size) + + // When the batch header hash is invalid, the response should contain an error + w = httptest.NewRecorder() + req = httptest.NewRequest(http.MethodGet, "/v1/feed/batches/invalid/blobs", nil) + r.ServeHTTP(w, req) + + res = w.Result() + defer res.Body.Close() + + data, err = io.ReadAll(res.Body) + assert.NoError(t, err) + + err = json.Unmarshal(data, &errorResponse) + assert.NoError(t, err) + + assert.Equal(t, http.StatusInternalServerError, res.StatusCode) + assert.Equal(t, "invalid batch header hash", errorResponse.Error) +} + func TestFetchMetricsHandler(t *testing.T) { defer goleak.VerifyNone(t) @@ -244,7 +356,7 @@ func TestFetchMetricsHandler(t *testing.T) { batchHeaderHash, err := dataapi.ConvertHexadecimalToBytes(batchHeaderHashBytes) assert.NoError(t, err) - markBlobConfirmed(t, &blob, key, batchHeaderHash, blobstore) + markBlobConfirmed(t, &blob, key, 1, batchHeaderHash, blobstore) } s := new(model.SampleStream) @@ -1423,7 +1535,7 @@ func queueBlob(t *testing.T, blob *core.Blob, queue disperser.BlobStore) dispers return key } -func markBlobConfirmed(t *testing.T, blob *core.Blob, key disperser.BlobKey, batchHeaderHash [32]byte, queue disperser.BlobStore) { +func markBlobConfirmed(t *testing.T, blob *core.Blob, key disperser.BlobKey, blobIndex uint32, batchHeaderHash [32]byte, queue disperser.BlobStore) { // simulate blob confirmation var commitX, commitY fp.Element _, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") @@ -1437,7 +1549,7 @@ func markBlobConfirmed(t *testing.T, blob *core.Blob, key 
disperser.BlobKey, bat confirmationInfo := &disperser.ConfirmationInfo{ BatchHeaderHash: batchHeaderHash, - BlobIndex: expectedBlobIndex, + BlobIndex: blobIndex, SignatoryRecordHash: expectedSignatoryRecordHash, ReferenceBlockNumber: expectedReferenceBlockNumber, BatchRoot: expectedBatchRoot, diff --git a/disperser/dataapi/utils.go b/disperser/dataapi/utils.go index 148504cf5..c2548b96d 100644 --- a/disperser/dataapi/utils.go +++ b/disperser/dataapi/utils.go @@ -23,7 +23,7 @@ func ConvertHexadecimalToBytes(byteHash []byte) ([32]byte, error) { // We expect the resulting byte slice to have a length of 32 bytes. if len(decodedBytes) != 32 { - return [32]byte{}, errors.New("error decoding hash") + return [32]byte{}, errors.New("error decoding hash, invalid length") } // Convert the byte slice to a [32]byte array diff --git a/disperser/disperser.go b/disperser/disperser.go index 8603b4abf..cae2980df 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -135,6 +135,13 @@ type BlobStoreExclusiveStartKey struct { RequestedAt int64 // RequestedAt is epoch time in seconds } +type BatchIndexExclusiveStartKey struct { + BlobHash BlobHash + MetadataHash MetadataHash + BatchHeaderHash []byte + BlobIndex uint32 +} + type BlobStore interface { // StoreBlob adds a blob to the queue and returns a key that can be used to retrieve the blob later StoreBlob(ctx context.Context, blob *core.Blob, requestedAt uint64) (BlobKey, error) @@ -169,6 +176,8 @@ type BlobStore interface { GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus BlobStatus, limit int32, exclusiveStartKey *BlobStoreExclusiveStartKey) ([]*BlobMetadata, *BlobStoreExclusiveStartKey, error) // GetAllBlobMetadataByBatch returns the metadata of all the blobs in the batch. 
GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*BlobMetadata, error) + // GetAllBlobMetadataByBatchWithPagination returns all the blobs in the batch using pagination + GetAllBlobMetadataByBatchWithPagination(ctx context.Context, batchHeaderHash [32]byte, limit int32, exclusiveStartKey *BatchIndexExclusiveStartKey) ([]*BlobMetadata, *BatchIndexExclusiveStartKey, error) // GetBlobMetadata returns a blob metadata given a metadata key GetBlobMetadata(ctx context.Context, blobKey BlobKey) (*BlobMetadata, error) // HandleBlobFailure handles a blob failure by either incrementing the retry count or marking the blob as failed