[blobstore] Filter out expired blobs from query (#692)
ian-shim authored Oct 2, 2024
1 parent a3eab61 commit 35e0b6b
Showing 9 changed files with 204 additions and 89 deletions.
19 changes: 15 additions & 4 deletions disperser/batcher/batcher.go
@@ -154,13 +154,24 @@ func (b *Batcher) RecoverState(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to get blobs in dispersing state: %w", err)
}
expired := 0
processing := 0
for _, meta := range metas {
err = b.Queue.MarkBlobProcessing(ctx, meta.GetBlobKey())
if err != nil {
return fmt.Errorf("failed to mark blob (%s) as processing: %w", meta.GetBlobKey(), err)
if meta.Expiry == 0 || meta.Expiry < uint64(time.Now().Unix()) {
err = b.Queue.MarkBlobFailed(ctx, meta.GetBlobKey())
if err != nil {
return fmt.Errorf("failed to mark blob (%s) as failed: %w", meta.GetBlobKey(), err)
}
expired += 1
} else {
err = b.Queue.MarkBlobProcessing(ctx, meta.GetBlobKey())
if err != nil {
return fmt.Errorf("failed to mark blob (%s) as processing: %w", meta.GetBlobKey(), err)
}
processing += 1
}
}
b.logger.Info("Recovering state took", "duration", time.Since(start), "numBlobs", len(metas))
b.logger.Info("Recovering state took", "duration", time.Since(start), "numBlobs", len(metas), "expired", expired, "processing", processing)
return nil
}

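For context, the expiry check added to RecoverState treats a zero Expiry the same as one already in the past. Below is a minimal, self-contained Go sketch of that condition; the helper name blobExpired and the main function are illustrative only, not part of this commit.

package main

import (
	"fmt"
	"time"
)

// blobExpired mirrors the condition used in RecoverState above: a zero Expiry
// is treated as already expired, as is any Expiry earlier than the current time.
func blobExpired(expiry uint64, now time.Time) bool {
	return expiry == 0 || expiry < uint64(now.Unix())
}

func main() {
	now := time.Now()
	fmt.Println(blobExpired(0, now))                                  // true: zero Expiry
	fmt.Println(blobExpired(uint64(now.Add(-time.Hour).Unix()), now)) // true: already past
	fmt.Println(blobExpired(uint64(now.Add(time.Hour).Unix()), now))  // false: still valid
}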
55 changes: 42 additions & 13 deletions disperser/batcher/batcher_test.go
@@ -40,7 +40,7 @@ var (
type batcherComponents struct {
transactor *coremock.MockTransactor
txnManager *batchermock.MockTxnManager
blobStore disperser.BlobStore
blobStore *inmem.BlobStore
encoderClient *disperser.LocalEncoderClient
encodingStreamer *bat.EncodingStreamer
ethClient *cmock.MockEthClient
@@ -101,7 +101,10 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher, func() []time.

// Disperser Components
dispatcher := dmock.NewDispatcher(state)
blobStore := inmem.NewBlobStore()
blobStore := &inmem.BlobStore{
Blobs: make(map[disperser.BlobHash]*inmem.BlobHolder),
Metadata: make(map[disperser.BlobKey]*disperser.BlobMetadata),
}

pullInterval := 100 * time.Millisecond
config := bat.Config{
@@ -779,31 +782,57 @@ func TestBatcherRecoverState(t *testing.T) {
},
})

blob2 := makeTestBlob([]*core.SecurityParam{
{
QuorumID: 0,
AdversaryThreshold: 80,
ConfirmationThreshold: 100,
},
{
QuorumID: 2,
AdversaryThreshold: 80,
ConfirmationThreshold: 100,
},
})

components, batcher, _ := makeBatcher(t)

blobStore := components.blobStore
ctx := context.Background()
_, key1 := queueBlob(t, ctx, &blob0, blobStore)
_, _ = queueBlob(t, ctx, &blob1, blobStore)
_, key0 := queueBlob(t, ctx, &blob0, blobStore)
_, key1 := queueBlob(t, ctx, &blob1, blobStore)
_, key2 := queueBlob(t, ctx, &blob2, blobStore)
components.blobStore.Metadata[key2].Expiry = uint64(time.Now().Add(time.Hour * (-24)).Unix())

err := blobStore.MarkBlobDispersing(ctx, key0)
assert.NoError(t, err)
err = blobStore.MarkBlobDispersing(ctx, key2)
assert.NoError(t, err)

err := blobStore.MarkBlobDispersing(ctx, key1)
b0, err := blobStore.GetBlobMetadata(ctx, key0)
assert.NoError(t, err)
processingBlobs, err := blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
assert.Equal(t, b0.BlobStatus, disperser.Dispersing)

b1, err := blobStore.GetBlobMetadata(ctx, key1)
assert.NoError(t, err)
assert.Len(t, processingBlobs, 1)
assert.Equal(t, b1.BlobStatus, disperser.Processing)

dispersingBlobs, err := blobStore.GetBlobMetadataByStatus(ctx, disperser.Dispersing)
b2, err := blobStore.GetBlobMetadata(ctx, key2)
assert.NoError(t, err)
assert.Len(t, dispersingBlobs, 1)
assert.Equal(t, b2.BlobStatus, disperser.Dispersing)

err = batcher.RecoverState(context.Background())
assert.NoError(t, err)

processingBlobs, err = blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
b0, err = blobStore.GetBlobMetadata(ctx, key0)
assert.NoError(t, err)
assert.Equal(t, b0.BlobStatus, disperser.Processing)

b1, err = blobStore.GetBlobMetadata(ctx, key1)
assert.NoError(t, err)
assert.Len(t, processingBlobs, 2)
assert.Equal(t, b1.BlobStatus, disperser.Processing)

dispersingBlobs, err = blobStore.GetBlobMetadataByStatus(ctx, disperser.Dispersing)
b2, err = blobStore.GetBlobMetadata(ctx, key2)
assert.NoError(t, err)
assert.Len(t, dispersingBlobs, 0)
assert.Equal(t, b2.BlobStatus, disperser.Failed)
}
11 changes: 7 additions & 4 deletions disperser/batcher/finalizer_test.go
@@ -56,6 +56,7 @@ func TestFinalizedBlob(t *testing.T) {
blobIndex := uint32(10)
sigRecordHash := [32]byte{0}
inclusionProof := []byte{1, 2, 3, 4, 5}
expiry := uint64(time.Now().Add(time.Hour).Unix())
confirmationInfo := &disperser.ConfirmationInfo{
BatchHeaderHash: batchHeaderHash,
BlobIndex: blobIndex,
@@ -73,7 +74,7 @@
BlobHash: metadataKey1.BlobHash,
MetadataHash: metadataKey1.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
Expiry: expiry,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
@@ -86,7 +87,7 @@
BlobHash: metadataKey2.BlobHash,
MetadataHash: metadataKey2.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
Expiry: expiry + 1,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
@@ -164,11 +165,12 @@ func TestUnfinalizedBlob(t *testing.T) {
ConfirmationBlockNumber: uint32(150),
Fee: []byte{0},
}
expiry := uint64(time.Now().Add(100000).Unix())
metadata := &disperser.BlobMetadata{
BlobHash: metadataKey.BlobHash,
MetadataHash: metadataKey.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
Expiry: expiry,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
@@ -234,11 +236,12 @@ func TestNoReceipt(t *testing.T) {
ConfirmationBlockNumber: uint32(150),
Fee: []byte{0},
}
expiry := uint64(time.Now().Add(100000).Unix())
metadata := &disperser.BlobMetadata{
BlobHash: metadataKey.BlobHash,
MetadataHash: metadataKey.MetadataHash,
BlobStatus: disperser.Processing,
Expiry: 0,
Expiry: expiry,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: core.BlobRequestHeader{
50 changes: 43 additions & 7 deletions disperser/common/blobstore/blob_metadata_store.go
@@ -19,6 +19,7 @@ import (
const (
statusIndexName = "StatusIndex"
batchIndexName = "BatchIndex"
expiryIndexName = "Status-Expiry-Index"
)

// BlobMetadataStore is a blob metadata storage backed by DynamoDB
@@ -121,9 +122,12 @@ func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) ([]*disperser.BlobMetadata, error) {
items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
},
":expiry": &types.AttributeValueMemberN{
Value: strconv.FormatInt(time.Now().Unix(), 10),
}})
if err != nil {
return nil, err
@@ -140,14 +144,18 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
return metadata, nil
}

// GetBlobMetadataByStatusCount returns the count of all the metadata with the given status
// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
func (s *BlobMetadataStore) GetBlobMetadataByStatusCount(ctx context.Context, status disperser.BlobStatus) (int32, error) {
count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status disperser.BlobStatus) (int32, error) {
count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
}})
},
":expiry": &types.AttributeValueMemberN{
Value: strconv.FormatInt(time.Now().Unix(), 10),
},
})
if err != nil {
return 0, err
}
@@ -173,10 +181,14 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co
}
}

queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
}}, limit, attributeMap)
},
":expiry": &types.AttributeValueMemberN{
Value: strconv.FormatInt(time.Now().Unix(), 10),
},
}, limit, attributeMap)

if err != nil {
return nil, nil, err
@@ -429,6 +441,10 @@ func GenerateTableSchema(metadataTableName string, readCapacityUnits int64, writ
AttributeName: aws.String("BlobIndex"),
AttributeType: types.ScalarAttributeTypeN,
},
{
AttributeName: aws.String("Expiry"),
AttributeType: types.ScalarAttributeTypeN,
},
},
KeySchema: []types.KeySchemaElement{
{
@@ -482,6 +498,26 @@ func GenerateTableSchema(metadataTableName string, readCapacityUnits int64, writ
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
{
IndexName: aws.String(expiryIndexName),
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("BlobStatus"),
KeyType: types.KeyTypeHash,
},
{
AttributeName: aws.String("Expiry"),
KeyType: types.KeyTypeRange,
},
},
Projection: &types.Projection{
ProjectionType: types.ProjectionTypeAll,
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
WriteCapacityUnits: aws.Int64(writeCapacityUnits),
},
},
},
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(readCapacityUnits),
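The new Status-Expiry-Index GSI defined above keys items by BlobStatus (hash) and Expiry (range), which is what lets the key condition "BlobStatus = :status AND Expiry > :expiry" exclude expired blobs on the server side. The QueryIndex wrapper used in this file is project-specific; the sketch below shows roughly the equivalent raw aws-sdk-go-v2 call it presumably issues. The function name, package name, and wiring here are illustrative assumptions, not code from this commit.

package blobstoreexample

import (
	"context"
	"strconv"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// queryNonExpiredByStatus sketches the key condition pattern used above:
// BlobStatus is the GSI hash key and Expiry the range key, so items whose
// Expiry is not strictly greater than "now" are excluded by DynamoDB itself
// rather than being filtered client-side.
func queryNonExpiredByStatus(ctx context.Context, client *dynamodb.Client, table string, status int) (*dynamodb.QueryOutput, error) {
	return client.Query(ctx, &dynamodb.QueryInput{
		TableName:              aws.String(table),
		IndexName:              aws.String("Status-Expiry-Index"),
		KeyConditionExpression: aws.String("BlobStatus = :status AND Expiry > :expiry"),
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":status": &types.AttributeValueMemberN{Value: strconv.Itoa(status)},
			":expiry": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
		},
	})
}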
(Diffs for the remaining 5 changed files are not shown.)
