From c436b1ab3e7979593b87e3fc2c384ab340e934ad Mon Sep 17 00:00:00 2001 From: Vincent Date: Fri, 9 Aug 2024 14:37:54 +0200 Subject: [PATCH 1/2] Use DELETE by default when replacing table data --- .../src/pipeline/flows/init_pno_types.py | 4 +- datascience/src/pipeline/flows/missions.py | 4 +- datascience/src/pipeline/generic_tasks.py | 61 ++++++++++++------- datascience/src/pipeline/utils.py | 19 ++++-- 4 files changed, 56 insertions(+), 32 deletions(-) diff --git a/datascience/src/pipeline/flows/init_pno_types.py b/datascience/src/pipeline/flows/init_pno_types.py index 0cb5fc465b..385630986e 100644 --- a/datascience/src/pipeline/flows/init_pno_types.py +++ b/datascience/src/pipeline/flows/init_pno_types.py @@ -12,7 +12,7 @@ from src.pipeline.generic_tasks import load from src.pipeline.shared_tasks.control_flow import check_flow_not_running from src.pipeline.shared_tasks.infrastructure import get_table -from src.pipeline.utils import truncate +from src.pipeline.utils import delete @task(checkpoint=False) @@ -47,7 +47,7 @@ def load_pno_types_and_rules( e = create_engine("monitorfish_remote") with e.begin() as con: - truncate( + delete( tables=[pno_type_rules_table, pno_types_table], connection=con, logger=logger, diff --git a/datascience/src/pipeline/flows/missions.py b/datascience/src/pipeline/flows/missions.py index a58973760b..412795ba64 100644 --- a/datascience/src/pipeline/flows/missions.py +++ b/datascience/src/pipeline/flows/missions.py @@ -10,7 +10,7 @@ from src.pipeline.generic_tasks import extract, load from src.pipeline.shared_tasks.control_flow import check_flow_not_running from src.pipeline.shared_tasks.infrastructure import get_table -from src.pipeline.utils import truncate +from src.pipeline.utils import delete @task(checkpoint=False) @@ -113,7 +113,7 @@ def load_missions_and_missions_control_units( e = create_engine("monitorfish_remote") with e.begin() as connection: - truncate( + delete( tables=[analytics_missions_table, analytics_missions_control_units_table], connection=connection, logger=prefect.context.get("logger"), diff --git a/datascience/src/pipeline/generic_tasks.py b/datascience/src/pipeline/generic_tasks.py index 8a17e15a05..3a4078f1d0 100644 --- a/datascience/src/pipeline/generic_tasks.py +++ b/datascience/src/pipeline/generic_tasks.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import List, Union +from typing import List, Optional, Union import geopandas as gpd import pandas as pd @@ -19,12 +19,12 @@ def extract( db_name: str, query_filepath: Union[Path, str], - dtypes: Union[None, dict] = None, - parse_dates: Union[list, dict, None] = None, + dtypes: Optional[dict] = None, + parse_dates: Optional[Union[list, dict]] = None, params=None, backend: str = "pandas", geom_col: str = "geom", - crs: Union[int, None] = None, + crs: Optional[int] = None, ) -> Union[pd.DataFrame, gpd.GeoDataFrame]: """Run SQL query against the indicated database and return the result as a `pandas.DataFrame`. @@ -56,7 +56,7 @@ def extract( geom_col (str, optional): column name to convert to shapely geometries when `backend` is 'geopandas'. Ignored when `backend` is 'pandas'. Defaults to 'geom'. - crs (Union[None, str], optional) : CRS to use for the returned GeoDataFrame; + crs (str, optional) : CRS to use for the returned GeoDataFrame; if not set, tries to determine CRS from the SRID associated with the first geometry in the database, and assigns that to all geometries. Ignored when `backend` is 'pandas'. Defaults to None. 
@@ -88,20 +88,21 @@ def load( schema: str, logger: logging.Logger, how: str = "replace", - db_name: str = None, - pg_array_columns: list = None, + replace_with_truncate: bool = False, + db_name: Optional[str] = None, + pg_array_columns: Optional[list] = None, handle_array_conversion_errors: bool = True, value_on_array_conversion_error: str = "{}", - jsonb_columns: list = None, - table_id_column: str = None, + jsonb_columns: Optional[list] = None, + table_id_column: Optional[str] = None, df_id_column: str = None, - nullable_integer_columns: list = None, - timedelta_columns: list = None, + nullable_integer_columns: Optional[list] = None, + timedelta_columns: Optional[list] = None, enum_columns: list = None, - connection: Connection = None, - init_ddls: List[DDL] = None, - end_ddls: List[DDL] = None, - bytea_columns: list = None, + connection: Optional[Connection] = None, + init_ddls: Optional[List[DDL]] = None, + end_ddls: Optional[List[DDL]] = None, + bytea_columns: Optional[list] = None, ): r""" Load a DataFrame or GeoDataFrame to a database table using sqlalchemy. The table @@ -118,7 +119,20 @@ def load( - 'append' to append the data to rows already in the table - 'upsert' to append the rows to the table, replacing the rows whose id is already - + replace_with_truncate (bool): if `how` is `replace` and + `replace_with_truncate` is `True`, the target table is emptied with + TRUNCATE before the new data is loaded. If `how` is `replace` and + `replace_with_truncate` is `False` (the default), the rows of the target + table are removed with DELETE instead. If `how` is anything but `replace`, + `replace_with_truncate` is ignored. + TRUNCATE is more efficient than DELETE: the files holding the table data + are simply dropped rather than rows being deleted one by one, and the + table is rewritten on fresh pages, leaving no bloat (dead or free space + in data pages). However, TRUNCATE requires an ACCESS EXCLUSIVE lock on + the table, which may conflict with other database operations, notably + `pg_dump` and `ALTER TABLE` commands, and can end in a deadlock and + therefore downtime of the entire system during database backups or + migrations. Use only if you know what you're doing. db_name (str, optional): Required if a `connection` is not provided. 'monitorfish_remote', 'monitorenv_remote' or 'monitorfish_local'. Defaults to None. 
@@ -194,6 +208,7 @@ def load( schema=schema, logger=logger, how=how, + replace_with_truncate=replace_with_truncate, table_id_column=table_id_column, df_id_column=df_id_column, init_ddls=init_ddls, @@ -207,6 +222,7 @@ def load( schema=schema, logger=logger, how=how, + replace_with_truncate=replace_with_truncate, table_id_column=table_id_column, df_id_column=df_id_column, init_ddls=init_ddls, @@ -222,10 +238,11 @@ def load_with_connection( schema: str, logger: logging.Logger, how: str = "replace", - table_id_column: Union[None, str] = None, - df_id_column: Union[None, str] = None, - init_ddls: List[DDL] = None, - end_ddls: List[DDL] = None, + replace_with_truncate: bool = False, + table_id_column: Optional[str] = None, + df_id_column: Optional[str] = None, + init_ddls: Optional[List[DDL]] = None, + end_ddls: Optional[List[DDL]] = None, ): if init_ddls: for ddl in init_ddls: @@ -233,8 +250,8 @@ def load_with_connection( table = get_table(table_name, schema, connection, logger) if how == "replace": - # Truncate table - utils.truncate([table], connection, logger) + # Empty the table + utils.delete([table], connection, logger, truncate=replace_with_truncate) elif how == "upsert": # Delete rows that are in the DataFrame from the table diff --git a/datascience/src/pipeline/utils.py b/datascience/src/pipeline/utils.py index 947174a1d8..27b25724fa 100644 --- a/datascience/src/pipeline/utils.py +++ b/datascience/src/pipeline/utils.py @@ -53,22 +53,29 @@ def get_table( return table -def truncate( +def delete( tables: List[sqlalchemy.Table], connection: sqlalchemy.engine.base.Connection, logger: logging.Logger, + truncate: bool = False, ): - """Truncate tables. + """Delete all rows from tables, using TRUNCATE when `truncate` is True. Useful to wipe tables before re-inserting fresh data in ETL jobs.""" for table in tables: count_statement = select(func.count()).select_from(table) n = connection.execute(count_statement).fetchall()[0][0] logger.info(f"Table {table.name} has {n} rows.") - tables_list = ", ".join([f'"{table.schema}"."{table.name}"' for table in tables]) - logger.info(f"Truncating tables {tables_list}...") - - connection.execute(text(f"TRUNCATE {tables_list}")) + if truncate: + tables_list = ", ".join( + [f'"{table.schema}"."{table.name}"' for table in tables] + ) + logger.info(f"Truncating tables {tables_list}...") + connection.execute(text(f"TRUNCATE {tables_list}")) + else: + for table in tables: + logger.info(f"Deleting table {table.name}...") + connection.execute(table.delete()) def delete_rows( From ce06c6ce141d6e229d86b1bf4511595f54250cad Mon Sep 17 00:00:00 2001 From: Vincent Date: Fri, 9 Aug 2024 15:08:48 +0200 Subject: [PATCH 2/2] Use truncate on selected flows --- datascience/src/pipeline/flows/admin_areas.py | 39 +++++++++++++++++++ datascience/src/pipeline/flows/anchorages.py | 4 +- .../src/pipeline/flows/control_units.py | 2 + .../src/pipeline/flows/facade_areas.py | 2 +- datascience/src/pipeline/flows/fao_areas.py | 1 + .../src/pipeline/flows/foreign_fmcs.py | 3 +- datascience/src/pipeline/flows/infractions.py | 3 +- datascience/src/pipeline/flows/missions.py | 1 + datascience/src/pipeline/flows/ports.py | 5 +-- datascience/src/pipeline/flows/vessels.py | 1 + datascience/src/pipeline/flows_config.py | 2 +- 11 files changed, 50 insertions(+), 13 deletions(-) diff --git a/datascience/src/pipeline/flows/admin_areas.py b/datascience/src/pipeline/flows/admin_areas.py index d642a4ef8d..be5e8f1e01 100644 --- a/datascience/src/pipeline/flows/admin_areas.py +++ b/datascience/src/pipeline/flows/admin_areas.py @@ -24,6 +24,7 @@ def 
load_cgpm_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -43,6 +44,7 @@ def load_cgpm_statistical_rectangles_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -62,6 +64,7 @@ def load_n_miles_to_shore_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -81,6 +84,7 @@ def load_3_miles_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -100,6 +104,7 @@ def load_6_miles_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -119,6 +124,7 @@ def load_12_miles_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -138,6 +144,7 @@ def load_eez_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -159,6 +166,7 @@ def load_1241_eaux_occidentales_australes_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -180,6 +188,7 @@ def load_1241_eaux_occidentales_septentrionales_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -201,6 +210,7 @@ def load_1241_eaux_union_dans_oi_et_atl_ouest_areas( db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -218,6 +228,7 @@ def load_1241_mer_baltique_areas(mer_baltique_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -235,6 +246,7 @@ def load_1241_mer_du_nord_areas(mer_du_nord_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -252,6 +264,7 @@ def load_1241_mer_mediterranee_areas(mer_mediterranee_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -269,6 +282,7 @@ def load_1241_mer_noire_areas(mer_noire_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -286,6 +300,7 @@ def load_1241_mer_celtique_areas(mer_celtique_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -303,6 +318,7 @@ def load_aem_areas(aem_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -320,6 +336,7 @@ def load_brexit_areas(brexit_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -337,6 +354,7 @@ def load_cormoran_areas(cormoran_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -354,6 +372,7 @@ def load_fao_ccamlr_areas(fao_ccamlr_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -371,6 +390,7 @@ def load_fao_iccat_areas(fao_iccat_areas: pd.DataFrame): db_name="monitorfish_remote", 
logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -388,6 +408,7 @@ def load_fao_iotc_areas(fao_iotc_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -405,6 +426,7 @@ def load_fao_siofa_areas(fao_siofa_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -422,6 +444,7 @@ def load_rectangles_stat_areas(rectangles_stat_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -439,6 +462,7 @@ def load_situ_atlant_areas(situ_atlant_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -456,6 +480,7 @@ def load_situ_med_areas(situ_med_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -473,6 +498,7 @@ def load_situ_memn_areas(situ_memn_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -490,6 +516,7 @@ def load_situ_outre_mer_areas(situ_outre_mer_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -507,6 +534,7 @@ def load_situs_areas(situs_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -524,6 +552,7 @@ def load_effort_zones_areas(effort_zones_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -541,6 +570,7 @@ def load_neafc_regulatory_area(neafc_regulatory_area: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -558,6 +588,7 @@ def load_nafo_regulatory_area(nafo_regulatory_area: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -575,6 +606,7 @@ def load_navigation_category_two_areas(navigation_category_two_areas: pd.DataFra db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -592,6 +624,7 @@ def load_navigation_category_three_areas(navigation_category_three_areas: pd.Dat db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -609,6 +642,7 @@ def load_navigation_category_four_areas(navigation_category_four_areas: pd.DataF db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -626,6 +660,7 @@ def load_navigation_category_five_areas(navigation_category_five_areas: pd.DataF db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -643,6 +678,7 @@ def load_saltwater_limit_areas(saltwater_limit_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -660,6 +696,7 @@ def load_transversal_sea_limit_areas(transversal_sea_limit_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -677,6 +714,7 @@ def 
load_departments_areas(departments_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) @@ -694,6 +732,7 @@ def load_land_areas(land_areas: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) diff --git a/datascience/src/pipeline/flows/anchorages.py b/datascience/src/pipeline/flows/anchorages.py index cbb9e078e1..0f778589c8 100644 --- a/datascience/src/pipeline/flows/anchorages.py +++ b/datascience/src/pipeline/flows/anchorages.py @@ -317,7 +317,6 @@ def get_ports_locations(ports: pd.DataFrame) -> List[PortLocation]: def get_anchorages_closest_port( anchorage_h3_cells_rings: pd.DataFrame, ports_locations: List[PortLocation] ) -> pd.DataFrame: - ports_vptree = PortsVPTree(ports_locations) anchorages_closest_port = anchorage_h3_cells_rings.apply( @@ -357,7 +356,6 @@ def unite_ports_locodes( def get_active_ports( ports: pd.DataFrame, active_ports_locodes: Set[str] ) -> pd.DataFrame: - active_ports = ports[ports.locode.isin(active_ports_locodes)].copy(deep=True) return active_ports @@ -405,7 +403,6 @@ def load_processed_anchorages(anchorages: pd.DataFrame): with Flow("Anchorages") as flow_compute_anchorages: - h3_resolution = Parameter("h3_resolution", ANCHORAGES_H3_CELL_RESOLUTION) number_signals_threshold = Parameter("number_signals_threshold", 100) static_vms_positions_file_relative_path = Parameter( @@ -519,6 +516,7 @@ def load_anchorages_to_monitorfish(anchorages: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) diff --git a/datascience/src/pipeline/flows/control_units.py b/datascience/src/pipeline/flows/control_units.py index 598e8bb665..ab5046004e 100644 --- a/datascience/src/pipeline/flows/control_units.py +++ b/datascience/src/pipeline/flows/control_units.py @@ -39,6 +39,7 @@ def load_analytics_control_units_and_administrations( connection=connection, logger=logger, how="replace", + replace_with_truncate=True, ) load( @@ -48,6 +49,7 @@ def load_analytics_control_units_and_administrations( connection=connection, logger=logger, how="replace", + replace_with_truncate=True, ) diff --git a/datascience/src/pipeline/flows/facade_areas.py b/datascience/src/pipeline/flows/facade_areas.py index 4089045468..1519b74df3 100644 --- a/datascience/src/pipeline/flows/facade_areas.py +++ b/datascience/src/pipeline/flows/facade_areas.py @@ -22,7 +22,6 @@ def extract_facade_areas() -> pd.DataFrame: @task(checkpoint=False) def load_facade_areas(facade_areas: pd.DataFrame): - logger = prefect.context.get("logger") load( @@ -32,6 +31,7 @@ def load_facade_areas(facade_areas: pd.DataFrame): db_name="monitorfish_remote", logger=logger, how="replace", + replace_with_truncate=True, ) diff --git a/datascience/src/pipeline/flows/fao_areas.py b/datascience/src/pipeline/flows/fao_areas.py index 8854e9a738..38edc84a70 100644 --- a/datascience/src/pipeline/flows/fao_areas.py +++ b/datascience/src/pipeline/flows/fao_areas.py @@ -56,6 +56,7 @@ def load_fao_areas(fao_areas: gpd.GeoDataFrame): db_name="monitorfish_remote", logger=logger, how="replace", + replace_with_truncate=True, ) diff --git a/datascience/src/pipeline/flows/foreign_fmcs.py b/datascience/src/pipeline/flows/foreign_fmcs.py index 3ee31e98a0..2bdad5b829 100644 --- a/datascience/src/pipeline/flows/foreign_fmcs.py +++ b/datascience/src/pipeline/flows/foreign_fmcs.py @@ -66,15 +66,14 @@ def 
load_foreign_fmcs(foreign_fmcs): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, pg_array_columns=["email_addresses"], ) with Flow("Foreign FMCs", executor=LocalDaskExecutor()) as flow: - flow_not_running = check_flow_not_running() with case(flow_not_running, True): - foreign_fmcs_contacts = extract_foreign_fmcs_contacts() foreign_fmcs = transform_foreign_fmcs_contacts(foreign_fmcs_contacts) load_foreign_fmcs(foreign_fmcs) diff --git a/datascience/src/pipeline/flows/infractions.py b/datascience/src/pipeline/flows/infractions.py index c03618183f..7749111667 100644 --- a/datascience/src/pipeline/flows/infractions.py +++ b/datascience/src/pipeline/flows/infractions.py @@ -28,14 +28,13 @@ def load_infractions(infractions): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, ) with Flow("Infractions", executor=LocalDaskExecutor()) as flow: - flow_not_running = check_flow_not_running() with case(flow_not_running, True): - infractions = extract_infractions() infractions = clean_infractions(infractions) load_infractions(infractions) diff --git a/datascience/src/pipeline/flows/missions.py b/datascience/src/pipeline/flows/missions.py index 412795ba64..c28185a437 100644 --- a/datascience/src/pipeline/flows/missions.py +++ b/datascience/src/pipeline/flows/missions.py @@ -117,6 +117,7 @@ def load_missions_and_missions_control_units( tables=[analytics_missions_table, analytics_missions_control_units_table], connection=connection, logger=prefect.context.get("logger"), + truncate=True, ) load( diff --git a/datascience/src/pipeline/flows/ports.py b/datascience/src/pipeline/flows/ports.py index 24cdc5a1ed..1749c74d08 100644 --- a/datascience/src/pipeline/flows/ports.py +++ b/datascience/src/pipeline/flows/ports.py @@ -304,7 +304,6 @@ def extract_circabc_ports(): @task(checkpoint=False) def merge_circabc_unece(circabc_ports, unece_ports): - keep_unece_cols = ["region", "locode", "latitude", "longitude"] ports = pd.merge( @@ -607,7 +606,6 @@ def flag_active_ports(ports: pd.DataFrame, active_ports_locodes: set) -> pd.Data @task(checkpoint=False) def geocode_ports(ports): - country_codes_to_geocode = [ "FR", "MQ", @@ -874,15 +872,14 @@ def load_ports(ports): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, pg_array_columns=["fao_areas"], ) with Flow("Ports", executor=LocalDaskExecutor()) as flow: - flow_not_running = check_flow_not_running() with case(flow_not_running, True): - # Parameters dataset_id = Parameter("dataset_id", default=PORTS_DATASET_ID) ports_resource_id = Parameter( diff --git a/datascience/src/pipeline/flows/vessels.py b/datascience/src/pipeline/flows/vessels.py index 7a1e0671f4..4a3a169f49 100644 --- a/datascience/src/pipeline/flows/vessels.py +++ b/datascience/src/pipeline/flows/vessels.py @@ -399,6 +399,7 @@ def load_vessels(all_vessels: pd.DataFrame): db_name="monitorfish_remote", logger=prefect.context.get("logger"), how="replace", + replace_with_truncate=True, pg_array_columns=[ "declared_fishing_gears", "operator_phones", diff --git a/datascience/src/pipeline/flows_config.py b/datascience/src/pipeline/flows_config.py index 4e6b59d11b..c2c5e48fc9 100644 --- a/datascience/src/pipeline/flows_config.py +++ b/datascience/src/pipeline/flows_config.py @@ -283,7 +283,7 @@ ), ] ) -vessels.flow.schedule = CronSchedule("2 2,5,8,11,14,17,20,23 * * *") +vessels.flow.schedule = CronSchedule("2 2,5,8,11,14,20,23 * 
* *") ###################### List flows to register with prefect server #####################