diff --git a/api/clients/disperser_client.go b/api/clients/disperser_client.go index 7bd91fc1b..485299884 100644 --- a/api/clients/disperser_client.go +++ b/api/clients/disperser_client.go @@ -35,6 +35,7 @@ func NewConfig(hostname, port string, timeout time.Duration, useSecureGrpcFlag b type DisperserClient interface { DisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) + PaidDisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) diff --git a/api/clients/mock/disperser_client.go b/api/clients/mock/disperser_client.go index 0488af63c..4b05f5e0a 100644 --- a/api/clients/mock/disperser_client.go +++ b/api/clients/mock/disperser_client.go @@ -81,6 +81,28 @@ func (c *MockDisperserClient) DisperseBlob(ctx context.Context, data []byte, quo return status, key, err } +func (c *MockDisperserClient) PaidDisperseBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { + //TODO: add payment logic to mocks? + args := c.Called(data, quorums) + var status *disperser.BlobStatus + if args.Get(0) != nil { + status = (args.Get(0)).(*disperser.BlobStatus) + } + var key []byte + if args.Get(1) != nil { + key = (args.Get(1)).([]byte) + } + var err error + if args.Get(2) != nil { + err = (args.Get(2)).(error) + } + + keyStr := base64.StdEncoding.EncodeToString(key) + c.mockRequestIDStore[keyStr] = data + + return status, key, err +} + func (c *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { args := c.Called(key) var reply *disperser_rpc.BlobStatusReply diff --git a/encoding/utils/codec/codec.go b/encoding/utils/codec/codec.go index eb08acc47..7445499fc 100644 --- a/encoding/utils/codec/codec.go +++ b/encoding/utils/codec/codec.go @@ -5,7 +5,7 @@ import ( ) // ConvertByPaddingEmptyByte takes bytes and insert an empty byte at the front of every 31 byte. -// The empty byte is padded at the low address, because we use big endian to interpret a fiedl element. +// The empty byte is padded at the low address, because we use big endian to interpret a field element. // This ensure every 32 bytes are within the valid range of a field element for bn254 curve. // If the input data is not a multiple of 31, the reminder is added to the output by // inserting a 0 and the reminder. The output does not necessarily be a multipler of 32 diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 01795d65f..ef193e76b 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -175,8 +175,15 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath, DISPERSER_SERVER_PER_USER_UNAUTH_BYTE_RATE: "32000,32000", DISPERSER_SERVER_TOTAL_UNAUTH_BLOB_RATE: "10,10", DISPERSER_SERVER_PER_USER_UNAUTH_BLOB_RATE: "2,2", - DISPERSER_SERVER_ENABLE_RATELIMITER: "true", - DISPERSER_SERVER_ALLOWLIST: "3.221.120.68/0/1000/10485760,18.214.113.214/0/1000/10485760", + + DISPERSER_SERVER_ENABLE_PAYMENT_METERER: "true", + DISPERSER_SERVER_RESERVATION_WINDOW: "60", + DISPERSER_SERVER_MIN_CHARGEABLE_SIZE: "100", + DISPERSER_SERVER_PRICE_PER_CHARGEABLE: "100", + DISPERSER_SERVER_ON_DEMAND_GLOBAL_LIMIT: "1000", + + DISPERSER_SERVER_ENABLE_RATELIMITER: "true", + DISPERSER_SERVER_ALLOWLIST: "3.221.120.68/0/1000/10485760,18.214.113.214/0/1000/10485760", DISPERSER_SERVER_RETRIEVAL_BLOB_RATE: "4", DISPERSER_SERVER_RETRIEVAL_BYTE_RATE: "10000000", diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index f701ce64d..c419feb2f 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -76,6 +76,16 @@ type DisperserVars struct { DISPERSER_SERVER_RETRIEVAL_BLOB_RATE string DISPERSER_SERVER_RETRIEVAL_BYTE_RATE string + + DISPERSER_SERVER_ENABLE_PAYMENT_METERER string + + DISPERSER_SERVER_RESERVATION_WINDOW string + + DISPERSER_SERVER_MIN_CHARGEABLE_SIZE string + + DISPERSER_SERVER_PRICE_PER_CHARGEABLE string + + DISPERSER_SERVER_ON_DEMAND_GLOBAL_LIMIT string } func (vars DisperserVars) getEnvMap() map[string]string { diff --git a/inabox/tests/integration_test.go b/inabox/tests/integration_test.go index ed9c7b0d9..d1307c035 100644 --- a/inabox/tests/integration_test.go +++ b/inabox/tests/integration_test.go @@ -12,6 +12,8 @@ import ( rollupbindings "github.com/Layr-Labs/eigenda/contracts/bindings/MockRollup" "github.com/Layr-Labs/eigenda/core/auth" "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/meterer" + "github.com/ethereum/go-ethereum/crypto" "github.com/Layr-Labs/eigenda/encoding/utils/codec" . "github.com/onsi/ginkgo/v2" @@ -36,11 +38,20 @@ var _ = Describe("Inabox Integration", func() { privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" signer := auth.NewLocalBlobRequestSigner(privateKeyHex) + privateKey, err := crypto.HexToECDSA(privateKeyHex[2:]) // Remove "0x" prefix disp := clients.NewDisperserClient(&clients.Config{ Hostname: "localhost", Port: "32003", Timeout: 10 * time.Second, - }, signer, clients.Accountant{}) + }, signer, clients.NewAccountant(meterer.ActiveReservation{ + DataRate: 1000, + StartEpoch: 100, + EndEpoch: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + }, meterer.OnDemandPayment{ + CumulativePayment: 500, + }, 60, 100, 100, privateKey)) Expect(disp).To(Not(BeNil())) diff --git a/inabox/tests/payment_test.go b/inabox/tests/payment_test.go new file mode 100644 index 000000000..4a4bfdfd7 --- /dev/null +++ b/inabox/tests/payment_test.go @@ -0,0 +1,163 @@ +package integration_test + +import ( + "bytes" + "context" + "crypto/rand" + "math" + "time" + + "github.com/Layr-Labs/eigenda/api/clients" + disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/disperser/meterer" + "github.com/ethereum/go-ethereum/crypto" + + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Inabox Integration", func() { + It("test payment metering", func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + defer cancel() + + gasTipCap, gasFeeCap, err := ethClient.GetLatestGasCaps(ctx) + Expect(err).To(BeNil()) + + privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" + signer := auth.NewLocalBlobRequestSigner(privateKeyHex) + reservationBytesLimit := uint64(1024) + paymentLimit := meterer.TokenAmount(512) + minimumChargeableSize := uint32(128) + minimumChargeablePayment := uint32(128) + // Disperser configs: rsv window 60s, min chargeable size 100 bytes, price per chargeable 100, global limit 500 + // -> need to check the mock, can't just use any account for the disperser client, consider using static wallets... + + // say with dataLength of 150 bytes, within a window, we can send 7 blobs with overflow of 50 bytes + // the later requests is then 250 bytes, try send 4 blobs within a second, 2 of them would fail but not charged for + // wait for a second, retry, and that should allow ondemand to work + privateKey, err := crypto.HexToECDSA(privateKeyHex[2:]) // Remove "0x" prefix + disp := clients.NewDisperserClient(&clients.Config{ + Hostname: "localhost", + Port: "32003", + Timeout: 10 * time.Second, + }, signer, clients.NewAccountant(meterer.ActiveReservation{ + DataRate: reservationBytesLimit, + StartEpoch: 0, + EndEpoch: math.MaxUint32, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + }, meterer.OnDemandPayment{ + CumulativePayment: paymentLimit, + }, 60, minimumChargeableSize, minimumChargeablePayment, privateKey)) + + Expect(disp).To(Not(BeNil())) + + singleBlobSize := minimumChargeableSize + data := make([]byte, singleBlobSize) + _, err = rand.Read(data) + Expect(err).To(BeNil()) + + paddedData := codec.ConvertByPaddingEmptyByte(data) + + // requests that count towards either reservation or payments + paidBlobStatus := []disperser.BlobStatus{} + paidKeys := [][]byte{} + for i := 0; i < (int(reservationBytesLimit)+int(paymentLimit))/int(singleBlobSize); i++ { + blobStatus, key, err := disp.PaidDisperseBlob(ctx, paddedData, []uint8{}) + Expect(err).To(BeNil()) + Expect(key).To(Not(BeNil())) + Expect(blobStatus).To(Not(BeNil())) + Expect(*blobStatus).To(Equal(disperser.Processing)) + paidBlobStatus = append(paidBlobStatus, *blobStatus) + paidKeys = append(paidKeys, key) + } + + // requests that aren't covered by reservation or on-demand payment + blobStatus, key, err := disp.PaidDisperseBlob(ctx, paddedData, []uint8{}) + Expect(err).To(Not(BeNil())) + Expect(key).To(BeNil()) + Expect(blobStatus).To(BeNil()) + + ticker := time.NewTicker(time.Second * 1) + defer ticker.Stop() + + var replies []*disperserpb.BlobStatusReply + // now make sure all the paid blobs get confirmed + loop: + for { + select { + case <-ctx.Done(): + Fail("timed out") + case <-ticker.C: + notConfirmed := false + for i, key := range paidKeys { + reply, err := disp.GetBlobStatus(context.Background(), key) + Expect(err).To(BeNil()) + Expect(reply).To(Not(BeNil())) + status, err := disperser.FromBlobStatusProto(reply.GetStatus()) + if *status != disperser.Confirmed { + notConfirmed = true + } + Expect(err).To(BeNil()) + Expect(status).To(Equal(disperser.Confirmed)) + replies = append(replies, reply) + paidBlobStatus[i] = *status + } + + if notConfirmed { + mineAnvilBlocks(numConfirmations + 1) + continue + } + + for _, reply := range replies { + blobHeader := blobHeaderFromProto(reply.GetInfo().GetBlobHeader()) + verificationProof := blobVerificationProofFromProto(reply.GetInfo().GetBlobVerificationProof()) + opts, err := ethClient.GetNoSendTransactOpts() + Expect(err).To(BeNil()) + tx, err := mockRollup.PostCommitment(opts, blobHeader, verificationProof) + Expect(err).To(BeNil()) + tx, err = ethClient.UpdateGas(ctx, tx, nil, gasTipCap, gasFeeCap) + Expect(err).To(BeNil()) + err = ethClient.SendTransaction(ctx, tx) + Expect(err).To(BeNil()) + mineAnvilBlocks(numConfirmations + 1) + _, err = ethClient.EnsureTransactionEvaled(ctx, tx, "PostCommitment") + Expect(err).To(BeNil()) + } + + break loop + } + } + for _, status := range paidBlobStatus { + Expect(status).To(Equal(disperser.Confirmed)) + } + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + for _, reply := range replies { + retrieved, err := retrievalClient.RetrieveBlob(ctx, + [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash()), + reply.GetInfo().GetBlobVerificationProof().GetBlobIndex(), + uint(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetReferenceBlockNumber()), + [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetBatchRoot()), + 0, // retrieve blob 1 from quorum 0 + ) + Expect(err).To(BeNil()) + restored := codec.RemoveEmptyByteFromPaddedBytes(retrieved) + Expect(bytes.TrimRight(restored, "\x00")).To(Equal(bytes.TrimRight(data, "\x00"))) + + _, err = retrievalClient.RetrieveBlob(ctx, + [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash()), + reply.GetInfo().GetBlobVerificationProof().GetBlobIndex(), + uint(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetReferenceBlockNumber()), + [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetBatchRoot()), + 1, // retrieve blob 1 from quorum 1 + ) + Expect(err).NotTo(BeNil()) + } + }) +})