From 1b523edf7ebd18ec9c8b62cb333e13ff4045c31b Mon Sep 17 00:00:00 2001
From: $aTyam
Date: Fri, 15 Mar 2024 13:42:09 -0400
Subject: [PATCH] [Tested] Improvements for model integration

1. Improved tests in `TestForestModelLoadandSave.py`
2. Better comments, imports and cleanup
---
 .../modelling/trip_model/dbscan_svm.py        | 250 ------------------
 .../modelling/trip_model/forest_classifier.py |  91 ++++---
 .../analysis/modelling/trip_model/models.py   |  66 ++---
 .../modelling/trip_model/run_model.py         |   1 -
 .../analysis/modelling/trip_model/util.py     |   2 -
 .../TestForestModelIntegration.py             |  44 +--
 .../TestForestModelLoadandSave.py             |  96 +++----
 .../modellingTests/TestRunForestModel.py      |  24 +-
 8 files changed, 163 insertions(+), 411 deletions(-)
 delete mode 100644 emission/analysis/modelling/trip_model/dbscan_svm.py

diff --git a/emission/analysis/modelling/trip_model/dbscan_svm.py b/emission/analysis/modelling/trip_model/dbscan_svm.py
deleted file mode 100644
index 58cd8f7e0..000000000
--- a/emission/analysis/modelling/trip_model/dbscan_svm.py
+++ /dev/null
@@ -1,250 +0,0 @@
-import emission.analysis.modelling.trip_model.trip_model as eamuu
-from sklearn.cluster import DBSCAN
-import logging
-import numpy as np
-import pandas as pd
-import emission.analysis.modelling.trip_model.util as eamtu
-from sklearn.preprocessing import StandardScaler
-from sklearn.pipeline import make_pipeline
-from sklearn import svm
-from sklearn.metrics.pairwise import haversine_distances
-
-EARTH_RADIUS = 6371000
-
-class DBSCANSVMCluster(eamuu.TripModel):
-    """ DBSCAN-based clustering algorithm that optionally implements SVM
-        sub-clustering.
-
-        Args:
-            loc_type (str): 'start' or 'end', the type of point to cluster
-            radius (int): max distance between two points in each other's
-                neighborhood, i.e. DBSCAN's eps value. does not strictly
-                dictate final cluster size
-            size_thresh (int): the min number of trips a cluster must have
-                to be considered for SVM sub-division
-            purity_thresh (float): the min purity a cluster must have
-                to be sub-divided using SVM
-            gamma (float): coefficient for the rbf kernel in SVM
-            C (float): regularization hyperparameter for SVM
-
-        Attributes:
-            loc_type (str)
-            radius (int)
-            size_thresh (int)
-            purity_thresh (float)
-            gamma (float)
-            C (float)
-            train_df (DataFrame)
-            test_df (DataFrame)
-            base_model (sklearn Estimator)
-    """
-
-    def __init__(self,
-                 loc_type='end',
-                 radius=100,
-                 svm=True,
-                 size_thresh=1,
-                 purity_thresh=1.0,
-                 gamma=0.05,
-                 C=1):
-        logging.info("PERF: Initializing DBSCANSVMCluster")
-        self.loc_type = loc_type
-        self.radius = radius
-        self.svm = svm
-        self.size_thresh = size_thresh
-        self.purity_thresh = purity_thresh
-        self.gamma = gamma
-        self.C = C
-
-    def set_params(self, params):
-        if 'loc_type' in params.keys(): self.loc_type = params['loc_type']
-        if 'radius' in params.keys(): self.radius = params['radius']
-        if 'svm' in params.keys(): self.svm = params['svm']
-        if 'size_thresh' in params.keys():
-            self.size_thresh = params['size_thresh']
-        if 'purity_thresh' in params.keys():
-            self.purity_thresh = params['purity_thresh']
-        if 'gamma' in params.keys(): self.gamma = params['gamma']
-
-        return self
-
-    def fit(self, train_df,ct_entry=None):
-        """ Creates clusters of trip points.
-            self.train_df will be updated with columns containing base and
-            final clusters.
-
-            TODO: perhaps move the loc_type argument to fit() so we can use a
-            single class instance to cluster both start and end points. This
-            will also help us reduce duplicate data.
- - Args: - train_df (dataframe): dataframe of labeled trips - ct_entry (List) : A list of Entry type of labeled and unlabeled trips - """ - ################## - ### clean data ### - ################## - logging.info("PERF: Fitting DBSCANSVMCluster") - self.train_df = self._clean_data(train_df) - - # we can use all trips as long as they have purpose labels. it's ok if - # they're missing mode/replaced-mode labels, because they aren't as - # strongly correlated with location compared to purpose - # TODO: actually, we may want to rethink this. for example, it will - # probably be helpful to include trips that are missing purpose labels - # but still have mode labels. - if self.train_df.purpose_true.isna().any(): - num_nan = self.train_df.purpose_true.value_counts( - dropna=False).loc[np.nan] - logging.info( - f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' - ) - self.train_df = self.train_df.dropna( - subset=['purpose_true']).reset_index(drop=True) - if len(self.train_df) == 0: - # i.e. no valid trips after removing all nans - raise Exception('no valid trips; nothing to fit') - - ######################### - ### get base clusters ### - ######################### - dist_matrix_meters = eamtu.get_distance_matrix(self.train_df, self.loc_type) - self.base_model = DBSCAN(self.radius, - metric="precomputed", - min_samples=1).fit(dist_matrix_meters) - base_clusters = self.base_model.labels_ - - self.train_df.loc[:, - f'{self.loc_type}_base_cluster_idx'] = base_clusters - - ######################## - ### get sub-clusters ### - ######################## - # copy base cluster column into final cluster column - self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = self.train_df[ - f'{self.loc_type}_base_cluster_idx'] - - if self.svm: - c = 0 # count of how many clusters we have iterated over - - # iterate over all clusters and subdivide them with SVM. the while - # loop is so we can do multiple iterations of subdividing if needed - while c < self.train_df[f'{self.loc_type}_cluster_idx'].max(): - points_in_cluster = self.train_df[ - self.train_df[f'{self.loc_type}_cluster_idx'] == c] - - # only do SVM if we have the minimum num of trips in the cluster - if len(points_in_cluster) < self.size_thresh: - c += 1 - continue - - # only do SVM if purity is below threshold - purity = eamtu.single_cluster_purity(points_in_cluster, - label_col='purpose_true') - if purity < self.purity_thresh: - X = points_in_cluster[[ - f"{self.loc_type}_lon", f"{self.loc_type}_lat" - ]] - y = points_in_cluster.purpose_true.to_list() - - svm_model = make_pipeline( - StandardScaler(), - svm.SVC( - kernel='rbf', - gamma=self.gamma, - C=self.C, - )).fit(X, y) - labels = svm_model.predict(X) - unique_labels = np.unique(labels) - - # if the SVM predicts that all points in the cluster have - # the same label, just ignore it and don't reindex. - # this also helps us to handle the possibility that a - # cluster may be impure but inherently inseparable, e.g. an - # end cluster at a user's home, containing 50% trips from - # work to home and 50% round trips that start and end at - # home. 
we don't want to reindex otherwise the low purity - # will trigger SVM again, and we will attempt & fail to - # split the cluster ad infinitum - if len(unique_labels) > 1: - # map purpose labels to new cluster indices - # we offset indices by the max existing index so that we - # don't run into any duplicate indices - max_existing_idx = self.train_df[ - f'{self.loc_type}_cluster_idx'].max() - label_to_cluster = { - unique_labels[i]: i + max_existing_idx + 1 - for i in range(len(unique_labels)) - } - # update trips with their new cluster indices - indices = np.array( - [label_to_cluster[l] for l in labels]) - self.train_df.loc[ - self.train_df[f'{self.loc_type}_cluster_idx'] == c, - f'{self.loc_type}_cluster_idx'] = indices - - c += 1 - # TODO: make things categorical at the end? or maybe at the start of the decision tree pipeline - - return self - - def fit_predict(self, train_df): - """ Override to avoid unnecessarily computation of distance matrices. - """ - self.fit(train_df) - return self.train_df[[f'{self.loc_type}_cluster_idx']] - - def predict(self, test_df): - logging.info("PERF: Predicting DBSCANSVMCluster") - # TODO: store clusters as polygons so the prediction is faster - # TODO: we probably don't want to store test_df in self to be more memory-efficient - self.test_df = self._clean_data(test_df) - pred_clusters = self._NN_predict(self.test_df) - - self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = pred_clusters - - return self.test_df[[f'{self.loc_type}_cluster_idx']] - - def _NN_predict(self, test_df): - """ Generate base-cluster predictions for the test data using a - nearest-neighbor approach. - - sklearn doesn't implement predict() for DBSCAN, which is why we - need a custom method. - """ - logging.info("PERF: NN_predicting DBSCANSVMCluster") - n_samples = test_df.shape[0] - labels = np.ones(shape=n_samples, dtype=int) * -1 - - # get coordinates of core points (we can't use model.components_ - # because our input feature was a distance matrix and doesn't contain - # info about the raw coordinates) - # NOTE: technically, every single point in a cluster is a core point - # because it has at least minPts (2) points, including itself, in its - # radius - train_coordinates = self.train_df[[ - f'{self.loc_type}_lat', f'{self.loc_type}_lon' - ]] - train_radians = np.radians(train_coordinates) - - for idx, row in test_df.reset_index(drop=True).iterrows(): - # calculate the distances between the ith test data and all points, - # then find the index of the closest point. if the ith test data is - # within epsilon of the point, then assign its cluster to the ith - # test data (otherwise, leave it as -1, indicating noise). 
- # unfortunately, pairwise_distances_argmin() does not support - # haversine distance, so we have to reimplement it ourselves - new_loc_radians = np.radians( - row[[self.loc_type + "_lat", self.loc_type + "_lon"]].to_list()) - new_loc_radians = np.reshape(new_loc_radians, (1, 2)) - dist_matrix_meters = haversine_distances( - new_loc_radians, train_radians) * EARTH_RADIUS - - shortest_dist_idx = np.argmin(dist_matrix_meters) - if dist_matrix_meters[0, shortest_dist_idx] < self.radius: - labels[idx] = self.train_df.reset_index( - drop=True).loc[shortest_dist_idx, - f'{self.loc_type}_cluster_idx'] - - return labels - diff --git a/emission/analysis/modelling/trip_model/forest_classifier.py b/emission/analysis/modelling/trip_model/forest_classifier.py index 8e066775d..16eee014f 100644 --- a/emission/analysis/modelling/trip_model/forest_classifier.py +++ b/emission/analysis/modelling/trip_model/forest_classifier.py @@ -1,17 +1,16 @@ -import pandas as pd -from sklearn.preprocessing import OneHotEncoder import joblib from typing import Dict, List, Optional, Tuple -from sklearn.metrics.pairwise import haversine_distances +import sklearn.metrics.pairwise as smp import emission.core.wrapper.confirmedtrip as ecwc import logging from io import BytesIO +import json import emission.analysis.modelling.trip_model.trip_model as eamuu import emission.analysis.modelling.trip_model.config as eamtc import emission.storage.timeseries.builtin_timeseries as estb import emission.storage.decorations.trip_queries as esdtq -from emission.analysis.modelling.trip_model.models import ForestClassifier +import emission.analysis.modelling.trip_model.models as eamtm EARTH_RADIUS = 6371000 @@ -33,45 +32,33 @@ def __init__(self,config=None): 'min_samples_leaf', 'max_features', 'bootstrap', - ] - cluster_expected_keys= [ - 'radius', - 'size_thresh', - 'purity_thresh', - 'gamma', - 'C', - 'use_start_clusters', - 'use_trip_clusters', - ] - + ] + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. 
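+        # (The inline test configs that this patch replaces used loc_feature = 'coordinates',
+        # so the cluster branch below is not exercised by the current tests.)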
+ ############################### + + # cluster_expected_keys= [ + # 'radius', + # 'size_thresh', + # 'purity_thresh', + # 'gamma', + # 'C', + # 'use_start_clusters', + # 'use_trip_clusters', + # ] + # + # if config['loc_feature'] == 'cluster': + # for k in cluster_expected_keys: + # if config.get(k) is None: + # msg = f"cluster trip model config missing expected key {k}" + # raise KeyError(msg) + ####################################### for k in random_forest_expected_keys: if config.get(k) is None: msg = f"forest trip model config missing expected key {k}" raise KeyError(msg) - - if config['loc_feature'] == 'cluster': - for k in cluster_expected_keys: - if config.get(k) is None: - msg = f"cluster trip model config missing expected key {k}" - raise KeyError(msg) - maxdepth =config['max_depth'] if config['max_depth']!='null' else None - self.model=ForestClassifier( loc_feature=config['loc_feature'], - radius= config['radius'], - size_thresh=config['radius'], - purity_thresh=config['purity_thresh'], - gamma=config['gamma'], - C=config['C'], - n_estimators=config['n_estimators'], - criterion=config['criterion'], - max_depth=maxdepth, - min_samples_split=config['min_samples_split'], - min_samples_leaf=config['min_samples_leaf'], - max_features=config['max_features'], - bootstrap=config['bootstrap'], - random_state=config['random_state'], - # drop_unclustered=False, - use_start_clusters=config['use_start_clusters'], - use_trip_clusters=config['use_trip_clusters']) + self.model=eamtm.ForestClassifier(**config) def fit(self,trips: List[ecwc.Confirmedtrip]): @@ -139,9 +126,15 @@ def to_dict(self): """ data={} attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df'] - if self.model.loc_feature == 'cluster': - ## confirm this includes all the extra encoders/models - attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. + ############################### + # if self.model.loc_feature == 'cluster': + # ## confirm this includes all the extra encoders/models + # attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + for attribute_name in attr: if not hasattr(self.model,attribute_name): raise ValueError(f"Attribute {attribute_name} not found in the model") @@ -153,7 +146,7 @@ def to_dict(self): raise RuntimeError(f"Error serializing { attribute_name}: {str(e)}") buffer.seek(0) data[attribute_name]=buffer.getvalue() - + return data def from_dict(self,model: Dict): @@ -161,9 +154,14 @@ def from_dict(self,model: Dict): Load the model from a dictionary. """ attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df'] - if self.model.loc_feature == 'cluster': - ## TODO : confirm this includes all the extra encoders/models - attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) + + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not used. Not tested either. 
+ ############################### + # if self.model.loc_feature == 'cluster': + # ## TODO : confirm this includes all the extra encoders/models + # attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper']) for attribute_name in attr: if attribute_name not in model: raise ValueError(f"Attribute {attribute_name} missing in the model") @@ -183,6 +181,7 @@ def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]: :return: a vector containing features to predict from :rtype: List[float] """ + # ForestClassifier class in models.py file handles features extraction. pass def is_incremental(self) -> bool: diff --git a/emission/analysis/modelling/trip_model/models.py b/emission/analysis/modelling/trip_model/models.py index 1cb6de655..cc3b58a2e 100644 --- a/emission/analysis/modelling/trip_model/models.py +++ b/emission/analysis/modelling/trip_model/models.py @@ -1514,7 +1514,7 @@ def __init__( self.C = C self.n_estimators = n_estimators self.criterion = criterion - self.max_depth = max_depth + self.max_depth = max_depth if max_depth!= 'null' else None self.min_samples_split = min_samples_split self.min_samples_leaf = min_samples_leaf self.max_features = max_features @@ -1524,36 +1524,42 @@ def __init__( self.use_start_clusters = use_start_clusters self.use_trip_clusters = use_trip_clusters - if self.loc_feature == 'cluster': - # clustering algorithm to generate end clusters - self.end_cluster_model = DBSCANSVMCluster( - loc_type='end', - radius=self.radius, - size_thresh=self.size_thresh, - purity_thresh=self.purity_thresh, - gamma=self.gamma, - C=self.C) - - if self.use_start_clusters or self.use_trip_clusters: - # clustering algorithm to generate start clusters - self.start_cluster_model = DBSCANSVMCluster( - loc_type='start', - radius=self.radius, - size_thresh=self.size_thresh, - purity_thresh=self.purity_thresh, - gamma=self.gamma, - C=self.C) - - if self.use_trip_clusters: - # helper class to generate trip-level clusters - self.trip_grouper = TripGrouper( - start_cluster_col='start_cluster_idx', - end_cluster_col='end_cluster_idx') - - # wrapper class to generate one-hot encodings for cluster indices - self.cluster_enc = OneHotWrapper(sparse=False, - handle_unknown='ignore') + ######### Not Tested ######### + # The below code is used when we cluster the coordinates (loc_cluster parameter = True) + # before passing to Random Forest. Commenting this for now since it is not tested. 
+ ############################### + # if self.loc_feature == 'cluster': + # # clustering algorithm to generate end clusters + # self.end_cluster_model = DBSCANSVMCluster( + # loc_type='end', + # radius=self.radius, + # size_thresh=self.size_thresh, + # purity_thresh=self.purity_thresh, + # gamma=self.gamma, + # C=self.C) + + # if self.use_start_clusters or self.use_trip_clusters: + # # clustering algorithm to generate start clusters + # self.start_cluster_model = DBSCANSVMCluster( + # loc_type='start', + # radius=self.radius, + # size_thresh=self.size_thresh, + # purity_thresh=self.purity_thresh, + # gamma=self.gamma, + # C=self.C) + + # if self.use_trip_clusters: + # # helper class to generate trip-level clusters + # self.trip_grouper = TripGrouper( + # start_cluster_col='start_cluster_idx', + # end_cluster_col='end_cluster_idx') + + # # wrapper class to generate one-hot encodings for cluster indices + # self.cluster_enc = OneHotWrapper(sparse=False, + # handle_unknown='ignore') + ############################################################################# + # wrapper class to generate one-hot encodings for purposes and modes self.purpose_enc = OneHotWrapper(impute_missing=True, sparse=False, diff --git a/emission/analysis/modelling/trip_model/run_model.py b/emission/analysis/modelling/trip_model/run_model.py index f27457c60..cfee60464 100644 --- a/emission/analysis/modelling/trip_model/run_model.py +++ b/emission/analysis/modelling/trip_model/run_model.py @@ -56,7 +56,6 @@ def update_trip_model( time_query = time_query_from_pipeline if model.is_incremental else None logging.debug(f'model type {model_type.name} is incremental? {model.is_incremental}') logging.debug(f'time query for training data collection: {time_query}') - trips = _get_training_data(user_id, time_query) # don't start training for a user that doesn't have at least $trips many trips # (assume if a stored model exists for the user, that they met this requirement previously) diff --git a/emission/analysis/modelling/trip_model/util.py b/emission/analysis/modelling/trip_model/util.py index b3a9a012c..b3da1d4a1 100644 --- a/emission/analysis/modelling/trip_model/util.py +++ b/emission/analysis/modelling/trip_model/util.py @@ -7,10 +7,8 @@ def find_knee_point(values: List[float]) -> Tuple[float, int]: """for a list of values, find the value which represents the cut-off point or "elbow" in the function when values are sorted. - copied from original similarity algorithm. 
permalink: [https://github.com/e-mission/e-mission-server/blob/5b9e608154de15e32df4f70a07a5b95477e7dbf5/emission/analysis/modelling/tour_model/similarity.py#L256] - with `y` passed in as `values` based on this stack overflow answer: https://stackoverflow.com/a/2022348/4803266 And summarized by the statement: "A quick way of finding the elbow is to draw a diff --git a/emission/tests/modellingTests/TestForestModelIntegration.py b/emission/tests/modellingTests/TestForestModelIntegration.py index 89d0a639d..fad84662b 100644 --- a/emission/tests/modellingTests/TestForestModelIntegration.py +++ b/emission/tests/modellingTests/TestForestModelIntegration.py @@ -12,6 +12,7 @@ import emission.tests.common as etc import emission.pipeline.intake_stage as epi import logging +import emission.analysis.modelling.trip_model.config as eamtc import emission.analysis.modelling.trip_model.run_model as eamur import emission.analysis.modelling.trip_model.model_type as eamumt @@ -26,29 +27,10 @@ class TestForestModelIntegration(unittest.TestCase): # Finally in the test, assert the type of label predictions expected. def setUp(self): - - self.reset_all() np.random.seed(91) self.test_algorithms = eacilp.primary_algorithms - - forest_model_config= { - "loc_feature" : "coordinates", - "radius": 500, - "size_thresh":1, - "purity_thresh":1.0, - "gamma":0.05, - "C":1, - "n_estimators":100, - "criterion":"gini", - "max_depth":'null', - "min_samples_split":2, - "min_samples_leaf":1, - "max_features":"sqrt", - "bootstrap":True, - "random_state":42, - "use_start_clusters":False, - "use_trip_clusters":True - } + forest_model_config = eamtc.get_config_value_or_raise('model_parameters.forest') + etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-07-22") ##maybe use a different file ts = esta.TimeSeries.get_time_series(self.testUUID) label_data = { @@ -59,23 +41,18 @@ def setUp(self): "purpose_weights": [0.1, 0.9] } - self.origin = (-105.1705977, 39.7402654,) - self.destination = (-105.1755606, 39.7673075) - self.min_trips = 14 - self.total_trips = 100 - self.clustered_trips = 33 - self.has_label_percent = 0.9 + self.total_trips=100 ## generate mock trips train = etmm.generate_mock_trips( user_id=self.testUUID, trips=self.total_trips, - origin=self.origin, - destination=self.destination, + origin=(-105.1705977, 39.7402654), + destination=(-105.1755606, 39.7673075), trip_part='od', label_data=label_data, - within_threshold=self.clustered_trips, + within_threshold= 33, threshold=0.004, # ~400m - has_label_p=self.has_label_percent + has_label_p=0.9 ) ts.bulk_insert(train) # confirm data write did not fail @@ -108,7 +85,10 @@ def run_pipeline(self, algorithms): eacilp.primary_algorithms = default_primary_algorithms def reset_all(self): - etc.dropAllCollections(edb._get_current_db()) + edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUUID}) + edb.get_model_db().delete_many({'user_id': self.testUUID}) + edb.get_pipeline_state_db().delete_many({'user_id': self.testUUID}) + # Tests that forest algorithm being tested runs successfully def testForestAlgorithm(self): diff --git a/emission/tests/modellingTests/TestForestModelLoadandSave.py b/emission/tests/modellingTests/TestForestModelLoadandSave.py index 8da1fce5b..37768a689 100644 --- a/emission/tests/modellingTests/TestForestModelLoadandSave.py +++ b/emission/tests/modellingTests/TestForestModelLoadandSave.py @@ -5,14 +5,13 @@ import emission.analysis.modelling.trip_model.run_model as eamur import 
emission.analysis.modelling.trip_model.model_type as eamumt
 import emission.analysis.modelling.trip_model.model_storage as eamums
-
+import emission.analysis.modelling.trip_model.config as eamtc
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.tests.modellingTests.modellingTestAssets as etmm
 import emission.storage.decorations.analysis_timeseries_queries as esda
 import emission.core.get_database as edb
-import emission.storage.pipeline_queries as epq
-import emission.core.wrapper.pipelinestate as ecwp
-
 
 class TestForestModelLoadandSave(unittest.TestCase):
     """
@@ -40,7 +39,7 @@ def setUp(self):
         # for a negative test, below
         self.unused_user_id = 'asdjfkl;asdfjkl;asd08234ur13fi4jhf2103mkl'
 
-        # test data can be saved between test invocations, check if data exists before generating
+        # ensure that no test data from a previous run was left in the DB after teardown
        ts = esta.TimeSeries.get_time_series(user_id)
        test_data = list(ts.find_entries(["analysis/confirmed_trip"]))
        if len(test_data) == 0:
@@ -56,7 +55,7 @@ def setUp(self):
                 "purpose_weights": [0.1, 0.9]
             }
 
-            train = etmm.generate_mock_trips(
+            test_data = etmm.generate_mock_trips(
                 user_id=user_id,
                 trips=self.total_trips,
                 origin=self.origin,
@@ -68,7 +67,7 @@ def setUp(self):
                 has_label_p=self.has_label_percent
             )
 
-            ts.bulk_insert(train)
+            ts.bulk_insert(test_data)
 
             # confirm data write did not fail
             test_data = esda.get_entries(key="analysis/confirmed_trip", user_id=user_id, time_query=None)
@@ -78,24 +77,7 @@ def setUp(self):
         else:
             logging.debug(f'found {self.total_trips} trips in database')
 
-        self.forest_model_config= {
-            "loc_feature" : "coordinates",
-            "radius": 500,
-            "size_thresh":1,
-            "purity_thresh":1.0,
-            "gamma":0.05,
-            "C":1,
-            "n_estimators":100,
-            "criterion":"gini",
-            "max_depth":'null',
-            "min_samples_split":2,
-            "min_samples_leaf":1,
-            "max_features":"sqrt",
-            "bootstrap":True,
-            "random_state":42,
-            "use_start_clusters":False,
-            "use_trip_clusters":True
-        }
+        self.forest_model_config = eamtc.get_config_value_or_raise('model_parameters.forest')
 
     def tearDown(self):
         """
@@ -283,31 +265,51 @@ def mock_load(*args,**kwargs):
 
     def testRandomForestTypePreservation(self):
         """
         TypePreservationTest: To ensure that the serialization and deserialization
-        process maintains the data types of all model attributes.
+        process maintains the data types of all model attributes.
+        The model is trained, its predictions are stored, and the model is then
+        serialized and deserialized. The attribute types and the predictions of
+        the deserialized model must match those of the original model.
         """
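+        # Test flow: fetch the trips stored by setUp, hold out the last 20% as a
+        # prediction set, train a model, serialize it with to_dict(), rebuild a
+        # second model with from_dict(), and assert that attribute types and
+        # predictions are unchanged.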
""" + ## Get trips for a user + test_user=uuid.UUID('feb6a3a8-a2ef-4f4a-8754-bd79f7154495') + ct_entry=eamtr._get_training_data(test_user,None) - logging.debug(f'(TRAIN) creating a model based on trips in database') - eamur.update_trip_model( - user_id=self.user_id, - model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, - model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, - min_trips=self.min_trips, - model_config=self.forest_model_config + split= int(len(ct_entry)*0.8) + trips=ct_entry[:split] + test_trips=ct_entry[split:] + + ## Build and train model + model_type= eamumt.ModelType.RANDOM_FOREST_CLASSIFIER + model = model_type.build(self.forest_model_config) + model.fit(trips) + + ## Get pre serialization predictions + predictions_list = eamur.predict_labels_with_n( + trip_list = test_trips, + model=model ) - - model = eamur._load_stored_trip_model( - user_id=self.user_id, - model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER, - model_storage=eamums.ModelStorage.DOCUMENT_DATABASE, - model_config=self.forest_model_config - ) - - model_data=model.to_dict() - loaded_model_type=eamumt.ModelType.RANDOM_FOREST_CLASSIFIER - loaded_model = loaded_model_type.build(self.forest_model_config) - loaded_model.from_dict(model_data) - + ## Serialise + serialised_model_data=model.to_dict() + + ## build and deserialise a different model + deserialised_model = model_type.build(self.forest_model_config) + deserialised_model.from_dict(serialised_model_data) + + ## test if the types are correct for attr in ['purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df']: - assert isinstance(getattr(loaded_model.model,attr),type(getattr(model.model,attr))) + deSerialised_attr_value=getattr(deserialised_model.model,attr) + original_attr_value=getattr(model.model,attr) + #Check type preservation + self.assertIsInstance(deSerialised_attr_value,type(original_attr_value), f"Type mismatch for {attr} ") + #Check for value equality. 
+
+        ## Test that the predictions are unchanged
+        deserialised_predictions_list = eamur.predict_labels_with_n(
+            trip_list = test_trips,
+            model=deserialised_model
+        )
+        logging.debug(f'original predictions: {predictions_list}')
+        logging.debug(f'deserialised predictions: {deserialised_predictions_list}')
+        self.assertEqual(deserialised_predictions_list, predictions_list, 'predictions do not match after deserialization')
 
diff --git a/emission/tests/modellingTests/TestRunForestModel.py b/emission/tests/modellingTests/TestRunForestModel.py
index 1676e878d..6a81a0cb0 100644
--- a/emission/tests/modellingTests/TestRunForestModel.py
+++ b/emission/tests/modellingTests/TestRunForestModel.py
@@ -4,14 +4,17 @@
 import emission.analysis.modelling.trip_model.run_model as eamur
 import emission.analysis.modelling.trip_model.model_type as eamumt
 import emission.analysis.modelling.trip_model.model_storage as eamums
-
+import emission.analysis.modelling.trip_model.models as eamtm
+logger = logging.getLogger("")
+logger.setLevel(logging.DEBUG)
 import emission.storage.timeseries.abstract_timeseries as esta
 import emission.tests.modellingTests.modellingTestAssets as etmm
 import emission.storage.decorations.analysis_timeseries_queries as esda
 import emission.core.get_database as edb
 import emission.storage.pipeline_queries as epq
 import emission.core.wrapper.pipelinestate as ecwp
-
+import emission.analysis.modelling.trip_model.forest_classifier as eamtf
+from sklearn.ensemble import RandomForestClassifier
 
 class TestRunForestModel(unittest.TestCase):
     """
@@ -67,6 +70,10 @@ def setUp(self):
             threshold=0.004, # ~400m
             has_label_p=self.has_label_percent
         )
+        # date/time features required by the forest model
+        for entry in train:
+            entry['data']['start_local_dt'] = entry['metadata']['write_local_dt']
+            entry['data']['end_local_dt'] = entry['metadata']['write_local_dt']
 
         ts.bulk_insert(train)
 
@@ -92,7 +99,18 @@ def testBuildForestModelFromConfig(self):
         purposes but will load from a file in /conf/analysis/ which is tested here
         """
-        eamumt.ModelType.RANDOM_FOREST_CLASSIFIER.build()
+        built_model = eamumt.ModelType.RANDOM_FOREST_CLASSIFIER.build()
+        attributes = {'purpose_predictor': RandomForestClassifier,
+                      'mode_predictor': RandomForestClassifier,
+                      'replaced_predictor': RandomForestClassifier,
+                      'purpose_enc': eamtm.OneHotWrapper,
+                      'mode_enc': eamtm.OneHotWrapper
+                     }
+        self.assertIsInstance(built_model, eamtf.ForestClassifierModel)
+        for attr in attributes:
+            x = getattr(built_model.model, attr)
+            self.assertIsInstance(x, attributes[attr])
         # success if it didn't throw
 
     def testTrainForestModelWithZeroTrips(self):
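
A minimal, self-contained sketch of the joblib/BytesIO round trip that the new
`to_dict()`/`from_dict()` methods perform, for reviewers who want to try it in
isolation. The helper names `attrs_to_dict`/`attrs_from_dict` and the `Holder`
class are illustrative assumptions, not identifiers from this patch:

    from io import BytesIO

    import joblib
    from sklearn.ensemble import RandomForestClassifier

    def attrs_to_dict(obj, attr_names):
        # serialize each named attribute to raw bytes via an in-memory buffer,
        # mirroring what ForestClassifierModel.to_dict() does per attribute
        data = {}
        for name in attr_names:
            buffer = BytesIO()
            joblib.dump(getattr(obj, name), buffer)
            buffer.seek(0)
            data[name] = buffer.getvalue()
        return data

    def attrs_from_dict(obj, data):
        # restore each attribute from its serialized bytes, mirroring from_dict()
        for name, raw in data.items():
            setattr(obj, name, joblib.load(BytesIO(raw)))

    class Holder:
        pass

    original = Holder()
    original.purpose_predictor = RandomForestClassifier(n_estimators=10, random_state=42)

    restored = Holder()
    attrs_from_dict(restored, attrs_to_dict(original, ['purpose_predictor']))
    assert isinstance(restored.purpose_predictor, RandomForestClassifier)

Because each attribute is stored as an independent bytes blob, the whole model
can be persisted as a single document, which is what the document-database
storage path above relies on.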