diff --git a/pkg/db/postgresql/active_peers_backup.go b/pkg/db/postgresql/active_peers_backup.go index dc325ba..5372d65 100644 --- a/pkg/db/postgresql/active_peers_backup.go +++ b/pkg/db/postgresql/active_peers_backup.go @@ -46,8 +46,9 @@ func (c *DBClient) getActivePeers() ([]int, error) { id, peer_id FROM peer_info - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') `, + LastActivityValidRange, ) if err != nil { return activePeers, errors.Wrap(err, "unable to retrieve active peer's ids") @@ -62,7 +63,6 @@ func (c *DBClient) getActivePeers() ([]int, error) { } activePeers = append(activePeers, id) } - return activePeers, nil } @@ -73,6 +73,10 @@ func (c *DBClient) activePeersBackup() error { if err != nil { return errors.Wrap(err, "unable to backup active peers") } + if len(activePeers) <= 0 { + log.Infof("tried to persist %d active peers (skipped)", len(activePeers)) + return nil + } // backup the list of active peers _, err = c.psqlPool.Exec( diff --git a/pkg/db/postgresql/crawler_metrics.go b/pkg/db/postgresql/crawler_metrics.go index 435d848..071e5f2 100644 --- a/pkg/db/postgresql/crawler_metrics.go +++ b/pkg/db/postgresql/crawler_metrics.go @@ -6,6 +6,10 @@ import ( log "github.com/sirupsen/logrus" ) +var ( + LastActivityValidRange = 180 // 6 Months +) + // this file contains all the list of queries to extract the metrics from the Crawler (as agnostic as possible from the network) // Basic call over the whole list of non-deprecated peers @@ -20,14 +24,19 @@ func (db *DBClient) GetClientDistribution() (map[string]interface{}, error) { client_name, count(client_name) as count FROM peer_info WHERE - deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_name ORDER BY count DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() if err != nil { + fmt.Print("\n", err.Error()) return cliDist, errors.Wrap(err, "unable to fetch client distribution") } @@ -57,10 +66,14 @@ func (db *DBClient) GetVersionDistribution() (map[string]interface{}, error) { count(client_version) as cnt FROM peer_info WHERE - deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_name, client_version ORDER BY client_name DESC, cnt DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -99,11 +112,15 @@ func (db *DBClient) GetGeoDistribution() (map[string]interface{}, error) { ips.country_code FROM peer_info RIGHT JOIN ips on peer_info.ip = ips.ip - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + WHERE deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux GROUP BY country_code ORDER BY cnt DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -133,10 +150,15 @@ func (db *DBClient) GetOsDistribution() (map[string]interface{}, error) { client_os, count(client_os) as nodes FROM peer_info - WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL + WHERE deprecated='false' and + attempted='true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_os ORDER BY nodes DESC; - `) + `, + LastActivityValidRange, + ) if err != nil { return summary, err } @@ -158,10 +180,15 @@ func (db *DBClient) GetArchDistribution() (map[string]interface{}, error) { client_arch, count(client_arch) as nodes FROM peer_info - WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL + WHERE deprecated='false' and + attempted='true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_arch ORDER BY nodes DESC; - `) + `, + LastActivityValidRange, + ) if err != nil { return summary, err } @@ -193,9 +220,15 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.mobile FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.mobile='true' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and + ips.mobile='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux - `).Scan(&mobile) + `, + LastActivityValidRange, + ).Scan(&mobile) if err != nil { return summary, err } @@ -218,9 +251,14 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.proxy FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.proxy='true' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and ips.proxy='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux - `).Scan(&proxy) + `, + LastActivityValidRange, + ).Scan(&proxy) if err != nil { return summary, err } @@ -243,9 +281,15 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.hosting FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.hosting='true' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and + ips.hosting='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux - `).Scan(&hosted) + `, + LastActivityValidRange, + ).Scan(&hosted) if err != nil { return summary, err } @@ -278,12 +322,14 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) { ELSE '+1s' END as latency FROM peer_info - WHERE deprecated=false and client_name IS NOT NULL + WHERE deprecated=false and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as t GROUP BY t.latency ORDER BY nodes DESC; - `, + LastActivityValidRange, ) if err != nil { return summary, err @@ -295,11 +341,11 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) { err = rows.Scan( &rttRange, &rttValue, - ) + ) if err != nil { - return summary, err + return summary, err } - summary[rttRange] = rttValue + summary[rttRange] = rttValue } return summary, nil } @@ -318,13 +364,16 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) { ip, count(ip) as nodes FROM peer_info - WHERE deprecated = false and client_name IS NOT NULL + WHERE deprecated = false and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY ip ORDER BY nodes DESC ) as t GROUP BY nodes ORDER BY number_of_ips DESC; `, + LastActivityValidRange, ) if err != nil { return summary, err @@ -336,11 +385,11 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) { err = rows.Scan( &nodesPerIP, &ips, - ) + ) if err != nil { - return summary, err + return summary, err } - summary[fmt.Sprintf("%d", nodesPerIP)] = ips + summary[fmt.Sprintf("%d", nodesPerIP)] = ips } return summary, nil } diff --git a/pkg/db/postgresql/peer_info.go b/pkg/db/postgresql/peer_info.go index 8ab50ad..2efe843 100644 --- a/pkg/db/postgresql/peer_info.go +++ b/pkg/db/postgresql/peer_info.go @@ -3,7 +3,7 @@ package postgresql import ( "time" - "github.com/jackc/pgx/v4" + pgx "github.com/jackc/pgx/v4" "github.com/libp2p/go-libp2p-core/peer" "github.com/migalabs/armiarma/pkg/db/models" "github.com/migalabs/armiarma/pkg/utils" diff --git a/pkg/db/postgresql/server.go b/pkg/db/postgresql/server.go index 7455931..7cd3e0f 100644 --- a/pkg/db/postgresql/server.go +++ b/pkg/db/postgresql/server.go @@ -26,7 +26,6 @@ var ( noQueryResult string = "no result" ) - type DBClient struct { // Control Variables ctx context.Context @@ -49,11 +48,11 @@ type DBClient struct { } func NewDBClient( -ctx context.Context, -p2pNetwork utils.NetworkType, -loginStr string, -dailyBackupInt time.Duration, -options ...DBOption) (*DBClient, error) { + ctx context.Context, + p2pNetwork utils.NetworkType, + loginStr string, + dailyBackupInt time.Duration, + options ...DBOption) (*DBClient, error) { // check if the login string has enough len if len(loginStr) == 0 { return nil, errors.New("empty db-endpoint provided") @@ -102,7 +101,7 @@ options ...DBOption) (*DBClient, error) { persistC: persistC, doneC: make(chan struct{}), wg: &wg, - persistConnEvents: true, + persistConnEvents: true, } // Check for all the available options @@ -336,6 +335,11 @@ func (c *DBClient) launchPersister() { } func (c *DBClient) dailyBackupheartbeat() { + // make a first backup of the active peers(if any) + err := c.activePeersBackup() + if err != nil { + log.Error(err) + } ticker := time.NewTicker(c.dailyBackupInterval) for { select {