Skip to content

Commit

Permalink
Fix build
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Dec 17, 2024
1 parent 2d4a637 commit 4d0a6c8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public Optional<BlobSidecar> getBlobSidecar(final UInt64 index) {
return Optional.ofNullable(blobSidecars.get(index));
}

public Stream<BlobIdentifier> getMissingBlobSidecarsForBlock() {
public Stream<BlobIdentifier> getMissingBlobSidecars() {
final Optional<Integer> blockCommitmentsCount = getBlockKzgCommitmentsCount();
checkState(blockCommitmentsCount.isPresent(), "Block must me known to call this method");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public synchronized void onCompletedBlockAndBlobSidecars(
LOG.error(
"Tracker for block {} is supposed to be completed but it is not. Missing blob sidecars: {}",
block.toLogString(),
blobSidecarsTracker.getMissingBlobSidecarsForBlock().count());
blobSidecarsTracker.getMissingBlobSidecars().count());
}

if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) {
Expand Down Expand Up @@ -435,7 +435,7 @@ public synchronized Set<BlobIdentifier> getAllRequiredBlobSidecars() {
if (tracker.getBlock().isEmpty()) {
return Stream.empty();
}
return tracker.getMissingBlobSidecarsForBlock();
return tracker.getMissingBlobSidecars();
})
.collect(Collectors.toSet());
}
Expand Down Expand Up @@ -597,7 +597,7 @@ private void onFirstSeen(
asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay);
}
// no delay for attempting to fetch blobs for when the block is first seen
case BLOCK -> fetchMissingContent(slotAndBlockRoot);
case BLOCK -> asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot));
}
}

Expand Down Expand Up @@ -632,6 +632,7 @@ Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) {
return Duration.ofMillis(finalTime.minus(nowMillis).intValue());
}

/** Fetch missing block (when block is unknown) or fetch missing blob sidecars via EL and RPC */
private void fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) {
fetchMissingBlobsFromLocalEL(slotAndBlockRoot)
.handleException(
Expand All @@ -654,7 +655,7 @@ private synchronized SafeFuture<Void> fetchMissingBlobsFromLocalEL(
}

final List<BlobIdentifier> missingBlobsIdentifiers =
blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock().toList();
blockBlobSidecarsTracker.getMissingBlobSidecars().toList();

final SpecVersion specVersion = spec.atSlot(slotAndBlockRoot.getSlot());
final MiscHelpersDeneb miscHelpersDeneb =
Expand Down Expand Up @@ -727,7 +728,7 @@ private synchronized void fetchMissingContentFromRemotePeers(
}

if (blockBlobSidecarsTracker.getBlock().isEmpty()) {
// fetch missing block

blockBlobSidecarsTracker.setRpcBlockFetchTriggered();

poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc();
Expand All @@ -737,11 +738,10 @@ private synchronized void fetchMissingContentFromRemotePeers(
return;
}

// fetch missing blob sidecars
blockBlobSidecarsTracker.setRpcBlobsFetchTriggered();

blockBlobSidecarsTracker
.getMissingBlobSidecarsForBlock()
.getMissingBlobSidecars()
.forEach(
blobIdentifier -> {
poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc();
Expand All @@ -761,7 +761,7 @@ private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecars

if (blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) {
blockBlobSidecarsTracker
.getMissingBlobSidecarsForBlock()
.getMissingBlobSidecars()
.forEach(
blobIdentifier ->
requiredBlobSidecarDroppedSubscribers.deliver(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -69,7 +68,6 @@ void isNotCompletedJustAfterCreation() {

SafeFutureAssert.assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture())
.isNotCompleted();
assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()).isEmpty();
assertThat(blockBlobSidecarsTracker.getBlock()).isEmpty();
assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty();
assertThat(blockBlobSidecarsTracker.getSlotAndBlockRoot()).isEqualTo(slotAndBlockRoot);
Expand All @@ -86,7 +84,7 @@ void setBlock_shouldAcceptCorrectBlock() {

SafeFutureAssert.assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture())
.isNotCompleted();
assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock())
assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars())
.containsExactlyInAnyOrderElementsOf(blobIdentifiersForBlock);
assertThat(blockBlobSidecarsTracker.getBlock()).isEqualTo(Optional.of(block));
assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty();
Expand Down Expand Up @@ -125,7 +123,7 @@ void setBlock_immediatelyCompletesWithBlockWithoutBlobs() {
SafeFutureAssert.assertThatSafeFuture(completionFuture).isCompleted();
assertThat(blockBlobSidecarsTracker.isComplete()).isTrue();

assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()).isEmpty();
assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()).isEmpty();
assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty();
}

Expand Down Expand Up @@ -169,22 +167,17 @@ void getCompletionFuture_returnsIndependentFutures() {
void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() {
final BlockBlobSidecarsTracker blockBlobSidecarsTracker =
new BlockBlobSidecarsTracker(slotAndBlockRoot);
final BlobSidecar toAdd = blobSidecarsForBlock.get(0);
final BlobSidecar toAdd = blobSidecarsForBlock.getFirst();
final Map<UInt64, BlobSidecar> added = new HashMap<>();
final SafeFuture<Void> completionFuture = blockBlobSidecarsTracker.getCompletionFuture();

added.put(toAdd.getIndex(), toAdd);
blockBlobSidecarsTracker.add(toAdd);

// we don't know the block, missing blobs are max blobs minus the blob we already have
final Set<BlobIdentifier> potentialMissingBlobs =
UInt64.range(UInt64.valueOf(1), maxBlobsPerBlock.plus(1))
.map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index))
.collect(Collectors.toSet());

SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted();
assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock())
.containsExactlyInAnyOrderElementsOf(potentialMissingBlobs);
assertThatThrownBy(blockBlobSidecarsTracker::getMissingBlobSidecars)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Block must me known to call this method");
assertThat(blockBlobSidecarsTracker.getBlobSidecars())
.containsExactlyInAnyOrderEntriesOf(added);

Expand All @@ -194,7 +187,7 @@ void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() {
// now we know the block and we know about missing blobs
final List<BlobIdentifier> stillMissing =
blobIdentifiersForBlock.subList(1, blobIdentifiersForBlock.size());
assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock())
assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars())
.containsExactlyInAnyOrderElementsOf(stillMissing);
SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted();

Expand Down Expand Up @@ -238,7 +231,7 @@ void add_shouldWorkWhenBlockIsSetFirst() {
added.put(toAdd.getIndex(), toAdd);
blockBlobSidecarsTracker.add(toAdd);

assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock())
assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars())
.containsExactlyInAnyOrderElementsOf(stillMissing);
SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted();
assertThat(blockBlobSidecarsTracker.getBlobSidecars())
Expand All @@ -264,40 +257,16 @@ void add_shouldAcceptAcceptSameBlobSidecarTwice() {
}

@Test
void getMissingBlobSidecars_ForBlock_shouldReturnPartialBlobsIdentifierWhenBlockIsUnknown() {
void getMissingBlobSidecars_shouldThrowWhenBlockIsUnknown() {
final BlockBlobSidecarsTracker blockBlobSidecarsTracker =
new BlockBlobSidecarsTracker(slotAndBlockRoot);
final BlobSidecar toAdd = blobSidecarsForBlock.get(2);

blockBlobSidecarsTracker.add(toAdd);

final List<BlobIdentifier> knownMissing =
blobIdentifiersForBlock.stream()
.filter(blobIdentifier -> !blobIdentifier.getIndex().equals(UInt64.valueOf(2)))
.collect(Collectors.toList());

assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock())
.containsExactlyInAnyOrderElementsOf(knownMissing);
}

@Test
void getMissingBlobSidecars_ForBlock_shouldRespectMaxBlobsPerBlock() {
final BlockBlobSidecarsTracker blockBlobSidecarsTracker =
new BlockBlobSidecarsTracker(slotAndBlockRoot);
final BlobSidecar toAdd =
dataStructureUtil
.createRandomBlobSidecarBuilder()
.signedBeaconBlockHeader(block.asHeader())
.index(UInt64.valueOf(100))
.build();

blockBlobSidecarsTracker.add(toAdd);

final List<BlobIdentifier> knownMissing =
blobIdentifiersForBlock.subList(0, maxBlobsPerBlock.intValue());

assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock())
.containsExactlyInAnyOrderElementsOf(knownMissing);
assertThatThrownBy(blockBlobSidecarsTracker::getMissingBlobSidecars)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Block must me known to call this method");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -123,6 +124,8 @@ public void setup() {
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarDropped(
requiredBlobSidecarDroppedEvents::add);
blockBlobSidecarsTrackersPool.subscribeNewBlobSidecar(newBlobSidecarEvents::add);
when(executionLayer.engineGetBlobs(any(), eq(currentSlot)))
.thenReturn(SafeFuture.completedFuture(List.of()));
when(blobSidecarPublisher.apply(any())).thenReturn(SafeFuture.COMPLETE);
setSlot(currentSlot);
}
Expand Down Expand Up @@ -670,7 +673,7 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() {
Optional.of(
(slotAndRoot) -> {
when(tracker.add(any())).thenReturn(true);
when(tracker.getMissingBlobSidecarsForBlock())
when(tracker.getMissingBlobSidecars())
.thenAnswer(__ -> missingBlobIdentifiers.stream());
when(tracker.getBlock()).thenReturn(Optional.of(block));
return tracker;
Expand Down Expand Up @@ -734,8 +737,7 @@ void shouldFetchMissingBlobSidecarsViaRPCAfterLocalEL() {
Optional.of(
(slotAndRoot) -> {
BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class);
when(tracker.getMissingBlobSidecarsForBlock())
.thenAnswer(__ -> missingBlobs.stream());
when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream());
when(tracker.getBlock()).thenReturn(Optional.of(block));
return tracker;
});
Expand Down Expand Up @@ -771,8 +773,7 @@ void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() {
Optional.of(
(slotAndRoot) -> {
BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class);
when(tracker.getMissingBlobSidecarsForBlock())
.thenAnswer(__ -> missingBlobs.stream());
when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream());
when(tracker.getBlock()).thenReturn(Optional.of(block));
return tracker;
});
Expand All @@ -796,7 +797,7 @@ void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() {
}

@Test
void shouldFetchMissingBlockAndBlobSidecars() {
void shouldFetchMissingBlock() {
final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot);
final BlobSidecar blobSidecar =
dataStructureUtil
Expand All @@ -805,14 +806,8 @@ void shouldFetchMissingBlockAndBlobSidecars() {
.index(UInt64.valueOf(2))
.build();

final Set<BlobIdentifier> missingBlobs =
Set.of(
new BlobIdentifier(block.getRoot(), UInt64.ONE),
new BlobIdentifier(block.getRoot(), UInt64.ZERO));

final BlockBlobSidecarsTracker mockedTracker = mock(BlockBlobSidecarsTracker.class);
when(mockedTracker.getBlock()).thenReturn(Optional.empty());
when(mockedTracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs.stream());
when(mockedTracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot());

mockedTrackersFactory = Optional.of((__) -> mockedTracker);
Expand All @@ -827,78 +822,10 @@ void shouldFetchMissingBlockAndBlobSidecars() {
verify(mockedTracker, never()).setLocalElBlobsFetchTriggered();

assertThat(requiredBlockRootEvents).containsExactly(block.getRoot());
assertThat(requiredBlobSidecarEvents).containsExactlyElementsOf(missingBlobs);

assertStats("block", "rpc_fetch", 1);
assertStats("blob_sidecar", "rpc_fetch", missingBlobs.size());

assertThat(requiredBlockRootDroppedEvents).isEmpty();
assertThat(requiredBlobSidecarDroppedEvents).isEmpty();
}

@Test
void shouldDropBlobSidecarsThatHasBeenFetchedButNotPresentInBlock() {
final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot);

final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, block.getRoot());
final BlobSidecar blobSidecar =
dataStructureUtil
.createRandomBlobSidecarBuilder()
.signedBeaconBlockHeader(block.asHeader())
.index(UInt64.valueOf(2))
.build();

final Set<BlobIdentifier> blobsNotPresentInBlock =
Set.of(
new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(2)),
new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(3)));

mockedTrackersFactory =
Optional.of(
(slotAndRoot) -> {
BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class);
when(tracker.getBlock()).thenReturn(Optional.empty());
when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot);
when(tracker.setBlock(block)).thenReturn(true);
when(tracker.isRpcBlockFetchTriggered()).thenReturn(true);
return tracker;
});

blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP);

blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty());

assertThat(requiredBlobSidecarDroppedEvents).containsExactlyElementsOf(blobsNotPresentInBlock);
}

@Test
void shouldNotDropUnusedBlobSidecarsIfFetchingHasNotOccurred() {
final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot);

final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, block.getRoot());
final BlobSidecar blobSidecar =
dataStructureUtil
.createRandomBlobSidecarBuilder()
.signedBeaconBlockHeader(block.asHeader())
.index(UInt64.valueOf(2))
.build();

mockedTrackersFactory =
Optional.of(
(slotAndRoot) -> {
BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class);
when(tracker.getBlock()).thenReturn(Optional.empty());
when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot);
when(tracker.setBlock(block)).thenReturn(true);
when(tracker.isRpcBlockFetchTriggered()).thenReturn(false);
return tracker;
});

blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP);

blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty());

assertThat(requiredBlobSidecarDroppedEvents).isEmpty();
}

@Test
Expand Down Expand Up @@ -938,10 +865,10 @@ void shouldDropPossiblyFetchedBlobSidecars() {
Optional.of(
(slotAndRoot) -> {
BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class);
when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs.stream());
when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream());
when(tracker.getBlock()).thenReturn(Optional.of(block));
when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot());
when(tracker.isRpcBlockFetchTriggered()).thenReturn(true);
when(tracker.isRpcBlobsFetchTriggered()).thenReturn(true);
return tracker;
});

Expand Down Expand Up @@ -980,8 +907,7 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() {
mockedTrackersFactory =
Optional.of(
(slotAndRoot) -> {
when(tracker.getMissingBlobSidecarsForBlock())
.thenAnswer(__ -> missingBlobs.stream());
when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream());
when(tracker.getBlock()).thenReturn(Optional.empty());
when(tracker.setBlock(any())).thenReturn(true);
when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot());
Expand Down Expand Up @@ -1187,7 +1113,7 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() {
Optional.of(
(slotAndRoot) -> {
BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class);
when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs1.stream());
when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs1.stream());
when(tracker.getBlock()).thenReturn(Optional.of(block1));
return tracker;
});
Expand All @@ -1205,7 +1131,7 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() {
Optional.of(
(slotAndRoot) -> {
BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class);
when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs2.stream());
when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs2.stream());
when(tracker.getBlock()).thenReturn(Optional.of(block2));
return tracker;
});
Expand Down

0 comments on commit 4d0a6c8

Please sign in to comment.