From 911e1ec89c9a23153617a709faf732ad2d28ce7a Mon Sep 17 00:00:00 2001 From: Mukul Chandrakant Mahadik Date: Fri, 9 Feb 2024 17:34:00 -0700 Subject: [PATCH] Store limited number of models and clear older ones (#948) * Added details in logging statements Completed marked future fixes for PR-944 Provided more detailed info regarding number of distinct users, as well as number of predictions made for the trip_list. * Implemented logic to clear models Clearing or trimming number of models for a particular user from the Stage_updateable_models collection. Constant K_MODELS_COUNT defines the number of models to keep. Currently tested with model data available in stage dataset snapshot. Pending - Complete testing with assertions to be added for testing. * Added function call Called the trim_model_entries function in upsert_model to delete models before inserting new ones. * Squashed commit of the following: commit 54659fb07423c27f6a0ff433b4d0f347986ede48 Merge: cf0c9e24 1159eacf Author: K. Shankari Date: Thu Dec 21 20:17:15 2023 -0800 Merge pull request #951 from MukuFlash03/fix-vuln Bulk deletion of site-package/tests commit 1159eacf2ff77fb2031c2e4fba26ebbc56422d59 Author: Mahadik, Mukul Chandrakant Date: Thu Dec 21 20:43:39 2023 -0700 Bulk deletion of site-package/tests Added a one line pipe command to remove all occurrences of site-packages/tests occurring in miniconda main directory. commit cf0c9e246b3dfa079fbe07d3da9d16906400329f Merge: d2f38bc1 3be27579 Author: K. Shankari Date: Thu Dec 21 17:47:27 2023 -0800 Merge pull request #950 from MukuFlash03/fix-vuln Remove obsolete package versions commit 3be27579e664a25f58538f0ebaf702f772eb98e6 Author: Mahadik, Mukul Chandrakant Date: Thu Dec 21 18:05:23 2023 -0700 Remove obsolete package versions Cleaned up older versions for two packages: urllib3 - deleted stale version folders python - deleted tests folder commit d2f38bc18d5c415888451e7ad98d40325a74c999 Merge: 978a7199 c1b0889b Author: K. Shankari Date: Wed Dec 20 14:31:09 2023 -0800 Merge pull request #949 from MukuFlash03/fix-vuln Fixing latest Docker image vulnerabilities commit c1b0889b50a3e2da8cc799f914010575c3f13326 Author: Mahadik, Mukul Chandrakant Date: Mon Dec 18 11:04:25 2023 -0700 Upgraded Ubuntu base image Latest Ubuntu base image was just released officially by Docker which contains updated version of libc6 and libc-bin. commit 07747d0669c986c537310995d0552b5c117f6ebb Author: Mahadik, Mukul Chandrakant Date: Fri Dec 15 18:38:12 2023 -0700 Fixing latest Docker image vulnerabilities AWS Inspector found the following vulnerable packages: CRITICAL perl HIGH nghttp2, libnghttp2-14 cryptography, libssl3 cryptography libc6, libc-bin Upgraded perl, libssl3, nghttp2 packages by upgrading base Ubuntu image to latest of the same LTS version - jammy (22.04). Cryptography package was fixed by mentioning required version to be installed using conda. Libc6, Libc-bin can be fixed by using apt-get upgrade but this upgrades all packages which is not recommended as a blanket upgrade fix. * Updated Test file with dummy models Was using pre-built models for testing that were present in stage_snapshot dataset but these won't be available in the normal dataset. Hence, took reference from emission.tests.modellingTests.TestRunGreedyModel.py for creating test models using dummy trip data. Creating multiple such models and then checking for trimming operation success. * Reverted commit 31626bae + Removed log statements Commit 31626bae included changes related to vulnerability fixes, hence reverted them. Had added some log statements related to scalability fixes for model loading. Removed them and will add in separate PR. * Changed imports to access K_MODEL_COUNT Removed redundant "esda" import added possibly during local testing. Removed get_model_limit() as K_MODEL_COUNT is a class variable and can be accessed directly using class name. * Revert "Squashed commit of the following:" This reverts commit 31626baef1909913fe5899813570f9e78cb63a4f. Earlier commit f54e85f missed out the discarded reverted changes relating to the vulnerability fixes. Hence removing them in these. * Included maximum_stored_model_limit parameter in trip_model config file Decided upon threshold value for model count above which redundant models will be deleted. This was agreed upon to be 3 and is defined in the trip_model.config.json.sample file. Necessary changes have been made in the related files to use this config value. * Addressed latest review comments - Improved logging statements to included whether model count has changed. - Skipped check for saved test data in Test file as we clear teardown() related collections in the DB. * Restored inferrers.py Did this to revert a whitespace change which was possibly due to removing a line at the end of the file. We don't want it to be modified as it is unrelated to this set of PR changes. Restored using this command by pointing to specific commit hash for the first instance where the file was modified. git restore --source=79ad1cc~1 emission/analysis/classification/inference/labels/inferrers.py * Updated Test for Storing limited models Added test to ensure that only the latest models are stored by comparing the write_ts times. * Revert whitespace changes * Revert whitespace changes --------- Co-authored-by: Mahadik, Mukul Chandrakant Co-authored-by: K. Shankari --- conf/analysis/trip_model.conf.json.sample | 1 + .../analysis/modelling/trip_model/config.py | 6 + .../modifiable/abstract_model_storage.py | 6 + .../modifiable/builtin_model_storage.py | 39 ++++- .../tests/storageTests/TestModelStorage.py | 157 ++++++++++++++++++ 5 files changed, 208 insertions(+), 1 deletion(-) create mode 100644 emission/tests/storageTests/TestModelStorage.py diff --git a/conf/analysis/trip_model.conf.json.sample b/conf/analysis/trip_model.conf.json.sample index 845e67a6a..37a50fefa 100644 --- a/conf/analysis/trip_model.conf.json.sample +++ b/conf/analysis/trip_model.conf.json.sample @@ -2,6 +2,7 @@ "model_type": "greedy", "model_storage": "document_database", "minimum_trips": 14, + "maximum_stored_model_count": 3, "model_parameters": { "greedy": { "metric": "od_similarity", diff --git a/emission/analysis/modelling/trip_model/config.py b/emission/analysis/modelling/trip_model/config.py index 76b3c6e6d..af6a96809 100644 --- a/emission/analysis/modelling/trip_model/config.py +++ b/emission/analysis/modelling/trip_model/config.py @@ -77,3 +77,9 @@ def get_minimum_trips(): +def get_maximum_stored_model_count(): + maximum_stored_model_count = get_config_value_or_raise('maximum_stored_model_count') + if not isinstance(maximum_stored_model_count, int): + msg = f"config key 'maximum_stored_model_count' not an integer in config file {config_filename}" + raise TypeError(msg) + return maximum_stored_model_count diff --git a/emission/storage/modifiable/abstract_model_storage.py b/emission/storage/modifiable/abstract_model_storage.py index 59b9e529e..614abf37e 100644 --- a/emission/storage/modifiable/abstract_model_storage.py +++ b/emission/storage/modifiable/abstract_model_storage.py @@ -29,3 +29,9 @@ def get_current_model(self, key:str) -> Optional[Dict]: : return: the most recent database entry for this key """ pass + + def trim_model_entries(self, key:str): + """ + :param: the metadata key for the entries, used to identify the model type + """ + pass diff --git a/emission/storage/modifiable/builtin_model_storage.py b/emission/storage/modifiable/builtin_model_storage.py index 35f0f601f..1cbf091df 100644 --- a/emission/storage/modifiable/builtin_model_storage.py +++ b/emission/storage/modifiable/builtin_model_storage.py @@ -7,6 +7,7 @@ import emission.core.get_database as edb import emission.storage.modifiable.abstract_model_storage as esma +import emission.analysis.modelling.trip_model.config as eamtc import emission.core.wrapper.entry as ecwe import emission.core.wrapper.wrapperbase as ecwb @@ -23,9 +24,12 @@ def upsert_model(self, key:str, model: ecwb.WrapperBase): """ logging.debug("upsert_doc called with key %s" % key) entry = ecwe.Entry.create_entry(self.user_id, key, model) + # Cleaning up older models, before inserting new model + self.trim_model_entries(key) logging.debug("Inserting entry %s into model DB" % entry) ins_result = edb.get_model_db().insert_one(entry) - ## TODO: Cleanup old/obsolete models + new_model_count = edb.get_model_db().count_documents({"user_id": self.user_id}) + logging.debug("New model count for user %s = %s" % (self.user_id, new_model_count)) return ins_result.inserted_id def get_current_model(self, key:str) -> Optional[Dict]: @@ -48,3 +52,36 @@ def get_current_model(self, key:str) -> Optional[Dict]: del first_entry["_id"] return first_entry + def trim_model_entries(self, key:str): + """ + :param: the metadata key for the entries, used to identify the model type + This function is called inside the model insertion function just before the + model is inserted, to ensure older models are removed before inserting newer ones. + + The flow of model insertion function calls is: + eamur.update_trip_model() -> eamums.save_model() -> esma.upsert_model() -> esma.trim_model_entries() + """ + old_model_count = edb.get_model_db().count_documents({"user_id": self.user_id}) + deleted_model_count = 0 + find_query = {"user_id": self.user_id, "metadata.key": key} + result_it = edb.get_model_db().find(find_query).sort("metadata.write_ts", -1) + result_list = list(result_it) + maximum_stored_model_count = eamtc.get_maximum_stored_model_count() + if old_model_count >= maximum_stored_model_count: + # Specify the last or minimum timestamp of Kth model entry + write_ts_limit = result_list[maximum_stored_model_count - 1]['metadata']['write_ts'] + logging.debug(f"Write ts limit = {write_ts_limit}") + filter_clause = { + "user_id" : self.user_id, + "metadata.key" : key, + "metadata.write_ts" : { "$lte" : write_ts_limit } + } + models_to_delete = edb.get_model_db().delete_many(filter_clause) + deleted_model_count = models_to_delete.deleted_count + new_model_count = edb.get_model_db().count_documents({"user_id": self.user_id}) + if deleted_model_count > 0: + logging.debug(f"{deleted_model_count} models deleted successfully") + logging.debug("Model count for user %s has changed %s -> %s" % (self.user_id, old_model_count, new_model_count)) + else: + logging.debug("No models found or none deleted") + logging.debug("Model count for user %s unchanged %s -> %s" % (self.user_id, old_model_count, new_model_count)) diff --git a/emission/tests/storageTests/TestModelStorage.py b/emission/tests/storageTests/TestModelStorage.py new file mode 100644 index 000000000..9343eb92d --- /dev/null +++ b/emission/tests/storageTests/TestModelStorage.py @@ -0,0 +1,157 @@ +from __future__ import unicode_literals +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import +# Standard imports +from future import standard_library +standard_library.install_aliases() +from builtins import * +import unittest +import datetime as pydt +import logging +import json +import pymongo +import uuid + +# Our imports +import emission.core.get_database as edb +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.storage.timeseries.abstract_timeseries as esta +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.analysis.modelling.trip_model.config as eamtc +import emission.storage.modifiable.abstract_model_storage as esma + +# Test imports +import emission.tests.common as etc + +class TestModelStorage(unittest.TestCase): + ''' + Copied over the below code in setup() and testTrimModelEntries() + for model creation using mock dummy trips data from + emission.tests.modellingTests.TestRunGreedyModel.py + ''' + def setUp(self): + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + # configuration for randomly-generated test data + self.user_id = user_id = 'TestRunGreedyModel-TestData' + self.origin = (-105.1705977, 39.7402654,) + self.destination = (-105.1755606, 39.7673075) + self.min_trips = 14 + self.total_trips = 100 + self.clustered_trips = 33 # bins must have at least self.min_trips similar trips by default + self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant + # $clustered_trips * $has_label_percent > self.min_trips + # must be correct or else this test could fail under some random test cases. + + ts = esta.TimeSeries.get_time_series(user_id) + logging.debug(f"inserting mock Confirmedtrips into database") + + # generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + train = etmm.generate_mock_trips( + user_id=user_id, + trips=self.total_trips, + origin=self.origin, + destination=self.destination, + trip_part='od', + label_data=label_data, + within_threshold=self.clustered_trips, + threshold=0.004, # ~400m + has_label_p=self.has_label_percent + ) + + ts.bulk_insert(train) + + # confirm data write did not fail + test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None) + if len(test_data) != self.total_trips: + logging.debug(f'test invariant failed after generating test data') + self.fail() + else: + logging.debug(f'found {self.total_trips} trips in database') + + def tearDown(self): + edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id}) + edb.get_model_db().delete_many({'user_id': self.user_id}) + edb.get_pipeline_state_db().delete_many({'user_id': self.user_id}) + + def testTrimModelEntries(self): + """ + Took this code from emission.tests.modellingTests.TestRunGreedyModel.py + with the objective of inserting multiple models into the model_db. + The test involves building and inserting (maximum_stored_model_count + 15) models, which is greater than + the maximum_stored_model_count (= 3) limit defined in conf/analysis/trip_model.conf.json.sample + + train a model, save it, load it, and use it for prediction, using + the high-level training/testing API provided via + run_model.py:update_trip_model() # train + run_model.py:predict_labels_with_n() # test + + for clustering, use the default greedy similarity binning model + """ + # pass along debug model configuration + greedy_model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, + "apply_cutoff": False, + "clustering_way": 'origin-destination', + "incremental_evaluation": False + } + maximum_stored_model_count = eamtc.get_maximum_stored_model_count() + logging.debug(f'(TRAIN) creating a model based on trips in database') + model_creation_write_ts_list = [] + stored_model_write_ts_list = [] + ms = esma.ModelStorage.get_model_storage(self.user_id,) + for i in range(maximum_stored_model_count + 15): + logging.debug(f"Creating dummy model no. {i}") + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=greedy_model_config + ) + latest_model_entry = ms.get_current_model(key=esda.TRIP_MODEL_STORE_KEY) + model_creation_write_ts_list.append(latest_model_entry['metadata']['write_ts']) + current_model_count = edb.get_model_db().count_documents({"user_id": self.user_id}) + + """ + Test 1: Ensure that the total number of models in the model_DB is less than or equal to the maximum_stored_model_count + - Can use assertLessEqual() but using assertEqual to distinguish between the + cases when it should be less and when it should be equal. + """ + if i <= (maximum_stored_model_count - 1): + self.assertEqual(current_model_count, i+1) + else: + self.assertEqual(current_model_count, maximum_stored_model_count) + + find_query = {"user_id": self.user_id, "metadata.key": esda.TRIP_MODEL_STORE_KEY} + result_it = edb.get_model_db().find(find_query) + result_list = list(result_it) + stored_model_write_ts_list = [model['metadata']['write_ts'] for model in result_list] + + """ + Test 2: Ensure that the latest 'maximum_stored_model_count' models are only stored and the oldest are deleted and not the other way around. + - This involves storing the write_ts times in two lists: + - model_creation_write_ts_list : stores write_ts times each time a model is created in the for loop. + - stored_model_write_ts_list : stores write_ts times of all the already stored models in the DB, which should just have the latest models. + - The last 'maximum_stored_model_count' in model_creation_write_ts_list should match those in stored_model_write_ts_list. + """ + self.assertEqual(model_creation_write_ts_list[-maximum_stored_model_count : ], stored_model_write_ts_list) + +if __name__ == '__main__': + import emission.tests.common as etc + etc.configLogging() + unittest.main()