Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disperser meterer for payments #779

Draft
wants to merge 25 commits into
base: master
Choose a base branch
from
Draft

Conversation

hopeyen
Copy link

@hopeyen hopeyen commented Sep 30, 2024

Why are these changes needed?

PoC inside disperser for handling payments

summary of changes

  • Create a meterer package inside the disperser that can be used to replace ratelimiter that validate the requests before storing them
  • Add mocked interfaces for contracts
  • Add dynamo DB for managing offchain state
  • Using simple data types (first u64 before worrying about u128)
  • Unit tests
  • Add flag for payments and take precedence over ratelimit when disperser serve requests
    • ineffective as blob request header must be updated first

unit tests

  • reservations
    • test for invalid signatures
    • test for invalid bin index
    • test for non existent accounts
    • if blob size > bin, request is rejected
    • blob size <= bin, given total throughput is still under reservation bin limit, requests is okay (need to add handling and tests for less than chargeable size)
    • first overflow is okay, check that bin+2 gets charged
    • following overflows for the same bin is rejected
    • --> to add periodic bin cleanup/removal
  • on demand
    • test for invalid signatures
    • test for invalid bin index
    • test for non existent accounts
    • --> to add chargeable and ensure that the chargeable unit is recorded when blob is smaller
    • if blobSize > cumulative payments, request is rejected
    • if blob size > global rate limit, request is rejected
    • for everything in between, if globally okay, things should work
    • reject requests exceeding the cumulative payment limit

Checks

  • I've made sure the lint is passing in this PR.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, in that case, please comment that they are not relevant.
  • I've checked the new test coverage and the coverage percentage didn't drop.
  • Testing Strategy
    • Unit tests
    • Integration tests
    • This PR is not tested :(

@hopeyen hopeyen marked this pull request as draft September 30, 2024 22:59
disperser/meterer/meterer.go Outdated Show resolved Hide resolved
func (m *Meterer) ValidateBinIndex(blobHeader BlobHeader, reservation *ActiveReservation) bool {
currentBinIndex := GetCurrentBinIndex()
// Valid bin indexes are either the current bin or the previous bin
if (blobHeader.BinIndex != currentBinIndex && blobHeader.BinIndex != (currentBinIndex-1)) || (reservation.StartEpoch > blobHeader.BinIndex || blobHeader.BinIndex > reservation.EndEpoch) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense for Epochs to use the same units as Bins?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to change this for easier usage. What about using start and end timestamps instead of epochs?

// GetCurrentBinIndex returns the current bin index based on time
func GetCurrentBinIndex() uint64 {
currentTime := time.Now().Unix()
binInterval := 375 // Interval that allows 3 32MB blobs at minimal throughput
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configuraable? What are the units?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes! changed this to use reservationWindow config and added an indication in the help string that this is in seconds

return fmt.Errorf("failed to get active reservation by account: %w", err)
}
if newUsage <= 2*binLimit && blobHeader.BinIndex+2 <= reservation.EndEpoch {
m.OffchainStore.UpdateReservationBin(ctx, blobHeader.AccountID, uint64(blobHeader.BinIndex+2), newUsage-binLimit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this is not a race condition because previous logic ensures this can only happen in a single thread.

Comment on lines 225 to 228
result, err := s.dynamoClient.QueryIndex(ctx, s.onDemandTableName, "AccountIDIndex", "AccountID = :account", commondynamodb.ExpresseionValues{
":account": &types.AttributeValueMemberS{
Value: accountID,
}})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be able to configure a sort key associated with the CumulativePayment field and directly retrieve the records with cumulative payment above and below the provided payment.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated!

currentBinIndex := uint64(time.Now().Unix())

// Valid bin indexes are either the current bin or the previous bin (allow this second or prev sec)
if blobHeader.BinIndex != currentBinIndex && blobHeader.BinIndex != (currentBinIndex-1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For global rate limit, I think we can argue that we don't need to include the BinIndex within the header, but just use the receipt time directly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, in this case we don't need to validate the bin index, maybe not even receipt time, but the time when global rate gets checked (since global rate limit check happens after payment validations, we avoid failing on cases where payment validation ever gets to more than 2 seconds)

Copy link
Contributor

@mooselumph mooselumph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more comments!

Comment on lines +163 to +166
if _, ok := key[itemKey]; ok {
// Cannot update the key
continue
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize this is using the template above, but it's unclear to me why this is failing silently.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently dynamoDB doesn't let user update the primary key or sort key of an existing item, so if the current itemKey is part of the item's key, we simply move on to the next attribute in the item map. I think this is failing silently because we want to operate on other fields anyway.

some 3 alternative ways that still ensure consistency:

  1. a loop to check the keys (and return err) before the updating loop,
  2. do the same as now, but revert the changes later on if we cannot update the key
  3. do the same as now, and return a list of keys that couldn't get updated

Comment on lines 170 to 171
// update = update.Add(expression.Name(itemKey), expression.Value(n.Value))
// update = update.Add(expression.Name(itemKey), expression.Value(n.Value))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove?

return binUsageValue, nil
}

func (s *OffchainStore) FindReservationBin(ctx context.Context, accountID string, binIndex uint64) (*ReservationBin, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope atm, it was intended for an API, which i now realized I've yet to add

}

// Find all reservation bins for a given account
func (s *OffchainStore) FindReservationBins(ctx context.Context, accountID string) ([]ReservationBin, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used?


// OperatorInfo contains information about an operator which is stored on the blockchain state,
// corresponding to a particular quorum
type ActiveReservation struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: neeed to add QuorumNumbers as discussed

// Existing fields
Commitment core.G1Point
DataLength uint32
BlobQuorumParams []BlobQuorumParam
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can just be a list of QuorumNumbers, per the new header spec.

BinIndex uint64

Signature []byte
BlobSize uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant with DataLength. We should decide on canonical naming.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

went with DataLength

}

// Protocol defines parameters: FixedFeePerByte; fine to leave global rate-limit offchain atm
type OnDemand struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used?

Version uint32
AccountID string
Nonce uint32 // use nonce to prevent duplicate payments in the same reservation window
BinIndex uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like uint32 would be sufficient for this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to be more memory efficient; I'm assuming we don't need to worry about an index in the unit of seconds (previously global bin index is in seconds so 2^32 in unix let this go up to Jan 19, 2038, but with that being removed, uint32 is fine for reservation bin indices

I also updated start & end epochs to use uint32, as well as PricePerChargeable (2^64 gwei ~= 18M Eth; uint32 => ~4ETH), MinChargeableSize (in bytes, this is ~4GB/s), and ReservationWindow (136years but uint16 is only 18hours if we are counting in seconds; we could instead use hour as window unit and allow for 7years).

Left GlobalBytesPerSecond with uint64 for the aspiration of supporting rate-limit greater than 4GB/s network-wide ✨

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants