Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Style: improve api clients comments #780

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions api/clients/codecs/blob_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

type BlobEncodingVersion byte

// All blob encodings are IFFT'd before being dispersed
const (
// This minimal blob encoding includes a version byte, a length uint32, and 31 byte field element mapping.
// This minimal blob encoding contains a 32 byte header = [0x00, version byte, uint32 len of data, 0x00, 0x00,...]
// followed by the encoded data [0x00, 31 bytes of data, 0x00, 31 bytes of data,...]
DefaultBlobEncoding BlobEncodingVersion = 0x0
)

Expand All @@ -30,6 +30,7 @@ func GenericDecodeBlob(data []byte) ([]byte, error) {
if len(data) <= 32 {
return nil, fmt.Errorf("data is not of length greater than 32 bytes: %d", len(data))
}
// TODO: why [1]? What's in [0]?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bxue-l2 you prob know this

version := BlobEncodingVersion(data[1])
codec, err := BlobEncodingVersionToCodec(version)
if err != nil {
Expand All @@ -38,7 +39,7 @@ func GenericDecodeBlob(data []byte) ([]byte, error) {

data, err = codec.DecodeBlob(data)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to decode blob: %w", err)
}

return data, nil
Expand Down
23 changes: 15 additions & 8 deletions api/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,32 @@ import (
)

type EigenDAClientConfig struct {
// RPC is the HTTP provider URL for the Data Availability node.
// RPC is the HTTP provider URL for the EigenDA Disperser
RPC string

// The total amount of time that the client will spend waiting for EigenDA to confirm a blob
// Timeout used when making dispersals to the EigenDA Disperser
// TODO: we should change this param as its name is quite confusing
ResponseTimeout time.Duration

// The total amount of time that the client will spend waiting for EigenDA
// to confirm a blob after it has been dispersed
// Note that reasonable values for this field will depend on the value of WaitForFinalization.
StatusQueryTimeout time.Duration

// The amount of time to wait between status queries of a newly dispersed blob
StatusQueryRetryInterval time.Duration

// The total amount of time that the client will waiting for a response from the EigenDA disperser
ResponseTimeout time.Duration
// If true, will wait for the blob to finalize, if false, will wait only for the blob to confirm.
WaitForFinalization bool

// The quorum IDs to write blobs to using this client. Should not include default quorums 0 or 1.
CustomQuorumIDs []uint

// Signer private key in hex encoded format. This key should not be associated with an Ethereum address holding any funds.
// Signer private key in hex encoded format. This key is currently purely used for authn/authz on the disperser.
// For security, it should not be associated with an Ethereum address holding any funds.
// This might change once we introduce payments.
// OPTIONAL: this value is optional, and if set to "", will result in a read-only eigenDA client,
// that can retrieve blobs but cannot disperse blobs.
SignerPrivateKeyHex string

// Whether to disable TLS for an insecure connection when connecting to a local EigenDA disperser instance.
Expand All @@ -37,9 +47,6 @@ type EigenDAClientConfig struct {
// the commitment. With this mode disabled, you will need to supply the entire blob to perform a verification
// that any part of the data matches the KZG commitment.
DisablePointVerificationMode bool

// If true, will wait for the blob to finalize, if false, will wait only for the blob to confirm.
WaitForFinalization bool
}

func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
Expand Down
3 changes: 3 additions & 0 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
type Config struct {
Hostname string
Port string
// BlobDispersal Timeouts for both authenticated and unauthenticated dispersals
// GetBlobStatus and RetrieveBlob timeouts are hardcoded to 60seconds
// TODO: do we want to add config timeouts for those separate requests?
Timeout time.Duration
UseSecureGrpcFlag bool
}
Expand Down
39 changes: 26 additions & 13 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ import (
"github.com/ethereum/go-ethereum/log"
)

// IEigenDAClient is a wrapper around the DisperserClient interface which
// encodes blobs before dispersing them, and decodes them after retrieving them.
type IEigenDAClient interface {
GetBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error)
PutBlob(ctx context.Context, txData []byte) (*grpcdisperser.BlobInfo, error)
GetCodec() codecs.BlobCodec
}

// EigenDAClient is a wrapper around the DisperserClient which
// encodes blobs before dispersing them, and decodes them after retrieving them.
type EigenDAClient struct {
// TODO: all of these should be private, to prevent users from using them directly,
// which breaks encapsulation and makes it hard for us to do refactors or changes
Comment on lines +30 to +31
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this I personally think is a refactor worth doing... but it might break some client code... :\

Config EigenDAClientConfig
Log log.Logger
Client DisperserClient
Expand All @@ -46,6 +52,7 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
if len(config.SignerPrivateKeyHex) == 64 {
signer = auth.NewLocalBlobRequestSigner(config.SignerPrivateKeyHex)
} else if len(config.SignerPrivateKeyHex) == 0 {
// noop signer is used when we need a read-only eigenda client
signer = auth.NewLocalNoopSigner()
} else {
return nil, fmt.Errorf("invalid length for signer private key")
Expand Down Expand Up @@ -74,6 +81,8 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
}, nil
}

// Deprecated: do not rely on this function. Do not use m.Codec directly either.
// These will eventually be removed and not exposed.
func (m EigenDAClient) GetCodec() codecs.BlobCodec {
return m.Codec
}
Expand All @@ -84,27 +93,30 @@ func (m EigenDAClient) GetCodec() codecs.BlobCodec {
// data, which is necessary for generating KZG proofs for data's correctness.
// The function handles potential errors during blob retrieval, data length
// checks, and decoding processes.
// TODO: should we use a pointer receiver instead, to prevent unnecessary copying of the EigenDAClient struct?
func (m EigenDAClient) GetBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) {
data, err := m.Client.RetrieveBlob(ctx, batchHeaderHash, blobIndex)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not retrieve blob: %w", err)
}

if len(data) == 0 {
// TODO: explain when/why/how this can happen
return nil, fmt.Errorf("blob has length zero")
}
Comment on lines 103 to 106
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bxue-l2 do you know?


decodedData, err := m.Codec.DecodeBlob(data)
if err != nil {
return nil, fmt.Errorf("error getting blob: %w", err)
return nil, fmt.Errorf("error decoding blob: %w", err)
}

return decodedData, nil
}

// PutBlob encodes and writes a blob to EigenDA, waiting for it to be finalized
// before returning. This function is resiliant to transient failures and
// before returning. This function is resilient to transient failures and
// timeouts.
// TODO: should we use a pointer receiver instead, to prevent unnecessary copying of the EigenDAClient struct?
func (m EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
resultChan, errorChan := m.PutBlobAsync(ctx, data)
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
Expand All @@ -115,6 +127,7 @@ func (m EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser
}
}

// TODO: should we use a pointer receiver instead, to prevent unnecessary copying of the EigenDAClient struct?
func (m EigenDAClient) PutBlobAsync(ctx context.Context, data []byte) (resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
resultChan = make(chan *grpcdisperser.BlobInfo, 1)
errChan = make(chan error, 1)
Expand All @@ -137,11 +150,13 @@ func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan c
return
}

// TODO: should we just use uint8s directly in the config?
customQuorumNumbers := make([]uint8, len(m.Config.CustomQuorumIDs))
for i, e := range m.Config.CustomQuorumIDs {
customQuorumNumbers[i] = uint8(e)
}
// disperse blob
// TODO: can we consider passing a requestID directly to requests, to get idempotency and tracing?
blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
if err != nil {
errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
Expand All @@ -150,13 +165,12 @@ func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan c

// process response
if *blobStatus == disperser.Failed {
m.Log.Error("Unable to disperse blob to EigenDA, aborting", "err", err)
errChan <- fmt.Errorf("reply status is %d", blobStatus)
errChan <- fmt.Errorf("unable to disperse blob to eigenda (reply status %d): %w", blobStatus, err)
return
}

base64RequestID := base64.StdEncoding.EncodeToString(requestID)
m.Log.Info("Blob dispersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)
m.Log.Info("Blob accepted by EigenDA disperser, now polling for status updates", "requestID", base64RequestID)

ticker := time.NewTicker(m.Config.StatusQueryRetryInterval)
defer ticker.Stop()
Expand All @@ -175,25 +189,23 @@ func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan c
case <-ticker.C:
statusRes, err := m.Client.GetBlobStatus(ctx, requestID)
if err != nil {
m.Log.Error("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
m.Log.Warn("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
continue
}

switch statusRes.Status {
case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING:
// to prevent log clutter, we only log at info level once
if alreadyWaitingForDispersal {
m.Log.Debug("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
m.Log.Debug("Blob is being processed by the EigenDA network", "requestID", base64RequestID)
} else {
m.Log.Info("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
m.Log.Info("Blob is being processed by the EigenDA network", "requestID", base64RequestID)
alreadyWaitingForDispersal = true
}
case grpcdisperser.BlobStatus_FAILED:
m.Log.Error("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err)
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES:
m.Log.Error("EigenDA blob dispersal failed in processing with insufficient signatures", "requestID", base64RequestID, "err", err)
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_CONFIRMED:
Expand All @@ -212,11 +224,12 @@ func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan c
}
case grpcdisperser.BlobStatus_FINALIZED:
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
m.Log.Info("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
m.Log.Info("EigenDA blob finalized", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
resultChan <- statusRes.Info
return
default:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
// this should never happen. If it does, the blob is in a heisenberg state... it could either eventually get confirmed or fail
errChan <- fmt.Errorf("unknown reply status %d. ask for assistance from EigenDA team, using requestID %s", statusRes.Status, base64RequestID)
return
}
}
Expand Down
5 changes: 3 additions & 2 deletions api/clients/eigenda_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func TestPutRetrieveBlobIFFTNoDecodeSuccess(t *testing.T) {
(disperserClient.On("RetrieveBlob", mock.Anything, mock.Anything, mock.Anything).
Return(nil, nil).Once()) // pass nil in as the return blob to tell the mock to return the corresponding blob
logger := log.NewLogger(log.DiscardHandler())
ifftCodec := codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec())
eigendaClient := clients.EigenDAClient{
Log: logger,
Config: clients.EigenDAClientConfig{
Expand All @@ -138,7 +139,7 @@ func TestPutRetrieveBlobIFFTNoDecodeSuccess(t *testing.T) {
WaitForFinalization: true,
},
Client: disperserClient,
Codec: codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()),
Codec: ifftCodec,
}
expectedBlob := []byte("dc49e7df326cfb2e7da5cf68f263e1898443ec2e862350606e7dfbda55ad10b5d61ed1d54baf6ae7a86279c1b4fa9c49a7de721dacb211264c1f5df31bade51c")
blobInfo, err := eigendaClient.PutBlob(context.Background(), expectedBlob)
Expand All @@ -148,7 +149,7 @@ func TestPutRetrieveBlobIFFTNoDecodeSuccess(t *testing.T) {

resultBlob, err := eigendaClient.GetBlob(context.Background(), []byte("mock-batch-header-hash"), 100)
require.NoError(t, err)
encodedBlob, err := eigendaClient.GetCodec().EncodeBlob(resultBlob)
encodedBlob, err := ifftCodec.EncodeBlob(resultBlob)
require.NoError(t, err)

resultBlob, err = codecs.NewIFFTCodec(codecs.NewDefaultBlobCodec()).DecodeBlob(encodedBlob)
Expand Down
6 changes: 3 additions & 3 deletions api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ message AuthenticationData {

message DisperseBlobRequest {
// The data to be dispersed.
// The size of data must be <= 2MiB. Every 32 bytes of data chunk is interpreted as an integer in big endian format
// The size of data must be <= 16MiB. Every 32 bytes of data chunk is interpreted as an integer in big endian format
// where the lower address has more significant bits. The integer must stay in the valid range to be interpreted
// as a field element on the bn254 curve. The valid range is
// 0 <= x < 21888242871839275222246405745257275088548364400416034343698204186575808495617
// containing slightly less than 254 bits and more than 253 bits. If any one of the 32 bytes chunk is outside the range,
// the whole request is deemed as invalid, and rejected.
// which is a 254 bit number (meaning the first 2 bits of each chunk must always be 00).
// If any one of the 32 bytes chunk is outside the range, the whole request is deemed as invalid, and rejected.
bytes data = 1;
// The quorums to which the blob will be sent, in addition to the required quorums which are configured
// on the EigenDA smart contract. If required quorums are included here, an error will be returned.
Expand Down
8 changes: 4 additions & 4 deletions encoding/utils/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
)

// ConvertByPaddingEmptyByte takes bytes and insert an empty byte at the front of every 31 byte.
// The empty byte is padded at the low address, because we use big endian to interpret a fiedl element.
// This ensure every 32 bytes are within the valid range of a field element for bn254 curve.
// If the input data is not a multiple of 31, the reminder is added to the output by
// inserting a 0 and the reminder. The output does not necessarily be a multipler of 32
// The empty byte is padded at the low address, because we use big endian to interpret a field element.
// This ensures every 32 bytes is within the valid range of a field element for bn254 curve.
// If the input data is not a multiple of 31, the remainder is added to the output by
// inserting a 0 and the remainder. The output is thus not necessarily a multiple of 32.
func ConvertByPaddingEmptyByte(data []byte) []byte {
dataSize := len(data)
parseSize := encoding.BYTES_PER_SYMBOL - 1
Expand Down
Loading