diff --git a/emission/analysis/intake/segmentation/restart_checking.py b/emission/analysis/intake/segmentation/restart_checking.py index c715996af..61223b3f3 100644 --- a/emission/analysis/intake/segmentation/restart_checking.py +++ b/emission/analysis/intake/segmentation/restart_checking.py @@ -34,11 +34,11 @@ def get_ongoing_motion_in_range(start_ts, end_ts, motion_df): motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left') motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right') - motion_list=motion_df.iloc[motion_df_start_idx:motion_df_end_idx] + filtered_motion_df=motion_df.iloc[motion_df_start_idx:motion_df_end_idx] logging.debug("Found %s motion_activity entries in range %s -> %s" % - (len(motion_list),start_ts, end_ts)) + (len(filtered_motion_df),start_ts, end_ts)) # logging.debug("sample activities are %s" % motion_list[0:5]) - return motion_list + return filtered_motion_df def _is_tracking_restarted_android(transition_df): """ diff --git a/emission/analysis/intake/segmentation/trip_segmentation.py b/emission/analysis/intake/segmentation/trip_segmentation.py index 32e39c0f7..a45868917 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation.py +++ b/emission/analysis/intake/segmentation/trip_segmentation.py @@ -99,12 +99,12 @@ def segment_current_trips(user_id): if len(filters_in_df) == 1: # Common case - let's make it easy - segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(loc_df,transition_df,motion_df, + segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(transition_df,motion_df,ts, time_query) else: segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, - filter_methods) + filter_methods,transition_df,motion_df) # Create and store trips and places based on the segmentation points if segmentation_points is None: epq.mark_segmentation_failed(user_id) @@ -120,7 +120,7 @@ def segment_current_trips(user_id): logging.exception("Trip generation failed for user %s" % user_id) epq.mark_segmentation_failed(user_id) -def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods): +def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods,transition_df,motion_df): """ We can have mixed filters in a particular time range for multiple reasons. a) user switches phones from one platform to another @@ -159,7 +159,7 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt time_query.endTs = loc_df.iloc[endIndex+1].ts logging.debug("for filter %s, startTs = %d and endTs = %d" % (curr_filter, time_query.startTs, time_query.endTs)) - segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query) + segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(transition_df,motion_df,ts, time_query) logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys())) sortedStartTsList = sorted(segmentation_map.keys()) segmentation_points = [] diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py index ea53c9abb..79c0f9a76 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_dist_filter.py @@ -38,7 +38,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold): self.point_threshold = point_threshold self.distance_threshold = distance_threshold - def segment_into_trips(self, timeseries, time_query): + def segment_into_trips(self,transition_df,motion_df,timeseries, time_query): """ Examines the timeseries database for a specific range and returns the segmentation points. Note that the input is the entire timeseries and @@ -48,7 +48,7 @@ def segment_into_trips(self, timeseries, time_query): """ self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query) self.filtered_points_df.loc[:,"valid"] = True - self.transition_df = timeseries.get_data_df("statemachine/transition", time_query) + self.transition_df = transition_df if len(self.transition_df) > 0: logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]]) else: @@ -88,7 +88,7 @@ def segment_into_trips(self, timeseries, time_query): # So we reset_index upstream and use it here. last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1] lastPoint = self.find_last_valid_point(idx) - if self.has_trip_ended(lastPoint, currPoint, timeseries): + if self.has_trip_ended(lastPoint, currPoint, timeseries,motion_df): last_trip_end_point = lastPoint logging.debug("Appending last_trip_end_point %s with index %s " % (last_trip_end_point, idx-1)) @@ -144,7 +144,7 @@ def segment_into_trips(self, timeseries, time_query): logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last)) return segmentation_points - def has_trip_ended(self, lastPoint, currPoint, timeseries): + def has_trip_ended(self, lastPoint, currPoint, timeseries,motion_df): # So we must not have been moving for the last _time filter_ # points. So the trip must have ended # Since this is a distance filter, we detect that the last @@ -173,14 +173,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries): # for this kind of test speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2))) - if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries): + if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, self.transition_df): logging.debug("tracking was restarted, ending trip") return True # In general, we get multiple locations between each motion activity. If we see a bunch of motion activities # between two location points, and there is a large gap between the last location and the first # motion activity as well, let us just assume that there was a restart - ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries) + ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, motion_df) ongoing_motion_check = len(ongoing_motion_in_range) > 0 if timeDelta > self.time_threshold and not ongoing_motion_check: logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" % diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index 34779f0dd..83fcaaf31 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -55,7 +55,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold): self.point_threshold = point_threshold self.distance_threshold = distance_threshold - def segment_into_trips(self, filtered_points_pre_ts_diff_df,transition_df,timeseries, time_query): + def segment_into_trips(self,transition_df,motion_df,timeseries, time_query): """ Examines the timeseries database for a specific range and returns the segmentation points. Note that the input is the entire timeseries and @@ -63,6 +63,7 @@ def segment_into_trips(self, filtered_points_pre_ts_diff_df,transition_df,timese data that they want from the sensor streams in order to determine the segmentation points. """ + filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) # Sometimes, we can get bogus points because data.ts and # metadata.write_ts are off by a lot. If we don't do this, we end up # appearing to travel back in time @@ -129,7 +130,7 @@ def segment_into_trips(self, filtered_points_pre_ts_diff_df,transition_df,timese logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" % (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) - if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df): + if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df,motion_df): (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, last10Points_df, last5MinsPoints_df) segmentation_points.append((curr_trip_start_point, last_trip_end_point)) @@ -198,7 +199,7 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df): else: return False - def has_trip_ended(self, prev_point, curr_point, motion_df, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df): + def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df,motion_df): # Another mismatch between phone and server. Phone stops tracking too soon, # so the distance is still greater than the threshold at the end of the trip. # But then the next point is a long time away, so we can split again (similar to a distance filter) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py index c329fe69d..76f13e2c9 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/trip_end_detection_corner_cases.py @@ -23,8 +23,8 @@ def is_huge_invalid_ts_offset(filterMethod, lastPoint, currPoint, timeseries, ecwm.MotionTypes.NONE.value, ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE.value] - non_still_motions = [ma for ma in motionInRange if ma["data"]["type"] not in ignore_modes_list and ma["data"]["confidence"] == 100] - logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) + non_still_motions=motionInRange[~motionInRange['type'].isin(ignore_modes_list) & (motionInRange['confidence'] ==100)] + #logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions]) non_still_motions_rate = len(non_still_motions) / (currPoint.ts - lastPoint.ts) diff --git a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py index 0cc469fea..de8423365 100644 --- a/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py +++ b/emission/tests/analysisTests/intakeTests/TestTripSegmentation.py @@ -13,7 +13,7 @@ import uuid import emission.storage.json_wrappers as esj import os - +from timeit import default_timer as timer # Our imports import emission.core.get_database as edb import emission.storage.timeseries.timequery as estt @@ -68,10 +68,12 @@ def testEmptyCall(self): def testSegmentationPointsDwellSegmentationTimeFilter(self): ts = esta.TimeSeries.get_time_series(self.androidUUID) tq = estt.TimeQuery("metadata.write_ts", 1440658800, 1440745200) + transition_df = ts.get_data_df("statemachine/transition", tq) + motion_df = ts.get_data_df("background/motion_activity",tq) dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins point_threshold = 10, distance_threshold = 100) # 100 m - segmentation_points = dstfsm.segment_into_trips(ts, tq) + segmentation_points = dstfsm.segment_into_trips(transition_df,motion_df,ts, tq) for (start, end) in segmentation_points: logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts)) self.assertIsNotNone(segmentation_points) @@ -86,10 +88,12 @@ def testSegmentationPointsDwellSegmentationTimeFilter(self): def testSegmentationPointsDwellSegmentationDistFilter(self): ts = esta.TimeSeries.get_time_series(self.iosUUID) tq = estt.TimeQuery("metadata.write_ts", 1446796800, 1446847600) + transition_df = ts.get_data_df("statemachine/transition", tq) + motion_df = ts.get_data_df("background/motion_activity",tq) dstdsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 5 mins point_threshold = 10, distance_threshold = 100) # 100 m - segmentation_points = dstdsm.segment_into_trips(ts, tq) + segmentation_points = dstdsm.segment_into_trips(transition_df,motion_df,ts, tq) for (start, end) in segmentation_points: logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts)) self.assertIsNotNone(segmentation_points) @@ -101,7 +105,10 @@ def testSegmentationPointsDwellSegmentationDistFilter(self): def testSegmentationWrapperAndroid(self): + start=timer() eaist.segment_current_trips(self.androidUUID) + end=timer() + logging.debug(f"ElapsedAndroid{end-start}") # The previous line should have created places and trips and stored # them into the database. Now, we want to query to ensure that they # were created correctly. @@ -142,7 +149,10 @@ def testSegmentationWrapperAndroid(self): self.assertIsNotNone(place0.data.location) def testSegmentationWrapperIOS(self): + start=timer() eaist.segment_current_trips(self.iosUUID) + end=timer() + logging.debug(f"ElapsedIos{end-start}") # The previous line should have created places and trips and stored # them into the database. Now, we want to query to ensure that they # were created correctly. @@ -193,8 +203,10 @@ def testSegmentationWrapperCombined(self): # Now, segment the data for the combined UUID, which will include both # android and ios + start=timer() eaist.segment_current_trips(self.androidUUID) - + end=timer() + logging.debug(f"ElapsedCOmbined{end-start}") tq_place = estt.TimeQuery("data.enter_ts", 1440658800, 1446847600) created_places_entries = esda.get_entries(esda.RAW_PLACE_KEY, self.androidUUID, tq_place)