
Survey Assist Using RF #938

Open · wants to merge 30 commits into base: master

Changes from all commits (30 commits)
6471a96
add copy of code used in TRB paper
hlu109 Aug 12, 2022
9d6a1af
update user uuid lookup; add documentation note
hlu109 Dec 16, 2022
9bd9b18
Add additional logging to the calculation so that we can monitor the …
shankari Feb 15, 2023
1b9ece0
making `cluster_performance.ipynb`, `generate_figs_for_poster` and `…
humbleOldSage Aug 22, 2023
e7d2a14
Unified Interface for fit function
humbleOldSage Aug 26, 2023
59633e0
Fixing `models.py` to support `regenerate_classification_performance_…
humbleOldSage Aug 30, 2023
0adb5fe
[PARTIALLY TESTED] Single database read and Code Cleanuo
humbleOldSage Sep 14, 2023
e9abd51
[PARTIALLY TESTED] Survey Assist Using RF
humbleOldSage Oct 2, 2023
3820d87
[NOT TESTED]Predict implemented
humbleOldSage Oct 3, 2023
5b2572e
[NOT TESTED] Model storage and Model Testing included
humbleOldSage Oct 9, 2023
bf7f406
[TESTED]Forest Model Integration
humbleOldSage Nov 2, 2023
1d7be5a
Minor fixes
humbleOldSage Nov 2, 2023
b3d0db2
Delete Config file
humbleOldSage Nov 3, 2023
c514fe0
Merge remote-tracking branch 'e-mission-eval-private-data/move-models…
humbleOldSage Nov 3, 2023
3b038a9
removedfile
humbleOldSage Nov 3, 2023
94fc848
Update model.py
humbleOldSage Nov 3, 2023
87f109c
Merge branch 'master' into SurveyAssistUsingRandomForest
humbleOldSage Dec 7, 2023
33cdaab
[Tested, Will fail]Integrating RF model on server and more Unit test
humbleOldSage Dec 9, 2023
01fcb2a
minor fix
humbleOldSage Dec 9, 2023
f5fec64
Delete model.py
humbleOldSage Dec 12, 2023
585cc90
Update TestForestModel.py
humbleOldSage Dec 13, 2023
61bbe3f
Minor Fixes
humbleOldSage Dec 16, 2023
a32ce4f
[Tested]Adding Integration test
humbleOldSage Jan 2, 2024
052cb08
Improving test
humbleOldSage Jan 10, 2024
104dd9a
Integration Testing for forest model
humbleOldSage Feb 5, 2024
1b523ed
[Tested] Improvements for model integration
humbleOldSage Mar 15, 2024
35a1346
Forest Model related data additions
humbleOldSage Mar 21, 2024
19bb394
Update TestForestModelIntegration.py
humbleOldSage Mar 22, 2024
450094c
[TESTED] Updated ForestModelLoadAndSave.py
humbleOldSage Mar 22, 2024
ad968de
Fixing TestForestModelLoadandSave.py
humbleOldSage Mar 28, 2024
20 changes: 19 additions & 1 deletion conf/analysis/trip_model.conf.json.sample
@@ -1,5 +1,5 @@
{
"model_type": "greedy",
"model_type": "forest",
"model_storage": "document_database",
"minimum_trips": 14,
"model_parameters": {
@@ -8,6 +8,24 @@
"similarity_threshold_meters": 500,
"apply_cutoff": false,
"incremental_evaluation": false
},
"forest": {
"loc_feature" : "coordinates",
"radius": 100,
"size_thresh":1,
"purity_thresh":1.0,
"gamma":0.05,
"C":1,
"n_estimators":100,
"criterion":"gini",
"max_depth":null,
"min_samples_split":2,
"min_samples_leaf":1,
"max_features":"sqrt",
"bootstrap":true,
"random_state":42,
"use_start_clusters":false,
"use_trip_clusters":true
}
}
}
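The sample config above switches `model_type` to `"forest"` and adds the `forest` parameter block. A minimal sketch of how a consumer might parse and validate that block, mirroring the key check the PR's `ForestClassifierModel` constructor performs (the helper name `validate_forest_params` is hypothetical; the required key names are taken from the PR code below):

```python
import json

# keys the ForestClassifierModel constructor insists on (from the PR diff)
REQUIRED_FOREST_KEYS = [
    "loc_feature", "n_estimators", "criterion",
    "min_samples_split", "min_samples_leaf", "max_features", "bootstrap",
]

def validate_forest_params(config_json):
    """Parse a trip_model config string and check the forest block for the
    expected keys, raising KeyError on any that are missing or null."""
    config = json.loads(config_json)
    forest = config["model_parameters"]["forest"]
    missing = [k for k in REQUIRED_FOREST_KEYS if forest.get(k) is None]
    if missing:
        raise KeyError(f"forest trip model config missing expected keys: {missing}")
    return forest
```

Note that `max_depth` is deliberately absent from the required list, since `null` is a legitimate value for it.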
197 changes: 197 additions & 0 deletions emission/analysis/modelling/trip_model/forest_classifier.py
@@ -0,0 +1,197 @@
import joblib
from typing import Dict, List, Optional, Tuple
import sklearn.metrics.pairwise as smp
import emission.core.wrapper.confirmedtrip as ecwc
import logging
from io import BytesIO

import json
import emission.analysis.modelling.trip_model.trip_model as eamuu
import emission.analysis.modelling.trip_model.config as eamtc
import emission.storage.timeseries.builtin_timeseries as estb
import emission.storage.decorations.trip_queries as esdtq
import emission.analysis.modelling.trip_model.models as eamtm

EARTH_RADIUS = 6371000

class ForestClassifierModel(eamuu.TripModel):

def __init__(self,config=None):

if config is None:
config = eamtc.get_config_value_or_raise('model_parameters.forest')
logging.debug(f'ForestClassifier loaded model config from file')
else:
logging.debug(f'ForestClassifier using model config argument')

random_forest_expected_keys = [
'loc_feature',
'n_estimators',
'criterion',
'min_samples_split',
'min_samples_leaf',
'max_features',
'bootstrap',
]
######### Not Tested #########
# The below code is used when we cluster the coordinates (loc_cluster parameter = True)
# before passing to Random Forest. Commenting this for now since it is not used. Not tested either.
###############################

# cluster_expected_keys= [
# 'radius',
# 'size_thresh',
# 'purity_thresh',
# 'gamma',
# 'C',
# 'use_start_clusters',
# 'use_trip_clusters',
# ]
#
# if config['loc_feature'] == 'cluster':
# for k in cluster_expected_keys:
# if config.get(k) is None:
# msg = f"cluster trip model config missing expected key {k}"
# raise KeyError(msg)
#######################################
for k in random_forest_expected_keys:
if config.get(k) is None:
msg = f"forest trip model config missing expected key {k}"
raise KeyError(msg)
self.model=eamtm.ForestClassifier(**config)


def fit(self,trips: List[ecwc.Confirmedtrip]):
'''
trips : List of Entry type data
'''
# check and raise exception if no data to fit
logging.debug(f'fit called with {len(trips)} trips')

unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips))
if len(unlabeled) > 0:
msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}'
raise Exception(msg)

#Convert List of Entry to dataframe
data_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",trips)
labeled_trip_df = esdtq.filter_labeled_trips(data_df)
expanded_labeled_trip_df= esdtq.expand_userinputs(labeled_trip_df)
#fit models on dataframe
self.model.fit(expanded_labeled_trip_df)


def predict(self, trip: List[float]) -> Tuple[List[Dict], int]:
'''
trip : a single trip whose mode, purpose, and replaced mode are to be predicted
returns: a list of prediction dicts and its length
'''

# raise an exception if there is no trip to predict
logging.debug(f"forest classifier predict called with {len(trip)} trips")
if len(trip) == 0:
msg = f'model.predict cannot be called with an empty trip'
raise Exception(msg)
# CONVERT TRIP TO dataFrame
test_df = estb.BuiltinTimeSeries.to_data_df("analysis/confirmed_trip",[trip])
predictions_df = self.model.predict(test_df)

# predictions_df currently holds the most probable option for each of the
# three categories individually. predictions_df is in the form
#
# purpose_pred | purpose_proba | mode_pred | mode_proba | replaced_pred | replaced_proba
# dog-park | 1.0 | e-bike | 0.99 | walk | 1.1
#
#
# However, to keep the trip model general, the forest model is expected to return
#
# PREDICTIONS [ {'labels': {'mode_confirm': 'e-bike', 'replaced_mode': 'walk', 'purpose_confirm': 'dog-park'},
#               'p': (currently the average of the 3 probabilities)}]
labels = {
'mode_confirm': predictions_df['mode_pred'].iloc[0],
'replaced_mode': predictions_df['replaced_pred'].iloc[0],
'purpose_confirm': predictions_df['purpose_pred'].iloc[0]
}

avg_proba = predictions_df[['purpose_proba','mode_proba','replaced_proba']].mean(axis=1).iloc[0]
predictions = [{
'labels': labels,
'p': avg_proba
}]
return predictions, len(predictions)

def to_dict(self):
"""
Convert the model to a dictionary suitable for storage.
"""
data={}
attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df']

######### Not Tested #########
# The below code is used when we cluster the coordinates (loc_cluster parameter = True)
# before passing to Random Forest. Commenting this for now since it is not used. Not tested either.
###############################
# if self.model.loc_feature == 'cluster':
# ## confirm this includes all the extra encoders/models
# attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper'])

for attribute_name in attr:
if not hasattr(self.model,attribute_name):
raise ValueError(f"Attribute {attribute_name} not found in the model")

buffer=BytesIO()
try:
joblib.dump(getattr(self.model,attribute_name),buffer)
except Exception as e:
raise RuntimeError(f"Error serializing { attribute_name}: {str(e)}")
buffer.seek(0)
data[attribute_name]=buffer.getvalue()

return data

def from_dict(self,model: Dict):
"""
Load the model from a dictionary.
"""
attr=[ 'purpose_predictor','mode_predictor','replaced_predictor','purpose_enc','mode_enc','train_df']

######### Not Tested #########
# The below code is used when we cluster the coordinates (loc_cluster parameter = True)
# before passing to Random Forest. Commenting this for now since it is not used. Not tested either.
###############################
# if self.model.loc_feature == 'cluster':
# ## TODO : confirm this includes all the extra encoders/models
# attr.extend([ 'cluster_enc','end_cluster_model','start_cluster_model','trip_grouper'])
for attribute_name in attr:
if attribute_name not in model:
raise ValueError(f"Attribute {attribute_name} missing in the model")
try:
buffer = BytesIO(model[attribute_name])
setattr(self.model,attribute_name, joblib.load(buffer))
except Exception as e:
raise RuntimeError(f"Error deserializing { attribute_name}: {str(e)}")
# To log the error without raising, comment out the raise above

def extract_features(self, trip: ecwc.Confirmedtrip) -> List[float]:
"""
extract the relevant features for learning from a trip for this model instance

:param trip: the trip to extract features from
:type trip: Confirmedtrip
:return: a vector containing features to predict from
:rtype: List[float]
"""
# ForestClassifier class in models.py file handles features extraction.
pass
Contributor: You should add a code comment here about why this is "pass"; the normal expectation would be that all models extract features.

Contributor Author: Will do. It's because the pre-implemented models.py file handles extraction in this case.

Contributor Author: This is included in the upcoming commit.

Contributor: I don't see this code comment in here yet.

Contributor Author: This is done.

def is_incremental(self) -> bool:
"""
whether this model requires the complete user history to build (False),
or, if only the incremental data since last execution is required (True).

:return: if the model is incremental. the current timestamp will be recorded
in the analysis pipeline. the next call to this model will only include
trip data for trips later than the recorded timestamp.
:rtype: bool
"""
pass
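The `to_dict`/`from_dict` pair above serializes each fitted attribute of the wrapped model into raw bytes through an in-memory buffer so the whole model can be stored as one document. A standalone sketch of that round-trip, under stated assumptions: `DummyModel` and `Holder` are hypothetical stand-ins for the fitted predictors/encoders and their container, and `pickle` is used here for a dependency-free sketch in place of the PR's `joblib.dump`/`joblib.load`, which take the same file-object arguments:

```python
import pickle
from io import BytesIO

class DummyModel:
    """Hypothetical stand-in for one fitted predictor/encoder attribute."""
    def __init__(self, value):
        self.value = value

class Holder:
    """Hypothetical stand-in for the wrapped model object."""
    pass

def model_to_dict(obj, attrs):
    # serialize each named attribute to bytes, as the PR's to_dict does
    data = {}
    for name in attrs:
        if not hasattr(obj, name):
            raise ValueError(f"Attribute {name} not found in the model")
        buf = BytesIO()
        pickle.dump(getattr(obj, name), buf)
        buf.seek(0)
        data[name] = buf.getvalue()
    return data

def model_from_dict(obj, data, attrs):
    # restore each attribute from its serialized bytes, as from_dict does
    for name in attrs:
        if name not in data:
            raise ValueError(f"Attribute {name} missing in the model")
        setattr(obj, name, pickle.load(BytesIO(data[name])))
```

The raise-on-missing-attribute checks mirror the PR's behavior, so a corrupt or partially stored model fails loudly instead of silently loading incomplete state.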
18 changes: 10 additions & 8 deletions emission/analysis/modelling/trip_model/model_type.py
@@ -3,6 +3,7 @@
import emission.analysis.modelling.trip_model.trip_model as eamuu
import emission.analysis.modelling.similarity.od_similarity as eamso
import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamug
import emission.analysis.modelling.trip_model.forest_classifier as eamuf


SIMILARITY_THRESHOLD_METERS=500
@@ -11,6 +12,7 @@
class ModelType(Enum):
# ENUM_NAME_CAPS = 'SHORTHAND_NAME_CAPS'
GREEDY_SIMILARITY_BINNING = 'GREEDY'
RANDOM_FOREST_CLASSIFIER = 'FOREST'

def build(self, config=None) -> eamuu.TripModel:
"""
@@ -24,16 +26,16 @@ def build(self, config=None) -> eamuu.TripModel:
:raises KeyError: if the requested model name does not exist
"""
# Dict[ModelType, TripModel]
MODELS = {
ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning(config)
}
MODELS = {
ModelType.GREEDY_SIMILARITY_BINNING: eamug.GreedySimilarityBinning,
ModelType.RANDOM_FOREST_CLASSIFIER: eamuf.ForestClassifierModel
}
model = MODELS.get(self)
if model is None:
model_names = list(lambda e: e.name, MODELS.keys())
models = ",".join(model_names)
raise KeyError(f"ModelType {self.name} not found in factory, please add to build method")

return model
available_models = ', '.join([ e.name for e in ModelType])
raise KeyError(f"ModelType {self.name} not found in factory; available models are {available_models}. "\
"Please add any new model to the build method")
return model(config)

@classmethod
def names(cls):
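The model_type.py change above switches the `MODELS` dict from mapping enum members to already-constructed instances to mapping them to classes, so only the requested model is instantiated. A minimal sketch of that factory pattern, with placeholder classes standing in for the real `GreedySimilarityBinning` and `ForestClassifierModel`:

```python
from enum import Enum

class GreedySimilarityBinning:
    """Placeholder for the real model class."""
    def __init__(self, config=None):
        self.config = config

class ForestClassifierModel:
    """Placeholder for the real model class."""
    def __init__(self, config=None):
        self.config = config

class ModelType(Enum):
    GREEDY_SIMILARITY_BINNING = 'GREEDY'
    RANDOM_FOREST_CLASSIFIER = 'FOREST'

    def build(self, config=None):
        # map each enum member to a class; instantiate only on request
        MODELS = {
            ModelType.GREEDY_SIMILARITY_BINNING: GreedySimilarityBinning,
            ModelType.RANDOM_FOREST_CLASSIFIER: ForestClassifierModel,
        }
        model_cls = MODELS.get(self)
        if model_cls is None:
            available = ', '.join(e.name for e in ModelType)
            raise KeyError(f"ModelType {self.name} not found in factory. "
                           f"Available models are {available}.")
        return model_cls(config)
```

Deferring construction this way also avoids the original bug where every model in the dict was built (with the same config) on each `build` call.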
70 changes: 38 additions & 32 deletions emission/analysis/modelling/trip_model/models.py
Contributor: I would have preferred this to be called something other than models.py (maybe something like rf_for_label_models.py) but will not hold up the release for this change.

Contributor Author (@humbleOldSage, Mar 14, 2024): I haven't done this yet since this file has other models as well (it was brought in as-is from e-mission-eval-private-data). I can try to move the other models to a separate file (along with their blame history) and then rename it.
@@ -19,7 +19,7 @@
from emission.analysis.modelling.trip_model.clustering import get_distance_matrix, single_cluster_purity
import emission.analysis.modelling.trip_model.data_wrangling as eamtd
import emission.storage.decorations.trip_queries as esdtq
from emission.analysis.classification.inference.labels.inferrers import predict_cluster_confidence_discounting
import emission.analysis.classification.inference.labels.inferrers as eacili
import emission.core.wrapper.entry as ecwe
import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg
import emission.core.common as ecc
@@ -738,7 +738,7 @@ def predict_proba(self, test_df):
replaced_distribs = []

for trip in test_trips:
trip_prediction = predict_cluster_confidence_discounting(trip)
trip_prediction = eacili.predict_cluster_confidence_discounting(trip)

if len(trip_prediction) == 0:
# model could not find cluster for the trip
@@ -1514,7 +1514,7 @@ def __init__(
self.C = C
self.n_estimators = n_estimators
self.criterion = criterion
self.max_depth = max_depth
self.max_depth = max_depth if max_depth != 'null' else None
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.max_features = max_features
@@ -1524,36 +1524,42 @@
self.use_start_clusters = use_start_clusters
self.use_trip_clusters = use_trip_clusters

if self.loc_feature == 'cluster':
# clustering algorithm to generate end clusters
self.end_cluster_model = DBSCANSVMCluster(
loc_type='end',
radius=self.radius,
size_thresh=self.size_thresh,
purity_thresh=self.purity_thresh,
gamma=self.gamma,
C=self.C)

if self.use_start_clusters or self.use_trip_clusters:
# clustering algorithm to generate start clusters
self.start_cluster_model = DBSCANSVMCluster(
loc_type='start',
radius=self.radius,
size_thresh=self.size_thresh,
purity_thresh=self.purity_thresh,
gamma=self.gamma,
C=self.C)

if self.use_trip_clusters:
# helper class to generate trip-level clusters
self.trip_grouper = TripGrouper(
start_cluster_col='start_cluster_idx',
end_cluster_col='end_cluster_idx')

# wrapper class to generate one-hot encodings for cluster indices
self.cluster_enc = OneHotWrapper(sparse=False,
handle_unknown='ignore')
######### Not Tested #########
# The below code is used when we cluster the coordinates (loc_cluster parameter = True)
# before passing to Random Forest. Commenting this for now since it is not tested.
###############################
# if self.loc_feature == 'cluster':
# # clustering algorithm to generate end clusters
# self.end_cluster_model = DBSCANSVMCluster(
# loc_type='end',
# radius=self.radius,
# size_thresh=self.size_thresh,
# purity_thresh=self.purity_thresh,
# gamma=self.gamma,
# C=self.C)

# if self.use_start_clusters or self.use_trip_clusters:
# # clustering algorithm to generate start clusters
# self.start_cluster_model = DBSCANSVMCluster(
# loc_type='start',
# radius=self.radius,
# size_thresh=self.size_thresh,
# purity_thresh=self.purity_thresh,
# gamma=self.gamma,
# C=self.C)

# if self.use_trip_clusters:
# # helper class to generate trip-level clusters
# self.trip_grouper = TripGrouper(
# start_cluster_col='start_cluster_idx',
# end_cluster_col='end_cluster_idx')

# # wrapper class to generate one-hot encodings for cluster indices
# self.cluster_enc = OneHotWrapper(sparse=False,
# handle_unknown='ignore')
#############################################################################


# wrapper class to generate one-hot encodings for purposes and modes
self.purpose_enc = OneHotWrapper(impute_missing=True,
sparse=False,
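The one-line `max_depth` change in models.py above guards against the string `'null'` leaking through from a config source that did not parse JSON `null` into Python `None` (a plain JSON parse already yields `None`, so the guard covers the string-valued case). A small sketch of that normalization as a standalone helper (the function name is hypothetical):

```python
def normalize_forest_params(params):
    """Map the string 'null' to Python None for parameters that the
    random forest expects as None (e.g. max_depth means 'no depth limit').
    Other values pass through untouched."""
    normalized = dict(params)
    for key in ('max_depth',):
        if normalized.get(key) == 'null':
            normalized[key] = None
    return normalized
```

With this in place, a config value of either `null` or `"null"` ends up as `None` by the time it reaches the sklearn constructor.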
5 changes: 2 additions & 3 deletions emission/analysis/modelling/trip_model/run_model.py
@@ -56,9 +56,7 @@ def update_trip_model(
time_query = time_query_from_pipeline if model.is_incremental else None
logging.debug(f'model type {model_type.name} is incremental? {model.is_incremental}')
logging.debug(f'time query for training data collection: {time_query}')

trips = _get_training_data(user_id, time_query)

Contributor: Extraneous whitespace.

Contributor: I still see this extraneous whitespace.

Contributor Author (@humbleOldSage, Mar 5, 2024): Removed.
# don't start training for a user that doesn't have at least $trips many trips
# (assume if a stored model exists for the user, that they met this requirement previously)
if len(trips) == 0:
@@ -74,7 +72,8 @@
epq.mark_trip_model_failed(user_id)
else:

# train and store the model
# train and store the model. pass only the list of entries, and convert
# to dataframe-type data only where required.
model.fit(trips)
model_data_next = model.to_dict()

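The run_model.py diff above keeps `update_trip_model` passing the raw trip list straight to `fit`, after a guard that skips training for users with too few labeled trips. A condensed, hypothetical sketch of that control flow (the real function also handles time queries, pipeline marking, and storage, which are omitted here):

```python
def update_trip_model(trips, model, minimum_trips=14):
    """Hypothetical condensed update flow: skip training when the user has
    too few trips, otherwise fit on the raw trip list and serialize the
    fitted model for storage."""
    if len(trips) < minimum_trips:
        # the real pipeline calls epq.mark_trip_model_failed(user_id) here
        return None
    model.fit(trips)          # model converts to a dataframe internally
    return model.to_dict()    # serialized form, ready for the model store
```

The design choice the diff reflects is that dataframe conversion lives inside each model's `fit`, so the pipeline layer stays agnostic about model-specific data formats.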