From 6a656c84591720c6a4f1caeff3f26123c3ae8bed Mon Sep 17 00:00:00 2001
From: JaredMcLellan
Date: Fri, 20 Dec 2024 10:36:53 -0400
Subject: [PATCH 1/2] Updated ERDDAP cacher to grab IBTrACS storm information

---
 ERDDAP/cache_ERDDAP.py | 148 +++++++++++++++++++++++++++++++----------
 1 file changed, 113 insertions(+), 35 deletions(-)

diff --git a/ERDDAP/cache_ERDDAP.py b/ERDDAP/cache_ERDDAP.py
index 53096ab..8728f39 100755
--- a/ERDDAP/cache_ERDDAP.py
+++ b/ERDDAP/cache_ERDDAP.py
@@ -15,6 +15,8 @@
 import sys
 import argparse
 import json
+import numpy as np
+from datetime import datetime, timedelta, timezone
 
 log = logging.getLogger('caching.log')
 handler = RotatingFileHandler('caching.log', maxBytes=2000, backupCount=10)
@@ -40,6 +42,7 @@
 erddap_cache_historical_schema = os.getenv('ERDDAP_CACHE_HISTORICAL_SCHEMA')
 pg_erddap_cache_active_table = os.getenv('ERDDAP_CACHE_ACTIVE_TABLE')
 erddap_cache_active_schema = os.getenv('ERDDAP_CACHE_ACTIVE_SCHEMA')
+pg_ibtracs_historical_table = os.getenv('PG_IBTRACS_HISTORICAL_TABLE')
 
 docker_user = {'docker'}
 
@@ -61,21 +64,31 @@
     unit_overrides = json.load(unit_override_file)
 
 #process_ibtracs(df = , destination_table=pg_ibtracs_active_table, pg_engine=engine, table_schema=erddap_cache_schema)
-def cache_erddap_data(df, destination_table, pg_engine, table_schema, replace=False):
+def cache_erddap_data(storm_id, df, destination_table, pg_engine, table_schema, replace=False):
     # populate table
     print("Populating Table...")
+
     with pg_engine.begin() as pg_conn:
-        sql = f"TRUNCATE public.{destination_table};"
+
+        if(storm_id):
+            sql = f"DELETE FROM public.{destination_table} WHERE storm = '{storm_id}';"
+        else:
+            # Clear old storm data
+            sql = f"TRUNCATE public.{destination_table};"
         pg_conn.execute(text(sql))
+    # Insert the newly cached rows
     result = df.to_sql(destination_table, pg_engine, chunksize=1000, method='multi', if_exists='append', index=False, schema='public')
-
-    with pg_engine.begin() as pg_conn:
+
+
+    with pg_engine.begin() as pg_conn:
+    """
         print(f"Adding geom column")
         sql = f"ALTER TABLE public.{destination_table} ADD COLUMN IF NOT EXISTS geom geometry(Point, 4326);"
         pg_conn.execute(text(sql))
+    """
 
         print("Updating Geometry...")
         sql = f'UPDATE public.{destination_table} SET geom = ST_SetSRID(ST_MakePoint("min_lon", "min_lat"), 4326);'
         pg_conn.execute(text(sql))
@@ -255,10 +268,47 @@ def find_df_column_by_standard_name(df, standard_name):
     column = df.filter(regex='\(' + standard_name + '\|')#.columns.values[0]
     return column
+def get_historical_storm_list(storm=None, min_year=None, max_year=None):
+    # Connect to the IBTrACS storm table
+    # Keep only post-2000 storms
+    # Load into a pandas dataframe, group by storm name and season
+    # Set the storm ID to YYYY_STORMNAME
+    # Set min and max time to the storm's first and last IBTrACS fix
+    df = get_postgres_data(source_table=pg_ibtracs_historical_table, pg_engine=engine)
+    df = df.groupby(['SEASON','NAME'], as_index=False).agg({'ISO_TIME':['min', 'max']})
+    df = df.loc[(df['NAME'] != "UNNAMED")]
+    if(min_year):
+        df = df.loc[(df['SEASON'] >= min_year)]
+    if(max_year):
+        df = df.loc[(df['SEASON'] <= max_year)]
+    if(storm):
+        df = df.loc[(df['NAME'] == storm.split("_")[1].upper())]
+        df = df.loc[(df['SEASON'] == int(storm.split("_")[0]))]
+    return df
+
+def get_postgres_data(source_table, pg_engine, table_schema=None):
+    # Read the source table into a dataframe
+    print("Getting table data...")
+    log.info(source_table)
+    with pg_engine.begin() as pg_conn:
+        sql_text = text(f'SELECT * FROM public.{source_table} WHERE "SEASON" > 2000')
+        data = pd.read_sql_query(sql=sql_text, con=pg_conn, parse_dates=['ISO_TIME'])
+        return data
+
+# Returns ERDDAP datasets active within a range of time that match important variables
+# min_time and max_time are datetime objects
+def get_erddap_datasets(min_time, max_time):
+    min_time = datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ')
+    max_time = datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ')
+    search_url = e.get_search_url(response="csv", min_time=min_time,
+                                  max_time=max_time)
+    search = pd.read_csv(search_url)
+    dataset_list = search["Dataset ID"].values
+    return dataset_list
+
 def main():
-    from datetime import datetime, timedelta, timezone
     import re
-    def storm_format (arg_value, pattern=re.compile("[0-9]{4}_[a-z].*")):
+    def storm_format (arg_value, pattern=re.compile("[0-9]{4}_[A-Za-z].*")):
         if not pattern.match(arg_value):
             raise argparse.ArgumentTypeError("invalid storm format")
         return arg_value
@@ -281,20 +331,54 @@ def datetime_format (arg_value):
     subparsers.required = True
     parser_historical = subparsers.add_parser("historical")
+    parser_historical.add_argument("--storm", help="The storm identifier, in the format of YYYY_stormname (lowercase). Example: 2022_fiona", nargs='?', type=storm_format)
+    parser_historical.add_argument("--min", help="The earliest storm season (year) to include. Format: YYYY", nargs='?', type=int)
+    parser_historical.add_argument("--max", help="The latest storm season (year) to include. Format: YYYY", nargs='?', type=int)
+    """
     parser_historical.add_argument("storm", help="The storm identifier, in the format of YYYY_stormname (lowercase). Example: 2022_fiona", type=storm_format)
     parser_historical.add_argument("min_time", help="The start time of data in the storm interval. Format: YYYY-mm-ddTHH:MM:SSZ", type=datetime_format)
     parser_historical.add_argument("max_time", help="The end time of data in the storm interval. Format: YYYY-mm-ddTHH:MM:SSZ", type=datetime_format)
-
+    """
     parser_active = subparsers.add_parser("active")
 
     args = parser.parse_args()
 
     if(args.subcommand == 'historical'):
-        storm_id = args.storm
-        min_time = datetime.strptime(args.min_time, '%Y-%m-%dT%H:%M:%SZ')
-        max_time = datetime.strptime(args.max_time, '%Y-%m-%dT%H:%M:%SZ')
+        arg_storm = args.storm
+        arg_year_min = args.min
+        arg_year_max = args.max
+        #min_time = datetime.strptime(args.min_time, '%Y-%m-%dT%H:%M:%SZ')
+        #max_time = datetime.strptime(args.max_time, '%Y-%m-%dT%H:%M:%SZ')
+
+        storms = get_historical_storm_list(arg_storm, arg_year_min, arg_year_max)
+        """
         if(min_time > max_time ):
             raise argparse.ArgumentTypeError("End time is before start time")
+        """
+        #create_table_from_schema(pg_engine=engine, table_name=pg_erddap_cache_historical_table, schema_file=erddap_cache_historical_schema)
+        for i, storm in storms.iterrows():
+            print(storm)
+            storm_id = str(storm['SEASON'].values[0]) + "_" + storm['NAME'].values[0]
+            min_time = storm['ISO_TIME']['min']
+            max_time = storm['ISO_TIME']['max']
+            dataset_list = get_erddap_datasets(min_time, max_time)
+            cached_data = []
+            print(dataset_list)
+            # Store in shared list to reduce calls and avoid overwriting for active cache
+            for dataset_id in dataset_list:
+                # Interrogate each dataset for the list of variable names using the list
+                # of standard names above. If a dataset does not have any of those variables it
+                # will be skipped
+                dataset = match_standard_names(dataset_id)
+                if (dataset and dataset_id not in ignore_stations):
+                    cached_data.extend(cache_station_data(dataset, dataset_id, storm_id,
+                        min_time=datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ'),
+                        max_time=datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ')))
+            if(cached_data):
+                print('Caching historical storm...')
+                cache_erddap_data(storm_id = storm_id, df=pd.DataFrame(cached_data),destination_table=pg_erddap_cache_historical_table,
+                    pg_engine=engine,table_schema=erddap_cache_historical_schema)
+    # ACTIVE
     else:
         storm_id = "ACTIVE"
         max_time = datetime.now(timezone.utc)
@@ -304,33 +388,27 @@ def datetime_format (arg_value):
         else:
             max_time = datetime.combine(max_time.date(), datetime.min.time()) + timedelta(hours=12)
         min_time = max_time - timedelta(days=active_data_period)
+        dataset_list = get_erddap_datasets(min_time, max_time)
+        create_table_from_schema(pg_engine=engine, table_name=pg_erddap_cache_active_table, schema_file=erddap_cache_active_schema)
+        # Store in shared list to reduce calls and avoid overwriting for active cache
+        cached_data = []
+        for dataset_id in dataset_list:
+            # Interrogate each dataset for the list of variable names using the list
+            # of standard names above. If a dataset does not have any of those variables it
+            # will be skipped
+            dataset = match_standard_names(dataset_id)
+            if (dataset and dataset_id not in ignore_stations):
+                cached_data.extend(cache_station_data(dataset, dataset_id, storm_id,
+                    min_time=datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ'),
+                    max_time=datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ')))
+        if(cached_data):
+            print("Caching active storm...")
+            cache_erddap_data(storm_id=storm_id, df=pd.DataFrame(cached_data),destination_table=pg_erddap_cache_active_table,
+                pg_engine=engine,table_schema=erddap_cache_active_schema,replace=True)
+
     # Get datasets that have data within the times
-    search_url = e.get_search_url(response="csv", min_time=datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ'),
-                                  max_time=datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ'))
-    search = pd.read_csv(search_url)
-    dataset_list = search["Dataset ID"].values
-
-    create_table_from_schema(pg_engine=engine, table_name=pg_erddap_cache_active_table, schema_file=erddap_cache_active_schema)
-    # Store in shared list to reduce calls and avoid overwriting for active cache
-    cached_data = []
-    for dataset_id in dataset_list:
-        # Interrogate each dataset for the list of variable names using the list
-        # of standard names above. If a dataset does not have any of those variables it
-        # will be skipped
-        dataset = match_standard_names(dataset_id)
-        if (dataset and dataset_id not in ignore_stations):
-            cached_data.extend(cache_station_data(dataset, dataset_id, storm_id,
-                min_time=datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ'),
-                max_time=datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ')))
-    if(cached_data and storm_id =="ACTIVE"):
-        print("Caching active storm...")
-        cache_erddap_data(df=pd.DataFrame(cached_data),destination_table=pg_erddap_cache_active_table,
-            pg_engine=engine,table_schema=erddap_cache_active_schema,replace=True)
-    elif(cached_data):
-        print('Caching historical storm...')
-        cache_erddap_data(df=pd.DataFrame(cached_data),destination_table=pg_erddap_cache_historical_table,
-            pg_engine=engine,table_schema=erddap_cache_historical_schema)
+
 
 if __name__ == '__main__':
     main()
\ No newline at end of file

From 4fe3f60fd6dca4894d686ab1035e94b887495002 Mon Sep 17 00:00:00 2001
From: JaredMcLellan
Date: Fri, 20 Dec 2024 10:56:27 -0400
Subject: [PATCH 2/2] Cut out some print statements

---
 ERDDAP/cache_ERDDAP.py | 21 +++++++++------------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/ERDDAP/cache_ERDDAP.py b/ERDDAP/cache_ERDDAP.py
index 8728f39..52fe586 100755
--- a/ERDDAP/cache_ERDDAP.py
+++ b/ERDDAP/cache_ERDDAP.py
@@ -84,31 +84,29 @@ def cache_erddap_data(storm_id, df, destination_table, pg_engine, table_schema,
     with pg_engine.begin() as pg_conn:
-    """
-        print(f"Adding geom column")
+        #print(f"Adding geom column")
         sql = f"ALTER TABLE public.{destination_table} ADD COLUMN IF NOT EXISTS geom geometry(Point, 4326);"
         pg_conn.execute(text(sql))
-    """
 
-        print("Updating Geometry...")
+        #print("Updating Geometry...")
         sql = f'UPDATE public.{destination_table} SET geom = ST_SetSRID(ST_MakePoint("min_lon", "min_lat"), 4326);'
         pg_conn.execute(text(sql))
 
         # TODO: Add users to env file. Caused errors when attempting through variable
-        print("Providing docker with permissions...")
+        #print("Providing docker with permissions...")
         sql = f"GRANT ALL ON public.{destination_table} TO docker;"
         sql = f"GRANT SELECT ON public.{destination_table} TO hurricane_dash_geoserver;"
         pg_conn.execute(text(sql))
 
-        print("Committing Transaction.")
+        #print("Committing Transaction.")
         pg_conn.execute(text("COMMIT;"))
-    print("Fin.")
+    print("Cached " + storm_id)
     return
 
 def create_table_from_schema(pg_engine, table_name, schema_file, pg_schema='public'):
     # Create ECCC Tables if not exist
     with pg_engine.begin() as pg_conn:
-        print(f"Creating Table {table_name} (if not exists)...")
+        #print(f"Creating Table {table_name} (if not exists)...")
         sql = f"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{pg_schema}' AND tablename = '{table_name}');"
         result = pg_conn.execute(text(sql))
@@ -118,11 +116,11 @@ def create_table_from_schema(pg_engine, table_name, schema_file, pg_schema='publ
             sql = Path(schema_file).read_text()
             pg_conn.execute(text(sql))
 
-            print(f"Adding geom column")
+            #print(f"Adding geom column")
             sql = f"ALTER TABLE {pg_schema}.{table_name} ADD COLUMN IF NOT EXISTS geom geometry(Point, 4326);"
             pg_conn.execute(text(sql))
 
-            print("Committing Transaction.")
+            #print("Committing Transaction.")
             pg_conn.execute(text("COMMIT;"))
 
 # Extracts data from the erddap metadata Pandas dataframe, NC_GLOBAL and
@@ -255,7 +253,7 @@ def cache_station_data(dataset, dataset_id, storm_id, min_time, max_time):
                 cached_entries.append(entry)
                 # time in ISO format or set column to timestamp (UTC for timezone)
                 prev_interval = interval
-        print(dataset_id + " cached")
+        #print(dataset_id + " cached")
         return cached_entries
     except Exception as ex:
         print("HTTPStatusError", ex)
@@ -363,7 +361,6 @@ def datetime_format (arg_value):
             max_time = storm['ISO_TIME']['max']
             dataset_list = get_erddap_datasets(min_time, max_time)
             cached_data = []
-            print(dataset_list)
             # Store in shared list to reduce calls and avoid overwriting for active cache
             for dataset_id in dataset_list:
                 # Interrogate each dataset for the list of variable names using the list
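For reference, a minimal self-contained sketch of the frame that get_historical_storm_list returns after the groupby/agg step, and of how the loop in main() derives storm_id and the caching window from its MultiIndex columns. The IBTrACS-style rows below are hypothetical; the real data comes from the table named by PG_IBTRACS_HISTORICAL_TABLE via get_postgres_data.

import pandas as pd

# Hypothetical IBTrACS-style fixes for a single storm, only to show the shape of the result.
ibtracs = pd.DataFrame({
    'SEASON': [2022, 2022, 2022],
    'NAME': ['FIONA', 'FIONA', 'FIONA'],
    'ISO_TIME': pd.to_datetime(['2022-09-14 18:00', '2022-09-20 00:00', '2022-09-24 12:00']),
})

# Same aggregation as get_historical_storm_list(): one row per (SEASON, NAME), and the columns
# become a MultiIndex: ('SEASON', ''), ('NAME', ''), ('ISO_TIME', 'min'), ('ISO_TIME', 'max')
storms = ibtracs.groupby(['SEASON', 'NAME'], as_index=False).agg({'ISO_TIME': ['min', 'max']})

for _, storm in storms.iterrows():
    storm_id = str(storm['SEASON'].values[0]) + "_" + storm['NAME'].values[0]  # "2022_FIONA"
    min_time = storm['ISO_TIME']['min']  # first fix: 2022-09-14 18:00
    max_time = storm['ISO_TIME']['max']  # last fix:  2022-09-24 12:00
    print(storm_id, min_time, max_time)

Under the reworked argparse setup, this is what an invocation such as "python ERDDAP/cache_ERDDAP.py historical --storm 2022_fiona" or "... historical --min 2020 --max 2023" would iterate over, while "active" keeps its rolling time window.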
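Each storm window then drives the dataset search. A minimal sketch of the same pattern that get_erddap_datasets() uses; the server URL here is hypothetical, since cache_ERDDAP.py configures its own erddapy ERDDAP instance, e, elsewhere in the module.

from datetime import datetime
import pandas as pd
from erddapy import ERDDAP

# Hypothetical server; the real module builds its ERDDAP client from its own configuration.
e = ERDDAP(server="https://example.org/erddap")

min_time = datetime(2022, 9, 14, 18, 0)
max_time = datetime(2022, 9, 24, 12, 0)

# Ask ERDDAP for every dataset with data inside the storm window, as a CSV search result.
search_url = e.get_search_url(response="csv",
                              min_time=datetime.strftime(min_time, '%Y-%m-%dT%H:%M:%SZ'),
                              max_time=datetime.strftime(max_time, '%Y-%m-%dT%H:%M:%SZ'))
dataset_list = pd.read_csv(search_url)["Dataset ID"].values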
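cache_erddap_data() interpolates storm_id directly into the per-storm DELETE. Because storm_id is either validated by storm_format or assembled from IBTrACS fields this works, but a bound parameter avoids any quoting concerns. A minimal sketch under that assumption: clear_storm_rows is a hypothetical helper name, and only the table name still has to be interpolated because SQL identifiers cannot be bound.

from sqlalchemy import text

def clear_storm_rows(pg_conn, destination_table, storm_id):
    # Delete only the rows belonging to one storm before the fresh rows are appended.
    sql = text(f"DELETE FROM public.{destination_table} WHERE storm = :storm_id")
    pg_conn.execute(sql, {"storm_id": storm_id})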