Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disperser meterer for payments #779

Draft
wants to merge 25 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
435e46d
feat: reservation with mocked interfaces
hopeyen Sep 26, 2024
c5858cf
feat: on-demand with mocked interfaces
hopeyen Sep 30, 2024
57c83d7
refactor: creatTable's in util
hopeyen Sep 30, 2024
8eaa2a8
refactor: clean up dead code
hopeyen Sep 30, 2024
41fd0c8
feat: add meterer to disperser api server
hopeyen Sep 30, 2024
b14bf86
feat: chargeable size calc and some config flags
hopeyen Oct 1, 2024
8548415
refactor: remove some explicit mocked payment state
hopeyen Oct 1, 2024
1b65acf
feat: reservation bin limit config
hopeyen Oct 2, 2024
671df9b
refactor: rm global index validation check for on-demand
hopeyen Oct 2, 2024
533e6d3
fix: check cumulative payments with on-chain state upperbd
hopeyen Oct 2, 2024
13f8fde
refactor: sorted key, query fn with ordering, payment num type
hopeyen Oct 2, 2024
4afd390
refactor: rm dead code, add quoromNumber in ActiveReservations
hopeyen Oct 2, 2024
1af7037
feat: reservation quorom check and tests
hopeyen Oct 2, 2024
d93c91c
feat: ondemand quorom check and tests
hopeyen Oct 2, 2024
e8386ab
refactor: more memory efficient integer types
hopeyen Oct 2, 2024
e1d454b
refactor: rm dup fields BlobSize -> DataLength
hopeyen Oct 2, 2024
18ddb8a
feat: eigenda client local accountant
hopeyen Oct 3, 2024
24977c1
refactor: rm unnecessary fields: nonce, version
hopeyen Oct 3, 2024
ba7bb66
refactor: accountant sign over blob header fields
hopeyen Oct 3, 2024
dd2ab52
fix: no accounting in normal dispersal blob
hopeyen Oct 4, 2024
500ecc4
chore: inabox payments e2e test
hopeyen Oct 4, 2024
75acda3
feat: grpc paid version impl
hopeyen Oct 4, 2024
243a2f9
chore: add dummy state for testing (later add dummy contract)
hopeyen Oct 4, 2024
219acf5
chore: passing payment e2e inabox test with some hardcoded values
hopeyen Oct 5, 2024
9239918
fix: implement PaidDisperseBlob for mockDisperserClient
hopeyen Oct 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"sync"

commonaws "github.com/Layr-Labs/eigenda/common/aws"
Expand Down Expand Up @@ -156,6 +157,51 @@ func (c *Client) UpdateItem(ctx context.Context, tableName string, key Key, item
return resp.Attributes, err
}

func (c *Client) UpdateItemIncrement(ctx context.Context, tableName string, key Key, item Item) (Item, error) {
update := expression.UpdateBuilder{}
for itemKey, itemValue := range item {
if _, ok := key[itemKey]; ok {
// Cannot update the key
continue
}
Comment on lines +163 to +166
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize this is using the template above, but it's unclear to me why this is failing silently.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently dynamoDB doesn't let user update the primary key or sort key of an existing item, so if the current itemKey is part of the item's key, we simply move on to the next attribute in the item map. I think this is failing silently because we want to operate on other fields anyway.

some 3 alternative ways that still ensure consistency:

  1. a loop to check the keys (and return err) before the updating loop,
  2. do the same as now, but revert the changes later on if we cannot update the key
  3. do the same as now, and return a list of keys that couldn't get updated

// fmt.Println("updating item", itemKey, itemValue)
// ADD numeric values
if n, ok := itemValue.(*types.AttributeValueMemberN); ok {
// update = update.Add(expression.Name(itemKey), expression.Value(n.Value))
// update = update.Add(expression.Name(itemKey), expression.Value(n.Value))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove?

f, _ := strconv.ParseFloat(n.Value, 64)
update = update.Add(expression.Name(itemKey), expression.Value(aws.Float64(f)))

} else {
// For non-numeric values, use SET as before
update = update.Set(expression.Name(itemKey), expression.Value(itemValue))
}
}

expr, err := expression.NewBuilder().WithUpdate(update).Build()
if err != nil {
return nil, err
}

fmt.Println("update item increment", expr.Update())
resp, err := c.dynamoClient.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: aws.String(tableName),
Key: key,
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
UpdateExpression: expr.Update(),
ReturnValues: types.ReturnValueUpdatedNew,
})
if err != nil {
fmt.Println("error updating item", err)
return nil, err
}

fmt.Println("update item increment", resp.Attributes)

return resp.Attributes, nil
}

func (c *Client) GetItem(ctx context.Context, tableName string, key Key) (Item, error) {
resp, err := c.dynamoClient.GetItem(ctx, &dynamodb.GetItemInput{Key: key, TableName: aws.String(tableName)})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion contracts/bindings/AVSDirectory/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/BLSApkRegistry/binding.go

Large diffs are not rendered by default.

213 changes: 5 additions & 208 deletions contracts/bindings/DelegationManager/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/EigenDAServiceManager/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/EjectionManager/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/IndexRegistry/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/MockRollup/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/OperatorStateRetriever/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/RegistryCoordinator/binding.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contracts/bindings/StakeRegistry/binding.go

Large diffs are not rendered by default.

96 changes: 96 additions & 0 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,3 +979,99 @@ func bitmapToBytesArray(bitmap *big.Int) []byte {
}
return bytesArray
}

// // GetActiveReservations retrieves all active reservations from the payment contract
// func (t *Transactor) GetActiveReservations(ctx context.Context, blockNumber uint) (map[string]*core.ActiveReservation, error) {
// opts := &bind.CallOpts{
// Context: ctx,
// BlockNumber: big.NewInt(int64(blockNumber)),
// }

// // Assuming the contract has a method to get all active reservation accounts
// accounts, err := t.Bindings.PaymentContract.GetActiveReservationAccounts(opts)
// if err != nil {
// return nil, err
// }

// reservations := make(map[string]*core.ActiveReservation)
// for _, account := range accounts {
// reservation, err := t.Bindings.PaymentContract.GetReservation(opts, account)
// if err != nil {
// return nil, err
// }
// reservations[account.String()] = &core.ActiveReservation{
// DataRate: reservation.DataRate,
// StartEpoch: reservation.StartEpoch,
// EndEpoch: reservation.EndEpoch,
// QuorumSplit: reservation.QuorumSplit,
// }
// }

// return reservations, nil
// }

// // GetActiveReservationByAccount retrieves the active reservation for a specific account
// func (t *Transactor) GetActiveReservationByAccount(ctx context.Context, blockNumber uint, accountID string) (*core.ActiveReservation, error) {
// opts := &bind.CallOpts{
// Context: ctx,
// BlockNumber: big.NewInt(int64(blockNumber)),
// }

// account := common.HexToAddress(accountID)
// reservation, err := t.Bindings.PaymentContract.GetReservation(opts, account)
// if err != nil {
// return nil, err
// }

// return &core.ActiveReservation{
// DataRate: reservation.DataRate,
// StartEpoch: reservation.StartEpoch,
// EndEpoch: reservation.EndEpoch,
// QuorumSplit: reservation.QuorumSplit,
// }, nil
// }

// // GetOnDemandPayments retrieves all on-demand payments from the payment contract
// func (t *Transactor) GetOnDemandPayments(ctx context.Context, blockNumber uint) (map[string]*core.OnDemandPayment, error) {
// opts := &bind.CallOpts{
// Context: ctx,
// BlockNumber: big.NewInt(int64(blockNumber)),
// }

// // Assuming the contract has a method to get all on-demand payment accounts
// accounts, err := t.Bindings.PaymentContract.GetOnDemandPaymentAccounts(opts)
// if err != nil {
// return nil, err
// }

// payments := make(map[string]*core.OnDemandPayment)
// for _, account := range accounts {
// payment, err := t.Bindings.PaymentContract.GetOnDemandPayment(opts, account)
// if err != nil {
// return nil, err
// }
// payments[account.String()] = &core.OnDemandPayment{
// CumulativePayment: core.TokenAmount(payment.CumulativePayment.Uint64()),
// }
// }

// return payments, nil
// }

// // GetOnDemandPaymentByAccount retrieves the on-demand payment for a specific account
// func (t *Transactor) GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint, accountID string) (*core.OnDemandPayment, error) {
// opts := &bind.CallOpts{
// Context: ctx,
// BlockNumber: big.NewInt(int64(blockNumber)),
// }

// account := common.HexToAddress(accountID)
// payment, err := t.Bindings.PaymentContract.GetOnDemandPayment(opts, account)
// if err != nil {
// return nil, err
// }

// return &core.OnDemandPayment{
// CumulativePayment: core.TokenAmount(payment.CumulativePayment.Uint64()),
// }, nil
// }
70 changes: 70 additions & 0 deletions core/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package core

import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/signer/core/apitypes"
)

// EIP712Domain represents the EIP-712 domain for our blob headers
var EIP712Domain = apitypes.TypedDataDomain{
Name: "EigenDA",
Version: "1",
ChainId: (*math.HexOrDecimal256)(big.NewInt(17000)),
VerifyingContract: common.HexToAddress("0x1234000000000000000000000000000000000000").Hex(),
}

// Protocol defines parameters: epoch length and rate-limit window interval
type ActiveReservation struct {
dataRate uint32 // bandwith being reserved

Check failure on line 21 in core/types.go

View workflow job for this annotation

GitHub Actions / Linter

field `dataRate` is unused (unused)
startEpoch uint32 // index of epoch where reservation begins
endEpoch uint32 // index of epoch where reservation ends
quorumSplit []byte // each byte is a percentage at the corresponding quorum index
}

// Protocol defines parameters: FixedFeePerByte; fine to leave global rate-limit offchain atm
type OnDemandPayment struct {
amountDeposited big.Int
// amountCollected big.Int
}

// // Create the typed data for EIP-712 signature verification
// typedData := apitypes.TypedData{
// Types: apitypes.Types{
// "EIP712Domain": []apitypes.Type{
// {Name: "name", Type: "string"},
// {Name: "version", Type: "string"},
// {Name: "chainId", Type: "uint256"},
// {Name: "verifyingContract", Type: "address"},
// },
// "BlobHeader": []apitypes.Type{
// {Name: "version", Type: "uint32"},
// {Name: "accountID", Type: "string"},
// {Name: "nonce", Type: "uint32"},
// {Name: "binIndex", Type: "uint32"},
// {Name: "cumulativePayment", Type: "uint64"},
// {Name: "commitment", Type: "bytes"},
// {Name: "dataLength", Type: "uint32"},
// {Name: "blobQuorumParams", Type: "BlobQuorumParam[]"},
// },
// "BlobQuorumParam": []apitypes.Type{
// {Name: "quorumID", Type: "uint8"},
// {Name: "adversaryThreshold", Type: "uint32"},
// {Name: "quorumThreshold", Type: "uint32"},
// },
// },
// Domain: EIP712Domain,
// PrimaryType: "BlobHeader",
// Message: apitypes.TypedDataMessage{
// "version": header.Version,
// "accountID": header.AccountID,
// "nonce": header.Nonce,
// "binIndex": header.BinIndex,
// "cumulativePayment": header.CumulativePayment,
// "commitment": header.Commitment.Bytes(),
// "dataLength": header.DataLength,
// "blobQuorumParams": header.BlobQuorumParams,
// },
// }
17 changes: 15 additions & 2 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/status"
"math/rand"
"net"
"slices"
"strings"
"sync"
"time"

"google.golang.org/grpc/status"

"github.com/Layr-Labs/eigenda/api"
commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/meterer"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/rs"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand All @@ -44,6 +46,7 @@ type DispersalServer struct {
tx core.Transactor
quorumConfig QuorumConfig

meterer *meterer.Meterer
ratelimiter common.RateLimiter
authenticator core.BlobRequestAuthenticator

Expand All @@ -69,6 +72,7 @@ func NewDispersalServer(
tx core.Transactor,
_logger logging.Logger,
metrics *disperser.Metrics,
meterer *meterer.Meterer,
ratelimiter common.RateLimiter,
rateConfig RateConfig,
maxBlobSize int,
Expand All @@ -90,6 +94,7 @@ func NewDispersalServer(
tx: tx,
metrics: metrics,
logger: logger,
meterer: meterer,
ratelimiter: ratelimiter,
authenticator: authenticator,
mu: &sync.RWMutex{},
Expand Down Expand Up @@ -271,7 +276,15 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut

s.logger.Debug("received a new blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", "))

if s.ratelimiter != nil {
// payments before ratelimits
if s.meterer != nil {
//TODO: blob request header needs to be updated for payments;
// the tests rely on a temporarily defined struct from disperser/meterer/types
// err := s.meterer.MeterRequest(ctx, blob.RequestHeader)
// if err != nil {
// return nil, err
// }
} else if s.ratelimiter != nil {
err := s.checkRateLimitsAndAddRatesToHeader(ctx, blob, origin, authenticatedAddress, apiMethodName)
if err != nil {
// Note checkRateLimitsAndAddRatesToHeader already updated the metrics for this error.
Expand Down
36 changes: 35 additions & 1 deletion disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"testing"
"time"

commonaws "github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser/apiserver"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/Layr-Labs/eigenda/disperser/meterer"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -645,6 +647,38 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
if err != nil {
panic("failed to create bucket store")
}
meterConfig := meterer.Config{
PricePerByte: 1,
GlobalBytesPerSecond: 1000,
ReservationWindow: time.Minute,
}

paymentChainState := meterer.NewMockedOnchainPaymentState()

paymentChainState.InitializeOnchainPaymentState()

clientConfig := commonaws.ClientConfig{
Region: "us-east-1",
AccessKey: "localstack",
SecretAccessKey: "localstack",
EndpointURL: fmt.Sprintf("http://0.0.0.0:4566"),
}

store, err := meterer.NewOffchainStore(
clientConfig,
"reservations",
"ondemand",
"global",
logger,
)
if err != nil {
teardown()
panic("failed to create offchain store")
}
meterer, err := meterer.NewMeterer(meterConfig, meterer.TimeoutConfig{}, paymentChainState, store, logger)
if err != nil {
panic("failed to create meterer")
}
ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger)

rateConfig := apiserver.RateConfig{
Expand Down Expand Up @@ -701,7 +735,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
return apiserver.NewDispersalServer(disperser.ServerConfig{
GrpcPort: "51001",
GrpcTimeout: 1 * time.Second,
}, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig, testMaxBlobSize)
}, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), meterer, ratelimiter, rateConfig, testMaxBlobSize)
}

func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) {
Expand Down
Loading
Loading