diff --git a/backend/src/main/kotlin/fr/gouv/cnsp/monitorfish/infrastructure/api/public_api/PublicReportingController.kt b/backend/src/main/kotlin/fr/gouv/cnsp/monitorfish/infrastructure/api/public_api/PublicReportingController.kt
new file mode 100644
index 0000000000..e20218ad28
--- /dev/null
+++ b/backend/src/main/kotlin/fr/gouv/cnsp/monitorfish/infrastructure/api/public_api/PublicReportingController.kt
@@ -0,0 +1,28 @@
+package fr.gouv.cnsp.monitorfish.infrastructure.api.public_api
+
+import fr.gouv.cnsp.monitorfish.domain.use_cases.reporting.ArchiveReporting
+import io.swagger.v3.oas.annotations.Operation
+import io.swagger.v3.oas.annotations.Parameter
+import io.swagger.v3.oas.annotations.tags.Tag
+import org.springframework.web.bind.annotation.PathVariable
+import org.springframework.web.bind.annotation.PutMapping
+import org.springframework.web.bind.annotation.RequestMapping
+import org.springframework.web.bind.annotation.RestController
+
+@RestController
+@RequestMapping("/api/v1/reportings")
+@Tag(name = "Public APIs for reporting")
+class PublicReportingController(
+    private val archiveReporting: ArchiveReporting,
+) {
+
+    @PutMapping(value = ["/{reportingId}/archive"])
+    @Operation(summary = "Archive a reporting")
+    fun archiveReporting(
+        @Parameter(description = "Reporting id")
+        @PathVariable(name = "reportingId")
+        reportingId: Int,
+    ) {
+        archiveReporting.execute(reportingId)
+    }
+}
diff --git a/backend/src/test/kotlin/fr/gouv/cnsp/monitorfish/infrastructure/api/public_api/PublicReportingControllerITests.kt b/backend/src/test/kotlin/fr/gouv/cnsp/monitorfish/infrastructure/api/public_api/PublicReportingControllerITests.kt
new file mode 100644
index 0000000000..c9a690eed7
--- /dev/null
+++ b/backend/src/test/kotlin/fr/gouv/cnsp/monitorfish/infrastructure/api/public_api/PublicReportingControllerITests.kt
@@ -0,0 +1,36 @@
+package fr.gouv.cnsp.monitorfish.infrastructure.api.public_api
+
+import fr.gouv.cnsp.monitorfish.config.OIDCProperties
+import fr.gouv.cnsp.monitorfish.config.SecurityConfig
+import fr.gouv.cnsp.monitorfish.config.SentryConfig
+import fr.gouv.cnsp.monitorfish.domain.use_cases.reporting.ArchiveReporting
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest
+import org.springframework.boot.test.mock.mockito.MockBean
+import org.springframework.context.annotation.Import
+import org.springframework.test.web.servlet.MockMvc
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put
+import org.springframework.test.web.servlet.result.MockMvcResultMatchers.status
+
+@Import(SecurityConfig::class, OIDCProperties::class, SentryConfig::class)
+@WebMvcTest(value = [PublicReportingController::class])
+class PublicReportingControllerITests {
+
+    @Autowired
+    private lateinit var api: MockMvc
+
+    @MockBean
+    private lateinit var archiveReporting: ArchiveReporting
+
+    @Test
+    fun `Should archive a reporting`() {
+        // When
+        api.perform(put("/api/v1/reportings/123/archive"))
+            // Then
+            .andExpect(status().isOk)
+
+        Mockito.verify(archiveReporting).execute(123)
+    }
+}
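
The controller exposes `PUT /api/v1/reportings/{reportingId}/archive` on the public API, which is the route the datascience pipeline below relies on. As a minimal client-side sketch (the base URL is the one appearing in the pipeline test fixtures, not necessarily a deployed address):

```python
import requests

# Base URL as it appears in the pipeline test fixtures; in the pipeline itself
# it comes from config.API_ENDPOINT.
API_ENDPOINT = "https://monitor.fish/api/v1/"


def archive_reporting(reporting_id: int) -> None:
    """PUT /reportings/{reportingId}/archive: archives one reporting by id."""
    response = requests.put(f"{API_ENDPOINT}reportings/{reporting_id}/archive")
    response.raise_for_status()  # surface 4xx/5xx instead of failing silently


archive_reporting(123)
```
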
"reportings/{reporting_id}/archive" +) # Backend api key BACKEND_API_KEY = os.environ.get("MONITORFISH_BACKEND_API_KEY") diff --git a/datascience/src/pipeline/flows/validate_pending_alerts.py b/datascience/src/pipeline/flows/validate_pending_alerts.py index be97781616..b5cb93fec6 100644 --- a/datascience/src/pipeline/flows/validate_pending_alerts.py +++ b/datascience/src/pipeline/flows/validate_pending_alerts.py @@ -4,7 +4,9 @@ from prefect.executors import LocalDaskExecutor from src.pipeline.shared_tasks.alerts import ( - extract_pending_alerts_ids_of_config_name, + archive_reporting, + extract_non_archived_reportings_ids_of_type, + extract_pending_alerts_ids_of_type, validate_pending_alert, ) from src.pipeline.shared_tasks.control_flow import check_flow_not_running @@ -13,8 +15,12 @@ flow_not_running = check_flow_not_running() with case(flow_not_running, True): - alert_config_name = Parameter("alert_config_name") - pending_alert_ids = extract_pending_alerts_ids_of_config_name(alert_config_name) - validate_pending_alert.map(pending_alert_ids) + alert_type = Parameter("alert_type") + pending_alert_ids = extract_pending_alerts_ids_of_type(alert_type) + validated_alerts = validate_pending_alert.map(pending_alert_ids) + reporting_ids = extract_non_archived_reportings_ids_of_type( + alert_type, upstream_tasks=[validated_alerts] + ) + archive_reporting.map(reporting_ids) flow.file_name = Path(__file__).name diff --git a/datascience/src/pipeline/flows_config.py b/datascience/src/pipeline/flows_config.py index 9c6668dc43..064bc5011a 100644 --- a/datascience/src/pipeline/flows_config.py +++ b/datascience/src/pipeline/flows_config.py @@ -245,7 +245,7 @@ clocks.CronClock( "50 6 * * *", parameter_defaults={ - "alert_config_name": "MISSING_FAR_ALERT", + "alert_type": "MISSING_FAR_ALERT", }, ), ] diff --git a/datascience/src/pipeline/queries/monitorfish/non_archived_reportings_of_type.sql b/datascience/src/pipeline/queries/monitorfish/non_archived_reportings_of_type.sql new file mode 100644 index 0000000000..c239eef14e --- /dev/null +++ b/datascience/src/pipeline/queries/monitorfish/non_archived_reportings_of_type.sql @@ -0,0 +1,3 @@ +SELECT id +FROM reportings +WHERE value->>'type' = :reporting_type AND NOT archived diff --git a/datascience/src/pipeline/queries/monitorfish/pending_alerts_of_config_name.sql b/datascience/src/pipeline/queries/monitorfish/pending_alerts_of_config_name.sql deleted file mode 100644 index 3c0539e928..0000000000 --- a/datascience/src/pipeline/queries/monitorfish/pending_alerts_of_config_name.sql +++ /dev/null @@ -1,3 +0,0 @@ -SELECT id -FROM pending_alerts -WHERE alert_config_name = :alert_config_name diff --git a/datascience/src/pipeline/queries/monitorfish/pending_alerts_of_type.sql b/datascience/src/pipeline/queries/monitorfish/pending_alerts_of_type.sql new file mode 100644 index 0000000000..7645e55aed --- /dev/null +++ b/datascience/src/pipeline/queries/monitorfish/pending_alerts_of_type.sql @@ -0,0 +1,3 @@ +SELECT id +FROM pending_alerts +WHERE value->>'type' = :alert_type diff --git a/datascience/src/pipeline/shared_tasks/alerts.py b/datascience/src/pipeline/shared_tasks/alerts.py index 869526111f..8bf54082a7 100644 --- a/datascience/src/pipeline/shared_tasks/alerts.py +++ b/datascience/src/pipeline/shared_tasks/alerts.py @@ -6,7 +6,10 @@ import requests from prefect import task -from config import PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE +from config import ( + PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE, + REPORTING_ARCHIVING_ENDPOINT_TEMPLATE, +) from 
diff --git a/datascience/src/pipeline/shared_tasks/alerts.py b/datascience/src/pipeline/shared_tasks/alerts.py
index 869526111f..8bf54082a7 100644
--- a/datascience/src/pipeline/shared_tasks/alerts.py
+++ b/datascience/src/pipeline/shared_tasks/alerts.py
@@ -6,7 +6,10 @@
 import requests
 from prefect import task
 
-from config import PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE
+from config import (
+    PENDING_ALERT_VALIDATION_ENDPOINT_TEMPLATE,
+    REPORTING_ARCHIVING_ENDPOINT_TEMPLATE,
+)
 from src.db_config import create_engine
 from src.pipeline.generic_tasks import extract, load
 from src.pipeline.processing import (
@@ -29,21 +32,46 @@ def extract_silenced_alerts() -> pd.DataFrame:
 
 
 @task(checkpoint=False)
-def extract_pending_alerts_ids_of_config_name(alert_config_name: str) -> List[int]:
+def extract_pending_alerts_ids_of_type(alert_type: str) -> List[int]:
     """
-    Return ids of pending alerts corresponding to `alert_config_name`
+    Return ids of pending alerts corresponding to `alert_type`
     """
     logger = prefect.context.get("logger")
     pending_alerts = extract(
         db_name="monitorfish_remote",
-        query_filepath="monitorfish/pending_alerts_of_config_name.sql",
-        params={"alert_config_name": alert_config_name},
+        query_filepath="monitorfish/pending_alerts_of_type.sql",
+        params={"alert_type": alert_type},
     )
     ids = pending_alerts.id.unique().tolist()
     logger.info(f"Returning {len(ids)} pending alerts ids.")
     return ids
 
 
+@task(checkpoint=False)
+def extract_non_archived_reportings_ids_of_type(reporting_type: str) -> List[int]:
+    """
+    Return ids of non archived reportings corresponding to `reporting_type`
+    """
+    logger = prefect.context.get("logger")
+    reportings = extract(
+        db_name="monitorfish_remote",
+        query_filepath="monitorfish/non_archived_reportings_of_type.sql",
+        params={"reporting_type": reporting_type},
+    )
+    ids = reportings.id.unique().tolist()
+    logger.info(f"Returning {len(ids)} reportings ids.")
+    return ids
+
+
+@task(checkpoint=False)
+def archive_reporting(id: int) -> None:
+    logger = prefect.context.get("logger")
+    url = REPORTING_ARCHIVING_ENDPOINT_TEMPLATE.format(reporting_id=id)
+    logger.info(f"Archiving reporting {id}.")
+    r = requests.put(url)
+    r.raise_for_status()
+
+
 @task(checkpoint=False)
 def validate_pending_alert(id: int) -> pd.DataFrame:
     logger = prefect.context.get("logger")
diff --git a/datascience/tests/test_data/remote_database/V666.20__Reset_test_reportings.sql b/datascience/tests/test_data/remote_database/V666.20__Reset_test_reportings.sql
index ac0c6fb49c..95bb8d4ecc 100644
--- a/datascience/tests/test_data/remote_database/V666.20__Reset_test_reportings.sql
+++ b/datascience/tests/test_data/remote_database/V666.20__Reset_test_reportings.sql
@@ -1,5 +1,5 @@
 DELETE FROM reportings;
 
 INSERT INTO reportings (
-    type, vessel_id, internal_reference_number, external_reference_number, ircs, vessel_name, vessel_identifier, creation_date, validation_date, archived, deleted, flag_state, value) VALUES
-( 'ALERT', 6, NULL, 'ZZTOPACDC', 'ZZ000000', 'I DO 4H REPORT', 'IRCS', NOW() - ('1 DAY')::interval, NOW(), false, false, 'FR' ,'{"seaFront": "NAMO", "riskFactor": 3.5647, "type": "THREE_MILES_TRAWLING_ALERT", "natinfCode": 7059}'::jsonb);
+    id, type, vessel_id, internal_reference_number, external_reference_number, ircs, vessel_name, vessel_identifier, creation_date, validation_date, archived, deleted, flag_state, value) VALUES
+( 56, 'ALERT', 6, NULL, 'ZZTOPACDC', 'ZZ000000', 'I DO 4H REPORT', 'IRCS', NOW() - ('1 DAY')::interval, NOW(), false, false, 'FR' ,'{"seaFront": "NAMO", "riskFactor": 3.5647, "type": "THREE_MILES_TRAWLING_ALERT", "natinfCode": 7059}'::jsonb);
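
With the fixture above, reporting 56 is the only non-archived reporting of type `THREE_MILES_TRAWLING_ALERT`, which is what the new tasks operate on. Prefect tasks expose the wrapped function through `.run()`, so the chain can be exercised by hand; a hypothetical interactive session, assuming the test database and a reachable backend:

```python
from src.pipeline.shared_tasks.alerts import (
    archive_reporting,
    extract_non_archived_reportings_ids_of_type,
)

# Task.run() calls the underlying function directly, outside any flow run.
ids = extract_non_archived_reportings_ids_of_type.run(
    reporting_type="THREE_MILES_TRAWLING_ALERT"
)
assert ids == [56]  # with the test fixture above

for reporting_id in ids:
    archive_reporting.run(reporting_id)  # PUT /reportings/56/archive
```
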
diff --git a/datascience/tests/test_data/remote_database/V666.8__Reset_test_pending_alerts.sql b/datascience/tests/test_data/remote_database/V666.8__Reset_test_pending_alerts.sql
index 05f5414752..cc389952bc 100644
--- a/datascience/tests/test_data/remote_database/V666.8__Reset_test_pending_alerts.sql
+++ b/datascience/tests/test_data/remote_database/V666.8__Reset_test_pending_alerts.sql
@@ -1,12 +1,14 @@
 DELETE FROM pending_alerts;
 
 INSERT INTO pending_alerts (
+    id,
     vessel_name, internal_reference_number, external_reference_number, ircs,
     vessel_identifier, creation_date, trip_number, flag_state, value,
     alert_config_name
 ) VALUES (
+    12,
     'L''AMBRE', 'FRA000614250', 'GV614250', 'FUJW',
     'INTERNAL_REFERENCE_NUMBER', '2021-12-23 16:03:00+00', NULL, 'FR',
     '{"type": "THREE_MILES_TRAWLING_ALERT", "seaFront": "NAMO"}',
     'THREE_MILES_TRAWLING_ALERT'
 );
diff --git a/datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py b/datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py
index d77387b161..a7cb0b6ffc 100644
--- a/datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py
+++ b/datascience/tests/test_pipeline/test_shared_tasks/test_alerts.py
@@ -4,10 +4,14 @@
 import pandas as pd
 
 from src.pipeline.shared_tasks.alerts import (
+    archive_reporting,
+    extract_non_archived_reportings_ids_of_type,
+    extract_pending_alerts_ids_of_type,
     extract_silenced_alerts,
     filter_silenced_alerts,
     load_alerts,
     make_alerts,
+    validate_pending_alert,
 )
 from src.read_query import read_query
 from tests.mocks import mock_datetime_utcnow
@@ -26,6 +30,43 @@ def test_extract_silenced_alerts(reset_test_data):
     pd.testing.assert_frame_equal(silenced_alerts, expected_silenced_alerts)
 
 
+def test_extract_pending_alerts_ids_of_type(reset_test_data):
+    assert extract_pending_alerts_ids_of_type.run(
+        alert_type="THREE_MILES_TRAWLING_ALERT"
+    ) == [12]
+
+    assert extract_pending_alerts_ids_of_type.run(alert_type="NON_EXISTING_ALERT") == []
+
+
+def test_extract_non_archived_reportings_ids_of_type(reset_test_data):
+    assert extract_non_archived_reportings_ids_of_type.run(
+        reporting_type="THREE_MILES_TRAWLING_ALERT"
+    ) == [56]
+
+    assert (
+        extract_non_archived_reportings_ids_of_type.run(
+            reporting_type="NON_EXISTING_ALERT"
+        )
+        == []
+    )
+
+
+@patch("src.pipeline.shared_tasks.alerts.requests")
+def test_validate_pending_alert(requests_mock):
+    validate_pending_alert.run(12)
+    requests_mock.put.assert_called_once_with(
+        "https://monitor.fish/api/v1/operational_alerts/12/validate"
+    )
+
+
+@patch("src.pipeline.shared_tasks.alerts.requests")
+def test_archive_reporting(requests_mock):
+    archive_reporting.run(12)
+    requests_mock.put.assert_called_once_with(
+        "https://monitor.fish/api/v1/reportings/12/archive"
+    )
+
+
 @patch(
     "src.pipeline.shared_tasks.alerts.datetime",
     mock_datetime_utcnow(datetime(2020, 5, 3, 8, 0, 0)),
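
Taken together, the scheduler-facing contract change is just the parameter rename from `alert_config_name` to `alert_type` (already reflected in the CronClock defaults above). A sketch of a local end-to-end run, assuming the database and backend API are reachable:

```python
from src.pipeline.flows.validate_pending_alerts import flow

# Validates pending MISSING_FAR_ALERTs, then archives the matching
# non-archived reportings; the upstream_tasks=[validated_alerts] wiring
# guarantees archiving only starts once every validation task has finished.
state = flow.run(parameters={"alert_type": "MISSING_FAR_ALERT"})
assert state.is_successful()
```
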