Skip to content

Commit

Permalink
Merge pull request #65 from migalabs/update/active-peer-filters
Browse files Browse the repository at this point in the history
Update/active peer filters
  • Loading branch information
cortze authored Nov 23, 2023
2 parents c8d4997 + a09277b commit f4dfd0b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 32 deletions.
8 changes: 6 additions & 2 deletions pkg/db/postgresql/active_peers_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func (c *DBClient) getActivePeers() ([]int, error) {
id,
peer_id
FROM peer_info
WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL
WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
`,
LastActivityValidRange,
)
if err != nil {
return activePeers, errors.Wrap(err, "unable to retrieve active peer's ids")
Expand All @@ -62,7 +63,6 @@ func (c *DBClient) getActivePeers() ([]int, error) {
}
activePeers = append(activePeers, id)
}

return activePeers, nil
}

Expand All @@ -73,6 +73,10 @@ func (c *DBClient) activePeersBackup() error {
if err != nil {
return errors.Wrap(err, "unable to backup active peers")
}
if len(activePeers) <= 0 {
log.Infof("tried to persist %d active peers (skipped)", len(activePeers))
return nil
}

// backup the list of active peers
_, err = c.psqlPool.Exec(
Expand Down
93 changes: 71 additions & 22 deletions pkg/db/postgresql/crawler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
log "github.com/sirupsen/logrus"
)

var (
LastActivityValidRange = 180 // 6 Months
)

// this file contains all the list of queries to extract the metrics from the Crawler (as agnostic as possible from the network)

// Basic call over the whole list of non-deprecated peers
Expand All @@ -20,14 +24,19 @@ func (db *DBClient) GetClientDistribution() (map[string]interface{}, error) {
client_name, count(client_name) as count
FROM peer_info
WHERE
deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL
deprecated = 'false' and
attempted = 'true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY client_name
ORDER BY count DESC;
`,
LastActivityValidRange,
)
// make sure we close the rows and we free the connection/session
defer rows.Close()
if err != nil {
fmt.Print("\n", err.Error())
return cliDist, errors.Wrap(err, "unable to fetch client distribution")
}

Expand Down Expand Up @@ -57,10 +66,14 @@ func (db *DBClient) GetVersionDistribution() (map[string]interface{}, error) {
count(client_version) as cnt
FROM peer_info
WHERE
deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL
deprecated = 'false' and
attempted = 'true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY client_name, client_version
ORDER BY client_name DESC, cnt DESC;
`,
LastActivityValidRange,
)
// make sure we close the rows and we free the connection/session
defer rows.Close()
Expand Down Expand Up @@ -99,11 +112,15 @@ func (db *DBClient) GetGeoDistribution() (map[string]interface{}, error) {
ips.country_code
FROM peer_info
RIGHT JOIN ips on peer_info.ip = ips.ip
WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL
WHERE deprecated = 'false' and
attempted = 'true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as aux
GROUP BY country_code
ORDER BY cnt DESC;
`,
LastActivityValidRange,
)
// make sure we close the rows and we free the connection/session
defer rows.Close()
Expand Down Expand Up @@ -133,10 +150,15 @@ func (db *DBClient) GetOsDistribution() (map[string]interface{}, error) {
client_os,
count(client_os) as nodes
FROM peer_info
WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL
WHERE deprecated='false' and
attempted='true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY client_os
ORDER BY nodes DESC;
`)
`,
LastActivityValidRange,
)
if err != nil {
return summary, err
}
Expand All @@ -158,10 +180,15 @@ func (db *DBClient) GetArchDistribution() (map[string]interface{}, error) {
client_arch,
count(client_arch) as nodes
FROM peer_info
WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL
WHERE deprecated='false' and
attempted='true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY client_arch
ORDER BY nodes DESC;
`)
`,
LastActivityValidRange,
)
if err != nil {
return summary, err
}
Expand Down Expand Up @@ -193,9 +220,15 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) {
ips.mobile
FROM peer_info as pi
INNER JOIN ips ON pi.ip=ips.ip
WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.mobile='true'
WHERE pi.deprecated='false' and
attempted = 'true' and
client_name IS NOT NULL and
ips.mobile='true' and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as aux
`).Scan(&mobile)
`,
LastActivityValidRange,
).Scan(&mobile)
if err != nil {
return summary, err
}
Expand All @@ -218,9 +251,14 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) {
ips.proxy
FROM peer_info as pi
INNER JOIN ips ON pi.ip=ips.ip
WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.proxy='true'
WHERE pi.deprecated='false' and
attempted = 'true' and
client_name IS NOT NULL and ips.proxy='true' and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as aux
`).Scan(&proxy)
`,
LastActivityValidRange,
).Scan(&proxy)
if err != nil {
return summary, err
}
Expand All @@ -243,9 +281,15 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) {
ips.hosting
FROM peer_info as pi
INNER JOIN ips ON pi.ip=ips.ip
WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.hosting='true'
WHERE pi.deprecated='false' and
attempted = 'true' and
client_name IS NOT NULL and
ips.hosting='true' and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as aux
`).Scan(&hosted)
`,
LastActivityValidRange,
).Scan(&hosted)
if err != nil {
return summary, err
}
Expand Down Expand Up @@ -278,12 +322,14 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) {
ELSE '+1s'
END as latency
FROM peer_info
WHERE deprecated=false and client_name IS NOT NULL
WHERE deprecated=false and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as t
GROUP BY t.latency
ORDER BY nodes DESC;
`,
LastActivityValidRange,
)
if err != nil {
return summary, err
Expand All @@ -295,11 +341,11 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) {
err = rows.Scan(
&rttRange,
&rttValue,
)
)
if err != nil {
return summary, err
return summary, err
}
summary[rttRange] = rttValue
summary[rttRange] = rttValue
}
return summary, nil
}
Expand All @@ -318,13 +364,16 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) {
ip,
count(ip) as nodes
FROM peer_info
WHERE deprecated = false and client_name IS NOT NULL
WHERE deprecated = false and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY ip
ORDER BY nodes DESC
) as t
GROUP BY nodes
ORDER BY number_of_ips DESC;
`,
LastActivityValidRange,
)
if err != nil {
return summary, err
Expand All @@ -336,11 +385,11 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) {
err = rows.Scan(
&nodesPerIP,
&ips,
)
)
if err != nil {
return summary, err
return summary, err
}
summary[fmt.Sprintf("%d", nodesPerIP)] = ips
summary[fmt.Sprintf("%d", nodesPerIP)] = ips
}
return summary, nil
}
2 changes: 1 addition & 1 deletion pkg/db/postgresql/peer_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package postgresql
import (
"time"

"github.com/jackc/pgx/v4"
pgx "github.com/jackc/pgx/v4"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/migalabs/armiarma/pkg/db/models"
"github.com/migalabs/armiarma/pkg/utils"
Expand Down
18 changes: 11 additions & 7 deletions pkg/db/postgresql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var (
noQueryResult string = "no result"
)


type DBClient struct {
// Control Variables
ctx context.Context
Expand All @@ -49,11 +48,11 @@ type DBClient struct {
}

func NewDBClient(
ctx context.Context,
p2pNetwork utils.NetworkType,
loginStr string,
dailyBackupInt time.Duration,
options ...DBOption) (*DBClient, error) {
ctx context.Context,
p2pNetwork utils.NetworkType,
loginStr string,
dailyBackupInt time.Duration,
options ...DBOption) (*DBClient, error) {
// check if the login string has enough len
if len(loginStr) == 0 {
return nil, errors.New("empty db-endpoint provided")
Expand Down Expand Up @@ -102,7 +101,7 @@ options ...DBOption) (*DBClient, error) {
persistC: persistC,
doneC: make(chan struct{}),
wg: &wg,
persistConnEvents: true,
persistConnEvents: true,
}

// Check for all the available options
Expand Down Expand Up @@ -336,6 +335,11 @@ func (c *DBClient) launchPersister() {
}

func (c *DBClient) dailyBackupheartbeat() {
// make a first backup of the active peers(if any)
err := c.activePeersBackup()
if err != nil {
log.Error(err)
}
ticker := time.NewTicker(c.dailyBackupInterval)
for {
select {
Expand Down

0 comments on commit f4dfd0b

Please sign in to comment.