Skip to content

Commit

Permalink
Remove attempt to recover blobs when we don't have block
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Dec 16, 2024
1 parent 6b07218 commit e03e619
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.BeaconBlockBody;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodySchemaDeneb;
import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment;
import tech.pegasys.teku.spec.datastructures.type.SszKZGProof;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
Expand Down Expand Up @@ -262,6 +266,24 @@ public BlobSidecar constructBlobSidecar(
index, blob, commitment, proof, signedBeaconBlock.asHeader(), kzgCommitmentInclusionProof);
}

public BlobSidecar constructBlobSidecarFromBlobAndProof(
final BlobIdentifier blobIdentifier,
final BlobAndProof blobAndProof,
final BeaconBlockBodyDeneb beaconBlockBodyDeneb,
final SignedBeaconBlockHeader signedBeaconBlockHeader) {

final SszKZGCommitment sszKZGCommitment =
beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue());

return blobSidecarSchema.create(
blobIdentifier.getIndex(),
blobAndProof.blob(),
sszKZGCommitment,
new SszKZGProof(blobAndProof.proof()),
signedBeaconBlockHeader,
computeKzgCommitmentInclusionProof(blobIdentifier.getIndex(), beaconBlockBodyDeneb));
}

public boolean verifyBlobSidecarMerkleProof(final BlobSidecar blobSidecar) {
return predicates.isValidMerkleBranch(
blobSidecar.getSszKZGCommitment().hashTreeRoot(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ public BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmed
return BlobSidecarsAndValidationResult.NOT_REQUIRED;
}

// we don't care to set maxBlobsPerBlock since it isn't used with this immediate validation flow
final BlockBlobSidecarsTracker blockBlobSidecarsTracker =
new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot(), UInt64.ZERO);
new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot());

blockBlobSidecarsTracker.setBlock(block);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@

public class BlockBlobSidecarsTracker {
private static final Logger LOG = LogManager.getLogger();

private static final UInt64 CREATION_TIMING_IDX = UInt64.MAX_VALUE;
private static final UInt64 BLOCK_ARRIVAL_TIMING_IDX = CREATION_TIMING_IDX.decrement();
private static final UInt64 RPC_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement();
private static final UInt64 LOCAL_EL_FETCH_TIMING_IDX = RPC_FETCH_TIMING_IDX.decrement();
private static final UInt64 RPC_BLOCK_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement();
private static final UInt64 RPC_BLOBS_FETCH_TIMING_IDX = RPC_BLOCK_FETCH_TIMING_IDX.decrement();
private static final UInt64 LOCAL_EL_BLOBS_FETCH_TIMING_IDX =
RPC_BLOBS_FETCH_TIMING_IDX.decrement();

private final SlotAndBlockRoot slotAndBlockRoot;
private final UInt64 maxBlobsPerBlock;

private final AtomicReference<Optional<SignedBeaconBlock>> block =
new AtomicReference<>(Optional.empty());
Expand All @@ -56,8 +58,9 @@ public class BlockBlobSidecarsTracker {
private final NavigableMap<UInt64, BlobSidecar> blobSidecars = new ConcurrentSkipListMap<>();
private final SafeFuture<Void> blobSidecarsComplete = new SafeFuture<>();

private volatile boolean rpcFetchTriggered = false;
private volatile boolean localElFetchTriggered = false;
private volatile boolean localElBlobsFetchTriggered = false;
private volatile boolean rpcBlockFetchTriggered = false;
private volatile boolean rpcBlobsFetchTriggered = false;

private final Optional<Map<UInt64, Long>> maybeDebugTimings;

Expand All @@ -69,12 +72,9 @@ public class BlockBlobSidecarsTracker {
* tracker instance will be used, so no synchronization is required
*
* @param slotAndBlockRoot slot and block root to create tracker for
* @param maxBlobsPerBlock max number of blobs per block for the slot
*/
public BlockBlobSidecarsTracker(
final SlotAndBlockRoot slotAndBlockRoot, final UInt64 maxBlobsPerBlock) {
public BlockBlobSidecarsTracker(final SlotAndBlockRoot slotAndBlockRoot) {
this.slotAndBlockRoot = slotAndBlockRoot;
this.maxBlobsPerBlock = maxBlobsPerBlock;
if (LOG.isDebugEnabled()) {
// don't need a concurrent hashmap since we'll interact with it from synchronized BlobSidecar
// pool methods
Expand Down Expand Up @@ -110,30 +110,12 @@ public Optional<BlobSidecar> getBlobSidecar(final UInt64 index) {
return Optional.ofNullable(blobSidecars.get(index));
}

public Stream<BlobIdentifier> getMissingBlobSidecars() {
final Optional<Integer> blockCommitmentsCount = getBlockKzgCommitmentsCount();
if (blockCommitmentsCount.isPresent()) {
return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get()))
.filter(blobIndex -> !blobSidecars.containsKey(blobIndex))
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}

if (blobSidecars.isEmpty()) {
return Stream.of();
}

// We may return maxBlobsPerBlock because we don't know the block
return UInt64.range(UInt64.ZERO, maxBlobsPerBlock)
.filter(blobIndex -> !blobSidecars.containsKey(blobIndex))
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}

public Stream<BlobIdentifier> getUnusedBlobSidecarsForBlock() {
public Stream<BlobIdentifier> getMissingBlobSidecarsForBlock() {
final Optional<Integer> blockCommitmentsCount = getBlockKzgCommitmentsCount();
checkState(blockCommitmentsCount.isPresent(), "Block must me known to call this method");

final UInt64 firstUnusedIndex = UInt64.valueOf(blockCommitmentsCount.get());
return UInt64.range(firstUnusedIndex, maxBlobsPerBlock)
return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get()))
.filter(blobIndex -> !blobSidecars.containsKey(blobIndex))
.map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex));
}

Expand Down Expand Up @@ -247,24 +229,35 @@ public boolean isCompleted() {
return blobSidecarsComplete.isDone();
}

public boolean isRpcFetchTriggered() {
return rpcFetchTriggered;
public boolean isLocalElBlobsFetchTriggered() {
return localElBlobsFetchTriggered;
}

public void setRpcFetchTriggered() {
this.rpcFetchTriggered = true;
public void setLocalElBlobsFetchTriggered() {
this.localElBlobsFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(RPC_FETCH_TIMING_IDX, System.currentTimeMillis()));
debugTimings ->
debugTimings.put(LOCAL_EL_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

public boolean isLocalElFetchTriggered() {
return localElFetchTriggered;
public boolean isRpcBlockFetchTriggered() {
return rpcBlockFetchTriggered;
}

public void setLocalElFetchTriggered() {
this.localElFetchTriggered = true;
public void setRpcBlockFetchTriggered() {
this.rpcBlockFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(LOCAL_EL_FETCH_TIMING_IDX, System.currentTimeMillis()));
debugTimings -> debugTimings.put(RPC_BLOCK_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

public boolean isRpcBlobsFetchTriggered() {
return rpcBlobsFetchTriggered;
}

public void setRpcBlobsFetchTriggered() {
this.rpcBlobsFetchTriggered = true;
maybeDebugTimings.ifPresent(
debugTimings -> debugTimings.put(RPC_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis()));
}

private boolean areBlobsComplete() {
Expand Down Expand Up @@ -315,22 +308,31 @@ private void printDebugTimings(final Map<UInt64, Long> debugTimings) {
.append(debugTimings.getOrDefault(BLOCK_ARRIVAL_TIMING_IDX, 0L) - creationTime)
.append("ms - ");

if (debugTimings.containsKey(LOCAL_EL_FETCH_TIMING_IDX)) {
if (debugTimings.containsKey(LOCAL_EL_BLOBS_FETCH_TIMING_IDX)) {
timingsReport
.append("Local EL fetch delay ")
.append(debugTimings.get(LOCAL_EL_FETCH_TIMING_IDX) - creationTime)
.append("Local EL blobs fetch delay ")
.append(debugTimings.get(LOCAL_EL_BLOBS_FETCH_TIMING_IDX) - creationTime)
.append("ms - ");
} else {
timingsReport.append("Local EL fetch wasn't required - ");
}

if (debugTimings.containsKey(RPC_FETCH_TIMING_IDX)) {
if (debugTimings.containsKey(RPC_BLOCK_FETCH_TIMING_IDX)) {
timingsReport
.append("RPC block fetch delay ")
.append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime)
.append("ms");
} else {
timingsReport.append("RPC block fetch wasn't required");
}

if (debugTimings.containsKey(RPC_BLOBS_FETCH_TIMING_IDX)) {
timingsReport
.append("RPC fetch delay ")
.append(debugTimings.get(RPC_FETCH_TIMING_IDX) - creationTime)
.append("RPC blobs fetch delay ")
.append(debugTimings.get(RPC_BLOBS_FETCH_TIMING_IDX) - creationTime)
.append("ms");
} else {
timingsReport.append("RPC fetch wasn't required");
timingsReport.append("RPC blobs fetch wasn't required");
}

LOG.debug(timingsReport.toString());
Expand All @@ -342,8 +344,9 @@ public String toString() {
.add("slotAndBlockRoot", slotAndBlockRoot)
.add("isBlockPresent", block.get().isPresent())
.add("isCompleted", isCompleted())
.add("rpcFetchTriggered", rpcFetchTriggered)
.add("localElFetchTriggered", localElFetchTriggered)
.add("localElBlobsFetchTriggered", localElBlobsFetchTriggered)
.add("rpcBlockFetchTriggered", rpcBlockFetchTriggered)
.add("rpcBlobsFetchTriggered", rpcBlobsFetchTriggered)
.add("blockImportOnCompletionEnabled", blockImportOnCompletionEnabled.get())
.add(
"blobSidecars",
Expand Down
Loading

0 comments on commit e03e619

Please sign in to comment.