Skip to content

Commit

Permalink
Merge pull request #3 from RENCI/Remove-ASGS
Browse files Browse the repository at this point in the history
Remove asgs
  • Loading branch information
PhillipsOwen authored Feb 2, 2024
2 parents 63c8e29 + bf688d6 commit beba89f
Show file tree
Hide file tree
Showing 19 changed files with 96 additions and 1,468 deletions.
74 changes: 30 additions & 44 deletions src/common/pg_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ def __init__(self, db_names: tuple, _logger=None, _auto_commit=True):
# init the base class
PGUtilsMultiConnect.__init__(self, 'APSViz.Msg-Handler', db_names, _logger=self.logger, _auto_commit=_auto_commit)

# load the ASGS constants into memory
self.asgs_constants = self.build_asgs_constants()
# load the legacy constants into memory
self.legacy_constants = self.build_constants()

def __del__(self):
"""
Expand All @@ -57,21 +57,21 @@ def __del__(self):
# clean up connections and cursors
PGUtilsMultiConnect.__del__(self)

def build_asgs_constants(self) -> dict:
def build_constants(self) -> dict:
"""
builds the in-memory data for asgs constants.
builds the in-memory data for legacy constants.
:return:
"""
# create a list of target lu tables
lu_tables = ['ASGS_Mon_site_lu', 'ASGS_Mon_event_type_lu', 'ASGS_Mon_state_type_lu', 'ASGS_Mon_instance_state_type_lu']
lu_tables = ['site_lu', 'event_type_lu', 'state_type_lu', 'instance_state_type_lu']

# init the lu_data storage
lu_data: dict = {}

# make the call to get the data
for lu_item in lu_tables:
lu_data.update({lu_item.removeprefix('ASGS_Mon_').removesuffix('_lu'): self.get_lu_items(lu_item)})
lu_data.update({lu_item.removeprefix('').removesuffix('_lu'): self.get_lu_items(lu_item)})

# add in the pct_complete items
lu_data.update(
Expand All @@ -94,7 +94,7 @@ def get_lu_items(self, lu_name: str):
sql_stmt = f"SELECT * FROM public.get_lu_items(lu_name := '{lu_name}')"

# get the data
lu_data = self.exec_sql('asgs', sql_stmt)
lu_data = self.exec_sql('apsviz', sql_stmt)

# check the return
if lu_data != -1:
Expand Down Expand Up @@ -139,7 +139,7 @@ def get_lu_id(self, element_name, lu_name, context: str = 'unknown'):
:return:
"""
# get the ID
ret_id = self.asgs_constants[lu_name].get(element_name, -1)
ret_id = self.legacy_constants[lu_name].get(element_name, -1)

# did we find something
if ret_id >= 0:
Expand All @@ -152,12 +152,12 @@ def get_lu_id(self, element_name, lu_name, context: str = 'unknown'):

def get_site_ids(self) -> list:
"""
gets the list of site ids for the ASGS run properties message handler
gets the list of site ids for the run properties message handler
:return:
"""
# return the list of ids
return [value for key, value in self.asgs_constants['site'].items()]
return [value for key, value in self.legacy_constants['site'].items()]

def get_existing_event_group_id(self, instance_id, advisory_id, context: str = 'unknown'):
"""
Expand All @@ -172,9 +172,9 @@ def get_existing_event_group_id(self, instance_id, advisory_id, context: str = '

# see if there are any event groups yet that have this instance_id
# this could be caused by a new install that does not have any data in the DB yet
sql_stmt = f"SELECT id FROM \"ASGS_Mon_event_group\" WHERE instance_id={instance_id} AND advisory_id='{advisory_id}' ORDER BY id DESC"
sql_stmt = f"SELECT id FROM \"event_group\" WHERE instance_id={instance_id} AND advisory_id='{advisory_id}' ORDER BY id DESC"

group = self.exec_sql('asgs', sql_stmt)
group = self.exec_sql('apsviz', sql_stmt)

if group > 0:
existing_group_id = group
Expand Down Expand Up @@ -204,14 +204,14 @@ def get_existing_instance_id(self, site_id, msg_obj):

# see if there are any instances yet that have this site_id and instance_name
# this could be caused by a new install that does not have any data in the DB yet
sql_stmt = f"SELECT id FROM \"ASGS_Mon_instance\" WHERE site_id={site_id} AND process_id={process_id} AND instance_name='{instance_name}' " \
sql_stmt = f"SELECT id FROM \"instance\" WHERE site_id={site_id} AND process_id={process_id} AND instance_name='{instance_name}' " \
f"AND inst_state_type_id!=9"

# +++++++++++++++FIX THIS++++++++++++++++++++Add query to get correct stat id for Defunct++++++++++++++++++++++++
# +++++++++++++++FIX THIS++++++++++++++++++++Add day to query too? (to account for rollover of process ids)++++++++++++++++++++++++

# get the instance id if it exists
inst = self.exec_sql('asgs', sql_stmt)
inst = self.exec_sql('apsviz', sql_stmt)

# any int > 0 is a valid instance id
if inst > 0:
Expand Down Expand Up @@ -241,10 +241,10 @@ def get_instance_id(self, start_ts, site_id, process_id, instance_name):
self.logger.debug("start_ts: %s, site_id: %s, process_id: %s, instance_name: %s", start_ts, site_id, process_id, instance_name)

# build up the sql statement to
sql_stmt = f"SELECT id FROM \"ASGS_Mon_instance\" WHERE CAST(start_ts as DATE)='{start_ts[:10]}' AND site_id={site_id} AND " \
sql_stmt = f"SELECT id FROM \"instance\" WHERE CAST(start_ts as DATE)='{start_ts[:10]}' AND site_id={site_id} AND " \
f"process_id={process_id} AND instance_name='{instance_name}'"

inst = self.exec_sql('asgs', sql_stmt)
inst = self.exec_sql('apsviz', sql_stmt)

if inst is not None:
_id = inst
Expand All @@ -271,10 +271,10 @@ def update_event_group(self, state_id, event_group_id, msg_obj):
advisory_id = msg_obj.get("advisory_number", "N/A") if (msg_obj.get("advisory_number", "N/A") != "") else "N/A"

# build up the sql statement to update the event group
sql_stmt = f"UPDATE \"ASGS_Mon_event_group\" SET state_type_id ={state_id}, storm_name='{storm_name}', advisory_id='{advisory_id}' " \
sql_stmt = f"UPDATE \"event_group\" SET state_type_id ={state_id}, storm_name='{storm_name}', advisory_id='{advisory_id}' " \
f"WHERE id={event_group_id} RETURNING 1"

self.exec_sql('asgs', sql_stmt)
self.exec_sql('apsviz', sql_stmt)

def update_instance(self, state_id, site_id, instance_id, msg_obj):
"""
Expand All @@ -296,24 +296,10 @@ def update_instance(self, state_id, site_id, instance_id, msg_obj):
run_params = msg_obj.get("run_params", "N/A") if (msg_obj.get("run_params", "N/A") != "") else "N/A"

# build up the sql statement to update the instance
sql_stmt = f"UPDATE \"ASGS_Mon_instance\" SET inst_state_type_id = {state_id}, end_ts = '{end_ts}', run_params = '{run_params}' " \
sql_stmt = f"UPDATE \"instance\" SET inst_state_type_id = {state_id}, end_ts = '{end_ts}', run_params = '{run_params}' " \
f"WHERE site_id = {site_id} AND id={instance_id} RETURNING 1"

self.exec_sql('asgs', sql_stmt)

def save_raw_msg(self, msg):
"""
saves the raw message
:param msg:
:return:
"""
self.logger.debug("msg: %s", msg)

# build up the sql statement to insert the json data
sql_stmt = f"INSERT INTO \"ASGS_Mon_json\" (data) VALUES ('{msg}') RETURNING 1"

self.exec_sql('asgs', sql_stmt)
self.exec_sql('apsviz', sql_stmt)

def insert_event(self, site_id, event_group_id, event_type_id, msg_obj, context: str = 'unknown'):
"""
Expand Down Expand Up @@ -356,11 +342,11 @@ def insert_event(self, site_id, event_group_id, event_type_id, msg_obj, context:
msg_line = ''

# create the fields
sql_stmt = 'INSERT INTO "ASGS_Mon_event" (site_id, event_group_id, event_type_id, event_ts, advisory_id, pct_complete, sub_pct_complete, ' \
sql_stmt = 'INSERT INTO "event" (site_id, event_group_id, event_type_id, event_ts, advisory_id, pct_complete, sub_pct_complete, ' \
f"process{raw_data_col}) VALUES ({site_id}, {event_group_id}, {event_type_id}, '{event_ts}', '{advisory_id}', {pct_complete}, " \
f"{sub_pct_complete}, '{process}'{msg_line}) RETURNING 1"

self.exec_sql('asgs', sql_stmt)
self.exec_sql('apsviz', sql_stmt)

def insert_event_group(self, state_id, instance_id, msg_obj, context: str = 'unknown'):
"""
Expand All @@ -387,12 +373,12 @@ def insert_event_group(self, state_id, instance_id, msg_obj, context: str = 'unk
advisory_id = msg_obj.get("advisory_number", "N/A") if (msg_obj.get("advisory_number", "N/A") != "") else "N/A"

# build up the sql statement to insert the event
sql_stmt = 'INSERT INTO "ASGS_Mon_event_group" (state_type_id, instance_id, event_group_ts, storm_name, storm_number, advisory_id, ' \
sql_stmt = 'INSERT INTO "event_group" (state_type_id, instance_id, event_group_ts, storm_name, storm_number, advisory_id, ' \
f"final_product) VALUES ({state_id}, {instance_id}, '{event_group_ts}', '{storm_name}', '{storm_number}', '{advisory_id}'" \
f", 'product') RETURNING id"

# get the new event group id
group = self.exec_sql('asgs', sql_stmt)
group = self.exec_sql('apsviz', sql_stmt)

self.logger.debug("group: %s, context: %s", group, context)

Expand Down Expand Up @@ -430,11 +416,11 @@ def insert_instance(self, state_id, site_id, msg_obj, context: str = 'unknown'):
# if (instance_id < 0):

# build up the sql statement to insert the run instance
sql_stmt = f"INSERT INTO \"ASGS_Mon_instance\" (site_id, process_id, start_ts, end_ts, run_params, instance_name, inst_state_type_id) " \
sql_stmt = f"INSERT INTO \"instance\" (site_id, process_id, start_ts, end_ts, run_params, instance_name, inst_state_type_id) " \
f"VALUES ({site_id}, {process_id}, '{start_ts}', '{end_ts}', '{run_params}', '{instance_name}', {state_id}) RETURNING id"

# insert the record and the new instance id
instance_id = self.exec_sql('asgs', sql_stmt)
instance_id = self.exec_sql('apsviz', sql_stmt)

self.logger.debug("instance_id: %s, context: %s", instance_id, context)

Expand Down Expand Up @@ -477,26 +463,26 @@ def insert_config_items(self, instance_id: int, params: dict, suffix: str = ''):
self.logger.debug("uid: %s", uid)

# build up the sql to remove old entries
sql_stmt = f"DELETE FROM public.\"ASGS_Mon_config_item\" WHERE instance_id = {instance_id} AND uid = '{uid}' RETURNING 1"
sql_stmt = f"DELETE FROM public.\"config_item\" WHERE instance_id = {instance_id} AND uid = '{uid}' RETURNING 1"

self.logger.debug("sql_stmt: %s", sql_stmt)

# remove all duplicate records that may already exist
self.exec_sql('asgs', sql_stmt)
self.exec_sql('apsviz', sql_stmt)

# get the list of values
values_list = [f"({instance_id}, '{uid}', '{k}', '{v}')" for (k, v) in params.items()]

# create a massive insert statement
sql_stmt = f"INSERT INTO public.\"ASGS_Mon_config_item\" (instance_id, uid, key, value) VALUES {','.join(values_list)}"
sql_stmt = f"INSERT INTO public.\"config_item\" (instance_id, uid, key, value) VALUES {','.join(values_list)}"

# insure this call returns a value
sql_stmt += " RETURNING 1"

self.logger.debug("sql_stmt: %s", sql_stmt)

# execute the sql
self.exec_sql('asgs', sql_stmt)
self.exec_sql('apsviz', sql_stmt)

except Exception:
ret_msg = "Exception inserting config items"
Expand Down
Loading

0 comments on commit beba89f

Please sign in to comment.