Skip to content

Commit

Permalink
Merge pull request #57 from migalabs/feat/add-rtt-ip-dist
Browse files Browse the repository at this point in the history
Add feature: RTT and IP distributions
  • Loading branch information
cortze authored Mar 30, 2023
2 parents 269ad32 + 9503842 commit a101a01
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 3 deletions.
71 changes: 70 additions & 1 deletion pkg/crawler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,24 @@ var (
HostedPeers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: modName,
Name: "hosted_peers_distribution",
Help: "Distribution of IPs hosting the nodes in the network",
Help: "Distribution of nodes that are hosted on non-residential networks",
},
[]string{"ip_host"},
)
RttDist = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: modName,
Name: "observed_rtt_distribution",
Help: "Distribution of RTT between our tool and nodes in the network",
},
[]string{"secs"},
)
IPDist = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: modName,
Name: "observed_ip_distribution",
Help: "Distribution of IPs hosting nodes in the network",
},
[]string{"numbernodes"},
)
)

func (c *EthereumCrawler) GetMetrics() *metrics.MetricsModule {
Expand All @@ -82,6 +96,8 @@ func (c *EthereumCrawler) GetMetrics() *metrics.MetricsModule {
metricsMod.AddIndvMetric(c.getPeersOs())
metricsMod.AddIndvMetric(c.getPeersArch())
metricsMod.AddIndvMetric(c.getHostedPeers())
metricsMod.AddIndvMetric(c.getRTTDist())
metricsMod.AddIndvMetric(c.getIPDist())

return metricsMod
}
Expand Down Expand Up @@ -292,3 +308,56 @@ func (c *EthereumCrawler) getHostedPeers() *metrics.IndvMetrics {
}
return ipHosting
}


func (c *EthereumCrawler) getRTTDist() *metrics.IndvMetrics {
initFn := func() error {
prometheus.MustRegister(RttDist)
return nil
}
updateFn := func() (interface{}, error) {
summary, err := c.DB.GetRTTDistribution()
if err != nil {
return nil, err
}
for key, val := range summary {
RttDist.WithLabelValues(key).Set(float64(val.(int)))
}
return summary, nil
}
indvMetric, err := metrics.NewIndvMetrics(
"rtt_distribution",
initFn,
updateFn,
)
if err != nil {
return nil
}
return indvMetric
}

func (c *EthereumCrawler) getIPDist() *metrics.IndvMetrics {
initFn := func() error {
prometheus.MustRegister(IPDist)
return nil
}
updateFn := func() (interface{}, error) {
summary, err := c.DB.GetIPDistribution()
if err != nil {
return nil, err
}
for key, val := range summary {
IPDist.WithLabelValues(key).Set(float64(val.(int)))
}
return summary, nil
}
indvMetric, err := metrics.NewIndvMetrics(
"ip_distribution",
initFn,
updateFn,
)
if err != nil {
return nil
}
return indvMetric
}
93 changes: 93 additions & 0 deletions pkg/db/postgresql/crawler_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package postgresql

import (
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -251,3 +252,95 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) {
summary["hosted_ips"] = hosted
return summary, nil
}

func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) {
summary := make(map[string]interface{}, 0)

rows, err := db.psqlPool.Query(
db.ctx,
`
SELECT
t.latency as latency_range,
count(*) as nodes
FROM (
SELECT
CASE
WHEN latency between 0 and 100 THEN ' 0-100ms'
WHEN latency between 101 and 200 THEN '101-200ms'
WHEN latency between 201 and 300 THEN '201-300ms'
WHEN latency between 301 and 400 THEN '301-400ms'
WHEN latency between 401 and 500 THEN '401-500ms'
WHEN latency between 501 and 600 THEN '501-600ms'
WHEN latency between 601 and 700 THEN '601-700ms'
WHEN latency between 701 and 800 THEN '701-800ms'
WHEN latency between 801 and 900 THEN '801-900ms'
WHEN latency between 901 and 1000 THEN '901-1000ms'
ELSE '+1s'
END as latency
FROM peer_info
WHERE deprecated=false and client_name IS NOT NULL
) as t
GROUP BY t.latency
ORDER BY nodes DESC;
`,
)
if err != nil {
return summary, err
}

for rows.Next() {
var rttRange string
var rttValue int
err = rows.Scan(
&rttRange,
&rttValue,
)
if err != nil {
return summary, err
}
summary[rttRange] = rttValue
}
return summary, nil
}

func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) {
summary := make(map[string]interface{}, 0)

rows, err := db.psqlPool.Query(
db.ctx,
`
SELECT
nodes as nodes_per_ip,
count(t.nodes) as number_of_ips
FROM (
SELECT
ip,
count(ip) as nodes
FROM peer_info
WHERE deprecated = false and client_name IS NOT NULL
GROUP BY ip
ORDER BY nodes DESC
) as t
GROUP BY nodes
ORDER BY number_of_ips DESC;
`,
)
if err != nil {
return summary, err
}

for rows.Next() {
var nodesPerIP int
var ips int
err = rows.Scan(
&nodesPerIP,
&ips,
)
if err != nil {
return summary, err
}
summary[fmt.Sprintf("%d", nodesPerIP)] = ips
}
return summary, nil
}
2 changes: 1 addition & 1 deletion pkg/discovery/dv5/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func (d *Discovery5) nodeIterator() {
if d.Iterator.Next() {
// fill the given DiscoveredPeer interface with the next found peer
node := d.Iterator.Node()

log.WithFields(log.Fields{
"enr": node.String(),
"node_id": node.ID().String(),
"module": "Discv5",
}).Debug("new ENR discovered")
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossipsub/topic_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *TopicSubscription) MessageReadingLoop(selfId peer.ID, dbClient database
// use the msg handler for that specific topic that we have
content, err := c.handlerFn(msg)
if err != nil {
log.Error(errors.Wrap(err, "unable to unwrap message"))
log.Error(errors.Wrap(err, "unable to unwrap message on topic " + c.sub.Topic()))
continue
}
if !content.IsZero() && c.persistMsgs {
Expand Down

0 comments on commit a101a01

Please sign in to comment.