diff --git a/bin/purge_user_timeseries.py b/bin/purge_user_timeseries.py
new file mode 100644
--- /dev/null
+++ b/bin/purge_user_timeseries.py
@@ -0,0 +1,76 @@
+"""Purge processed timeseries entries for a single user.
+
+Entries written before the user's last CREATE_CONFIRMED_OBJECTS pipeline run
+are exported to a CSV backup (unless explicitly skipped) and then deleted
+from the timeseries database.
+"""
+import logging
+import argparse
+import sys
+import uuid
+from datetime import datetime
+import emission.core.wrapper.user as ecwu
+import emission.core.get_database as edb
+import emission.core.wrapper.pipelinestate as ecwp
+import emission.storage.pipeline_queries as esp
+import pandas as pd
+
+
+DEFAULT_DIR_NAME = "/tmp"
+DEFAULT_FILE_PREFIX = "old_timeseries_"
+
+def exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix):
+    """Dump every timeseries entry written before last_ts_run to <dir_name>/<file_prefix><user_id>.csv."""
+    filename = dir_name + "/" + file_prefix + str(user_id) + ".csv"
+    all_data = list(edb.get_timeseries_db().find({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}}))
+    all_df = pd.json_normalize(all_data)
+    all_df.to_csv(filename)
+    logging.info("Old timeseries data exported to {}".format(filename))
+
+def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
+    """Delete the user's already-processed timeseries entries, backing them up to CSV unless unsafe_ignore_save is set."""
+    if user_uuid:
+        user_id = uuid.UUID(user_uuid)
+    else:
+        user_id = ecwu.User.fromEmail(user_email).uuid
+
+    cstate = esp.get_current_state(user_id, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
+    last_ts_run = cstate['last_ts_run']
+
+    if not last_ts_run:
+        logging.warning("No processed timeseries for user {}".format(user_id))
+        # sys.exit (not the site-module `exit` builtin, which may be absent in
+        # non-interactive interpreters) aborts before any data is deleted
+        sys.exit(1)
+
+    if unsafe_ignore_save is True:
+        logging.warning("CSV export was ignored")
+    else:
+        exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix)
+
+    res = edb.get_timeseries_db().delete_many({"user_id": user_id, "metadata.write_ts": { "$lt": last_ts_run}})
+    logging.info("{} deleted entries since {}".format(res.deleted_count, datetime.fromtimestamp(last_ts_run)))
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.DEBUG)
+
+    parser = argparse.ArgumentParser(prog="purge_user_timeseries")
+    group = parser.add_mutually_exclusive_group(required=True)
+    group.add_argument("-e", "--user_email")
+    group.add_argument("-u", "--user_uuid")
+    parser.add_argument(
+        "-d", "--dir_name",
+        help="Target directory for exported csv data (defaults to {})".format(DEFAULT_DIR_NAME),
+        default=DEFAULT_DIR_NAME
+    )
+    parser.add_argument(
+        "--file_prefix",
+        help="File prefix for exported csv data (defaults to {})".format(DEFAULT_FILE_PREFIX),
+        default=DEFAULT_FILE_PREFIX
+    )
+    parser.add_argument(
+        "--unsafe_ignore_save",
+        help="Ignore csv export of deleted data (not recommended, this operation is definitive)",
+        action='store_true'
+    )
+
+    args = parser.parse_args()
+    purgeUserTimeseries(args.user_uuid, args.user_email, args.dir_name, args.file_prefix, args.unsafe_ignore_save)
diff --git a/emission/tests/binTests/TestPurgeUserTimeseries.py b/emission/tests/binTests/TestPurgeUserTimeseries.py
new file mode 100644
--- /dev/null
+++ b/emission/tests/binTests/TestPurgeUserTimeseries.py
@@ -0,0 +1,47 @@
+from future import standard_library
+standard_library.install_aliases()
+from builtins import *
+import os
+import tempfile
+import unittest
+import emission.tests.common as etc
+import emission.core.get_database as edb
+import emission.storage.pipeline_queries as esp
+import emission.core.wrapper.pipelinestate as ecwp
+from bin.purge_user_timeseries import purgeUserTimeseries
+
+
+class TestPurgeUserTimeseries(unittest.TestCase):
+    def setUp(self):
+        etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22")
+        etc.runIntakePipeline(self.testUUID)
+
+    def testPurgeUserTimeseries(self):
+        with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
+            cstate = esp.get_current_state(self.testUUID, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
+            last_ts_run = cstate['last_ts_run']
+            self.assertTrue(last_ts_run > 0)
+
+            # Check how much data there was before
+            res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
+            self.assertEqual(res, 1906)
+
+            # Run the purge function
+            file_prefix = "some_fancy_prefix_"
+            purgeUserTimeseries(str(self.testUUID), dir_name=tmpdirname, file_prefix=file_prefix)
+
+            # Check how much data there is after
+            res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
+            self.assertEqual(res, 0)
+
+            # Check that data was properly saved (1906 lines of data + 1 line of header)
+            with open(tmpdirname + "/" + file_prefix + str(self.testUUID) + ".csv", 'r') as f:
+                self.assertEqual(len(f.readlines()), 1907)
+
+    def tearDown(self):
+        etc.dropAllCollections(edb._get_current_db())
+
+
+if __name__ == '__main__':
+    etc.configLogging()
+    unittest.main()