Skip to content

Commit

Permalink
Throttle validator registration external signing requests (#6301)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Oct 7, 2022
1 parent 8dcbafd commit 204e92e
Show file tree
Hide file tree
Showing 18 changed files with 345 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ For information on changes in released versions of Teku, see the [releases page]
- The `voluntary-exit` subcommand can restrict the exit to a specific list of validators public keys using the option `--validator-public-keys`.
Example: `teku voluntary-exit --beacon-node-api-endpoint=<ENDPOINT>[,<ENDPOINT>...]... --data-validator-path=<PATH> --include-keymanager-keys=<BOOLEAN> --validator-keys=<KEY_DIR>:<PASS_DIR> | <KEY_FILE>:<PASS_FILE> --validator-public-keys=<PUBKEY>[,<PUBKEY>...]...`
To include validator keys managed via keymanager APIs, the option `--include-keymanager-keys` could be set to `true` (The default value is set to `false`)
- Throttle signing of validator registrations when using an external signer

### Bug Fixes
- Filter out unknown validators when sending validator registrations to the builder network
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ThrottlingEth1Provider(
final MetricsSystem metricsSystem) {
this.delegate = delegate;
taskQueue =
new ThrottlingTaskQueue(
ThrottlingTaskQueue.create(
maximumConcurrentRequests,
metricsSystem,
TekuMetricCategory.BEACON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ThrottlingBuilderClient(
final MetricsSystem metricsSystem) {
this.delegate = delegate;
taskQueue =
new ThrottlingTaskQueue(
ThrottlingTaskQueue.create(
maximumConcurrentRequests,
metricsSystem,
TekuMetricCategory.BEACON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public ThrottlingExecutionEngineClient(
final MetricsSystem metricsSystem) {
this.delegate = delegate;
taskQueue =
new ThrottlingTaskQueue(
ThrottlingTaskQueue.create(
maximumConcurrentRequests,
metricsSystem,
TekuMetricCategory.BEACON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,75 @@

package tech.pegasys.teku.infrastructure.async;

import com.google.common.annotations.VisibleForTesting;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;

public class ThrottlingTaskQueue {

protected final Queue<Runnable> queuedTasks = new ConcurrentLinkedQueue<>();

private final int maximumConcurrentTasks;
private final Queue<Runnable> queuedTasks = new ConcurrentLinkedQueue<>();

private int inflightTaskCount = 0;

public ThrottlingTaskQueue(
public static ThrottlingTaskQueue create(
final int maximumConcurrentTasks,
final MetricsSystem metricsSystem,
final TekuMetricCategory metricCategory,
final String metricName) {
this.maximumConcurrentTasks = maximumConcurrentTasks;

final ThrottlingTaskQueue taskQueue = new ThrottlingTaskQueue(maximumConcurrentTasks);
metricsSystem.createGauge(
metricCategory, metricName, "Number of tasks queued", queuedTasks::size);
metricCategory, metricName, "Number of tasks queued", taskQueue.queuedTasks::size);
return taskQueue;
}

protected ThrottlingTaskQueue(final int maximumConcurrentTasks) {
this.maximumConcurrentTasks = maximumConcurrentTasks;
}

public <T> SafeFuture<T> queueTask(final Supplier<SafeFuture<T>> request) {
final SafeFuture<T> future = new SafeFuture<>();
queuedTasks.add(
() -> {
final SafeFuture<T> requestFuture = request.get();
requestFuture.propagateTo(future);
requestFuture.always(this::taskComplete);
});
final SafeFuture<T> target = new SafeFuture<>();
final Runnable taskToQueue = getTaskToQueue(request, target);
queuedTasks.add(taskToQueue);
processQueuedTasks();
return future;
return target;
}

private synchronized void taskComplete() {
inflightTaskCount--;
processQueuedTasks();
protected <T> Runnable getTaskToQueue(
final Supplier<SafeFuture<T>> request, final SafeFuture<T> target) {
return () -> {
final SafeFuture<T> requestFuture = request.get();
requestFuture.propagateTo(target);
requestFuture.always(this::taskComplete);
};
}

private synchronized void processQueuedTasks() {
while (inflightTaskCount < maximumConcurrentTasks && !queuedTasks.isEmpty()) {
protected Runnable getTaskToRun() {
return queuedTasks.remove();
}

protected synchronized void processQueuedTasks() {
while (inflightTaskCount < maximumConcurrentTasks && getQueuedTasksCount() > 0) {
inflightTaskCount++;
queuedTasks.remove().run();
getTaskToRun().run();
}
}

protected int getQueuedTasksCount() {
return queuedTasks.size();
}

@VisibleForTesting
int getInflightTaskCount() {
return inflightTaskCount;
}

private synchronized void taskComplete() {
inflightTaskCount--;
processQueuedTasks();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.LabelledGauge;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;

public class ThrottlingTaskQueueWithPriority extends ThrottlingTaskQueue {

private final Queue<Runnable> queuedPrioritizedTasks = new ConcurrentLinkedQueue<>();

public static ThrottlingTaskQueueWithPriority create(
final int maximumConcurrentTasks,
final MetricsSystem metricsSystem,
final TekuMetricCategory metricCategory,
final String metricName) {
final ThrottlingTaskQueueWithPriority taskQueue =
new ThrottlingTaskQueueWithPriority(maximumConcurrentTasks);
final LabelledGauge taskQueueGauge =
metricsSystem.createLabelledGauge(
metricCategory, metricName, "Number of tasks queued", "priority");
taskQueueGauge.labels(taskQueue.queuedTasks::size, "normal");
taskQueueGauge.labels(taskQueue.queuedPrioritizedTasks::size, "high");
return taskQueue;
}

private ThrottlingTaskQueueWithPriority(final int maximumConcurrentTasks) {
super(maximumConcurrentTasks);
}

public <T> SafeFuture<T> queueTask(
final Supplier<SafeFuture<T>> request, final boolean prioritize) {
if (!prioritize) {
return queueTask(request);
}
final SafeFuture<T> target = new SafeFuture<>();
final Runnable taskToQueue = getTaskToQueue(request, target);
queuedPrioritizedTasks.add(taskToQueue);
processQueuedTasks();
return target;
}

@Override
protected Runnable getTaskToRun() {
return !queuedPrioritizedTasks.isEmpty()
? queuedPrioritizedTasks.remove()
: queuedTasks.remove();
}

@Override
protected int getQueuedTasksCount() {
return queuedTasks.size() + queuedPrioritizedTasks.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;

public class ThrottlingTaskQueueTest {

private static final int MAXIMUM_CONCURRENT_TASKS = 3;

private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem();

private final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner();

private final ThrottlingTaskQueue taskQueue =
ThrottlingTaskQueue.create(
MAXIMUM_CONCURRENT_TASKS, stubMetricsSystem, TekuMetricCategory.BEACON, "test_metric");

@Test
public void throttlesRequests() {
final List<SafeFuture<Void>> requests =
IntStream.range(0, 100)
.mapToObj(
element -> {
final SafeFuture<Void> request =
stubAsyncRunner.runAsync(
() -> {
assertThat(taskQueue.getInflightTaskCount())
.isLessThanOrEqualTo(MAXIMUM_CONCURRENT_TASKS);
});
return taskQueue.queueTask(() -> request);
})
.collect(Collectors.toList());

assertThat(getQueuedTasksGaugeValue()).isEqualTo(97);
assertThat(taskQueue.getInflightTaskCount()).isEqualTo(3);

stubAsyncRunner.executeQueuedActions();

requests.forEach(request -> assertThat(request).isCompleted());
}

private double getQueuedTasksGaugeValue() {
return stubMetricsSystem.getGauge(TekuMetricCategory.BEACON, "test_metric").getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;

public class ThrottlingTaskQueueWithPriorityTest {

private static final int MAXIMUM_CONCURRENT_TASKS = 3;

private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem();

private final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner();

private final ThrottlingTaskQueueWithPriority taskQueue =
ThrottlingTaskQueueWithPriority.create(
MAXIMUM_CONCURRENT_TASKS, stubMetricsSystem, TekuMetricCategory.BEACON, "test_metric");

@Test
public void throttlesRequests() {
final List<SafeFuture<Void>> requests =
IntStream.range(0, 100)
.mapToObj(
element -> {
final SafeFuture<Void> request =
stubAsyncRunner.runAsync(
() -> {
assertThat(taskQueue.getInflightTaskCount())
.isLessThanOrEqualTo(MAXIMUM_CONCURRENT_TASKS);
});
// prioritize 1/3 of requests
if (element % 3 == 0) {
return taskQueue.queueTask(() -> request, true);
}
return taskQueue.queueTask(() -> request);
})
.collect(Collectors.toList());

assertThat(getQueuedTasksGaugeValue(true)).isEqualTo(33);
assertThat(getQueuedTasksGaugeValue(false)).isEqualTo(64);
assertThat(taskQueue.getInflightTaskCount()).isEqualTo(3);

stubAsyncRunner.executeQueuedActions();

requests.forEach(request -> assertThat(request).isCompleted());
}

@Test
@SuppressWarnings("FutureReturnValueIgnored")
public void prioritizesRequests() {
final SafeFuture<Void> initialRequest = new SafeFuture<>();
final SafeFuture<Void> prioritizedRequest = new SafeFuture<>();
final SafeFuture<Void> normalRequest = new SafeFuture<>();

final AtomicBoolean priorityFirst = new AtomicBoolean(false);

// fill queue
IntStream.range(0, MAXIMUM_CONCURRENT_TASKS)
.forEach(__ -> taskQueue.queueTask(() -> initialRequest));
final SafeFuture<Void> assertion =
taskQueue.queueTask(
() -> {
// make sure the prioritized request is ran first
// even though It has been queued after this one
assertThat(priorityFirst).isTrue();
return normalRequest;
});
taskQueue.queueTask(
() -> {
priorityFirst.set(true);
return prioritizedRequest;
},
true);

assertThat(getQueuedTasksGaugeValue(true)).isEqualTo(1);
assertThat(getQueuedTasksGaugeValue(false)).isEqualTo(1);
assertThat(taskQueue.getInflightTaskCount()).isEqualTo(3);

initialRequest.complete(null);
normalRequest.complete(null);
prioritizedRequest.complete(null);

assertThat(assertion).isCompleted();
}

private double getQueuedTasksGaugeValue(final boolean priority) {
return stubMetricsSystem
.getLabelledGauge(TekuMetricCategory.BEACON, "test_metric")
.getValue(priority ? "high" : "normal")
.orElseThrow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public class ValidatorKeysOptions {
@CommandLine.Option(
names = {"--Xvalidators-external-signer-concurrent-limit"},
paramLabel = "<INTEGER>",
description = "The maximum number of concurrent background requests to make to the signer.",
description =
"The maximum number of concurrent background requests to make to the signer. This only applies for aggregation slot and validator registration requests.",
hidden = true,
arity = "1")
private int validatorExternalSignerConcurrentRequestLimit =
Expand Down
Loading

0 comments on commit 204e92e

Please sign in to comment.