refactor: debug-based print statements
yellowHatpro committed Sep 22, 2024
1 parent d38dc34 commit e6318eb
Showing 11 changed files with 124 additions and 74 deletions.
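
The central change in this commit: chatty `println!`/`eprintln!` calls are either tagged with the owning task ([POLLER], [NOTIFIER], [LISTENER], [RETRY_AND_CLEANUP]) or routed through a `debug_println!` macro so they only fire in debug builds. The macro's definition is not among the files loaded below (it is imported as `use crate::debug_println;`); a minimal sketch of the usual shape such a macro takes, assuming it simply gates `println!` on debug builds:

    // Hypothetical sketch of debug_println!; the real definition lives
    // elsewhere in the crate. cfg!(debug_assertions) is true for plain
    // `cargo build`/`cargo run` and false for `--release`, so the branch
    // below is compiled away entirely in release binaries.
    #[macro_export]
    macro_rules! debug_println {
        ($($arg:tt)*) => {
            if cfg!(debug_assertions) {
                println!($($arg)*);
            }
        };
    }

With `#[macro_export]` the macro is available at the crate root, which matches the `use crate::debug_println;` imports in the diffs below.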
29 changes: 12 additions & 17 deletions src/app/mod.rs
@@ -1,6 +1,6 @@
 use crate::archival;
 use crate::archival::notifier::Notifier;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
 use crate::poller::Poller;
 use sqlx::PgPool;
 use std::sync::Arc;
@@ -40,16 +40,15 @@ pub async fn start(pool: &PgPool) -> Result<(), sqlx::Error> {
 ///
 /// ⚠️ This must be awaited twice. Once to get the `JoinHandle`, and a second to start the task
 pub async fn spawn_poller_task(db_pool: PgPool) -> JoinHandle<()> {
-    let settings = Settings::new().expect("Config settings are not configured properly");
-    let mut poller = Poller::new(settings.poller_task.poll_interval, db_pool.clone())
+    let mut poller = Poller::new(SETTINGS.poller_task.poll_interval, db_pool.clone())
         .await
-        .expect("Could not find rows in edit rows to start poller");
+        .expect("[POLLER] Could not find rows in edit rows to start poller");
 
     tokio::spawn(async move {
         if let Err(e) = poller.poll().await {
-            eprintln!("Cannot poll edit data and edit notes, due to: {}", e);
+            eprintln!("[POLLER] Task Failed, Error: {}", e);
             sentry::capture_message(
-                format!("Cannot poll edit data and edit notes, due to: {}", e).as_str(),
+                format!("[POLLER] Task Failed, Error: {}", e).as_str(),
                 sentry::Level::Warning,
             );
         }
@@ -60,23 +59,20 @@ pub async fn spawn_poller_task(db_pool: PgPool) -> JoinHandle<()> {
 ///
 /// ⚠️ This must be awaited twice. Once to get the `JoinHandle`, and a second to start the task
 async fn spawn_notification_task(db_pool: PgPool) -> JoinHandle<()> {
-    let settings = Settings::new().expect("Config settings are not configured properly");
     let notifier = Arc::new(Mutex::new(Notifier::new(db_pool.clone()).await));
 
     tokio::spawn(async move {
         let mut interval =
-            tokio::time::interval(Duration::from_secs(settings.notify_task.notify_interval));
+            tokio::time::interval(Duration::from_secs(SETTINGS.notify_task.notify_interval));
 
         while !db_pool.is_closed() {
             interval.tick().await;
             let notifier = Arc::clone(&notifier);
             let mut notifier = notifier.lock().await;
 
             if notifier.should_notify().await {
-                println!("Notifying");
-
                 if let Err(e) = notifier.notify().await {
-                    eprintln!("Notify failed, error: {}", e);
+                    eprintln!("[NOTIFIER] Task Failed, Error: {}", e);
                     sentry::capture_error(&e);
                 };
             }
@@ -93,7 +89,7 @@ async fn spawn_archiver_task(db_pool: PgPool) -> JoinHandle<()> {
             .await
             .unwrap_or_else(|e| {
                 sentry::capture_error(&e);
-                eprintln!("Listener Task Error {}", e)
+                eprintln!("[LISTENER] Task Failed, Error {}", e)
             })
     })
 }
@@ -102,24 +98,23 @@ async fn spawn_archiver_task(db_pool: PgPool) -> JoinHandle<()> {
 ///
 /// ⚠️ This must be awaited twice. Once to get the `JoinHandle`, and a second to start the task
 async fn spawn_retry_and_cleanup_task(db_pool: PgPool) -> JoinHandle<()> {
-    let settings = Settings::new().expect("Config settings are not configured properly");
     tokio::spawn(async move {
         let mut interval =
-            tokio::time::interval(Duration::from_secs(settings.retry_task.retry_interval));
+            tokio::time::interval(Duration::from_secs(SETTINGS.retry_task.retry_interval));
         while !db_pool.is_closed() {
             interval.tick().await;
-            sentry::capture_message("Retry and Cleanup Task started", sentry::Level::Info);
+            sentry::capture_message("[RETRY_AND_CLEANUP] Task started", sentry::Level::Info);
             let archival_retry_task = archival::retry::start(db_pool.clone()).await;
             match archival_retry_task {
                 Ok(_) => {
                     sentry::capture_message(
-                        "Retry and Cleanup Task Completed",
+                        "[RETRY_AND_CLEANUP] Task Completed",
                         sentry::Level::Info,
                     );
                 }
                 Err(e) => {
                     sentry::capture_error(&e);
-                    eprintln!("Retry and Cleanup Task failed, error: {}", e)
+                    eprintln!("[RETRY_AND_CLEANUP] Task Failed, Error: {}", e)
                 }
             }
         }
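Each task above previously parsed the configuration from scratch with `Settings::new()`; the commit swaps those calls for a shared `SETTINGS` value. `src/configuration.rs` itself is not part of the loaded diff, but a plausible sketch of the static, reusing the crate's existing `Settings` type and mirroring the `once_cell::sync::Lazy` pattern already used for `REQWEST_CLIENT` in src/archival/client.rs just below:

    use once_cell::sync::Lazy;

    // Hypothetical declaration: parse the configuration once, on first
    // access, and share the result process-wide thereafter.
    pub static SETTINGS: Lazy<Settings> =
        Lazy::new(|| Settings::new().expect("Config settings are not configured properly"));

One behavioral difference worth noting: the `expect` now panics once, on the first access to `SETTINGS`, instead of separately in every task that constructed its own `Settings`.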
5 changes: 2 additions & 3 deletions src/archival/client.rs
@@ -1,16 +1,15 @@
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
 use once_cell::sync::Lazy;
 use reqwest::{header, Client};
 
 pub static REQWEST_CLIENT: Lazy<Client> = Lazy::new(|| {
-    let settings = Settings::new().expect("Config needed");
     let mut headers = header::HeaderMap::new();
     headers.insert("Accept", "application/json".parse().unwrap());
     headers.insert(
         "Authorization",
         format!(
             "LOW {}:{}",
-            settings.wayback_machine_api.myaccesskey, settings.wayback_machine_api.mysecret
+            SETTINGS.wayback_machine_api.myaccesskey, SETTINGS.wayback_machine_api.mysecret
         )
         .parse()
         .unwrap(),
18 changes: 10 additions & 8 deletions src/archival/listener.rs
@@ -4,7 +4,8 @@ use crate::archival::utils::{
 };
 
 use crate::archival::error::ArchivalError;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
+use crate::debug_println;
 use crate::metrics::Metrics;
 use crate::structs::internet_archive_urls::{ArchivalStatus, InternetArchiveUrls};
 use sentry::Level::Error;
@@ -15,14 +16,15 @@ use tokio::time;
 
 /// Listens to the `archive_urls` postgres channel
 pub async fn listen(pool: PgPool) -> Result<(), ArchivalError> {
-    let settings = Settings::new().expect("Config settings not configured properly");
-
     let mut listener = PgListener::connect_with(&pool).await?;
     listener.listen("archive_urls").await?;
     loop {
         while let Some(notification) = listener.try_recv().await? {
-            time::sleep(Duration::from_secs(settings.listen_task.listen_interval)).await;
-            println!("Notification Payload: {}", notification.payload());
+            time::sleep(Duration::from_secs(SETTINGS.listen_task.listen_interval)).await;
+            debug_println!(
+                "[LISTENER] Notification Payload: {}",
+                notification.payload()
+            );
             let payload: InternetArchiveUrls = serde_json::from_str(notification.payload())?;
             handle_payload(payload, &pool).await?
         }
@@ -52,7 +54,7 @@ pub async fn handle_payload(
     } else {
         let archival_result = archive(url, url_row.id, pool).await;
         if let Err(e) = archival_result {
-            eprintln!("Archival Error : {}", e);
+            eprintln!("[LISTENER] Archival Error for id {}: {}", url_row.id, e);
             set_status_with_message(
                 pool,
                 id,
@@ -79,7 +81,7 @@ pub async fn archive(url: String, id: i32, pool: &PgPool) -> Result<(), Archival
     inc_archive_request_retry_count(&status_pool, id)
         .await
         .unwrap_or_else(|e| {
-            eprintln!("Could not increment archive request retry count");
+            eprintln!("[LISTENER] Could not increment archive request retry count for internet_archive_urls id: {}", id);
             sentry::capture_error(&e);
         });
     set_status_with_message(
@@ -90,7 +92,7 @@
         )
         .await
         .unwrap_or_else(|e| {
-            eprintln!("Could not increment archive request retry count");
+            eprintln!("[LISTENER] Could not set status with message for internet_archive_urls id: {}", id);
             sentry::capture_error(&e);
         });
     sentry::capture_error(&e);
9 changes: 5 additions & 4 deletions src/archival/notifier.rs
@@ -1,4 +1,5 @@
 use crate::archival::utils::{get_first_id_to_start_notifier_from, is_row_exists};
+use crate::debug_println;
 use sqlx::{Error, PgPool};
 
 pub struct Notifier {
@@ -10,7 +11,7 @@ impl Notifier {
     pub async fn new(pool: PgPool) -> Notifier {
         let start_notifier_from = get_first_id_to_start_notifier_from(pool.clone()).await;
         if start_notifier_from.is_some() {
-            println!("Notifies starts from : {}", start_notifier_from.unwrap());
+            debug_println!("[NOTIFIER] starts from : {}", start_notifier_from.unwrap());
         }
         Notifier {
             start_notifier_from,
@@ -27,8 +28,8 @@ impl Notifier {
             .bind(self.start_notifier_from)
             .execute(&pool)
             .await?;
-        println!(
-            "[start_id from notify], {}",
+        debug_println!(
+            "[NOTIFIER] start notifier from internet_archive_urls id, {}",
             self.start_notifier_from.unwrap()
         );
 
@@ -39,7 +40,7 @@ impl Notifier {
             Ok(())
         } else {
             //Case: It could be that there is no URL in InternetArchiveURL table when we call `notify`, so we check for the id here, to start notifier from it in the next notify call
-            println!("[NOTIFIER] No row detected, checking again");
+            debug_println!("[NOTIFIER] No row detected to archive, checking again");
             self.start_notifier_from = get_first_id_to_start_notifier_from(self.pool.clone()).await;
             Ok(())
         }
36 changes: 29 additions & 7 deletions src/archival/retry.rs
@@ -1,14 +1,14 @@
 use crate::archival::utils::check_if_permanent_error;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
+use crate::debug_println;
 use crate::structs::internet_archive_urls::{ArchivalStatus, InternetArchiveUrls};
 use chrono::{Duration, Utc};
 use sqlx::{Error, PgPool};
 use std::ops::Sub;
 
 /// Method started by `retry_and_cleanup` task, which reiterates `internet_archive_urls`, and according to the conditions, either re archives or cleans the row
 pub async fn start(pool: PgPool) -> Result<(), Error> {
-    let settings = Settings::new().expect("Config settings not configured properly");
-    let select_limit = settings.retry_task.select_limit;
+    let select_limit = SETTINGS.retry_task.select_limit;
     let mut last_id = 0;
     loop {
         let query = format!(
@@ -31,7 +31,7 @@ pub async fn start(pool: PgPool) -> Result<(), Error> {
         }
         last_id += select_limit;
     }
-    println!("Retry/Cleanup Task Complete");
+    debug_println!("[RETRY_AND_CLEANUP] Task Complete");
     Ok(())
 }
 
@@ -40,7 +40,6 @@ pub async fn retry_and_cleanup_ia_row(
     row: InternetArchiveUrls,
     pool: &PgPool,
 ) -> Result<(), Error> {
-    let settings = Settings::new().expect("Config settings are not configured properly");
     let current_time = Utc::now();
     let created_at = row.created_at.unwrap();
     let duration_since_creation = current_time.sub(created_at);
@@ -49,14 +48,19 @@ pub async fn retry_and_cleanup_ia_row(
         // If the URL status is failed, then we can remove it from the table (Case when still can't archive after 3 retries)
         Ok(ArchivalStatus::Failed) => {
             if duration_since_creation
-                >= Duration::seconds(settings.retry_task.allow_remove_row_after)
+                >= Duration::seconds(SETTINGS.retry_task.allow_remove_row_after)
             {
                 sqlx::query(
                     "DELETE FROM external_url_archiver.internet_archive_urls WHERE id = $1",
                 )
                 .bind(row.id)
                 .execute(pool)
                 .await?;
+                debug_println!(
+                    "[RETRY_AND_CLEANUP] Removing failed URL: {} after {} seconds",
+                    row.url.unwrap_or("<unknown>".to_string()),
+                    SETTINGS.retry_task.allow_remove_row_after
+                )
             }
         }
         Ok(ArchivalStatus::StatusError) => {
@@ -69,31 +73,49 @@ pub async fn retry_and_cleanup_ia_row(
                     .bind(row.id)
                     .execute(pool)
                     .await?;
+                    debug_println!(
+                        "[RETRY_AND_CLEANUP] Removing URL: {} due to Permanent Errors",
+                        row.url.unwrap_or("<unknown>".to_string())
+                    )
                 } else {
                     // If the archival status is null, which means the URL could not get archived earlier, therefore, enqueue the row to be sent to get archived
                     sqlx::query("SELECT external_url_archiver.notify_archive_urls($1)")
                         .bind(row.id)
                         .execute(pool)
                         .await?;
+                    debug_println!(
+                        "[RETRY_AND_CLEANUP] Retrying notifying URL: {}",
+                        row.url.unwrap_or("<unknown>".to_string())
+                    )
                 }
             }
         }
         _ => {
             // In any other case, if the URL has been there for more than the time limit, i.e. 24 hours, we will remove it, else re-archive it
             if duration_since_creation
-                >= Duration::seconds(settings.retry_task.allow_remove_row_after)
+                >= Duration::seconds(SETTINGS.retry_task.allow_remove_row_after)
             {
                 sqlx::query(
                     "DELETE FROM external_url_archiver.internet_archive_urls WHERE id = $1",
                 )
                 .bind(row.id)
                 .execute(pool)
                 .await?;
+                debug_println!(
+                    "[RETRY_AND_CLEANUP] Removing URL: {} after {} seconds",
+                    row.url.unwrap_or("<unknown>".to_string()),
+                    SETTINGS.retry_task.allow_remove_row_after
+                )
             } else if row.status.try_into() != Ok(ArchivalStatus::Success) {
                 sqlx::query("SELECT external_url_archiver.notify_archive_urls($1)")
                     .bind(row.id)
                     .execute(pool)
                     .await?;
+                debug_println!(
+                    "[RETRY_AND_CLEANUP] Retrying URL: {} after {} seconds",
+                    row.url.unwrap_or("<unknown>".to_string()),
+                    SETTINGS.retry_task.allow_remove_row_after
+                )
             }
         }
     }
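The `try_into()` matches above convert the integer `status` column into `ArchivalStatus`, whose definition in src/structs/internet_archive_urls.rs is not part of this diff. A hedged sketch of what that conversion typically looks like; only the `Success`, `Failed`, and `StatusError` variants are confirmed by the code above, and the other variants and all discriminant values are assumptions:

    #[derive(Debug, PartialEq)]
    pub enum ArchivalStatus {
        NotStarted, // assumed
        Processing, // assumed
        Success,
        Failed,
        StatusError,
    }

    impl TryFrom<i32> for ArchivalStatus {
        type Error = String;

        // Map the stored integer onto the enum; unknown values become an
        // Err so callers can fall through to the catch-all match arm.
        fn try_from(value: i32) -> Result<Self, Self::Error> {
            match value {
                0 => Ok(ArchivalStatus::NotStarted),
                1 => Ok(ArchivalStatus::Processing),
                2 => Ok(ArchivalStatus::Success),
                3 => Ok(ArchivalStatus::Failed),
                4 => Ok(ArchivalStatus::StatusError),
                other => Err(format!("unknown archival status: {}", other)),
            }
        }
    }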
9 changes: 4 additions & 5 deletions src/archival/tests/utils.rs
@@ -1,5 +1,5 @@
 use super::*;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
 use sqlx::Error;
 
 #[sqlx::test(fixtures(
@@ -37,11 +37,10 @@ async fn test_make_archival_network_request() -> Result<(), ArchivalError> {
         ..Default::default()
     };
     let mut server = mockito::Server::new_with_opts_async(opts).await;
-    let settings = Settings::new().expect("Config settings are not configured properly");
     let mock = server
         .mock("POST", "/save")
         .match_header("Accept", "application/json")
-        .match_header("Authorization", format!("LOW {}:{}", settings.wayback_machine_api.myaccesskey, settings.wayback_machine_api.mysecret).as_str())
+        .match_header("Authorization", format!("LOW {}:{}", SETTINGS.wayback_machine_api.myaccesskey, SETTINGS.wayback_machine_api.mysecret).as_str())
         .match_header("Content-Type", "application/x-www-form-urlencoded")
         .match_body(format!("url={}", testing_url_invalid).as_str())
         .with_body(r#"{"message":"www.example.om URL syntax is not valid.","status":"error","status_ext":"error:invalid-url-syntax"}"#)
@@ -55,7 +54,7 @@ async fn test_make_archival_network_request() -> Result<(), ArchivalError> {
             "Authorization",
             format!(
                 "LOW {}:{}",
-                settings.wayback_machine_api.myaccesskey, settings.wayback_machine_api.mysecret
+                SETTINGS.wayback_machine_api.myaccesskey, SETTINGS.wayback_machine_api.mysecret
             )
             .as_str(),
         )
@@ -71,7 +70,7 @@ async fn test_make_archival_network_request() -> Result<(), ArchivalError> {
             "Authorization",
             format!(
                 "LOW {}:{}",
-                settings.wayback_machine_api.myaccesskey, settings.wayback_machine_api.mysecret
+                SETTINGS.wayback_machine_api.myaccesskey, SETTINGS.wayback_machine_api.mysecret
             )
             .as_str(),
         )
