Skip to content

Commit

Permalink
Diffusion des préavis auto [pipeline] - flaguer isBeingSent un préa…
Browse files Browse the repository at this point in the history
…vis hors scope uniquement si des unités le ciblent (#3448)

## Linked issues

- Resolve #3436
  • Loading branch information
VincentAntoine committed Jul 31, 2024
2 parents beabe0d + 2be4a28 commit 10f3602
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 96 deletions.
20 changes: 4 additions & 16 deletions datascience/src/pipeline/flows/distribute_pnos.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
)
from src.pipeline.shared_tasks.dates import get_utcnow, make_timedelta
from src.pipeline.shared_tasks.infrastructure import execute_statement
from src.pipeline.shared_tasks.pnos import (
extract_pno_units_ports_and_segments_subscriptions,
extract_pno_units_targeting_vessels,
)


@task(checkpoint=False)
Expand Down Expand Up @@ -110,22 +114,6 @@ def extract_pnos_to_generate(
return (pnos, generation_needed)


@task(checkpoint=False)
def extract_pno_units_targeting_vessels() -> pd.DataFrame:
    """Run the ``pno_units_targeting_vessels.sql`` query against the
    ``monitorfish_remote`` database and return its result.

    Returns:
        pd.DataFrame: one row per unit/vessel targeting entry, as produced by
        the SQL query.
    """
    units_targeting_vessels = extract(
        query_filepath="monitorfish/pno_units_targeting_vessels.sql",
        db_name="monitorfish_remote",
    )
    return units_targeting_vessels


@task(checkpoint=False)
def extract_pno_units_ports_and_segments_subscriptions() -> pd.DataFrame:
    """Run the ``pno_units_ports_and_segments_subscriptions.sql`` query
    against the ``monitorfish_remote`` database and return its result.

    Returns:
        pd.DataFrame: one row per subscription entry, as produced by the SQL
        query.
    """
    subscriptions = extract(
        query_filepath=(
            "monitorfish/pno_units_ports_and_segments_subscriptions.sql"
        ),
        db_name="monitorfish_remote",
    )
    return subscriptions


@task(checkpoint=False)
def fetch_control_units_contacts() -> pd.DataFrame:
r = requests.get(MONITORENV_API_ENDPOINT + "control_units")
Expand Down
109 changes: 103 additions & 6 deletions datascience/src/pipeline/flows/enrich_logbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
from src.pipeline.processing import prepare_df_for_loading
from src.pipeline.shared_tasks.control_flow import check_flow_not_running
from src.pipeline.shared_tasks.dates import get_utcnow, make_periods
from src.pipeline.shared_tasks.pnos import (
extract_pno_units_ports_and_segments_subscriptions,
extract_pno_units_targeting_vessels,
)
from src.pipeline.shared_tasks.segments import extract_all_segments
from src.pipeline.utils import psql_insert_copy

Expand Down Expand Up @@ -266,12 +270,15 @@ def compute_pno_types(
- logbook_reports_pno_id `int` `1`
- cfr `str` `FRA000000000`
- `predicted_arrival_datetime_utc` `datetime`
- year `int` `2023`
- species `str` `'COD'`
- trip_gears `List[dict]`
`[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz}, {...}]`
- species `str` `'COD'`
- fao_area `str` `'27.7.d'`
- flag_state `str` `'FRA'`
- weight `float` `150.5`
- flag_state `str` `'FRA'`
- locode `str` `CCXXX`
- facade `str` `NAMO`
pno_types (pd.DataFrame): DataFrame of pno_types definitions. 1 line = 1 rule.
Must have columns :
Expand All @@ -293,6 +300,7 @@ def compute_pno_types(
- logbook_reports_pno_id `int` `1`
- cfr `str` `FRA000000000`
- locode `str` `CCXXX`
- `predicted_arrival_datetime_utc` `datetime`
- trip_gears `List[dict]`
`[{"gear": "xxx", "mesh": yyy, "dimensions": "zzz}, {...}]`
Expand Down Expand Up @@ -320,6 +328,7 @@ def compute_pno_types(
[
"logbook_reports_pno_id",
"cfr",
"locode",
"flag_state",
"predicted_arrival_datetime_utc",
]
Expand Down Expand Up @@ -451,7 +460,10 @@ def compute_pno_risk_factors(


def flag_pnos_to_verify_and_send(
pnos: pd.DataFrame, predicted_arrival_threshold: datetime
pnos: pd.DataFrame,
pno_units_targeting_vessels: pd.DataFrame,
pno_units_ports_and_segments_subscriptions: pd.DataFrame,
predicted_arrival_threshold: datetime,
):
pnos = pnos.copy(deep=True)

Expand All @@ -462,8 +474,76 @@ def flag_pnos_to_verify_and_send(
pnos["is_verified"] = False
pnos["is_sent"] = False

pnos["is_being_sent"] = (~pnos.is_in_verification_scope) * (
pnos.predicted_arrival_datetime_utc >= predicted_arrival_threshold
segment_subscriptions = (
pno_units_ports_and_segments_subscriptions.explode("unit_subscribed_segments")[
["port_locode", "unit_subscribed_segments"]
]
.dropna()
.reset_index(drop=True)
.assign(is_target_segment=True)
)

target_ports = set(
pno_units_ports_and_segments_subscriptions.loc[
pno_units_ports_and_segments_subscriptions.receive_all_pnos_from_port,
"port_locode",
].tolist()
)

target_vessels = set(pno_units_targeting_vessels["cfr"].dropna().tolist())

pnos_with_segments = (
pnos[["logbook_reports_pno_id", "locode", "trip_segments"]]
.explode("trip_segments")
.assign(
trip_segment=lambda x: x.trip_segments.map(
lambda d: d.get("segment"), na_action="ignore"
)
)
.drop(columns=["trip_segments"])
.reset_index(drop=True)
.dropna()
)

if len(pnos_with_segments) > 0 and len(segment_subscriptions) > 0:
segment_matches = pd.merge(
pnos_with_segments,
segment_subscriptions,
how="left",
left_on=["locode", "trip_segment"],
right_on=["port_locode", "unit_subscribed_segments"],
)
logbook_reports_targeting_segment_ids = set(
segment_matches.loc[
segment_matches.is_target_segment.fillna(False),
"logbook_reports_pno_id",
]
)
else:
logbook_reports_targeting_segment_ids = set()

if len(target_ports) > 0:
logbook_reports_targeting_port_ids = set(
pnos.loc[pnos.locode.isin(target_ports), "logbook_reports_pno_id"].tolist()
)
else:
logbook_reports_targeting_port_ids = set()

if len(target_vessels) > 0:
logbook_reports_targeting_vessel_ids = set(
pnos.loc[pnos.cfr.isin(target_vessels), "logbook_reports_pno_id"].tolist()
)
else:
logbook_reports_targeting_vessel_ids = set()

logbook_reports_to_sent_ids = logbook_reports_targeting_segment_ids.union(
logbook_reports_targeting_port_ids
).union(logbook_reports_targeting_vessel_ids)

pnos["is_being_sent"] = (
(~pnos.is_in_verification_scope)
* (pnos.predicted_arrival_datetime_utc >= predicted_arrival_threshold)
* pnos.logbook_reports_pno_id.isin(logbook_reports_to_sent_ids)
)

return pnos
Expand Down Expand Up @@ -574,6 +654,8 @@ def extract_enrich_load_logbook(
pno_types: pd.DataFrame,
control_anteriority: pd.DataFrame,
all_control_priorities: pd.DataFrame,
pno_units_targeting_vessels: pd.DataFrame,
pno_units_ports_and_segments_subscriptions: pd.DataFrame,
utcnow: datetime,
):
"""Extract pnos for the given `Period`, enrich and update the `logbook` table.
Expand Down Expand Up @@ -622,7 +704,10 @@ def extract_enrich_load_logbook(

logger.info("Flagging PNOs to verify_and_distribute...")
pnos = flag_pnos_to_verify_and_send(
pnos=pnos_with_risk_factors, predicted_arrival_threshold=utcnow
pnos=pnos_with_risk_factors,
pno_units_targeting_vessels=pno_units_targeting_vessels,
pno_units_ports_and_segments_subscriptions=pno_units_ports_and_segments_subscriptions,
predicted_arrival_threshold=utcnow,
)

logger.info("Loading")
Expand All @@ -649,6 +734,10 @@ def extract_enrich_load_logbook(
all_control_priorities = extract_all_control_priorities()
pno_types = extract_pno_types()
control_anteriority = extract_control_anteriority()
pno_units_targeting_vessels = extract_pno_units_targeting_vessels()
pno_units_ports_and_segments_subscriptions = (
extract_pno_units_ports_and_segments_subscriptions()
)

with case(recompute_all, True):
reset = reset_pnos.map(periods)
Expand All @@ -658,6 +747,10 @@ def extract_enrich_load_logbook(
pno_types=unmapped(pno_types),
control_anteriority=unmapped(control_anteriority),
all_control_priorities=unmapped(all_control_priorities),
pno_units_targeting_vessels=unmapped(pno_units_targeting_vessels),
pno_units_ports_and_segments_subscriptions=unmapped(
pno_units_ports_and_segments_subscriptions
),
utcnow=unmapped(utcnow),
upstream_tasks=[reset],
)
Expand All @@ -669,6 +762,10 @@ def extract_enrich_load_logbook(
pno_types=unmapped(pno_types),
control_anteriority=unmapped(control_anteriority),
all_control_priorities=unmapped(all_control_priorities),
pno_units_targeting_vessels=unmapped(pno_units_targeting_vessels),
pno_units_ports_and_segments_subscriptions=unmapped(
pno_units_ports_and_segments_subscriptions
),
utcnow=unmapped(utcnow),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pno_species AS (
flag_state,
trip_number,
report_datetime_utc,
p.locode,
p.facade,
(r.value->>'tripStartDate')::TIMESTAMPTZ AS trip_start_date,
(r.value->>'predictedArrivalDatetimeUtc')::TIMESTAMPTZ AS predicted_arrival_datetime_utc,
Expand Down Expand Up @@ -103,6 +104,7 @@ SELECT
s.fao_area,
s.weight,
s.flag_state,
s.locode,
s.facade
FROM pno_species s
LEFT JOIN far_gears fg
Expand Down
20 changes: 20 additions & 0 deletions datascience/src/pipeline/shared_tasks/pnos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pandas as pd
from prefect import task

from src.pipeline.generic_tasks import extract


@task(checkpoint=False)
def extract_pno_units_targeting_vessels() -> pd.DataFrame:
    """Fetch the ``pno_units_targeting_vessels`` dataset.

    Executes ``monitorfish/pno_units_targeting_vessels.sql`` on the
    ``monitorfish_remote`` database via the shared ``extract`` helper.

    Returns:
        pd.DataFrame: the query result.
    """
    query_file = "monitorfish/pno_units_targeting_vessels.sql"
    return extract(db_name="monitorfish_remote", query_filepath=query_file)


@task(checkpoint=False)
def extract_pno_units_ports_and_segments_subscriptions() -> pd.DataFrame:
    """Fetch the ``pno_units_ports_and_segments_subscriptions`` dataset.

    Executes ``monitorfish/pno_units_ports_and_segments_subscriptions.sql``
    on the ``monitorfish_remote`` database via the shared ``extract`` helper.

    Returns:
        pd.DataFrame: the query result.
    """
    query_file = "monitorfish/pno_units_ports_and_segments_subscriptions.sql"
    return extract(db_name="monitorfish_remote", query_filepath=query_file)
49 changes: 49 additions & 0 deletions datascience/tests/test_pipeline/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pandas as pd
import pytest


@pytest.fixture
def pno_units_targeting_vessels():
    """Static vessel-targeting test data.

    Columns: ``vessel_id``, ``cfr`` (may be None), and
    ``control_unit_ids_targeting_vessel`` (list of unit ids per vessel).
    """
    data = {
        "vessel_id": [2, 4, 7],
        "cfr": ["ABC000542519", None, "___TARGET___"],
        "control_unit_ids_targeting_vessel": [[4], [1, 2], [4]],
    }
    return pd.DataFrame.from_dict(data)


@pytest.fixture
def pno_units_ports_and_segments_subscriptions():
    """Static port / segment subscription test data.

    One row per (port, control unit) pair, with a flag indicating whether the
    unit receives all PNOs from the port and the list of fleet segments the
    unit subscribed to.
    """
    rows = [
        ("FRCQF", 1, False, ["SWW01/02/03"]),
        ("FRDKK", 2, False, []),
        ("FRDPE", 4, True, []),
        ("FRLEH", 2, False, []),
        ("FRLEH", 3, False, ["SWW01/02/03", "NWW01"]),
        ("FRZJZ", 2, False, []),
        ("FRZJZ", 3, False, ["SWW01/02/03", "NWW01"]),
    ]
    return pd.DataFrame.from_records(
        rows,
        columns=[
            "port_locode",
            "control_unit_id",
            "receive_all_pnos_from_port",
            "unit_subscribed_segments",
        ],
    )
63 changes: 0 additions & 63 deletions datascience/tests/test_pipeline/test_flows/test_distribute_pnos.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
create_email,
create_sms,
extract_fishing_gear_names,
extract_pno_units_ports_and_segments_subscriptions,
extract_pno_units_targeting_vessels,
extract_pnos_to_generate,
extract_species_names,
fetch_control_units_contacts,
Expand Down Expand Up @@ -1017,53 +1015,6 @@ def pno_pdf_document_to_distribute_verified_assigned(
)


@pytest.fixture
def pno_units_targeting_vessels():
    """Vessel-targeting test data: vessel ids, CFRs (nullable) and the list
    of control unit ids targeting each vessel."""
    vessel_ids = [2, 4, 7]
    cfrs = ["ABC000542519", None, "___TARGET___"]
    targeting_unit_ids = [[4], [1, 2], [4]]
    return pd.DataFrame(
        {
            "vessel_id": vessel_ids,
            "cfr": cfrs,
            "control_unit_ids_targeting_vessel": targeting_unit_ids,
        }
    )


@pytest.fixture
def pno_units_ports_and_segments_subscriptions():
    """Port / segment subscription test data.

    One row per (port, control unit) pair: the port locode, the unit id,
    whether the unit receives all PNOs from that port, and the unit's
    subscribed fleet segments.
    """
    columns = [
        "port_locode",
        "control_unit_id",
        "receive_all_pnos_from_port",
        "unit_subscribed_segments",
    ]
    records = [
        ("FRCQF", 1, False, ["SWW01/02/03"]),
        ("FRDKK", 2, False, []),
        ("FRDPE", 4, True, []),
        ("FRLEH", 2, False, []),
        ("FRLEH", 3, False, ["SWW01/02/03", "NWW01"]),
        ("FRZJZ", 2, False, []),
        ("FRZJZ", 3, False, ["SWW01/02/03", "NWW01"]),
    ]
    return pd.DataFrame.from_records(records, columns=columns)


@pytest.fixture
def monitorenv_control_units_api_response() -> list:
return [
Expand Down Expand Up @@ -1530,20 +1481,6 @@ def test_extract_species_names(reset_test_data, species_names):
assert res == species_names


def test_extract_pno_units_targeting_vessels(
    reset_test_data, pno_units_targeting_vessels
):
    # The task's output must match the fixture DataFrame exactly
    # (values, dtypes and column order).
    extracted = extract_pno_units_targeting_vessels.run()
    pd.testing.assert_frame_equal(extracted, pno_units_targeting_vessels)


def test_extract_pno_units_ports_and_segments_subscriptions(
    reset_test_data, pno_units_ports_and_segments_subscriptions
):
    # The task's output must match the fixture DataFrame exactly
    # (values, dtypes and column order).
    extracted = extract_pno_units_ports_and_segments_subscriptions.run()
    pd.testing.assert_frame_equal(
        extracted, pno_units_ports_and_segments_subscriptions
    )


def test_extract_fishing_gear_names(reset_test_data, fishing_gear_names):
res = extract_fishing_gear_names.run()
assert res == fishing_gear_names
Expand Down
Loading

0 comments on commit 10f3602

Please sign in to comment.