From 5d857f5770894d4b2c7f4b8b343864ef99d7c363 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Tue, 21 Nov 2023 17:35:07 +0000 Subject: [PATCH 1/4] update definition of active peers --- pkg/db/postgresql/active_peers_backup.go | 3 +- pkg/db/postgresql/crawler_metrics.go | 62 +++++++++++++++--------- pkg/db/postgresql/peer_info.go | 2 +- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/pkg/db/postgresql/active_peers_backup.go b/pkg/db/postgresql/active_peers_backup.go index dc325ba..538496f 100644 --- a/pkg/db/postgresql/active_peers_backup.go +++ b/pkg/db/postgresql/active_peers_backup.go @@ -46,8 +46,9 @@ func (c *DBClient) getActivePeers() ([]int, error) { id, peer_id FROM peer_info - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' `, + LastActivityValidRange, ) if err != nil { return activePeers, errors.Wrap(err, "unable to retrieve active peer's ids") diff --git a/pkg/db/postgresql/crawler_metrics.go b/pkg/db/postgresql/crawler_metrics.go index 435d848..1bdc924 100644 --- a/pkg/db/postgresql/crawler_metrics.go +++ b/pkg/db/postgresql/crawler_metrics.go @@ -6,6 +6,10 @@ import ( log "github.com/sirupsen/logrus" ) +var ( + LastActivityValidRange = 180 // 6 Months +) + // this file contains all the list of queries to extract the metrics from the Crawler (as agnostic as possible from the network) // Basic call over the whole list of non-deprecated peers @@ -20,10 +24,11 @@ func (db *DBClient) GetClientDistribution() (map[string]interface{}, error) { client_name, count(client_name) as count FROM peer_info WHERE - deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY'; GROUP BY client_name ORDER BY count DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -57,10 +62,11 @@ func (db *DBClient) GetVersionDistribution() (map[string]interface{}, error) { count(client_version) as cnt FROM peer_info WHERE - deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' GROUP BY client_name, client_version ORDER BY client_name DESC, cnt DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -99,11 +105,12 @@ func (db *DBClient) GetGeoDistribution() (map[string]interface{}, error) { ips.country_code FROM peer_info RIGHT JOIN ips on peer_info.ip = ips.ip - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL + WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' ) as aux GROUP BY country_code ORDER BY cnt DESC; `, + LastActivityValidRange, ) // make sure we close the rows and we free the connection/session defer rows.Close() @@ -133,10 +140,12 @@ func (db *DBClient) GetOsDistribution() (map[string]interface{}, error) { client_os, count(client_os) as nodes FROM peer_info - WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL + WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' GROUP BY client_os ORDER BY nodes DESC; - `) + `, + LastActivityValidRange, + ) if err != nil { return summary, err } @@ -158,10 +167,12 @@ func (db *DBClient) GetArchDistribution() (map[string]interface{}, error) { client_arch, count(client_arch) as nodes FROM peer_info - WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL + WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' GROUP BY client_arch ORDER BY nodes DESC; - `) + `, + LastActivityValidRange, + ) if err != nil { return summary, err } @@ -193,9 +204,11 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.mobile FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.mobile='true' + WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.mobile='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' ) as aux - `).Scan(&mobile) + `, + LastActivityValidRange, + ).Scan(&mobile) if err != nil { return summary, err } @@ -218,9 +231,11 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.proxy FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.proxy='true' + WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.proxy='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' ) as aux - `).Scan(&proxy) + `, + LastActivityValidRange, + ).Scan(&proxy) if err != nil { return summary, err } @@ -243,9 +258,11 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.hosting FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.hosting='true' + WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.hosting='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' ) as aux - `).Scan(&hosted) + `, + LastActivityValidRange, + ).Scan(&hosted) if err != nil { return summary, err } @@ -278,12 +295,12 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) { ELSE '+1s' END as latency FROM peer_info - WHERE deprecated=false and client_name IS NOT NULL + WHERE deprecated=false and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' ) as t GROUP BY t.latency ORDER BY nodes DESC; - `, + LastActivityValidRange, ) if err != nil { return summary, err @@ -295,11 +312,11 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) { err = rows.Scan( &rttRange, &rttValue, - ) + ) if err != nil { - return summary, err + return summary, err } - summary[rttRange] = rttValue + summary[rttRange] = rttValue } return summary, nil } @@ -318,13 +335,14 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) { ip, count(ip) as nodes FROM peer_info - WHERE deprecated = false and client_name IS NOT NULL + WHERE deprecated = false and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' GROUP BY ip ORDER BY nodes DESC ) as t GROUP BY nodes ORDER BY number_of_ips DESC; `, + LastActivityValidRange, ) if err != nil { return summary, err @@ -336,11 +354,11 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) { err = rows.Scan( &nodesPerIP, &ips, - ) + ) if err != nil { - return summary, err + return summary, err } - summary[fmt.Sprintf("%d", nodesPerIP)] = ips + summary[fmt.Sprintf("%d", nodesPerIP)] = ips } return summary, nil } diff --git a/pkg/db/postgresql/peer_info.go b/pkg/db/postgresql/peer_info.go index 8ab50ad..2efe843 100644 --- a/pkg/db/postgresql/peer_info.go +++ b/pkg/db/postgresql/peer_info.go @@ -3,7 +3,7 @@ package postgresql import ( "time" - "github.com/jackc/pgx/v4" + pgx "github.com/jackc/pgx/v4" "github.com/libp2p/go-libp2p-core/peer" "github.com/migalabs/armiarma/pkg/db/models" "github.com/migalabs/armiarma/pkg/utils" From f243be54db074f854d9cb9b44f9f4dd863fe1354 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Tue, 21 Nov 2023 17:37:57 +0000 Subject: [PATCH 2/4] add straigth away persistance of the active peer idxs --- pkg/db/postgresql/active_peers_backup.go | 4 ++++ pkg/db/postgresql/server.go | 18 +++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pkg/db/postgresql/active_peers_backup.go b/pkg/db/postgresql/active_peers_backup.go index 538496f..a5b7844 100644 --- a/pkg/db/postgresql/active_peers_backup.go +++ b/pkg/db/postgresql/active_peers_backup.go @@ -74,6 +74,10 @@ func (c *DBClient) activePeersBackup() error { if err != nil { return errors.Wrap(err, "unable to backup active peers") } + if len(activePeers) <= 0 { + log.Infof("tried to persist %d active peers (skipped)", len(activePeers)) + return nil + } // backup the list of active peers _, err = c.psqlPool.Exec( diff --git a/pkg/db/postgresql/server.go b/pkg/db/postgresql/server.go index 7455931..7cd3e0f 100644 --- a/pkg/db/postgresql/server.go +++ b/pkg/db/postgresql/server.go @@ -26,7 +26,6 @@ var ( noQueryResult string = "no result" ) - type DBClient struct { // Control Variables ctx context.Context @@ -49,11 +48,11 @@ type DBClient struct { } func NewDBClient( -ctx context.Context, -p2pNetwork utils.NetworkType, -loginStr string, -dailyBackupInt time.Duration, -options ...DBOption) (*DBClient, error) { + ctx context.Context, + p2pNetwork utils.NetworkType, + loginStr string, + dailyBackupInt time.Duration, + options ...DBOption) (*DBClient, error) { // check if the login string has enough len if len(loginStr) == 0 { return nil, errors.New("empty db-endpoint provided") @@ -102,7 +101,7 @@ options ...DBOption) (*DBClient, error) { persistC: persistC, doneC: make(chan struct{}), wg: &wg, - persistConnEvents: true, + persistConnEvents: true, } // Check for all the available options @@ -336,6 +335,11 @@ func (c *DBClient) launchPersister() { } func (c *DBClient) dailyBackupheartbeat() { + // make a first backup of the active peers(if any) + err := c.activePeersBackup() + if err != nil { + log.Error(err) + } ticker := time.NewTicker(c.dailyBackupInterval) for { select { From c8a3bbf36e46853ec7ad89ff058f321cee58bac8 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Tue, 21 Nov 2023 18:31:40 +0000 Subject: [PATCH 3/4] update dynamic Activity_Interval --- pkg/db/postgresql/active_peers_backup.go | 3 +- pkg/db/postgresql/crawler_metrics.go | 49 +++++++++++++++++++----- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/pkg/db/postgresql/active_peers_backup.go b/pkg/db/postgresql/active_peers_backup.go index a5b7844..5372d65 100644 --- a/pkg/db/postgresql/active_peers_backup.go +++ b/pkg/db/postgresql/active_peers_backup.go @@ -46,7 +46,7 @@ func (c *DBClient) getActivePeers() ([]int, error) { id, peer_id FROM peer_info - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') `, LastActivityValidRange, ) @@ -63,7 +63,6 @@ func (c *DBClient) getActivePeers() ([]int, error) { } activePeers = append(activePeers, id) } - return activePeers, nil } diff --git a/pkg/db/postgresql/crawler_metrics.go b/pkg/db/postgresql/crawler_metrics.go index 1bdc924..5bc0888 100644 --- a/pkg/db/postgresql/crawler_metrics.go +++ b/pkg/db/postgresql/crawler_metrics.go @@ -24,7 +24,10 @@ func (db *DBClient) GetClientDistribution() (map[string]interface{}, error) { client_name, count(client_name) as count FROM peer_info WHERE - deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY'; + deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_name ORDER BY count DESC; `, @@ -33,6 +36,7 @@ func (db *DBClient) GetClientDistribution() (map[string]interface{}, error) { // make sure we close the rows and we free the connection/session defer rows.Close() if err != nil { + fmt.Print("\n", err.Error()) return cliDist, errors.Wrap(err, "unable to fetch client distribution") } @@ -62,7 +66,10 @@ func (db *DBClient) GetVersionDistribution() (map[string]interface{}, error) { count(client_version) as cnt FROM peer_info WHERE - deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_name, client_version ORDER BY client_name DESC, cnt DESC; `, @@ -105,7 +112,10 @@ func (db *DBClient) GetGeoDistribution() (map[string]interface{}, error) { ips.country_code FROM peer_info RIGHT JOIN ips on peer_info.ip = ips.ip - WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE deprecated = 'false' and + attempted = 'true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux GROUP BY country_code ORDER BY cnt DESC; @@ -140,7 +150,10 @@ func (db *DBClient) GetOsDistribution() (map[string]interface{}, error) { client_os, count(client_os) as nodes FROM peer_info - WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE deprecated='false' and + attempted='true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_os ORDER BY nodes DESC; `, @@ -167,7 +180,10 @@ func (db *DBClient) GetArchDistribution() (map[string]interface{}, error) { client_arch, count(client_arch) as nodes FROM peer_info - WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE deprecated='false' and + attempted='true' and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY client_arch ORDER BY nodes DESC; `, @@ -204,7 +220,10 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.mobile FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.mobile='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and + ips.mobile='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux `, LastActivityValidRange, @@ -231,7 +250,10 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.proxy FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.proxy='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and ips.proxy='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux `, LastActivityValidRange, @@ -258,7 +280,10 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { ips.hosting FROM peer_info as pi INNER JOIN ips ON pi.ip=ips.ip - WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.hosting='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE pi.deprecated='false' and + attempted = 'true' and + client_name IS NOT NULL and + ips.hosting='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux `, LastActivityValidRange, @@ -295,7 +320,9 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) { ELSE '+1s' END as latency FROM peer_info - WHERE deprecated=false and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE deprecated=false and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as t GROUP BY t.latency ORDER BY nodes DESC; @@ -335,7 +362,9 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) { ip, count(ip) as nodes FROM peer_info - WHERE deprecated = false and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - INTERVAL '$1 DAY' + WHERE deprecated = false and + client_name IS NOT NULL and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') GROUP BY ip ORDER BY nodes DESC ) as t From a09277b3a7de47d74497c10fddb0fa42a8cc0212 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Wed, 22 Nov 2023 14:25:34 +0000 Subject: [PATCH 4/4] format sql queries --- pkg/db/postgresql/crawler_metrics.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/db/postgresql/crawler_metrics.go b/pkg/db/postgresql/crawler_metrics.go index 5bc0888..071e5f2 100644 --- a/pkg/db/postgresql/crawler_metrics.go +++ b/pkg/db/postgresql/crawler_metrics.go @@ -223,7 +223,8 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and - ips.mobile='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + ips.mobile='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux `, LastActivityValidRange, @@ -283,7 +284,8 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) { WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and - ips.hosting='true' and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') + ips.hosting='true' and + to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY') ) as aux `, LastActivityValidRange,