Skip to content

Commit

Permalink
Mark PNOs as 'sent' only if at least one message was successfully sent
Browse files Browse the repository at this point in the history
  • Loading branch information
VincentAntoine committed Jul 19, 2024
1 parent 0daafba commit b2a17b4
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 15 deletions.
85 changes: 75 additions & 10 deletions datascience/src/pipeline/flows/distribute_pnos.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ def load_prior_notification_sent_messages(
@task(checkpoint=False)
def make_update_logbook_reports_statement(
pnos_to_update: List[RenderedPno],
sent_messages: List[PriorNotificationSentMessage],
start_datetime_utc: datetime,
end_datetime_utc: datetime,
) -> Executable:
Expand All @@ -824,6 +825,7 @@ def make_update_logbook_reports_statement(
Args:
pnos_to_update (List[RenderedPno]): PNOs to update
sent_messages (List[PriorNotificationSentMessage]): PNOs that were sent
start_datetime_utc (datetime): start date
end_datetime_utc (datetime): end date
Expand All @@ -838,6 +840,16 @@ def make_update_logbook_reports_statement(
pno.report_id for pno in pnos_to_update if pno.source == PnoSource.LOGBOOK
)

sent_pno_report_ids = tuple(
sorted(
set(
m.prior_notification_report_id
for m in sent_messages
if (m.success and m.prior_notification_source == PnoSource.LOGBOOK)
)
)
)

logger = prefect.context.get("logger")

if logbook_pno_report_ids:
Expand All @@ -848,6 +860,29 @@ def make_update_logbook_reports_statement(
)
)

bind_params = [
bindparam(
"logbook_pno_report_ids", value=logbook_pno_report_ids, expanding=True
),
]

# Empty tuple is not expanded correctly by SQLAlchemy so this case must be
# handled separately
if sent_pno_report_ids:
is_sent_line = (
" (CASE WHEN report_id IN :sent_pno_report_ids "
"THEN 'true' "
"ELSE 'false' END)::jsonb"
)
bind_params.append(
bindparam(
"sent_pno_report_ids", value=sent_pno_report_ids, expanding=True
)
)

else:
is_sent_line = " false::text::jsonb"

statement = text(
"UPDATE public.logbook_reports "
" SET value = jsonb_set("
Expand All @@ -857,16 +892,14 @@ def make_update_logbook_reports_statement(
" false::text::jsonb"
" ), "
" '{isSent}', "
" true::text::jsonb"
f"{is_sent_line}"
" ) "
"WHERE "
" operation_datetime_utc >= :start_datetime_utc "
" AND operation_datetime_utc < :end_datetime_utc "
" AND report_id IN :logbook_pno_report_ids"
).bindparams(
bindparam(
"logbook_pno_report_ids", value=logbook_pno_report_ids, expanding=True
),
*bind_params,
start_datetime_utc=start_datetime_utc,
end_datetime_utc=end_datetime_utc,
)
Expand All @@ -880,6 +913,7 @@ def make_update_logbook_reports_statement(
@task(checkpoint=False)
def make_manual_prior_notifications_statement(
pnos_to_update: List[RenderedPno],
sent_messages: List[PriorNotificationSentMessage],
) -> Executable:
"""
Creates slqalchemy update statement to update `isBeingSent` and `isSent` fields of
Expand All @@ -899,6 +933,16 @@ def make_manual_prior_notifications_statement(
pno.report_id for pno in pnos_to_update if pno.source == PnoSource.MANUAL
)

sent_pno_report_ids = tuple(
sorted(
set(
m.prior_notification_report_id
for m in sent_messages
if (m.success and m.prior_notification_source == PnoSource.MANUAL)
)
)
)

logger = prefect.context.get("logger")

if manual_pno_report_ids:
Expand All @@ -909,6 +953,29 @@ def make_manual_prior_notifications_statement(
)
)

bind_params = [
bindparam(
"manual_pno_report_ids", value=manual_pno_report_ids, expanding=True
),
]

# Empty tuple is not expanded correctly by SQLAlchemy so this case must be
# handled separately
if sent_pno_report_ids:
is_sent_line = (
" (CASE WHEN report_id IN :sent_pno_report_ids "
"THEN 'true' "
"ELSE 'false' END)::jsonb"
)
bind_params.append(
bindparam(
"sent_pno_report_ids", value=sent_pno_report_ids, expanding=True
)
)

else:
is_sent_line = " false::text::jsonb"

statement = text(
"UPDATE public.manual_prior_notifications "
" SET value = jsonb_set("
Expand All @@ -918,14 +985,10 @@ def make_manual_prior_notifications_statement(
" false::text::jsonb"
" ), "
" '{isSent}', "
" true::text::jsonb"
f"{is_sent_line}"
" ) "
"WHERE report_id IN :manual_pno_report_ids"
).bindparams(
bindparam(
"manual_pno_report_ids", value=manual_pno_report_ids, expanding=True
),
)
).bindparams(*bind_params)

return statement

Expand Down Expand Up @@ -1020,6 +1083,7 @@ def make_manual_prior_notifications_statement(
update_logbook_reports_statement = (
make_update_logbook_reports_statement(
pnos_to_update=pnos_to_distribute,
sent_messages=flatten(sent_messages),
start_datetime_utc=start_datetime_utc,
end_datetime_utc=end_datetime_utc,
upstream_tasks=[loaded_prior_notification_sent_messages],
Expand All @@ -1029,6 +1093,7 @@ def make_manual_prior_notifications_statement(
update_manual_pnos_statement = (
make_manual_prior_notifications_statement(
pnos_to_update=pnos_to_distribute,
sent_messages=flatten(sent_messages),
upstream_tasks=[loaded_prior_notification_sent_messages],
)
)
Expand Down
113 changes: 108 additions & 5 deletions datascience/tests/test_pipeline/test_flows/test_distribute_pnos.py
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,54 @@ def messages_sent_by_sms() -> List[PriorNotificationSentMessage]:
]


@pytest.fixture
def some_more_sent_messages(
messages_sent_by_email, messages_sent_by_sms
) -> List[PriorNotificationSentMessage]:
return (
messages_sent_by_email
+ messages_sent_by_sms
+ [
PriorNotificationSentMessage(
prior_notification_report_id="Manual-Report-1",
prior_notification_source=PnoSource.MANUAL,
date_time_utc=datetime(2023, 6, 6, 16, 10),
communication_means=CommunicationMeans.SMS,
recipient_address_or_number="00000000000",
success=True,
error_message=None,
),
PriorNotificationSentMessage(
prior_notification_report_id="Failed-logbook-report-123",
prior_notification_source=PnoSource.LOGBOOK,
date_time_utc=datetime(2023, 6, 6, 16, 10),
communication_means=CommunicationMeans.SMS,
recipient_address_or_number="00000000000",
success=False,
error_message=None,
),
PriorNotificationSentMessage(
prior_notification_report_id="Other-logbook-report-123",
prior_notification_source=PnoSource.LOGBOOK,
date_time_utc=datetime(2023, 6, 6, 16, 10),
communication_means=CommunicationMeans.SMS,
recipient_address_or_number="00000000000",
success=True,
error_message=None,
),
PriorNotificationSentMessage(
prior_notification_report_id="Manual-Report-failed",
prior_notification_source=PnoSource.MANUAL,
date_time_utc=datetime(2023, 6, 6, 16, 10),
communication_means=CommunicationMeans.SMS,
recipient_address_or_number="00000000000",
success=False,
error_message=None,
),
]
)


@pytest.fixture
def loaded_sent_messages() -> pd.DataFrame:
return pd.DataFrame(
Expand Down Expand Up @@ -2052,10 +2100,12 @@ def test_load_prior_notification_sent_messages(


def test_make_update_logbook_reports_statement(
logbook_rendered_pno, manual_rendered_pno
logbook_rendered_pno, manual_rendered_pno, some_more_sent_messages
):
# Test without sent messages
statement = make_update_logbook_reports_statement.run(
pnos_to_update=[logbook_rendered_pno, manual_rendered_pno],
sent_messages=[],
start_datetime_utc=datetime(2023, 6, 5, 12, 5, 6),
end_datetime_utc=datetime(2023, 7, 5, 15, 2, 38),
)
Expand All @@ -2071,7 +2121,35 @@ def test_make_update_logbook_reports_statement(
" false::text::jsonb"
" ),"
" '{isSent}',"
" true::text::jsonb"
" false::text::jsonb"
" ) "
"WHERE"
" operation_datetime_utc >= '2023-06-05 12:05:06'"
" AND operation_datetime_utc < '2023-07-05 15:02:38'"
" AND report_id IN ('Report-1')"
)

# Test with sent messages
statement = make_update_logbook_reports_statement.run(
pnos_to_update=[logbook_rendered_pno, manual_rendered_pno],
sent_messages=some_more_sent_messages,
start_datetime_utc=datetime(2023, 6, 5, 12, 5, 6),
end_datetime_utc=datetime(2023, 7, 5, 15, 2, 38),
)
assert isinstance(statement, sqlalchemy.TextClause)
compiled_statement = str(statement.compile(compile_kwargs={"literal_binds": True}))

assert compiled_statement == (
"UPDATE public.logbook_reports"
" SET value = jsonb_set("
" jsonb_set("
" value,"
" '{isBeingSent}',"
" false::text::jsonb"
" ),"
" '{isSent}',"
" (CASE WHEN report_id IN ('Other-logbook-report-123', 'Report-1') "
"THEN 'true' ELSE 'false' END)::jsonb"
" ) "
"WHERE"
" operation_datetime_utc >= '2023-06-05 12:05:06'"
Expand All @@ -2081,10 +2159,33 @@ def test_make_update_logbook_reports_statement(


def test_make_update_manual_prior_notifications_statement(
logbook_rendered_pno, manual_rendered_pno
logbook_rendered_pno, manual_rendered_pno, some_more_sent_messages
):
# Test without sent messages
statement = make_manual_prior_notifications_statement.run(
pnos_to_update=[logbook_rendered_pno, manual_rendered_pno], sent_messages=[]
)
assert isinstance(statement, sqlalchemy.TextClause)
compiled_statement = str(statement.compile(compile_kwargs={"literal_binds": True}))

assert compiled_statement == (
"UPDATE public.manual_prior_notifications"
" SET value = jsonb_set("
" jsonb_set("
" value,"
" '{isBeingSent}',"
" false::text::jsonb"
" ),"
" '{isSent}',"
" false::text::jsonb"
" ) "
"WHERE report_id IN ('Report-2')"
)

# Test with sent messages
statement = make_manual_prior_notifications_statement.run(
pnos_to_update=[logbook_rendered_pno, manual_rendered_pno],
sent_messages=some_more_sent_messages,
)
assert isinstance(statement, sqlalchemy.TextClause)
compiled_statement = str(statement.compile(compile_kwargs={"literal_binds": True}))
Expand All @@ -2098,7 +2199,7 @@ def test_make_update_manual_prior_notifications_statement(
" false::text::jsonb"
" ),"
" '{isSent}',"
" true::text::jsonb"
" (CASE WHEN report_id IN ('Manual-Report-1') THEN 'true' ELSE 'false' END)::jsonb"
" ) "
"WHERE report_id IN ('Report-2')"
)
Expand All @@ -2109,7 +2210,7 @@ def test_make_update_manual_prior_notifications_statement(
@patch("src.pipeline.helpers.emails.send_sms")
@patch("src.pipeline.helpers.emails.send_fax")
@patch("src.pipeline.flows.distribute_pnos.requests")
def test_flow(
def test_flow_abracadabra(
mock_requests,
mock_send_fax,
mock_send_sms,
Expand Down Expand Up @@ -2232,6 +2333,8 @@ class UnexpectedFailureOfDeath(Exception):
"success",
].all()

breakpoint()


@pytest.mark.parametrize("zero_pno_types", ["manual", "logbook", "both"])
@pytest.mark.parametrize("is_integration", [True, False])
Expand Down

0 comments on commit b2a17b4

Please sign in to comment.