feat: added integration tests for archiver, and tests in CI
yellowHatpro committed Aug 26, 2024
1 parent c8c59ba commit 005f981
Showing 10 changed files with 277 additions and 110 deletions.
56 changes: 52 additions & 4 deletions .github/workflows/ci.yml
@@ -10,7 +10,6 @@ on:

env:
CARGO_TERM_COLOR: always
SQLX_OFFLINE: true

jobs:
test:
@@ -20,32 +19,81 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
services:
postgres:
image: postgres:13
env:
POSTGRES_USER: musicbrainz
POSTGRES_PASSWORD: musicbrainz
POSTGRES_DB: musicbrainz_db
ports:
- 5433:5432 # Map host port 5433 to container port 5432
options: >-
--health-cmd "pg_isready -U musicbrainz"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4

- name: Install PostgreSQL Client
if: runner.os == 'Linux'
run: |
sudo apt-get update
sudo apt-get install -y postgresql-client
- name: Wait for PostgreSQL
run: |
until pg_isready -h localhost -p 5433 -U musicbrainz; do
echo "Waiting for PostgreSQL to start..."
sleep 1
done
- name: Populate Test Data
env:
PGPASSWORD: musicbrainz
run: |
for sql_file in ./tests/fixtures/*.sql; do
psql -h localhost -p 5433 -U musicbrainz -d musicbrainz_db -f "$sql_file"
done
- name: Verify Table Existence
env:
PGPASSWORD: musicbrainz
run: |
psql -h localhost -p 5433 -U musicbrainz -d musicbrainz_db -c '\dt external_url_archiver.internet_archive_urls'
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
cache: false

- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

- name: Formatting
uses: clechasseur/rs-cargo@v2
with:
command: fmt
args: --check
continue-on-error: true

- name: Cargo Check
uses: clechasseur/rs-cargo@v2
with:
command: check
args: --all-targets --all-features --locked
continue-on-error: true

- name: Linting
uses: clechasseur/rs-cargo@v2
with:
command: clippy
args: --all-targets --all-features --locked -- -D warnings
continue-on-error: true
# - name: Tests
# run: |
# cargo test

- name: Run Tests
env:
DATABASE_URL: postgres://musicbrainz:musicbrainz@localhost:5433/musicbrainz_db
run: cargo test -- --nocapture
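
For local runs, the same database this job provisions can be targeted by exporting DATABASE_URL before `cargo test`. A minimal connectivity check under that assumption — the fallback URL mirrors the service definition above, and the sketch assumes sqlx with the `postgres` and `runtime-tokio` features plus tokio as a dependency:

use sqlx::PgPool;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Prefer the CI-provided DATABASE_URL; fall back to the service's address.
    let url = std::env::var("DATABASE_URL").unwrap_or_else(|_| {
        "postgres://musicbrainz:musicbrainz@localhost:5433/musicbrainz_db".into()
    });
    let pool = PgPool::connect(&url).await?;
    // A trivial round-trip proves the pool is usable.
    let one: i32 = sqlx::query_scalar("SELECT 1").fetch_one(&pool).await?;
    assert_eq!(one, 1);
    println!("connected to {url}");
    Ok(())
}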
3 changes: 3 additions & 0 deletions src/archival/notifier.rs
@@ -53,4 +53,7 @@ impl Notifier {
true
}
}
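/// Exposes the notifier cursor for the integration tests in tests/archival/mod.rs;
/// the leading underscore presumably keeps dead-code warnings quiet in non-test builds.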
pub fn _get_notifier_index(&self) -> i32 {
self.start_notifier_from.unwrap()
}
}
6 changes: 3 additions & 3 deletions src/archival/tests/utils.rs
@@ -9,11 +9,11 @@ use sqlx::Error;
async fn test_get_first_index_to_start_notifier_from(pool: PgPool) -> Result<(), Error> {
let first_index_to_start_notifier_from =
get_first_id_to_start_notifier_from(pool.clone()).await;
assert_eq!(first_index_to_start_notifier_from.unwrap(), 362);
assert_eq!(first_index_to_start_notifier_from.unwrap(), 12);
sqlx::query(
r#"
DELETE FROM external_url_archiver.internet_archive_urls
WHERE id = 362;
WHERE id = 12;
"#,
)
.execute(&pool)
@@ -23,7 +23,7 @@ async fn test_get_first_index_to_start_notifier_from(pool: PgPool) -> Result<(), Error> {
get_first_id_to_start_notifier_from(pool.clone())
.await
.unwrap(),
363
13
);
Ok(())
}
37 changes: 36 additions & 1 deletion src/poller/utils.rs
@@ -177,6 +177,41 @@ pub async fn get_edit_data_and_note_start_id(pool: &PgPool) -> Result<(i32, i32)
})
}

/// Checks whether `url` should be inserted into the `internet_archive_urls` table:
/// returns false if the URL is excluded or already present there.
pub async fn should_insert_url_to_internet_archive_urls(
url: &str,
pool: &PgPool,
) -> Result<bool, Error> {
if should_exclude_url(url) {
return Ok(false);
}
match is_url_exists(url, pool).await {
Ok(exists) => Ok(!exists), // If the URL exists, return false; otherwise, return true.
Err(_) => Ok(true), // On error, default to true
}
}

pub async fn is_url_exists(url: &str, pool: &PgPool) -> Result<bool, Error> {
let res: Option<(bool,)> = sqlx::query_as(
r#"
SELECT EXISTS (
SELECT 1 as present
FROM external_url_archiver.internet_archive_urls
WHERE url = $1
);
"#,
)
.bind(url)
.fetch_optional(pool)
.await?;
    // `SELECT EXISTS` always returns exactly one row, so this is effectively
    // always Some; default to false defensively.
    Ok(res.map_or(false, |row| row.0))
}
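
An equivalent, slightly tighter formulation of the lookup uses `query_scalar`, since `SELECT EXISTS` always yields exactly one boolean row — a sketch, not part of this commit:

use sqlx::{Error, PgPool};

pub async fn url_exists(url: &str, pool: &PgPool) -> Result<bool, Error> {
    // EXISTS returns exactly one row, so fetch_one never comes back empty.
    sqlx::query_scalar(
        "SELECT EXISTS (SELECT 1 FROM external_url_archiver.internet_archive_urls WHERE url = $1)",
    )
    .bind(url)
    .fetch_one(pool)
    .await
}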

/// Takes a URL string and returns true if the URL should be excluded from saving.
pub fn should_exclude_url(url: &str) -> bool {
// TODO: discuss and add keywords to identify URLs we want to exclude
@@ -196,7 +231,7 @@ pub async fn save_url_to_internet_archive_urls(
from_table_id: i32,
pool: &PgPool,
) -> Result<(), Error> {
if should_exclude_url(url) {
if !should_insert_url_to_internet_archive_urls(url, pool).await? {
return Ok(());
}
let query = r#"
1 change: 1 addition & 0 deletions src/structs/internet_archive_urls.rs
@@ -34,6 +34,7 @@ impl TryFrom<i32> for ArchivalStatus {
pub struct InternetArchiveUrls {
pub id: i32,
pub url: Option<String>,
#[allow(dead_code)]
pub job_id: Option<String>,
#[allow(dead_code)]
pub from_table: Option<String>,
110 changes: 110 additions & 0 deletions tests/archival/mod.rs
@@ -0,0 +1,110 @@
use mb_exurl_ia_service::archival;
use mb_exurl_ia_service::archival::error::ArchivalError;
use mb_exurl_ia_service::archival::listener::handle_payload;
use mb_exurl_ia_service::archival::notifier::Notifier;
use mb_exurl_ia_service::structs::internet_archive_urls::InternetArchiveUrls;
use sqlx::postgres::PgListener;
use sqlx::{Error, PgPool};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time;
use tokio::time::Instant;

#[sqlx::test(fixtures(
"../fixtures/InternetArchiveUrls.sql",
"../fixtures/internet_archive_urls_dump.sql"
))]
async fn test_notifier(pool: PgPool) -> Result<(), Error> {
let mut notifier = Notifier::new(pool.clone()).await;
let mut current_id = 12;
let end_id = 70;
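    // 12 is the first id seeded by the fixtures (see src/archival/tests/utils.rs);
    // 70 is presumably the last id in internet_archive_urls_dump.sql.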
for _ in 0..100 {
if notifier.should_notify().await {
if current_id <= end_id {
assert_eq!(notifier._get_notifier_index(), current_id);
}
notifier.notify().await.unwrap();
current_id += 1;
if current_id <= end_id {
assert_eq!(notifier._get_notifier_index(), current_id);
}
}
}
Ok(())
}

#[sqlx::test(fixtures(
"../fixtures/InternetArchiveUrls.sql",
"../fixtures/internet_archive_urls_dump.sql"
))]
async fn test_archival(pool: PgPool) -> Result<(), ArchivalError> {
let notifier = Arc::new(Mutex::new(Notifier::new(pool.clone()).await));
let listener_pool = pool.clone();

// Spawn both tasks and use tokio::join! to run them concurrently
let (notifier_result, listener_result) = tokio::join!(
// Notifier task
tokio::spawn({
let notifier = Arc::clone(&notifier);
async move {
for _ in 0..40 {
let mut notifier = notifier.lock().await;
if notifier.should_notify().await {
notifier.notify().await.unwrap();
println!("{}", notifier._get_notifier_index());
}
}
}
}),
// Listener task
tokio::spawn(async move {
let mut listener = PgListener::connect_with(&listener_pool).await.unwrap();
listener.listen("archive_urls").await.unwrap();

let duration = Duration::from_secs(5 * 60);
let start_time = Instant::now();

// Loop until the specified duration has elapsed
while start_time.elapsed() < duration {
if let Some(notification) = listener.try_recv().await.unwrap() {
let payload: InternetArchiveUrls =
serde_json::from_str(notification.payload()).unwrap();
assert!(payload.url.is_some());
println!("{} {}", start_time.elapsed().as_secs(), payload.id);
handle_payload(payload, &listener_pool).await.unwrap();
}

// Sleep for 5 seconds between checks
time::sleep(Duration::from_secs(5)).await;
}

println!("Listener has run for 5 minutes and will now stop.");
})
);

// Check both results for errors
notifier_result.unwrap();
listener_result.unwrap();

Ok(())
}

#[sqlx::test(fixtures(
"../fixtures/InternetArchiveUrls.sql",
"../fixtures/internet_archive_urls_dump.sql"
))]
async fn test_cleanup_task(pool: PgPool) -> Result<(), ArchivalError> {
archival::retry::start(pool.clone()).await.unwrap();
let success_urls = sqlx::query_as::<_, InternetArchiveUrls>(
r#"
SELECT * FROM external_url_archiver.internet_archive_urls
WHERE status = 3;
"#,
)
.fetch_all(&pool)
.await?;
// Check cleanup: retry::start should have removed all successfully archived (status = 3) rows
assert_eq!(success_urls.len(), 0);
Ok(())
}
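
For reference, the `#[sqlx::test]` fixture mechanism the tests above lean on, reduced to its smallest form: each test receives a fresh database with the listed fixture files applied before the body runs. A sketch, not part of this commit; the assertion assumes the dump fixture seeds at least one row:

use sqlx::PgPool;

// #[sqlx::test] provisions an isolated database per test, applies the
// fixtures in order, runs the body, then tears the database down.
#[sqlx::test(fixtures(
    "../fixtures/InternetArchiveUrls.sql",
    "../fixtures/internet_archive_urls_dump.sql"
))]
async fn fixtures_seed_rows(pool: PgPool) -> Result<(), sqlx::Error> {
    let count: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM external_url_archiver.internet_archive_urls",
    )
    .fetch_one(&pool)
    .await?;
    assert!(count > 0); // assumes the dump fixture inserts rows
    Ok(())
}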
2 changes: 1 addition & 1 deletion tests/fixtures/InternetArchiveUrls.sql
@@ -18,7 +18,7 @@ RETURNS INTEGER AS $$
rec RECORD;
count INTEGER := 0;
BEGIN
FOR rec IN SELECT * FROM internet_archive_urls WHERE id >= start_id ORDER BY id LIMIT 2
FOR rec IN SELECT * FROM external_url_archiver.internet_archive_urls WHERE id >= start_id ORDER BY id LIMIT 2
LOOP
PERFORM pg_notify('archive_urls', row_to_json(rec)::text);
count := count + 1;
