diff --git a/quotaclimat/data_processing/mediatree/api_import.py b/quotaclimat/data_processing/mediatree/api_import.py index a13f93e9..d8a95545 100644 --- a/quotaclimat/data_processing/mediatree/api_import.py +++ b/quotaclimat/data_processing/mediatree/api_import.py @@ -201,7 +201,7 @@ def extract_api_sub( if(df is not None): df = filter_and_tag_by_theme(df) - df["id"] = add_primary_key(df) + df["id"] = df.apply(lambda x: add_primary_key(x), axis=1) return df else: None diff --git a/quotaclimat/data_processing/mediatree/detect_keywords.py b/quotaclimat/data_processing/mediatree/detect_keywords.py index fd85104e..a5baa28c 100644 --- a/quotaclimat/data_processing/mediatree/detect_keywords.py +++ b/quotaclimat/data_processing/mediatree/detect_keywords.py @@ -262,15 +262,14 @@ def filter_and_tag_by_theme(df: pd.DataFrame) -> pd.DataFrame : return df -def add_primary_key(df): - logging.info("Adding primary key to save to PG and have idempotent result") +def add_primary_key(row): + logging.info(f"Adding primary key to save to PG and have idempotent results") try: - return ( - df["start"].astype(str) + df["channel_name"] - ).apply(get_consistent_hash) + return get_consistent_hash(str(row["start"].timestamp()) + row["channel_name"]) + except (Exception) as error: - logging.error(f"{error} with df {df.head()}") - return get_consistent_hash("empty") # TODO improve - should be a None ? + logging.error(f"{error} with df {row}") + raise Exception def filter_indirect_words(keywords_with_timestamp: List[dict]) -> List[dict]: return list(filter(lambda kw: indirectes not in kw['theme'], keywords_with_timestamp)) diff --git a/test/sitemap/test_main_import_api.py b/test/sitemap/test_main_import_api.py index df2c62fa..5d02332d 100644 --- a/test/sitemap/test_main_import_api.py +++ b/test/sitemap/test_main_import_api.py @@ -23,7 +23,7 @@ def test_main_api_import(): start_time = t.time() df = parse_reponse_subtitle(json_response) df = filter_and_tag_by_theme(df) - df["id"] = add_primary_key(df) + df["id"] = df.apply(lambda x: add_primary_key(x), axis=1) end_time = t.time() logging.info(f"Elapsed time for api import {end_time - start_time}") # must df._to_pandas() because to_sql does not handle modin dataframe