Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trie log preload timeout #4

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
Expand All @@ -40,6 +46,7 @@
public class TrieLogPruner implements TrieLogEvent.TrieLogObserver {

private static final Logger LOG = LoggerFactory.getLogger(TrieLogPruner.class);
private static final int PRELOAD_TIMEOUT_IN_SECONDS = 30;

private final int pruningLimit;
private final int loadingLimit;
Expand Down Expand Up @@ -83,38 +90,82 @@ public TrieLogPruner(
BesuMetricCategory.PRUNER, "trie_log_pruned_orphan", "trie log pruned orphan");
}

public int initialize() {
return preloadQueue();
public void initialize() {
preloadQueueWithTimeout();
}

private int preloadQueue() {
private void preloadQueueWithTimeout() {

LOG.atInfo()
.setMessage("Loading first {} trie logs from database...")
.setMessage("Attempting to load first {} trie logs from database...")
.addArgument(loadingLimit)
.log();

try (final ScheduledExecutorService preloadExecutor = Executors.newScheduledThreadPool(1)) {

final AtomicBoolean timeoutOccurred = new AtomicBoolean(false);
final Runnable timeoutTask =
() -> {
timeoutOccurred.set(true);
LOG.atWarn()
.setMessage(
"Timeout occurred while loading and processing {} trie logs from database")
.addArgument(loadingLimit)
.log();
};

final ScheduledFuture<?> timeoutFuture =
preloadExecutor.schedule(timeoutTask, PRELOAD_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
LOG.atInfo()
.setMessage(
"Trie log pruning will timeout after {} seconds. If this is timing out, consider using `besu storage trie-log prune` subcommand, see https://besu.hyperledger.org/public-networks/how-to/bonsai-limit-trie-logs")
.addArgument(PRELOAD_TIMEOUT_IN_SECONDS)
.log();

preloadQueue(timeoutOccurred, timeoutFuture);
}
}

private void preloadQueue(
final AtomicBoolean timeoutOccurred, final ScheduledFuture<?> timeoutFuture) {

try (final Stream<byte[]> trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) {
final AtomicLong count = new AtomicLong();

final AtomicLong addToPruneQueueCount = new AtomicLong();
final AtomicLong orphansPruned = new AtomicLong();
trieLogKeys.forEach(
blockHashAsBytes -> {
if (timeoutOccurred.get()) {
throw new RuntimeException(
new TimeoutException("Timeout occurred while preloading trie log prune queue"));
}
final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes));
final Optional<BlockHeader> header = blockchain.getBlockHeader(blockHash);
if (header.isPresent()) {
addToPruneQueue(header.get().getNumber(), blockHash);
count.getAndIncrement();
addToPruneQueueCount.getAndIncrement();
} else {
// prune orphaned blocks (sometimes created during block production)
rootWorldStateStorage.pruneTrieLog(blockHash);
orphansPruned.getAndIncrement();
prunedOrphanCounter.inc();
}
});

timeoutFuture.cancel(true);
LOG.atDebug().log("Pruned {} orphaned trie logs from database...", orphansPruned.intValue());
LOG.atInfo().log("Loaded {} trie logs from database", count);
return pruneFromQueue() + orphansPruned.intValue();
LOG.atInfo().log(
"Added {} trie logs to prune queue. Commencing pruning of eligible trie logs...",
addToPruneQueueCount.intValue());
int prunedCount = pruneFromQueue();
LOG.atInfo().log("Pruned {} trie logs.", prunedCount);
} catch (Exception e) {
LOG.error("Error loading trie logs from database, nothing pruned", e);
return 0;
if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
int prunedCount = pruneFromQueue();
LOG.atInfo().log("Operation timed out, but still pruned {} trie logs.", prunedCount);
} else {
LOG.error("Error loading trie logs from database, nothing pruned", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface DataStorageConfiguration {
long DEFAULT_BONSAI_MAX_LAYERS_TO_LOAD = 512;
boolean DEFAULT_BONSAI_LIMIT_TRIE_LOGS_ENABLED = true;
long MINIMUM_BONSAI_TRIE_LOG_RETENTION_LIMIT = DEFAULT_BONSAI_MAX_LAYERS_TO_LOAD;
int DEFAULT_BONSAI_TRIE_LOG_PRUNING_WINDOW_SIZE = 30_000;
int DEFAULT_BONSAI_TRIE_LOG_PRUNING_WINDOW_SIZE = 5_000;
boolean DEFAULT_RECEIPT_COMPACTION_ENABLED = false;

DataStorageConfiguration DEFAULT_CONFIG =
Expand Down
Loading