Skip to content

Commit

Permalink
Added duplicate data test + log message + returning inserted entries count
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Aug 2, 2024
1 parent 2097ff4 commit 103537a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 5 deletions.
2 changes: 2 additions & 0 deletions bin/debug/load_multi_timeline_for_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con

all_user_list = []
all_rerun_list = []
(tsdb_count, ucdb_count) = (0,0)

for i, filename in enumerate(sel_file_list):
if "pipelinestate" in filename:
Expand Down Expand Up @@ -154,6 +155,7 @@ def load_multi_timeline_for_range(file_prefix, info_only=None, verbose=None, con
register_fake_users(prefix, unique_user_list, verbose)

post_check(unique_user_list, all_rerun_list)
return (tsdb_count, ucdb_count)

if __name__ == '__main__':
parser = argparse.ArgumentParser()
Expand Down
6 changes: 6 additions & 0 deletions emission/storage/timeseries/cache_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from future import standard_library
standard_library.install_aliases()
from builtins import *
import logging

import emission.core.get_database as edb
import emission.net.usercache.abstract_usercache as enua
Expand Down Expand Up @@ -65,5 +66,10 @@ def insert_entries(uuid, entry_it, continue_on_error):
except pymongo.errors.DuplicateKeyError as e:
if not continue_on_error:
raise(e)
else:
if "write_fmt_time" in entry["metadata"]:
logging.info("ignoring duplicate key error while restoring timeseries entries")
else:
logging.info("ignoring duplicate key error while restoring usercache entries")

return (tsdb_count, ucdb_count)
17 changes: 12 additions & 5 deletions emission/tests/exportTests/TestPurgeRestoreModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
standard_library.install_aliases()
from builtins import *
import os
from os import path
import tempfile
import unittest
import json
import bson.json_util as bju
import pathlib as pl
import emission.storage.timeseries.abstract_timeseries as esta
import gzip
Expand All @@ -19,6 +16,8 @@
import emission.purge_restore.purge_data as eprpd
import bin.debug.load_multi_timeline_for_range as lmtfr
import logging
import gzip
import emission.storage.json_wrappers as esj

class TestPurgeRestoreModule(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -78,7 +77,7 @@ def testPurgeRestoreModule(self):
pdp.delete_timeseries_entries(self.testUUID, ts, time_query['startTs'], time_query['endTs'], export_queries)

# Check how much data there is after
res = res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"Purging complete: {res} entries remaining")
self.assertEqual(res, 0)

Expand All @@ -90,10 +89,18 @@ def testPurgeRestoreModule(self):
lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)

# Check how much data there is after
res = res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
res = edb.get_timeseries_db().count_documents({"user_id" : self.testUUID})
logging.info(f"Restoring complete: {res} entries restored")
self.assertEqual(res, 1906)

'''
Test 4 - Verify that restoring timeseries data fails if data already exists
Duplicate key error is ignored hence no entries should be inserted
'''
logging.info("Attempting to load duplicate data...")
(tsdb_count, ucdb_count) = lmtfr.load_multi_timeline_for_range(file_prefix=file_name, continue_on_error=True)
self.assertEqual(tsdb_count, 0)

def testPurgeRestorePipeline(self):
file_name = epp.run_purge_pipeline_for_user(self.testUUID, os.environ.get('DATA_DIR', 'emission/archived'))
epr.run_restore_pipeline_for_user(self.testUUID, file_name)
Expand Down

0 comments on commit 103537a

Please sign in to comment.