Skip to content

Commit

Permalink
chore: inabox payments e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 4, 2024
1 parent dd2ab52 commit 9295cbd
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 4 deletions.
1 change: 1 addition & 0 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag b

type DisperserClient interface {
DisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
PaidDisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error)
Expand Down
22 changes: 22 additions & 0 deletions api/clients/mock/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,28 @@ func (c *MockDisperserClient) DisperseBlob(ctx context.Context, data []byte, quo
return status, key, err
}

func (c *MockDisperserClient) PaidDisperseBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) {
//TODO: add payment logic to mocks?
args := c.Called(data, quorums)
var status *disperser.BlobStatus
if args.Get(0) != nil {
status = (args.Get(0)).(*disperser.BlobStatus)
}
var key []byte
if args.Get(1) != nil {
key = (args.Get(1)).([]byte)
}
var err error
if args.Get(2) != nil {
err = (args.Get(2)).(error)
}

keyStr := base64.StdEncoding.EncodeToString(key)
c.mockRequestIDStore[keyStr] = data

return status, key, err
}

func (c *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) {
args := c.Called(key)
var reply *disperser_rpc.BlobStatusReply
Expand Down
2 changes: 1 addition & 1 deletion encoding/utils/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

// ConvertByPaddingEmptyByte takes bytes and insert an empty byte at the front of every 31 byte.
// The empty byte is padded at the low address, because we use big endian to interpret a fiedl element.
// The empty byte is padded at the low address, because we use big endian to interpret a field element.
// This ensure every 32 bytes are within the valid range of a field element for bn254 curve.
// If the input data is not a multiple of 31, the reminder is added to the output by
// inserting a 0 and the reminder. The output does not necessarily be a multipler of 32
Expand Down
11 changes: 9 additions & 2 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,15 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath,
DISPERSER_SERVER_PER_USER_UNAUTH_BYTE_RATE: "32000,32000",
DISPERSER_SERVER_TOTAL_UNAUTH_BLOB_RATE: "10,10",
DISPERSER_SERVER_PER_USER_UNAUTH_BLOB_RATE: "2,2",
DISPERSER_SERVER_ENABLE_RATELIMITER: "true",
DISPERSER_SERVER_ALLOWLIST: "3.221.120.68/0/1000/10485760,18.214.113.214/0/1000/10485760",

DISPERSER_SERVER_ENABLE_PAYMENT_METERER: "true",
DISPERSER_SERVER_RESERVATION_WINDOW: "60",
DISPERSER_SERVER_MIN_CHARGEABLE_SIZE: "100",
DISPERSER_SERVER_PRICE_PER_CHARGEABLE: "100",
DISPERSER_SERVER_ON_DEMAND_GLOBAL_LIMIT: "1000",

DISPERSER_SERVER_ENABLE_RATELIMITER: "true",
DISPERSER_SERVER_ALLOWLIST: "3.221.120.68/0/1000/10485760,18.214.113.214/0/1000/10485760",

DISPERSER_SERVER_RETRIEVAL_BLOB_RATE: "4",
DISPERSER_SERVER_RETRIEVAL_BYTE_RATE: "10000000",
Expand Down
10 changes: 10 additions & 0 deletions inabox/deploy/env_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ type DisperserVars struct {
DISPERSER_SERVER_RETRIEVAL_BLOB_RATE string

DISPERSER_SERVER_RETRIEVAL_BYTE_RATE string

DISPERSER_SERVER_ENABLE_PAYMENT_METERER string

DISPERSER_SERVER_RESERVATION_WINDOW string

DISPERSER_SERVER_MIN_CHARGEABLE_SIZE string

DISPERSER_SERVER_PRICE_PER_CHARGEABLE string

DISPERSER_SERVER_ON_DEMAND_GLOBAL_LIMIT string
}

func (vars DisperserVars) getEnvMap() map[string]string {
Expand Down
13 changes: 12 additions & 1 deletion inabox/tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
rollupbindings "github.com/Layr-Labs/eigenda/contracts/bindings/MockRollup"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/meterer"
"github.com/ethereum/go-ethereum/crypto"

"github.com/Layr-Labs/eigenda/encoding/utils/codec"
. "github.com/onsi/ginkgo/v2"
Expand All @@ -36,11 +38,20 @@ var _ = Describe("Inabox Integration", func() {
privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
signer := auth.NewLocalBlobRequestSigner(privateKeyHex)

privateKey, err := crypto.HexToECDSA(privateKeyHex[2:]) // Remove "0x" prefix
disp := clients.NewDisperserClient(&clients.Config{
Hostname: "localhost",
Port: "32003",
Timeout: 10 * time.Second,
}, signer, clients.Accountant{})
}, signer, clients.NewAccountant(meterer.ActiveReservation{
DataRate: 1000,
StartEpoch: 100,
EndEpoch: 200,
QuorumSplit: []byte{50, 50},
QuorumNumbers: []uint8{0, 1},
}, meterer.OnDemandPayment{
CumulativePayment: 500,
}, 60, 100, 100, privateKey))

Expect(disp).To(Not(BeNil()))

Expand Down
163 changes: 163 additions & 0 deletions inabox/tests/payment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package integration_test

import (
"bytes"
"context"
"crypto/rand"
"math"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/meterer"
"github.com/ethereum/go-ethereum/crypto"

"github.com/Layr-Labs/eigenda/encoding/utils/codec"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Inabox Integration", func() {
It("test payment metering", func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()

gasTipCap, gasFeeCap, err := ethClient.GetLatestGasCaps(ctx)
Expect(err).To(BeNil())

privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
signer := auth.NewLocalBlobRequestSigner(privateKeyHex)
reservationBytesLimit := uint64(1024)
paymentLimit := meterer.TokenAmount(512)
minimumChargeableSize := uint32(128)
minimumChargeablePayment := uint32(128)
// Disperser configs: rsv window 60s, min chargeable size 100 bytes, price per chargeable 100, global limit 500
// -> need to check the mock, can't just use any account for the disperser client, consider using static wallets...

// say with dataLength of 150 bytes, within a window, we can send 7 blobs with overflow of 50 bytes
// the later requests is then 250 bytes, try send 4 blobs within a second, 2 of them would fail but not charged for
// wait for a second, retry, and that should allow ondemand to work
privateKey, err := crypto.HexToECDSA(privateKeyHex[2:]) // Remove "0x" prefix
disp := clients.NewDisperserClient(&clients.Config{
Hostname: "localhost",
Port: "32003",
Timeout: 10 * time.Second,
}, signer, clients.NewAccountant(meterer.ActiveReservation{
DataRate: reservationBytesLimit,
StartEpoch: 0,
EndEpoch: math.MaxUint32,
QuorumSplit: []byte{50, 50},
QuorumNumbers: []uint8{0, 1},
}, meterer.OnDemandPayment{
CumulativePayment: paymentLimit,
}, 60, minimumChargeableSize, minimumChargeablePayment, privateKey))

Expect(disp).To(Not(BeNil()))

singleBlobSize := minimumChargeableSize
data := make([]byte, singleBlobSize)
_, err = rand.Read(data)
Expect(err).To(BeNil())

paddedData := codec.ConvertByPaddingEmptyByte(data)

// requests that count towards either reservation or payments
paidBlobStatus := []disperser.BlobStatus{}
paidKeys := [][]byte{}
for i := 0; i < (int(reservationBytesLimit)+int(paymentLimit))/int(singleBlobSize); i++ {
blobStatus, key, err := disp.PaidDisperseBlob(ctx, paddedData, []uint8{})
Expect(err).To(BeNil())
Expect(key).To(Not(BeNil()))
Expect(blobStatus).To(Not(BeNil()))
Expect(*blobStatus).To(Equal(disperser.Processing))
paidBlobStatus = append(paidBlobStatus, *blobStatus)
paidKeys = append(paidKeys, key)
}

// requests that aren't covered by reservation or on-demand payment
blobStatus, key, err := disp.PaidDisperseBlob(ctx, paddedData, []uint8{})
Expect(err).To(Not(BeNil()))
Expect(key).To(BeNil())
Expect(blobStatus).To(BeNil())

ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()

var replies []*disperserpb.BlobStatusReply
// now make sure all the paid blobs get confirmed
loop:
for {
select {
case <-ctx.Done():
Fail("timed out")
case <-ticker.C:
notConfirmed := false
for i, key := range paidKeys {
reply, err := disp.GetBlobStatus(context.Background(), key)
Expect(err).To(BeNil())
Expect(reply).To(Not(BeNil()))
status, err := disperser.FromBlobStatusProto(reply.GetStatus())
if *status != disperser.Confirmed {
notConfirmed = true
}
Expect(err).To(BeNil())
Expect(status).To(Equal(disperser.Confirmed))
replies = append(replies, reply)
paidBlobStatus[i] = *status
}

if notConfirmed {
mineAnvilBlocks(numConfirmations + 1)
continue
}

for _, reply := range replies {
blobHeader := blobHeaderFromProto(reply.GetInfo().GetBlobHeader())
verificationProof := blobVerificationProofFromProto(reply.GetInfo().GetBlobVerificationProof())
opts, err := ethClient.GetNoSendTransactOpts()
Expect(err).To(BeNil())
tx, err := mockRollup.PostCommitment(opts, blobHeader, verificationProof)
Expect(err).To(BeNil())
tx, err = ethClient.UpdateGas(ctx, tx, nil, gasTipCap, gasFeeCap)
Expect(err).To(BeNil())
err = ethClient.SendTransaction(ctx, tx)
Expect(err).To(BeNil())
mineAnvilBlocks(numConfirmations + 1)
_, err = ethClient.EnsureTransactionEvaled(ctx, tx, "PostCommitment")
Expect(err).To(BeNil())
}

break loop
}
}
for _, status := range paidBlobStatus {
Expect(status).To(Equal(disperser.Confirmed))
}

ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
for _, reply := range replies {
retrieved, err := retrievalClient.RetrieveBlob(ctx,
[32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash()),
reply.GetInfo().GetBlobVerificationProof().GetBlobIndex(),
uint(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetReferenceBlockNumber()),
[32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetBatchRoot()),
0, // retrieve blob 1 from quorum 0
)
Expect(err).To(BeNil())
restored := codec.RemoveEmptyByteFromPaddedBytes(retrieved)
Expect(bytes.TrimRight(restored, "\x00")).To(Equal(bytes.TrimRight(data, "\x00")))

_, err = retrievalClient.RetrieveBlob(ctx,
[32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash()),
reply.GetInfo().GetBlobVerificationProof().GetBlobIndex(),
uint(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetReferenceBlockNumber()),
[32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetBatchRoot()),
1, // retrieve blob 1 from quorum 1
)
Expect(err).NotTo(BeNil())
}
})
})

0 comments on commit 9295cbd

Please sign in to comment.