diff --git a/networking/eth2/build.gradle b/networking/eth2/build.gradle index 43eb1f564d9..793769dd12e 100644 --- a/networking/eth2/build.gradle +++ b/networking/eth2/build.gradle @@ -46,6 +46,7 @@ dependencies { testFixturesImplementation testFixtures(project(':infrastructure:async')) testFixturesImplementation project(':infrastructure:bytes') testFixturesImplementation testFixtures(project(':infrastructure:events')) + testFixturesImplementation testFixtures(project(':infrastructure:metrics')) testFixturesImplementation testFixtures(project(':infrastructure:time')) testFixturesImplementation testFixtures(project(':storage')) testFixturesImplementation project(':infrastructure:subscribers') diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java index d8e30bd1261..774928b6cf9 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java @@ -29,6 +29,8 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.events.EventChannels; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkManager; @@ -327,6 +329,13 @@ protected DiscoveryNetwork buildNetwork( discoConfig.getMinRandomlySelectedPeers()); final SchemaDefinitionsSupplier currentSchemaDefinitions = () -> recentChainData.getCurrentSpec().getSchemaDefinitions(); + final SettableLabelledGauge subnetPeerCountGauge = + SettableLabelledGauge.create( + metricsSystem, + TekuMetricCategory.NETWORK, + "subnet_peer_count", + "Number of currently connected peers subscribed to each subnet", + "subnet"); return createDiscoveryNetworkBuilder() .metricsSystem(metricsSystem) .asyncRunner(asyncRunner) @@ -342,7 +351,8 @@ protected DiscoveryNetwork buildNetwork( attestationSubnetTopicProvider, syncCommitteeSubnetTopicProvider, syncCommitteeSubnetService, - config.getTargetSubnetSubscriberCount()), + config.getTargetSubnetSubscriberCount(), + subnetPeerCountGauge), reputationManager, Collections::shuffle)) .discoveryConfig(discoConfig) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptions.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptions.java index 7e5b806c141..00a131dcae0 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptions.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptions.java @@ -26,6 +26,7 @@ import java.util.function.Consumer; import java.util.stream.IntStream; import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector; import tech.pegasys.teku.infrastructure.ssz.schema.collections.SszBitvectorSchema; import tech.pegasys.teku.networking.eth2.SubnetSubscriptionService; @@ -55,42 +56,67 @@ public static PeerSubnetSubscriptions create( final AttestationSubnetTopicProvider attestationTopicProvider, final SyncCommitteeSubnetTopicProvider syncCommitteeSubnetTopicProvider, final SubnetSubscriptionService syncCommitteeSubnetService, - final int targetSubnetSubscriberCount) { + final int targetSubnetSubscriberCount, + final SettableLabelledGauge subnetPeerCountGauge) { final Map> subscribersByTopic = network.getSubscribersByTopic(); - return builder(currentSchemaDefinitions) - .targetSubnetSubscriberCount(targetSubnetSubscriberCount) - .attestationSubnetSubscriptions( - b -> - // Track all attestation subnets - streamAllAttestationSubnetIds(currentSchemaDefinitions) - .forEach( - attestationSubnet -> { - b.addRelevantSubnet(attestationSubnet); - subscribersByTopic - .getOrDefault( - attestationTopicProvider.getTopicForSubnet(attestationSubnet), - Collections.emptySet()) - .forEach( - subscriber -> b.addSubscriber(attestationSubnet, subscriber)); - })) - .syncCommitteeSubnetSubscriptions( - b -> - // Only track sync committee subnets that we're subscribed to - syncCommitteeSubnetService - .getSubnets() - .forEach( - syncCommitteeSubnet -> { - b.addRelevantSubnet(syncCommitteeSubnet); - subscribersByTopic - .getOrDefault( - syncCommitteeSubnetTopicProvider.getTopicForSubnet( - syncCommitteeSubnet), - Collections.emptySet()) - .forEach( - subscriber -> b.addSubscriber(syncCommitteeSubnet, subscriber)); - })) - .build(); + final PeerSubnetSubscriptions subscriptions = + builder(currentSchemaDefinitions) + .targetSubnetSubscriberCount(targetSubnetSubscriberCount) + .attestationSubnetSubscriptions( + b -> + // Track all attestation subnets + streamAllAttestationSubnetIds(currentSchemaDefinitions) + .forEach( + attestationSubnet -> { + b.addRelevantSubnet(attestationSubnet); + subscribersByTopic + .getOrDefault( + attestationTopicProvider.getTopicForSubnet(attestationSubnet), + Collections.emptySet()) + .forEach( + subscriber -> b.addSubscriber(attestationSubnet, subscriber)); + })) + .syncCommitteeSubnetSubscriptions( + b -> + // Only track sync committee subnets that we're subscribed to + syncCommitteeSubnetService + .getSubnets() + .forEach( + syncCommitteeSubnet -> { + b.addRelevantSubnet(syncCommitteeSubnet); + subscribersByTopic + .getOrDefault( + syncCommitteeSubnetTopicProvider.getTopicForSubnet( + syncCommitteeSubnet), + Collections.emptySet()) + .forEach( + subscriber -> + b.addSubscriber(syncCommitteeSubnet, subscriber)); + })) + .build(); + updateMetrics(currentSchemaDefinitions, subnetPeerCountGauge, subscriptions); + return subscriptions; + } + + private static void updateMetrics( + final SchemaDefinitionsSupplier currentSchemaDefinitions, + final SettableLabelledGauge subnetPeerCountGauge, + final PeerSubnetSubscriptions subscriptions) { + streamAllAttestationSubnetIds(currentSchemaDefinitions) + .forEach( + subnetId -> + subnetPeerCountGauge.set( + subscriptions.attestationSubnetSubscriptions.subscriberCountBySubnetId + .getOrDefault(subnetId, 0), + "attestation_" + subnetId)); + streamAllSyncCommitteeSubnetIds(currentSchemaDefinitions) + .forEach( + subnetId -> + subnetPeerCountGauge.set( + subscriptions.syncCommitteeSubnetSubscriptions.subscriberCountBySubnetId + .getOrDefault(subnetId, 0), + "sync_committee_" + subnetId)); } private static IntStream streamAllAttestationSubnetIds( @@ -98,6 +124,11 @@ private static IntStream streamAllAttestationSubnetIds( return IntStream.range(0, currentSchemaDefinitions.getAttnetsENRFieldSchema().getLength()); } + private static IntStream streamAllSyncCommitteeSubnetIds( + final SchemaDefinitionsSupplier currentSchemaDefinitions) { + return IntStream.range(0, currentSchemaDefinitions.getSyncnetsENRFieldSchema().getLength()); + } + static Builder builder(final SchemaDefinitionsSupplier currentSchemaDefinitions) { return new Builder(currentSchemaDefinitions); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptionsTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptionsTest.java index e9096fafad9..3811aba9d3d 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptionsTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptionsTest.java @@ -30,6 +30,7 @@ import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector; import tech.pegasys.teku.networking.eth2.SubnetSubscriptionService; import tech.pegasys.teku.networking.p2p.gossip.GossipNetwork; @@ -46,6 +47,7 @@ class PeerSubnetSubscriptionsTest { private static final int TARGET_SUBSCRIBER_COUNT = 2; private final Spec spec = TestSpecFactory.createMinimalAltair(); + private final SettableLabelledGauge subnetPeerCountGauge = mock(SettableLabelledGauge.class); private final SchemaDefinitionsSupplier currentSchemaDefinitions = spec::getGenesisSchemaDefinitions; private final GossipNetwork gossipNetwork = mock(GossipNetwork.class); @@ -201,7 +203,8 @@ private PeerSubnetSubscriptions createPeerSubnetSubscriptions() { attestationTopicProvider, syncCommitteeTopicProvider, syncnetSubscriptions, - TARGET_SUBSCRIBER_COUNT); + TARGET_SUBSCRIBER_COUNT, + subnetPeerCountGauge); } private void withSubscriberCountForAllSubnets(int subscriberCount) { diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java index c6266ddc094..8989be7b8fb 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java @@ -40,6 +40,8 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.Waiter; import tech.pegasys.teku.infrastructure.events.EventChannels; +import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.time.StubTimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -228,6 +230,13 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { discoConfig.getMinRandomlySelectedPeers()); final SchemaDefinitionsSupplier currentSchemaDefinitions = () -> config.getSpec().getGenesisSchemaDefinitions(); + final SettableLabelledGauge subnetPeerCountGauge = + SettableLabelledGauge.create( + metricsSystem, + TekuMetricCategory.NETWORK, + "subnet_peer_count", + "Number of currently connected peers subscribed to each subnet", + "subnet"); final DiscoveryNetwork network = DiscoveryNetworkBuilder.create() .metricsSystem(metricsSystem) @@ -257,7 +266,8 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { attestationSubnetTopicProvider, syncCommitteeTopicProvider, syncCommitteeSubnetService, - config.getTargetSubnetSubscriberCount()), + config.getTargetSubnetSubscriberCount(), + subnetPeerCountGauge), reputationManager, Collections::shuffle)) .discoveryConfig(config.getDiscoveryConfig())