Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch call for retrieving blobs from RPC #8952

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO;

import java.util.List;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beacon.sync.events.SyncState;
import tech.pegasys.teku.beacon.sync.events.SyncingStatus;
Expand Down Expand Up @@ -106,12 +107,13 @@ public void subscribeBlobSidecarFetched(final BlobSidecarSubscriber subscriber)
}

@Override
public void requestRecentBlobSidecar(final BlobIdentifier blobIdentifier) {
public void requestRecentBlobSidecars(
final Bytes32 blockRoot, final List<BlobIdentifier> blobIdentifiers) {
// No-op
}

@Override
public void cancelRecentBlobSidecarRequest(final BlobIdentifier blobIdentifier) {
public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {
// No-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
Expand All @@ -34,8 +35,10 @@ public FetchBlockTask createFetchBlockTask(
}

@Override
public FetchBlobSidecarTask createFetchBlobSidecarTask(
final BlobIdentifier blobIdentifier, final Optional<Eth2Peer> preferredPeer) {
return new FetchBlobSidecarTask(eth2Network, preferredPeer, blobIdentifier);
public FetchBlobSidecarsTask createFetchBlobSidecarsTask(
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers,
final Optional<Eth2Peer> preferredPeer) {
return new FetchBlobSidecarsTask(eth2Network, preferredPeer, blockRoot, blobIdentifiers);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;

public class FetchBlobSidecarsTask extends AbstractFetchTask<Bytes32, List<BlobSidecar>> {

private static final Logger LOG = LogManager.getLogger();

private final Bytes32 blockRoot;
private final List<BlobIdentifier> blobIdentifiers;

FetchBlobSidecarsTask(
final P2PNetwork<Eth2Peer> eth2Network,
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers) {
super(eth2Network, Optional.empty());
this.blockRoot = blockRoot;
this.blobIdentifiers = blobIdentifiers;
}

public FetchBlobSidecarsTask(
final P2PNetwork<Eth2Peer> eth2Network,
final Optional<Eth2Peer> preferredPeer,
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers) {
super(eth2Network, preferredPeer);
this.blockRoot = blockRoot;
this.blobIdentifiers = blobIdentifiers;
}

@Override
public Bytes32 getKey() {
return blockRoot;
}

@Override
SafeFuture<FetchResult<List<BlobSidecar>>> fetch(final Eth2Peer peer) {
final SafeFuture<FetchResult<List<BlobSidecar>>> fetchResult = new SafeFuture<>();
final List<BlobSidecar> blobSidecars = new ArrayList<>();
peer.requestBlobSidecarsByRoot(
blobIdentifiers,
new RpcResponseHandler<>() {
@Override
public void onCompleted(final Optional<? extends Throwable> error) {
error.ifPresentOrElse(
err -> {
logFetchError(peer, err);
fetchResult.complete(FetchResult.createFailed(peer, Status.FETCH_FAILED));
},
() -> fetchResult.complete(FetchResult.createSuccessful(peer, blobSidecars)));
}

@Override
public SafeFuture<?> onResponse(final BlobSidecar response) {
blobSidecars.add(response);
return SafeFuture.COMPLETE;
}
})
.finish(
err -> {
logFetchError(peer, err);
fetchResult.complete(FetchResult.createFailed(peer, Status.FETCH_FAILED));
});
return fetchResult;
}

private void logFetchError(final Eth2Peer peer, final Throwable err) {
LOG.error(
String.format(
"Failed to fetch %d blob sidecars for block root %s from peer %s",
blobIdentifiers.size(), blockRoot, peer.getId()),
err);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
Expand All @@ -26,10 +27,11 @@ default FetchBlockTask createFetchBlockTask(final Bytes32 blockRoot) {

FetchBlockTask createFetchBlockTask(Bytes32 blockRoot, Optional<Eth2Peer> preferredPeer);

default FetchBlobSidecarTask createFetchBlobSidecarTask(final BlobIdentifier blobIdentifier) {
return createFetchBlobSidecarTask(blobIdentifier, Optional.empty());
default FetchBlobSidecarsTask createFetchBlobSidecarsTask(
final Bytes32 blockRoot, final List<BlobIdentifier> blobIdentifiers) {
return createFetchBlobSidecarsTask(blockRoot, blobIdentifiers, Optional.empty());
}

FetchBlobSidecarTask createFetchBlobSidecarTask(
BlobIdentifier blobIdentifier, Optional<Eth2Peer> preferredPeer);
FetchBlobSidecarsTask createFetchBlobSidecarsTask(
Bytes32 blockRoot, List<BlobIdentifier> blobIdentifiers, Optional<Eth2Peer> preferredPeer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,5 @@ private String getTaskName(final T task) {
return task.getClass().getSimpleName();
}

public abstract T createTask(K key);

public abstract void processFetchedResult(T task, R result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,30 @@

package tech.pegasys.teku.beacon.sync.gossip.blobs;

import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarTask;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarsTask;
import tech.pegasys.teku.beacon.sync.fetch.FetchTaskFactory;
import tech.pegasys.teku.beacon.sync.forward.ForwardSync;
import tech.pegasys.teku.beacon.sync.gossip.AbstractFetchService;
import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetchService;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;

public class RecentBlobSidecarsFetchService
extends AbstractFetchService<BlobIdentifier, FetchBlobSidecarTask, BlobSidecar>
extends AbstractFetchService<Bytes32, FetchBlobSidecarsTask, List<BlobSidecar>>
implements RecentBlobSidecarsFetcher {

private static final Logger LOG = LogManager.getLogger();

private static final int MAX_CONCURRENT_REQUESTS = 3;

private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final ForwardSync forwardSync;
private final FetchTaskFactory fetchTaskFactory;
Expand All @@ -57,17 +60,13 @@ public static RecentBlobSidecarsFetchService create(
final AsyncRunner asyncRunner,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory,
final Spec spec) {
final int maxConcurrentRequests =
RecentBlocksFetchService.MAX_CONCURRENT_REQUESTS
* spec.getMaxBlobsPerBlockForHighestMilestone().orElse(1);
final FetchTaskFactory fetchTaskFactory) {
return new RecentBlobSidecarsFetchService(
asyncRunner,
blockBlobSidecarsTrackersPool,
forwardSync,
fetchTaskFactory,
maxConcurrentRequests);
MAX_CONCURRENT_REQUESTS);
}

@Override
Expand All @@ -87,34 +86,62 @@ public void subscribeBlobSidecarFetched(final BlobSidecarSubscriber subscriber)
}

@Override
public void requestRecentBlobSidecar(final BlobIdentifier blobIdentifier) {
public void requestRecentBlobSidecars(
final Bytes32 blockRoot, final List<BlobIdentifier> blobIdentifiers) {
if (forwardSync.isSyncActive()) {
// Forward sync already in progress, assume it will fetch any missing blob sidecars
return;
}
if (blockBlobSidecarsTrackersPool.containsBlobSidecar(blobIdentifier)) {
// We've already got this blob sidecar
final List<BlobIdentifier> requiredBlobIdentifiers =
blobIdentifiers.stream()
.filter(
blobIdentifier ->
!blockBlobSidecarsTrackersPool.containsBlobSidecar(blobIdentifier))
.toList();
if (requiredBlobIdentifiers.isEmpty()) {
// We already have all required blob sidecars
return;
}
final FetchBlobSidecarTask task = createTask(blobIdentifier);
if (allTasks.putIfAbsent(blobIdentifier, task) != null) {
final FetchBlobSidecarsTask task =
fetchTaskFactory.createFetchBlobSidecarsTask(blockRoot, requiredBlobIdentifiers);
if (allTasks.putIfAbsent(blockRoot, task) != null) {
// We're already tracking this task
task.cancel();
return;
}
LOG.trace("Queue blob sidecar to be fetched: {}", blobIdentifier);
LOG.trace("Queue blob sidecars to be fetched: {}", requiredBlobIdentifiers);
queueTask(task);
}

@Override
public void cancelRecentBlobSidecarRequest(final BlobIdentifier blobIdentifier) {
cancelRequest(blobIdentifier);
public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {
cancelRequest(blockRoot);
}

@Override
public void processFetchedResult(
final FetchBlobSidecarsTask task, final List<BlobSidecar> result) {
result.forEach(
blobSidecar -> {
LOG.trace("Successfully fetched blob sidecar: {}", result);
blobSidecarSubscribers.forEach(s -> s.onBlobSidecar(blobSidecar));
});
// After retrieved blob sidecars have been processed, stop tracking it
removeTask(task);
}

@Override
public void onBlockValidated(final SignedBeaconBlock block) {}

@Override
public void onBlockImported(final SignedBeaconBlock block, final boolean executionOptimistic) {
cancelRecentBlobSidecarsRequest(block.getRoot());
}

private void setupSubscribers() {
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecar(this::requestRecentBlobSidecar);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarDropped(
this::cancelRecentBlobSidecarRequest);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecars(this::requestRecentBlobSidecars);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarsDropped(
this::cancelRecentBlobSidecarsRequest);
forwardSync.subscribeToSyncChanges(this::onSyncStatusChanged);
}

Expand All @@ -126,19 +153,6 @@ private void onSyncStatusChanged(final boolean syncActive) {
// We may have ignored these requested blob sidecars while the sync was in progress
blockBlobSidecarsTrackersPool
.getAllRequiredBlobSidecars()
.forEach(this::requestRecentBlobSidecar);
}

@Override
public FetchBlobSidecarTask createTask(final BlobIdentifier key) {
return fetchTaskFactory.createFetchBlobSidecarTask(key);
}

@Override
public void processFetchedResult(final FetchBlobSidecarTask task, final BlobSidecar result) {
LOG.trace("Successfully fetched blob sidecar: {}", result);
blobSidecarSubscribers.forEach(s -> s.onBlobSidecar(result));
// After retrieved blob sidecar has been processed, stop tracking it
removeTask(task);
.forEach(this::requestRecentBlobSidecars);
}
}
Loading
Loading