diff --git a/emission/analysis/modelling/trip_model/clustering.py b/emission/analysis/modelling/trip_model/clustering.py new file mode 100644 index 000000000..d3924f32a --- /dev/null +++ b/emission/analysis/modelling/trip_model/clustering.py @@ -0,0 +1,410 @@ +# helper functions to streamline the use and comparison of clustering algs + +# basic imports +import pandas as pd +import numpy as np +import logging + +# import clustering algorithms +import sklearn.metrics.pairwise as smp +import sklearn.cluster as sc +from sklearn import metrics +from sklearn import svm +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler + +# our imports +# NOTE: this requires changing the branch of e-mission-server to +# eval-private-data-compatibility +import emission.storage.decorations.trip_queries as esdtq +import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg + +EARTH_RADIUS = 6371000 +ALG_OPTIONS = [ + 'DBSCAN', + 'naive', + 'OPTICS', + # 'fuzzy', + 'mean_shift' +] + +def cleanEntryTypeData(loc_df,trip_entry): + + """ + Helps weed out entries from the list of entries which were removed from the df using + esdtq.filter_labeled_trips() and esdtq.expand_userinputs() + + loc_df : dataframe amde from entry type data + trip_entry : the entry type equivalent of loc_df , + which was passed alongside the dataframe while loading the data + + """ + + ids_in_df=loc_df['_id'] + filtered_trip_entry = list(filter(lambda entry: entry['_id'] in ids_in_df.values, trip_entry)) + return filtered_trip_entry + + +def add_loc_clusters( + loc_df, + trip_entry, + clustering_way, + radii, + loc_type, + alg, + SVM=False, + # cluster_unlabeled=False, + min_samples=1, + optics_min_samples=None, + optics_xi=0.05, + optics_cluster_method='xi', + svm_min_size=6, + svm_purity_thresh=0.7, + svm_gamma=0.05, + svm_C=1): + """ Given a dataframe of trips, cluster the locations (either start or end + locations) using the desired algorithm & parameters. + + Returns: + Same dataframe, with appended columns that contain the resulting cluster indices. + + Args: + loc_df (dataframe): must have columns 'start_lat' and 'start_lon' + or 'end_lat' and 'end_lon' + trip_entry ( list of Entry/confirmedTrip): list consisting all entries from the + time data was loaded. loc_df was obtained from this by converting to df and + then filtering out labeled trips and expanding user_inputs + radii (int list): list of radii to run the clustering algs with + loc_type (str): 'start' or 'end' + alg (str): 'DBSCAN', 'naive', 'OPTICS', 'SVM', 'fuzzy', or + 'mean_shift' + SVM (bool): whether or not to sub-divide clusters with SVM + # cluster_unlabeled (bool): whether or not unlabeled points are used + # to generate clusters. + min_samples (int): min samples per cluster. used in DBSCAN (and + therefore also SVM and fuzzy, for now) + optics_min_samples (int): min samples per cluster, if using OPTICS. + optics_xi (float): xi value if using the xi method of OPTICS. + optics_cluster_method (str): method to use for the OPTICS + algorithm. 
either 'xi' or 'dbscan' + svm_min_size (int): the min number of trips a cluster must have to + be considered for sub-division, if using SVM + svm_purity_thresh (float): the min purity a cluster must have to be + sub-divided, if using SVM + svm_gamma (float): if using SVM, the gamma hyperparameter + svm_C (float): if using SVM, the C hyperparameter + """ + assert loc_type == 'start' or loc_type == 'end' + assert alg in ALG_OPTIONS + + # if using SVM, we get the initial clusters with DBSCAN, then sub-divide + if alg == 'DBSCAN': + dist_matrix_meters = get_distance_matrix(loc_df, loc_type) + + for r in radii: + model = sc.DBSCAN(r, metric="precomputed", + min_samples=min_samples).fit(dist_matrix_meters) + labels = model.labels_ + # print(model.n_features_in_) + # print(model.components_.shape) + # print(model.components_) + + # pd.Categorical converts the type from int to category (so + # numerical operations aren't possible) + # loc_df.loc[:, + # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical( + # labels) + # TODO: fix this and make it Categorical again (right now labels are + # ints) + loc_df.loc[:, f"{loc_type}_DBSCAN_clusters_{r}_m"] = labels + + elif alg == 'naive': + + cleaned_trip_entry= cleanEntryTypeData(loc_df,trip_entry) + + for r in radii: + # this is using a modified Similarity class that bins start/end + # points separately before creating trip-level bins + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": r, # meters, + "apply_cutoff": False, + "clustering_way": clustering_way, + "shouldFilter":False, + "incremental_evaluation": False + } + + sim_model = eamtg.GreedySimilarityBinning(model_config) + sim_model.fit(cleaned_trip_entry) + labels = [int(l) for l in sim_model.tripLabels] + # # pd.Categorical converts the type from int to category (so + # # numerical operations aren't possible) + # loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = pd.Categorical( + # labels) + loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = labels + + elif alg == 'OPTICS': + if optics_min_samples == None: + optics_min_samples = 2 + dist_matrix_meters = get_distance_matrix(loc_df, loc_type) + + for r in radii: + labels = sc.OPTICS( + min_samples=optics_min_samples, + max_eps=r, + xi=optics_xi, + cluster_method=optics_cluster_method, + metric="precomputed").fit(dist_matrix_meters).labels_ + + # # pd.Categorical converts the type from int to category (so + # # numerical operations aren't possible) + # loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = pd.Categorical( + # labels) + loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = labels + + elif alg == 'fuzzy': + # create clusters with completely homogeneous purpose labels + # I'm calling this 'fuzzy' for now since the clusters overlap, but I + # need to think of a better name + logging.warning( + 'This alg is not properly implemented and will not generate clusters for unlabeled trips!' 
+ ) + + purpose_list = loc_df.purpose_confirm.dropna().unique() + + for p in purpose_list: + p_loc_df = loc_df.loc[loc_df['purpose_confirm'] == p] + dist_matrix_meters = get_distance_matrix(p_loc_df, loc_type) + + for r in radii: + labels = sc.DBSCAN( + r, metric="precomputed", + min_samples=min_samples).fit(dist_matrix_meters).labels_ + + # pd.Categorical converts the type from int to category (so + # numerical operations aren't possible) + # loc_df.loc[:, + # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical( + # labels) + loc_df.loc[loc_df['purpose_confirm'] == p, + f"{loc_type}_{alg}_clusters_{r}_m"] = labels + + # move "noisy" trips to their own single-trip clusters + noisy_trips = loc_df.loc[(loc_df['purpose_confirm'] == p) & ( + loc_df[f"{loc_type}_{alg}_clusters_{r}_m"] == -1)] + for idx in noisy_trips.index.values: + max_idx_inside_p = loc_df.loc[ + (loc_df['purpose_confirm'] == p), + f"{loc_type}_{alg}_clusters_{r}_m"].max() + loc_df.loc[ + idx, + f"{loc_type}_{alg}_clusters_{r}_m"] = 1 + max_idx_inside_p + + # we offset all cluster indices for purpose p by the max + # existing index excluding purpose p + # so that we don't run into any duplicate trouble + max_idx_outside_p = loc_df.loc[ + (loc_df['purpose_confirm'] != p), + f"{loc_type}_{alg}_clusters_{r}_m"].max(skipna=True) + + if np.isnan(max_idx_outside_p): + # can happen if column is empty, e.g. if this is the first + # purpose in the list that we are iterating over + max_idx_outside_p = -1 + + logging.debug('max_idx_outside_p', max_idx_outside_p, + "at radius", r) + + loc_df.loc[ + loc_df['purpose_confirm'] == p, + f"{loc_type}_{alg}_clusters_{r}_m"] += 1 + max_idx_outside_p + + elif alg == 'mean_shift': + for r in radii: + # seems like the bandwidth is based on the raw lat/lon data (we + # never pass in a distance matrix), so we want a conversion factor + # from meters to degrees. Since 100-500m corresponds to such a + # small degree change, we can rely on the small angle approximation + # and just use a linear multiplier. This conversion factor doesn't + # have to be *super* accurate, its just so we can get a sense of + # what the bandwidth roughly corresponds to in the real world/make + # the value a little more interpretable. + LATLON_TO_M = 1 / 111139 + labels = sc.MeanShift( + bandwidth=LATLON_TO_M * r, + min_bin_freq=min_samples, + cluster_all=False, + ).fit(loc_df[[f"{loc_type}_lon", f"{loc_type}_lat"]]).labels_ + + # pd.Categorical converts the type from int to category (so + # numerical operations aren't possible) + # loc_df.loc[:, + # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical( + # labels) + # TODO: fix this and make it Categorical again (right now labels are + # ints) + loc_df.loc[:, f"{loc_type}_mean_shift_clusters_{r}_m"] = labels + + # move "noisy" trips to their own single-trip clusters + for idx in loc_df.loc[ + loc_df[f"{loc_type}_mean_shift_clusters_{r}_m"] == + -1].index.values: + loc_df.loc[ + idx, f"{loc_type}_mean_shift_clusters_{r}_m"] = 1 + loc_df[ + f"{loc_type}_mean_shift_clusters_{r}_m"].max() + + if SVM: + loc_df = add_loc_SVM(loc_df, radii, alg, loc_type, svm_min_size, + svm_purity_thresh, svm_gamma, svm_C) + return loc_df + + +def add_loc_SVM(loc_df, + radii, + alg, + loc_type, + svm_min_size=6, + svm_purity_thresh=0.7, + svm_gamma=0.05, + svm_C=1, + cluster_cols=None): + """ Sub-divide base clusters using SVM. 
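+
+    A rough usage sketch (illustrative only; assumes loc_df already carries the
+    base cluster columns produced by add_loc_clusters with the same alg/radii):
+        loc_df = add_loc_SVM(loc_df, radii=[100, 150], alg='DBSCAN',
+                             loc_type='end', svm_purity_thresh=0.7)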
+ + Args: + loc_df (dataframe): must have columns 'start_lat' and 'start_lon' + or 'end_lat' and 'end_lon', as well as + '{loc_type}_{base_alg}_SVM_clusters_{r}_m', containing cluster indices generated by the base clustering alg + radii (int list): list of radii to run the clustering algs with + loc_type (str): 'start' or 'end' + svm_min_size (int): the min number of trips a cluster must have to + be considered for sub-division + svm_purity_thresh (float): the min purity a cluster must have to be + sub-divided + svm_gamma (float): the gamma hyperparameter + svm_C(float): the C hyperparameter + cluster_col (str list): names of column containing cluster indices + of interest + """ + assert loc_type == 'start' or loc_type == 'end' + assert f'{loc_type}_lat' in loc_df.columns + assert f'{loc_type}_lon' in loc_df.columns + + for i in range(len(radii)): + r = radii[i] + if cluster_cols == None: + cluster_col = f"{loc_type}_{alg}_clusters_{r}_m" + else: + cluster_col = cluster_cols[i] + assert cluster_col in loc_df.columns + + # c is the count of how many clusters we have iterated over + c = 0 + # iterate over all clusters and subdivide them with SVM. The while loop + # is so we can do multiple iterations of subdividing if needed + while c < loc_df[cluster_col].max(): + points_in_cluster = loc_df.loc[loc_df[cluster_col] == c] + + labeled_points_in_cluster = points_in_cluster.dropna( + subset=['purpose_confirm']) + + # only do SVM if we have at least labeled 6 points in the cluster + # (or custom min_size) + if len(labeled_points_in_cluster) < svm_min_size: + c += 1 + continue + + # only do SVM if purity is below threshold + purity = single_cluster_purity(labeled_points_in_cluster) + if purity < svm_purity_thresh: + X_train = labeled_points_in_cluster[[ + f"{loc_type}_lon", f"{loc_type}_lat" + ]] + X_all = points_in_cluster[[ + f"{loc_type}_lon", f"{loc_type}_lat" + ]] + y_train = labeled_points_in_cluster.purpose_confirm.to_list() + + labels = make_pipeline( + StandardScaler(), + svm.SVC( + kernel='rbf', + gamma=svm_gamma, + C=svm_C, + )).fit(X_train, y_train).predict(X_all) + + unique_labels = np.unique(labels) + + # map from purpose labels to new cluster indices + # we offset indices by the max existing index so that + # we don't run into any duplicate trouble + max_existing_idx = loc_df[cluster_col].max() + + # # if the indices are Categorical, need to convert to + # # ordered values + # max_existing_idx = np.amax( + # existing_cluster_indices.as_ordered()) + + # labels = np.array(svc.predict(X)) + label_to_cluster = { + unique_labels[i]: i + max_existing_idx + 1 + for i in range(len(unique_labels)) + } + + # if the SVM predicts everything with the same label, just + # ignore it and don't reindex. + + # this also helps us to handle the possibility that a cluster + # may be impure but inherently inseparable, e.g. an end cluster + # containing 50% 'home' trips and 50% round trips to pick up/ + # drop off. 
we don't want to reindex otherwise the low purity + # will trigger SVM again, and we will attempt & fail to split + # the cluster ad infinitum + if len(unique_labels) > 1: + indices = np.array([label_to_cluster[l] for l in labels]) + + loc_df.loc[loc_df[cluster_col] == c, cluster_col] = indices + + c += 1 + + return loc_df + + +def get_distance_matrix(loc_df, loc_type): + """ Args: + loc_df (dataframe): must have columns 'start_lat' and 'start_lon' + or 'end_lat' and 'end_lon' + loc_type (str): 'start' or 'end' + """ + assert loc_type == 'start' or loc_type == 'end' + + radians_lat_lon = np.radians(loc_df[[loc_type + "_lat", loc_type + "_lon"]]) + + dist_matrix_meters = pd.DataFrame( + smp.haversine_distances(radians_lat_lon, radians_lat_lon) * + EARTH_RADIUS) + return dist_matrix_meters + + +def single_cluster_purity(points_in_cluster, label_col='purpose_confirm'): + """ Calculates purity of a cluster (i.e. % of trips that have the most + common label) + + Args: + points_in_cluster (df): dataframe containing points in the same + cluster + label_col (str): column in the dataframe containing labels + """ + assert label_col in points_in_cluster.columns + + most_freq_label = points_in_cluster[label_col].mode()[0] + purity = len(points_in_cluster[points_in_cluster[label_col] == + most_freq_label]) / len(points_in_cluster) + return purity + + +def purity_score(y_true, y_pred): + contingency_matrix = metrics.cluster.contingency_matrix(y_true, y_pred) + purity = np.sum(np.amax(contingency_matrix, + axis=0)) / np.sum(contingency_matrix) + return purity diff --git a/emission/analysis/modelling/trip_model/data_wrangling.py b/emission/analysis/modelling/trip_model/data_wrangling.py new file mode 100644 index 000000000..137886190 --- /dev/null +++ b/emission/analysis/modelling/trip_model/data_wrangling.py @@ -0,0 +1,238 @@ +import pandas as pd +import numpy as np +import logging + + +def expand_df_dict(df, column_name): + """ + df: a dataframe that contains a column whose values are dictionaries + column_name: name of the df's column containing dictionary entries + + Returns a dataframe with the desired column expanded into the main dataframe + + This is a generalized version of the expand_userinputs() function from + e-mission-server/emission/storage/decorations/trip_queries.py + """ + if len(df) == 0: + return df + expanded_col = pd.DataFrame(df.loc[:, column_name].to_list(), + index=df.index) + logging.debug(expanded_col.head()) + df = df.drop(columns=[column_name]) + expanded_df = pd.concat([df, expanded_col], axis=1) + assert len(expanded_df) == len(df), \ + ("Mismatch after expanding labels, expanded_df.rows = %s != df.columns %s" % + (len(expanded_df), len(df))) + logging.debug("After expanding, columns went from %s -> %s" % + (len(df.columns), len(expanded_df.columns))) + logging.debug(expanded_df.head()) + return expanded_df + + +# oops, this is actually just the same as pd's explode() +def expand_df_list_vert(df, column_name): + """ + df: a dataframe that contains a column whose values are lists + column_name: name of the df's column containing list entries. (the + length of the list entry can vary from row to row.) + + Returns a dataframe with the desired column expanded vertically into + the main dataframe, i.e. 
for each row in the original dataframe, there + will be n rows in the expanded dataframe where n is the length of its + list entry under 'column_name' + """ + if len(df) == 0: + return df + + expanded_df_list = [] + for i in range(len(df)): + col_list = df.loc[i, column_name] + for e in col_list: + # add new row to new_df + new_row = df.loc[i].to_dict() + new_row[column_name] = e + expanded_df_list += [new_row] + + if len(expanded_df_list) == 0: + logging.debug( + '{} only has empty lists; expansion failed.'.format(column_name)) + raise Exception('expansion failed; empty lists') + + expanded_df = pd.DataFrame(expanded_df_list) + + assert len(expanded_df.columns) == len(df.columns), \ + ("Mismatch after expanding labels, expanded_df.columns = %s != df.columns %s" % + (len(expanded_df.columns), len(df.columns))) + logging.debug("After expanding, rows went from %s -> %s" % + (len(df), len(expanded_df))) + + return expanded_df + + +def expand_df_list_horiz(df, column_name): + """ + df: a dataframe that contains a column whose values are lists + column_name: name of the df's column containing list entries. (the + length of the list entry must be consistent for all rows.) + + Returns a dataframe with the desired column expanded horizontally into + the main dataframe, i.e. 'column_name' will be replaced by n columns + where n is the length of each list entry + """ + if len(df) == 0: + return df + expanded_col = pd.DataFrame(df.loc[:, column_name].to_list(), + index=df.index) + logging.debug(expanded_col.head()) + df = df.drop(columns=[column_name]) + expanded_df = pd.concat([df, expanded_col], axis=1) + assert len(expanded_df) == len(df), \ + ("Mismatch after expanding labels, expanded_df.rows = %s != df.columns %s" % + (len(expanded_df), len(df))) + logging.debug("After expanding, columns went from %s -> %s" % + (len(df.columns), len(expanded_df.columns))) + logging.debug(expanded_df.head()) + return expanded_df + + +def add_top_pred(df, trip_id_column='trip_id', pred_conf_column='pred_conf'): + """ df: dataframe containing trip ids, predicted labels and confidence level + trip_id_column: string, the name of the column containing trip ids + pred_conf_column: string, the name of the column containing prediction confidence + """ + df['top_pred'] = False + for trip_id in df[trip_id_column].unique(): + id_max = df[df[trip_id_column] == trip_id][pred_conf_column].idxmax( + skipna=True) + if not np.isnan(id_max): + df.loc[id_max, 'top_pred'] = True + + return df + + +def trips_to_df(trips, user_id, os_df=None): + datas = [] + for i in range(len(trips)): + t = trips[i] + if 'inferred_labels' not in t['data'] or t['data'][ + 'inferred_labels'] == []: + data = {'trip_id': t['_id']} + data = update_labels(data, t['data']['user_input'], 'true') + datas.append(data) + + else: + for label in t['data']['inferred_labels']: + data = {'trip_id': t['_id']} + data['pred_conf'] = label['p'] + data = update_labels(data, label['labels'], 'pred') + data = update_labels(data, t['data']['user_input'], 'true') + datas.append(data) + + df = pd.DataFrame(datas, + columns=[ + 'user_id', 'trip_id', 'mode_pred', 'replaced_pred', + 'purpose_pred', 'tuple_pred', 'pred_conf', + 'mode_true', 'replaced_true', 'purpose_true', + 'tuple_true' + ]) + df['user_id'] = user_id + if os_df: + df['os'] = os_df[os_df.user_id == user_id]['curr_platform'].item() + df['tuple_pred'] = df.mode_pred.astype( + str) + ', ' + df.purpose_pred.astype( + str) + ', ' + df.replaced_pred.astype(str) + df['tuple_true'] = df.mode_true.astype( + str) + ', ' + 
df.purpose_true.astype( + str) + ', ' + df.replaced_true.astype(str) + + # df['tuple_pred'] = list(zip(df.mode_pred, df.purpose_pred, df.replaced_pred)) + # df['tuple_true'] = list(zip(df.mode_true, df.purpose_true, df.replaced_true)) + + # indicates if the predicted label was the top choice (i.e. the first suggestion to the user) + df = add_top_pred(df) + + return df + + +def update_labels(data, user_input, label_type): + """ helper function to populate a dictionary with trip labels. + + Args: + data (dict): dictionary that we want to populate + user_input (dict): the dictionary containing mode_confirm, + purpose_confirm, and replaced_mode information (e.g. + t['data']['user_input'] or t['data']['inferred_labels'][i] ) + label_type (str): 'true' or 'pred' + """ + if user_input != {}: + if 'mode_confirm' in user_input.keys(): + data['mode_' + label_type] = user_input['mode_confirm'] + if data['mode_' + label_type] == 'not_a_trip': + data['replaced_' + label_type] = 'not_a_trip' + data['purpose_' + label_type] = 'not_a_trip' + + else: + if 'replaced_mode' in user_input.keys(): + data['replaced_' + + label_type] = user_input['replaced_mode'] + if 'purpose_confirm' in user_input.keys(): + data['purpose_' + + label_type] = user_input['purpose_confirm'] + + return data + + +def get_labels(trips): + """ helper function to get lists of trip labels from a list of trip dicts.""" + mode_true = [] + purpose_true = [] + replaced_true = [] + + for t in trips: + if 'mode_confirm' in t['data']['user_input']: + mode_true.append(t['data']['user_input']['mode_confirm']) + else: + mode_true.append(None) + + if 'purpose_confirm' in t['data']['user_input']: + purpose_true.append(t['data']['user_input']['purpose_confirm']) + else: + purpose_true.append(None) + + if 'replaced_mode' in t['data']['user_input']: + replaced_true.append(t['data']['user_input']['replaced_mode']) + else: + replaced_true.append(None) + + return mode_true, purpose_true, replaced_true + + +def get_trip_index(trips): + """ helper function to get list of trip indices from a list of trip dicts.""" + trip_indices = [] + for t in trips: + trip_indices.append(t['_id']) + + return trip_indices + + +def expand_coords(exp_df, purpose=None): + """ + copied and modifed from get_loc_df_for_purpose() in the 'Radius + selection' notebook + """ + purpose_trips = exp_df + if purpose is not None: + purpose_trips = exp_df[exp_df.purpose_confirm == purpose] + + dfs = [purpose_trips] + for loc_type in ['start', 'end']: + df = pd.DataFrame( + purpose_trips[loc_type + + "_loc"].apply(lambda p: p["coordinates"]).to_list(), + columns=[loc_type + "_lon", loc_type + "_lat"]) + df = df.set_index(purpose_trips.index) + dfs.append(df) + + # display.display(end_loc_df.head()) + return pd.concat(dfs, axis=1) \ No newline at end of file diff --git a/emission/analysis/modelling/trip_model/mapping.py b/emission/analysis/modelling/trip_model/mapping.py new file mode 100644 index 000000000..06f0614ea --- /dev/null +++ b/emission/analysis/modelling/trip_model/mapping.py @@ -0,0 +1,429 @@ +# This file contains helper functions for plotting maps. 
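+#
+# A typical call from an analysis notebook might look like the following
+# (illustrative only; `user_df` and `user_entry` are assumed to come from the
+# usual confirmed-trip loading and label-expansion steps):
+#     fig = find_plot_clusters(user_df, user_entry, loc_type='end',
+#                              alg='DBSCAN', clustering_way='destination',
+#                              SVM=True, radii=[100, 150, 200])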
+import pandas as pd +import numpy as np + +import folium +import branca.element as bre +from scipy.spatial import ConvexHull + +import emission.analysis.modelling.trip_model.data_wrangling as eamtd +from emission.analysis.modelling.trip_model.clustering import add_loc_clusters, ALG_OPTIONS + +DENVER_COORD = [39.7392, -104.9903] +MTV_COORD = [37.3861, -122.0839] +CLM_COORD = [34.0967, -117.7198] + +# list of valid default colors in folium +COLORS = [ + 'darkred', + 'orange', + 'gray', + # 'green', # reserved for correct labels + 'darkblue', + # 'lightblue', # too hard to see on map + 'purple', + # 'pink', # too hard to see on map + 'darkgreen', + 'lightgreen', + # 'darkpurple', # this color does not exist? + 'cadetblue', + # 'lightgray', # too hard to see point on map + # 'black', # reserved for no_pred + 'blue', + # 'red', # reserved for noise/unlabeled data/incorrect labels + # 'lightred', # this color does not exist in folium? + # 'beige', # too hard to see point on map +] + + +def find_plot_clusters(user_df, + user_entry, + loc_type, + alg, + clustering_way, + SVM=False, + radii=[50, 100, 150, 200], + cluster_unlabeled=False, + plot_unlabeled=False, + optics_min_samples=None, + optics_xi=0.05, + optics_cluster_method='xi', + svm_min_size=6, + svm_purity_thresh=0.7, + svm_gamma=0.05, + svm_C=1, + map_loc=MTV_COORD): + """ Plot points and clusters on a folium map. + + Points with the same purpose will have the same color (unless there are more purposes than available colors in folium, in which case some colors may be duplicated). Hovering over a point will also reveal the purpose in the tooltip. + + The clusters are visualized as convex hulls; their color doesn't mean anything for now, it's simply so we can distinguish between distinct clusters (which will be helpful when clusters overlap). + + Args: + user_df (dataframe): must contain the following columns: + 'start_loc', 'end_loc', 'user_input' + loc_type (str): 'start' or 'end', the type of points to cluster + alg (str): the clustering algorithm to be used. must be one of the + following: 'DBSCAN', 'naive', 'OPTICS', 'SVM', 'fuzzy' or + 'mean_shift' + clustering_way(str): 'origin'or 'destination' or 'origin-destination'. + Decides the way we can cluster trips geospatially. + SVM (bool): whether or not to sub-divide clusters with SVM + radii (int list): list of radii to pass to the clustering alg + cluster_unlabeled (bool): whether or not unlabeled points are used + to generate clusters. + plot_unlabeled (bool): whether or not to plot unlabeled points. If + True, they will be plotted as red points. + optics_min_samples (int): number of min samples if using the OPTICS + algorithm. + optics_xi (float): xi value if using the xi method of the OPTICS algorithm. + optics_cluster_method (str): method to use for the OPTICS + algorithm. either 'xi' or 'dbscan' + svm_min_size (int): the min number of trips a cluster must have to + be considered for sub-division, if using SVM + svm_purity_thresh (float): the min purity a cluster must have to be + sub-divided, if using SVM + svm_gamma (float): if using SVM, the gamma hyperparameter + svm_C (float): if using SVM, the C hyperparameter + map_loc (array-like): lat and lon coordinate for the default folium + map position. 
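+
+        Returns:
+            a branca Figure (bre.Figure) containing one folium map per radius
+            in `radii`.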
+ + """ + # TODO: refactor to take in kwargs so we can remove the mess of optics_* + # variables that I was using when manually tuning that algorithm + assert loc_type == 'start' or loc_type == 'end' + assert 'start_loc' in user_df.columns + assert 'end_loc' in user_df.columns + assert 'user_input' in user_df.columns + assert clustering_way in ['origin','destination','origin-destination'] + assert alg in ALG_OPTIONS + + fig = bre.Figure(figsize=(20, 20)) + fig_index = 0 + + # clean up the dataframe by dropping entries with NaN locations and + # reset index (because naive needs the position of each trip to match + # its nominal index) + all_trips_df = user_df.dropna(subset=['start_loc', 'end_loc']).reset_index( + drop=True) + + # expand the 'start_loc' and 'end_loc' column into 'start_lat', + # 'start_lon', 'end_lat', and 'end_lon' columns + all_trips_df = eamtd.expand_coords(all_trips_df) + + labeled_trips_df = all_trips_df.loc[all_trips_df.user_input != {}].dropna( + subset=['purpose_confirm']) + + if cluster_unlabeled: + df_for_cluster = all_trips_df + else: + df_for_cluster = labeled_trips_df + + df_for_cluster = add_loc_clusters( + df_for_cluster, + user_entry, + clustering_way, + radii=radii, + alg=alg, + SVM=SVM, + # cluster_unlabeled=cluster_unlabeled, + loc_type=loc_type, + min_samples=1, + optics_min_samples=optics_min_samples, + optics_xi=optics_xi, + optics_cluster_method=optics_cluster_method, + svm_min_size=svm_min_size, + svm_purity_thresh=svm_purity_thresh, + svm_gamma=svm_gamma, + svm_C=svm_C) + + for r in radii: + fig_index = fig_index + 1 + m = folium.Map( + location=map_loc, + zoom_start=12, + tiles= + 'https://{s}.basemaps.cartocdn.com/rastertiles/voyager_nolabels/{z}/{x}/{y}{r}.png', + attr= + '© OpenStreetMap contributors © CARTO' + ) + folium.TileLayer( + tiles= + 'https://stamen-tiles-{s}.a.ssl.fastly.net/toner-lines/{z}/{x}/{y}{r}.png', + attr= + 'Map tiles by Stamen Design, CC BY 3.0 — Map data © OpenStreetMap contributors' + ).add_to(m) + # folium.TileLayer('Stamen Toner').add_to(m) + + cluster_ids = df_for_cluster[ + f"{loc_type}_{alg}_clusters_{r}_m"].unique() + + # draw the convex hull of the clusters + for i in range(len(cluster_ids)): + c = cluster_ids[i] + if c == -1: + print( + 'we should never get here because we want to convert the -1 cluster into single-trip clusters' + ) + continue + + points_in_cluster = df_for_cluster[ + df_for_cluster[f"{loc_type}_{alg}_clusters_{r}_m"] == c] + + if np.isnan(c): + # if False: + if len(points_in_cluster) == 0: + continue + else: + print(points_in_cluster) + print(df_for_cluster[df_for_cluster[ + f"{loc_type}_{alg}_clusters_{r}_m"].isnull()]) + raise Exception( + 'nan cluster detected; all trips should have a proper cluster index' + ) + m = plot_cluster_border( + points_in_cluster, + loc_type=loc_type, + m=m, + color='gray', + # color=COLORS[i % (len(COLORS) - 1)], + cluster_idx=c) + + # plot all the destinations, color-coordinated by purpose + # we want to plot these on *top* of the cluster circles so that we can + # hover over the points and see the purpose on the tooltip + m = plot_user_trips(user_df, + loc_type, + plot_100=False, + plot_unlabeled=plot_unlabeled, + m=m) + + # add plot to the figure + fig.add_subplot(len(radii) / 2 + len(radii) % 2, 2, + fig_index).add_child(m) + + return fig + + +def plot_model_clusters( + model, + category, + # purpose_col='purpose_confirm', + m=None, + map_loc=CLM_COORD): + """ category (str): 'test' or 'train' """ + loc_type = 'end' + + if m == None: + m = 
folium.Map(location=map_loc, zoom_start=12) + + if category == 'test': + df = model.test_df + elif category == 'train': + df = model.train_df + + cluster_ids = df['final_cluster_idx'].unique() + + # draw the convex hull of the clusters + for i in range(len(cluster_ids)): + c = cluster_ids[i] + if c == -1: + print( + 'we should never get here because we want to convert the -1 cluster into single-trip clusters' + ) + continue + + points_in_cluster = df[df['final_cluster_idx'] == c] + + if np.isnan(c): + print(points_in_cluster) + print(df[df['final_cluster_idx'].isnull()]) + raise Exception( + 'nan cluster detected; all trips should have a proper cluster index' + ) + m = plot_cluster_border(points_in_cluster, + loc_type=loc_type, + m=m, + color=COLORS[i % (len(COLORS) - 1)], + cluster_idx=c) + + # plot all the destinations, color-coordinated by purpose + # we want to plot these on *top* of the cluster circles so that we can + # hover over the points and see the purpose on the tooltip + # m = plot_user_trips(df, loc_type, plot_100=False, plot_unlabeled=True, m=m) + + return m + + +def plot_user_trips(user_df, + loc_type, + plot_100=True, + plot_500=False, + plot_unlabeled=False, + purpose_col='purpose_confirm', + color=None, + m=None): + """ Args: + user_df (dataframe): must contain the columns 'start/end_lat/lon' + loc_type (str): 'start' or 'end' + plot_100 (bool): whether or not to plot 100m radius circles around + each location point + plot_500 (bool): whether or not to plot 500m radius circles around + each location point + plot_unlabeled (bool): whether or not to plot unlabeled points (if + so, they will be red) + m (folium.Map): optional, an existing map onto which this function + will plot markers + """ + assert loc_type == 'start' or loc_type == 'end' + + if m is None: + m = folium.Map(location=MTV_COORD, zoom_start=13) + + purpose_list = user_df[purpose_col].dropna().unique() + + # plot circles with a 500m radius around each point + if plot_500: + for i, purpose in enumerate(purpose_list): + if color is None and i < len(COLORS): + color = COLORS[i] + elif color is None: + color = COLORS[len(COLORS) - 1] + purpose_trips = user_df[user_df[purpose_col] == purpose] + for j in range(len(purpose_trips)): + coords = purpose_trips[loc_type + + '_loc'].iloc[j]['coordinates'] + folium.Circle([coords[1], coords[0]], + radius=500, + color=color, + opacity=0.2, + fill=True, + fill_opacity=0.1, + weight=1).add_to(m) + if plot_unlabeled: + unlabeled_trips = user_df[user_df[purpose_col].isna()] + for j in range(len(unlabeled_trips)): + coords = unlabeled_trips[loc_type + + '_loc'].iloc[j]['coordinates'] + folium.Circle([coords[1], coords[0]], + radius=500, + color='red', + opacity=0.2, + fill=True, + fill_opacity=0.1, + weight=1).add_to(m) + + +# plot circles with a 100m radius around each point + if plot_100: + for i, purpose in enumerate(purpose_list): + if i < len(COLORS): + color = COLORS[i] + else: + color = COLORS[len(COLORS) - 1] + purpose_trips = user_df[user_df[purpose_col] == purpose] + for j in range(len(purpose_trips)): + coords = purpose_trips[loc_type + + '_loc'].iloc[j]['coordinates'] + folium.Circle([coords[1], coords[0]], + radius=100, + color=color, + opacity=0.2, + fill=True, + fill_opacity=0.1, + weight=1).add_to(m) + if plot_unlabeled: + unlabeled_trips = user_df[user_df[purpose_col].isna()] + for j in range(len(unlabeled_trips)): + coords = unlabeled_trips[loc_type + + '_loc'].iloc[j]['coordinates'] + folium.Circle([coords[1], coords[0]], + radius=100, + color='red', + 
opacity=0.2, + fill=True, + fill_opacity=0.1, + weight=1).add_to(m) + + # plot small circle marker on the very top so it doesn't get obscured by + # the layers of 100m/500m circles + for i, purpose in enumerate(purpose_list): + if i < len(COLORS): + color = COLORS[i] + else: + color = COLORS[len(COLORS) - 1] + # print('{:<15} {:<15}'.format(color, purpose)) + purpose_trips = user_df[user_df[purpose_col] == purpose] + # print(purpose_trips) + for j in range(len(purpose_trips)): + coords = purpose_trips[loc_type + '_loc'].iloc[j]['coordinates'] + # print(purpose_trips.iloc[j]) + # print(purpose_trips.iloc[j].index) + trip_idx = purpose_trips.iloc[j].name + folium.CircleMarker([coords[1], coords[0]], + radius=2.5, + color=color, + tooltip=purpose + ' ' + + str(trip_idx)).add_to(m) + if plot_unlabeled: + unlabeled_trips = user_df[user_df[purpose_col].isna()] + for j in range(len(unlabeled_trips)): + coords = unlabeled_trips[loc_type + '_loc'].iloc[j]['coordinates'] + trip_idx = unlabeled_trips.iloc[j].name + folium.CircleMarker([coords[1], coords[0]], + radius=2.5, + color='red', + tooltip='UNLABELED' + ' ' + + str(trip_idx)).add_to(m) + + return m + + +def plot_cluster_border(points_df, + loc_type, + m=None, + color='green', + cluster_idx=None): + """ plots a convex hull around the given points. + + Args: + points_df: dataframe with columns 'xxx_lat' and 'xxx_lon' + loc_type (str): 'start' or 'end', the type of points to cluster + m (folium.Map): optional, an existing map onto which this function + will plot markers + color (str): cluster color. must be valid in folium. + cluster_idx (int): cluster index, to be added to tooltip + """ + assert loc_type == 'start' or loc_type == 'end' + if m is None: + m = folium.Map(location=MTV_COORD, zoom_start=12) + + lats = points_df[loc_type + '_lat'].tolist() + lons = points_df[loc_type + '_lon'].tolist() + points = np.array([lats, lons]).T + + if len(points) > 2: + hull = ConvexHull(points) + border_points = points[hull.vertices] + else: + border_points = points + + if cluster_idx is not None: + folium.Polygon( + border_points, # list of points (latitude, longitude) + color=color, + weight=15, + opacity=0.6, + fill=True, + fill_opacity=0.5, + tooltip=f'cluster {cluster_idx}').add_to(m) + else: + folium.Polygon( + border_points, # list of points (latitude, longitude) + color=color, + weight=20, + opacity=0.6, + fill=True, + fill_opacity=0.5).add_to(m) + + return m diff --git a/emission/analysis/modelling/trip_model/models.py b/emission/analysis/modelling/trip_model/models.py new file mode 100644 index 000000000..e5fc08b46 --- /dev/null +++ b/emission/analysis/modelling/trip_model/models.py @@ -0,0 +1,2092 @@ +import pandas as pd +import numpy as np +from abc import ABCMeta, abstractmethod # to define abstract class "blueprints" +import logging +import copy + +# sklearn imports +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler, OneHotEncoder +from sklearn.impute import SimpleImputer +from sklearn.metrics.pairwise import haversine_distances +from sklearn.cluster import DBSCAN +from sklearn import svm +from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier +from sklearn.tree import DecisionTreeClassifier +from sklearn.exceptions import NotFittedError + +# our imports +from emission.analysis.modelling.trip_model.clustering import get_distance_matrix, single_cluster_purity +import emission.analysis.modelling.trip_model.data_wrangling as eamtd +import emission.storage.decorations.trip_queries as 
esdtq +from emission.analysis.classification.inference.labels.inferrers import predict_cluster_confidence_discounting +import emission.core.wrapper.entry as ecwe +import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg +import emission.core.common as ecc +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.run_model as eamur + + +import emission.analysis.modelling.trip_model.clustering as eamtc +# NOTE: tour_model_extended.similarity is on the +# eval-private-data-compatibility branch in e-mission-server + +# logging.basicConfig(level=logging.DEBUG) + +EARTH_RADIUS = 6371000 + +############################# +## define abstract classes ## +############################# + + +class SetupMixin(metaclass=ABCMeta): + """ class containing code to be reused when setting up estimators. """ + + @abstractmethod + def set_params(self, params): + """ Set the parameters of the estimator. + + Args: + params (dict): dictionary where the keys are the param names + (strings) and the values are the parameter inputs + + Returns: + self + """ + raise NotImplementedError + + def _clean_data(self, df): + """ Clean a dataframe of trips. + (Drop trips with missing start/end locations, expand the user input + columns, ensure all essential columns are present) + + Args: + df: a dataframe of trips. must contain the columns 'start_loc', + 'end_loc', and should also contain the user input columns + ('mode_confirm', 'purpose_confirm', 'replaced_mode') if + available + """ + assert 'start_loc' in df.columns and 'end_loc' in df.columns + + # clean up the dataframe by dropping entries with NaN locations and + # reset index + num_nan = 0 + if df.start_loc.isna().any(): + num_nan += df.start_loc.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_loc']) + if df.end_loc.isna().any(): + num_nan += df.end_loc.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['end_loc']) + + # expand the 'start_loc' and 'end_loc' column into 'start_lat', + # 'start_lon', 'end_lat', and 'end_lon' columns + df = eamtd.expand_coords(df) + + # drop trips with missing coordinates + if df.start_lat.isna().any(): + num_nan += df.start_lat.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_lat']) + if df.start_lon.isna().any(): + num_nan += df.start_lon.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_lon']) + if df.end_lat.isna().any(): + num_nan += df.end_lat.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['end_lat']) + if df.end_lon.isna().any(): + num_nan = df.end_lon.value_counts(dropna=False).loc[np.nan] + df += df.dropna(subset=['end_lon']) + if num_nan > 0: + logging.info( + f'dropped {num_nan} trips that are missing location coordinates' + ) + + df = df.rename( + columns={ + 'mode_confirm': 'mode_true', + 'purpose_confirm': 'purpose_true', + 'replaced_mode': 'replaced_true' + }) + + for category in ['mode_true', 'purpose_true', 'replaced_true']: + if category not in df.columns: + # for example, if a user labels all their trip modes but none of their trip purposes + df.loc[:, category] = np.nan + + return df.reset_index(drop=True) + + +class Cluster(SetupMixin, metaclass=ABCMeta): + """ blueprint for clustering models. """ + + @abstractmethod + def fit(self, train_df,train_entry_list): + """ Fit the clustering algorithm. 
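+
+        A concrete subclass is used roughly like this (sketch only; see
+        RefactoredNaiveCluster / DBSCANSVMCluster below for the exact
+        signatures):
+            model = DBSCANSVMCluster(loc_type='end', radius=100)
+            model.fit(train_df)
+            cluster_idx_df = model.predict(test_df)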
+ + Args: + train_df (DataFrame): dataframe of labeled trips + train_entry_list (List) : A list of trips where each element is of Entry type + Returns: + self + """ + raise NotImplementedError + + @abstractmethod + def predict(self, test_df): + """ Predict cluster indices for trips, if possible. Trips that could + not be clustered will have the index -1. + + Args: + test_df (DataFrame): dataframe of test trips + + Returns: + pd DataFrame containing one column, 'start_cluster_idx' or + 'end_cluster_idx' + """ + raise NotImplementedError + + def fit_predict(self, train_df): + """ Fit the clustering algorithm and predict cluster indices for trips, + if possible. Trips that could not be clustered will have the index -1. + + Args: + train_df (DataFrame): dataframe of labeled trips + + Returns: + pd DataFrame containing one column, 'start_cluster_idx' or + 'end_cluster_idx' + """ + self.fit(train_df) + return self.predict(train_df) + + +class TripClassifier(SetupMixin, metaclass=ABCMeta): + + @abstractmethod + def fit(self, train_df,unused=None): + """ Fit a classification model. + + Args: + train_df (DataFrame): dataframe of labeled trips + unused (List) : A list of Entry type of labeled and unlabeled trips which is not used in current function. + Passed to keep fit function generic. + Returns: + self + """ + raise NotImplementedError + + def predict(self, test_df): + """ Predict trip labels. + + Args: + test_df (DataFrame): dataframe of trips + + Returns: + DataFrame containing the following columns: + 'purpose_pred', 'mode_pred', 'replaced_pred', + 'purpose_proba', 'mode_proba', 'replaced_proba' + the *_pred columns contain the most-likely label prediction + (string for a label or float for np.nan). + the *_proba columns contain the probability of the most-likely + prediction. + """ + proba_df = self.predict_proba(test_df) + prediction_df = proba_df.loc[:, [('purpose', 'top_pred'), + ('purpose', 'top_proba'), + ('mode', 'top_pred'), + ('mode', 'top_proba'), + ('replaced', 'top_pred'), + ('replaced', 'top_proba')]] + + prediction_df.columns = prediction_df.columns.to_flat_index() + prediction_df = prediction_df.rename( + columns={ + ('purpose', 'top_pred'): 'purpose_pred', + ('purpose', 'top_proba'): 'purpose_proba', + ('mode', 'top_pred'): 'mode_pred', + ('mode', 'top_proba'): 'mode_proba', + ('replaced', 'top_pred'): 'replaced_pred', + ('replaced', 'top_proba'): 'replaced_proba', + }) + + return prediction_df + + def fit_predict(self, train_df): + """ Fit a classification model and predict trip labels. + + Args: + train_df (DataFrame): dataframe of labeled trips + + Returns: + DataFrame containing the following columns: + 'purpose_pred', 'mode_pred', 'replaced_pred', + 'purpose_proba', 'mode_proba', 'replaced_proba' + the *_pred columns contain the most-likely label prediction + (string for a label or float for np.nan). + the *_proba columns contain the probability of the most-likely + prediction. + """ + self.fit(train_df) + return self.predict(train_df) + + @abstractmethod + def predict_proba(self, test_df): + """ Predict class probabilities for each trip. + + NOTE: check the specific model to see if the class probabilities + have confidence-discounting or not. + + Args: + test_df (DataFrame): dataframe of trips + + Returns: + DataFrame with multiindexing. Each row represents a trip. There + are 3 columns at level 1, one for each label category + ('purpose', 'mode', 'replaced'). 
Within each category, there is + a column for each label, with the row's entry being the + probability that the trip has the label. There are three + additional columns within each category, one indicating the + most-likely label, one indicating the probability of the + most-likely label, and one indicating whether or not the trip + can be clustered. + TODO: add a fourth optional column for the number of trips in + the cluster (if clusterable) + + Level 1 columns are: purpose, mode, replaced + Lebel 2 columns are: + , , ... top_pred, top_proba, clusterable + , , ... top_pred, top_proba, clusterable + , , ... top_pred, top_proba, clusterable + """ + raise NotImplementedError + + +######################## +## clustering classes ## +######################## + + +class RefactoredNaiveCluster(Cluster): + """ Naive fixed-width clustering algorithm. + Refactored from the existing Similarity class to take in dataframes for + consistency, and allows for separate clustering of start and end + clusters. + + WARNING: this algorithm is *extremely* slow. + + Args: + loc_type (str): 'start' or 'end', the type of point to cluster + radius (int): max distance between all pairs of points in a + cluster, i.e. strict maximum cluster width. + + Attributes: + loc_type (str) + radius (int) + train_df (DataFrame) + test_df (DataFrame) + sim_model (Similarity object) + """ + + def __init__(self, loc_type='end', radius=100): + logging.info("PERF: Initializing RefactoredNaiveCluster") + self.loc_type = loc_type + self.radius = radius + + def set_params(self, params): + if 'loc_type' in params.keys(): self.loc_type = params['loc_type'] + if 'radius' in params.keys(): self.radius = params['radius'] + + return self + + def fit(self, unused,train_entry_list=None): + # clean data + logging.info("PERF: Fitting RefactoredNaiveCluster with size %s" % len(unused)) + self.train_df = self._clean_data(unused) + + # we can use all trips as long as they have purpose labels. it's ok if + # they're missing mode/replaced-mode labels, because they aren't as + # strongly correlated with location compared to purpose + # TODO: actually, we may want to rethink this. for example, it will + # probably be helpful to include trips that are missing purpose labels + # but still have mode labels. + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. 
no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": self.radius, # meters, + "apply_cutoff": False, + "clustering_way":'origin' if self.loc_type=='start' + else 'destination' if self.loc_type =='end' + else 'origin-destination', + "incremental_evaluation": False + } + + # fit the bins + self.sim_model= eamtg.GreedySimilarityBinning(model_config) + cleaned_trip_entry= eamtc.cleanEntryTypeData(self.train_df,train_entry_list) + self.sim_model.fit(cleaned_trip_entry) + + labels = [int(l) for l in self.sim_model.tripLabels] + self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = labels + return self + + def predict(self, test_df): + logging.info("PERF: Predicting RefactoredNaiveCluster for %s" % len(test_df)) + self.test_df = self._clean_data(test_df) + + if self.loc_type == 'start': + bins = self.sim_model.bins + elif self.loc_type == 'end': + bins = self.sim_model.bins + + # This looks weird but works + # >>> x = [(1, 'a'), (2, 'b'), (3, 'c')] + # >>> {int(key):value for key,value in x} + # {1: 'a', 2: 'b', 3: 'c'} + # + # bins = { '1': [ 'key1': [] , 'key2' :[],.. ....], + # '2': ['key1': [] , 'key2' :[],...], + # '3': ['key1': [] , 'key2' :[],.....] ...} + # + # the code below converts above to + # + # bins = { 1: [ 'key1': [] , 'key2' :[],.. ....], + # 2: ['key1': [] , 'key2' :[],...], + # 3: ['key1': [] , 'key2' :[],.....] ....} + # + # This is why it works : + # 1. Iterate over (key,value) pairs in 'bins.items()' + # 2. for each pair, 'key' is a string . so use int(key) to convert it into an integer. + # 3. Create a new dictionary(using {} within the dictionary comprehension) + # where the keys are now integers and the values are same + + bins = {int(key):value for key,value in bins.items()} + labels = [] + + # for each trip in the test list: + for idx, row in self.test_df.iterrows(): + if idx % 100 == 0: + logging.info("PERF: RefactoredNaiveCluster Working on trip %s/%s" % (idx, len(self.test_df))) + # iterate over all bins + trip_binned = False + for i in bins: + # check if the trip can fit in the bin + # if so, get the bin index. + # + # 'feature_rows' is the key that contains the list of list where + # each of the inner list takes the form : + # + # [ start_lon,start_lat,end_lon,end_lat] + if self._match(row, bins[i]['feature_rows'], self.loc_type): + labels += [i] + trip_binned = True + break + # if not, return -1 + if not trip_binned: + labels += [-1] + + self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = labels + + return self.test_df[[f'{self.loc_type}_cluster_idx']] + + def _match(self, trip, bin, loc_type): + """ Check if a trip can fit into an existing bin. + + copied from the Similarity class on the e-mission-server. + """ + for trip_in_bin in bin: + if not self._distance_helper(trip, trip_in_bin, loc_type): + return False + return True + + def _distance_helper(self, tripa, tripb, loc_type): + """ Check if two trips have start/end points within the distance + threshold. + """ + #tripa is taken from the test datframe. + #tripb is taken from the stored bin list. 
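+        # tripb comes from a bin's 'feature_rows' entry, so it is a plain list
+        # of the form [start_lon, start_lat, end_lon, end_lat] (see the
+        # comments in predict() above).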
+ pta_lat = tripa[[loc_type + '_lat']] + pta_lon = tripa[[loc_type + '_lon']] + if loc_type == 'start': + ptb_lat = tripb[1] + ptb_lon = tripb[0] + elif loc_type == 'end': + ptb_lat = tripb[3] + ptb_lon = tripb[2] + + dist= ecc.calDistance([pta_lon,pta_lat],[ptb_lon,ptb_lat]) + return dist <= self.radius + + +class DBSCANSVMCluster(Cluster): + """ DBSCAN-based clustering algorithm that optionally implements SVM + sub-clustering. + + Args: + loc_type (str): 'start' or 'end', the type of point to cluster + radius (int): max distance between two points in each other's + neighborhood, i.e. DBSCAN's eps value. does not strictly + dictate final cluster size + size_thresh (int): the min number of trips a cluster must have + to be considered for SVM sub-division + purity_thresh (float): the min purity a cluster must have + to be sub-divided using SVM + gamma (float): coefficient for the rbf kernel in SVM + C (float): regularization hyperparameter for SVM + + Attributes: + loc_type (str) + radius (int) + size_thresh (int) + purity_thresh (float) + gamma (float) + C (float) + train_df (DataFrame) + test_df (DataFrame) + base_model (sklearn Estimator) + """ + + def __init__(self, + loc_type='end', + radius=100, + svm=True, + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1): + logging.info("PERF: Initializing DBSCANSVMCluster") + self.loc_type = loc_type + self.radius = radius + self.svm = svm + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + + def set_params(self, params): + if 'loc_type' in params.keys(): self.loc_type = params['loc_type'] + if 'radius' in params.keys(): self.radius = params['radius'] + if 'svm' in params.keys(): self.svm = params['svm'] + if 'size_thresh' in params.keys(): + self.size_thresh = params['size_thresh'] + if 'purity_thresh' in params.keys(): + self.purity_thresh = params['purity_thresh'] + if 'gamma' in params.keys(): self.gamma = params['gamma'] + + return self + + def fit(self, train_df,unused=None): + """ Creates clusters of trip points. + self.train_df will be updated with columns containing base and + final clusters. + + TODO: perhaps move the loc_type argument to fit() so we can use a + single class instance to cluster both start and end points. This + will also help us reduce duplicate data. + + Args: + train_df (dataframe): dataframe of labeled trips + unused (List) : A list of Entry type of labeled and unlabeled trips which is not used in current function. + Passed to keep fit function generic. """ + ################## + ### clean data ### + ################## + logging.info("PERF: Fitting DBSCANSVMCluster") + self.train_df = self._clean_data(train_df) + + # we can use all trips as long as they have purpose labels. it's ok if + # they're missing mode/replaced-mode labels, because they aren't as + # strongly correlated with location compared to purpose + # TODO: actually, we may want to rethink this. for example, it will + # probably be helpful to include trips that are missing purpose labels + # but still have mode labels. + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. 
no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + ######################### + ### get base clusters ### + ######################### + dist_matrix_meters = get_distance_matrix(self.train_df, self.loc_type) + self.base_model = DBSCAN(self.radius, + metric="precomputed", + min_samples=1).fit(dist_matrix_meters) + base_clusters = self.base_model.labels_ + + self.train_df.loc[:, + f'{self.loc_type}_base_cluster_idx'] = base_clusters + + ######################## + ### get sub-clusters ### + ######################## + # copy base cluster column into final cluster column + self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = self.train_df[ + f'{self.loc_type}_base_cluster_idx'] + + if self.svm: + c = 0 # count of how many clusters we have iterated over + + # iterate over all clusters and subdivide them with SVM. the while + # loop is so we can do multiple iterations of subdividing if needed + while c < self.train_df[f'{self.loc_type}_cluster_idx'].max(): + points_in_cluster = self.train_df[ + self.train_df[f'{self.loc_type}_cluster_idx'] == c] + + # only do SVM if we have the minimum num of trips in the cluster + if len(points_in_cluster) < self.size_thresh: + c += 1 + continue + + # only do SVM if purity is below threshold + purity = single_cluster_purity(points_in_cluster, + label_col='purpose_true') + if purity < self.purity_thresh: + X = points_in_cluster[[ + f"{self.loc_type}_lon", f"{self.loc_type}_lat" + ]] + y = points_in_cluster.purpose_true.to_list() + + svm_model = make_pipeline( + StandardScaler(), + svm.SVC( + kernel='rbf', + gamma=self.gamma, + C=self.C, + )).fit(X, y) + labels = svm_model.predict(X) + unique_labels = np.unique(labels) + + # if the SVM predicts that all points in the cluster have + # the same label, just ignore it and don't reindex. + # this also helps us to handle the possibility that a + # cluster may be impure but inherently inseparable, e.g. an + # end cluster at a user's home, containing 50% trips from + # work to home and 50% round trips that start and end at + # home. we don't want to reindex otherwise the low purity + # will trigger SVM again, and we will attempt & fail to + # split the cluster ad infinitum + if len(unique_labels) > 1: + # map purpose labels to new cluster indices + # we offset indices by the max existing index so that we + # don't run into any duplicate indices + max_existing_idx = self.train_df[ + f'{self.loc_type}_cluster_idx'].max() + label_to_cluster = { + unique_labels[i]: i + max_existing_idx + 1 + for i in range(len(unique_labels)) + } + # update trips with their new cluster indices + indices = np.array( + [label_to_cluster[l] for l in labels]) + self.train_df.loc[ + self.train_df[f'{self.loc_type}_cluster_idx'] == c, + f'{self.loc_type}_cluster_idx'] = indices + + c += 1 + # TODO: make things categorical at the end? or maybe at the start of the decision tree pipeline + + return self + + def fit_predict(self, train_df): + """ Override to avoid unnecessarily computation of distance matrices. 
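+        (predict() would re-run the nearest-neighbor search against the
+        training points, so we simply return the cluster indices that fit()
+        already stored on self.train_df.)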
+ """ + self.fit(train_df) + return self.train_df[[f'{self.loc_type}_cluster_idx']] + + def predict(self, test_df): + logging.info("PERF: Predicting DBSCANSVMCluster") + # TODO: store clusters as polygons so the prediction is faster + # TODO: we probably don't want to store test_df in self to be more memory-efficient + self.test_df = self._clean_data(test_df) + pred_clusters = self._NN_predict(self.test_df) + + self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = pred_clusters + + return self.test_df[[f'{self.loc_type}_cluster_idx']] + + def _NN_predict(self, test_df): + """ Generate base-cluster predictions for the test data using a + nearest-neighbor approach. + + sklearn doesn't implement predict() for DBSCAN, which is why we + need a custom method. + """ + logging.info("PERF: NN_predicting DBSCANSVMCluster") + n_samples = test_df.shape[0] + labels = np.ones(shape=n_samples, dtype=int) * -1 + + # get coordinates of core points (we can't use model.components_ + # because our input feature was a distance matrix and doesn't contain + # info about the raw coordinates) + # NOTE: technically, every single point in a cluster is a core point + # because it has at least minPts (2) points, including itself, in its + # radius + train_coordinates = self.train_df[[ + f'{self.loc_type}_lat', f'{self.loc_type}_lon' + ]] + train_radians = np.radians(train_coordinates) + + for idx, row in test_df.reset_index(drop=True).iterrows(): + # calculate the distances between the ith test data and all points, + # then find the index of the closest point. if the ith test data is + # within epsilon of the point, then assign its cluster to the ith + # test data (otherwise, leave it as -1, indicating noise). + # unfortunately, pairwise_distances_argmin() does not support + # haversine distance, so we have to reimplement it ourselves + new_loc_radians = np.radians( + row[[self.loc_type + "_lat", self.loc_type + "_lon"]].to_list()) + new_loc_radians = np.reshape(new_loc_radians, (1, 2)) + dist_matrix_meters = haversine_distances( + new_loc_radians, train_radians) * EARTH_RADIUS + + shortest_dist_idx = np.argmin(dist_matrix_meters) + if dist_matrix_meters[0, shortest_dist_idx] < self.radius: + labels[idx] = self.train_df.reset_index( + drop=True).loc[shortest_dist_idx, + f'{self.loc_type}_cluster_idx'] + + return labels + + +###################### +## trip classifiers ## +###################### + + +class NaiveBinningClassifier(TripClassifier): + """ Trip classifier using the existing Similarity class and associated + functions without refactoring them. Essentially a wrapper for the + existing code on e-mission-server. 
+ + Args: + radius (int): maximum distance between any two points in the same + cluster + """ + + def __init__(self, radius=500): + logging.info("PERF: Initializing NaiveBinningClassifier") + self.radius = radius + + def set_params(self, params): + if 'radius' in params.keys(): self.radius = params['radius'] + + return self + + def fit(self, train_df,unused=None): + logging.info("PERF: Fitting NaiveBinningClassifier") + # (copied from bsm.build_user_model()) + + # convert train_df to a list because the existing binning algorithm + # only accepts lists of Entry objects + train_trips = self._trip_df_to_list(train_df) + + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": self.radius, # meters, + "apply_cutoff": False, + "clustering_way": "origin-destination", #cause thats what is set in performance_eval.py for this model + "incremental_evaluation": False + } + + sim_model = eamtg.GreedySimilarityBinning(model_config) + sim_model.fit(train_trips) + # set instance variables so we can access results later as well + self.sim = sim_model + self.bins = sim_model.bins + + # save all user labels + user_id = train_df.user_id.iloc[0] + model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE + model_data_next=sim_model.to_dict() + last_done_ts = eamur._latest_timestamp(train_trips) + eamums.save_model(user_id, model_type, model_data_next, last_done_ts, model_storage) + + return self + + def predict_proba(self, test_df): + """ NOTE: these class probabilities have the confidence-discounting + heuristic applied. + """ + # convert test_df to a list because the existing binning algorithm + # only accepts lists of Entry objects + logging.info("PERF: Predicting NaiveBinningClassifier") + test_trips = self._trip_df_to_list(test_df) + + purpose_distribs = [] + mode_distribs = [] + replaced_distribs = [] + + for trip in test_trips: + trip_prediction = predict_cluster_confidence_discounting(trip) + + if len(trip_prediction) == 0: + # model could not find cluster for the trip + purpose_distribs += [{}] + mode_distribs += [{}] + replaced_distribs += [{}] + + else: + trip_prediction_df = pd.DataFrame(trip_prediction).rename( + columns={'labels': 'user_input'}) + # renaming is simply so we can use the expand_userinputs + # function + + expand_prediction = esdtq.expand_userinputs(trip_prediction_df) + # converts the 'labels' dictionaries into individual columns + + # sum up probability for each label + for label_type, label_distribs in zip( + ['purpose_confirm', 'mode_confirm', 'replaced_mode'], + [purpose_distribs, mode_distribs, replaced_distribs]): + label_distrib = {} + if label_type in expand_prediction.columns: + for label in expand_prediction[label_type].unique(): + label_distrib[label] = expand_prediction.loc[ + expand_prediction[label_type] == label, + 'p'].sum() + label_distribs += [label_distrib] + + proba_dfs = [] + for label_type, label_distribs in zip( + ['purpose', 'mode', 'replaced'], + [purpose_distribs, mode_distribs, replaced_distribs]): + + proba = pd.DataFrame(label_distribs) + proba['clusterable'] = proba.sum(axis=1) > 0 + proba['top_pred'] = proba.drop(columns=['clusterable']).idxmax( + axis=1) + proba['top_proba'] = proba.drop( + columns=['clusterable', 'top_pred']).max(axis=1, skipna=True) + classes = proba.columns[:-3] + proba.loc[:, classes] = proba.loc[:, classes].fillna(0) + proba = pd.concat([proba], keys=[label_type], axis=1) + proba_dfs += [proba] + + self.proba_df = pd.concat(proba_dfs, axis=1) + return 
self.proba_df + + def _trip_df_to_list(self, trip_df): + """ Converts a dataframe of trips into a list of trip Entry objects. + + Allows this class to accept DataFrames (which are used by the new + clustering algorithms) without having to refactor the old + clustering algorithm. + + Args: + trip_df: DataFrame containing trips. See code below for the + expected columns. + + """ + trips_list = [] + + for idx, row in trip_df.iterrows(): + data = { + 'source': row['source'], + 'end_ts': row['end_ts'], + # 'end_local_dt':row['end_local_dt'], # this attribute doesn't seem to appear in the dataframes I've tested with + 'end_fmt_time': row['end_fmt_time'], + 'end_loc': row['end_loc'], + 'raw_trip': row['raw_trip'], + 'start_ts': row['start_ts'], + # 'start_local_dt':row['start_local_dt'], # this attribute doesn't seem to appear in the dataframes I've tested with + 'start_fmt_time': row['start_fmt_time'], + 'start_loc': row['start_loc'], + 'duration': row['duration'], + 'distance': row['distance'], + 'start_place': row['start_place'], + 'end_place': row['end_place'], + 'cleaned_trip': row['cleaned_trip'], + 'inferred_labels': row['inferred_labels'], + 'inferred_trip': row['inferred_trip'], + 'expectation': row['expectation'], + 'confidence_threshold': row['confidence_threshold'], + 'expected_trip': row['expected_trip'], + 'user_input': row['user_input'] + } + trip = ecwe.Entry.create_entry(user_id=row['user_id'], + key='analysis/confirmed_trip', + data=data) + trips_list += [trip] + + return trips_list + + +class ClusterExtrapolationClassifier(TripClassifier): + """ Classifier that extrapolates labels from a trip's cluster. + + Args: + alg (str): clustering algorithm to use; either 'DBSCAN' or 'naive' + radius (int): radius for the clustering algorithm + svm (bool): whether or not to use SVM sub-clustering. (only when + alg=='DBSCAN') + size_thresh (int): the min number of trips a cluster must have + to be considered for SVM sub-division + purity_thresh (float): the min purity a cluster must have + to be sub-divided using SVM + gamma (float): coefficient for the rbf kernel in SVM + C (float): regularization hyperparameter for SVM + cluster_method (str): 'end', 'trip', 'combination'. whether to + extrapolate labels from only end clusters, only trip clusters, + or both end and trip clusters when available. 
+ """ + + def __init__( + self, + alg='DBSCAN', + radius=100, # TODO: add diff start and end radii + svm=True, + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1, + cluster_method='end'): + assert cluster_method in ['end', 'trip', 'combination'] + assert alg in ['DBSCAN', 'naive'] + self.alg = alg + self.radius = radius + self.svm = svm + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + self.cluster_method = cluster_method + + if self.alg == 'DBSCAN': + self.end_cluster_model = DBSCANSVMCluster( + loc_type='end', + radius=self.radius, + svm=self.svm, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + elif self.alg == 'naive': + self.end_cluster_model = RefactoredNaiveCluster(loc_type='end', + radius=self.radius) + + if self.cluster_method in ['trip', 'combination']: + if self.alg == 'DBSCAN': + self.start_cluster_model = DBSCANSVMCluster( + loc_type='start', + radius=self.radius, + svm=self.svm, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + elif self.alg == 'naive': + self.start_cluster_model = RefactoredNaiveCluster( + loc_type='start', radius=self.radius) + + self.trip_grouper = TripGrouper( + start_cluster_col='start_cluster_idx', + end_cluster_col='end_cluster_idx') + + def set_params(self, params): + """ hacky code that mimics the set_params of an sklearn Estimator class + so that we can pass params during randomizedsearchCV + + Args: + params (dict): a dictionary where the keys are the parameter + names and the values are the parameter values + """ + alg = params['alg'] if 'alg' in params.keys() else self.alg + radius = params['radius'] if 'radius' in params.keys() else self.radius + svm = params['svm'] if 'svm' in params.keys() else self.svm + size_thresh = params['size_thresh'] if 'size_thresh' in params.keys( + ) else self.size_thresh + purity_thresh = params[ + 'purity_thresh'] if 'purity_thresh' in params.keys( + ) else self.purity_thresh + gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma + C = params['C'] if 'C' in params.keys() else self.C + cluster_method = params[ + 'cluster_method'] if 'cluster_method' in params.keys( + ) else self.cluster_method + + # calling __init__ again is not good practice, I know... + self.__init__(alg, radius, svm, size_thresh, purity_thresh, gamma, C, + cluster_method) + + return self + + def fit(self, train_df,train_entry_list=None): + # fit clustering model + self.end_cluster_model.fit(train_df,train_entry_list) + self.train_df = self.end_cluster_model.train_df + + if self.cluster_method in ['trip', 'combination']: + self.start_cluster_model.fit(train_df,train_entry_list) + self.train_df.loc[:, ['start_cluster_idx' + ]] = self.start_cluster_model.train_df[[ + 'start_cluster_idx' + ]] + + # create trip-level clusters + trip_cluster_idx = self.trip_grouper.fit_transform(self.train_df) + self.train_df.loc[:, 'trip_cluster_idx'] = trip_cluster_idx + + return self + + def predict_proba(self, test_df): + """ NOTE: these class probabilities do NOT have a + confidence-discounting heuristic applied. 
+ """ + self.end_cluster_model.predict(test_df) + # store a copy of test_df for now (TODO: make this more efficient since + # the data is duplicated) + self.test_df = self.end_cluster_model.test_df + + if self.cluster_method in ['trip', 'combination']: + self.start_cluster_model.predict(test_df) + # append the start cluster indices + self.test_df.loc[:, [ + 'start_cluster_idx' + ]] = self.start_cluster_model.test_df.loc[:, ['start_cluster_idx']] + + # create trip-level clusters + trip_cluster_idx = self.trip_grouper.transform(self.test_df) + self.test_df.loc[:, 'trip_cluster_idx'] = trip_cluster_idx + + # extrapolate label distributions from cluster information + self.test_df.loc[:, [ + 'mode_distrib', 'purpose_distrib', 'replaced_distrib' + ]] = np.nan + + if self.cluster_method in ['end', 'trip']: + cluster_col = f'{self.cluster_method}_cluster_idx' + self.test_df = self._add_label_distributions( + self.test_df, cluster_col) + + else: # self.cluster_method == 'combination' + # try to get label distributions from trip-level clusters first, + # because trip-level clusters tend to be more homogenous and will + # yield more accurate predictions + self.test_df = self._add_label_distributions( + self.test_df, 'trip_cluster_idx') + + # for trips that have an empty label-distribution after the first + # pass using trip clusters, try to get a distribution from the + # destination cluster (this includes both trips that *don't* fall + # into a trip cluster, as well as trips that *do* fall into a trip + # cluster but are missing some/all categories of labels due to + # missing user inputs.) + + # fill in missing label-distributions by the label_type + # (we want to iterate by label_type rather than check cluster idx + # because it's possible that some trips in a trip-cluster have + # predictions for one label_type but not another) + for label_type in ['mode', 'purpose', 'replaced']: + self.test_df.loc[self.test_df[f'{label_type}_distrib'] == + {}] = self._add_label_distributions( + self.test_df.loc[ + self.test_df[f'{label_type}_distrib'] + == {}], + 'end_cluster_idx', + label_types=[label_type]) + + # create the dataframe of probabilities + proba_dfs = [] + for label_type in ['purpose', 'mode', 'replaced']: + classes = self.train_df[f'{label_type}_true'].dropna().unique() + proba = pd.DataFrame( + self.test_df[f'{label_type}_distrib'].to_list(), + columns=classes) + proba['top_pred'] = proba.idxmax(axis=1) + proba['top_proba'] = proba.max(axis=1, skipna=True) + proba['clusterable'] = self.test_df.end_cluster_idx >= 0 + proba.loc[:, classes] = proba.loc[:, classes].fillna(0) + proba = pd.concat([proba], keys=[label_type], axis=1) + proba_dfs += [proba] + + self.proba_df = pd.concat(proba_dfs, axis=1) + return self.proba_df + + def _add_label_distributions(self, + df, + cluster_col, + label_types=['mode', 'purpose', 'replaced']): + """ Add label distributions to a DataFrame. + + Args: + df (DataFrame): DataFrame containing a column of clusters + cluster_col (str): name of column in df containing clusters + label_types (str list): the categories of labels to retrieve + distributions for. + + Returns: + a DataFrame with additional columns in which the entries are + dictionaries containing label distributions. 
+ """ + df = df.copy() # to avoid SettingWithCopyWarning + for c in df.loc[:, cluster_col].unique(): + labeled_trips_in_cluster = self.train_df.loc[ + self.train_df[cluster_col] == c] + unlabeled_trips_in_cluster = df.loc[df[cluster_col] == c] + + cluster_size = len(unlabeled_trips_in_cluster) + + for label_type in label_types: + assert label_type in ['mode', 'purpose', 'replaced'] + + # get distribution of label_type labels in this cluster + distrib = labeled_trips_in_cluster[ + f'{label_type}_true'].value_counts(normalize=True, + dropna=True).to_dict() + # TODO: add confidence discounting + + # update predictions + # convert the dict into a list of dicts to work around pandas + # thinking we're trying to insert information according to a + # key-value map + # TODO: this is the line throwing the set on slice warning + df.loc[df[cluster_col] == c, + f'{label_type}_distrib'] = [distrib] * cluster_size + + return df + + +class EnsembleClassifier(TripClassifier, metaclass=ABCMeta): + """ Template class for trip classifiers using ensemble algorithms. + + Required args: + loc_feature (str): 'coordinates' or 'cluster' + """ + base_features = [ + 'duration', + 'distance', + 'start_local_dt_year', + 'start_local_dt_month', + 'start_local_dt_day', + 'start_local_dt_hour', + # 'start_local_dt_minute', + 'start_local_dt_weekday', + 'end_local_dt_year', # most likely the same as the start year + 'end_local_dt_month', # most likely the same as the start month + 'end_local_dt_day', + 'end_local_dt_hour', + # 'end_local_dt_minute', + 'end_local_dt_weekday', + ] + targets = ['mode_true', 'purpose_true', 'replaced_true'] + + # required instance attributes + loc_feature = NotImplemented + purpose_enc = NotImplemented + mode_enc = NotImplemented + purpose_predictor = NotImplemented + mode_predictor = NotImplemented + replaced_predictor = NotImplemented + + # required methods + def fit(self, train_df,unused=None): + # get location features + if self.loc_feature == 'cluster': + # fit clustering model(s) and one-hot encode their indices + # TODO: consolidate start/end_cluster_model in a single instance + # that has a location_type parameter in the fit() method + self.end_cluster_model.fit(train_df) + + clusters_to_encode = self.end_cluster_model.train_df[[ + 'end_cluster_idx' + ]].copy() # copy is to avoid SettingWithCopyWarning + + if self.use_start_clusters or self.use_trip_clusters: + self.start_cluster_model.fit(train_df) + + if self.use_start_clusters: + clusters_to_encode = pd.concat([ + clusters_to_encode, + self.start_cluster_model.train_df[['start_cluster_idx']] + ], + axis=1) + if self.use_trip_clusters: + start_end_clusters = pd.concat([ + self.end_cluster_model.train_df[['end_cluster_idx']], + self.start_cluster_model.train_df[['start_cluster_idx']] + ], + axis=1) + trip_cluster_idx = self.trip_grouper.fit_transform( + start_end_clusters) + clusters_to_encode.loc[:, + 'trip_cluster_idx'] = trip_cluster_idx + + loc_features_df = self.cluster_enc.fit_transform( + clusters_to_encode.astype(int)) + + # clean the df again because we need it in the next step + # TODO: remove redundancy + self.train_df = self._clean_data(train_df) + + # TODO: move below code into a reusable function + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if 
len(self.train_df) == 0: + # i.e. no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + else: # self.loc_feature == 'coordinates' + self.train_df = self._clean_data(train_df) + + # TODO: move below code into a reusable function + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + loc_features_df = self.train_df[[ + 'start_lon', 'start_lat', 'end_lon', 'end_lat' + ]] + + # prepare data for the ensemble classifiers + + # note that we want to use purpose data to aid our mode predictions, + # and use both purpose and mode data to aid our replaced-mode + # predictions + # thus, we want to one-hot encode the purpose and mode as data + # features, but also preserve an unencoded copy for the target columns + + # dataframe holding all features and targets + self.Xy_train = pd.concat( + [self.train_df[self.base_features + self.targets], loc_features_df], + axis=1) + + # encode purposes and modes + onehot_purpose_df = self.purpose_enc.fit_transform( + self.Xy_train[['purpose_true']], output_col_prefix='purpose') + onehot_mode_df = self.mode_enc.fit_transform( + self.Xy_train[['mode_true']], output_col_prefix='mode') + self.Xy_train = pd.concat( + [self.Xy_train, onehot_purpose_df, onehot_mode_df], axis=1) + + # for predicting purpose, drop encoded purpose and mode features, as + # well as all target labels + self.X_purpose = self.Xy_train.dropna(subset=['purpose_true']).drop( + labels=self.targets + self.purpose_enc.onehot_encoding_cols + + self.mode_enc.onehot_encoding_cols, + axis=1) + + # for predicting mode, we want to keep purpose data + self.X_mode = self.Xy_train.dropna(subset=['mode_true']).drop( + labels=self.targets + self.mode_enc.onehot_encoding_cols, axis=1) + + # for predicting replaced-mode, we want to keep purpose and mode data + self.X_replaced = self.Xy_train.dropna(subset=['replaced_true']).drop( + labels=self.targets, axis=1) + + self.y_purpose = self.Xy_train['purpose_true'].dropna() + self.y_mode = self.Xy_train['mode_true'].dropna() + self.y_replaced = self.Xy_train['replaced_true'].dropna() + + # fit classifiers + if len(self.X_purpose) > 0: + self.purpose_predictor.fit(self.X_purpose, self.y_purpose) + if len(self.X_mode) > 0: + self.mode_predictor.fit(self.X_mode, self.y_mode) + if len(self.X_replaced) > 0: + self.replaced_predictor.fit(self.X_replaced, self.y_replaced) + + return self + + def predict_proba(self, test_df): + """ NOTE: these class probabilities do NOT have a + confidence-discounting heuristic applied. 
+ """ + ################ + ### get data ### + ################ + self.X_test_for_purpose = self._get_X_test_for_purpose(test_df) + + ######################## + ### make predictions ### + ######################## + # note that we want to use purpose data to aid our mode predictions, + # and use both purpose and mode data to aid our replaced-mode + # predictions + + # TODO: some of the code across the try and except blocks can be + # consolidated by considering one-hot encoding fully np.nan arrays + try: + purpose_proba_raw = self.purpose_predictor.predict_proba( + self.X_test_for_purpose) + purpose_proba = pd.DataFrame( + purpose_proba_raw, columns=self.purpose_predictor.classes_) + purpose_pred = purpose_proba.idxmax(axis=1) + + # update X_test with one-hot-encoded purpose predictions to aid + # mode predictor + # TODO: converting purpose_pred to a DataFrame feels super + # unnecessary, make this more efficient + onehot_purpose_df = self.purpose_enc.transform( + pd.DataFrame(purpose_pred).set_index( + self.X_test_for_purpose.index)) + self.X_test_for_mode = pd.concat( + [self.X_test_for_purpose, onehot_purpose_df], axis=1) + + mode_proba, replaced_proba = self._try_predict_proba_mode_replaced() + + except NotFittedError as e: + # if we can't predict purpose, we can still try to predict mode and + # replaced-mode without one-hot encoding the purpose + + purpose_pred = np.full((len(self.X_test_for_purpose), ), np.nan) + purpose_proba_raw = np.full((len(self.X_test_for_purpose), 1), 0) + purpose_proba = pd.DataFrame(purpose_proba_raw, columns=[np.nan]) + + self.X_test_for_mode = self.X_test_for_purpose + mode_proba, replaced_proba = self._try_predict_proba_mode_replaced() + + mode_pred = mode_proba.idxmax(axis=1) + replaced_pred = replaced_proba.idxmax(axis=1) + + if (purpose_pred.dtype == np.float64 and mode_pred.dtype == np.float64 + and replaced_pred.dtype == np.float64): + # this indicates that all the predictions are np.nan so none of the + # random forest classifiers were fitted + raise NotFittedError + + # TODO: move this to a Mixin for cluster-based predictors and use the + # 'cluster' column of the proba_df outputs + # if self.drop_unclustered: + # # TODO: actually, we should only drop purpose predictions. we can + # # then impute the missing entries in the purpose feature and still + # # try to predict mode and replaced-mode without it + # self.predictions.loc[ + # self.end_cluster_model.test_df['end_cluster_idx'] == -1, + # ['purpose_pred', 'mode_pred', 'replaced_pred']] = np.nan + + proba_dfs = [] + for label_type, proba in zip( + ['purpose', 'mode', 'replaced'], + [purpose_proba, mode_proba, replaced_proba]): + proba['top_pred'] = proba.idxmax(axis=1) + proba['top_proba'] = proba.max(axis=1, skipna=True) + proba['clusterable'] = self._clusterable( + self.X_test_for_purpose).astype(bool) + proba = pd.concat([proba], keys=[label_type], axis=1) + proba_dfs += [proba] + + self.proba_df = pd.concat(proba_dfs, axis=1) + return self.proba_df + + def _get_X_test_for_purpose(self, test_df): + """ Do the pre-processing to get data that we can then pass into the + ensemble classifiers. 
+ """ + if self.loc_feature == 'cluster': + # get clusters + self.end_cluster_model.predict(test_df) + clusters_to_encode = self.end_cluster_model.test_df[[ + 'end_cluster_idx' + ]].copy() # copy is to avoid SettingWithCopyWarning + + if self.use_start_clusters or self.use_trip_clusters: + self.start_cluster_model.predict(test_df) + + if self.use_start_clusters: + clusters_to_encode = pd.concat([ + clusters_to_encode, + self.start_cluster_model.test_df[['start_cluster_idx']] + ], + axis=1) + if self.use_trip_clusters: + start_end_clusters = pd.concat([ + self.end_cluster_model.test_df[['end_cluster_idx']], + self.start_cluster_model.test_df[['start_cluster_idx']] + ], + axis=1) + trip_cluster_idx = self.trip_grouper.transform( + start_end_clusters) + clusters_to_encode.loc[:, + 'trip_cluster_idx'] = trip_cluster_idx + + # one-hot encode the cluster indices + loc_features_df = self.cluster_enc.transform(clusters_to_encode) + else: # self.loc_feature == 'coordinates' + test_df = self._clean_data(test_df) + loc_features_df = test_df[[ + 'start_lon', 'start_lat', 'end_lon', 'end_lat' + ]] + + # extract the desired data + X_test = pd.concat([ + test_df[self.base_features].reset_index(drop=True), + loc_features_df.reset_index(drop=True) + ], + axis=1) + + return X_test + + def _try_predict_proba_mode_replaced(self): + """ Try to predict mode and replaced-mode. Handles error in case the + ensemble algorithms were not fitted. + + Requires self.X_test_for_mode to have already been set. (These are + the DataFrames containing the test data to be passed into self. + mode_predictor.) + + Returns: mode_proba and replaced_proba, two DataFrames containing + class probabilities for mode and replaced-mode respectively + """ + + try: + # predict mode + mode_proba_raw = self.mode_predictor.predict_proba( + self.X_test_for_mode) + mode_proba = pd.DataFrame(mode_proba_raw, + columns=self.mode_predictor.classes_) + mode_pred = mode_proba.idxmax(axis=1) + + # update X_test with one-hot-encoded mode predictions to aid + # replaced-mode predictor + onehot_mode_df = self.mode_enc.transform( + pd.DataFrame(mode_pred).set_index(self.X_test_for_mode.index)) + self.X_test_for_replaced = pd.concat( + [self.X_test_for_mode, onehot_mode_df], axis=1) + replaced_proba = self._try_predict_proba_replaced() + + except NotFittedError as e: + mode_proba_raw = np.full((len(self.X_test_for_mode), 1), 0) + mode_proba = pd.DataFrame(mode_proba_raw, columns=[np.nan]) + + # if we don't have mode predictions, we *could* still try to + # predict replaced mode (but if the user didn't input mode labels + # then it's unlikely they would input replaced-mode) + self.X_test_for_replaced = self.X_test_for_mode + replaced_proba = self._try_predict_proba_replaced() + + return mode_proba, replaced_proba + + def _try_predict_proba_replaced(self): + """ Try to predict replaced mode. Handles error in case the + replaced_predictor was not fitted. + + Requires self.X_test_for_replaced to have already been set. (This + is the DataFrame containing the test data to be passed into self. + replaced_predictor.) 
+ + Returns: replaced_proba, DataFrame containing class probabilities + for replaced-mode + """ + try: + replaced_proba_raw = self.replaced_predictor.predict_proba( + self.X_test_for_replaced + ) # has shape (len_trips, number of replaced_mode classes) + replaced_proba = pd.DataFrame( + replaced_proba_raw, columns=self.replaced_predictor.classes_) + + except NotFittedError as e: + replaced_proba_raw = np.full((len(self.X_test_for_replaced), 1), 0) + replaced_proba = pd.DataFrame(replaced_proba_raw, columns=[np.nan]) + + return replaced_proba + + def _clusterable(self, test_df): + """ Check if the end points can be clustered (i.e. are within + meters of an end point from the training set) + """ + if self.loc_feature == 'cluster': + return self.end_cluster_model.test_df.end_cluster_idx >= 0 + + n_samples = test_df.shape[0] + clustered = np.ones(shape=n_samples, dtype=int) * False + + train_coordinates = self.train_df[['end_lat', 'end_lon']] + train_radians = np.radians(train_coordinates) + + for idx, row in test_df.reset_index(drop=True).iterrows(): + # calculate the distances between the ith test data and all points, + # then find the minimum distance for each point and check if it's + # within the distance threshold. + # unfortunately, pairwise_distances_argmin() does not support + # haversine distance, so we have to reimplement it ourselves + new_loc_radians = np.radians(row[["end_lat", "end_lon"]].to_list()) + new_loc_radians = np.reshape(new_loc_radians, (1, 2)) + dist_matrix_meters = haversine_distances( + new_loc_radians, train_radians) * EARTH_RADIUS + + shortest_dist = np.min(dist_matrix_meters) + if shortest_dist < self.radius: + clustered[idx] = True + + return clustered + + +class ForestClassifier(EnsembleClassifier): + """ Random forest-based trip classifier. + + Args: + loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/ + lon coordinates or cluster indices for the location feature + radius (int): radius for DBSCAN clustering. only if + loc_feature=='cluster' + size_thresh (int): the min number of trips a cluster must have to + be considered for sub-division via SVM. only if + loc_feature=='cluster' + purity_thresh (float): the min purity a cluster must have to be + sub-divided via SVM. only if loc_feature=='cluster' + gamma (float): coefficient for the rbf kernel in SVM. only if + loc_feature=='cluster' + C (float): regularization hyperparameter for SVM. only if + loc_feature=='cluster' + n_estimators (int): number of estimators in the random forest + criterion (str): function to measure the quality of a split in the + random forest + max_depth (int): max depth of a tree in the random forest. + unlimited if None. + min_samples_split (int): min number of samples required to split an + internal node in a decision tree + min_samples_leaf (int): min number of samples required for a leaf + node in a decision tree + max_features (str): number of features to consider when looking for + the best split in a decision tree + bootstrap (bool): whether bootstrap samples are used when building + decision trees + random_state (int): random state for deterministic random forest + construction + use_start_clusters (bool): whether or not to use start clusters as + input features to the ensemble classifier. only if + loc_feature=='cluster' + use_trip_clusters (bool): whether or not to use trip-level clusters + as input features to the ensemble classifier. 
only if + loc_feature=='cluster' + """ + + def __init__( + self, + loc_feature='coordinates', + radius=100, # TODO: add different start and end radii + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1, + n_estimators=100, + criterion='gini', + max_depth=None, + min_samples_split=2, + min_samples_leaf=1, + max_features='sqrt', + bootstrap=True, + random_state=42, + # drop_unclustered=False, + use_start_clusters=False, + use_trip_clusters=True): + assert loc_feature in ['cluster', 'coordinates'] + self.loc_feature = loc_feature + self.radius = radius + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + self.n_estimators = n_estimators + self.criterion = criterion + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.min_samples_leaf = min_samples_leaf + self.max_features = max_features + self.bootstrap = bootstrap + self.random_state = random_state + # self.drop_unclustered = drop_unclustered + self.use_start_clusters = use_start_clusters + self.use_trip_clusters = use_trip_clusters + + if self.loc_feature == 'cluster': + # clustering algorithm to generate end clusters + self.end_cluster_model = DBSCANSVMCluster( + loc_type='end', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_start_clusters or self.use_trip_clusters: + # clustering algorithm to generate start clusters + self.start_cluster_model = DBSCANSVMCluster( + loc_type='start', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_trip_clusters: + # helper class to generate trip-level clusters + self.trip_grouper = TripGrouper( + start_cluster_col='start_cluster_idx', + end_cluster_col='end_cluster_idx') + + # wrapper class to generate one-hot encodings for cluster indices + self.cluster_enc = OneHotWrapper(sparse=False, + handle_unknown='ignore') + + # wrapper class to generate one-hot encodings for purposes and modes + self.purpose_enc = OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + self.mode_enc = OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + + # ensemble classifiers for each label category + self.purpose_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + self.mode_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + self.replaced_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + + def set_params(self, params): + """ hacky code that mimics the set_params of an sklearn Estimator class + so that we can pass params during randomizedsearchCV + + Args: + params (dict): a dictionary where the keys are the parameter + names and the values are the parameter 
values + """ + loc_feature = params['loc_feature'] if 'loc_feature' in params.keys( + ) else self.loc_feature + radius = params['radius'] if 'radius' in params.keys() else self.radius + size_thresh = params['size_thresh'] if 'size_thresh' in params.keys( + ) else self.size_thresh + purity_thresh = params[ + 'purity_thresh'] if 'purity_thresh' in params.keys( + ) else self.purity_thresh + gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma + C = params['C'] if 'C' in params.keys() else self.C + n_estimators = params['n_estimators'] if 'n_estimators' in params.keys( + ) else self.n_estimators + criterion = params['criterion'] if 'criterion' in params.keys( + ) else self.criterion + max_depth = params['max_depth'] if 'max_depth' in params.keys( + ) else self.max_depth + min_samples_split = params[ + 'min_samples_split'] if 'min_samples_split' in params.keys( + ) else self.min_samples_split + min_samples_leaf = params[ + 'min_samples_leaf'] if 'min_samples_leaf' in params.keys( + ) else self.min_samples_leaf + max_features = params['max_features'] if 'max_features' in params.keys( + ) else self.max_features + bootstrap = params['bootstrap'] if 'bootstrap' in params.keys( + ) else self.bootstrap + random_state = params['random_state'] if 'random_state' in params.keys( + ) else self.random_state + use_start_clusters = params[ + 'use_start_clusters'] if 'use_start_clusters' in params.keys( + ) else self.use_start_clusters + # drop_unclustered = params[ + # 'drop_unclustered'] if 'drop_unclustered' in params.keys( + # ) else self.drop_unclustered + use_trip_clusters = params[ + 'use_trip_clusters'] if 'use_trip_clusters' in params.keys( + ) else self.use_trip_clusters + + # yes, calling __init__ again is not good practice... + self.__init__(loc_feature, radius, size_thresh, purity_thresh, gamma, C, + n_estimators, criterion, max_depth, min_samples_split, + min_samples_leaf, max_features, bootstrap, random_state, + use_start_clusters, use_trip_clusters) + return self + + +class ClusterForestSlimPredictor(ForestClassifier): + """ This is the same as ForestClassifier, just with fewer base + features. + + Args: + loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/ + lon coordinates or cluster indices for the location feature + radius (int): radius for DBSCAN clustering. only if + loc_feature=='cluster' + size_thresh (int): the min number of trips a cluster must have to + be considered for sub-division via SVM. only if + loc_feature=='cluster' + purity_thresh (float): the min purity a cluster must have to be + sub-divided via SVM. only if loc_feature=='cluster' + gamma (float): coefficient for the rbf kernel in SVM. only if + loc_feature=='cluster' + C (float): regularization hyperparameter for SVM. only if + loc_feature=='cluster' + n_estimators (int): number of estimators in the random forest + criterion (str): function to measure the quality of a split in the + random forest + max_depth (int): max depth of a tree in the random forest. + unlimited if None. 
+ min_samples_split (int): min number of samples required to split an + internal node in a decision tree + min_samples_leaf (int): min number of samples required for a leaf + node in a decision tree + max_features (str): number of features to consider when looking for + the best split in a decision tree + bootstrap (bool): whether bootstrap samples are used when building + decision trees + random_state (int): random state for deterministic random forest + construction + use_start_clusters (bool): whether or not to use start clusters as + input features to the ensemble classifier. only if + loc_feature=='cluster' + use_trip_clusters (bool): whether or not to use trip-level clusters + as input features to the ensemble classifier. only if + loc_feature=='cluster' + """ + + def __init__( + self, + loc_feature='coordinates', + radius=100, # TODO: add different start and end radii + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1, + n_estimators=100, + criterion='gini', + max_depth=None, + min_samples_split=2, + min_samples_leaf=1, + max_features='sqrt', + bootstrap=True, + random_state=42, + # drop_unclustered=False, + use_start_clusters=False, + use_trip_clusters=True): + + super().__init__(loc_feature, radius, size_thresh, purity_thresh, gamma, + C, n_estimators, criterion, max_depth, + min_samples_split, min_samples_leaf, max_features, + bootstrap, random_state, use_start_clusters, + use_trip_clusters) + + self.base_features = [ + 'duration', + 'distance', + ] + + +class AdaBoostClassifier(EnsembleClassifier): + """ AdaBoost-based trip classifier. + + Args: + loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/ + lon coordinates or cluster indices for the location feature + radius (int): radius for DBSCAN clustering. only if + loc_feature=='cluster' + size_thresh (int): the min number of trips a cluster must have to + be considered for sub-division via SVM. only if + loc_feature=='cluster' + purity_thresh (float): the min purity a cluster must have to be + sub-divided via SVM. only if loc_feature=='cluster' + gamma (float): coefficient for the rbf kernel in SVM. only if + loc_feature=='cluster' + C (float): regularization hyperparameter for SVM. only if + loc_feature=='cluster' + n_estimators (int): number of estimators + criterion (str): function to measure the quality of a split in a + decision tree + max_depth (int): max depth of a tree in the random forest. + unlimited if None. + min_samples_split (int): min number of samples required to split an + internal node in a decision tree + min_samples_leaf (int): min number of samples required for a leaf + node in a decision tree + max_features (str): number of features to consider when looking for + the best split in a decision tree + random_state (int): random state for deterministic random forest + construction + use_start_clusters (bool): whether or not to use start clusters as + input features to the ensemble classifier. only if + loc_feature=='cluster' + use_trip_clusters (bool): whether or not to use trip-level clusters + as input features to the ensemble classifier. 
only if + loc_feature=='cluster' + learning_rate (float): weight applied to each decision tree at each + boosting iteration + """ + + def __init__( + self, + loc_feature='coordinates', + radius=100, # TODO: add different start and end radii + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1, + n_estimators=100, + criterion='gini', + max_depth=None, + min_samples_split=2, + min_samples_leaf=1, + max_features='sqrt', + random_state=42, + # drop_unclustered=False, + use_start_clusters=False, + use_trip_clusters=True, + use_base_clusters=True, + learning_rate=1.0): + assert loc_feature in ['cluster', 'coordinates'] + self.loc_feature = loc_feature + self.radius = radius + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + self.n_estimators = n_estimators + self.criterion = criterion + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.min_samples_leaf = min_samples_leaf + self.max_features = max_features + self.random_state = random_state + # self.drop_unclustered = drop_unclustered + self.use_start_clusters = use_start_clusters + self.use_trip_clusters = use_trip_clusters + self.use_base_clusters = use_base_clusters + self.learning_rate = learning_rate + + if self.loc_feature == 'cluster': + # clustering algorithm to generate end clusters + self.end_cluster_model = DBSCANSVMCluster( + loc_type='end', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_start_clusters or self.use_trip_clusters: + # clustering algorithm to generate start clusters + self.start_cluster_model = DBSCANSVMCluster( + loc_type='start', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_trip_clusters: + # helper class to generate trip-level clusters + self.trip_grouper = TripGrouper( + start_cluster_col='start_cluster_idx', + end_cluster_col='end_cluster_idx') + + # wrapper class to generate one-hot encodings for cluster indices + self.cluster_enc = OneHotWrapper(sparse=False, + handle_unknown='ignore') + + # wrapper class to generate one-hot encodings for purposes and modes + self.purpose_enc = OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + self.mode_enc = OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + + self.purpose_predictor = AdaBoostClassifier( + n_estimators=self.n_estimators, + learning_rate=self.learning_rate, + random_state=self.random_state, + base_estimator=DecisionTreeClassifier( + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + random_state=self.random_state)) + self.mode_predictor = AdaBoostClassifier( + n_estimators=self.n_estimators, + learning_rate=self.learning_rate, + random_state=self.random_state, + base_estimator=DecisionTreeClassifier( + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + random_state=self.random_state)) + self.replaced_predictor = AdaBoostClassifier( + n_estimators=self.n_estimators, + learning_rate=self.learning_rate, + random_state=self.random_state, + base_estimator=DecisionTreeClassifier( + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + 
                min_samples_leaf=self.min_samples_leaf,
+                max_features=self.max_features,
+                random_state=self.random_state))
+
+    def set_params(self, params):
+        """ hacky code that mimics the set_params of an sklearn Estimator class
+            so that we can pass params during RandomizedSearchCV
+
+            Args:
+                params (dict): a dictionary where the keys are the parameter
+                    names and the values are the parameter values
+        """
+        loc_feature = params['loc_feature'] if 'loc_feature' in params.keys(
+        ) else self.loc_feature
+        radius = params['radius'] if 'radius' in params.keys() else self.radius
+        size_thresh = params['size_thresh'] if 'size_thresh' in params.keys(
+        ) else self.size_thresh
+        purity_thresh = params[
+            'purity_thresh'] if 'purity_thresh' in params.keys(
+            ) else self.purity_thresh
+        gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma
+        C = params['C'] if 'C' in params.keys() else self.C
+        n_estimators = params['n_estimators'] if 'n_estimators' in params.keys(
+        ) else self.n_estimators
+        criterion = params['criterion'] if 'criterion' in params.keys(
+        ) else self.criterion
+        max_depth = params['max_depth'] if 'max_depth' in params.keys(
+        ) else self.max_depth
+        min_samples_split = params[
+            'min_samples_split'] if 'min_samples_split' in params.keys(
+            ) else self.min_samples_split
+        min_samples_leaf = params[
+            'min_samples_leaf'] if 'min_samples_leaf' in params.keys(
+            ) else self.min_samples_leaf
+        max_features = params['max_features'] if 'max_features' in params.keys(
+        ) else self.max_features
+        random_state = params['random_state'] if 'random_state' in params.keys(
+        ) else self.random_state
+        use_start_clusters = params[
+            'use_start_clusters'] if 'use_start_clusters' in params.keys(
+            ) else self.use_start_clusters
+        # drop_unclustered = params[
+        #     'drop_unclustered'] if 'drop_unclustered' in params.keys(
+        #     ) else self.drop_unclustered
+        use_trip_clusters = params[
+            'use_trip_clusters'] if 'use_trip_clusters' in params.keys(
+            ) else self.use_trip_clusters
+        learning_rate = params[
+            'learning_rate'] if 'learning_rate' in params.keys(
+            ) else self.learning_rate
+
+        # calling __init__ again is not good practice, I know...
+        # use keyword arguments so the values line up with the __init__
+        # signature (a positional call would shift everything by one slot
+        # because loc_feature comes first)
+        self.__init__(loc_feature=loc_feature,
+                      radius=radius,
+                      size_thresh=size_thresh,
+                      purity_thresh=purity_thresh,
+                      gamma=gamma,
+                      C=C,
+                      n_estimators=n_estimators,
+                      criterion=criterion,
+                      max_depth=max_depth,
+                      min_samples_split=min_samples_split,
+                      min_samples_leaf=min_samples_leaf,
+                      max_features=max_features,
+                      random_state=random_state,
+                      use_start_clusters=use_start_clusters,
+                      use_trip_clusters=use_trip_clusters,
+                      use_base_clusters=self.use_base_clusters,
+                      learning_rate=learning_rate)
+        return self
+
+
+class TripGrouper():
+    """ Helper class to get trip clusters from start and end clusters.
+
+        Args:
+            start_cluster_col (str): name of the column containing start
+                cluster indices
+            end_cluster_col (str): name of the column containing end cluster
+                indices
+    """
+
+    def __init__(self,
+                 start_cluster_col='start_cluster_idx',
+                 end_cluster_col='end_cluster_idx'):
+        self.start_cluster_col = start_cluster_col
+        self.end_cluster_col = end_cluster_col
+
+    def fit_transform(self, trip_df):
+        """ Fit and remember possible trip clusters.
+
+            Args:
+                trip_df (DataFrame): DataFrame containing trips. must have
+                    the columns named by start_cluster_col and end_cluster_col
+        """
+        trip_groups = trip_df.groupby(
+            [self.start_cluster_col, self.end_cluster_col])
+
+        # need dict so we can access the trip indices of all the trips in each
+        # group. the key is the group tuple and the value is the list of trip
+        # indices in the group.
+        self.trip_groups_dict = dict(trip_groups.groups)
+
+        # we want to convert trip-group tuples to trip-cluster indices,
+        # hence the pd Series
+        trip_groups_series = pd.Series(list(self.trip_groups_dict.keys()))
+
+        trip_cluster_idx = np.empty(len(trip_df))
+
+        for group_idx in range(len(trip_groups_series)):
+            group_tuple = trip_groups_series[group_idx]
+            trip_idxs_in_group = self.trip_groups_dict[group_tuple]
+            trip_cluster_idx[trip_idxs_in_group] = group_idx
+
+        return trip_cluster_idx
+
+    def transform(self, new_trip_df):
+        """ Get trip clusters for a new set of trips.
+
+            Args:
+                new_trip_df (DataFrame): DataFrame containing trips. must have
+                    the columns named by start_cluster_col and end_cluster_col
+        """
+        prediction_trip_groups = new_trip_df.groupby(
+            [self.start_cluster_col, self.end_cluster_col])
+
+        # need dict so we can access the trip indices of all the trips in each
+        # group. the key is the group tuple and the value is the list of trip
+        # indices in the group.
+        prediction_trip_groups_dict = dict(prediction_trip_groups.groups)
+        trip_groups_series = pd.Series(list(self.trip_groups_dict.keys()))
+        trip_cluster_idx = np.empty(len(new_trip_df))
+
+        for group_tuple in prediction_trip_groups_dict.keys():
+            # check if the trip cluster exists in the training set
+            trip_idxs_in_group = prediction_trip_groups_dict[group_tuple]
+            if group_tuple in self.trip_groups_dict.keys():
+                # look up the group index from the series we created when we
+                # fit the model
+                group_idx = trip_groups_series[trip_groups_series ==
+                                               group_tuple].index[0]
+            else:
+                group_idx = -1
+
+            trip_cluster_idx[trip_idxs_in_group] = group_idx
+
+        return trip_cluster_idx
+
+
+class OneHotWrapper():
+    """ Helper class to streamline one-hot encoding.
+
+        Args:
+            impute_missing (bool): whether or not to impute np.nan values.
+            sparse (bool): whether or not to return a sparse matrix.
+            handle_unknown (str): specifies the way unknown categories are
+                handled during transform.
+    """
+
+    def __init__(
+        self,
+        impute_missing=False,
+        sparse=False,
+        handle_unknown='ignore',
+    ):
+        self.impute_missing = impute_missing
+        if self.impute_missing:
+            self.encoder = make_pipeline(
+                SimpleImputer(missing_values=np.nan,
+                              strategy='constant',
+                              fill_value='missing'),
+                OneHotEncoder(sparse=False, handle_unknown=handle_unknown))
+        else:
+            self.encoder = OneHotEncoder(sparse=sparse,
+                                         handle_unknown=handle_unknown)
+
+    def fit_transform(self, train_df, output_col_prefix=None):
+        """ Establish one-hot encoded variables.
+
+            Args:
+                train_df (DataFrame): DataFrame containing train trips.
+ output_col_prefix (str): only if train_df is a single column + """ + # TODO: handle pd series + + train_df = train_df.copy() # to avoid SettingWithCopyWarning + + # if imputing, the dtype of each column must be string/object and not + # numerical, otherwise the SimpleImputer will fail + if self.impute_missing: + for col in train_df.columns: + train_df[col] = train_df[col].astype(object) + onehot_encoding = self.encoder.fit_transform(train_df) + self.onehot_encoding_cols_all = [] + for col in train_df.columns: + if train_df.shape[1] > 1 or output_col_prefix is None: + output_col_prefix = col + self.onehot_encoding_cols_all += [ + f'{output_col_prefix}_{val}' + for val in np.sort(train_df[col].dropna().unique()) + ] + # we handle np.nan separately because it is of type float, and may + # cause issues with np.sort if the rest of the unique values are + # strings + if any((train_df[col].isna())): + self.onehot_encoding_cols_all += [f'{output_col_prefix}_nan'] + + onehot_encoding_df = pd.DataFrame( + onehot_encoding, + columns=self.onehot_encoding_cols_all).set_index(train_df.index) + + # ignore the encoded columns for missing entries + self.onehot_encoding_cols = copy.deepcopy(self.onehot_encoding_cols_all) + for col in self.onehot_encoding_cols_all: + if col.endswith('_nan'): + onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) + self.onehot_encoding_cols.remove(col) + + return onehot_encoding_df.astype(int) + + def transform(self, test_df): + """ One-hot encoded features in accordance with features seen in the + train set. + + Args: + test_df (DataFrame): DataFrame of trips. + """ + # TODO: rename test_df, this one doesn't necessarily need to be a df + onehot_encoding = self.encoder.transform(test_df) + onehot_encoding_df = pd.DataFrame( + onehot_encoding, + columns=self.onehot_encoding_cols_all).set_index(test_df.index) + + # ignore the encoded columns for missing entries + for col in self.onehot_encoding_cols_all: + if col.endswith('_nan'): + onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) + + return onehot_encoding_df.astype(int)
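+
+
+# --- Illustrative sketch (editor's addition, not used by the pipeline) ---
+# DBSCANSVMCluster._NN_predict() and EnsembleClassifier._clusterable() assign
+# test points by finding the nearest training point under haversine distance
+# and comparing that distance against the model radius. A minimal,
+# self-contained version of the same idea; the coordinates and the 100 m
+# threshold below are made up for demonstration only.
+def _demo_haversine_nn_assignment():
+    import numpy as np
+    from sklearn.metrics.pairwise import haversine_distances
+
+    earth_radius_m = 6371000
+    radius_m = 100  # assumed threshold, analogous to self.radius
+
+    # two "training" end points and one "test" end point, as (lat, lon)
+    train_radians = np.radians([[39.7392, -104.9903], [39.7508, -104.9997]])
+    test_radians = np.radians([[39.7393, -104.9904]])
+
+    dist_m = haversine_distances(test_radians, train_radians) * earth_radius_m
+    nearest_idx = int(np.argmin(dist_m))
+
+    # within the radius -> inherit the nearest training point's cluster index;
+    # otherwise mark the trip as noise (-1), mirroring _NN_predict()
+    return nearest_idx if dist_m[0, nearest_idx] < radius_m else -1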
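+
+
+# --- Illustrative sketch (editor's addition, not used by the pipeline) ---
+# TripGrouper assigns one trip-level cluster index per unique (start cluster,
+# end cluster) pair. The same effect can be reproduced with a plain pandas
+# groupby on a toy DataFrame; the cluster indices below are made up.
+def _demo_trip_grouping():
+    import pandas as pd
+
+    toy = pd.DataFrame({
+        'start_cluster_idx': [0, 0, 1, 1],
+        'end_cluster_idx': [2, 2, 2, 3],
+    })
+    # ngroup() numbers each unique (start, end) pair, which is analogous to
+    # the trip_cluster_idx column produced by TripGrouper.fit_transform()
+    toy['trip_cluster_idx'] = toy.groupby(
+        ['start_cluster_idx', 'end_cluster_idx']).ngroup()
+    return toy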
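+
+
+# --- Illustrative sketch (editor's addition, not used by the pipeline) ---
+# When impute_missing=True, OneHotWrapper chains a constant-fill SimpleImputer
+# with a OneHotEncoder, so NaN labels become a 'missing' category before
+# encoding. A standalone version of that pipeline on toy data (the column name
+# and values are made up; sparse=False matches the usage in this module).
+def _demo_impute_then_onehot():
+    import numpy as np
+    import pandas as pd
+    from sklearn.impute import SimpleImputer
+    from sklearn.pipeline import make_pipeline
+    from sklearn.preprocessing import OneHotEncoder
+
+    toy = pd.DataFrame({'purpose_true': ['home', np.nan, 'work']}, dtype=object)
+    enc = make_pipeline(
+        SimpleImputer(missing_values=np.nan,
+                      strategy='constant',
+                      fill_value='missing'),
+        OneHotEncoder(sparse=False, handle_unknown='ignore'))
+    # columns come back in category order: 'home', 'missing', 'work'
+    return enc.fit_transform(toy)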