diff --git a/conf/analysis/trip_model.conf.json.sample b/conf/analysis/trip_model.conf.json.sample index 845e67a6a..37a50fefa 100644 --- a/conf/analysis/trip_model.conf.json.sample +++ b/conf/analysis/trip_model.conf.json.sample @@ -2,6 +2,7 @@ "model_type": "greedy", "model_storage": "document_database", "minimum_trips": 14, + "maximum_stored_model_count": 3, "model_parameters": { "greedy": { "metric": "od_similarity", diff --git a/emission/analysis/modelling/trip_model/config.py b/emission/analysis/modelling/trip_model/config.py index 76b3c6e6d..af6a96809 100644 --- a/emission/analysis/modelling/trip_model/config.py +++ b/emission/analysis/modelling/trip_model/config.py @@ -77,3 +77,9 @@ def get_minimum_trips(): +def get_maximum_stored_model_count(): + maximum_stored_model_count = get_config_value_or_raise('maximum_stored_model_count') + if not isinstance(maximum_stored_model_count, int): + msg = f"config key 'maximum_stored_model_count' not an integer in config file {config_filename}" + raise TypeError(msg) + return maximum_stored_model_count diff --git a/emission/storage/modifiable/abstract_model_storage.py b/emission/storage/modifiable/abstract_model_storage.py index 59b9e529e..614abf37e 100644 --- a/emission/storage/modifiable/abstract_model_storage.py +++ b/emission/storage/modifiable/abstract_model_storage.py @@ -29,3 +29,9 @@ def get_current_model(self, key:str) -> Optional[Dict]: : return: the most recent database entry for this key """ pass + + def trim_model_entries(self, key:str): + """ + :param: the metadata key for the entries, used to identify the model type + """ + pass diff --git a/emission/storage/modifiable/builtin_model_storage.py b/emission/storage/modifiable/builtin_model_storage.py index 35f0f601f..1cbf091df 100644 --- a/emission/storage/modifiable/builtin_model_storage.py +++ b/emission/storage/modifiable/builtin_model_storage.py @@ -7,6 +7,7 @@ import emission.core.get_database as edb import emission.storage.modifiable.abstract_model_storage as esma +import emission.analysis.modelling.trip_model.config as eamtc import emission.core.wrapper.entry as ecwe import emission.core.wrapper.wrapperbase as ecwb @@ -23,9 +24,12 @@ def upsert_model(self, key:str, model: ecwb.WrapperBase): """ logging.debug("upsert_doc called with key %s" % key) entry = ecwe.Entry.create_entry(self.user_id, key, model) + # Cleaning up older models, before inserting new model + self.trim_model_entries(key) logging.debug("Inserting entry %s into model DB" % entry) ins_result = edb.get_model_db().insert_one(entry) - ## TODO: Cleanup old/obsolete models + new_model_count = edb.get_model_db().count_documents({"user_id": self.user_id}) + logging.debug("New model count for user %s = %s" % (self.user_id, new_model_count)) return ins_result.inserted_id def get_current_model(self, key:str) -> Optional[Dict]: @@ -48,3 +52,36 @@ def get_current_model(self, key:str) -> Optional[Dict]: del first_entry["_id"] return first_entry + def trim_model_entries(self, key:str): + """ + :param: the metadata key for the entries, used to identify the model type + This function is called inside the model insertion function just before the + model is inserted, to ensure older models are removed before inserting newer ones. + + The flow of model insertion function calls is: + eamur.update_trip_model() -> eamums.save_model() -> esma.upsert_model() -> esma.trim_model_entries() + """ + old_model_count = edb.get_model_db().count_documents({"user_id": self.user_id}) + deleted_model_count = 0 + find_query = {"user_id": self.user_id, "metadata.key": key} + result_it = edb.get_model_db().find(find_query).sort("metadata.write_ts", -1) + result_list = list(result_it) + maximum_stored_model_count = eamtc.get_maximum_stored_model_count() + if old_model_count >= maximum_stored_model_count: + # Specify the last or minimum timestamp of Kth model entry + write_ts_limit = result_list[maximum_stored_model_count - 1]['metadata']['write_ts'] + logging.debug(f"Write ts limit = {write_ts_limit}") + filter_clause = { + "user_id" : self.user_id, + "metadata.key" : key, + "metadata.write_ts" : { "$lte" : write_ts_limit } + } + models_to_delete = edb.get_model_db().delete_many(filter_clause) + deleted_model_count = models_to_delete.deleted_count + new_model_count = edb.get_model_db().count_documents({"user_id": self.user_id}) + if deleted_model_count > 0: + logging.debug(f"{deleted_model_count} models deleted successfully") + logging.debug("Model count for user %s has changed %s -> %s" % (self.user_id, old_model_count, new_model_count)) + else: + logging.debug("No models found or none deleted") + logging.debug("Model count for user %s unchanged %s -> %s" % (self.user_id, old_model_count, new_model_count)) diff --git a/emission/tests/storageTests/TestModelStorage.py b/emission/tests/storageTests/TestModelStorage.py new file mode 100644 index 000000000..9343eb92d --- /dev/null +++ b/emission/tests/storageTests/TestModelStorage.py @@ -0,0 +1,157 @@ +from __future__ import unicode_literals +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import +# Standard imports +from future import standard_library +standard_library.install_aliases() +from builtins import * +import unittest +import datetime as pydt +import logging +import json +import pymongo +import uuid + +# Our imports +import emission.core.get_database as edb +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.storage.timeseries.abstract_timeseries as esta +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.analysis.modelling.trip_model.config as eamtc +import emission.storage.modifiable.abstract_model_storage as esma + +# Test imports +import emission.tests.common as etc + +class TestModelStorage(unittest.TestCase): + ''' + Copied over the below code in setup() and testTrimModelEntries() + for model creation using mock dummy trips data from + emission.tests.modellingTests.TestRunGreedyModel.py + ''' + def setUp(self): + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + # configuration for randomly-generated test data + self.user_id = user_id = 'TestRunGreedyModel-TestData' + self.origin = (-105.1705977, 39.7402654,) + self.destination = (-105.1755606, 39.7673075) + self.min_trips = 14 + self.total_trips = 100 + self.clustered_trips = 33 # bins must have at least self.min_trips similar trips by default + self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant + # $clustered_trips * $has_label_percent > self.min_trips + # must be correct or else this test could fail under some random test cases. + + ts = esta.TimeSeries.get_time_series(user_id) + logging.debug(f"inserting mock Confirmedtrips into database") + + # generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + train = etmm.generate_mock_trips( + user_id=user_id, + trips=self.total_trips, + origin=self.origin, + destination=self.destination, + trip_part='od', + label_data=label_data, + within_threshold=self.clustered_trips, + threshold=0.004, # ~400m + has_label_p=self.has_label_percent + ) + + ts.bulk_insert(train) + + # confirm data write did not fail + test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None) + if len(test_data) != self.total_trips: + logging.debug(f'test invariant failed after generating test data') + self.fail() + else: + logging.debug(f'found {self.total_trips} trips in database') + + def tearDown(self): + edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id}) + edb.get_model_db().delete_many({'user_id': self.user_id}) + edb.get_pipeline_state_db().delete_many({'user_id': self.user_id}) + + def testTrimModelEntries(self): + """ + Took this code from emission.tests.modellingTests.TestRunGreedyModel.py + with the objective of inserting multiple models into the model_db. + The test involves building and inserting (maximum_stored_model_count + 15) models, which is greater than + the maximum_stored_model_count (= 3) limit defined in conf/analysis/trip_model.conf.json.sample + + train a model, save it, load it, and use it for prediction, using + the high-level training/testing API provided via + run_model.py:update_trip_model() # train + run_model.py:predict_labels_with_n() # test + + for clustering, use the default greedy similarity binning model + """ + # pass along debug model configuration + greedy_model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": 500, + "apply_cutoff": False, + "clustering_way": 'origin-destination', + "incremental_evaluation": False + } + maximum_stored_model_count = eamtc.get_maximum_stored_model_count() + logging.debug(f'(TRAIN) creating a model based on trips in database') + model_creation_write_ts_list = [] + stored_model_write_ts_list = [] + ms = esma.ModelStorage.get_model_storage(self.user_id,) + for i in range(maximum_stored_model_count + 15): + logging.debug(f"Creating dummy model no. {i}") + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=greedy_model_config + ) + latest_model_entry = ms.get_current_model(key=esda.TRIP_MODEL_STORE_KEY) + model_creation_write_ts_list.append(latest_model_entry['metadata']['write_ts']) + current_model_count = edb.get_model_db().count_documents({"user_id": self.user_id}) + + """ + Test 1: Ensure that the total number of models in the model_DB is less than or equal to the maximum_stored_model_count + - Can use assertLessEqual() but using assertEqual to distinguish between the + cases when it should be less and when it should be equal. + """ + if i <= (maximum_stored_model_count - 1): + self.assertEqual(current_model_count, i+1) + else: + self.assertEqual(current_model_count, maximum_stored_model_count) + + find_query = {"user_id": self.user_id, "metadata.key": esda.TRIP_MODEL_STORE_KEY} + result_it = edb.get_model_db().find(find_query) + result_list = list(result_it) + stored_model_write_ts_list = [model['metadata']['write_ts'] for model in result_list] + + """ + Test 2: Ensure that the latest 'maximum_stored_model_count' models are only stored and the oldest are deleted and not the other way around. + - This involves storing the write_ts times in two lists: + - model_creation_write_ts_list : stores write_ts times each time a model is created in the for loop. + - stored_model_write_ts_list : stores write_ts times of all the already stored models in the DB, which should just have the latest models. + - The last 'maximum_stored_model_count' in model_creation_write_ts_list should match those in stored_model_write_ts_list. + """ + self.assertEqual(model_creation_write_ts_list[-maximum_stored_model_count : ], stored_model_write_ts_list) + +if __name__ == '__main__': + import emission.tests.common as etc + etc.configLogging() + unittest.main()