Skip to content

Commit

Permalink
Merge remote-tracking branch 'ttalex/purge-timeseries' into purge-restore-timeseries
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahadik, Mukul Chandrakant authored and Mahadik, Mukul Chandrakant committed Jan 8, 2024
2 parents d20011b + 6ea8ac5 commit 0d0a0ba
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 0 deletions.
68 changes: 68 additions & 0 deletions bin/purge_user_timeseries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import argparse
import logging
import os
import sys
import uuid
from datetime import datetime

import pandas as pd

import emission.core.get_database as edb
import emission.core.wrapper.pipelinestate as ecwp
import emission.core.wrapper.user as ecwu
import emission.storage.pipeline_queries as esp


DEFAULT_DIR_NAME = "/tmp"
DEFAULT_FILE_PREFIX = "old_timeseries_"

def exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix):
    """
    Export all timeseries entries for a user written before a cutoff
    timestamp to a CSV file, as a backup before they are purged.

    :param user_id: the user's UUID (used both in the query and the filename)
    :param last_ts_run: cutoff timestamp; entries with
        metadata.write_ts strictly less than this are exported
    :param dir_name: target directory for the CSV file
    :param file_prefix: prefix for the CSV filename; the full name is
        <dir_name>/<file_prefix><user_id>.csv
    """
    # os.path.join avoids doubled/missing separators if dir_name has a
    # trailing slash; result is identical for the default "/tmp"
    filename = os.path.join(dir_name, "{}{}.csv".format(file_prefix, user_id))
    all_data = list(edb.get_timeseries_db().find(
        {"user_id": user_id, "metadata.write_ts": {"$lt": last_ts_run}}))
    # json_normalize flattens the nested metadata/data documents into columns
    all_df = pd.json_normalize(all_data)
    all_df.to_csv(filename)
    logging.info("Old timeseries data exported to %s", filename)

def purgeUserTimeseries(user_uuid, user_email=None, dir_name=DEFAULT_DIR_NAME, file_prefix=DEFAULT_FILE_PREFIX, unsafe_ignore_save=False):
    """
    Delete all timeseries entries for a user that were written before the
    last run of the CREATE_CONFIRMED_OBJECTS pipeline stage, optionally
    saving them to CSV first.

    :param user_uuid: user UUID as a string; takes precedence over user_email
    :param user_email: email to look the user up by when user_uuid is falsy
    :param dir_name: directory for the CSV backup of deleted entries
    :param file_prefix: filename prefix for the CSV backup
    :param unsafe_ignore_save: if True, skip the CSV backup entirely
        (not recommended — the deletion is irreversible)
    """
    if user_uuid:
        user_id = uuid.UUID(user_uuid)
    else:
        user_id = ecwu.User.fromEmail(user_email).uuid

    cstate = esp.get_current_state(user_id, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
    last_ts_run = cstate['last_ts_run']

    if not last_ts_run:
        logging.warning("No processed timeseries for user %s", user_id)
        # sys.exit is the documented way to terminate; the bare exit()
        # builtin is injected by the site module and may be absent
        sys.exit(1)

    if unsafe_ignore_save is True:
        logging.warning("CSV export was ignored")
    else:
        exportOldTimeseriesAsCsv(user_id, last_ts_run, dir_name, file_prefix)

    res = edb.get_timeseries_db().delete_many(
        {"user_id": user_id, "metadata.write_ts": {"$lt": last_ts_run}})
    logging.info("%d deleted entries since %s",
                 res.deleted_count, datetime.fromtimestamp(last_ts_run))

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    # Exactly one of --user_email / --user_uuid must identify the target user.
    arg_parser = argparse.ArgumentParser(prog="purge_user_timeseries")
    user_selector = arg_parser.add_mutually_exclusive_group(required=True)
    user_selector.add_argument("-e", "--user_email")
    user_selector.add_argument("-u", "--user_uuid")

    # Optional knobs controlling the CSV backup of the purged data.
    arg_parser.add_argument(
        "-d", "--dir_name",
        default=DEFAULT_DIR_NAME,
        help="Target directory for exported csv data (defaults to {})".format(DEFAULT_DIR_NAME))
    arg_parser.add_argument(
        "--file_prefix",
        default=DEFAULT_FILE_PREFIX,
        help="File prefix for exported csv data (defaults to {})".format(DEFAULT_FILE_PREFIX))
    arg_parser.add_argument(
        "--unsafe_ignore_save",
        action='store_true',
        help="Ignore csv export of deleted data (not recommended, this operation is definitive)")

    opts = arg_parser.parse_args()
    purgeUserTimeseries(opts.user_uuid, opts.user_email, opts.dir_name,
                        opts.file_prefix, opts.unsafe_ignore_save)
47 changes: 47 additions & 0 deletions emission/tests/binTests/TestPurgeUserTimeseries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from future import standard_library
standard_library.install_aliases()
from builtins import *
import os
import tempfile
import unittest
import emission.tests.common as etc
import emission.core.get_database as edb
import emission.storage.pipeline_queries as esp
import emission.core.wrapper.pipelinestate as ecwp
from bin.purge_user_timeseries import purgeUserTimeseries


class TestPurgeUserTimeseries(unittest.TestCase):
    """Integration test for bin/purge_user_timeseries.purgeUserTimeseries."""

    def setUp(self):
        # Load a canned day of real data and run the intake pipeline so the
        # CREATE_CONFIRMED_OBJECTS stage has a last_ts_run to purge against
        etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22")
        etc.runIntakePipeline(self.testUUID)

    def testPurgeUserTimeseries(self):
        with tempfile.TemporaryDirectory(dir='/tmp') as tmpdirname:
            cstate = esp.get_current_state(self.testUUID, ecwp.PipelineStages.CREATE_CONFIRMED_OBJECTS)
            last_ts_run = cstate['last_ts_run']
            self.assertTrue(last_ts_run > 0)

            # Check how much data there was before
            res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
            self.assertEqual(res, 1906)

            # Run the purge function
            file_prefix = "some_fancy_prefix_"
            purgeUserTimeseries(str(self.testUUID), dir_name=tmpdirname, file_prefix=file_prefix)

            # Check how much data there is after
            res = edb.get_timeseries_db().count_documents({"user_id": self.testUUID, "metadata.write_ts": { "$lt": last_ts_run}})
            self.assertEqual(res, 0)

            # Check that data was properly saved (1906 lines of data + 1 line of header)
            # NOTE: the original used assertTrue(f.readlines(), 1907), whose
            # second argument is just the failure message, so the check passed
            # for any nonempty file; assertEqual actually verifies the count
            with open(tmpdirname + "/" + file_prefix + str(self.testUUID) + ".csv", 'r') as f:
                self.assertEqual(len(f.readlines()), 1907)

    def tearDown(self):
        # Drop everything so subsequent tests start from an empty database
        etc.dropAllCollections(edb._get_current_db())


if __name__ == '__main__':
    # Configure project-wide logging before handing control to unittest
    etc.configLogging()
    unittest.main()

0 comments on commit 0d0a0ba

Please sign in to comment.