Skip to content

Commit

Permalink
Add Query support for primary index
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Jul 24, 2024
1 parent f6980b9 commit cf40593
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 10 deletions.
14 changes: 14 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,20 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
return response.Items, nil
}

// Query returns all items in the primary index that match the given expression
func (c *Client) Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpresseionValues) ([]Item, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
KeyConditionExpression: aws.String(keyCondition),
ExpressionAttributeValues: expAttributeValues,
})
if err != nil {
return nil, err
}

return response.Items, nil
}

// QueryIndexCount returns the count of the items in the index that match the given key
func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues) (int32, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
Expand Down
85 changes: 79 additions & 6 deletions disperser/batcher/batchstore/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type MinibatchStore struct {
ttl time.Duration
}

var _ batcher.MinibatchStore = (*MinibatchStore)(nil)

func NewMinibatchStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *MinibatchStore {
logger.Debugf("creating minibatch store with table %s with TTL: %s", tableName, ttl)
return &MinibatchStore{
Expand Down Expand Up @@ -263,7 +265,7 @@ func MarshalDispersalRequest(request *batcher.DispersalRequest) (map[string]type
if err != nil {
return nil, err
}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d", request.MinibatchIndex)}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalRequestSK + fmt.Sprintf("%d#%s", request.MinibatchIndex, request.OperatorID.Hex())}
fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", request.RequestedAt.UTC().Unix())}
return fields, nil
}
Expand All @@ -273,7 +275,7 @@ func MarshalDispersalResponse(response *batcher.DispersalResponse) (map[string]t
if err != nil {
return nil, err
}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d", response.MinibatchIndex)}
fields["SK"] = &types.AttributeValueMemberS{Value: dispersalResponseSK + fmt.Sprintf("%d#%s", response.MinibatchIndex, response.OperatorID.Hex())}
fields["RespondedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.RespondedAt.UTC().Unix())}
fields["RequestedAt"] = &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", response.DispersalRequest.RequestedAt.UTC().Unix())}
return fields, nil
Expand Down Expand Up @@ -359,6 +361,10 @@ func (m *MinibatchStore) PutMinibatch(ctx context.Context, minibatch *batcher.Mi
return m.dynamoDBClient.PutItem(ctx, m.tableName, item)
}

func (m *MinibatchStore) PutMiniBatch(ctx context.Context, minibatch *batcher.MinibatchRecord) error {
return m.PutMinibatch(ctx, minibatch)
}

func (m *MinibatchStore) PutDispersalRequest(ctx context.Context, request *batcher.DispersalRequest) error {
item, err := MarshalDispersalRequest(request)
if err != nil {
Expand Down Expand Up @@ -404,6 +410,11 @@ func (m *MinibatchStore) GetBatch(ctx context.Context, batchID uuid.UUID) (*batc
return batch, nil
}

// GetPendingBatch implements batcher.MinibatchStore.
func (m *MinibatchStore) GetPendingBatch(ctx context.Context) (*batcher.BatchRecord, error) {
panic("unimplemented")
}

func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) {
item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{
"BatchID": &types.AttributeValueMemberS{
Expand All @@ -430,13 +441,17 @@ func (m *MinibatchStore) GetMinibatch(ctx context.Context, batchID uuid.UUID, mi
return minibatch, nil
}

func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalRequest, error) {
func (m *MinibatchStore) GetMiniBatch(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.MinibatchRecord, error) {
return m.GetMinibatch(ctx, batchID, minibatchIndex)
}

func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalRequest, error) {
item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{
"BatchID": &types.AttributeValueMemberS{
Value: batchID.String(),
},
"SK": &types.AttributeValueMemberS{
Value: dispersalRequestSK + fmt.Sprintf("%d", minibatchIndex),
Value: dispersalRequestSK + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()),
},
})
if err != nil {
Expand All @@ -456,13 +471,42 @@ func (m *MinibatchStore) GetDispersalRequest(ctx context.Context, batchID uuid.U
return request, nil
}

func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) (*batcher.DispersalResponse, error) {
func (m *MinibatchStore) GetDispersalRequests(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalRequest, error) {
items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{
":batchID": &types.AttributeValueMemberS{
Value: batchID.String(),
},
":sk": &types.AttributeValueMemberS{
Value: dispersalRequestSK + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex),
},
})
if err != nil {
return nil, err
}

if len(items) == 0 {
return nil, fmt.Errorf("no dispersal requests found for BatchID %s MinibatchIndex %d", batchID, minibatchIndex)
}

requests := make([]*batcher.DispersalRequest, len(items))
for i, item := range items {
requests[i], err = UnmarshalDispersalRequest(item)
if err != nil {
m.logger.Errorf("failed to unmarshal dispersal requests at index %d: %v", i, err)
return nil, err
}
}

return requests, nil
}

func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.UUID, minibatchIndex uint, opID core.OperatorID) (*batcher.DispersalResponse, error) {
item, err := m.dynamoDBClient.GetItem(ctx, m.tableName, map[string]types.AttributeValue{
"BatchID": &types.AttributeValueMemberS{
Value: batchID.String(),
},
"SK": &types.AttributeValueMemberS{
Value: dispersalResponseSK + fmt.Sprintf("%d", minibatchIndex),
Value: dispersalResponseSK + fmt.Sprintf("%d#%s", minibatchIndex, opID.Hex()),
},
})
if err != nil {
Expand All @@ -481,3 +525,32 @@ func (m *MinibatchStore) GetDispersalResponse(ctx context.Context, batchID uuid.
}
return response, nil
}

func (m *MinibatchStore) GetDispersalResponses(ctx context.Context, batchID uuid.UUID, minibatchIndex uint) ([]*batcher.DispersalResponse, error) {
items, err := m.dynamoDBClient.Query(ctx, m.tableName, "BatchID = :batchID AND SK = :sk", commondynamodb.ExpresseionValues{
":batchID": &types.AttributeValueMemberS{
Value: batchID.String(),
},
":sk": &types.AttributeValueMemberS{
Value: dispersalResponseSK + fmt.Sprintf("%s#%d", batchID.String(), minibatchIndex),
},
})
if err != nil {
return nil, err
}

if len(items) == 0 {
return nil, fmt.Errorf("no dispersal responses found for BatchID %s MinibatchIndex %d", batchID, minibatchIndex)
}

responses := make([]*batcher.DispersalResponse, len(items))
for i, item := range items {
responses[i], err = UnmarshalDispersalResponse(item)
if err != nil {
m.logger.Errorf("failed to unmarshal dispersal response at index %d: %v", i, err)
return nil, err
}
}

return responses, nil
}
10 changes: 6 additions & 4 deletions disperser/batcher/batchstore/minibatch_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,18 @@ func TestPutDispersalRequest(t *testing.T) {
id, err := uuid.NewV7()
assert.NoError(t, err)
ts := time.Now().Truncate(time.Second).UTC()
opID := core.OperatorID([32]byte{123})
request := &batcher.DispersalRequest{
BatchID: id,
MinibatchIndex: 0,
OperatorID: core.OperatorID([32]byte{123}),
OperatorID: opID,
OperatorAddress: gcommon.HexToAddress("0x0"),
NumBlobs: 1,
RequestedAt: ts,
}
err = minibatchStore.PutDispersalRequest(ctx, request)
assert.NoError(t, err)
r, err := minibatchStore.GetDispersalRequest(ctx, request.BatchID, request.MinibatchIndex)
r, err := minibatchStore.GetDispersalRequest(ctx, request.BatchID, request.MinibatchIndex, opID)
assert.NoError(t, err)
assert.Equal(t, request, r)
}
Expand All @@ -147,11 +148,12 @@ func TestPutDispersalResponse(t *testing.T) {
id, err := uuid.NewV7()
assert.NoError(t, err)
ts := time.Now().Truncate(time.Second).UTC()
opID := core.OperatorID([32]byte{123})
response := &batcher.DispersalResponse{
DispersalRequest: batcher.DispersalRequest{
BatchID: id,
MinibatchIndex: 0,
OperatorID: core.OperatorID([32]byte{1}),
OperatorID: opID,
OperatorAddress: gcommon.HexToAddress("0x0"),
NumBlobs: 1,
RequestedAt: ts,
Expand All @@ -162,7 +164,7 @@ func TestPutDispersalResponse(t *testing.T) {
}
err = minibatchStore.PutDispersalResponse(ctx, response)
assert.NoError(t, err)
r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex)
r, err := minibatchStore.GetDispersalResponse(ctx, response.BatchID, response.MinibatchIndex, opID)
assert.NoError(t, err)
assert.Equal(t, response, r)
}

0 comments on commit cf40593

Please sign in to comment.