Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No delay when fetching blobs with known block and no attempt to recover blobs for unknown block #8927

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@

### Additions and Improvements
- Optimized blobs validation pipeline
- Remove delay when fetching blobs from the local EL on block arrival

### Bug Fixes
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.BeaconBlockBody;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodySchemaDeneb;
import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment;
import tech.pegasys.teku.spec.datastructures.type.SszKZGProof;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
Expand Down Expand Up @@ -266,6 +270,32 @@ public BlobSidecar constructBlobSidecar(
index, blob, commitment, proof, signedBeaconBlock.asHeader(), kzgCommitmentInclusionProof);
}

public BlobSidecar constructBlobSidecarFromBlobAndProof(
final BlobIdentifier blobIdentifier,
final BlobAndProof blobAndProof,
final BeaconBlockBodyDeneb beaconBlockBodyDeneb,
final SignedBeaconBlockHeader signedBeaconBlockHeader) {

final SszKZGCommitment sszKZGCommitment =
beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue());

final BlobSidecar blobSidecar =
blobSidecarSchema.create(
blobIdentifier.getIndex(),
blobAndProof.blob(),
sszKZGCommitment,
new SszKZGProof(blobAndProof.proof()),
signedBeaconBlockHeader,
computeKzgCommitmentInclusionProof(blobIdentifier.getIndex(), beaconBlockBodyDeneb));

blobSidecar.markSignatureAsValidated();
blobSidecar.markKzgCommitmentInclusionProofAsValidated();
// assume kzg validation done by local EL
blobSidecar.markKzgAsValidated();

return blobSidecar;
}

public boolean verifyBlobKzgCommitmentInclusionProof(final BlobSidecar blobSidecar) {
if (blobSidecar.isKzgCommitmentInclusionProofValidated()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ public BlobSidecarManagerImpl(
invalidBlobSidecarRoots,
(tracker) ->
new ForkChoiceBlobSidecarsAvailabilityChecker(spec, recentChainData, tracker, kzg),
// we don't care to set maxBlobsPerBlock since it isn't used with this immediate validation
// flow
(block) -> new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot(), UInt64.ZERO));
(block) -> new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot()));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@

public class BlockBlobSidecarsTracker {
private static final Logger LOG = LogManager.getLogger();

private static final UInt64 CREATION_TIMING_IDX = UInt64.MAX_VALUE;
private static final UInt64 BLOCK_ARRIVAL_TIMING_IDX = CREATION_TIMING_IDX.decrement();
private static final UInt64 RPC_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement();
private static final UInt64 LOCAL_EL_FETCH_TIMING_IDX = RPC_FETCH_TIMING_IDX.decrement();
private static final UInt64 RPC_BLOCK_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement();
private static final UInt64 RPC_BLOBS_FETCH_TIMING_IDX = RPC_BLOCK_FETCH_TIMING_IDX.decrement();
private static final UInt64 LOCAL_EL_BLOBS_FETCH_TIMING_IDX =
RPC_BLOBS_FETCH_TIMING_IDX.decrement();

private final SlotAndBlockRoot slotAndBlockRoot;
private final UInt64 maxBlobsPerBlock;

private final AtomicReference<Optional<SignedBeaconBlock>> block =
new AtomicReference<>(Optional.empty());
Expand All @@ -56,8 +58,9 @@ public class BlockBlobSidecarsTracker {
private final NavigableMap<UInt64, BlobSidecar> blobSidecars = new ConcurrentSkipListMap<>();
private final SafeFuture<Void> blobSidecarsComplete = new SafeFuture<>();

private volatile boolean rpcFetchTriggered = false;
private volatile boolean localElFetchTriggered = false;
private volatile boolean localElBlobsFetchTriggered = false;
private volatile boolean rpcBlockFetchTriggered = false;
private volatile boolean rpcBlobsFetchTriggered = false;

private final Optional<Map<UInt64, Long>> maybeDebugTimings;

Expand All @@ -69,12 +72,9 @@ public class BlockBlobSidecarsTracker {
* tracker instance will be used, so no synchronization is required
*
* @param slotAndBlockRoot slot and block root to create tracker for
* @param maxBlobsPerBlock max number of blobs per block for the slot
*/
public BlockBlobSidecarsTracker(
final SlotAndBlockRoot slotAndBlockRoot, final UInt64 maxBlobsPerBlock) {
public BlockBlobSidecarsTracker(final SlotAndBlockRoot slotAndBlockRoot) {
this.slotAndBlockRoot = slotAndBlockRoot;
this.maxBlobsPerBlock = maxBlobsPerBlock;
if (LOG.isDebugEnabled()) {
// don't need a concurrent hashmap since we'll interact with it from synchronized BlobSidecar
// pool methods
Expand Down Expand Up @@ -111,29 +111,11 @@ public Optional<BlobSidecar> getBlobSidecar(final UInt64 index) {
}

public Stream<BlobIdentifier> getMissingBlobSidecars() {
final Optional<Integer> blockCommitmentsCount = getBlockKzgCommitmentsCount();
if (blockCommitmentsCount.isPresent()) {
return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get()))
.filter(blobIndex -> !blobSidecars.containsKey(blobIndex))
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}

if (blobSidecars.isEmpty()) {
return Stream.of();
}

// We may return maxBlobsPerBlock because we don't know the block
return UInt64.range(UInt64.ZERO, maxBlobsPerBlock)
.filter(blobIndex -> !blobSidecars.containsKey(blobIndex))
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}

public Stream<BlobIdentifier> getUnusedBlobSidecarsForBlock() {
final Optional<Integer> blockCommitmentsCount = getBlockKzgCommitmentsCount();
checkState(blockCommitmentsCount.isPresent(), "Block must me known to call this method");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're at it: must be know


final UInt64 firstUnusedIndex = UInt64.valueOf(blockCommitmentsCount.get());
return UInt64.range(firstUnusedIndex, maxBlobsPerBlock)
return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get()))
.filter(blobIndex -> !blobSidecars.containsKey(blobIndex))
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}

Expand Down Expand Up @@ -247,24 +229,35 @@ public boolean isComplete() {
return blobSidecarsComplete.isDone();
}

public boolean isRpcFetchTriggered() {
return rpcFetchTriggered;
public boolean isLocalElBlobsFetchTriggered() {
return localElBlobsFetchTriggered;
}

public void setRpcFetchTriggered() {
this.rpcFetchTriggered = true;
public void setLocalElBlobsFetchTriggered() {
this.localElBlobsFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(RPC_FETCH_TIMING_IDX, System.currentTimeMillis()));
debugTimings ->
debugTimings.put(LOCAL_EL_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

public boolean isLocalElFetchTriggered() {
return localElFetchTriggered;
public boolean isRpcBlockFetchTriggered() {
return rpcBlockFetchTriggered;
}

public void setLocalElFetchTriggered() {
this.localElFetchTriggered = true;
public void setRpcBlockFetchTriggered() {
this.rpcBlockFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(LOCAL_EL_FETCH_TIMING_IDX, System.currentTimeMillis()));
debugTimings -> debugTimings.put(RPC_BLOCK_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

public boolean isRpcBlobsFetchTriggered() {
return rpcBlobsFetchTriggered;
}

public void setRpcBlobsFetchTriggered() {
this.rpcBlobsFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(RPC_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

private boolean areBlobsComplete() {
Expand Down Expand Up @@ -315,22 +308,31 @@ private void printDebugTimings(final Map<UInt64, Long> debugTimings) {
.append(debugTimings.getOrDefault(BLOCK_ARRIVAL_TIMING_IDX, 0L) - creationTime)
.append("ms - ");

if (debugTimings.containsKey(LOCAL_EL_FETCH_TIMING_IDX)) {
if (debugTimings.containsKey(LOCAL_EL_BLOBS_FETCH_TIMING_IDX)) {
timingsReport
.append("Local EL blobs fetch delay ")
.append(debugTimings.get(LOCAL_EL_BLOBS_FETCH_TIMING_IDX) - creationTime)
.append("ms - ");
} else {
timingsReport.append("Local EL blobs fetch wasn't required - ");
}

if (debugTimings.containsKey(RPC_BLOCK_FETCH_TIMING_IDX)) {
timingsReport
.append("Local EL fetch delay ")
.append(debugTimings.get(LOCAL_EL_FETCH_TIMING_IDX) - creationTime)
.append("RPC block fetch delay ")
.append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime)
.append("ms - ");
} else {
timingsReport.append("Local EL fetch wasn't required - ");
timingsReport.append("RPC block fetch wasn't required - ");
}

if (debugTimings.containsKey(RPC_FETCH_TIMING_IDX)) {
if (debugTimings.containsKey(RPC_BLOBS_FETCH_TIMING_IDX)) {
timingsReport
.append("RPC fetch delay ")
.append(debugTimings.get(RPC_FETCH_TIMING_IDX) - creationTime)
.append("RPC blobs fetch delay ")
.append(debugTimings.get(RPC_BLOBS_FETCH_TIMING_IDX) - creationTime)
.append("ms");
} else {
timingsReport.append("RPC fetch wasn't required");
timingsReport.append("RPC blobs fetch wasn't required");
}

LOG.debug(timingsReport.toString());
Expand All @@ -342,8 +344,9 @@ public String toString() {
.add("slotAndBlockRoot", slotAndBlockRoot)
.add("isBlockPresent", block.get().isPresent())
.add("isComplete", isComplete())
.add("rpcFetchTriggered", rpcFetchTriggered)
.add("localElFetchTriggered", localElFetchTriggered)
.add("localElBlobsFetchTriggered", localElBlobsFetchTriggered)
.add("rpcBlockFetchTriggered", rpcBlockFetchTriggered)
.add("rpcBlobsFetchTriggered", rpcBlobsFetchTriggered)
.add("blockImportOnCompletionEnabled", blockImportOnCompletionEnabled.get())
.add(
"blobSidecars",
Expand Down
Loading
Loading