Shortened core logic + Added tests to check file contents
Will clean up and add more tests.

Looks good for now.
Need to update PR with queries now.
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Aug 31, 2024
1 parent 02fb2ce commit c38b82d
Showing 2 changed files with 57 additions and 87 deletions.
123 changes: 41 additions & 82 deletions emission/purge_restore/purge_data.py
@@ -48,32 +48,21 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
if os.path.isdir(archive_dir) == False:
os.mkdir(archive_dir)

initStartTs = time_query.startTs
initEndTs = time_query.endTs
print("Inside: purge_data - Start time: %s" % initStartTs)
print("Inside: purge_data - End time: %s" % initEndTs)
init_start_ts, init_end_ts = time_query.startTs, time_query.endTs
logging.info(f"Purge data - Start time: {init_start_ts}, End time: {init_end_ts}")

file_names = []
entries_to_export = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs)
count_entries = len(entries_to_export)
# count_entries = len(entries_to_export)

# If running the pipeline PURGE stage for first time, choose the first timestamp from the timeseries as the starting point
# Otherwise cannot add 1 hour (3600 seconds) to a NoneType value if incremental option is selected
current_start_ts = initStartTs if initStartTs is not None else entries_to_export[0]['data']['ts']
current_start_ts = init_start_ts if init_start_ts is not None else entries_to_export[0]['data']['ts']

# while current_start_ts < initEndTs:
while True:
while current_start_ts < init_end_ts:
# while True:
print("Inside while loop: current_start_ts = %s" % current_start_ts)
current_end_ts = min(current_start_ts + 3600, initEndTs) if export_type == 'incremental' else initEndTs

if export_type == 'incremental':
current_end_ts = min(current_start_ts + 3600, initEndTs)
print("Inside export_type incremental, increasing current_end_ts: %s" % current_end_ts)
elif export_type == 'full':
current_end_ts = initEndTs
print("Inside export_type full, setting current_end_ts to current time: %s" % current_end_ts)
else:
raise ValueError("Unknown export_type %s" % export_type)
current_end_ts = min(current_start_ts + 3600, init_end_ts) if export_type == 'incremental' else init_end_ts

print(f"Processing data from {current_start_ts} to {current_end_ts}")

@@ -84,74 +73,44 @@ def run_purge_data_pipeline(self, user_id, archive_dir, export_type):
entries_to_export_1 = self.get_export_timeseries_entries(user_id, ts, time_query.startTs, time_query.endTs)
count_entries_1 = len(entries_to_export_1)

if export_queries is None and count_entries_1 > 0:
print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts))
print("Incrementing time range by 1 hour")
current_start_ts = current_end_ts
continue
# if count_entries_2 == 0 and count_entries_1 == 0:
elif export_queries is None and count_entries_1 == 0:
# Didn't process anything new so start at the same point next time
# self._last_processed_ts = None
logging.debug("No new data to export, breaking out of while loop")
print("No new data to export, breaking out of while loop")
break

entries_to_export_2 = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)
count_entries_2 = len(entries_to_export_2)
print("count_entries_2 = %s" % count_entries_2)


logging.debug("Exporting to file: %s" % file_name)
print("Exporting to file: %s" % file_name)
file_names.append(file_name)
print("File names: %s" % file_names)

self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

print("Total entries to export: %s" % count_entries)
print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, count_entries_2))
print("New count entries to export: %s" % count_entries_1)
self._last_processed_ts = entries_to_export_2[-1]['data']['ts']
print("Updated last_processed_ts %s" % self._last_processed_ts)
if export_queries:
logging.debug("Exporting to file: %s" % file_name)
print("Exporting to file: %s" % file_name)
file_names.append(file_name)
print("File names: %s" % file_names)

self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

if entries:
self.last_processed_ts = entries[-1]['data']['ts']
logging.debug(f"Exported {len(entries)} entries from {current_start_ts} to {current_end_ts}")
logging.debug("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, len(entries)))
logging.debug("Remaining entries to export: %s" % count_entries_1)
logging.debug("Updated last_processed_ts %s" % self._last_processed_ts)
print(f"Exported {len(entries)} entries from {current_start_ts} to {current_end_ts}")
print("Entries exported in timerange %s to %s: %s" % (current_start_ts, current_end_ts, len(entries)))
print("Remaining entries to export: %s" % count_entries_1)
print("Updated last_processed_ts %s" % self._last_processed_ts)
else:
if count_entries_1 > 0:
print("No entries found in current time range from %s to %s" % (current_start_ts, current_end_ts))
print("Incrementing time range by 1 hour")
else:
logging.info(f"No entries found from {current_start_ts} to {current_end_ts}")
logging.debug("No new data to export, breaking out of while loop")
print("No new data to export, breaking out of while loop")
break

current_start_ts = current_end_ts
if current_start_ts >= initEndTs:
if current_start_ts >= init_end_ts:
break

print("Exported data to %s files" % len(file_names))
print("Exported file names: %s" % file_names)
print(f"Exported data to {len(file_names)} files")
print(f"Exported file names: {file_names}")
return file_names

# new_entries = self.get_export_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

# if new_entries:
# self._last_processed_ts = new_entries[-1]['data']['ts']
# print(f"Updated last_processed_ts {self._last_processed_ts}")

# if current_end_ts >= initEndTs:
# file_name = archive_dir + "/archive_%s_%s_%s" % (user_id, current_start_ts, current_end_ts)
# print(f"Exporting entries from {current_start_ts} to {current_end_ts} to file: {file_name}")
# epret.export(user_id, ts, current_start_ts, current_end_ts, file_name)
# file_names.append(file_name)
# self.delete_timeseries_entries(user_id, ts, current_start_ts, current_end_ts)

# current_start_ts = current_end_ts

# else:
# remaining_entries = self.get_export_timeseries_entries(user_id, ts, initStartTs, initEndTs)
# if not remaining_entries:
# print("No new data to export, breaking out of while loop")
# break
# else:
# print(f"No entries found in current time range from {current_start_ts} to {current_end_ts}")
# print("Incrementing time range")
# current_start_ts = current_end_ts

# print(f"Exported data to {len(file_names)} files")
# print(f"Exported file names: {file_names}")
# return file_names

# def export_pipeline_states(self, user_id, file_name):
# pipeline_state_list = list(edb.get_pipeline_state_db().find({"user_id": user_id}))
# logging.info("Found %d pipeline states %s" %
@@ -195,8 +154,8 @@ def get_export_timeseries_entries(self, user_id, ts, start_ts_datetime, end_ts_d

def get_export_queries(self, start_ts, end_ts):
export_queries = {
# 'trip_time_query': estt.TimeQuery("data.start_ts", initStartTs, initEndTs),
# 'place_time_query': estt.TimeQuery("data.enter_ts", initStartTs, initEndTs),
# 'trip_time_query': estt.TimeQuery("data.start_ts", init_start_ts, init_end_ts),
# 'place_time_query': estt.TimeQuery("data.enter_ts", init_start_ts, init_end_ts),
'loc_time_query': estt.TimeQuery("data.ts", start_ts, end_ts)
}
return export_queries
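
For reference, the shortened core logic boils down to a single time-windowed loop. The sketch below is a minimal illustration of that pattern, not the module's actual API: export_window and delete_window are hypothetical stand-ins for the export and delete helpers called in purge_data.py.

def purge_in_windows(start_ts, end_ts, export_type, export_window, delete_window):
    # Walk the requested range window by window, exporting then deleting.
    file_names = []
    current_start_ts = start_ts
    while current_start_ts < end_ts:
        # Incremental exports advance in 1-hour (3600 s) windows; a full
        # export covers the whole remaining range in one pass.
        current_end_ts = min(current_start_ts + 3600, end_ts) if export_type == 'incremental' else end_ts
        file_name = export_window(current_start_ts, current_end_ts)
        if file_name is None:
            # Nothing left to export; stop rather than loop forever.
            break
        file_names.append(file_name)
        delete_window(current_start_ts, current_end_ts)
        current_start_ts = current_end_ts
    return file_names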
21 changes: 16 additions & 5 deletions emission/tests/exportTests/TestPurgeRestoreModule.py
@@ -124,11 +124,16 @@ def testPurgeRestorePipelineFull(self):
print("Exported file names: %s" % file_names)

'''
Test 2 - Assert the file exists after the export process
Test 2 - Assert the file exists after the export process and check its contents
'''
self.assertTrue(pl.Path(file_names[0] + ".gz").is_file())
# with gzip.open(file_names[0] + ".gz", 'r') as ef:
# exported_data = json.loads(ef.read().decode('utf-8'))
with gzip.open(file_names[0] + ".gz", 'r') as ef:
exported_data = json.loads(ef.read().decode('utf-8'))
self.assertEqual(len(exported_data), 1906)

first_few_objectIds = ['564e73d388f663199aabf0d2', '55afb7c67d65cb39ee976598', '55afb7c67d65cb39ee976599', '55b08d327d65cb39ee9769e1', '55afb7c67d65cb39ee97659a']
for entry in exported_data[0:5]:
self.assertIn(entry.get('_id').get('$oid'), first_few_objectIds)

'''
Test 3 - Verify that purging timeseries data works with sample real data
Expand Down Expand Up @@ -180,10 +185,16 @@ def testPurgeRestorePipelineIncremental(self):
'''
Test 2 - Assert the file exists after the export process
'''
exported_data = []
for file_name in file_names:
self.assertTrue(pl.Path(file_name + ".gz").is_file())
# with gzip.open(file_name + ".gz", 'r') as ef:
# exported_data = json.loads(ef.read().decode('utf-8'))
with gzip.open(file_name + ".gz", 'r') as ef:
exported_data.extend(json.loads(ef.read().decode('utf-8')))
self.assertEqual(len(exported_data), 1906)

last_few_objectIds = ['55b08d3e7d65cb39ee976def', '55b08d3e7d65cb39ee976df0', '55b08d3e7d65cb39ee976df1', '55b08e907d65cb39ee976e06', '55b08e907d65cb39ee976e07']
for entry in exported_data[-5:]:
self.assertIn(entry.get('_id').get('$oid'), last_few_objectIds)

'''
Test 3 - Verify that purging timeseries data works with sample real data
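
The new content checks in both tests follow the same pattern: decompress the archive, parse the JSON payload, then compare the entry count and a handful of ObjectIds. Below is a small self-contained sketch of that check, using a placeholder path and expected values rather than the fixture's real data.

import gzip
import json

def check_export_file(path, expected_count, expected_oids):
    # Exported archives are gzip-compressed JSON arrays of entries.
    with gzip.open(path, 'r') as ef:
        exported_data = json.loads(ef.read().decode('utf-8'))
    assert len(exported_data) == expected_count
    # Each entry keeps its Mongo ObjectId under _id.$oid after export.
    for entry in exported_data[:len(expected_oids)]:
        assert entry.get('_id').get('$oid') in expected_oids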
