diff --git a/datascience/src/pipeline/flows/distribute_pnos.py b/datascience/src/pipeline/flows/distribute_pnos.py index 258534de84..d0a3692d59 100644 --- a/datascience/src/pipeline/flows/distribute_pnos.py +++ b/datascience/src/pipeline/flows/distribute_pnos.py @@ -63,6 +63,10 @@ ) from src.pipeline.shared_tasks.dates import get_utcnow, make_timedelta from src.pipeline.shared_tasks.infrastructure import execute_statement +from src.pipeline.shared_tasks.pnos import ( + extract_pno_units_ports_and_segments_subscriptions, + extract_pno_units_targeting_vessels, +) @task(checkpoint=False) @@ -110,22 +114,6 @@ def extract_pnos_to_generate( return (pnos, generation_needed) -@task(checkpoint=False) -def extract_pno_units_targeting_vessels() -> pd.DataFrame: - return extract( - db_name="monitorfish_remote", - query_filepath="monitorfish/pno_units_targeting_vessels.sql", - ) - - -@task(checkpoint=False) -def extract_pno_units_ports_and_segments_subscriptions() -> pd.DataFrame: - return extract( - db_name="monitorfish_remote", - query_filepath="monitorfish/pno_units_ports_and_segments_subscriptions.sql", - ) - - @task(checkpoint=False) def fetch_control_units_contacts() -> pd.DataFrame: r = requests.get(MONITORENV_API_ENDPOINT + "control_units") diff --git a/datascience/src/pipeline/flows/enrich_logbook.py b/datascience/src/pipeline/flows/enrich_logbook.py index 7af4d9e361..db45b9cca5 100644 --- a/datascience/src/pipeline/flows/enrich_logbook.py +++ b/datascience/src/pipeline/flows/enrich_logbook.py @@ -21,6 +21,10 @@ from src.pipeline.processing import prepare_df_for_loading from src.pipeline.shared_tasks.control_flow import check_flow_not_running from src.pipeline.shared_tasks.dates import get_utcnow, make_periods +from src.pipeline.shared_tasks.pnos import ( + extract_pno_units_ports_and_segments_subscriptions, + extract_pno_units_targeting_vessels, +) from src.pipeline.shared_tasks.segments import extract_all_segments from src.pipeline.utils import psql_insert_copy @@ 
-266,12 +270,15 @@ def compute_pno_types( - logbook_reports_pno_id `int` `1` - cfr `str` `FRA000000000` - `predicted_arrival_datetime_utc` `datetime` + - year `int` `2023` + - species `str` `'COD'` - trip_gears `List[dict]` `[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz}, {...}]` - - species `str` `'COD'` - fao_area `str` `'27.7.d'` - - flag_state `str` `'FRA'` - weight `float` `150.5` + - flag_state `str` `'FRA'` + - locode `str` `CCXXX` + - facade `str` `NAMO` pno_types (pd.DataFrame): DataFrame of pno_types definitions. 1 line = 1 rule. Must have columns : @@ -293,6 +300,7 @@ def compute_pno_types( - logbook_reports_pno_id `int` `1` - cfr `str` `FRA000000000` + - locode `str` `CCXXX` - `predicted_arrival_datetime_utc` `datetime` - trip_gears `List[dict]` `[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz}, {...}]` @@ -320,6 +328,7 @@ def compute_pno_types( [ "logbook_reports_pno_id", "cfr", + "locode", "flag_state", "predicted_arrival_datetime_utc", ] @@ -451,7 +460,10 @@ def compute_pno_risk_factors( def flag_pnos_to_verify_and_send( - pnos: pd.DataFrame, predicted_arrival_threshold: datetime + pnos: pd.DataFrame, + pno_units_targeting_vessels: pd.DataFrame, + pno_units_ports_and_segments_subscriptions: pd.DataFrame, + predicted_arrival_threshold: datetime, ): pnos = pnos.copy(deep=True) @@ -462,8 +474,76 @@ def flag_pnos_to_verify_and_send( pnos["is_verified"] = False pnos["is_sent"] = False - pnos["is_being_sent"] = (~pnos.is_in_verification_scope) * ( - pnos.predicted_arrival_datetime_utc >= predicted_arrival_threshold + segment_subscriptions = ( + pno_units_ports_and_segments_subscriptions.explode("unit_subscribed_segments")[ + ["port_locode", "unit_subscribed_segments"] + ] + .dropna() + .reset_index(drop=True) + .assign(is_target_segment=True) + ) + + target_ports = set( + pno_units_ports_and_segments_subscriptions.loc[ + pno_units_ports_and_segments_subscriptions.receive_all_pnos_from_port, + "port_locode", + ].tolist() + ) + + target_vessels = 
set(pno_units_targeting_vessels["cfr"].dropna().tolist()) + + pnos_with_segments = ( + pnos[["logbook_reports_pno_id", "locode", "trip_segments"]] + .explode("trip_segments") + .assign( + trip_segment=lambda x: x.trip_segments.map( + lambda d: d.get("segment"), na_action="ignore" + ) + ) + .drop(columns=["trip_segments"]) + .reset_index(drop=True) + .dropna() + ) + + if len(pnos_with_segments) > 0 and len(segment_subscriptions) > 0: + segment_matches = pd.merge( + pnos_with_segments, + segment_subscriptions, + how="left", + left_on=["locode", "trip_segment"], + right_on=["port_locode", "unit_subscribed_segments"], + ) + logbook_reports_targeting_segment_ids = set( + segment_matches.loc[ + segment_matches.is_target_segment.fillna(False), + "logbook_reports_pno_id", + ] + ) + else: + logbook_reports_targeting_segment_ids = set() + + if len(target_ports) > 0: + logbook_reports_targeting_port_ids = set( + pnos.loc[pnos.locode.isin(target_ports), "logbook_reports_pno_id"].tolist() + ) + else: + logbook_reports_targeting_port_ids = set() + + if len(target_vessels) > 0: + logbook_reports_targeting_vessel_ids = set( + pnos.loc[pnos.cfr.isin(target_vessels), "logbook_reports_pno_id"].tolist() + ) + else: + logbook_reports_targeting_vessel_ids = set() + + logbook_reports_to_sent_ids = logbook_reports_targeting_segment_ids.union( + logbook_reports_targeting_port_ids + ).union(logbook_reports_targeting_vessel_ids) + + pnos["is_being_sent"] = ( + (~pnos.is_in_verification_scope) + * (pnos.predicted_arrival_datetime_utc >= predicted_arrival_threshold) + * pnos.logbook_reports_pno_id.isin(logbook_reports_to_sent_ids) ) return pnos @@ -574,6 +654,8 @@ def extract_enrich_load_logbook( pno_types: pd.DataFrame, control_anteriority: pd.DataFrame, all_control_priorities: pd.DataFrame, + pno_units_targeting_vessels: pd.DataFrame, + pno_units_ports_and_segments_subscriptions: pd.DataFrame, utcnow: datetime, ): """Extract pnos for the given `Period`, enrich and update the `logbook` table. 
@@ -622,7 +704,10 @@ def extract_enrich_load_logbook( logger.info("Flagging PNOs to verify_and_distribute...") pnos = flag_pnos_to_verify_and_send( - pnos=pnos_with_risk_factors, predicted_arrival_threshold=utcnow + pnos=pnos_with_risk_factors, + pno_units_targeting_vessels=pno_units_targeting_vessels, + pno_units_ports_and_segments_subscriptions=pno_units_ports_and_segments_subscriptions, + predicted_arrival_threshold=utcnow, ) logger.info("Loading") @@ -649,6 +734,10 @@ def extract_enrich_load_logbook( all_control_priorities = extract_all_control_priorities() pno_types = extract_pno_types() control_anteriority = extract_control_anteriority() + pno_units_targeting_vessels = extract_pno_units_targeting_vessels() + pno_units_ports_and_segments_subscriptions = ( + extract_pno_units_ports_and_segments_subscriptions() + ) with case(recompute_all, True): reset = reset_pnos.map(periods) @@ -658,6 +747,10 @@ def extract_enrich_load_logbook( pno_types=unmapped(pno_types), control_anteriority=unmapped(control_anteriority), all_control_priorities=unmapped(all_control_priorities), + pno_units_targeting_vessels=unmapped(pno_units_targeting_vessels), + pno_units_ports_and_segments_subscriptions=unmapped( + pno_units_ports_and_segments_subscriptions + ), utcnow=unmapped(utcnow), upstream_tasks=[reset], ) @@ -669,6 +762,10 @@ def extract_enrich_load_logbook( pno_types=unmapped(pno_types), control_anteriority=unmapped(control_anteriority), all_control_priorities=unmapped(all_control_priorities), + pno_units_targeting_vessels=unmapped(pno_units_targeting_vessels), + pno_units_ports_and_segments_subscriptions=unmapped( + pno_units_ports_and_segments_subscriptions + ), utcnow=unmapped(utcnow), ) diff --git a/datascience/src/pipeline/queries/monitorfish/pno_species_and_gears.sql b/datascience/src/pipeline/queries/monitorfish/pno_species_and_gears.sql index 1d2d00c062..e1e206858b 100644 --- a/datascience/src/pipeline/queries/monitorfish/pno_species_and_gears.sql +++ 
b/datascience/src/pipeline/queries/monitorfish/pno_species_and_gears.sql @@ -21,6 +21,7 @@ pno_species AS ( flag_state, trip_number, report_datetime_utc, + p.locode, p.facade, (r.value->>'tripStartDate')::TIMESTAMPTZ AS trip_start_date, (r.value->>'predictedArrivalDatetimeUtc')::TIMESTAMPTZ AS predicted_arrival_datetime_utc, @@ -103,6 +104,7 @@ SELECT s.fao_area, s.weight, s.flag_state, + s.locode, s.facade FROM pno_species s LEFT JOIN far_gears fg diff --git a/datascience/src/pipeline/shared_tasks/pnos.py b/datascience/src/pipeline/shared_tasks/pnos.py new file mode 100644 index 0000000000..b850eec942 --- /dev/null +++ b/datascience/src/pipeline/shared_tasks/pnos.py @@ -0,0 +1,20 @@ +import pandas as pd +from prefect import task + +from src.pipeline.generic_tasks import extract + + +@task(checkpoint=False) +def extract_pno_units_targeting_vessels() -> pd.DataFrame: + return extract( + db_name="monitorfish_remote", + query_filepath="monitorfish/pno_units_targeting_vessels.sql", + ) + + +@task(checkpoint=False) +def extract_pno_units_ports_and_segments_subscriptions() -> pd.DataFrame: + return extract( + db_name="monitorfish_remote", + query_filepath="monitorfish/pno_units_ports_and_segments_subscriptions.sql", + ) diff --git a/datascience/tests/test_pipeline/conftest.py b/datascience/tests/test_pipeline/conftest.py new file mode 100644 index 0000000000..b18cf511c6 --- /dev/null +++ b/datascience/tests/test_pipeline/conftest.py @@ -0,0 +1,49 @@ +import pandas as pd +import pytest + + +@pytest.fixture +def pno_units_targeting_vessels(): + return pd.DataFrame( + { + "vessel_id": [2, 4, 7], + "cfr": ["ABC000542519", None, "___TARGET___"], + "control_unit_ids_targeting_vessel": [[4], [1, 2], [4]], + } + ) + + +@pytest.fixture +def pno_units_ports_and_segments_subscriptions(): + return pd.DataFrame( + { + "port_locode": [ + "FRCQF", + "FRDKK", + "FRDPE", + "FRLEH", + "FRLEH", + "FRZJZ", + "FRZJZ", + ], + "control_unit_id": [1, 2, 4, 2, 3, 2, 3], + 
"receive_all_pnos_from_port": [ + False, + False, + True, + False, + False, + False, + False, + ], + "unit_subscribed_segments": [ + ["SWW01/02/03"], + [], + [], + [], + ["SWW01/02/03", "NWW01"], + [], + ["SWW01/02/03", "NWW01"], + ], + } + ) diff --git a/datascience/tests/test_pipeline/test_flows/test_distribute_pnos.py b/datascience/tests/test_pipeline/test_flows/test_distribute_pnos.py index de07855aaa..97945e1cd2 100644 --- a/datascience/tests/test_pipeline/test_flows/test_distribute_pnos.py +++ b/datascience/tests/test_pipeline/test_flows/test_distribute_pnos.py @@ -33,8 +33,6 @@ create_email, create_sms, extract_fishing_gear_names, - extract_pno_units_ports_and_segments_subscriptions, - extract_pno_units_targeting_vessels, extract_pnos_to_generate, extract_species_names, fetch_control_units_contacts, @@ -1017,53 +1015,6 @@ def pno_pdf_document_to_distribute_verified_assigned( ) -@pytest.fixture -def pno_units_targeting_vessels(): - return pd.DataFrame( - { - "vessel_id": [2, 4, 7], - "cfr": ["ABC000542519", None, "___TARGET___"], - "control_unit_ids_targeting_vessel": [[4], [1, 2], [4]], - } - ) - - -@pytest.fixture -def pno_units_ports_and_segments_subscriptions(): - return pd.DataFrame( - { - "port_locode": [ - "FRCQF", - "FRDKK", - "FRDPE", - "FRLEH", - "FRLEH", - "FRZJZ", - "FRZJZ", - ], - "control_unit_id": [1, 2, 4, 2, 3, 2, 3], - "receive_all_pnos_from_port": [ - False, - False, - True, - False, - False, - False, - False, - ], - "unit_subscribed_segments": [ - ["SWW01/02/03"], - [], - [], - [], - ["SWW01/02/03", "NWW01"], - [], - ["SWW01/02/03", "NWW01"], - ], - } - ) - - @pytest.fixture def monitorenv_control_units_api_response() -> list: return [ @@ -1530,20 +1481,6 @@ def test_extract_species_names(reset_test_data, species_names): assert res == species_names -def test_extract_pno_units_targeting_vessels( - reset_test_data, pno_units_targeting_vessels -): - res = extract_pno_units_targeting_vessels.run() - pd.testing.assert_frame_equal(res, 
pno_units_targeting_vessels) - - -def test_extract_pno_units_ports_and_segments_subscriptions( - reset_test_data, pno_units_ports_and_segments_subscriptions -): - res = extract_pno_units_ports_and_segments_subscriptions.run() - pd.testing.assert_frame_equal(res, pno_units_ports_and_segments_subscriptions) - - def test_extract_fishing_gear_names(reset_test_data, fishing_gear_names): res = extract_fishing_gear_names.run() assert res == fishing_gear_names diff --git a/datascience/tests/test_pipeline/test_flows/test_enrich_logbook.py b/datascience/tests/test_pipeline/test_flows/test_enrich_logbook.py index af150cd33f..2f2ae03ef8 100644 --- a/datascience/tests/test_pipeline/test_flows/test_enrich_logbook.py +++ b/datascience/tests/test_pipeline/test_flows/test_enrich_logbook.py @@ -140,9 +140,9 @@ def sample_pno_species_and_gears() -> pd.DataFrame: "CFR000000004", "CFR000000004", "CFR000000004", - "CFR000000005", - "CFR000000005", - "CFR000000005", + "___TARGET___", + "___TARGET___", + "___TARGET___", "CFR000000006", "CFR000000001", # The same vessel has two PNOs "CFR000000008", @@ -248,6 +248,20 @@ def sample_pno_species_and_gears() -> pd.DataFrame: "FRA", "FRA", ], + "locode": [ + "FRABH", + "FRABH", + "FRUUU", + "FRLEH", + "FRLEH", + "FRLEH", + "FRAMO", + "FRAMO", + "FRAMO", + "FRAAA", + "FRDPE", + "FRLEH", + ], "facade": [ "SA", "SA", @@ -305,6 +319,7 @@ def expected_pno_species_and_gears() -> pd.DataFrame: "fao_area": ["27.7.a", None], "weight": [1500.0, None], "flag_state": ["CYP", "CYP"], + "locode": ["GBPHD", None], "facade": ["MEMN", None], } ) @@ -315,7 +330,7 @@ def segments() -> pd.DataFrame: return pd.DataFrame( { "year": [2023, 2023, 2023, 2023, 2015], - "segment": ["SOTM", "SHKE27", "SSB", "SxTB8910", "SxTB8910-2015"], + "segment": ["SOTM", "SHKE27", "NWW01", "SxTB8910", "SxTB8910-2015"], "segment_name": [ "Chaluts pélagiques", "Merlu en zone 27", @@ -359,11 +374,21 @@ def expected_computed_pno_types() -> pd.DataFrame: "CFR000000002", "CFR000000003", 
"CFR000000004", - "CFR000000005", + "___TARGET___", "CFR000000006", "CFR000000001", # The same vessel has two PNOs "CFR000000008", ], + "locode": [ + "FRABH", + "FRABH", + "FRUUU", + "FRLEH", + "FRAMO", + "FRAAA", + "FRDPE", + "FRLEH", + ], "flag_state": ["FRA", "FRA", "GBR", "FRA", "FRA", "FRA", "FRA", "FRA"], "predicted_arrival_datetime_utc": [ datetime(2021, 5, 2), @@ -474,7 +499,7 @@ def expected_computed_pno_segments() -> pd.DataFrame: {"segment": "SOTM", "segmentName": "Chaluts pélagiques"}, ], [{"segment": "SOTM", "segmentName": "Chaluts pélagiques"}], - [{"segment": "SSB", "segmentName": "Senne de plage"}], + [{"segment": "NWW01", "segmentName": "Senne de plage"}], ], "impact_risk_factor": [ default_risk_factors["impact_risk_factor"], @@ -672,11 +697,21 @@ def merged_pnos() -> pd.DataFrame: "CFR000000002", "CFR000000003", "CFR000000004", - "CFR000000005", + "___TARGET___", "CFR000000006", "CFR000000001", # The same vessel has two PNOs "CFR000000008", ], + "locode": [ + "FRABH", + "FRABH", + "FRUUU", + "FRLEH", + "FRAMO", + "FRAAA", + "FRDPE", + "FRLEH", + ], "flag_state": ["FRA", "FRA", "GBR", "FRA", "FRA", "FRA", "FRA", "FRA"], "predicted_arrival_datetime_utc": [ datetime(2021, 5, 2), @@ -778,7 +813,7 @@ def merged_pnos() -> pd.DataFrame: {"segment": "SOTM", "segmentName": "Chaluts pélagiques"}, ], [{"segment": "SOTM", "segmentName": "Chaluts pélagiques"}], - [{"segment": "SSB", "segmentName": "Senne de plage"}], + [{"segment": "NWW01", "segmentName": "Senne de plage"}], ], "impact_risk_factor": [ default_risk_factors["impact_risk_factor"], @@ -957,10 +992,17 @@ def test_compute_pno_risk_factors( pd.testing.assert_frame_equal(res, pnos_with_risk_factors) -def test_flag_pnos_to_verify_and_send(pnos_with_risk_factors, flagged_pnos): +def test_flag_pnos_to_verify_and_send( + pnos_with_risk_factors, + flagged_pnos, + pno_units_targeting_vessels, + pno_units_ports_and_segments_subscriptions, +): res = flag_pnos_to_verify_and_send( pnos=pnos_with_risk_factors, - 
predicted_arrival_threshold=datetime(2023, 5, 4, 14, 12, 25), + pno_units_targeting_vessels=pno_units_targeting_vessels, + pno_units_ports_and_segments_subscriptions=pno_units_ports_and_segments_subscriptions, + predicted_arrival_threshold=datetime(2023, 5, 2, 14, 12, 25), ) pd.testing.assert_frame_equal(res, flagged_pnos) @@ -1048,7 +1090,7 @@ def test_load_then_reset_logbook( ) def test_flow(reset_test_data): query = ( "SELECT id, enriched, trip_gears, value->'pnoTypes' AS pno_types, (value->>'riskFactor')::DOUBLE PRECISION AS risk_factor, trip_segments " "FROM logbook_reports " diff --git a/datascience/tests/test_pipeline/test_shared_tasks/test_pnos.py b/datascience/tests/test_pipeline/test_shared_tasks/test_pnos.py new file mode 100644 index 0000000000..1d984784e4 --- /dev/null +++ b/datascience/tests/test_pipeline/test_shared_tasks/test_pnos.py @@ -0,0 +1,20 @@ +import pandas as pd + +from src.pipeline.shared_tasks.pnos import ( + extract_pno_units_ports_and_segments_subscriptions, + extract_pno_units_targeting_vessels, +) + + +def test_extract_pno_units_targeting_vessels( + reset_test_data, pno_units_targeting_vessels +): + res = extract_pno_units_targeting_vessels.run() + pd.testing.assert_frame_equal(res, pno_units_targeting_vessels) + + +def test_extract_pno_units_ports_and_segments_subscriptions( + reset_test_data, pno_units_ports_and_segments_subscriptions +): + res = extract_pno_units_ports_and_segments_subscriptions.run() + pd.testing.assert_frame_equal(res, pno_units_ports_and_segments_subscriptions)