diff --git a/conf/analysis/trip_model.conf.json.sample b/conf/analysis/trip_model.conf.json.sample index 845e67a6a..4851be5d6 100644 --- a/conf/analysis/trip_model.conf.json.sample +++ b/conf/analysis/trip_model.conf.json.sample @@ -1,5 +1,5 @@ { - "model_type": "greedy", + "model_type": "forest", "model_storage": "document_database", "minimum_trips": 14, "model_parameters": { @@ -8,6 +8,24 @@ "similarity_threshold_meters": 500, "apply_cutoff": false, "incremental_evaluation": false + }, + "forest": { + "loc_feature" : "coordinates", + "radius": 100, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":null, + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":true, + "random_state":42, + "use_start_clusters":false, + "use_trip_clusters":true } } } \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/forest_classifier.py b/emission/analysis/modelling/trip_model/forest_classifier.py new file mode 100644 index 000000000..16eee014f --- /dev/null +++ b/emission/analysis/modelling/trip_model/forest_classifier.py @@ -0,0 +1,197 @@ +import joblib +from typing import Dict, List, Optional, Tuple +import sklearn.metrics.pairwise as smp +import emission.core.wrapper.confirmedtrip as ecwc +import logging +from io import BytesIO + +import json +import emission.analysis.modelling.trip_model.trip_model as eamuu +import emission.analysis.modelling.trip_model.config as eamtc +import emission.storage.timeseries.builtin_timeseries as estb +import emission.storage.decorations.trip_queries as esdtq +import emission.analysis.modelling.trip_model.models as eamtm + +EARTH_RADIUS = 6371000 + +class ForestClassifierModel(eamuu.TripModel): + + def __init__(self,config=None): + + if config is None: + config = eamtc.get_config_value_or_raise('model_parameters.forest') + logging.debug(f'ForestClassifier loaded model config from file') + else: + logging.debug(f'ForestClassifier using model config argument') + + random_forest_expected_keys = [ + 'loc_feature', + 'n_estimators', + 'criterion', + 'min_samples_split', + 'min_samples_leaf', + 'max_features', + 'bootstrap', + ] + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. + ############################### + + # cluster_expected_keys= [ + # 'radius', + # 'size_thresh', + # 'purity_thresh', + # 'gamma', + # 'C', + # 'use_start_clusters', + # 'use_trip_clusters', + # ] + # + # if config['loc_feature'] == 'cluster': + # for k in cluster_expected_keys: + # if config.get(k) is None: + # msg = f"cluster trip model config missing expected key {k}" + # raise KeyError(msg) + ####################################### + for k in random_forest_expected_keys: + if config.get(k) is None: + msg = f"forest trip model config missing expected key {k}" + raise KeyError(msg) + self.model=eamtm.ForestClassifier(**config) + + + def fit(self,trips: List[ecwc.Confirmedtrip]): + ''' + trips : List of Entry type data + ''' + # check and raise exception if no data to fit + logging.debug(f'fit called with {len(trips)} trips') + + unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips)) + if len(unlabeled) > 0: + msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}' + raise Exception(msg) + + #Convert List of Entry to dataframe + data_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",trips) + labeled_trip_df = esdtq.filter_labeled_trips(data_df) + expanded_labeled_trip_df= esdtq.expand_userinputs(labeled_trip_df) + #fit models on dataframe + self.model.fit(expanded_labeled_trip_df) + + + def predict(self, trip: List[float]) -> Tuple[List[Dict], int]: + ''' + trip : A single trip whose mode, pupose and replaced mode are required + returns. + ''' + + #check if theres no trip to predict + logging.debug(f"forest classifier predict called with {len(trip)} trips") + if len(trip) == 0: + msg = f'model.predict cannot be called with an empty trip' + raise Exception(msg) + # CONVERT TRIP TO dataFrame + test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",[trip]) + predcitions_df= self.model.predict(test_df) + + # the predictions_df currently holds the highest probable options + # individually in all three categories. the predictions_df are in the form + # + # purpose_pred | purpose_proba | mode_pred | mode_proba | replaced_pred | replaced proba + # dog-park | 1.0 | e-bike | 0.99 | walk | 1.1 + # + # + # However, to keep the trip model general, the forest model is expected to return + # + #PREDICTIONS [ {'labels': {'mode_confirm': 'e-bike', 'replaced_mode': 'walk', 'purpose_confirm': 'dog-park'}, + # 'p': ( Currently average of the 3 probabilities)}] + labels= { + 'mode_confirm': predcitions_df['mode_pred'].iloc[0], + 'replaced_mode' : predcitions_df['replaced_pred'].iloc[0], + 'purpose_confirm' : predcitions_df['purpose_pred'].iloc[0] + } + + avg_proba = predcitions_df[['purpose_proba','mode_proba','replaced_proba']].mean(axis=1).iloc[0] + predictions =[{ + 'labels' : labels, + 'p' : avg_proba + }] + return predictions, len(predictions) + + def to_dict(self): + """ + Convert the model to a dictionary suitable for storage. + """ + data={} + attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df'] + + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. + ############################### + # if self.model.loc_feature == 'cluster': + # ## confirm this includes all the extra encoders/models + # attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + + for attribute_name in attr: + if not hasattr(self.model,attribute_name): + raise ValueError(f"Attribute {attribute_name} not found in the model") + + buffer=BytesIO() + try: + joblib.dump(getattr(self.model,attribute_name),buffer) + except Exception as e: + raise RuntimeError(f"Error serializing { attribute_name}: {str(e)}") + buffer.seek(0) + data[attribute_name]=buffer.getvalue() + + return data + + def from_dict(self,model: Dict): + """ + Load the model from a dictionary. + """ + attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df'] + + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. + ############################### + # if self.model.loc_feature == 'cluster': + # ## TODO : confirm this includes all the extra encoders/models + # attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + for attribute_name in attr: + if attribute_name not in model: + raise ValueError(f"Attribute {attribute_name} missing in the model") + try: + buffer = BytesIO(model[attribute_name]) + setattr(self.model,attribute_name, joblib.load(buffer)) + except Exception as e: + raise RuntimeError(f"Error deserializing { attribute_name}: {str(e)}") + # If we do not wish to raise the exception after logging the error, comment the line above + + def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: + """ + extract the relevant features for learning from a trip for this model instance + + :param trip: the trip to extract features from + :type trip: Confirmedtrip + :return: a vector containing features to predict from + :rtype: List[float] + """ + # ForestClassifier class in models.py file handles features extraction. + pass + + def is_incremental(self) -> bool: + """ + whether this model requires the complete user history to build (False), + or, if only the incremental data since last execution is required (True). + + :return: if the model is incremental. the current timestamp will be recorded + in the analysis pipeline. the next call to this model will only include + trip data for trips later than the recorded timestamp. + :rtype: bool + """ + pass \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/model_type.py b/emission/analysis/modelling/trip_model/model_type.py index b5e761fb0..16f27ae78 100644 --- a/emission/analysis/modelling/trip_model/model_type.py +++ b/emission/analysis/modelling/trip_model/model_type.py @@ -3,6 +3,7 @@ import emission.analysis.modelling.trip_model.trip_model as eamuu import emission.analysis.modelling.similarity.od_similarity as eamso import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamug +import emission.analysis.modelling.trip_model.forest_classifier as eamuf SIMILARITY_THRESHOLD_METERS=500 @@ -11,6 +12,7 @@ class ModelType(Enum): # ENUM_NAME_CAPS = 'SHORTHAND_NAME_CAPS' GREEDY_SIMILARITY_BINNING = 'GREEDY' + RANDOM_FOREST_CLASSIFIER = 'FOREST' def build(self, config=None) -> eamuu.TripModel: """ @@ -24,16 +26,16 @@ def build(self, config=None) -> eamuu.TripModel: :raises KeyError: if the requested model name does not exist """ # Dict[ModelType, TripModel] - MODELS = { - ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config) - } + MODELS = { + ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning, + ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifierModel + } model = MODELS.get(self) if model is None: - model_names = list(lambda e: e.name, MODELS.keys()) - models = ",".join(model_names) - raise KeyError(f"ModelType {self.name} not found in factory, please add to build method") - - return model + available_models = ', '.join([ e.name for e in ModelType]) + raise KeyError(f"ModelType {self.name} not found in factory, Available models are {available_models}."\ + "Otherwise please add new model to build method") + return model(config) @classmethod def names(cls): diff --git a/emission/analysis/modelling/trip_model/models.py b/emission/analysis/modelling/trip_model/models.py index e5fc08b46..cc3b58a2e 100644 --- a/emission/analysis/modelling/trip_model/models.py +++ b/emission/analysis/modelling/trip_model/models.py @@ -19,7 +19,7 @@ from emission.analysis.modelling.trip_model.clustering import get_distance_matrix, single_cluster_purity import emission.analysis.modelling.trip_model.data_wrangling as eamtd import emission.storage.decorations.trip_queries as esdtq -from emission.analysis.classification.inference.labels.inferrers import predict_cluster_confidence_discounting +import emission.analysis.classification.inference.labels.inferrers as eacili import emission.core.wrapper.entry as ecwe import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg import emission.core.common as ecc @@ -738,7 +738,7 @@ def predict_proba(self, test_df): replaced_distribs = [] for trip in test_trips: - trip_prediction = predict_cluster_confidence_discounting(trip) + trip_prediction = eacili.predict_cluster_confidence_discounting(trip) if len(trip_prediction) == 0: # model could not find cluster for the trip @@ -1514,7 +1514,7 @@ def __init__( self.C = C self.n_estimators = n_estimators self.criterion = criterion - self.max_depth = max_depth + self.max_depth = max_depth if max_depth!= 'null' else None self.min_samples_split = min_samples_split self.min_samples_leaf = min_samples_leaf self.max_features = max_features @@ -1524,36 +1524,42 @@ def __init__( self.use_start_clusters = use_start_clusters self.use_trip_clusters = use_trip_clusters - if self.loc_feature == 'cluster': - # clustering algorithm to generate end clusters - self.end_cluster_model = DBSCANSVMCluster( - loc_type='end', - radius=self.radius, - size_thresh=self.size_thresh, - purity_thresh=self.purity_thresh, - gamma=self.gamma, - C=self.C) - - if self.use_start_clusters or self.use_trip_clusters: - # clustering algorithm to generate start clusters - self.start_cluster_model = DBSCANSVMCluster( - loc_type='start', - radius=self.radius, - size_thresh=self.size_thresh, - purity_thresh=self.purity_thresh, - gamma=self.gamma, - C=self.C) - - if self.use_trip_clusters: - # helper class to generate trip-level clusters - self.trip_grouper = TripGrouper( - start_cluster_col='start_cluster_idx', - end_cluster_col='end_cluster_idx') - - # wrapper class to generate one-hot encodings for cluster indices - self.cluster_enc = OneHotWrapper(sparse=False, - handle_unknown='ignore') + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not tested. + ############################### + # if self.loc_feature == 'cluster': + # # clustering algorithm to generate end clusters + # self.end_cluster_model = DBSCANSVMCluster( + # loc_type='end', + # radius=self.radius, + # size_thresh=self.size_thresh, + # purity_thresh=self.purity_thresh, + # gamma=self.gamma, + # C=self.C) + + # if self.use_start_clusters or self.use_trip_clusters: + # # clustering algorithm to generate start clusters + # self.start_cluster_model = DBSCANSVMCluster( + # loc_type='start', + # radius=self.radius, + # size_thresh=self.size_thresh, + # purity_thresh=self.purity_thresh, + # gamma=self.gamma, + # C=self.C) + + # if self.use_trip_clusters: + # # helper class to generate trip-level clusters + # self.trip_grouper = TripGrouper( + # start_cluster_col='start_cluster_idx', + # end_cluster_col='end_cluster_idx') + + # # wrapper class to generate one-hot encodings for cluster indices + # self.cluster_enc = OneHotWrapper(sparse=False, + # handle_unknown='ignore') + ############################################################################# + # wrapper class to generate one-hot encodings for purposes and modes self.purpose_enc = OneHotWrapper(impute_missing=True, sparse=False, diff --git a/emission/analysis/modelling/trip_model/run_model.py b/emission/analysis/modelling/trip_model/run_model.py index 7356aa597..cfee60464 100644 --- a/emission/analysis/modelling/trip_model/run_model.py +++ b/emission/analysis/modelling/trip_model/run_model.py @@ -56,9 +56,7 @@ def update_trip_model( time_query = time_query_from_pipeline if model.is_incremental else None logging.debug(f'model type {model_type.name} is incremental? {model.is_incremental}') logging.debug(f'time query for training data collection: {time_query}') - trips = _get_training_data(user_id, time_query) - # don't start training for a user that doesn't have at least $trips many trips # (assume if a stored model exists for the user, that they met this requirement previously) if len(trips) == 0: @@ -74,7 +72,8 @@ def update_trip_model( epq.mark_trip_model_failed(user_id) else: - # train and store the model + # train and store the model. pass only List of event and only convert + # to dataframe type data whereever required. model.fit(trips) model_data_next = model.to_dict() diff --git a/emission/analysis/modelling/trip_model/util.py b/emission/analysis/modelling/trip_model/util.py index 7d22b5d22..b3da1d4a1 100644 --- a/emission/analysis/modelling/trip_model/util.py +++ b/emission/analysis/modelling/trip_model/util.py @@ -1,16 +1,14 @@ from typing import List, Tuple from past.utils import old_div -import numpy +import numpy as np +import pandas as pd from numpy.linalg import norm - def find_knee_point(values: List[float]) -> Tuple[float, int]: """for a list of values, find the value which represents the cut-off point or "elbow" in the function when values are sorted. - copied from original similarity algorithm. permalink: [https://github.com/e-mission/e-mission-server/blob/5b9e608154de15e32df4f70a07a5b95477e7dbf5/emission/analysis/modelling/tour_model/similarity.py#L256] - with `y` passed in as `values` based on this stack overflow answer: https://stackoverflow.com/a/2022348/4803266 And summarized by the statement: "A quick way of finding the elbow is to draw a @@ -26,16 +24,47 @@ def find_knee_point(values: List[float]) -> Tuple[float, int]: x = list(range(N)) max = 0 index = -1 - a = numpy.array([x[0], values[0]]) - b = numpy.array([x[-1], values[-1]]) + a = np.array([x[0], values[0]]) + b = np.array([x[-1], values[-1]]) n = norm(b - a) new_y = [] for i in range(0, N): - p = numpy.array([x[i], values[i]]) - dist = old_div(norm(numpy.cross(p - a, p - b)), n) + p = np.array([x[i], values[i]]) + dist = old_div(norm(np.cross(p - a, p - b)), n) new_y.append(dist) if dist > max: max = dist index = i value = values[index] return [index, value] + + def get_distance_matrix(loc_df, loc_type): + """ Args: + loc_df (dataframe): must have columns 'start_lat' and 'start_lon' + or 'end_lat' and 'end_lon' + loc_type (str): 'start' or 'end' + """ + assert loc_type == 'start' or loc_type == 'end' + + radians_lat_lon = np.radians(loc_df[[loc_type + "_lat", loc_type + "_lon"]]) + + dist_matrix_meters = pd.DataFrame( + smp.haversine_distances(radians_lat_lon, radians_lat_lon) * + EARTH_RADIUS) + return dist_matrix_meters + +def single_cluster_purity(points_in_cluster, label_col='purpose_confirm'): + """ Calculates purity of a cluster (i.e. % of trips that have the most + common label) + + Args: + points_in_cluster (df): dataframe containing points in the same + cluster + label_col (str): column in the dataframe containing labels + """ + assert label_col in points_in_cluster.columns + + most_freq_label = points_in_cluster[label_col].mode()[0] + purity = len(points_in_cluster[points_in_cluster[label_col] == + most_freq_label]) / len(points_in_cluster) + return purity diff --git a/emission/tests/modellingTests/TestForestModelIntegration.py b/emission/tests/modellingTests/TestForestModelIntegration.py new file mode 100644 index 000000000..88813b5c4 --- /dev/null +++ b/emission/tests/modellingTests/TestForestModelIntegration.py @@ -0,0 +1,120 @@ +# This tests the label inference pipeline. It uses real data and placeholder inference algorithms +import unittest +import numpy as np +import time +import emission.analysis.classification.inference.labels.pipeline as eacilp +import emission.analysis.classification.inference.labels.inferrers as eacili +import emission.core.wrapper.labelprediction as ecwl +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.storage.decorations.trip_queries as esdt +import emission.storage.timeseries.timequery as estt +import emission.core.get_database as edb +import emission.tests.common as etc +import emission.pipeline.intake_stage as epi +import logging +from bson.objectid import ObjectId + +import emission.analysis.modelling.trip_model.config as eamtc + +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.storage.timeseries.abstract_timeseries as esta + + +class TestForestModelIntegration(unittest.TestCase): + # Test if the forest model for label prediction is smoothly integrated with the inference pipeline. + # In the initial setup, build a dummy forest model. Then run the pipeline on real example data. + # Finally in the test, assert the type of label predictions expected. + + def setUp(self): + np.random.seed(91) + self.test_algorithms = eacilp.primary_algorithms + forest_model_config = eamtc.get_config_value_or_raise('model_parameters.forest') + + etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22") ##maybe use a different file + ts = esta.TimeSeries.get_time_series(self.testUUID) + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + self.total_trips=100 + ## generate mock trips + train = etmm.generate_mock_trips( + user_id=self.testUUID, + trips=self.total_trips, + origin=(-105.1705977, 39.7402654), + destination=(-105.1755606, 39.7673075), + trip_part='od', + label_data=label_data, + within_threshold= 33, + threshold=0.004, # ~400m + has_label_p=0.9 + ) + ## Required for Forest model inference + for result_entry in train: + result_entry['data']['start_local_dt']=result_entry['metadata']['write_local_dt'] + result_entry['data']['end_local_dt']=result_entry['metadata']['write_local_dt'] + result_entry['data']['start_place']=ObjectId() + result_entry['data']['end_place']=ObjectId() + ts.bulk_insert(train) + # confirm data write did not fail + check_data = esda.get_entries(key="analysis/confirmed_trip", user_id=self.testUUID, time_query=None) + if len(check_data) != self.total_trips: + logging.debug(f'test invariant failed after generating test data') + self.fail() + else: + logging.debug(f'found {self.total_trips} trips in database') + ## Build an already existing model or a new model + eamur.update_trip_model( + user_id=self.testUUID, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=4, + model_config=forest_model_config + ) + ## run inference pipeline + self.run_pipeline(self.test_algorithms) + time_range = estt.TimeQuery("metadata.write_ts", None, time.time()) + self.inferred_trips = esda.get_entries(esda.INFERRED_TRIP_KEY, self.testUUID, time_query=time_range) + + def tearDown(self): + self.reset_all() + + def run_pipeline(self, algorithms): + default_primary_algorithms = eacilp.primary_algorithms + eacilp.primary_algorithms = algorithms + epi.run_intake_pipeline_for_user(self.testUUID,skip_if_no_new_data = False) + eacilp.primary_algorithms = default_primary_algorithms + + def reset_all(self): + edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUUID}) + edb.get_model_db().delete_many({'user_id': self.testUUID}) + edb.get_pipeline_state_db().delete_many({'user_id': self.testUUID}) + + + # Tests that forest algorithm being tested runs successfully + def testForestAlgorithm(self): + for trip in self.inferred_trips: + entries = esdt.get_sections_for_trip("inference/labels", self.testUUID, trip.get_id()) + self.assertEqual(len(entries), len(self.test_algorithms)) + for entry in entries: + self.assertGreater(len(entry["data"]["prediction"]), 0) + for singleprediction in entry["data"]["prediction"]: + self.assertIsInstance(singleprediction, dict, " should be an instance of the dictionary class") + self.assertIsInstance(singleprediction['labels'], dict, " should be an instance of the dictionary class") + self.assertIn('mode_confirm',singleprediction['labels'].keys()) + self.assertIn('replaced_mode',singleprediction['labels'].keys()) + self.assertIn('purpose_confirm',singleprediction['labels'].keys()) + +def main(): + etc.configLogging() + unittest.main() + +if __name__ == "__main__": + main() diff --git a/emission/tests/modellingTests/TestForestModelLoadandSave.py b/emission/tests/modellingTests/TestForestModelLoadandSave.py new file mode 100644 index 000000000..079bc908b --- /dev/null +++ b/emission/tests/modellingTests/TestForestModelLoadandSave.py @@ -0,0 +1,315 @@ +from typing import ByteString +import unittest +import logging +from unittest.mock import patch +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.config as eamtc +import uuid +import emission.storage.timeseries.abstract_timeseries as esta +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.core.get_database as edb +import emission.analysis.modelling.trip_model.run_model as eamtr + +class TestForestModelLoadandSave(unittest.TestCase): + """ + Tests to make sure the model load and save properly + """ + + def setUp(self): + """ + sets up the end-to-end run model test with Confirmedtrip data + """ + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + # configuration for randomly-generated test data + self.user_id = user_id = 'TestForestModelLoadAndSave-TestData' + self.origin = (-105.1705977, 39.7402654,) + self.destination = (-105.1755606, 39.7673075) + self.min_trips = 14 + self.total_trips = 100 + self.clustered_trips = 33 # must have at least self.min_trips similar trips by default + self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant + # $clustered_trips * $has_label_percent > self.min_trips + # must be correct or else this test could fail under some random test cases. + + self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl' + + # Ensuring that no previous test data was left in DB after teardown, + ts = esta.TimeSeries.get_time_series(user_id) + test_data = list(ts.find_entries(["analysis/confirmed_trip"])) + if len(test_data) == 0: + # generate test data for the database + logging.debug(f"inserting mock Confirmedtrips into database") + + # generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + test_data = etmm.generate_mock_trips( + user_id=user_id, + trips=self.total_trips, + origin=self.origin, + destination=self.destination, + trip_part='od', + label_data=label_data, + within_threshold=self.clustered_trips, + threshold=0.004, # ~400m + has_label_p=self.has_label_percent + ) + + for result_entry in test_data: + result_entry['data']['start_local_dt']=result_entry['metadata']['write_local_dt'] + result_entry['data']['end_local_dt']=result_entry['metadata']['write_local_dt'] + + ts.bulk_insert(test_data) + + # confirm data write did not fail + test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None) + if len(test_data) != self.total_trips: + logging.debug(f'test invariant failed after generating test data') + self.fail() + else: + logging.debug(f'found {self.total_trips} trips in database') + + self.forest_model_config= eamtc.get_config_value_or_raise('model_parameters.forest') + + def tearDown(self): + """ + clean up database + """ + edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id}) + edb.get_model_db().delete_many({'user_id': self.user_id}) + edb.get_pipeline_state_db().delete_many({'user_id': self.user_id}) + + def testForestModelRoundTrip(self): + """ + RoundTripTest : Serialising an object with 'to_dict' and then immediately + deserialize it with 'from_dict'. After deserialization, the object should have + the same state as original + """ + +# logging.debug(f'creating Random Forest model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=self.forest_model_config + ) + + model = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=self.forest_model_config + ) + +# logging.debug(f'Loading test data') + test = esda.get_entries(key="analysis/confirmed_trip", user_id=self.user_id, time_query=None) + +# logging.debug(f'Predictions on trips in database') + + predictions_list = eamur.predict_labels_with_n( + trip_list = test, + model=model + ) + + # logging.debug(f'Serialising the model ') + + model_data=model.to_dict() + +# logging.debug(f'Deserialising the model') + + + deserialized_model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER + deserialized_model = deserialized_model_type.build(self.forest_model_config) + deserialized_model.from_dict(model_data) + +# logging.debug(f'Predictions on trips using deserialised model') + predictions_loaded_model_list = eamur.predict_labels_with_n( + trip_list = test, + model=deserialized_model + ) +# logging.debug(f'Assert that both predictions are the same') + self.assertEqual(predictions_list, predictions_loaded_model_list, " should be equal") + + def testForestModelConsistency(self): + """ + ConsistencyTest : To Verify that the serialization and deserialization process + is consistent across multiple executions + """ + # logging.debug(f'creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=self.forest_model_config + ) + + model_iter1 = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=self.forest_model_config + ) + + # logging.debug(f'Load Test data') + test = esda.get_entries(key="analysis/confirmed_trip", user_id=self.user_id, time_query=None) + + # logging.debug(f' Model Predictions on trips in database') + + predictions_list_model1 = eamur.predict_labels_with_n( + trip_list = test, + model=model_iter1 + ) + # logging.debug(f' Loading Model again') + + model_iter2 = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=self.forest_model_config + ) + # logging.debug(f' Model Predictions on trips in database') + predictions_list_model2 = eamur.predict_labels_with_n( + trip_list = test, + model=model_iter2 + ) + + self.assertEqual(predictions_list_model1, predictions_list_model2, " should be equal") + + + + def testSerializationErrorHandling(self): + """ + SerialisationErrorHandling : To verify that any errors during + serialising an object with 'to_dict' are handled. + """ + # defining a side effect function to simulate a serialization error + def mock_dump(*args,**kwargs): + raise Exception("Serialization Error") + + logging.debug(f'(TRAIN) creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=self.forest_model_config + ) + + model = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=self.forest_model_config + ) + # patch is used to temporarily replace joblib.dump with a + # mock function that raises an exception + # + # side_effect, which is set to mock_dump, is called instead of + # real joblib.dump function when 'to_dict' is invoked + + with patch('joblib.dump',side_effect=mock_dump): + with self.assertRaises(RuntimeError): + model.to_dict() + + + def testDeserializationErrorHandling(self): + """ + deserialisationErrorHandling : To verify that any errors during + deserialising an object with 'from_dict' are handled. + """ + # defining a side effect function to simulate a deserialization error + def mock_load(*args,**kwargs): + raise Exception("Deserialization Error") + + logging.debug(f'(TRAIN) creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=self.forest_model_config + ) + + model = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=self.forest_model_config + ) + + model_data=model.to_dict() + + deserialized_model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER + deserialized_model = deserialized_model_type.build(self.forest_model_config) + # patch is used to temporarily replace joblib.load with a + # mock function that raises an exception + # + # side_effect, which is set to mock_load, is called instead of + # real joblib.load function when 'to_dict' is invoked + + with patch('joblib.load',side_effect=mock_load): + with self.assertRaises(RuntimeError): + deserialized_model.from_dict(model_data) + + + def testRandomForestTypePreservation(self): + """ + TypePreservationTest: To ensure that the serialization and deserialization + process maintains the data types of all model attributes. + The model is trained, preditions stored, serialised and then desserialized. + The type of deserialised model attributes and the predictions of this must match + those of initial model. + """ + ct_entry=eamtr._get_training_data(self.user_id,None) + split= int(len(ct_entry)*0.8) + trips=ct_entry[:split] + test_trips=ct_entry[split:] + + ## Build and train model + model_type= eamumt.ModelType.RANDOM_FOREST_CLASSIFIER + model = model_type.build(self.forest_model_config) + model.fit(trips) + + ## Get pre serialization predictions + predictions_list = eamur.predict_labels_with_n( + trip_list = test_trips, + model=model + ) + + ## Serialise + serialised_model_data=model.to_dict() + + ## build and deserialise a different model + deserialised_model = model_type.build(self.forest_model_config) + deserialised_model.from_dict(serialised_model_data) + + ## test if the types are correct + for attr in ['purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df']: + deSerialised_attr_value=getattr(deserialised_model.model,attr) + original_attr_value=getattr(model.model,attr) + #Check type preservation + self.assertIsInstance(deSerialised_attr_value,type(original_attr_value), f"Type mismatch for {attr} ") + #Check for value equality. This assumes that the attributes are either direc + + ## test if the predictions are correct + deserialised_predictions_list = eamur.predict_labels_with_n( + trip_list = test_trips, + model=deserialised_model + ) + logging.debug(f'TESTIN:{deserialised_predictions_list}') + logging.debug(f'{predictions_list}') + self.assertEqual(deserialised_predictions_list,predictions_list,'predictions list not same.') + diff --git a/emission/tests/modellingTests/TestRunForestModel.py b/emission/tests/modellingTests/TestRunForestModel.py new file mode 100644 index 000000000..8c6cd1650 --- /dev/null +++ b/emission/tests/modellingTests/TestRunForestModel.py @@ -0,0 +1,214 @@ +import unittest +import logging + +import emission.analysis.modelling.trip_model.run_model as eamur +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.models as eamtm +import emission.storage.timeseries.abstract_timeseries as esta +import emission.tests.modellingTests.modellingTestAssets as etmm +import emission.storage.decorations.analysis_timeseries_queries as esda +import emission.core.get_database as edb +import emission.storage.pipeline_queries as epq +import emission.core.wrapper.pipelinestate as ecwp +import emission.analysis.modelling.trip_model.forest_classifier as eamtf +from sklearn.ensemble import RandomForestClassifier + +class TestRunForestModel(unittest.TestCase): + """ + Tests to ensure Pipeline builds and runs with zero + or more trips + """ + + def setUp(self): + """ + sets up the end-to-end run model test with Confirmedtrip data + """ + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + # configuration for randomly-generated test data + self.user_id = user_id = 'TestRunForestModel-TestData' + self.origin = (-105.1705977, 39.7402654,) + self.destination = (-105.1755606, 39.7673075) + self.min_trips = 14 + self.total_trips = 100 + self.clustered_trips = 33 # must have at least self.min_trips similar trips by default + self.has_label_percent = 0.9 # let's make a few that don't have a label, but invariant + # $clustered_trips * $has_label_percent > self.min_trips + # must be correct or else this test could fail under some random test cases. + + # for a negative test, below + self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl' + + # test data can be saved between test invocations, check if data exists before generating + ts = esta.TimeSeries.get_time_series(user_id) + test_data = list(ts.find_entries(["analysis/confirmed_trip"])) + if len(test_data) == 0: + # generate test data for the database + logging.debug(f"inserting mock Confirmedtrips into database") + + # generate labels with a known sample weight that we can rely on in the test + label_data = { + "mode_confirm": ['ebike', 'bike'], + "purpose_confirm": ['happy-hour', 'dog-park'], + "replaced_mode": ['walk'], + "mode_weights": [0.9, 0.1], + "purpose_weights": [0.1, 0.9] + } + + train = etmm.generate_mock_trips( + user_id=user_id, + trips=self.total_trips, + origin=self.origin, + destination=self.destination, + trip_part='od', + label_data=label_data, + within_threshold=self.clustered_trips, + threshold=0.004, # ~400m + has_label_p=self.has_label_percent + ) + #values required by forest model + for entry in train: + entry['data']['start_local_dt']=entry['metadata']['write_local_dt'] + entry['data']['end_local_dt']=entry['metadata']['write_local_dt'] + + ts.bulk_insert(train) + + # confirm data write did not fail + test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None) + if len(test_data) != self.total_trips: + logging.debug(f'test invariant failed after generating test data') + self.fail() + else: + logging.debug(f'found {self.total_trips} trips in database') + + def tearDown(self): + """ + clean up database + """ + edb.get_analysis_timeseries_db().delete_many({'user_id': self.user_id}) + edb.get_model_db().delete_many({'user_id': self.user_id}) + edb.get_pipeline_state_db().delete_many({'user_id': self.user_id}) + + def testBuildForestModelFromConfig(self): + """ + forest model takes config arguments via the constructor for testing + purposes but will load from a file in /conf/analysis/ which is tested here + """ + + built_model = eamumt.ModelType.RANDOM_FOREST_CLASSIFIER.build() + attributes={'purpose_predictor': RandomForestClassifier , + 'mode_predictor' :RandomForestClassifier, + 'replaced_predictor':RandomForestClassifier, + 'purpose_enc' : eamtm.OneHotWrapper, + 'mode_enc':eamtm.OneHotWrapper + } + self.assertIsInstance(built_model,eamtf.ForestClassifierModel) + for attr in attributes: + #logging.debug(f'{attr,attributes[attr]}') + x=getattr(built_model.model,attr) + self.assertIsInstance(x, attributes[attr]) + # success if it didn't throw + + def testTrainForestModelWithZeroTrips(self): + """ + forest model takes config arguments via the constructor for testing + purposes but will load from a file in /conf/analysis/ which is tested here + """ + + # pass along debug model configuration + forest_model_config= { + "loc_feature" : "coordinates", + "radius": 500, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":'null', + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":True, + "random_state":42, + "use_start_clusters":False, + "use_trip_clusters":True + } + + logging.debug(f'~~~~ do nothing ~~~~') + eamur.update_trip_model( + user_id=self.unused_user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=forest_model_config + ) + + # user had no entries so their pipeline state should not have been set + # if it was set, the time query here would + stage = ecwp.PipelineStages.TRIP_MODEL + pipeline_state = epq.get_current_state(self.unused_user_id, stage) + self.assertIsNone( + pipeline_state['curr_run_ts'], + "pipeline should not have a current timestamp for the test user") + + + def test1RoundPredictForestModel(self): + """ + forest model takes config arguments via the constructor for testing + purposes but will load from a file in /conf/analysis/ which is tested here + """ + + forest_model_config= { + "loc_feature" : "coordinates", + "radius": 500, + "size_thresh":1, + "purity_thresh":1.0, + "gamma":0.05, + "C":1, + "n_estimators":100, + "criterion":"gini", + "max_depth":'null', + "min_samples_split":2, + "min_samples_leaf":1, + "max_features":"sqrt", + "bootstrap":True, + "random_state":42, + "use_start_clusters":False, + "use_trip_clusters":True + } + + logging.debug(f'(TRAIN) creating a model based on trips in database') + eamur.update_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + min_trips=self.min_trips, + model_config=forest_model_config + ) + + logging.debug(f'(TEST) testing prediction of stored model') + test = esda.get_entries(key="analysis/confirmed_trip", user_id=self.user_id, time_query=None) + model = eamur._load_stored_trip_model( + user_id=self.user_id, + model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, + model_config=forest_model_config + ) + + predictions_list = eamur.predict_labels_with_n( + trip_list = test, + model=model + ) + for prediction, n in predictions_list: + [logging.debug(p) for p in sorted(prediction, key=lambda r: r['p'], reverse=True)] + self.assertNotEqual(len(prediction), 0, "should have a prediction") + self.assertIn('labels',prediction[0].keys()) + self.assertIn('p',prediction[0].keys()) + self.assertIsInstance(prediction[0], dict, " should be an instance of the dictionary class") + self.assertIsInstance(prediction[0]['labels'], dict, " should be an instance of the dictionary class") + self.assertIn('mode_confirm',prediction[0]['labels'].keys()) + self.assertIn('replaced_mode',prediction[0]['labels'].keys()) + self.assertIn('purpose_confirm',prediction[0]['labels'].keys()) \ No newline at end of file diff --git a/emission/tests/modellingTests/modellingTestAssets.py b/emission/tests/modellingTests/modellingTestAssets.py index 252b2ad34..2bb1a958e 100644 --- a/emission/tests/modellingTests/modellingTestAssets.py +++ b/emission/tests/modellingTests/modellingTestAssets.py @@ -166,7 +166,10 @@ def build_mock_trip( "type": "Point", "coordinates": destination }, - "user_input": labels + #necessary valued for random forest model + "user_input": labels, + "duration": end_ts-start_ts, + "distance": ecc.calDistance(origin,destination) } return ecwe.Entry.create_fake_entry(user_id, key, data, write_ts=time.time())