38 auto run historical stations #54

Open · wants to merge 3 commits into base: dev
Changes from all commits
163 changes: 119 additions & 44 deletions ERDDAP/cache_ERDDAP.py
@@ -15,6 +15,8 @@
import sys
import argparse
import json
import numpy as np
from datetime import datetime, timedelta, timezone

log = logging.getLogger('caching.log')
handler = RotatingFileHandler('caching.log', maxBytes=2000, backupCount=10)
@@ -40,6 +42,7 @@
erddap_cache_historical_schema = os.getenv('ERDDAP_CACHE_HISTORICAL_SCHEMA')
pg_erddap_cache_active_table = os.getenv('ERDDAP_CACHE_ACTIVE_TABLE')
erddap_cache_active_schema = os.getenv('ERDDAP_CACHE_ACTIVE_SCHEMA')
pg_ibtracs_historical_table = os.getenv('PG_IBTRACS_HISTORICAL_TABLE')

docker_user = {'docker'}

@@ -61,41 +64,49 @@
unit_overrides = json.load(unit_override_file)

#process_ibtracs(df = , destination_table=pg_ibtracs_active_table, pg_engine=engine, table_schema=erddap_cache_schema)
def cache_erddap_data(df, destination_table, pg_engine, table_schema, replace=False):
def cache_erddap_data(storm_id, df, destination_table, pg_engine, table_schema, replace=False):
# populate table
print("Populating Table...")


with pg_engine.begin() as pg_conn:
sql = f"TRUNCATE public.{destination_table};"

if(storm_id):
sql = f"DELETE FROM public.{destination_table} WHERE storm = '{storm_id}';"
else:
#Clear old storm data
sql = f"TRUNCATE public.{destination_table};"
pg_conn.execute(text(sql))
#Clear old storm data

result = df.to_sql(destination_table, pg_engine, chunksize=1000, method='multi',
if_exists='append', index=False, schema='public')

with pg_engine.begin() as pg_conn:
print(f"Adding geom column")


with pg_engine.begin() as pg_conn:
#print(f"Adding geom column")
sql = f"ALTER TABLE public.{destination_table} ADD COLUMN IF NOT EXISTS geom geometry(Point, 4326);"
pg_conn.execute(text(sql))

print("Updating Geometry...")
#print("Updating Geometry...")
sql = f'UPDATE public.{destination_table} SET geom = ST_SetSRID(ST_MakePoint("min_lon", "min_lat"), 4326);'
pg_conn.execute(text(sql))

# TODO: Add users to env file. Caused errors when attempting through variable
print("Providing docker with permissions...")
#print("Providing docker with permissions...")
sql = f"GRANT ALL ON public.{destination_table} TO docker;"
sql = f"GRANT SELECT ON public.{destination_table} TO hurricane_dash_geoserver;"
pg_conn.execute(text(sql))

print("Committing Transaction.")
#print("Committing Transaction.")
pg_conn.execute(text("COMMIT;"))
print("Fin.")
print("Cached" + storm_id)
return
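
As a side note on the DELETE above, here is a minimal sketch (not part of this diff) of the same statement with storm_id passed as a bound parameter through SQLAlchemy's text() construct, which this file already imports; the helper name is hypothetical, and the table name still has to be formatted in because identifiers cannot be bound.
def delete_storm_rows(pg_conn, destination_table, storm_id):
    # Hypothetical helper: same effect as the f-string DELETE above, but the
    # storm value is sent as a bound parameter instead of being spliced into the SQL.
    stmt = text(f"DELETE FROM public.{destination_table} WHERE storm = :storm")
    pg_conn.execute(stmt, {"storm": storm_id})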

def create_table_from_schema(pg_engine, table_name, schema_file, pg_schema='public'):
# Create ECCC Tables if not exist
with pg_engine.begin() as pg_conn:
print(f"Creating Table {table_name} (if not exists)...")
#print(f"Creating Table {table_name} (if not exists)...")

sql = f"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{pg_schema}' AND tablename = '{table_name}');"
result = pg_conn.execute(text(sql))
@@ -105,11 +116,11 @@ def create_table_from_schema(pg_engine, table_name, schema_file, pg_schema='publ
sql = Path(schema_file).read_text()
pg_conn.execute(text(sql))

print(f"Adding geom column")
#print(f"Adding geom column")
sql = f"ALTER TABLE {pg_schema}.{table_name} ADD COLUMN IF NOT EXISTS geom geometry(Point, 4326);"
pg_conn.execute(text(sql))

print("Committing Transaction.")
#print("Committing Transaction.")
pg_conn.execute(text("COMMIT;"))

# Extracts data from the erddap metadata Pandas dataframe, NC_GLOBAL and
@@ -242,7 +253,7 @@ def cache_station_data(dataset, dataset_id, storm_id, min_time, max_time):
cached_entries.append(entry)
# time in ISO format or set column to timestamp (UTC for timezone)
prev_interval = interval
print(dataset_id + " cached")
#print(dataset_id + " cached")
return cached_entries
except Exception as ex:
print("HTTPStatusError", ex)
@@ -255,10 +266,47 @@ def find_df_column_by_standard_name(df, standard_name):
column = df.filter(regex=r'\(' + standard_name + r'\|')#.columns.values[0]
return column

def get_historical_storm_list(storm=None, min_year=None, max_year=None):
# Connect to the IBTrACS storm table
# Filter to post-2000 storms
# Load into a pandas DataFrame, grouped by storm name and season
# Set the storm ID to YYYY_STORMNAME
# Set min and max time to the storm's time span, +/- 2 days
df = get_postgres_data(source_table=pg_ibtracs_historical_table, pg_engine=engine)
df = df.groupby(['SEASON','NAME'], as_index=False).agg({'ISO_TIME':[np.min, np.max]})
df = df.loc[(df['NAME'] != "UNNAMED")]
if(min_year):
df = df.loc[(df['SEASON'] >= min_year)]
if(max_year):
df = df.loc[(df['SEASON'] <= max_year)]
if(storm):
df = df.loc[(df['NAME'] == storm.split("_")[1].upper())]
df = df.loc[(df['SEASON'] == int(storm.split("_")[0]))]
return df
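
A brief usage sketch of the filters above (values illustrative): the storm argument is split on the underscore, the name is upper-cased to match IBTrACS, and the season is compared as an integer.
# Illustrative calls, assuming the IBTrACS table has been loaded:
fiona = get_historical_storm_list(storm="2022_FIONA")              # one storm
recent = get_historical_storm_list(min_year=2019, max_year=2022)   # season range
# Each row carries SEASON, NAME and the storm's first/last ISO_TIME; because of
# the list aggregation above, ISO_TIME is a two-level column that main() below
# reads as storm['ISO_TIME']['min'] and storm['ISO_TIME']['max'].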

def get_postgres_data(source_table, pg_engine, table_schema=None):
# Read data from an existing table
print("Getting table data...")
log.info(source_table)
with pg_engine.begin() as pg_conn:
sql_text = text(f'SELECT * FROM public.{source_table} WHERE "SEASON" > 2000')
data = pd.read_sql_query(sql=sql_text, con=pg_conn, parse_dates=['ISO_TIME'])
return data

# Returns ERDDAP datasets active within a range of time that match important variables
# min_time and max_time are datetime objects
def get_erddap_datasets(min_time, max_time):
min_time = datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ')
max_time = datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ')
search_url = e.get_search_url(response="csv", min_time=min_time,
max_time=max_time)
search = pd.read_csv(search_url)
dataset_list = search["Dataset ID"].values
return dataset_list
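
A usage sketch (the window is illustrative); e is the module-level erddapy client configured earlier in the file, outside this hunk.
# Find datasets with data inside a two-day window; both bounds are converted
# to the ISO-8601 strings the ERDDAP search endpoint expects.
window_end = datetime(2022, 9, 24, 12, 0, tzinfo=timezone.utc)
window_start = window_end - timedelta(days=2)
dataset_ids = get_erddap_datasets(window_start, window_end)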

def main():
from datetime import datetime, timedelta, timezone
import re
def storm_format (arg_value, pattern=re.compile("[0-9]{4}_[a-z].*")):
def storm_format (arg_value, pattern=re.compile("[0-9]{4}_[A-Z].*")):
if not pattern.match(arg_value):
raise argparse.ArgumentTypeError("invalid storm format")
return arg_value
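
# Illustrative checks against the pattern above (the name must start with an
# upper-case letter after the four-digit year):
#   storm_format("2022_FIONA")  ->  returns "2022_FIONA"
#   storm_format("2022_fiona")  ->  raises argparse.ArgumentTypeError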
@@ -281,20 +329,53 @@ def datetime_format (arg_value):
subparsers.required = True

parser_historical = subparsers.add_parser("historical")
parser_historical.add_argument("--storm", help="The storm identifier, in the format of YYYY_stormname (lowercase). Example: 2022_fiona", nargs='?', type=storm_format)
parser_historical.add_argument("--min", help="The start time of data in the storm interval. Format: YYYY", nargs='?', type=int)
parser_historical.add_argument("--max", help="The end time of data in the storm interval. Format: YYYY", nargs='?', type=int)
"""
parser_historical.add_argument("storm", help="The storm identifier, in the format of YYYY_stormname (lowercase). Example: 2022_fiona", type=storm_format)
parser_historical.add_argument("min_time", help="The start time of data in the storm interval. Format: YYYY-mm-ddTHH:MM:SSZ", type=datetime_format)
parser_historical.add_argument("max_time", help="The end time of data in the storm interval. Format: YYYY-mm-ddTHH:MM:SSZ", type=datetime_format)

"""
parser_active = subparsers.add_parser("active")

args = parser.parse_args()

if(args.subcommand == 'historical'):
storm_id = args.storm
min_time = datetime.strptime(args.min_time, '%Y-%m-%dT%H:%M:%SZ')
max_time = datetime.strptime(args.max_time, '%Y-%m-%dT%H:%M:%SZ')
arg_storm = args.storm
arg_year_min = args.min
arg_year_max = args.max
#min_time = datetime.strptime(args.min_time, '%Y-%m-%dT%H:%M:%SZ')
#max_time = datetime.strptime(args.max_time, '%Y-%m-%dT%H:%M:%SZ')

storms = get_historical_storm_list(arg_storm, arg_year_min, arg_year_max)
"""
if(min_time > max_time ):
raise argparse.ArgumentTypeError("End time is before start time")
"""
#create_table_from_schema(pg_engine=engine, table_name=pg_erddap_cache_historical_table, schema_file=erddap_cache_historical_schema)
for i, storm in storms.iterrows():
print(storm)
storm_id = str(storm['SEASON'].values[0]) + "_" + storm['NAME'].values[0]
min_time = storm['ISO_TIME']['min']
max_time = storm['ISO_TIME']['max']
dataset_list = get_erddap_datasets(min_time, max_time)
cached_data = []
# Store in shared list to reduce calls and avoid overwriting for active cache
for dataset_id in dataset_list:
# Interrogate each dataset for the list of variable names using the list
# of standard names above. If a dataset does not have any of those variables it
# will be skipped
dataset = match_standard_names(dataset_id)
if (dataset and dataset_id not in ignore_stations):
cached_data.extend(cache_station_data(dataset, dataset_id, storm_id,
min_time=datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ'),
max_time=datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ')))
if(cached_data):
print('Caching historical storm...')
cache_erddap_data(storm_id = storm_id, df=pd.DataFrame(cached_data),destination_table=pg_erddap_cache_historical_table,
pg_engine=engine,table_schema=erddap_cache_historical_schema)
# ACTIVE
else:
storm_id = "ACTIVE"
max_time = datetime.now(timezone.utc)
@@ -304,33 +385,27 @@ def datetime_format (arg_value):
else:
max_time = datetime.combine(max_time.date(), datetime.min.time()) + timedelta(hours=12)
min_time = max_time - timedelta(days=active_data_period)
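# Worked example of the window above (illustrative; the branch hidden in the
# collapsed hunk presumably covers the other half of the day): when the else
# branch runs on 2024-09-10, max_time is snapped to 2024-09-10 12:00Z, and with
# active_data_period = 3 (value assumed for illustration) the cache window is
# 2024-09-07 12:00Z .. 2024-09-10 12:00Z.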
dataset_list = get_erddap_datasets(min_time, max_time)
create_table_from_schema(pg_engine=engine, table_name=pg_erddap_cache_active_table, schema_file=erddap_cache_active_schema)
# Store in shared list to reduce calls and avoid overwriting for active cache
cached_data = []
for dataset_id in dataset_list:
# Interrogate each dataset for the list of variable names using the list
# of standard names above. If a dataset does not have any of those variables it
# will be skipped
dataset = match_standard_names(dataset_id)
if (dataset and dataset_id not in ignore_stations):
cached_data.extend(cache_station_data(dataset, dataset_id, storm_id,
min_time=datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ'),
max_time=datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ')))
if(cached_data):
print("Caching active storm...")
cache_erddap_data(storm_id=storm_id, df=pd.DataFrame(cached_data),destination_table=pg_erddap_cache_active_table,
pg_engine=engine,table_schema=erddap_cache_active_schema,replace=True)


# Get datasets that have data within the times
search_url = e.get_search_url(response="csv", min_time=datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ'),
max_time=datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ'))
search = pd.read_csv(search_url)
dataset_list = search["Dataset ID"].values

create_table_from_schema(pg_engine=engine, table_name=pg_erddap_cache_active_table, schema_file=erddap_cache_active_schema)
# Store in shared list to reduce calls and avoid overwriting for active cache
cached_data = []
for dataset_id in dataset_list:
# Interrogate each dataset for the list of variable names using the list
# of standard names above. If a dataset does not have any of those variables it
# will be skipped
dataset = match_standard_names(dataset_id)
if (dataset and dataset_id not in ignore_stations):
cached_data.extend(cache_station_data(dataset, dataset_id, storm_id,
min_time=datetime.strftime(min_time,'%Y-%m-%dT%H:%M:%SZ'),
max_time=datetime.strftime(max_time,'%Y-%m-%dT%H:%M:%SZ')))
if(cached_data and storm_id =="ACTIVE"):
print("Caching active storm...")
cache_erddap_data(df=pd.DataFrame(cached_data),destination_table=pg_erddap_cache_active_table,
pg_engine=engine,table_schema=erddap_cache_active_schema,replace=True)
elif(cached_data):
print('Caching historical storm...')
cache_erddap_data(df=pd.DataFrame(cached_data),destination_table=pg_erddap_cache_historical_table,
pg_engine=engine,table_schema=erddap_cache_historical_schema)


if __name__ == '__main__':
main()