From 4d6dfa1a88312a73439dd82c47a6a401674e8456 Mon Sep 17 00:00:00 2001 From: Hannah Lu <30671575+hlu109@users.noreply.github.com> Date: Fri, 12 Aug 2022 19:33:07 -0400 Subject: [PATCH 1/6] add copy of code used in TRB paper --- TRB_label_assist/clustering.py | 382 ++++++ TRB_label_assist/data_wrangling.py | 238 ++++ TRB_label_assist/mapping.py | 422 ++++++ TRB_label_assist/models.py | 2035 ++++++++++++++++++++++++++++ 4 files changed, 3077 insertions(+) create mode 100644 TRB_label_assist/clustering.py create mode 100644 TRB_label_assist/data_wrangling.py create mode 100644 TRB_label_assist/mapping.py create mode 100644 TRB_label_assist/models.py diff --git a/TRB_label_assist/clustering.py b/TRB_label_assist/clustering.py new file mode 100644 index 000000000..28a07ab38 --- /dev/null +++ b/TRB_label_assist/clustering.py @@ -0,0 +1,382 @@ +# helper functions to streamline the use and comparison of clustering algs + +# basic imports +import pandas as pd +import numpy as np +import logging + +# import clustering algorithms +import sklearn.metrics.pairwise as smp +import sklearn.cluster as sc +from sklearn import metrics +from sklearn import svm +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler + +# our imports +import emission.analysis.modelling.tour_model_extended.similarity as eamts +import emission.storage.decorations.trip_queries as esdtq + +EARTH_RADIUS = 6371000 +ALG_OPTIONS = [ + 'DBSCAN', + 'naive', + 'OPTICS', + # 'fuzzy', + 'mean_shift' +] + + +def add_loc_clusters( + loc_df, + radii, + loc_type, + alg, + SVM=False, + # cluster_unlabeled=False, + min_samples=1, + optics_min_samples=None, + optics_xi=0.05, + optics_cluster_method='xi', + svm_min_size=6, + svm_purity_thresh=0.7, + svm_gamma=0.05, + svm_C=1): + """ Given a dataframe of trips, cluster the locations (either start or end + locations) using the desired algorithm & parameters. + + Returns: + Same dataframe, with appended columns that contain the resulting cluster indices. + + Args: + loc_df (dataframe): must have columns 'start_lat' and 'start_lon' + or 'end_lat' and 'end_lon' + radii (int list): list of radii to run the clustering algs with + loc_type (str): 'start' or 'end' + alg (str): 'DBSCAN', 'naive', 'OPTICS', 'SVM', 'fuzzy', or + 'mean_shift' + SVM (bool): whether or not to sub-divide clusters with SVM + # cluster_unlabeled (bool): whether or not unlabeled points are used + # to generate clusters. + min_samples (int): min samples per cluster. used in DBSCAN (and + therefore also SVM and fuzzy, for now) + optics_min_samples (int): min samples per cluster, if using OPTICS. + optics_xi (float): xi value if using the xi method of OPTICS. + optics_cluster_method (str): method to use for the OPTICS + algorithm. 
either 'xi' or 'dbscan' + svm_min_size (int): the min number of trips a cluster must have to + be considered for sub-division, if using SVM + svm_purity_thresh (float): the min purity a cluster must have to be + sub-divided, if using SVM + svm_gamma (float): if using SVM, the gamma hyperparameter + svm_C (float): if using SVM, the C hyperparameter + """ + assert loc_type == 'start' or loc_type == 'end' + assert alg in ALG_OPTIONS + + # if using SVM, we get the initial clusters with DBSCAN, then sub-divide + if alg == 'DBSCAN': + dist_matrix_meters = get_distance_matrix(loc_df, loc_type) + + for r in radii: + model = sc.DBSCAN(r, metric="precomputed", + min_samples=min_samples).fit(dist_matrix_meters) + labels = model.labels_ + # print(model.n_features_in_) + # print(model.components_.shape) + # print(model.components_) + + # pd.Categorical converts the type from int to category (so + # numerical operations aren't possible) + # loc_df.loc[:, + # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical( + # labels) + # TODO: fix this and make it Categorical again (right now labels are + # ints) + loc_df.loc[:, f"{loc_type}_DBSCAN_clusters_{r}_m"] = labels + + elif alg == 'naive': + for r in radii: + # this is using a modified Similarity class that bins start/end + # points separately before creating trip-level bins + sim_model = eamts.Similarity(loc_df, + radius_start=r, + radius_end=r, + shouldFilter=False, + cutoff=False) + # we only bin the loc_type points to speed up the alg. avoid + # unnecessary binning since this is really slow + sim_model.bin_helper(loc_type=loc_type) + labels = sim_model.data_df[loc_type + '_bin'].to_list() + + # # pd.Categorical converts the type from int to category (so + # # numerical operations aren't possible) + # loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = pd.Categorical( + # labels) + loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = labels + + elif alg == 'OPTICS': + if optics_min_samples == None: + optics_min_samples = 2 + dist_matrix_meters = get_distance_matrix(loc_df, loc_type) + + for r in radii: + labels = sc.OPTICS( + min_samples=optics_min_samples, + max_eps=r, + xi=optics_xi, + cluster_method=optics_cluster_method, + metric="precomputed").fit(dist_matrix_meters).labels_ + + # # pd.Categorical converts the type from int to category (so + # # numerical operations aren't possible) + # loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = pd.Categorical( + # labels) + loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = labels + + elif alg == 'fuzzy': + # create clusters with completely homogeneous purpose labels + # I'm calling this 'fuzzy' for now since the clusters overlap, but I + # need to think of a better name + logging.warning( + 'This alg is not properly implemented and will not generate clusters for unlabeled trips!' 
+ ) + + purpose_list = loc_df.purpose_confirm.dropna().unique() + + for p in purpose_list: + p_loc_df = loc_df.loc[loc_df['purpose_confirm'] == p] + dist_matrix_meters = get_distance_matrix(p_loc_df, loc_type) + + for r in radii: + labels = sc.DBSCAN( + r, metric="precomputed", + min_samples=min_samples).fit(dist_matrix_meters).labels_ + + # pd.Categorical converts the type from int to category (so + # numerical operations aren't possible) + # loc_df.loc[:, + # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical( + # labels) + loc_df.loc[loc_df['purpose_confirm'] == p, + f"{loc_type}_{alg}_clusters_{r}_m"] = labels + + # move "noisy" trips to their own single-trip clusters + noisy_trips = loc_df.loc[(loc_df['purpose_confirm'] == p) & ( + loc_df[f"{loc_type}_{alg}_clusters_{r}_m"] == -1)] + for idx in noisy_trips.index.values: + max_idx_inside_p = loc_df.loc[ + (loc_df['purpose_confirm'] == p), + f"{loc_type}_{alg}_clusters_{r}_m"].max() + loc_df.loc[ + idx, + f"{loc_type}_{alg}_clusters_{r}_m"] = 1 + max_idx_inside_p + + # we offset all cluster indices for purpose p by the max + # existing index excluding purpose p + # so that we don't run into any duplicate trouble + max_idx_outside_p = loc_df.loc[ + (loc_df['purpose_confirm'] != p), + f"{loc_type}_{alg}_clusters_{r}_m"].max(skipna=True) + + if np.isnan(max_idx_outside_p): + # can happen if column is empty, e.g. if this is the first + # purpose in the list that we are iterating over + max_idx_outside_p = -1 + + logging.debug('max_idx_outside_p', max_idx_outside_p, + "at radius", r) + + loc_df.loc[ + loc_df['purpose_confirm'] == p, + f"{loc_type}_{alg}_clusters_{r}_m"] += 1 + max_idx_outside_p + + elif alg == 'mean_shift': + for r in radii: + # seems like the bandwidth is based on the raw lat/lon data (we + # never pass in a distance matrix), so we want a conversion factor + # from meters to degrees. Since 100-500m corresponds to such a + # small degree change, we can rely on the small angle approximation + # and just use a linear multiplier. This conversion factor doesn't + # have to be *super* accurate, its just so we can get a sense of + # what the bandwidth roughly corresponds to in the real world/make + # the value a little more interpretable. + LATLON_TO_M = 1 / 111139 + labels = sc.MeanShift( + bandwidth=LATLON_TO_M * r, + min_bin_freq=min_samples, + cluster_all=False, + ).fit(loc_df[[f"{loc_type}_lon", f"{loc_type}_lat"]]).labels_ + + # pd.Categorical converts the type from int to category (so + # numerical operations aren't possible) + # loc_df.loc[:, + # f"{loc_type}_DBSCAN_clusters_{r}_m"] = pd.Categorical( + # labels) + # TODO: fix this and make it Categorical again (right now labels are + # ints) + loc_df.loc[:, f"{loc_type}_mean_shift_clusters_{r}_m"] = labels + + # move "noisy" trips to their own single-trip clusters + for idx in loc_df.loc[ + loc_df[f"{loc_type}_mean_shift_clusters_{r}_m"] == + -1].index.values: + loc_df.loc[ + idx, f"{loc_type}_mean_shift_clusters_{r}_m"] = 1 + loc_df[ + f"{loc_type}_mean_shift_clusters_{r}_m"].max() + + if SVM: + loc_df = add_loc_SVM(loc_df, radii, alg, loc_type, svm_min_size, + svm_purity_thresh, svm_gamma, svm_C) + return loc_df + + +def add_loc_SVM(loc_df, + radii, + alg, + loc_type, + svm_min_size=6, + svm_purity_thresh=0.7, + svm_gamma=0.05, + svm_C=1, + cluster_cols=None): + """ Sub-divide base clusters using SVM. 
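+
+        Example (illustrative sketch; assumes `loc_df` already contains the
+        base cluster columns produced by add_loc_clusters() above, e.g.
+        'end_DBSCAN_clusters_100_m'):
+
+            loc_df = add_loc_SVM(loc_df, radii=[100], alg='DBSCAN',
+                                 loc_type='end', svm_purity_thresh=0.7)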
+ + Args: + loc_df (dataframe): must have columns 'start_lat' and 'start_lon' + or 'end_lat' and 'end_lon', as well as + '{loc_type}_{base_alg}_SVM_clusters_{r}_m', containing cluster indices generated by the base clustering alg + radii (int list): list of radii to run the clustering algs with + loc_type (str): 'start' or 'end' + svm_min_size (int): the min number of trips a cluster must have to + be considered for sub-division + svm_purity_thresh (float): the min purity a cluster must have to be + sub-divided + svm_gamma (float): the gamma hyperparameter + svm_C(float): the C hyperparameter + cluster_col (str list): names of column containing cluster indices + of interest + """ + assert loc_type == 'start' or loc_type == 'end' + assert f'{loc_type}_lat' in loc_df.columns + assert f'{loc_type}_lon' in loc_df.columns + + for i in range(len(radii)): + r = radii[i] + if cluster_cols == None: + cluster_col = f"{loc_type}_{alg}_clusters_{r}_m" + else: + cluster_col = cluster_cols[i] + assert cluster_col in loc_df.columns + + # c is the count of how many clusters we have iterated over + c = 0 + # iterate over all clusters and subdivide them with SVM. The while loop + # is so we can do multiple iterations of subdividing if needed + while c < loc_df[cluster_col].max(): + points_in_cluster = loc_df.loc[loc_df[cluster_col] == c] + + labeled_points_in_cluster = points_in_cluster.dropna( + subset=['purpose_confirm']) + + # only do SVM if we have at least labeled 6 points in the cluster + # (or custom min_size) + if len(labeled_points_in_cluster) < svm_min_size: + c += 1 + continue + + # only do SVM if purity is below threshold + purity = single_cluster_purity(labeled_points_in_cluster) + if purity < svm_purity_thresh: + X_train = labeled_points_in_cluster[[ + f"{loc_type}_lon", f"{loc_type}_lat" + ]] + X_all = points_in_cluster[[ + f"{loc_type}_lon", f"{loc_type}_lat" + ]] + y_train = labeled_points_in_cluster.purpose_confirm.to_list() + + labels = make_pipeline( + StandardScaler(), + svm.SVC( + kernel='rbf', + gamma=svm_gamma, + C=svm_C, + )).fit(X_train, y_train).predict(X_all) + + unique_labels = np.unique(labels) + + # map from purpose labels to new cluster indices + # we offset indices by the max existing index so that + # we don't run into any duplicate trouble + max_existing_idx = loc_df[cluster_col].max() + + # # if the indices are Categorical, need to convert to + # # ordered values + # max_existing_idx = np.amax( + # existing_cluster_indices.as_ordered()) + + # labels = np.array(svc.predict(X)) + label_to_cluster = { + unique_labels[i]: i + max_existing_idx + 1 + for i in range(len(unique_labels)) + } + + # if the SVM predicts everything with the same label, just + # ignore it and don't reindex. + + # this also helps us to handle the possibility that a cluster + # may be impure but inherently inseparable, e.g. an end cluster + # containing 50% 'home' trips and 50% round trips to pick up/ + # drop off. 
we don't want to reindex otherwise the low purity + # will trigger SVM again, and we will attempt & fail to split + # the cluster ad infinitum + if len(unique_labels) > 1: + indices = np.array([label_to_cluster[l] for l in labels]) + + loc_df.loc[loc_df[cluster_col] == c, cluster_col] = indices + + c += 1 + + return loc_df + + +def get_distance_matrix(loc_df, loc_type): + """ Args: + loc_df (dataframe): must have columns 'start_lat' and 'start_lon' + or 'end_lat' and 'end_lon' + loc_type (str): 'start' or 'end' + """ + assert loc_type == 'start' or loc_type == 'end' + + radians_lat_lon = np.radians(loc_df[[loc_type + "_lat", + loc_type + "_lon"]]) + + dist_matrix_meters = pd.DataFrame( + smp.haversine_distances(radians_lat_lon, radians_lat_lon) * + EARTH_RADIUS) + return dist_matrix_meters + + +def single_cluster_purity(points_in_cluster, label_col='purpose_confirm'): + """ Calculates purity of a cluster (i.e. % of trips that have the most + common label) + + Args: + points_in_cluster (df): dataframe containing points in the same + cluster + label_col (str): column in the dataframe containing labels + """ + assert label_col in points_in_cluster.columns + + most_freq_label = points_in_cluster[label_col].mode()[0] + purity = len(points_in_cluster[points_in_cluster[label_col] == + most_freq_label]) / len(points_in_cluster) + return purity + + +def purity_score(y_true, y_pred): + contingency_matrix = metrics.cluster.contingency_matrix(y_true, y_pred) + purity = np.sum(np.amax(contingency_matrix, + axis=0)) / np.sum(contingency_matrix) + return purity diff --git a/TRB_label_assist/data_wrangling.py b/TRB_label_assist/data_wrangling.py new file mode 100644 index 000000000..137886190 --- /dev/null +++ b/TRB_label_assist/data_wrangling.py @@ -0,0 +1,238 @@ +import pandas as pd +import numpy as np +import logging + + +def expand_df_dict(df, column_name): + """ + df: a dataframe that contains a column whose values are dictionaries + column_name: name of the df's column containing dictionary entries + + Returns a dataframe with the desired column expanded into the main dataframe + + This is a generalized version of the expand_userinputs() function from + e-mission-server/emission/storage/decorations/trip_queries.py + """ + if len(df) == 0: + return df + expanded_col = pd.DataFrame(df.loc[:, column_name].to_list(), + index=df.index) + logging.debug(expanded_col.head()) + df = df.drop(columns=[column_name]) + expanded_df = pd.concat([df, expanded_col], axis=1) + assert len(expanded_df) == len(df), \ + ("Mismatch after expanding labels, expanded_df.rows = %s != df.columns %s" % + (len(expanded_df), len(df))) + logging.debug("After expanding, columns went from %s -> %s" % + (len(df.columns), len(expanded_df.columns))) + logging.debug(expanded_df.head()) + return expanded_df + + +# oops, this is actually just the same as pd's explode() +def expand_df_list_vert(df, column_name): + """ + df: a dataframe that contains a column whose values are lists + column_name: name of the df's column containing list entries. (the + length of the list entry can vary from row to row.) + + Returns a dataframe with the desired column expanded vertically into + the main dataframe, i.e. 
for each row in the original dataframe, there + will be n rows in the expanded dataframe where n is the length of its + list entry under 'column_name' + """ + if len(df) == 0: + return df + + expanded_df_list = [] + for i in range(len(df)): + col_list = df.loc[i, column_name] + for e in col_list: + # add new row to new_df + new_row = df.loc[i].to_dict() + new_row[column_name] = e + expanded_df_list += [new_row] + + if len(expanded_df_list) == 0: + logging.debug( + '{} only has empty lists; expansion failed.'.format(column_name)) + raise Exception('expansion failed; empty lists') + + expanded_df = pd.DataFrame(expanded_df_list) + + assert len(expanded_df.columns) == len(df.columns), \ + ("Mismatch after expanding labels, expanded_df.columns = %s != df.columns %s" % + (len(expanded_df.columns), len(df.columns))) + logging.debug("After expanding, rows went from %s -> %s" % + (len(df), len(expanded_df))) + + return expanded_df + + +def expand_df_list_horiz(df, column_name): + """ + df: a dataframe that contains a column whose values are lists + column_name: name of the df's column containing list entries. (the + length of the list entry must be consistent for all rows.) + + Returns a dataframe with the desired column expanded horizontally into + the main dataframe, i.e. 'column_name' will be replaced by n columns + where n is the length of each list entry + """ + if len(df) == 0: + return df + expanded_col = pd.DataFrame(df.loc[:, column_name].to_list(), + index=df.index) + logging.debug(expanded_col.head()) + df = df.drop(columns=[column_name]) + expanded_df = pd.concat([df, expanded_col], axis=1) + assert len(expanded_df) == len(df), \ + ("Mismatch after expanding labels, expanded_df.rows = %s != df.columns %s" % + (len(expanded_df), len(df))) + logging.debug("After expanding, columns went from %s -> %s" % + (len(df.columns), len(expanded_df.columns))) + logging.debug(expanded_df.head()) + return expanded_df + + +def add_top_pred(df, trip_id_column='trip_id', pred_conf_column='pred_conf'): + """ df: dataframe containing trip ids, predicted labels and confidence level + trip_id_column: string, the name of the column containing trip ids + pred_conf_column: string, the name of the column containing prediction confidence + """ + df['top_pred'] = False + for trip_id in df[trip_id_column].unique(): + id_max = df[df[trip_id_column] == trip_id][pred_conf_column].idxmax( + skipna=True) + if not np.isnan(id_max): + df.loc[id_max, 'top_pred'] = True + + return df + + +def trips_to_df(trips, user_id, os_df=None): + datas = [] + for i in range(len(trips)): + t = trips[i] + if 'inferred_labels' not in t['data'] or t['data'][ + 'inferred_labels'] == []: + data = {'trip_id': t['_id']} + data = update_labels(data, t['data']['user_input'], 'true') + datas.append(data) + + else: + for label in t['data']['inferred_labels']: + data = {'trip_id': t['_id']} + data['pred_conf'] = label['p'] + data = update_labels(data, label['labels'], 'pred') + data = update_labels(data, t['data']['user_input'], 'true') + datas.append(data) + + df = pd.DataFrame(datas, + columns=[ + 'user_id', 'trip_id', 'mode_pred', 'replaced_pred', + 'purpose_pred', 'tuple_pred', 'pred_conf', + 'mode_true', 'replaced_true', 'purpose_true', + 'tuple_true' + ]) + df['user_id'] = user_id + if os_df: + df['os'] = os_df[os_df.user_id == user_id]['curr_platform'].item() + df['tuple_pred'] = df.mode_pred.astype( + str) + ', ' + df.purpose_pred.astype( + str) + ', ' + df.replaced_pred.astype(str) + df['tuple_true'] = df.mode_true.astype( + str) + ', ' + 
df.purpose_true.astype( + str) + ', ' + df.replaced_true.astype(str) + + # df['tuple_pred'] = list(zip(df.mode_pred, df.purpose_pred, df.replaced_pred)) + # df['tuple_true'] = list(zip(df.mode_true, df.purpose_true, df.replaced_true)) + + # indicates if the predicted label was the top choice (i.e. the first suggestion to the user) + df = add_top_pred(df) + + return df + + +def update_labels(data, user_input, label_type): + """ helper function to populate a dictionary with trip labels. + + Args: + data (dict): dictionary that we want to populate + user_input (dict): the dictionary containing mode_confirm, + purpose_confirm, and replaced_mode information (e.g. + t['data']['user_input'] or t['data']['inferred_labels'][i] ) + label_type (str): 'true' or 'pred' + """ + if user_input != {}: + if 'mode_confirm' in user_input.keys(): + data['mode_' + label_type] = user_input['mode_confirm'] + if data['mode_' + label_type] == 'not_a_trip': + data['replaced_' + label_type] = 'not_a_trip' + data['purpose_' + label_type] = 'not_a_trip' + + else: + if 'replaced_mode' in user_input.keys(): + data['replaced_' + + label_type] = user_input['replaced_mode'] + if 'purpose_confirm' in user_input.keys(): + data['purpose_' + + label_type] = user_input['purpose_confirm'] + + return data + + +def get_labels(trips): + """ helper function to get lists of trip labels from a list of trip dicts.""" + mode_true = [] + purpose_true = [] + replaced_true = [] + + for t in trips: + if 'mode_confirm' in t['data']['user_input']: + mode_true.append(t['data']['user_input']['mode_confirm']) + else: + mode_true.append(None) + + if 'purpose_confirm' in t['data']['user_input']: + purpose_true.append(t['data']['user_input']['purpose_confirm']) + else: + purpose_true.append(None) + + if 'replaced_mode' in t['data']['user_input']: + replaced_true.append(t['data']['user_input']['replaced_mode']) + else: + replaced_true.append(None) + + return mode_true, purpose_true, replaced_true + + +def get_trip_index(trips): + """ helper function to get list of trip indices from a list of trip dicts.""" + trip_indices = [] + for t in trips: + trip_indices.append(t['_id']) + + return trip_indices + + +def expand_coords(exp_df, purpose=None): + """ + copied and modifed from get_loc_df_for_purpose() in the 'Radius + selection' notebook + """ + purpose_trips = exp_df + if purpose is not None: + purpose_trips = exp_df[exp_df.purpose_confirm == purpose] + + dfs = [purpose_trips] + for loc_type in ['start', 'end']: + df = pd.DataFrame( + purpose_trips[loc_type + + "_loc"].apply(lambda p: p["coordinates"]).to_list(), + columns=[loc_type + "_lon", loc_type + "_lat"]) + df = df.set_index(purpose_trips.index) + dfs.append(df) + + # display.display(end_loc_df.head()) + return pd.concat(dfs, axis=1) \ No newline at end of file diff --git a/TRB_label_assist/mapping.py b/TRB_label_assist/mapping.py new file mode 100644 index 000000000..2ef54de46 --- /dev/null +++ b/TRB_label_assist/mapping.py @@ -0,0 +1,422 @@ +# This file contains helper functions for plotting maps. 
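+#
+# Typical usage (illustrative sketch; `expanded_df` stands for any dataframe of
+# confirmed trips that has been run through data_wrangling.expand_coords(), so
+# that it carries both the raw 'start_loc'/'end_loc' columns and the expanded
+# 'start_lat'/'start_lon'/'end_lat'/'end_lon' columns):
+#
+#   m = plot_user_trips(expanded_df, loc_type='end', plot_unlabeled=True)
+#   m = plot_cluster_border(expanded_df, loc_type='end', m=m, color='gray')
+#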
+import pandas as pd +import numpy as np + +import folium +import branca.element as bre +from scipy.spatial import ConvexHull + +import data_wrangling +from clustering import add_loc_clusters, ALG_OPTIONS + +DENVER_COORD = [39.7392, -104.9903] +MTV_COORD = [37.3861, -122.0839] +CLM_COORD = [34.0967, -117.7198] + +# list of valid default colors in folium +COLORS = [ + 'darkred', + 'orange', + 'gray', + # 'green', # reserved for correct labels + 'darkblue', + # 'lightblue', # too hard to see on map + 'purple', + # 'pink', # too hard to see on map + 'darkgreen', + 'lightgreen', + # 'darkpurple', # this color does not exist? + 'cadetblue', + # 'lightgray', # too hard to see point on map + # 'black', # reserved for no_pred + 'blue', + # 'red', # reserved for noise/unlabeled data/incorrect labels + # 'lightred', # this color does not exist in folium? + # 'beige', # too hard to see point on map +] + + +def find_plot_clusters(user_df, + loc_type, + alg, + SVM=False, + radii=[50, 100, 150, 200], + cluster_unlabeled=False, + plot_unlabeled=False, + optics_min_samples=None, + optics_xi=0.05, + optics_cluster_method='xi', + svm_min_size=6, + svm_purity_thresh=0.7, + svm_gamma=0.05, + svm_C=1, + map_loc=MTV_COORD): + """ Plot points and clusters on a folium map. + + Points with the same purpose will have the same color (unless there are more purposes than available colors in folium, in which case some colors may be duplicated). Hovering over a point will also reveal the purpose in the tooltip. + + The clusters are visualized as convex hulls; their color doesn't mean anything for now, it's simply so we can distinguish between distinct clusters (which will be helpful when clusters overlap). + + Args: + user_df (dataframe): must contain the following columns: + 'start_loc', 'end_loc', 'user_input' + loc_type (str): 'start' or 'end', the type of points to cluster + alg (str): the clustering algorithm to be used. must be one of the + following: 'DBSCAN', 'naive', 'OPTICS', 'SVM', 'fuzzy' or + 'mean_shift' + SVM (bool): whether or not to sub-divide clusters with SVM + radii (int list): list of radii to pass to the clustering alg + cluster_unlabeled (bool): whether or not unlabeled points are used + to generate clusters. + plot_unlabeled (bool): whether or not to plot unlabeled points. If + True, they will be plotted as red points. + optics_min_samples (int): number of min samples if using the OPTICS + algorithm. + optics_xi (float): xi value if using the xi method of the OPTICS algorithm. + optics_cluster_method (str): method to use for the OPTICS + algorithm. either 'xi' or 'dbscan' + svm_min_size (int): the min number of trips a cluster must have to + be considered for sub-division, if using SVM + svm_purity_thresh (float): the min purity a cluster must have to be + sub-divided, if using SVM + svm_gamma (float): if using SVM, the gamma hyperparameter + svm_C (float): if using SVM, the C hyperparameter + map_loc (array-like): lat and lon coordinate for the default folium + map position. 
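+
+        Example (illustrative sketch; `user_df` is assumed to be a dataframe of
+        confirmed trips with the 'start_loc', 'end_loc', and 'user_input'
+        columns described above, plus a 'purpose_confirm' column):
+
+            fig = find_plot_clusters(user_df,
+                                     loc_type='end',
+                                     alg='DBSCAN',
+                                     SVM=True,
+                                     radii=[100, 150],
+                                     map_loc=DENVER_COORD)
+            # fig is a branca Figure with one folium map per radius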
+ + """ + # TODO: refactor to take in kwargs so we can remove the mess of optics_* + # variables that I was using when manually tuning that algorithm + assert loc_type == 'start' or loc_type == 'end' + assert 'start_loc' in user_df.columns + assert 'end_loc' in user_df.columns + assert 'user_input' in user_df.columns + assert alg in ALG_OPTIONS + + fig = bre.Figure(figsize=(20, 20)) + fig_index = 0 + + # clean up the dataframe by dropping entries with NaN locations and + # reset index (because naive needs the position of each trip to match + # its nominal index) + all_trips_df = user_df.dropna(subset=['start_loc', 'end_loc']).reset_index( + drop=True) + + # expand the 'start_loc' and 'end_loc' column into 'start_lat', + # 'start_lon', 'end_lat', and 'end_lon' columns + all_trips_df = data_wrangling.expand_coords(all_trips_df) + + labeled_trips_df = all_trips_df.loc[all_trips_df.user_input != {}].dropna( + subset=['purpose_confirm']) + + if cluster_unlabeled: + df_for_cluster = all_trips_df + else: + df_for_cluster = labeled_trips_df + + df_for_cluster = add_loc_clusters( + df_for_cluster, + radii=radii, + alg=alg, + SVM=SVM, + # cluster_unlabeled=cluster_unlabeled, + loc_type=loc_type, + min_samples=1, + optics_min_samples=optics_min_samples, + optics_xi=optics_xi, + optics_cluster_method=optics_cluster_method, + svm_min_size=svm_min_size, + svm_purity_thresh=svm_purity_thresh, + svm_gamma=svm_gamma, + svm_C=svm_C) + + for r in radii: + fig_index = fig_index + 1 + m = folium.Map( + location=map_loc, + zoom_start=12, + tiles= + 'https://{s}.basemaps.cartocdn.com/rastertiles/voyager_nolabels/{z}/{x}/{y}{r}.png', + attr= + '© OpenStreetMap contributors © CARTO' + ) + folium.TileLayer( + tiles= + 'https://stamen-tiles-{s}.a.ssl.fastly.net/toner-lines/{z}/{x}/{y}{r}.png', + attr= + 'Map tiles by Stamen Design, CC BY 3.0 — Map data © OpenStreetMap contributors' + ).add_to(m) + # folium.TileLayer('Stamen Toner').add_to(m) + + cluster_ids = df_for_cluster[ + f"{loc_type}_{alg}_clusters_{r}_m"].unique() + + # draw the convex hull of the clusters + for i in range(len(cluster_ids)): + c = cluster_ids[i] + if c == -1: + print( + 'we should never get here because we want to convert the -1 cluster into single-trip clusters' + ) + continue + + points_in_cluster = df_for_cluster[ + df_for_cluster[f"{loc_type}_{alg}_clusters_{r}_m"] == c] + + if np.isnan(c): + # if False: + if len(points_in_cluster) == 0: + continue + else: + print(points_in_cluster) + print(df_for_cluster[df_for_cluster[ + f"{loc_type}_{alg}_clusters_{r}_m"].isnull()]) + raise Exception( + 'nan cluster detected; all trips should have a proper cluster index' + ) + m = plot_cluster_border( + points_in_cluster, + loc_type=loc_type, + m=m, + color='gray', + # color=COLORS[i % (len(COLORS) - 1)], + cluster_idx=c) + + # plot all the destinations, color-coordinated by purpose + # we want to plot these on *top* of the cluster circles so that we can + # hover over the points and see the purpose on the tooltip + m = plot_user_trips(user_df, + loc_type, + plot_100=False, + plot_unlabeled=plot_unlabeled, + m=m) + + # add plot to the figure + fig.add_subplot(len(radii) / 2 + len(radii) % 2, 2, + fig_index).add_child(m) + + return fig + + +def plot_model_clusters( + model, + category, + # purpose_col='purpose_confirm', + m=None, + map_loc=CLM_COORD): + """ category (str): 'test' or 'train' """ + loc_type = 'end' + + if m == None: + m = folium.Map(location=map_loc, zoom_start=12) + + if category == 'test': + df = model.test_df + elif category == 
'train': + df = model.train_df + + cluster_ids = df['final_cluster_idx'].unique() + + # draw the convex hull of the clusters + for i in range(len(cluster_ids)): + c = cluster_ids[i] + if c == -1: + print( + 'we should never get here because we want to convert the -1 cluster into single-trip clusters' + ) + continue + + points_in_cluster = df[df['final_cluster_idx'] == c] + + if np.isnan(c): + print(points_in_cluster) + print(df[df['final_cluster_idx'].isnull()]) + raise Exception( + 'nan cluster detected; all trips should have a proper cluster index' + ) + m = plot_cluster_border(points_in_cluster, + loc_type=loc_type, + m=m, + color=COLORS[i % (len(COLORS) - 1)], + cluster_idx=c) + + # plot all the destinations, color-coordinated by purpose + # we want to plot these on *top* of the cluster circles so that we can + # hover over the points and see the purpose on the tooltip + # m = plot_user_trips(df, loc_type, plot_100=False, plot_unlabeled=True, m=m) + + return m + + +def plot_user_trips(user_df, + loc_type, + plot_100=True, + plot_500=False, + plot_unlabeled=False, + purpose_col='purpose_confirm', + color=None, + m=None): + """ Args: + user_df (dataframe): must contain the columns 'start/end_lat/lon' + loc_type (str): 'start' or 'end' + plot_100 (bool): whether or not to plot 100m radius circles around + each location point + plot_500 (bool): whether or not to plot 500m radius circles around + each location point + plot_unlabeled (bool): whether or not to plot unlabeled points (if + so, they will be red) + m (folium.Map): optional, an existing map onto which this function + will plot markers + """ + assert loc_type == 'start' or loc_type == 'end' + + if m is None: + m = folium.Map(location=MTV_COORD, zoom_start=13) + + purpose_list = user_df[purpose_col].dropna().unique() + + # plot circles with a 500m radius around each point + if plot_500: + for i, purpose in enumerate(purpose_list): + if color is None and i < len(COLORS): + color = COLORS[i] + elif color is None: + color = COLORS[len(COLORS) - 1] + purpose_trips = user_df[user_df[purpose_col] == purpose] + for j in range(len(purpose_trips)): + coords = purpose_trips[loc_type + + '_loc'].iloc[j]['coordinates'] + folium.Circle([coords[1], coords[0]], + radius=500, + color=color, + opacity=0.2, + fill=True, + fill_opacity=0.1, + weight=1).add_to(m) + if plot_unlabeled: + unlabeled_trips = user_df[user_df[purpose_col].isna()] + for j in range(len(unlabeled_trips)): + coords = unlabeled_trips[loc_type + + '_loc'].iloc[j]['coordinates'] + folium.Circle([coords[1], coords[0]], + radius=500, + color='red', + opacity=0.2, + fill=True, + fill_opacity=0.1, + weight=1).add_to(m) + + +# plot circles with a 100m radius around each point + if plot_100: + for i, purpose in enumerate(purpose_list): + if i < len(COLORS): + color = COLORS[i] + else: + color = COLORS[len(COLORS) - 1] + purpose_trips = user_df[user_df[purpose_col] == purpose] + for j in range(len(purpose_trips)): + coords = purpose_trips[loc_type + + '_loc'].iloc[j]['coordinates'] + folium.Circle([coords[1], coords[0]], + radius=100, + color=color, + opacity=0.2, + fill=True, + fill_opacity=0.1, + weight=1).add_to(m) + if plot_unlabeled: + unlabeled_trips = user_df[user_df[purpose_col].isna()] + for j in range(len(unlabeled_trips)): + coords = unlabeled_trips[loc_type + + '_loc'].iloc[j]['coordinates'] + folium.Circle([coords[1], coords[0]], + radius=100, + color='red', + opacity=0.2, + fill=True, + fill_opacity=0.1, + weight=1).add_to(m) + + # plot small circle marker on the very top so 
it doesn't get obscured by + # the layers of 100m/500m circles + for i, purpose in enumerate(purpose_list): + if i < len(COLORS): + color = COLORS[i] + else: + color = COLORS[len(COLORS) - 1] + # print('{:<15} {:<15}'.format(color, purpose)) + purpose_trips = user_df[user_df[purpose_col] == purpose] + # print(purpose_trips) + for j in range(len(purpose_trips)): + coords = purpose_trips[loc_type + '_loc'].iloc[j]['coordinates'] + # print(purpose_trips.iloc[j]) + # print(purpose_trips.iloc[j].index) + trip_idx = purpose_trips.iloc[j].name + folium.CircleMarker([coords[1], coords[0]], + radius=2.5, + color=color, + tooltip=purpose + ' ' + + str(trip_idx)).add_to(m) + if plot_unlabeled: + unlabeled_trips = user_df[user_df[purpose_col].isna()] + for j in range(len(unlabeled_trips)): + coords = unlabeled_trips[loc_type + '_loc'].iloc[j]['coordinates'] + trip_idx = unlabeled_trips.iloc[j].name + folium.CircleMarker([coords[1], coords[0]], + radius=2.5, + color='red', + tooltip='UNLABELED' + ' ' + + str(trip_idx)).add_to(m) + + return m + + +def plot_cluster_border(points_df, + loc_type, + m=None, + color='green', + cluster_idx=None): + """ plots a convex hull around the given points. + + Args: + points_df: dataframe with columns 'xxx_lat' and 'xxx_lon' + loc_type (str): 'start' or 'end', the type of points to cluster + m (folium.Map): optional, an existing map onto which this function + will plot markers + color (str): cluster color. must be valid in folium. + cluster_idx (int): cluster index, to be added to tooltip + """ + assert loc_type == 'start' or loc_type == 'end' + if m is None: + m = folium.Map(location=MTV_COORD, zoom_start=12) + + lats = points_df[loc_type + '_lat'].tolist() + lons = points_df[loc_type + '_lon'].tolist() + points = np.array([lats, lons]).T + + if len(points) > 2: + hull = ConvexHull(points) + border_points = points[hull.vertices] + else: + border_points = points + + if cluster_idx is not None: + folium.Polygon( + border_points, # list of points (latitude, longitude) + color=color, + weight=15, + opacity=0.6, + fill=True, + fill_opacity=0.5, + tooltip=f'cluster {cluster_idx}').add_to(m) + else: + folium.Polygon( + border_points, # list of points (latitude, longitude) + color=color, + weight=20, + opacity=0.6, + fill=True, + fill_opacity=0.5).add_to(m) + + return m diff --git a/TRB_label_assist/models.py b/TRB_label_assist/models.py new file mode 100644 index 000000000..e5283d730 --- /dev/null +++ b/TRB_label_assist/models.py @@ -0,0 +1,2035 @@ +import pandas as pd +import numpy as np +from abc import ABCMeta, abstractmethod # to define abstract class "blueprints" +import logging +import copy + +# sklearn imports +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler, OneHotEncoder +from sklearn.impute import SimpleImputer +from sklearn.metrics.pairwise import haversine_distances +from sklearn.cluster import DBSCAN +from sklearn import svm +from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier +from sklearn.tree import DecisionTreeClassifier +from sklearn.exceptions import NotFittedError + +# our imports +from clustering import get_distance_matrix, single_cluster_purity +import data_wrangling +import emission.storage.decorations.trip_queries as esdtq +import emission.analysis.modelling.tour_model_first_only.build_save_model as bsm +import emission.analysis.modelling.tour_model_first_only.evaluation_pipeline as ep +from emission.analysis.classification.inference.labels.inferrers import 
predict_cluster_confidence_discounting +import emission.core.wrapper.entry as ecwe +import emission.analysis.modelling.tour_model_extended.similarity as eamts + +# logging.basicConfig(level=logging.DEBUG) + +EARTH_RADIUS = 6371000 + +############################# +## define abstract classes ## +############################# + + +class SetupMixin(metaclass=ABCMeta): + """ class containing code to be reused when setting up estimators. """ + + @abstractmethod + def set_params(self, params): + """ Set the parameters of the estimator. + + Args: + params (dict): dictionary where the keys are the param names + (strings) and the values are the parameter inputs + + Returns: + self + """ + raise NotImplementedError + + def _clean_data(self, df): + """ Clean a dataframe of trips. + (Drop trips with missing start/end locations, expand the user input + columns, ensure all essential columns are present) + + Args: + df: a dataframe of trips. must contain the columns 'start_loc', + 'end_loc', and should also contain the user input columns + ('mode_confirm', 'purpose_confirm', 'replaced_mode') if + available + """ + assert 'start_loc' in df.columns and 'end_loc' in df.columns + + # clean up the dataframe by dropping entries with NaN locations and + # reset index + num_nan = 0 + if df.start_loc.isna().any(): + num_nan += df.start_loc.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_loc']) + if df.end_loc.isna().any(): + num_nan += df.end_loc.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['end_loc']) + + # expand the 'start_loc' and 'end_loc' column into 'start_lat', + # 'start_lon', 'end_lat', and 'end_lon' columns + df = data_wrangling.expand_coords(df) + + # drop trips with missing coordinates + if df.start_lat.isna().any(): + num_nan += df.start_lat.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_lat']) + if df.start_lon.isna().any(): + num_nan += df.start_lon.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['start_lon']) + if df.end_lat.isna().any(): + num_nan += df.end_lat.value_counts(dropna=False).loc[np.nan] + df = df.dropna(subset=['end_lat']) + if df.end_lon.isna().any(): + num_nan = df.end_lon.value_counts(dropna=False).loc[np.nan] + df += df.dropna(subset=['end_lon']) + if num_nan > 0: + logging.info( + f'dropped {num_nan} trips that are missing location coordinates' + ) + + df = df.rename( + columns={ + 'mode_confirm': 'mode_true', + 'purpose_confirm': 'purpose_true', + 'replaced_mode': 'replaced_true' + }) + + for category in ['mode_true', 'purpose_true', 'replaced_true']: + if category not in df.columns: + # for example, if a user labels all their trip modes but none of their trip purposes + df.loc[:, category] = np.nan + + return df.reset_index(drop=True) + + +class Cluster(SetupMixin, metaclass=ABCMeta): + """ blueprint for clustering models. """ + + @abstractmethod + def fit(self, train_df): + """ Fit the clustering algorithm. + + Args: + train_df (DataFrame): dataframe of labeled trips + + Returns: + self + """ + raise NotImplementedError + + @abstractmethod + def predict(self, test_df): + """ Predict cluster indices for trips, if possible. Trips that could + not be clustered will have the index -1. + + Args: + test_df (DataFrame): dataframe of test trips + + Returns: + pd DataFrame containing one column, 'start_cluster_idx' or + 'end_cluster_idx' + """ + raise NotImplementedError + + def fit_predict(self, train_df): + """ Fit the clustering algorithm and predict cluster indices for trips, + if possible. 
Trips that could not be clustered will have the index -1. + + Args: + train_df (DataFrame): dataframe of labeled trips + + Returns: + pd DataFrame containing one column, 'start_cluster_idx' or + 'end_cluster_idx' + """ + self.fit(train_df) + return self.predict(train_df) + + +class TripClassifier(SetupMixin, metaclass=ABCMeta): + + @abstractmethod + def fit(self, train_df): + """ Fit a classification model. + + Args: + train_df (DataFrame): dataframe of labeled trips + + Returns: + self + """ + raise NotImplementedError + + def predict(self, test_df): + """ Predict trip labels. + + Args: + test_df (DataFrame): dataframe of trips + + Returns: + DataFrame containing the following columns: + 'purpose_pred', 'mode_pred', 'replaced_pred', + 'purpose_proba', 'mode_proba', 'replaced_proba' + the *_pred columns contain the most-likely label prediction + (string for a label or float for np.nan). + the *_proba columns contain the probability of the most-likely + prediction. + """ + proba_df = self.predict_proba(test_df) + prediction_df = proba_df.loc[:, [('purpose', 'top_pred'), + ('purpose', 'top_proba'), + ('mode', 'top_pred'), + ('mode', 'top_proba'), + ('replaced', 'top_pred'), + ('replaced', 'top_proba')]] + + prediction_df.columns = prediction_df.columns.to_flat_index() + prediction_df = prediction_df.rename( + columns={ + ('purpose', 'top_pred'): 'purpose_pred', + ('purpose', 'top_proba'): 'purpose_proba', + ('mode', 'top_pred'): 'mode_pred', + ('mode', 'top_proba'): 'mode_proba', + ('replaced', 'top_pred'): 'replaced_pred', + ('replaced', 'top_proba'): 'replaced_proba', + }) + + return prediction_df + + def fit_predict(self, train_df): + """ Fit a classification model and predict trip labels. + + Args: + train_df (DataFrame): dataframe of labeled trips + + Returns: + DataFrame containing the following columns: + 'purpose_pred', 'mode_pred', 'replaced_pred', + 'purpose_proba', 'mode_proba', 'replaced_proba' + the *_pred columns contain the most-likely label prediction + (string for a label or float for np.nan). + the *_proba columns contain the probability of the most-likely + prediction. + """ + self.fit(train_df) + return self.predict(train_df) + + @abstractmethod + def predict_proba(self, test_df): + """ Predict class probabilities for each trip. + + NOTE: check the specific model to see if the class probabilities + have confidence-discounting or not. + + Args: + test_df (DataFrame): dataframe of trips + + Returns: + DataFrame with multiindexing. Each row represents a trip. There + are 3 columns at level 1, one for each label category + ('purpose', 'mode', 'replaced'). Within each category, there is + a column for each label, with the row's entry being the + probability that the trip has the label. There are three + additional columns within each category, one indicating the + most-likely label, one indicating the probability of the + most-likely label, and one indicating whether or not the trip + can be clustered. + TODO: add a fourth optional column for the number of trips in + the cluster (if clusterable) + + Level 1 columns are: purpose, mode, replaced + Lebel 2 columns are: + , , ... top_pred, top_proba, clusterable + , , ... top_pred, top_proba, clusterable + , , ... top_pred, top_proba, clusterable + """ + raise NotImplementedError + + +######################## +## clustering classes ## +######################## + + +class RefactoredNaiveCluster(Cluster): + """ Naive fixed-width clustering algorithm. 
+ Refactored from the existing Similarity class to take in dataframes for + consistency, and allows for separate clustering of start and end + clusters. + + WARNING: this algorithm is *extremely* slow. + + Args: + loc_type (str): 'start' or 'end', the type of point to cluster + radius (int): max distance between all pairs of points in a + cluster, i.e. strict maximum cluster width. + + Attributes: + loc_type (str) + radius (int) + train_df (DataFrame) + test_df (DataFrame) + sim_model (Similarity object) + """ + + def __init__(self, loc_type='end', radius=100): + self.loc_type = loc_type + self.radius = radius + + def set_params(self, params): + if 'loc_type' in params.keys(): self.loc_type = params['loc_type'] + if 'radius' in params.keys(): self.radius = params['radius'] + + return self + + def fit(self, train_df): + # clean data + self.train_df = self._clean_data(train_df) + + # we can use all trips as long as they have purpose labels. it's ok if + # they're missing mode/replaced-mode labels, because they aren't as + # strongly correlated with location compared to purpose + # TODO: actually, we may want to rethink this. for example, it will + # probably be helpful to include trips that are missing purpose labels + # but still have mode labels. + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + # fit the bins + self.sim_model = eamts.Similarity(self.train_df, + radius_start=self.radius, + radius_end=self.radius, + shouldFilter=False, + cutoff=False) + # we only bin the loc_type points to speed up the alg. avoid + # unnecessary binning since this is really slow + self.sim_model.bin_helper(loc_type=self.loc_type) + labels = self.sim_model.data_df[self.loc_type + '_bin'].to_list() + self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = labels + return self + + def predict(self, test_df): + self.test_df = self._clean_data(test_df) + + if self.loc_type == 'start': + bins = self.sim_model.start_bins + elif self.loc_type == 'end': + bins = self.sim_model.end_bins + + labels = [] + + # for each trip in the test list: + for idx, row in self.test_df.iterrows(): + # iterate over all bins + trip_binned = False + for i, bin in enumerate(bins): + # check if the trip can fit in the bin + # if so, get the bin index + if self._match(row, bin, self.loc_type): + labels += [i] + trip_binned = True + break + # if not, return -1 + if not trip_binned: + labels += [-1] + + self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = labels + + return self.test_df[[f'{self.loc_type}_cluster_idx']] + + def _match(self, trip, bin, loc_type): + """ Check if a trip can fit into an existing bin. + + copied from the Similarity class on the e-mission-server. + """ + for t_idx in bin: + trip_in_bin = self.train_df.iloc[t_idx] + if not self._distance_helper(trip, trip_in_bin, loc_type): + return False + return True + + def _distance_helper(self, tripa, tripb, loc_type): + """ Check if two trips have start/end points within the distance + threshold. + + copied from the Similarity class on the e-mission-server. 
+ """ + pta_lat = tripa[[loc_type + '_lat']] + pta_lon = tripa[[loc_type + '_lon']] + ptb_lat = tripb[[loc_type + '_lat']] + ptb_lon = tripb[[loc_type + '_lon']] + + return eamts.within_radius(pta_lat, pta_lon, ptb_lat, ptb_lon, + self.radius) + + +class DBSCANSVMCluster(Cluster): + """ DBSCAN-based clustering algorithm that optionally implements SVM + sub-clustering. + + Args: + loc_type (str): 'start' or 'end', the type of point to cluster + radius (int): max distance between two points in each other's + neighborhood, i.e. DBSCAN's eps value. does not strictly + dictate final cluster size + size_thresh (int): the min number of trips a cluster must have + to be considered for SVM sub-division + purity_thresh (float): the min purity a cluster must have + to be sub-divided using SVM + gamma (float): coefficient for the rbf kernel in SVM + C (float): regularization hyperparameter for SVM + + Attributes: + loc_type (str) + radius (int) + size_thresh (int) + purity_thresh (float) + gamma (float) + C (float) + train_df (DataFrame) + test_df (DataFrame) + base_model (sklearn Estimator) + """ + + def __init__(self, + loc_type='end', + radius=100, + svm=True, + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1): + self.loc_type = loc_type + self.radius = radius + self.svm = svm + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + + def set_params(self, params): + if 'loc_type' in params.keys(): self.loc_type = params['loc_type'] + if 'radius' in params.keys(): self.radius = params['radius'] + if 'svm' in params.keys(): self.svm = params['svm'] + if 'size_thresh' in params.keys(): + self.size_thresh = params['size_thresh'] + if 'purity_thresh' in params.keys(): + self.purity_thresh = params['purity_thresh'] + if 'gamma' in params.keys(): self.gamma = params['gamma'] + + return self + + def fit(self, train_df): + """ Creates clusters of trip points. + self.train_df will be updated with columns containing base and + final clusters. + + TODO: perhaps move the loc_type argument to fit() so we can use a + single class instance to cluster both start and end points. This + will also help us reduce duplicate data. + + Args: + train_df (dataframe): dataframe of labeled trips + """ + ################## + ### clean data ### + ################## + self.train_df = self._clean_data(train_df) + + # we can use all trips as long as they have purpose labels. it's ok if + # they're missing mode/replaced-mode labels, because they aren't as + # strongly correlated with location compared to purpose + # TODO: actually, we may want to rethink this. for example, it will + # probably be helpful to include trips that are missing purpose labels + # but still have mode labels. + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. 
no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + ######################### + ### get base clusters ### + ######################### + dist_matrix_meters = get_distance_matrix(self.train_df, self.loc_type) + self.base_model = DBSCAN(self.radius, + metric="precomputed", + min_samples=1).fit(dist_matrix_meters) + base_clusters = self.base_model.labels_ + + self.train_df.loc[:, + f'{self.loc_type}_base_cluster_idx'] = base_clusters + + ######################## + ### get sub-clusters ### + ######################## + # copy base cluster column into final cluster column + self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = self.train_df[ + f'{self.loc_type}_base_cluster_idx'] + + if self.svm: + c = 0 # count of how many clusters we have iterated over + + # iterate over all clusters and subdivide them with SVM. the while + # loop is so we can do multiple iterations of subdividing if needed + while c < self.train_df[f'{self.loc_type}_cluster_idx'].max(): + points_in_cluster = self.train_df[ + self.train_df[f'{self.loc_type}_cluster_idx'] == c] + + # only do SVM if we have the minimum num of trips in the cluster + if len(points_in_cluster) < self.size_thresh: + c += 1 + continue + + # only do SVM if purity is below threshold + purity = single_cluster_purity(points_in_cluster, + label_col='purpose_true') + if purity < self.purity_thresh: + X = points_in_cluster[[ + f"{self.loc_type}_lon", f"{self.loc_type}_lat" + ]] + y = points_in_cluster.purpose_true.to_list() + + svm_model = make_pipeline( + StandardScaler(), + svm.SVC( + kernel='rbf', + gamma=self.gamma, + C=self.C, + )).fit(X, y) + labels = svm_model.predict(X) + unique_labels = np.unique(labels) + + # if the SVM predicts that all points in the cluster have + # the same label, just ignore it and don't reindex. + # this also helps us to handle the possibility that a + # cluster may be impure but inherently inseparable, e.g. an + # end cluster at a user's home, containing 50% trips from + # work to home and 50% round trips that start and end at + # home. we don't want to reindex otherwise the low purity + # will trigger SVM again, and we will attempt & fail to + # split the cluster ad infinitum + if len(unique_labels) > 1: + # map purpose labels to new cluster indices + # we offset indices by the max existing index so that we + # don't run into any duplicate indices + max_existing_idx = self.train_df[ + f'{self.loc_type}_cluster_idx'].max() + label_to_cluster = { + unique_labels[i]: i + max_existing_idx + 1 + for i in range(len(unique_labels)) + } + # update trips with their new cluster indices + indices = np.array( + [label_to_cluster[l] for l in labels]) + self.train_df.loc[ + self.train_df[f'{self.loc_type}_cluster_idx'] == c, + f'{self.loc_type}_cluster_idx'] = indices + + c += 1 + # TODO: make things categorical at the end? or maybe at the start of the decision tree pipeline + + return self + + def fit_predict(self, train_df): + """ Override to avoid unnecessarily computation of distance matrices. 
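+
+        Example (illustrative sketch; `labeled_trips_df` is assumed to be a
+        dataframe of confirmed trips with 'start_loc', 'end_loc', and purpose
+        labels):
+
+            clusters = DBSCANSVMCluster(loc_type='end', radius=100,
+                                        svm=True).fit_predict(labeled_trips_df)
+            # clusters is a one-column dataframe of 'end_cluster_idx' values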
+ """ + self.fit(train_df) + return self.train_df[[f'{self.loc_type}_cluster_idx']] + + def predict(self, test_df): + # TODO: store clusters as polygons so the prediction is faster + # TODO: we probably don't want to store test_df in self to be more memory-efficient + self.test_df = self._clean_data(test_df) + pred_clusters = self._NN_predict(self.test_df) + + self.test_df.loc[:, f'{self.loc_type}_cluster_idx'] = pred_clusters + + return self.test_df[[f'{self.loc_type}_cluster_idx']] + + def _NN_predict(self, test_df): + """ Generate base-cluster predictions for the test data using a + nearest-neighbor approach. + + sklearn doesn't implement predict() for DBSCAN, which is why we + need a custom method. + """ + n_samples = test_df.shape[0] + labels = np.ones(shape=n_samples, dtype=int) * -1 + + # get coordinates of core points (we can't use model.components_ + # because our input feature was a distance matrix and doesn't contain + # info about the raw coordinates) + # NOTE: technically, every single point in a cluster is a core point + # because it has at least minPts (2) points, including itself, in its + # radius + train_coordinates = self.train_df[[ + f'{self.loc_type}_lat', f'{self.loc_type}_lon' + ]] + train_radians = np.radians(train_coordinates) + + for idx, row in test_df.reset_index(drop=True).iterrows(): + # calculate the distances between the ith test data and all points, + # then find the index of the closest point. if the ith test data is + # within epsilon of the point, then assign its cluster to the ith + # test data (otherwise, leave it as -1, indicating noise). + # unfortunately, pairwise_distances_argmin() does not support + # haversine distance, so we have to reimplement it ourselves + new_loc_radians = np.radians( + row[[self.loc_type + "_lat", + self.loc_type + "_lon"]].to_list()) + new_loc_radians = np.reshape(new_loc_radians, (1, 2)) + dist_matrix_meters = haversine_distances( + new_loc_radians, train_radians) * EARTH_RADIUS + + shortest_dist_idx = np.argmin(dist_matrix_meters) + if dist_matrix_meters[0, shortest_dist_idx] < self.radius: + labels[idx] = self.train_df.reset_index( + drop=True).loc[shortest_dist_idx, + f'{self.loc_type}_cluster_idx'] + + return labels + + +###################### +## trip classifiers ## +###################### + + +class NaiveBinningClassifier(TripClassifier): + """ Trip classifier using the existing Similarity class and associated + functions without refactoring them. Essentially a wrapper for the + existing code on e-mission-server. 
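+
+        Example (illustrative sketch; `train_df`/`test_df` are assumed to be
+        dataframes of confirmed trips that contain a 'user_id' column and the
+        raw trip fields listed in _trip_df_to_list() below):
+
+            clf = NaiveBinningClassifier(radius=500)
+            clf.fit(train_df)
+            proba_df = clf.predict_proba(test_df)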
+ + Args: + radius (int): maximum distance between any two points in the same + cluster + """ + + def __init__(self, radius=500): + self.radius = radius + + def set_params(self, params): + if 'radius' in params.keys(): self.radius = params['radius'] + + return self + + def fit(self, train_df): + # (copied from bsm.build_user_model()) + + # convert train_df to a list because the existing binning algorithm + # only accepts lists of Entry objects + train_trips = self._trip_df_to_list(train_df) + + sim, bins, bin_trips, train_trips = ep.first_round( + train_trips, self.radius) + + # set instance variables so we can access results later as well + self.sim = sim + self.bins = bins + + # save all user labels + user_id = train_df.user_id.iloc[0] + bsm.save_models('user_labels', + bsm.create_user_input_map(train_trips, bins), user_id) + + # save location features of all bins + bsm.save_models('locations', + bsm.create_location_map(train_trips, bins), user_id) + return self + + def predict_proba(self, test_df): + """ NOTE: these class probabilities have the confidence-discounting + heuristic applied. + """ + # convert test_df to a list because the existing binning algorithm + # only accepts lists of Entry objects + test_trips = self._trip_df_to_list(test_df) + + purpose_distribs = [] + mode_distribs = [] + replaced_distribs = [] + + for trip in test_trips: + trip_prediction = predict_cluster_confidence_discounting(trip) + + if len(trip_prediction) == 0: + # model could not find cluster for the trip + purpose_distribs += [{}] + mode_distribs += [{}] + replaced_distribs += [{}] + + else: + trip_prediction_df = pd.DataFrame(trip_prediction).rename( + columns={'labels': 'user_input'}) + # renaming is simply so we can use the expand_userinputs + # function + + expand_prediction = esdtq.expand_userinputs(trip_prediction_df) + # converts the 'labels' dictionaries into individual columns + + # sum up probability for each label + for label_type, label_distribs in zip( + ['purpose_confirm', 'mode_confirm', 'replaced_mode'], + [purpose_distribs, mode_distribs, replaced_distribs]): + label_distrib = {} + if label_type in expand_prediction.columns: + for label in expand_prediction[label_type].unique(): + label_distrib[label] = expand_prediction.loc[ + expand_prediction[label_type] == label, + 'p'].sum() + label_distribs += [label_distrib] + + proba_dfs = [] + for label_type, label_distribs in zip( + ['purpose', 'mode', 'replaced'], + [purpose_distribs, mode_distribs, replaced_distribs]): + + proba = pd.DataFrame(label_distribs) + proba['clusterable'] = proba.sum(axis=1) > 0 + proba['top_pred'] = proba.drop(columns=['clusterable']).idxmax( + axis=1) + proba['top_proba'] = proba.drop( + columns=['clusterable', 'top_pred']).max(axis=1, skipna=True) + classes = proba.columns[:-3] + proba.loc[:, classes] = proba.loc[:, classes].fillna(0) + proba = pd.concat([proba], keys=[label_type], axis=1) + proba_dfs += [proba] + + self.proba_df = pd.concat(proba_dfs, axis=1) + return self.proba_df + + def _trip_df_to_list(self, trip_df): + """ Converts a dataframe of trips into a list of trip Entry objects. + + Allows this class to accept DataFrames (which are used by the new + clustering algorithms) without having to refactor the old + clustering algorithm. + + Args: + trip_df: DataFrame containing trips. See code below for the + expected columns. 
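+
+        Returns:
+            a list of ecwe.Entry objects with key 'analysis/confirmed_trip',
+            one per row of trip_df, suitable for the existing binning code and
+            for predict_cluster_confidence_discounting().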
+ + """ + trips_list = [] + + for idx, row in trip_df.iterrows(): + data = { + 'source': row['source'], + 'end_ts': row['end_ts'], + # 'end_local_dt':row['end_local_dt'], # this attribute doesn't seem to appear in the dataframes I've tested with + 'end_fmt_time': row['end_fmt_time'], + 'end_loc': row['end_loc'], + 'raw_trip': row['raw_trip'], + 'start_ts': row['start_ts'], + # 'start_local_dt':row['start_local_dt'], # this attribute doesn't seem to appear in the dataframes I've tested with + 'start_fmt_time': row['start_fmt_time'], + 'start_loc': row['start_loc'], + 'duration': row['duration'], + 'distance': row['distance'], + 'start_place': row['start_place'], + 'end_place': row['end_place'], + 'cleaned_trip': row['cleaned_trip'], + 'inferred_labels': row['inferred_labels'], + 'inferred_trip': row['inferred_trip'], + 'expectation': row['expectation'], + 'confidence_threshold': row['confidence_threshold'], + 'expected_trip': row['expected_trip'], + 'user_input': row['user_input'] + } + trip = ecwe.Entry.create_entry(user_id=row['user_id'], + key='analysis/confirmed_trip', + data=data) + trips_list += [trip] + + return trips_list + + +class ClusterExtrapolationClassifier(TripClassifier): + """ Classifier that extrapolates labels from a trip's cluster. + + Args: + alg (str): clustering algorithm to use; either 'DBSCAN' or 'naive' + radius (int): radius for the clustering algorithm + svm (bool): whether or not to use SVM sub-clustering. (only when + alg=='DBSCAN') + size_thresh (int): the min number of trips a cluster must have + to be considered for SVM sub-division + purity_thresh (float): the min purity a cluster must have + to be sub-divided using SVM + gamma (float): coefficient for the rbf kernel in SVM + C (float): regularization hyperparameter for SVM + cluster_method (str): 'end', 'trip', 'combination'. whether to + extrapolate labels from only end clusters, only trip clusters, + or both end and trip clusters when available. 
+ """ + + def __init__( + self, + alg='DBSCAN', + radius=100, # TODO: add diff start and end radii + svm=True, + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1, + cluster_method='end'): + assert cluster_method in ['end', 'trip', 'combination'] + assert alg in ['DBSCAN', 'naive'] + self.alg = alg + self.radius = radius + self.svm = svm + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + self.cluster_method = cluster_method + + if self.alg == 'DBSCAN': + self.end_cluster_model = DBSCANSVMCluster( + loc_type='end', + radius=self.radius, + svm=self.svm, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + elif self.alg == 'naive': + self.end_cluster_model = RefactoredNaiveCluster(loc_type='end', + radius=self.radius) + + if self.cluster_method in ['trip', 'combination']: + if self.alg == 'DBSCAN': + self.start_cluster_model = DBSCANSVMCluster( + loc_type='start', + radius=self.radius, + svm=self.svm, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + elif self.alg == 'naive': + self.start_cluster_model = RefactoredNaiveCluster( + loc_type='start', radius=self.radius) + + self.trip_grouper = TripGrouper( + start_cluster_col='start_cluster_idx', + end_cluster_col='end_cluster_idx') + + def set_params(self, params): + """ hacky code that mimics the set_params of an sklearn Estimator class + so that we can pass params during randomizedsearchCV + + Args: + params (dict): a dictionary where the keys are the parameter + names and the values are the parameter values + """ + alg = params['alg'] if 'alg' in params.keys() else self.alg + radius = params['radius'] if 'radius' in params.keys() else self.radius + svm = params['svm'] if 'svm' in params.keys() else self.svm + size_thresh = params['size_thresh'] if 'size_thresh' in params.keys( + ) else self.size_thresh + purity_thresh = params[ + 'purity_thresh'] if 'purity_thresh' in params.keys( + ) else self.purity_thresh + gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma + C = params['C'] if 'C' in params.keys() else self.C + cluster_method = params[ + 'cluster_method'] if 'cluster_method' in params.keys( + ) else self.cluster_method + + # calling __init__ again is not good practice, I know... + self.__init__(alg, radius, svm, size_thresh, purity_thresh, gamma, C, + cluster_method) + + return self + + def fit(self, train_df): + # fit clustering model + self.end_cluster_model.fit(train_df) + self.train_df = self.end_cluster_model.train_df + + if self.cluster_method in ['trip', 'combination']: + self.start_cluster_model.fit(train_df) + self.train_df.loc[:, ['start_cluster_idx' + ]] = self.start_cluster_model.train_df[[ + 'start_cluster_idx' + ]] + + # create trip-level clusters + trip_cluster_idx = self.trip_grouper.fit_transform(self.train_df) + self.train_df.loc[:, 'trip_cluster_idx'] = trip_cluster_idx + + return self + + def predict_proba(self, test_df): + """ NOTE: these class probabilities do NOT have a + confidence-discounting heuristic applied. 
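The set_params plumbing above repeats the same "'name' in params.keys()" lookup for every hyperparameter. An equivalent, more compact sketch (a hypothetical standalone method, same behaviour) reads each value with dict.get and still rebuilds the object once so the underlying cluster models are reconstructed:

def set_params(self, params):
    names = ['alg', 'radius', 'svm', 'size_thresh', 'purity_thresh',
             'gamma', 'C', 'cluster_method']
    # fall back to the current attribute when a key is absent
    kwargs = {name: params.get(name, getattr(self, name)) for name in names}
    self.__init__(**kwargs)
    return self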
+ """ + self.end_cluster_model.predict(test_df) + # store a copy of test_df for now (TODO: make this more efficient since + # the data is duplicated) + self.test_df = self.end_cluster_model.test_df + + if self.cluster_method in ['trip', 'combination']: + self.start_cluster_model.predict(test_df) + # append the start cluster indices + self.test_df.loc[:, [ + 'start_cluster_idx' + ]] = self.start_cluster_model.test_df.loc[:, ['start_cluster_idx']] + + # create trip-level clusters + trip_cluster_idx = self.trip_grouper.transform(self.test_df) + self.test_df.loc[:, 'trip_cluster_idx'] = trip_cluster_idx + + # extrapolate label distributions from cluster information + self.test_df.loc[:, [ + 'mode_distrib', 'purpose_distrib', 'replaced_distrib' + ]] = np.nan + + if self.cluster_method in ['end', 'trip']: + cluster_col = f'{self.cluster_method}_cluster_idx' + self.test_df = self._add_label_distributions( + self.test_df, cluster_col) + + else: # self.cluster_method == 'combination' + # try to get label distributions from trip-level clusters first, + # because trip-level clusters tend to be more homogenous and will + # yield more accurate predictions + self.test_df = self._add_label_distributions( + self.test_df, 'trip_cluster_idx') + + # for trips that have an empty label-distribution after the first + # pass using trip clusters, try to get a distribution from the + # destination cluster (this includes both trips that *don't* fall + # into a trip cluster, as well as trips that *do* fall into a trip + # cluster but are missing some/all categories of labels due to + # missing user inputs.) + + # fill in missing label-distributions by the label_type + # (we want to iterate by label_type rather than check cluster idx + # because it's possible that some trips in a trip-cluster have + # predictions for one label_type but not another) + for label_type in ['mode', 'purpose', 'replaced']: + self.test_df.loc[self.test_df[f'{label_type}_distrib'] == + {}] = self._add_label_distributions( + self.test_df.loc[ + self.test_df[f'{label_type}_distrib'] + == {}], + 'end_cluster_idx', + label_types=[label_type]) + + # create the dataframe of probabilities + proba_dfs = [] + for label_type in ['purpose', 'mode', 'replaced']: + classes = self.train_df[f'{label_type}_true'].dropna().unique() + proba = pd.DataFrame( + self.test_df[f'{label_type}_distrib'].to_list(), + columns=classes) + proba['top_pred'] = proba.idxmax(axis=1) + proba['top_proba'] = proba.max(axis=1, skipna=True) + proba['clusterable'] = self.test_df.end_cluster_idx >= 0 + proba.loc[:, classes] = proba.loc[:, classes].fillna(0) + proba = pd.concat([proba], keys=[label_type], axis=1) + proba_dfs += [proba] + + self.proba_df = pd.concat(proba_dfs, axis=1) + return self.proba_df + + def _add_label_distributions(self, + df, + cluster_col, + label_types=['mode', 'purpose', 'replaced']): + """ Add label distributions to a DataFrame. + + Args: + df (DataFrame): DataFrame containing a column of clusters + cluster_col (str): name of column in df containing clusters + label_types (str list): the categories of labels to retrieve + distributions for. + + Returns: + a DataFrame with additional columns in which the entries are + dictionaries containing label distributions. 
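The distributions described here are just normalized label counts per cluster (the implementation follows below). A toy illustration with made-up data, using the _true / _cluster_idx column convention from this file:

import pandas as pd

train = pd.DataFrame({
    'end_cluster_idx': [0, 0, 0, 1],
    'purpose_true': ['work', 'work', 'shopping', None],
})
in_cluster = train[train['end_cluster_idx'] == 0]
distrib = (in_cluster['purpose_true']
           .value_counts(normalize=True, dropna=True)
           .to_dict())
# distrib == {'work': 0.666..., 'shopping': 0.333...}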
+ """ + df = df.copy() # to avoid SettingWithCopyWarning + for c in df.loc[:, cluster_col].unique(): + labeled_trips_in_cluster = self.train_df.loc[ + self.train_df[cluster_col] == c] + unlabeled_trips_in_cluster = df.loc[df[cluster_col] == c] + + cluster_size = len(unlabeled_trips_in_cluster) + + for label_type in label_types: + assert label_type in ['mode', 'purpose', 'replaced'] + + # get distribution of label_type labels in this cluster + distrib = labeled_trips_in_cluster[ + f'{label_type}_true'].value_counts(normalize=True, + dropna=True).to_dict() + # TODO: add confidence discounting + + # update predictions + # convert the dict into a list of dicts to work around pandas + # thinking we're trying to insert information according to a + # key-value map + # TODO: this is the line throwing the set on slice warning + df.loc[df[cluster_col] == c, + f'{label_type}_distrib'] = [distrib] * cluster_size + + return df + + +class EnsembleClassifier(TripClassifier, metaclass=ABCMeta): + """ Template class for trip classifiers using ensemble algorithms. + + Required args: + loc_feature (str): 'coordinates' or 'cluster' + """ + base_features = [ + 'duration', + 'distance', + 'start_local_dt_year', + 'start_local_dt_month', + 'start_local_dt_day', + 'start_local_dt_hour', + # 'start_local_dt_minute', + 'start_local_dt_weekday', + 'end_local_dt_year', # most likely the same as the start year + 'end_local_dt_month', # most likely the same as the start month + 'end_local_dt_day', + 'end_local_dt_hour', + # 'end_local_dt_minute', + 'end_local_dt_weekday', + ] + targets = ['mode_true', 'purpose_true', 'replaced_true'] + + # required instance attributes + loc_feature = NotImplemented + purpose_enc = NotImplemented + mode_enc = NotImplemented + purpose_predictor = NotImplemented + mode_predictor = NotImplemented + replaced_predictor = NotImplemented + + # required methods + def fit(self, train_df): + # get location features + if self.loc_feature == 'cluster': + # fit clustering model(s) and one-hot encode their indices + # TODO: consolidate start/end_cluster_model in a single instance + # that has a location_type parameter in the fit() method + self.end_cluster_model.fit(train_df) + + clusters_to_encode = self.end_cluster_model.train_df[[ + 'end_cluster_idx' + ]].copy() # copy is to avoid SettingWithCopyWarning + + if self.use_start_clusters or self.use_trip_clusters: + self.start_cluster_model.fit(train_df) + + if self.use_start_clusters: + clusters_to_encode = pd.concat([ + clusters_to_encode, self.start_cluster_model.train_df[[ + 'start_cluster_idx' + ]] + ], + axis=1) + if self.use_trip_clusters: + start_end_clusters = pd.concat([ + self.end_cluster_model.train_df[['end_cluster_idx']], + self.start_cluster_model.train_df[[ + 'start_cluster_idx' + ]] + ], + axis=1) + trip_cluster_idx = self.trip_grouper.fit_transform( + start_end_clusters) + clusters_to_encode.loc[:, + 'trip_cluster_idx'] = trip_cluster_idx + + loc_features_df = self.cluster_enc.fit_transform( + clusters_to_encode.astype(int)) + + # clean the df again because we need it in the next step + # TODO: remove redundancy + self.train_df = self._clean_data(train_df) + + # TODO: move below code into a reusable function + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if 
len(self.train_df) == 0: + # i.e. no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + else: # self.loc_feature == 'coordinates' + self.train_df = self._clean_data(train_df) + + # TODO: move below code into a reusable function + if self.train_df.purpose_true.isna().any(): + num_nan = self.train_df.purpose_true.value_counts( + dropna=False).loc[np.nan] + logging.info( + f'dropping {num_nan}/{len(self.train_df)} trips that are missing purpose labels' + ) + self.train_df = self.train_df.dropna( + subset=['purpose_true']).reset_index(drop=True) + if len(self.train_df) == 0: + # i.e. no valid trips after removing all nans + raise Exception('no valid trips; nothing to fit') + + loc_features_df = self.train_df[[ + 'start_lon', 'start_lat', 'end_lon', 'end_lat' + ]] + + # prepare data for the ensemble classifiers + + # note that we want to use purpose data to aid our mode predictions, + # and use both purpose and mode data to aid our replaced-mode + # predictions + # thus, we want to one-hot encode the purpose and mode as data + # features, but also preserve an unencoded copy for the target columns + + # dataframe holding all features and targets + self.Xy_train = pd.concat([ + self.train_df[self.base_features + self.targets], loc_features_df + ], + axis=1) + + # encode purposes and modes + onehot_purpose_df = self.purpose_enc.fit_transform( + self.Xy_train[['purpose_true']], output_col_prefix='purpose') + onehot_mode_df = self.mode_enc.fit_transform( + self.Xy_train[['mode_true']], output_col_prefix='mode') + self.Xy_train = pd.concat( + [self.Xy_train, onehot_purpose_df, onehot_mode_df], axis=1) + + # for predicting purpose, drop encoded purpose and mode features, as + # well as all target labels + self.X_purpose = self.Xy_train.dropna(subset=['purpose_true']).drop( + labels=self.targets + self.purpose_enc.onehot_encoding_cols + + self.mode_enc.onehot_encoding_cols, + axis=1) + + # for predicting mode, we want to keep purpose data + self.X_mode = self.Xy_train.dropna(subset=['mode_true']).drop( + labels=self.targets + self.mode_enc.onehot_encoding_cols, axis=1) + + # for predicting replaced-mode, we want to keep purpose and mode data + self.X_replaced = self.Xy_train.dropna(subset=['replaced_true']).drop( + labels=self.targets, axis=1) + + self.y_purpose = self.Xy_train['purpose_true'].dropna() + self.y_mode = self.Xy_train['mode_true'].dropna() + self.y_replaced = self.Xy_train['replaced_true'].dropna() + + # fit classifiers + if len(self.X_purpose) > 0: + self.purpose_predictor.fit(self.X_purpose, self.y_purpose) + if len(self.X_mode) > 0: + self.mode_predictor.fit(self.X_mode, self.y_mode) + if len(self.X_replaced) > 0: + self.replaced_predictor.fit(self.X_replaced, self.y_replaced) + + return self + + def predict_proba(self, test_df): + """ NOTE: these class probabilities do NOT have a + confidence-discounting heuristic applied. 
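The fit logic above cascades the three label types: purpose labels become one-hot features for the mode model, and both purpose and mode feed the replaced-mode model. A reduced two-stage sketch of that idea, assuming X, y_purpose and y_mode are aligned pandas objects (the function and variable names are illustrative):

import pandas as pd
from sklearn.ensemble import RandomForestClassifier

def fit_cascade(X, y_purpose, y_mode):
    purpose_clf = RandomForestClassifier(random_state=42).fit(X, y_purpose)
    # append the (true) purpose labels as one-hot features for the mode model;
    # at prediction time the purpose model's predictions are used instead
    X_mode = pd.concat([X, pd.get_dummies(y_purpose, prefix='purpose')], axis=1)
    mode_clf = RandomForestClassifier(random_state=42).fit(X_mode, y_mode)
    return purpose_clf, mode_clf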
+ """ + ################ + ### get data ### + ################ + self.X_test_for_purpose = self._get_X_test_for_purpose(test_df) + + ######################## + ### make predictions ### + ######################## + # note that we want to use purpose data to aid our mode predictions, + # and use both purpose and mode data to aid our replaced-mode + # predictions + + # TODO: some of the code across the try and except blocks can be + # consolidated by considering one-hot encoding fully np.nan arrays + try: + purpose_proba_raw = self.purpose_predictor.predict_proba( + self.X_test_for_purpose) + purpose_proba = pd.DataFrame( + purpose_proba_raw, columns=self.purpose_predictor.classes_) + purpose_pred = purpose_proba.idxmax(axis=1) + + # update X_test with one-hot-encoded purpose predictions to aid + # mode predictor + # TODO: converting purpose_pred to a DataFrame feels super + # unnecessary, make this more efficient + onehot_purpose_df = self.purpose_enc.transform( + pd.DataFrame(purpose_pred).set_index( + self.X_test_for_purpose.index)) + self.X_test_for_mode = pd.concat( + [self.X_test_for_purpose, onehot_purpose_df], axis=1) + + mode_proba, replaced_proba = self._try_predict_proba_mode_replaced( + ) + + except NotFittedError as e: + # if we can't predict purpose, we can still try to predict mode and + # replaced-mode without one-hot encoding the purpose + + purpose_pred = np.full((len(self.X_test_for_purpose), ), np.nan) + purpose_proba_raw = np.full((len(self.X_test_for_purpose), 1), 0) + purpose_proba = pd.DataFrame(purpose_proba_raw, columns=[np.nan]) + + self.X_test_for_mode = self.X_test_for_purpose + mode_proba, replaced_proba = self._try_predict_proba_mode_replaced( + ) + + mode_pred = mode_proba.idxmax(axis=1) + replaced_pred = replaced_proba.idxmax(axis=1) + + if (purpose_pred.dtype == np.float64 and mode_pred.dtype == np.float64 + and replaced_pred.dtype == np.float64): + # this indicates that all the predictions are np.nan so none of the + # random forest classifiers were fitted + raise NotFittedError + + # TODO: move this to a Mixin for cluster-based predictors and use the + # 'cluster' column of the proba_df outputs + # if self.drop_unclustered: + # # TODO: actually, we should only drop purpose predictions. we can + # # then impute the missing entries in the purpose feature and still + # # try to predict mode and replaced-mode without it + # self.predictions.loc[ + # self.end_cluster_model.test_df['end_cluster_idx'] == -1, + # ['purpose_pred', 'mode_pred', 'replaced_pred']] = np.nan + + proba_dfs = [] + for label_type, proba in zip( + ['purpose', 'mode', 'replaced'], + [purpose_proba, mode_proba, replaced_proba]): + proba['top_pred'] = proba.idxmax(axis=1) + proba['top_proba'] = proba.max(axis=1, skipna=True) + proba['clusterable'] = self._clusterable( + self.X_test_for_purpose).astype(bool) + proba = pd.concat([proba], keys=[label_type], axis=1) + proba_dfs += [proba] + + self.proba_df = pd.concat(proba_dfs, axis=1) + return self.proba_df + + def _get_X_test_for_purpose(self, test_df): + """ Do the pre-processing to get data that we can then pass into the + ensemble classifiers. 
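The try/except blocks above implement a simple fallback: if a predictor was never fitted (e.g. the user supplied no labels of that type), the code substitutes a single all-zero column named np.nan rather than raising. The same pattern as a small helper (hypothetical name, same behaviour):

import numpy as np
import pandas as pd
from sklearn.exceptions import NotFittedError

def safe_predict_proba(clf, X):
    try:
        return pd.DataFrame(clf.predict_proba(X), columns=clf.classes_)
    except NotFittedError:
        # placeholder so downstream idxmax()/max() calls still work
        return pd.DataFrame(np.zeros((len(X), 1)), columns=[np.nan])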
+ """ + if self.loc_feature == 'cluster': + # get clusters + self.end_cluster_model.predict(test_df) + clusters_to_encode = self.end_cluster_model.test_df[[ + 'end_cluster_idx' + ]].copy() # copy is to avoid SettingWithCopyWarning + + if self.use_start_clusters or self.use_trip_clusters: + self.start_cluster_model.predict(test_df) + + if self.use_start_clusters: + clusters_to_encode = pd.concat([ + clusters_to_encode, + self.start_cluster_model.test_df[['start_cluster_idx']] + ], + axis=1) + if self.use_trip_clusters: + start_end_clusters = pd.concat([ + self.end_cluster_model.test_df[['end_cluster_idx']], + self.start_cluster_model.test_df[['start_cluster_idx']] + ], + axis=1) + trip_cluster_idx = self.trip_grouper.transform( + start_end_clusters) + clusters_to_encode.loc[:, + 'trip_cluster_idx'] = trip_cluster_idx + + # one-hot encode the cluster indices + loc_features_df = self.cluster_enc.transform(clusters_to_encode) + else: # self.loc_feature == 'coordinates' + test_df = self._clean_data(test_df) + loc_features_df = test_df[[ + 'start_lon', 'start_lat', 'end_lon', 'end_lat' + ]] + + # extract the desired data + X_test = pd.concat([ + test_df[self.base_features].reset_index(drop=True), + loc_features_df.reset_index(drop=True) + ], + axis=1) + + return X_test + + def _try_predict_proba_mode_replaced(self): + """ Try to predict mode and replaced-mode. Handles error in case the + ensemble algorithms were not fitted. + + Requires self.X_test_for_mode to have already been set. (These are + the DataFrames containing the test data to be passed into self. + mode_predictor.) + + Returns: mode_proba and replaced_proba, two DataFrames containing + class probabilities for mode and replaced-mode respectively + """ + + try: + # predict mode + mode_proba_raw = self.mode_predictor.predict_proba( + self.X_test_for_mode) + mode_proba = pd.DataFrame(mode_proba_raw, + columns=self.mode_predictor.classes_) + mode_pred = mode_proba.idxmax(axis=1) + + # update X_test with one-hot-encoded mode predictions to aid + # replaced-mode predictor + onehot_mode_df = self.mode_enc.transform( + pd.DataFrame(mode_pred).set_index(self.X_test_for_mode.index)) + self.X_test_for_replaced = pd.concat( + [self.X_test_for_mode, onehot_mode_df], axis=1) + replaced_proba = self._try_predict_proba_replaced() + + except NotFittedError as e: + mode_proba_raw = np.full((len(self.X_test_for_mode), 1), 0) + mode_proba = pd.DataFrame(mode_proba_raw, columns=[np.nan]) + + # if we don't have mode predictions, we *could* still try to + # predict replaced mode (but if the user didn't input mode labels + # then it's unlikely they would input replaced-mode) + self.X_test_for_replaced = self.X_test_for_mode + replaced_proba = self._try_predict_proba_replaced() + + return mode_proba, replaced_proba + + def _try_predict_proba_replaced(self): + """ Try to predict replaced mode. Handles error in case the + replaced_predictor was not fitted. + + Requires self.X_test_for_replaced to have already been set. (This + is the DataFrame containing the test data to be passed into self. + replaced_predictor.) 
+ + Returns: replaced_proba, DataFrame containing class probabilities + for replaced-mode + """ + try: + replaced_proba_raw = self.replaced_predictor.predict_proba( + self.X_test_for_replaced + ) # has shape (len_trips, number of replaced_mode classes) + replaced_proba = pd.DataFrame( + replaced_proba_raw, columns=self.replaced_predictor.classes_) + + except NotFittedError as e: + replaced_proba_raw = np.full((len(self.X_test_for_replaced), 1), 0) + replaced_proba = pd.DataFrame(replaced_proba_raw, columns=[np.nan]) + + return replaced_proba + + def _clusterable(self, test_df): + """ Check if the end points can be clustered (i.e. are within + meters of an end point from the training set) + """ + if self.loc_feature == 'cluster': + return self.end_cluster_model.test_df.end_cluster_idx >= 0 + + n_samples = test_df.shape[0] + clustered = np.ones(shape=n_samples, dtype=int) * False + + train_coordinates = self.train_df[['end_lat', 'end_lon']] + train_radians = np.radians(train_coordinates) + + for idx, row in test_df.reset_index(drop=True).iterrows(): + # calculate the distances between the ith test data and all points, + # then find the minimum distance for each point and check if it's + # within the distance threshold. + # unfortunately, pairwise_distances_argmin() does not support + # haversine distance, so we have to reimplement it ourselves + new_loc_radians = np.radians(row[["end_lat", "end_lon"]].to_list()) + new_loc_radians = np.reshape(new_loc_radians, (1, 2)) + dist_matrix_meters = haversine_distances( + new_loc_radians, train_radians) * EARTH_RADIUS + + shortest_dist = np.min(dist_matrix_meters) + if shortest_dist < self.radius: + clustered[idx] = True + + return clustered + + +class ForestClassifier(EnsembleClassifier): + """ Random forest-based trip classifier. + + Args: + loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/ + lon coordinates or cluster indices for the location feature + radius (int): radius for DBSCAN clustering. only if + loc_feature=='cluster' + size_thresh (int): the min number of trips a cluster must have to + be considered for sub-division via SVM. only if + loc_feature=='cluster' + purity_thresh (float): the min purity a cluster must have to be + sub-divided via SVM. only if loc_feature=='cluster' + gamma (float): coefficient for the rbf kernel in SVM. only if + loc_feature=='cluster' + C (float): regularization hyperparameter for SVM. only if + loc_feature=='cluster' + n_estimators (int): number of estimators in the random forest + criterion (str): function to measure the quality of a split in the + random forest + max_depth (int): max depth of a tree in the random forest. + unlimited if None. + min_samples_split (int): min number of samples required to split an + internal node in a decision tree + min_samples_leaf (int): min number of samples required for a leaf + node in a decision tree + max_features (str): number of features to consider when looking for + the best split in a decision tree + bootstrap (bool): whether bootstrap samples are used when building + decision trees + random_state (int): random state for deterministic random forest + construction + use_start_clusters (bool): whether or not to use start clusters as + input features to the ensemble classifier. only if + loc_feature=='cluster' + use_trip_clusters (bool): whether or not to use trip-level clusters + as input features to the ensemble classifier. 
only if + loc_feature=='cluster' + """ + + def __init__( + self, + loc_feature='coordinates', + radius=100, # TODO: add different start and end radii + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1, + n_estimators=100, + criterion='gini', + max_depth=None, + min_samples_split=2, + min_samples_leaf=1, + max_features='sqrt', + bootstrap=True, + random_state=42, + # drop_unclustered=False, + use_start_clusters=False, + use_trip_clusters=True): + assert loc_feature in ['cluster', 'coordinates'] + self.loc_feature = loc_feature + self.radius = radius + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + self.n_estimators = n_estimators + self.criterion = criterion + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.min_samples_leaf = min_samples_leaf + self.max_features = max_features + self.bootstrap = bootstrap + self.random_state = random_state + # self.drop_unclustered = drop_unclustered + self.use_start_clusters = use_start_clusters + self.use_trip_clusters = use_trip_clusters + + if self.loc_feature == 'cluster': + # clustering algorithm to generate end clusters + self.end_cluster_model = DBSCANSVMCluster( + loc_type='end', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_start_clusters or self.use_trip_clusters: + # clustering algorithm to generate start clusters + self.start_cluster_model = DBSCANSVMCluster( + loc_type='start', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_trip_clusters: + # helper class to generate trip-level clusters + self.trip_grouper = TripGrouper( + start_cluster_col='start_cluster_idx', + end_cluster_col='end_cluster_idx') + + # wrapper class to generate one-hot encodings for cluster indices + self.cluster_enc = OneHotWrapper(sparse=False, + handle_unknown='ignore') + + # wrapper class to generate one-hot encodings for purposes and modes + self.purpose_enc = OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + self.mode_enc = OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + + # ensemble classifiers for each label category + self.purpose_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + self.mode_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + self.replaced_predictor = RandomForestClassifier( + n_estimators=self.n_estimators, + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + bootstrap=self.bootstrap, + random_state=self.random_state) + + def set_params(self, params): + """ hacky code that mimics the set_params of an sklearn Estimator class + so that we can pass params during randomizedsearchCV + + Args: + params (dict): a dictionary where the keys are the parameter + names and the values are the parameter 
values + """ + loc_feature = params['loc_feature'] if 'loc_feature' in params.keys( + ) else self.loc_feature + radius = params['radius'] if 'radius' in params.keys() else self.radius + size_thresh = params['size_thresh'] if 'size_thresh' in params.keys( + ) else self.size_thresh + purity_thresh = params[ + 'purity_thresh'] if 'purity_thresh' in params.keys( + ) else self.purity_thresh + gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma + C = params['C'] if 'C' in params.keys() else self.C + n_estimators = params['n_estimators'] if 'n_estimators' in params.keys( + ) else self.n_estimators + criterion = params['criterion'] if 'criterion' in params.keys( + ) else self.criterion + max_depth = params['max_depth'] if 'max_depth' in params.keys( + ) else self.max_depth + min_samples_split = params[ + 'min_samples_split'] if 'min_samples_split' in params.keys( + ) else self.min_samples_split + min_samples_leaf = params[ + 'min_samples_leaf'] if 'min_samples_leaf' in params.keys( + ) else self.min_samples_leaf + max_features = params['max_features'] if 'max_features' in params.keys( + ) else self.max_features + bootstrap = params['bootstrap'] if 'bootstrap' in params.keys( + ) else self.bootstrap + random_state = params['random_state'] if 'random_state' in params.keys( + ) else self.random_state + use_start_clusters = params[ + 'use_start_clusters'] if 'use_start_clusters' in params.keys( + ) else self.use_start_clusters + # drop_unclustered = params[ + # 'drop_unclustered'] if 'drop_unclustered' in params.keys( + # ) else self.drop_unclustered + use_trip_clusters = params[ + 'use_trip_clusters'] if 'use_trip_clusters' in params.keys( + ) else self.use_trip_clusters + + # yes, calling __init__ again is not good practice... + self.__init__(loc_feature, radius, size_thresh, purity_thresh, gamma, + C, n_estimators, criterion, max_depth, min_samples_split, + min_samples_leaf, max_features, bootstrap, random_state, + use_start_clusters, use_trip_clusters) + return self + + +class ClusterForestSlimPredictor(ForestClassifier): + """ This is the same as ForestClassifier, just with fewer base + features. + + Args: + loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/ + lon coordinates or cluster indices for the location feature + radius (int): radius for DBSCAN clustering. only if + loc_feature=='cluster' + size_thresh (int): the min number of trips a cluster must have to + be considered for sub-division via SVM. only if + loc_feature=='cluster' + purity_thresh (float): the min purity a cluster must have to be + sub-divided via SVM. only if loc_feature=='cluster' + gamma (float): coefficient for the rbf kernel in SVM. only if + loc_feature=='cluster' + C (float): regularization hyperparameter for SVM. only if + loc_feature=='cluster' + n_estimators (int): number of estimators in the random forest + criterion (str): function to measure the quality of a split in the + random forest + max_depth (int): max depth of a tree in the random forest. + unlimited if None. 
+ min_samples_split (int): min number of samples required to split an + internal node in a decision tree + min_samples_leaf (int): min number of samples required for a leaf + node in a decision tree + max_features (str): number of features to consider when looking for + the best split in a decision tree + bootstrap (bool): whether bootstrap samples are used when building + decision trees + random_state (int): random state for deterministic random forest + construction + use_start_clusters (bool): whether or not to use start clusters as + input features to the ensemble classifier. only if + loc_feature=='cluster' + use_trip_clusters (bool): whether or not to use trip-level clusters + as input features to the ensemble classifier. only if + loc_feature=='cluster' + """ + + def __init__( + self, + loc_feature='coordinates', + radius=100, # TODO: add different start and end radii + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1, + n_estimators=100, + criterion='gini', + max_depth=None, + min_samples_split=2, + min_samples_leaf=1, + max_features='sqrt', + bootstrap=True, + random_state=42, + # drop_unclustered=False, + use_start_clusters=False, + use_trip_clusters=True): + + super().__init__(loc_feature, radius, size_thresh, purity_thresh, + gamma, C, n_estimators, criterion, max_depth, + min_samples_split, min_samples_leaf, max_features, + bootstrap, random_state, use_start_clusters, + use_trip_clusters) + + self.base_features = [ + 'duration', + 'distance', + ] + + +class AdaBoostClassifier(EnsembleClassifier): + """ AdaBoost-based trip classifier. + + Args: + loc_feature (str): 'coordinates' or 'cluster'; whether to use lat/ + lon coordinates or cluster indices for the location feature + radius (int): radius for DBSCAN clustering. only if + loc_feature=='cluster' + size_thresh (int): the min number of trips a cluster must have to + be considered for sub-division via SVM. only if + loc_feature=='cluster' + purity_thresh (float): the min purity a cluster must have to be + sub-divided via SVM. only if loc_feature=='cluster' + gamma (float): coefficient for the rbf kernel in SVM. only if + loc_feature=='cluster' + C (float): regularization hyperparameter for SVM. only if + loc_feature=='cluster' + n_estimators (int): number of estimators + criterion (str): function to measure the quality of a split in a + decision tree + max_depth (int): max depth of a tree in the random forest. + unlimited if None. + min_samples_split (int): min number of samples required to split an + internal node in a decision tree + min_samples_leaf (int): min number of samples required for a leaf + node in a decision tree + max_features (str): number of features to consider when looking for + the best split in a decision tree + random_state (int): random state for deterministic random forest + construction + use_start_clusters (bool): whether or not to use start clusters as + input features to the ensemble classifier. only if + loc_feature=='cluster' + use_trip_clusters (bool): whether or not to use trip-level clusters + as input features to the ensemble classifier. 
only if + loc_feature=='cluster' + learning_rate (float): weight applied to each decision tree at each + boosting iteration + """ + + def __init__( + self, + loc_feature='coordinates', + radius=100, # TODO: add different start and end radii + size_thresh=1, + purity_thresh=1.0, + gamma=0.05, + C=1, + n_estimators=100, + criterion='gini', + max_depth=None, + min_samples_split=2, + min_samples_leaf=1, + max_features='sqrt', + random_state=42, + # drop_unclustered=False, + use_start_clusters=False, + use_trip_clusters=True, + use_base_clusters=True, + learning_rate=1.0): + assert loc_feature in ['cluster', 'coordinates'] + self.loc_feature = loc_feature + self.radius = radius + self.size_thresh = size_thresh + self.purity_thresh = purity_thresh + self.gamma = gamma + self.C = C + self.n_estimators = n_estimators + self.criterion = criterion + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.min_samples_leaf = min_samples_leaf + self.max_features = max_features + self.random_state = random_state + # self.drop_unclustered = drop_unclustered + self.use_start_clusters = use_start_clusters + self.use_trip_clusters = use_trip_clusters + self.use_base_clusters = use_base_clusters + self.learning_rate = learning_rate + + if self.loc_feature == 'cluster': + # clustering algorithm to generate end clusters + self.end_cluster_model = DBSCANSVMCluster( + loc_type='end', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_start_clusters or self.use_trip_clusters: + # clustering algorithm to generate start clusters + self.start_cluster_model = DBSCANSVMCluster( + loc_type='start', + radius=self.radius, + size_thresh=self.size_thresh, + purity_thresh=self.purity_thresh, + gamma=self.gamma, + C=self.C) + + if self.use_trip_clusters: + # helper class to generate trip-level clusters + self.trip_grouper = TripGrouper( + start_cluster_col='start_cluster_idx', + end_cluster_col='end_cluster_idx') + + # wrapper class to generate one-hot encodings for cluster indices + self.cluster_enc = OneHotWrapper(sparse=False, + handle_unknown='ignore') + + # wrapper class to generate one-hot encodings for purposes and modes + self.purpose_enc = OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + self.mode_enc = OneHotWrapper(impute_missing=True, + sparse=False, + handle_unknown='error') + + self.purpose_predictor = AdaBoostClassifier( + n_estimators=self.n_estimators, + learning_rate=self.learning_rate, + random_state=self.random_state, + base_estimator=DecisionTreeClassifier( + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + random_state=self.random_state)) + self.mode_predictor = AdaBoostClassifier( + n_estimators=self.n_estimators, + learning_rate=self.learning_rate, + random_state=self.random_state, + base_estimator=DecisionTreeClassifier( + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + random_state=self.random_state)) + self.replaced_predictor = AdaBoostClassifier( + n_estimators=self.n_estimators, + learning_rate=self.learning_rate, + random_state=self.random_state, + base_estimator=DecisionTreeClassifier( + criterion=self.criterion, + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + 
min_samples_leaf=self.min_samples_leaf, + max_features=self.max_features, + random_state=self.random_state)) + + def set_params(self, params): + """ hacky code that mimics the set_params of an sklearn Estimator class + so that we can pass params during randomizedsearchCV + + Args: + params (dict): a dictionary where the keys are the parameter + names and the values are the parameter values + """ + radius = params['radius'] if 'radius' in params.keys() else self.radius + size_thresh = params['size_thresh'] if 'size_thresh' in params.keys( + ) else self.size_thresh + purity_thresh = params[ + 'purity_thresh'] if 'purity_thresh' in params.keys( + ) else self.purity_thresh + gamma = params['gamma'] if 'gamma' in params.keys() else self.gamma + C = params['C'] if 'C' in params.keys() else self.C + n_estimators = params['n_estimators'] if 'n_estimators' in params.keys( + ) else self.n_estimators + criterion = params['criterion'] if 'criterion' in params.keys( + ) else self.criterion + max_depth = params['max_depth'] if 'max_depth' in params.keys( + ) else self.max_depth + min_samples_split = params[ + 'min_samples_split'] if 'min_samples_split' in params.keys( + ) else self.min_samples_split + min_samples_leaf = params[ + 'min_samples_leaf'] if 'min_samples_leaf' in params.keys( + ) else self.min_samples_leaf + max_features = params['max_features'] if 'max_features' in params.keys( + ) else self.max_features + random_state = params['random_state'] if 'random_state' in params.keys( + ) else self.random_state + use_start_clusters = params[ + 'use_start_clusters'] if 'use_start_clusters' in params.keys( + ) else self.use_start_clusters + # drop_unclustered = params[ + # 'drop_unclustered'] if 'drop_unclustered' in params.keys( + # ) else self.drop_unclustered + use_trip_clusters = params[ + 'use_trip_clusters'] if 'use_trip_clusters' in params.keys( + ) else self.use_trip_clusters + learning_rate = params[ + 'learning_rate'] if 'learning_rate' in params.keys( + ) else self.learning_rate + + # calling __init__ again is not good practice, I know... + self.__init__(radius, size_thresh, purity_thresh, gamma, C, + n_estimators, criterion, max_depth, min_samples_split, + min_samples_leaf, max_features, random_state, + use_start_clusters, use_trip_clusters, learning_rate) + return self + + +class TripGrouper(): + """ Helper class to get trip clusters from start and end clusters. + + Args: + start_cluster_col (str): name of the column containing start + cluster indices + end_cluster_col (str): name of the column containing end cluster + indices + """ + + def __init__(self, + start_cluster_col='start_cluster_idx', + end_cluster_col='end_cluster_idx'): + self.start_cluster_col = start_cluster_col + self.end_cluster_col = end_cluster_col + + def fit_transform(self, trip_df): + """ Fit and remember possible trip clusters. + + Args: + trip_df (DataFrame): DataFrame containing trips. must have + columns and + """ + trip_groups = trip_df.groupby( + [self.start_cluster_col, self.end_cluster_col]) + + # need dict so we can access the trip indices of all the trips in each + # group. the key is the group tuple and the value is the list of trip + # indices in the group. 
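For reference, pandas can produce the same kind of trip-level index in one call: groupby(...).ngroup() numbers each unique (start cluster, end cluster) pair. This sketch (with made-up data) only covers the fit-time behaviour; unseen pairs at prediction time still need the explicit -1 handling implemented in transform below.

import pandas as pd

trips = pd.DataFrame({
    'start_cluster_idx': [0, 0, 1, 0],
    'end_cluster_idx':   [2, 2, 3, 4],
})
trips['trip_cluster_idx'] = trips.groupby(
    ['start_cluster_idx', 'end_cluster_idx']).ngroup()
# unique pairs (0, 2), (0, 4), (1, 3) get indices 0, 1, 2, so the four
# trips above are assigned [0, 0, 2, 1]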
+ self.trip_groups_dict = dict(trip_groups.groups) + + # we want to convert trip-group tuples to to trip-cluster indices, + # hence the pd Series + trip_groups_series = pd.Series(list(self.trip_groups_dict.keys())) + + trip_cluster_idx = np.empty(len(trip_df)) + + for group_idx in range(len(trip_groups_series)): + group_tuple = trip_groups_series[group_idx] + trip_idxs_in_group = self.trip_groups_dict[group_tuple] + trip_cluster_idx[trip_idxs_in_group] = group_idx + + return trip_cluster_idx + + def transform(self, new_trip_df): + """ Get trip clusters for a new set of trips. + + Args: + new_trip_df (DataFrame): DataFrame containing trips. must have + columns and + """ + prediction_trip_groups = new_trip_df.groupby( + [self.start_cluster_col, self.end_cluster_col]) + + # need dict so we can access the trip indices of all the trips in each + # group. the key is the group tuple and the value is the list of trip + # indices in the group. + prediction_trip_groups_dict = dict(prediction_trip_groups.groups) + trip_groups_series = pd.Series(list(self.trip_groups_dict.keys())) + trip_cluster_idx = np.empty(len(new_trip_df)) + + for group_tuple in dict(prediction_trip_groups.groups).keys(): + # check if the trip cluster exists in the training set + trip_idxs_in_group = prediction_trip_groups_dict[group_tuple] + if group_tuple in self.trip_groups_dict.keys(): + # look up the group index from the series we created when we + # fit the model + group_idx = trip_groups_series[trip_groups_series == + group_tuple].index[0] + else: + group_idx = -1 + + trip_cluster_idx[trip_idxs_in_group] = group_idx + + return trip_cluster_idx + + +class OneHotWrapper(): + """ Helper class to streamline one-hot encoding. + + Args: + impute_missing (bool): whether or not to impute np.nan values. + sparse (bool): whether or not to return a sparse matrix. + handle_unknown (str): specifies the way unknown categories are + handled during transform. + """ + + def __init__( + self, + impute_missing=False, + sparse=False, + handle_unknown='ignore', + ): + self.impute_missing = impute_missing + if self.impute_missing: + self.encoder = make_pipeline( + SimpleImputer(missing_values=np.nan, + strategy='constant', + fill_value='missing'), + OneHotEncoder(sparse=False, handle_unknown=handle_unknown)) + else: + self.encoder = OneHotEncoder(sparse=sparse, + handle_unknown=handle_unknown) + + def fit_transform(self, train_df, output_col_prefix=None): + """ Establish one-hot encoded variables. + + Args: + train_df (DataFrame): DataFrame containing train trips. 
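The imputation branch above exists so that rows with missing labels become an explicit 'missing' category instead of breaking the encoder. A minimal standalone sketch of that pipeline, with made-up sample data and mirroring the sparse=False usage above:

import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder

enc = make_pipeline(
    SimpleImputer(missing_values=np.nan, strategy='constant',
                  fill_value='missing'),
    OneHotEncoder(sparse=False, handle_unknown='error'))

df = pd.DataFrame({'purpose_true': ['work', np.nan, 'shopping']}, dtype=object)
onehot = enc.fit_transform(df)
# columns correspond to the alphabetical categories ['missing', 'shopping', 'work']

In the wrapper above, the column generated for missing entries is subsequently dropped from the returned feature frame.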
+ output_col_prefix (str): only if train_df is a single column + """ + # TODO: handle pd series + + train_df = train_df.copy() # to avoid SettingWithCopyWarning + + # if imputing, the dtype of each column must be string/object and not + # numerical, otherwise the SimpleImputer will fail + if self.impute_missing: + for col in train_df.columns: + train_df[col] = train_df[col].astype(object) + onehot_encoding = self.encoder.fit_transform(train_df) + self.onehot_encoding_cols_all = [] + for col in train_df.columns: + if train_df.shape[1] > 1 or output_col_prefix is None: + output_col_prefix = col + self.onehot_encoding_cols_all += [ + f'{output_col_prefix}_{val}' + for val in np.sort(train_df[col].dropna().unique()) + ] + # we handle np.nan separately because it is of type float, and may + # cause issues with np.sort if the rest of the unique values are + # strings + if any((train_df[col].isna())): + self.onehot_encoding_cols_all += [f'{output_col_prefix}_nan'] + + onehot_encoding_df = pd.DataFrame( + onehot_encoding, + columns=self.onehot_encoding_cols_all).set_index(train_df.index) + + # ignore the encoded columns for missing entries + self.onehot_encoding_cols = copy.deepcopy( + self.onehot_encoding_cols_all) + for col in self.onehot_encoding_cols_all: + if col.endswith('_nan'): + onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) + self.onehot_encoding_cols.remove(col) + + return onehot_encoding_df.astype(int) + + def transform(self, test_df): + """ One-hot encoded features in accordance with features seen in the + train set. + + Args: + test_df (DataFrame): DataFrame of trips. + """ + # TODO: rename test_df, this one doesn't necessarily need to be a df + onehot_encoding = self.encoder.transform(test_df) + onehot_encoding_df = pd.DataFrame( + onehot_encoding, + columns=self.onehot_encoding_cols_all).set_index(test_df.index) + + # ignore the encoded columns for missing entries + for col in self.onehot_encoding_cols_all: + if col.endswith('_nan'): + onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) + + return onehot_encoding_df.astype(int) \ No newline at end of file From 3fc9b5362c6d58c896fc144477e8f96334d28759 Mon Sep 17 00:00:00 2001 From: Hannah Lu Date: Fri, 16 Dec 2022 09:20:30 -0800 Subject: [PATCH 2/6] update user uuid lookup; add documentation note --- TRB_label_assist/clustering.py | 5 ++-- TRB_label_assist/models.py | 42 +++++++++++++++------------------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/TRB_label_assist/clustering.py b/TRB_label_assist/clustering.py index 28a07ab38..fbe8a3bb7 100644 --- a/TRB_label_assist/clustering.py +++ b/TRB_label_assist/clustering.py @@ -14,6 +14,8 @@ from sklearn.preprocessing import StandardScaler # our imports +# NOTE: this requires changing the branch of e-mission-server to +# eval-private-data-compatibility import emission.analysis.modelling.tour_model_extended.similarity as eamts import emission.storage.decorations.trip_queries as esdtq @@ -349,8 +351,7 @@ def get_distance_matrix(loc_df, loc_type): """ assert loc_type == 'start' or loc_type == 'end' - radians_lat_lon = np.radians(loc_df[[loc_type + "_lat", - loc_type + "_lon"]]) + radians_lat_lon = np.radians(loc_df[[loc_type + "_lat", loc_type + "_lon"]]) dist_matrix_meters = pd.DataFrame( smp.haversine_distances(radians_lat_lon, radians_lat_lon) * diff --git a/TRB_label_assist/models.py b/TRB_label_assist/models.py index e5283d730..b370878f3 100644 --- a/TRB_label_assist/models.py +++ b/TRB_label_assist/models.py @@ -24,6 +24,8 @@ from 
emission.analysis.classification.inference.labels.inferrers import predict_cluster_confidence_discounting import emission.core.wrapper.entry as ecwe import emission.analysis.modelling.tour_model_extended.similarity as eamts +# NOTE: tour_model_extended.similarity is on the +# eval-private-data-compatibility branch in e-mission-server # logging.basicConfig(level=logging.DEBUG) @@ -599,8 +601,7 @@ def _NN_predict(self, test_df): # unfortunately, pairwise_distances_argmin() does not support # haversine distance, so we have to reimplement it ourselves new_loc_radians = np.radians( - row[[self.loc_type + "_lat", - self.loc_type + "_lon"]].to_list()) + row[[self.loc_type + "_lat", self.loc_type + "_lon"]].to_list()) new_loc_radians = np.reshape(new_loc_radians, (1, 2)) dist_matrix_meters = haversine_distances( new_loc_radians, train_radians) * EARTH_RADIUS @@ -657,8 +658,8 @@ def fit(self, train_df): bsm.create_user_input_map(train_trips, bins), user_id) # save location features of all bins - bsm.save_models('locations', - bsm.create_location_map(train_trips, bins), user_id) + bsm.save_models('locations', bsm.create_location_map(train_trips, bins), + user_id) return self def predict_proba(self, test_df): @@ -1053,17 +1054,14 @@ def fit(self, train_df): if self.use_start_clusters: clusters_to_encode = pd.concat([ - clusters_to_encode, self.start_cluster_model.train_df[[ - 'start_cluster_idx' - ]] + clusters_to_encode, + self.start_cluster_model.train_df[['start_cluster_idx']] ], axis=1) if self.use_trip_clusters: start_end_clusters = pd.concat([ self.end_cluster_model.train_df[['end_cluster_idx']], - self.start_cluster_model.train_df[[ - 'start_cluster_idx' - ]] + self.start_cluster_model.train_df[['start_cluster_idx']] ], axis=1) trip_cluster_idx = self.trip_grouper.fit_transform( @@ -1120,10 +1118,9 @@ def fit(self, train_df): # features, but also preserve an unencoded copy for the target columns # dataframe holding all features and targets - self.Xy_train = pd.concat([ - self.train_df[self.base_features + self.targets], loc_features_df - ], - axis=1) + self.Xy_train = pd.concat( + [self.train_df[self.base_features + self.targets], loc_features_df], + axis=1) # encode purposes and modes onehot_purpose_df = self.purpose_enc.fit_transform( @@ -1197,8 +1194,7 @@ def predict_proba(self, test_df): self.X_test_for_mode = pd.concat( [self.X_test_for_purpose, onehot_purpose_df], axis=1) - mode_proba, replaced_proba = self._try_predict_proba_mode_replaced( - ) + mode_proba, replaced_proba = self._try_predict_proba_mode_replaced() except NotFittedError as e: # if we can't predict purpose, we can still try to predict mode and @@ -1209,8 +1205,7 @@ def predict_proba(self, test_df): purpose_proba = pd.DataFrame(purpose_proba_raw, columns=[np.nan]) self.X_test_for_mode = self.X_test_for_purpose - mode_proba, replaced_proba = self._try_predict_proba_mode_replaced( - ) + mode_proba, replaced_proba = self._try_predict_proba_mode_replaced() mode_pred = mode_proba.idxmax(axis=1) replaced_pred = replaced_proba.idxmax(axis=1) @@ -1580,8 +1575,8 @@ def set_params(self, params): ) else self.use_trip_clusters # yes, calling __init__ again is not good practice... 
- self.__init__(loc_feature, radius, size_thresh, purity_thresh, gamma, - C, n_estimators, criterion, max_depth, min_samples_split, + self.__init__(loc_feature, radius, size_thresh, purity_thresh, gamma, C, + n_estimators, criterion, max_depth, min_samples_split, min_samples_leaf, max_features, bootstrap, random_state, use_start_clusters, use_trip_clusters) return self @@ -1648,8 +1643,8 @@ def __init__( use_start_clusters=False, use_trip_clusters=True): - super().__init__(loc_feature, radius, size_thresh, purity_thresh, - gamma, C, n_estimators, criterion, max_depth, + super().__init__(loc_feature, radius, size_thresh, purity_thresh, gamma, + C, n_estimators, criterion, max_depth, min_samples_split, min_samples_leaf, max_features, bootstrap, random_state, use_start_clusters, use_trip_clusters) @@ -2005,8 +2000,7 @@ def fit_transform(self, train_df, output_col_prefix=None): columns=self.onehot_encoding_cols_all).set_index(train_df.index) # ignore the encoded columns for missing entries - self.onehot_encoding_cols = copy.deepcopy( - self.onehot_encoding_cols_all) + self.onehot_encoding_cols = copy.deepcopy(self.onehot_encoding_cols_all) for col in self.onehot_encoding_cols_all: if col.endswith('_nan'): onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) From 7111df7466aff9cd2889067521373438ae662f22 Mon Sep 17 00:00:00 2001 From: Shankari Date: Tue, 14 Feb 2023 22:10:09 -0800 Subject: [PATCH 3/6] Add additional logging to the calculation so that we can monitor the result generation more carefully --- TRB_label_assist/models.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/TRB_label_assist/models.py b/TRB_label_assist/models.py index b370878f3..6f02277ce 100644 --- a/TRB_label_assist/models.py +++ b/TRB_label_assist/models.py @@ -283,6 +283,7 @@ class RefactoredNaiveCluster(Cluster): """ def __init__(self, loc_type='end', radius=100): + logging.info("PERF: Initializing RefactoredNaiveCluster") self.loc_type = loc_type self.radius = radius @@ -294,6 +295,7 @@ def set_params(self, params): def fit(self, train_df): # clean data + logging.info("PERF: Fitting RefactoredNaiveCluster with size %s" % len(train_df)) self.train_df = self._clean_data(train_df) # we can use all trips as long as they have purpose labels. it's ok if @@ -328,6 +330,7 @@ def fit(self, train_df): return self def predict(self, test_df): + logging.info("PERF: Predicting RefactoredNaiveCluster for %s" % len(test_df)) self.test_df = self._clean_data(test_df) if self.loc_type == 'start': @@ -339,6 +342,8 @@ def predict(self, test_df): # for each trip in the test list: for idx, row in self.test_df.iterrows(): + if idx % 100 == 0: + logging.info("PERF: RefactoredNaiveCluster Working on trip %s/%s" % (idx, len(self.test_df))) # iterate over all bins trip_binned = False for i, bin in enumerate(bins): @@ -418,6 +423,7 @@ def __init__(self, purity_thresh=1.0, gamma=0.05, C=1): + logging.info("PERF: Initializing DBSCANSVMCluster") self.loc_type = loc_type self.radius = radius self.svm = svm @@ -453,6 +459,7 @@ def fit(self, train_df): ################## ### clean data ### ################## + logging.info("PERF: Fitting DBSCANSVMCluster") self.train_df = self._clean_data(train_df) # we can use all trips as long as they have purpose labels. 
it's ok if @@ -563,6 +570,7 @@ def fit_predict(self, train_df): return self.train_df[[f'{self.loc_type}_cluster_idx']] def predict(self, test_df): + logging.info("PERF: Predicting DBSCANSVMCluster") # TODO: store clusters as polygons so the prediction is faster # TODO: we probably don't want to store test_df in self to be more memory-efficient self.test_df = self._clean_data(test_df) @@ -579,6 +587,7 @@ def _NN_predict(self, test_df): sklearn doesn't implement predict() for DBSCAN, which is why we need a custom method. """ + logging.info("PERF: NN_predicting DBSCANSVMCluster") n_samples = test_df.shape[0] labels = np.ones(shape=n_samples, dtype=int) * -1 @@ -631,6 +640,7 @@ class NaiveBinningClassifier(TripClassifier): """ def __init__(self, radius=500): + logging.info("PERF: Initializing NaiveBinningClassifier") self.radius = radius def set_params(self, params): @@ -639,6 +649,7 @@ def set_params(self, params): return self def fit(self, train_df): + logging.info("PERF: Fitting NaiveBinningClassifier") # (copied from bsm.build_user_model()) # convert train_df to a list because the existing binning algorithm @@ -668,6 +679,7 @@ def predict_proba(self, test_df): """ # convert test_df to a list because the existing binning algorithm # only accepts lists of Entry objects + logging.info("PERF: Predicting NaiveBinningClassifier") test_trips = self._trip_df_to_list(test_df) purpose_distribs = [] @@ -2026,4 +2038,4 @@ def transform(self, test_df): if col.endswith('_nan'): onehot_encoding_df = onehot_encoding_df.drop(columns=[col]) - return onehot_encoding_df.astype(int) \ No newline at end of file + return onehot_encoding_df.astype(int) From 6967fc8520afdc2fce0d22cd06b215c9f16b8999 Mon Sep 17 00:00:00 2001 From: $aTyam Date: Sat, 25 Nov 2023 14:07:29 -0500 Subject: [PATCH 4/6] Update clustering.py (#37) * Update clustering.py Changes in clustering.py file to shift dependency from hlu09's tour_model_extended to main branch trip_model. Still need to change type of data being passed to fit function for this to work. * moving clustering_examples.ipynb to trip_model All dependencies of this notebook from custom branch are removed. There currently seems no errors while generating maps in clustering_examples notebook. * Removing changes in builtimeseries.py With these changes, no change in e-mission-server should be required. * Changes to support TRB_Label_Assist passing way of clustering to the e-mission-server. It was 'origin-destination' by default. Now can take one of three values, 'origin','destination' or 'origin-destination'. * suggestions previous suggestions to improve readability. * Revert "suggestions" This reverts commit 3e19b32cd090135b001709cb52da57e6c6a17c1f. * Improving readability Suggestions from previous comments to improve readability. * making `cluster_performance.ipynb`, `generate_figs_for_poster` and `SVM_decision_boundaries` compatible with changes in `clustering.py` and `mapping.py` files. Also porting these 3 notebooks to trip_model `cluster_performance.ipynb`, `generate_figs_for_poster` and `SVM_decision_boundaries` now have no dependence on the custom branch. Results of plots are attached to show no difference in theie previous and current outputs. * Unified Interface for fit function Unified Interface for fit function across all models. Passing 'Entry' Type data from the notebooks till the Binning functions. Default set to 'none'. 
* Fixing `models.py` to support `regenerate_classification_performance_results.py` Prior to this update, `NaiveBinningClassifier` in 'models.py' had dependencies on both the tour model and the trip model. Now, this classifier depends only on the trip model. All the other notebooks (except `classification_performance.ipynb`) were tested as well and they are working as usual. Other minor fixes to support previous changes. * [PARTIALLY TESTED] Single database read and Code Cleanup 1. Removed mentions of `tour_model` or `tour_model_first_only`. 2. Removed two reads from the database. 3. Removed notebook outputs (this could be the reason a few diffs are too big to view). * Delete TRB_label_assist/first_trial_results/cv results DBSCAN+SVM (destination).csv It is not required. * Reverting Notebook Reverting notebooks to their initial state, since running in the browser messed up the cell index numbers. This was causing unnecessary git diffs even when no changes were made. Running in VS Code should resolve this. Will do the subsequent changes in VS Code and commit again. * [Partially Tested] Handled whitespaces Whitespace corrected. * [Partially Tested] Suggested changes implemented `Classification_performance` and `regenerate_classification_performance_results.py` are not tested yet as they would take too long to run. The itertools removal in these two files is tested in other notebooks and it works. Other files, like models.py, will be tested once either of the above two is run. * Revert "[Partially Tested] Suggested changes implemented" This reverts commit bb404e989b2826f159e88fa828537b24785508e3. * [Partially Tested] Suggested changes implemented Re-applies bb404e9. `Classification_performance` and `regenerate_classification_performance_results.py` are not tested yet as they would take too long to run. The itertools removal in these two files is tested in other notebooks and it works. Other files, like models.py, will be tested once either of the above two is run. * Minor variable fixes Fixed names of variables to be more self-explanatory. * [TESTED] All the notebooks and files are tested 1. Changes in the models file as per changes in greedy_similarity_binning in e-mission-server. 2. Minor fixes. * Minor Fixes Minor fixes to improve readability.
* Minor Fixes in models.py Improved readability --- TRB_label_assist/clustering.py | 47 +++++++--- TRB_label_assist/mapping.py | 7 ++ TRB_label_assist/models.py | 153 ++++++++++++++++++++++----------- 3 files changed, 146 insertions(+), 61 deletions(-) diff --git a/TRB_label_assist/clustering.py b/TRB_label_assist/clustering.py index fbe8a3bb7..d3924f32a 100644 --- a/TRB_label_assist/clustering.py +++ b/TRB_label_assist/clustering.py @@ -16,8 +16,8 @@ # our imports # NOTE: this requires changing the branch of e-mission-server to # eval-private-data-compatibility -import emission.analysis.modelling.tour_model_extended.similarity as eamts import emission.storage.decorations.trip_queries as esdtq +import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg EARTH_RADIUS = 6371000 ALG_OPTIONS = [ @@ -28,9 +28,27 @@ 'mean_shift' ] +def cleanEntryTypeData(loc_df,trip_entry): + + """ + Helps weed out entries from the list of entries which were removed from the df using + esdtq.filter_labeled_trips() and esdtq.expand_userinputs() + + loc_df : dataframe amde from entry type data + trip_entry : the entry type equivalent of loc_df , + which was passed alongside the dataframe while loading the data + + """ + + ids_in_df=loc_df['_id'] + filtered_trip_entry = list(filter(lambda entry: entry['_id'] in ids_in_df.values, trip_entry)) + return filtered_trip_entry + def add_loc_clusters( loc_df, + trip_entry, + clustering_way, radii, loc_type, alg, @@ -53,6 +71,9 @@ def add_loc_clusters( Args: loc_df (dataframe): must have columns 'start_lat' and 'start_lon' or 'end_lat' and 'end_lon' + trip_entry ( list of Entry/confirmedTrip): list consisting all entries from the + time data was loaded. loc_df was obtained from this by converting to df and + then filtering out labeled trips and expanding user_inputs radii (int list): list of radii to run the clustering algs with loc_type (str): 'start' or 'end' alg (str): 'DBSCAN', 'naive', 'OPTICS', 'SVM', 'fuzzy', or @@ -98,19 +119,25 @@ def add_loc_clusters( loc_df.loc[:, f"{loc_type}_DBSCAN_clusters_{r}_m"] = labels elif alg == 'naive': + + cleaned_trip_entry= cleanEntryTypeData(loc_df,trip_entry) + for r in radii: # this is using a modified Similarity class that bins start/end # points separately before creating trip-level bins - sim_model = eamts.Similarity(loc_df, - radius_start=r, - radius_end=r, - shouldFilter=False, - cutoff=False) - # we only bin the loc_type points to speed up the alg. 
avoid - # unnecessary binning since this is really slow - sim_model.bin_helper(loc_type=loc_type) - labels = sim_model.data_df[loc_type + '_bin'].to_list() + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": r, # meters, + "apply_cutoff": False, + "clustering_way": clustering_way, + "shouldFilter":False, + "incremental_evaluation": False + } + + sim_model = eamtg.GreedySimilarityBinning(model_config) + sim_model.fit(cleaned_trip_entry) + labels = [int(l) for l in sim_model.tripLabels] # # pd.Categorical converts the type from int to category (so # # numerical operations aren't possible) # loc_df.loc[:, f"{loc_type}_{alg}_clusters_{r}_m"] = pd.Categorical( diff --git a/TRB_label_assist/mapping.py b/TRB_label_assist/mapping.py index 2ef54de46..cd2d11766 100644 --- a/TRB_label_assist/mapping.py +++ b/TRB_label_assist/mapping.py @@ -37,8 +37,10 @@ def find_plot_clusters(user_df, + user_entry, loc_type, alg, + clustering_way, SVM=False, radii=[50, 100, 150, 200], cluster_unlabeled=False, @@ -64,6 +66,8 @@ def find_plot_clusters(user_df, alg (str): the clustering algorithm to be used. must be one of the following: 'DBSCAN', 'naive', 'OPTICS', 'SVM', 'fuzzy' or 'mean_shift' + clustering_way(str): 'origin'or 'destination' or 'origin-destination'. + Decides the way we can cluster trips geospatially. SVM (bool): whether or not to sub-divide clusters with SVM radii (int list): list of radii to pass to the clustering alg cluster_unlabeled (bool): whether or not unlabeled points are used @@ -91,6 +95,7 @@ def find_plot_clusters(user_df, assert 'start_loc' in user_df.columns assert 'end_loc' in user_df.columns assert 'user_input' in user_df.columns + assert clustering_way in ['origin','destination','origin-destination'] assert alg in ALG_OPTIONS fig = bre.Figure(figsize=(20, 20)) @@ -116,6 +121,8 @@ def find_plot_clusters(user_df, df_for_cluster = add_loc_clusters( df_for_cluster, + user_entry, + clustering_way, radii=radii, alg=alg, SVM=SVM, diff --git a/TRB_label_assist/models.py b/TRB_label_assist/models.py index 6f02277ce..f3026b6b8 100644 --- a/TRB_label_assist/models.py +++ b/TRB_label_assist/models.py @@ -19,11 +19,16 @@ from clustering import get_distance_matrix, single_cluster_purity import data_wrangling import emission.storage.decorations.trip_queries as esdtq -import emission.analysis.modelling.tour_model_first_only.build_save_model as bsm -import emission.analysis.modelling.tour_model_first_only.evaluation_pipeline as ep from emission.analysis.classification.inference.labels.inferrers import predict_cluster_confidence_discounting import emission.core.wrapper.entry as ecwe -import emission.analysis.modelling.tour_model_extended.similarity as eamts +import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg +import emission.core.common as ecc +import emission.analysis.modelling.trip_model.model_storage as eamums +import emission.analysis.modelling.trip_model.model_type as eamumt +import emission.analysis.modelling.trip_model.run_model as eamur + + +import clustering # NOTE: tour_model_extended.similarity is on the # eval-private-data-compatibility branch in e-mission-server @@ -116,12 +121,12 @@ class Cluster(SetupMixin, metaclass=ABCMeta): """ blueprint for clustering models. """ @abstractmethod - def fit(self, train_df): + def fit(self, train_df,train_entry_list): """ Fit the clustering algorithm. 
Args: train_df (DataFrame): dataframe of labeled trips - + train_entry_list (List) : A list of trips where each element is of Entry type Returns: self """ @@ -159,12 +164,13 @@ def fit_predict(self, train_df): class TripClassifier(SetupMixin, metaclass=ABCMeta): @abstractmethod - def fit(self, train_df): + def fit(self, train_df,unused=None): """ Fit a classification model. Args: train_df (DataFrame): dataframe of labeled trips - + unused (List) : A list of Entry type of labeled and unlabeled trips which is not used in current function. + Passed to keep fit function generic. Returns: self """ @@ -293,10 +299,10 @@ def set_params(self, params): return self - def fit(self, train_df): + def fit(self, unused,train_entry_list=None): # clean data - logging.info("PERF: Fitting RefactoredNaiveCluster with size %s" % len(train_df)) - self.train_df = self._clean_data(train_df) + logging.info("PERF: Fitting RefactoredNaiveCluster with size %s" % len(unused)) + self.train_df = self._clean_data(unused) # we can use all trips as long as they have purpose labels. it's ok if # they're missing mode/replaced-mode labels, because they aren't as @@ -315,17 +321,23 @@ def fit(self, train_df): if len(self.train_df) == 0: # i.e. no valid trips after removing all nans raise Exception('no valid trips; nothing to fit') - + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": self.radius, # meters, + "apply_cutoff": False, + "clustering_way":'origin' if self.loc_type=='start' + else 'destination' if self.loc_type =='end' + else 'origin-destination', + "incremental_evaluation": False + } + # fit the bins - self.sim_model = eamts.Similarity(self.train_df, - radius_start=self.radius, - radius_end=self.radius, - shouldFilter=False, - cutoff=False) - # we only bin the loc_type points to speed up the alg. avoid - # unnecessary binning since this is really slow - self.sim_model.bin_helper(loc_type=self.loc_type) - labels = self.sim_model.data_df[self.loc_type + '_bin'].to_list() + self.sim_model= eamtg.GreedySimilarityBinning(model_config) + cleaned_trip_entry= clustering.cleanEntryTypeData(self.train_df,train_entry_list) + self.sim_model.fit(cleaned_trip_entry) + + labels = [int(l) for l in self.sim_model.tripLabels] self.train_df.loc[:, f'{self.loc_type}_cluster_idx'] = labels return self @@ -334,10 +346,32 @@ def predict(self, test_df): self.test_df = self._clean_data(test_df) if self.loc_type == 'start': - bins = self.sim_model.start_bins + bins = self.sim_model.bins elif self.loc_type == 'end': - bins = self.sim_model.end_bins - + bins = self.sim_model.bins + + # This looks weird but works + # >>> x = [(1, 'a'), (2, 'b'), (3, 'c')] + # >>> {int(key):value for key,value in x} + # {1: 'a', 2: 'b', 3: 'c'} + # + # bins = { '1': [ 'key1': [] , 'key2' :[],.. ....], + # '2': ['key1': [] , 'key2' :[],...], + # '3': ['key1': [] , 'key2' :[],.....] ...} + # + # the code below converts above to + # + # bins = { 1: [ 'key1': [] , 'key2' :[],.. ....], + # 2: ['key1': [] , 'key2' :[],...], + # 3: ['key1': [] , 'key2' :[],.....] ....} + # + # This is why it works : + # 1. Iterate over (key,value) pairs in 'bins.items()' + # 2. for each pair, 'key' is a string . so use int(key) to convert it into an integer. + # 3. 
Create a new dictionary (using {} within the dictionary comprehension) + # where the keys are now integers and the values are the same + + bins = {int(key):value for key,value in bins.items()} labels = [] # for each trip in the test list: @@ -346,10 +380,15 @@ def predict(self, test_df): logging.info("PERF: RefactoredNaiveCluster Working on trip %s/%s" % (idx, len(self.test_df))) # iterate over all bins trip_binned = False - for i, bin in enumerate(bins): + for i in bins: # check if the trip can fit in the bin - # if so, get the bin index - if self._match(row, bin, self.loc_type): + # if so, get the bin index. + # + # 'feature_rows' is the key that contains the list of lists where + # each inner list takes the form: + # + # [start_lon, start_lat, end_lon, end_lat] + if self._match(row, bins[i]['feature_rows'], self.loc_type): labels += [i] trip_binned = True break @@ -366,8 +405,7 @@ def _match(self, trip, bin, loc_type): copied from the Similarity class on the e-mission-server. """ - for t_idx in bin: - trip_in_bin = self.train_df.iloc[t_idx] + for trip_in_bin in bin: if not self._distance_helper(trip, trip_in_bin, loc_type): return False return True @@ -375,16 +413,20 @@ def _match(self, trip, bin, loc_type): def _distance_helper(self, tripa, tripb, loc_type): """ Check if two trips have start/end points within the distance threshold. - - copied from the Similarity class on the e-mission-server. """ + # tripa is taken from the test dataframe. + # tripb is taken from the stored bin list. pta_lat = tripa[[loc_type + '_lat']] pta_lon = tripa[[loc_type + '_lon']] - ptb_lat = tripb[[loc_type + '_lat']] - ptb_lon = tripb[[loc_type + '_lon']] + if loc_type == 'start': + ptb_lat = tripb[1] + ptb_lon = tripb[0] + elif loc_type == 'end': + ptb_lat = tripb[3] + ptb_lon = tripb[2] - return eamts.within_radius(pta_lat, pta_lon, ptb_lat, ptb_lon, - self.radius) + dist= ecc.calDistance([pta_lon,pta_lat],[ptb_lon,ptb_lat]) + return dist <= self.radius class DBSCANSVMCluster(Cluster): @@ -444,7 +486,7 @@ def set_params(self, params): return self - def fit(self, train_df): + def fit(self, train_df,unused=None): """ Creates clusters of trip points. self.train_df will be updated with columns containing base and final clusters. @@ -455,7 +497,8 @@ def fit(self, train_df): Args: train_df (dataframe): dataframe of labeled trips - """ + unused (List) : A list of Entry type of labeled and unlabeled trips which is not used in current function. + Passed to keep fit function generic.
""" ################## ### clean data ### ################## @@ -648,7 +691,7 @@ def set_params(self, params): return self - def fit(self, train_df): + def fit(self, train_df,unused=None): logging.info("PERF: Fitting NaiveBinningClassifier") # (copied from bsm.build_user_model()) @@ -656,21 +699,29 @@ def fit(self, train_df): # only accepts lists of Entry objects train_trips = self._trip_df_to_list(train_df) - sim, bins, bin_trips, train_trips = ep.first_round( - train_trips, self.radius) - + + model_config = { + "metric": "od_similarity", + "similarity_threshold_meters": self.radius, # meters, + "apply_cutoff": False, + "clustering_way": "origin-destination", #cause thats what is set in performance_eval.py for this model + "incremental_evaluation": False + } + + sim_model = eamtg.GreedySimilarityBinning(model_config) + sim_model.fit(train_trips) # set instance variables so we can access results later as well - self.sim = sim - self.bins = bins + self.sim = sim_model + self.bins = sim_model.bins # save all user labels user_id = train_df.user_id.iloc[0] - bsm.save_models('user_labels', - bsm.create_user_input_map(train_trips, bins), user_id) + model_type=eamumt.ModelType.GREEDY_SIMILARITY_BINNING + model_storage=eamums.ModelStorage.DOCUMENT_DATABASE + model_data_next=sim_model.to_dict() + last_done_ts = eamur._latest_timestamp(train_trips) + eamums.save_model(user_id, model_type, model_data_next, last_done_ts, model_storage) - # save location features of all bins - bsm.save_models('locations', bsm.create_location_map(train_trips, bins), - user_id) return self def predict_proba(self, test_df): @@ -880,13 +931,13 @@ def set_params(self, params): return self - def fit(self, train_df): + def fit(self, train_df,train_entry_list=None): # fit clustering model - self.end_cluster_model.fit(train_df) + self.end_cluster_model.fit(train_df,train_entry_list) self.train_df = self.end_cluster_model.train_df if self.cluster_method in ['trip', 'combination']: - self.start_cluster_model.fit(train_df) + self.start_cluster_model.fit(train_df,train_entry_list) self.train_df.loc[:, ['start_cluster_idx' ]] = self.start_cluster_model.train_df[[ 'start_cluster_idx' @@ -1049,7 +1100,7 @@ class EnsembleClassifier(TripClassifier, metaclass=ABCMeta): replaced_predictor = NotImplemented # required methods - def fit(self, train_df): + def fit(self, train_df,unused=None): # get location features if self.loc_feature == 'cluster': # fit clustering model(s) and one-hot encode their indices From 425136c559726bee4a7fa79f1913d1c317c04439 Mon Sep 17 00:00:00 2001 From: $aTyam Date: Sat, 2 Dec 2023 23:35:15 -0500 Subject: [PATCH 5/6] Receiving files from e-mission-eval-private-data --- .../analysis/modelling/trip_model}/clustering.py | 0 .../analysis/modelling/trip_model}/data_wrangling.py | 0 .../analysis/modelling/trip_model}/mapping.py | 0 .../analysis/modelling/trip_model}/models.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename {TRB_label_assist => emission/analysis/modelling/trip_model}/clustering.py (100%) rename {TRB_label_assist => emission/analysis/modelling/trip_model}/data_wrangling.py (100%) rename {TRB_label_assist => emission/analysis/modelling/trip_model}/mapping.py (100%) rename {TRB_label_assist => emission/analysis/modelling/trip_model}/models.py (100%) diff --git a/TRB_label_assist/clustering.py b/emission/analysis/modelling/trip_model/clustering.py similarity index 100% rename from TRB_label_assist/clustering.py rename to emission/analysis/modelling/trip_model/clustering.py diff --git 
a/TRB_label_assist/data_wrangling.py b/emission/analysis/modelling/trip_model/data_wrangling.py similarity index 100% rename from TRB_label_assist/data_wrangling.py rename to emission/analysis/modelling/trip_model/data_wrangling.py diff --git a/TRB_label_assist/mapping.py b/emission/analysis/modelling/trip_model/mapping.py similarity index 100% rename from TRB_label_assist/mapping.py rename to emission/analysis/modelling/trip_model/mapping.py diff --git a/TRB_label_assist/models.py b/emission/analysis/modelling/trip_model/models.py similarity index 100% rename from TRB_label_assist/models.py rename to emission/analysis/modelling/trip_model/models.py From 2fd9d78425ca691633e0bc706732108da2ebf465 Mon Sep 17 00:00:00 2001 From: $aTyam Date: Sat, 2 Dec 2023 23:55:29 -0500 Subject: [PATCH 6/6] Update inter-file dependencies for relocated files Updating import paths and dependencies among the four files ( mapping.py, clustering.py, models.py, data_wrangling.py ) that were recently moved from e-mission-eval-private-data --- emission/analysis/modelling/trip_model/mapping.py | 6 +++--- emission/analysis/modelling/trip_model/models.py | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/emission/analysis/modelling/trip_model/mapping.py b/emission/analysis/modelling/trip_model/mapping.py index cd2d11766..06f0614ea 100644 --- a/emission/analysis/modelling/trip_model/mapping.py +++ b/emission/analysis/modelling/trip_model/mapping.py @@ -6,8 +6,8 @@ import branca.element as bre from scipy.spatial import ConvexHull -import data_wrangling -from clustering import add_loc_clusters, ALG_OPTIONS +import emission.analysis.modelling.trip_model.data_wrangling as eamtd +from emission.analysis.modelling.trip_model.clustering import add_loc_clusters, ALG_OPTIONS DENVER_COORD = [39.7392, -104.9903] MTV_COORD = [37.3861, -122.0839] @@ -109,7 +109,7 @@ def find_plot_clusters(user_df, # expand the 'start_loc' and 'end_loc' column into 'start_lat', # 'start_lon', 'end_lat', and 'end_lon' columns - all_trips_df = data_wrangling.expand_coords(all_trips_df) + all_trips_df = eamtd.expand_coords(all_trips_df) labeled_trips_df = all_trips_df.loc[all_trips_df.user_input != {}].dropna( subset=['purpose_confirm']) diff --git a/emission/analysis/modelling/trip_model/models.py b/emission/analysis/modelling/trip_model/models.py index f3026b6b8..e5fc08b46 100644 --- a/emission/analysis/modelling/trip_model/models.py +++ b/emission/analysis/modelling/trip_model/models.py @@ -16,8 +16,8 @@ from sklearn.exceptions import NotFittedError # our imports -from clustering import get_distance_matrix, single_cluster_purity -import data_wrangling +from emission.analysis.modelling.trip_model.clustering import get_distance_matrix, single_cluster_purity +import emission.analysis.modelling.trip_model.data_wrangling as eamtd import emission.storage.decorations.trip_queries as esdtq from emission.analysis.classification.inference.labels.inferrers import predict_cluster_confidence_discounting import emission.core.wrapper.entry as ecwe @@ -28,7 +28,7 @@ import emission.analysis.modelling.trip_model.run_model as eamur -import clustering +import emission.analysis.modelling.trip_model.clustering as eamtc # NOTE: tour_model_extended.similarity is on the # eval-private-data-compatibility branch in e-mission-server @@ -82,7 +82,7 @@ def _clean_data(self, df): # expand the 'start_loc' and 'end_loc' column into 'start_lat', # 'start_lon', 'end_lat', and 'end_lon' columns - df = data_wrangling.expand_coords(df) + df = 
eamtd.expand_coords(df) # drop trips with missing coordinates if df.start_lat.isna().any(): @@ -334,7 +334,7 @@ def fit(self, unused,train_entry_list=None): # fit the bins self.sim_model= eamtg.GreedySimilarityBinning(model_config) - cleaned_trip_entry= clustering.cleanEntryTypeData(self.train_df,train_entry_list) + cleaned_trip_entry= eamtc.cleanEntryTypeData(self.train_df,train_entry_list) self.sim_model.fit(cleaned_trip_entry) labels = [int(l) for l in self.sim_model.tripLabels]
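
For readers following the full patch series, the migration above reduces to one recurring pattern: filter the Entry list so it matches the expanded dataframe, build a GreedySimilarityBinning model_config, fit on the cleaned entries, and read back integer bin labels. The sketch below condenses that pattern under stated assumptions: the module paths, the cleanEntryTypeData helper, and the tripLabels / bins attributes are taken directly from the hunks above (and require the matching e-mission-server trip_model code), while the wrapper name naive_bin_labels, the default radius, and the extra logging line are illustrative only and not part of any patch.

import logging

import emission.analysis.modelling.trip_model.greedy_similarity_binning as eamtg
import emission.analysis.modelling.trip_model.clustering as eamtc


def naive_bin_labels(loc_df, trip_entry_list, radius=100, clustering_way='destination'):
    """Illustrative wrapper: one integer bin label per row of loc_df."""
    # Drop Entry objects whose _id was filtered out of loc_df by
    # esdtq.filter_labeled_trips() / esdtq.expand_userinputs(), so the
    # entry list and the dataframe stay aligned.
    cleaned_entries = eamtc.cleanEntryTypeData(loc_df, trip_entry_list)

    model_config = {
        "metric": "od_similarity",
        "similarity_threshold_meters": radius,
        "apply_cutoff": False,
        # one of 'origin', 'destination', or 'origin-destination'
        "clustering_way": clustering_way,
        "incremental_evaluation": False,
    }

    sim_model = eamtg.GreedySimilarityBinning(model_config)
    sim_model.fit(cleaned_entries)

    # tripLabels gives the per-trip bin assignment in input order;
    # the bins themselves live in sim_model.bins, keyed by stringified indices.
    labels = [int(label) for label in sim_model.tripLabels]
    logging.info("PERF: produced %s bins for %s trips",
                 len(sim_model.bins), len(labels))
    return labels

In the patches themselves, RefactoredNaiveCluster derives clustering_way from its loc_type ('origin' for start, 'destination' for end), while NaiveBinningClassifier fixes it to 'origin-destination' and additionally persists the fitted model through eamums.save_model().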