Store limited number of models and clear older ones (#948)
* Added details in logging statements

Completed the future fixes marked in PR-944.
Provided more detailed info regarding the number of distinct users, as well as the number of predictions made for the trip_list.

* Implemented logic to clear models

Clearing or trimming the number of models stored for a particular user in the Stage_updateable_models collection.

Constant K_MODELS_COUNT defines the number of models to keep.

Currently tested with model data available in the stage dataset snapshot.

Pending: complete testing, with assertions still to be added.

* Added function call

Called the trim_model_entries function in upsert_model to delete older models before inserting new ones.
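
For reference, a minimal sketch of the trimming idea, assuming a pymongo collection handle; the helper name trim_to_latest_k and its parameters are illustrative only, and the actual implementation is trim_model_entries in builtin_model_storage.py in the diff further down.

import pymongo

def trim_to_latest_k(model_coll, user_id, key, k):
    # sort this user's models for the given key from newest to oldest by write_ts
    entries = list(model_coll.find({"user_id": user_id, "metadata.key": key})
                             .sort("metadata.write_ts", pymongo.DESCENDING))
    if len(entries) < k:
        return 0  # fewer than k models stored, nothing to trim
    # timestamp of the k-th newest entry; it and everything older are deleted,
    # leaving k-1 entries so the upcoming insert brings the count back to k
    cutoff_ts = entries[k - 1]["metadata"]["write_ts"]
    result = model_coll.delete_many({
        "user_id": user_id,
        "metadata.key": key,
        "metadata.write_ts": {"$lte": cutoff_ts}
    })
    return result.deleted_count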

* Squashed commit of the following:

commit 54659fb
Merge: cf0c9e2 1159eac
Author: K. Shankari <[email protected]>
Date:   Thu Dec 21 20:17:15 2023 -0800

    Merge pull request #951 from MukuFlash03/fix-vuln

    Bulk deletion of site-package/tests

commit 1159eac
Author: Mahadik, Mukul Chandrakant <[email protected]>
Date:   Thu Dec 21 20:43:39 2023 -0700

    Bulk deletion of site-package/tests

    Added a one-line pipe command to remove all occurrences of site-packages/tests in the miniconda main directory.

commit cf0c9e2
Merge: d2f38bc 3be2757
Author: K. Shankari <[email protected]>
Date:   Thu Dec 21 17:47:27 2023 -0800

    Merge pull request #950 from MukuFlash03/fix-vuln

    Remove obsolete package versions

commit 3be2757
Author: Mahadik, Mukul Chandrakant <[email protected]>
Date:   Thu Dec 21 18:05:23 2023 -0700

    Remove obsolete package versions

    Cleaned up older versions of two packages:
    urllib3 - deleted stale version folders
    python - deleted tests folder

commit d2f38bc
Merge: 978a719 c1b0889
Author: K. Shankari <[email protected]>
Date:   Wed Dec 20 14:31:09 2023 -0800

    Merge pull request #949 from MukuFlash03/fix-vuln

    Fixing latest Docker image vulnerabilities

commit c1b0889
Author: Mahadik, Mukul Chandrakant <[email protected]>
Date:   Mon Dec 18 11:04:25 2023 -0700

    Upgraded Ubuntu base image

    The latest Ubuntu base image was just released officially by Docker; it contains updated versions of libc6 and libc-bin.

commit 07747d0
Author: Mahadik, Mukul Chandrakant <[email protected]>
Date:   Fri Dec 15 18:38:12 2023 -0700

    Fixing latest Docker image vulnerabilities

    AWS Inspector found the following vulnerable packages:

    CRITICAL
    perl

    HIGH
    nghttp2, libnghttp2-14
    cryptography, libssl3
    cryptography
    libc6, libc-bin

    Upgraded the perl, libssl3, and nghttp2 packages by upgrading the base Ubuntu image to the latest of the same LTS version, jammy (22.04).

    The cryptography package was fixed by pinning the required version to be installed using conda.

    libc6 and libc-bin could be fixed using apt-get upgrade, but that upgrades all packages, which is not recommended as a blanket fix.

* Updated Test file with dummy models

Was using pre-built models for testing that were present in the stage_snapshot dataset, but these won't be available in the normal dataset.

Hence, took reference from emission.tests.modellingTests.TestRunGreedyModel.py for creating test models using dummy trip data.

Created multiple such models and then checked that the trimming operation succeeds, as sketched below.
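
A minimal sketch of creating one such dummy model, using the same high-level API that the test file in the diff below exercises; the model type, storage, and config values shown here are the ones used in that test.

import emission.analysis.modelling.trip_model.run_model as eamur
import emission.analysis.modelling.trip_model.model_type as eamumt
import emission.analysis.modelling.trip_model.model_storage as eamums

user_id = 'TestRunGreedyModel-TestData'   # same mock user as in the test setup
greedy_model_config = {                   # same debug config as in the test
    "metric": "od_similarity",
    "similarity_threshold_meters": 500,
    "apply_cutoff": False,
    "clustering_way": 'origin-destination',
    "incremental_evaluation": False
}

# each call trains a model from the user's mock confirmed trips and upserts it,
# which triggers trim_model_entries() just before the insert
eamur.update_trip_model(
    user_id=user_id,
    model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
    model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
    min_trips=14,
    model_config=greedy_model_config
)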

* Reverted commit 31626ba + Removed log statements

Commit 31626ba included changes related to the vulnerability fixes, hence those were reverted.

Had also added some log statements related to scalability fixes for model loading.
Removed them here and will add them in a separate PR.

* Changed imports to access K_MODEL_COUNT

Removed a redundant "esda" import, possibly added during local testing.

Removed get_model_limit() since K_MODEL_COUNT is a class variable and can be accessed directly using the class name, as sketched below.
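
A small illustration of that access pattern; the class name and constant here are placeholders (K_MODEL_COUNT was later superseded by the maximum_stored_model_count config value).

class BuiltinModelStorage:
    K_MODEL_COUNT = 3  # hypothetical class-level constant holding the keep-count

# read directly off the class, no get_model_limit() getter needed
keep_count = BuiltinModelStorage.K_MODEL_COUNT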

* Revert "Squashed commit of the following:"

This reverts commit 31626ba.

The earlier commit f54e85f missed some of the discarded changes relating to the vulnerability fixes.
Hence they are being removed here.

* Included maximum_stored_model_count parameter in trip_model config file

Decided upon a threshold value for the model count above which redundant models will be deleted.
This was agreed to be 3 and is defined in the trip_model.conf.json.sample file.

Necessary changes have been made in the related files to use this config value.
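
A minimal usage sketch of reading that value through the helper added in config.py (shown in the diff below); the import alias matches the one used in builtin_model_storage.py.

import emission.analysis.modelling.trip_model.config as eamtc

# raises if the key is missing (via get_config_value_or_raise) or not an integer (TypeError)
maximum_stored_model_count = eamtc.get_maximum_stored_model_count()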

* Addressed latest review comments

- Improved logging statements to include whether the model count has changed.
- Skipped the check for saved test data in the test file, since the related collections in the DB are cleared in tearDown().

* Restored inferrers.py

Did this to revert a whitespace change which was possibly due to removing a line at the end of the file.

We don't want it to be modified as it is unrelated to this set of PR changes.

Restored using the following command, pointing to the specific commit hash for the first instance where the file was modified:

git restore --source=79ad1cc~1 emission/analysis/classification/inference/labels/inferrers.py

* Updated Test for Storing limited models

Added a test to ensure that only the latest models are stored, by comparing the write_ts times.
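
A condensed, self-contained illustration of that check, with made-up write_ts values; the real comparison lives in TestModelStorage.testTrimModelEntries in the diff below.

maximum_stored_model_count = 3
# write_ts recorded after each model creation, in order (illustrative values)
model_creation_write_ts_list = [100.0, 101.0, 102.0, 103.0, 104.0]
# write_ts of the models still stored in the DB after trimming
stored_model_write_ts_list = [102.0, 103.0, 104.0]

# only the newest 'maximum_stored_model_count' creations should remain stored
assert model_creation_write_ts_list[-maximum_stored_model_count:] == stored_model_write_ts_list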

* Revert whitespace changes

* Revert whitespace changes

---------

Co-authored-by: Mahadik, Mukul Chandrakant <[email protected]>
Co-authored-by: K. Shankari <[email protected]>
3 people authored Feb 10, 2024
1 parent 0a8c61d commit 911e1ec
Showing 5 changed files with 208 additions and 1 deletion.
1 change: 1 addition & 0 deletions conf/analysis/trip_model.conf.json.sample
@@ -2,6 +2,7 @@
"model_type": "greedy",
"model_storage": "document_database",
"minimum_trips": 14,
"maximum_stored_model_count": 3,
"model_parameters": {
"greedy": {
"metric": "od_similarity",
Expand Down
6 changes: 6 additions & 0 deletions emission/analysis/modelling/trip_model/config.py
@@ -77,3 +77,9 @@ def get_minimum_trips():



def get_maximum_stored_model_count():
    maximum_stored_model_count = get_config_value_or_raise('maximum_stored_model_count')
    if not isinstance(maximum_stored_model_count, int):
        msg = f"config key 'maximum_stored_model_count' not an integer in config file {config_filename}"
        raise TypeError(msg)
    return maximum_stored_model_count
6 changes: 6 additions & 0 deletions emission/storage/modifiable/abstract_model_storage.py
@@ -29,3 +29,9 @@ def get_current_model(self, key:str) -> Optional[Dict]:
        : return: the most recent database entry for this key
        """
        pass

    def trim_model_entries(self, key:str):
        """
        :param: the metadata key for the entries, used to identify the model type
        """
        pass
39 changes: 38 additions & 1 deletion emission/storage/modifiable/builtin_model_storage.py
@@ -7,6 +7,7 @@
import emission.core.get_database as edb
import emission.storage.modifiable.abstract_model_storage as esma

import emission.analysis.modelling.trip_model.config as eamtc
import emission.core.wrapper.entry as ecwe
import emission.core.wrapper.wrapperbase as ecwb

@@ -23,9 +24,12 @@ def upsert_model(self, key:str, model: ecwb.WrapperBase):
        """
        logging.debug("upsert_doc called with key %s" % key)
        entry = ecwe.Entry.create_entry(self.user_id, key, model)
        # Cleaning up older models, before inserting new model
        self.trim_model_entries(key)
        logging.debug("Inserting entry %s into model DB" % entry)
        ins_result = edb.get_model_db().insert_one(entry)
        ## TODO: Cleanup old/obsolete models
        new_model_count = edb.get_model_db().count_documents({"user_id": self.user_id})
        logging.debug("New model count for user %s = %s" % (self.user_id, new_model_count))
        return ins_result.inserted_id

def get_current_model(self, key:str) -> Optional[Dict]:
@@ -48,3 +52,36 @@ def get_current_model(self, key:str) -> Optional[Dict]:
        del first_entry["_id"]
        return first_entry

    def trim_model_entries(self, key:str):
        """
        :param: the metadata key for the entries, used to identify the model type
        This function is called inside the model insertion function just before the
        model is inserted, to ensure older models are removed before inserting newer ones.
        The flow of model insertion function calls is:
        eamur.update_trip_model() -> eamums.save_model() -> esma.upsert_model() -> esma.trim_model_entries()
        """
        old_model_count = edb.get_model_db().count_documents({"user_id": self.user_id})
        deleted_model_count = 0
        find_query = {"user_id": self.user_id, "metadata.key": key}
        result_it = edb.get_model_db().find(find_query).sort("metadata.write_ts", -1)
        result_list = list(result_it)
        maximum_stored_model_count = eamtc.get_maximum_stored_model_count()
        if old_model_count >= maximum_stored_model_count:
            # Specify the last or minimum timestamp of Kth model entry
            write_ts_limit = result_list[maximum_stored_model_count - 1]['metadata']['write_ts']
            logging.debug(f"Write ts limit = {write_ts_limit}")
            filter_clause = {
                "user_id" : self.user_id,
                "metadata.key" : key,
                "metadata.write_ts" : { "$lte" : write_ts_limit }
            }
            models_to_delete = edb.get_model_db().delete_many(filter_clause)
            deleted_model_count = models_to_delete.deleted_count
        new_model_count = edb.get_model_db().count_documents({"user_id": self.user_id})
        if deleted_model_count > 0:
            logging.debug(f"{deleted_model_count} models deleted successfully")
            logging.debug("Model count for user %s has changed %s -> %s" % (self.user_id, old_model_count, new_model_count))
        else:
            logging.debug("No models found or none deleted")
            logging.debug("Model count for user %s unchanged %s -> %s" % (self.user_id, old_model_count, new_model_count))
157 changes: 157 additions & 0 deletions emission/tests/storageTests/TestModelStorage.py
@@ -0,0 +1,157 @@
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
# Standard imports
from future import standard_library
standard_library.install_aliases()
from builtins import *
import unittest
import datetime as pydt
import logging
import json
import pymongo
import uuid

# Our imports
import emission.core.get_database as edb
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.analysis.modelling.trip_model.model_storage as eamums
import emission.analysis.modelling.trip_model.model_type as eamumt
import emission.analysis.modelling.trip_model.run_model as eamur
import emission.storage.timeseries.abstract_timeseries as esta
import emission.tests.modellingTests.modellingTestAssets as etmm
import emission.analysis.modelling.trip_model.config as eamtc
import emission.storage.modifiable.abstract_model_storage as esma

# Test imports
import emission.tests.common as etc

class TestModelStorage(unittest.TestCase):
    '''
    Copied over the below code in setup() and testTrimModelEntries()
    for model creation using mock dummy trips data from
    emission.tests.modellingTests.TestRunGreedyModel.py
    '''
    def setUp(self):
        logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
            level=logging.DEBUG)

        # configuration for randomly-generated test data
        self.user_id = user_id = 'TestRunGreedyModel-TestData'
        self.origin = (-105.1705977, 39.7402654,)
        self.destination = (-105.1755606, 39.7673075)
        self.min_trips = 14
        self.total_trips = 100
        self.clustered_trips = 33    # bins must have at least self.min_trips similar trips by default
        self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant
        # $clustered_trips * $has_label_percent > self.min_trips
        # must be correct or else this test could fail under some random test cases.

        ts = esta.TimeSeries.get_time_series(user_id)
        logging.debug(f"inserting mock Confirmedtrips into database")

        # generate labels with a known sample weight that we can rely on in the test
        label_data = {
            "mode_confirm": ['ebike', 'bike'],
            "purpose_confirm": ['happy-hour', 'dog-park'],
            "replaced_mode": ['walk'],
            "mode_weights": [0.9, 0.1],
            "purpose_weights": [0.1, 0.9]
        }

        train = etmm.generate_mock_trips(
            user_id=user_id,
            trips=self.total_trips,
            origin=self.origin,
            destination=self.destination,
            trip_part='od',
            label_data=label_data,
            within_threshold=self.clustered_trips,
            threshold=0.004, # ~400m
            has_label_p=self.has_label_percent
        )

        ts.bulk_insert(train)

        # confirm data write did not fail
        test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None)
        if len(test_data) != self.total_trips:
            logging.debug(f'test invariant failed after generating test data')
            self.fail()
        else:
            logging.debug(f'found {self.total_trips} trips in database')

    def tearDown(self):
        edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id})
        edb.get_model_db().delete_many({'user_id': self.user_id})
        edb.get_pipeline_state_db().delete_many({'user_id': self.user_id})

    def testTrimModelEntries(self):
        """
        Took this code from emission.tests.modellingTests.TestRunGreedyModel.py
        with the objective of inserting multiple models into the model_db.
        The test involves building and inserting (maximum_stored_model_count + 15) models, which is greater than
        the maximum_stored_model_count (= 3) limit defined in conf/analysis/trip_model.conf.json.sample
        train a model, save it, load it, and use it for prediction, using
        the high-level training/testing API provided via
        run_model.py:update_trip_model()     # train
        run_model.py:predict_labels_with_n() # test
        for clustering, use the default greedy similarity binning model
        """
        # pass along debug model configuration
        greedy_model_config = {
            "metric": "od_similarity",
            "similarity_threshold_meters": 500,
            "apply_cutoff": False,
            "clustering_way": 'origin-destination',
            "incremental_evaluation": False
        }
        maximum_stored_model_count = eamtc.get_maximum_stored_model_count()
        logging.debug(f'(TRAIN) creating a model based on trips in database')
        model_creation_write_ts_list = []
        stored_model_write_ts_list = []
        ms = esma.ModelStorage.get_model_storage(self.user_id,)
        for i in range(maximum_stored_model_count + 15):
            logging.debug(f"Creating dummy model no. {i}")
            eamur.update_trip_model(
                user_id=self.user_id,
                model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
                model_storage=eamums.ModelStorage.DOCUMENT_DATABASE,
                min_trips=self.min_trips,
                model_config=greedy_model_config
            )
            latest_model_entry = ms.get_current_model(key=esda.TRIP_MODEL_STORE_KEY)
            model_creation_write_ts_list.append(latest_model_entry['metadata']['write_ts'])
            current_model_count = edb.get_model_db().count_documents({"user_id": self.user_id})

            """
            Test 1: Ensure that the total number of models in the model_DB is less than or equal to the maximum_stored_model_count
            - Can use assertLessEqual() but using assertEqual to distinguish between the
              cases when it should be less and when it should be equal.
            """
            if i <= (maximum_stored_model_count - 1):
                self.assertEqual(current_model_count, i+1)
            else:
                self.assertEqual(current_model_count, maximum_stored_model_count)

        find_query = {"user_id": self.user_id, "metadata.key": esda.TRIP_MODEL_STORE_KEY}
        result_it = edb.get_model_db().find(find_query)
        result_list = list(result_it)
        stored_model_write_ts_list = [model['metadata']['write_ts'] for model in result_list]

        """
        Test 2: Ensure that the latest 'maximum_stored_model_count' models are only stored and the oldest are deleted and not the other way around.
        - This involves storing the write_ts times in two lists:
            - model_creation_write_ts_list : stores write_ts times each time a model is created in the for loop.
            - stored_model_write_ts_list : stores write_ts times of all the already stored models in the DB, which should just have the latest models.
        - The last 'maximum_stored_model_count' in model_creation_write_ts_list should match those in stored_model_write_ts_list.
        """
        self.assertEqual(model_creation_write_ts_list[-maximum_stored_model_count : ], stored_model_write_ts_list)

if __name__ == '__main__':
    import emission.tests.common as etc
    etc.configLogging()
    unittest.main()
