Skip to content

Commit

Permalink
[TESTED] Improving IosWrapper and CombinedWrappe for trip Segmentation
Browse files Browse the repository at this point in the history
  • Loading branch information
humbleOldSage committed Jan 25, 2024
1 parent 650d4d8 commit 752a4d5
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 22 deletions.
6 changes: 3 additions & 3 deletions emission/analysis/intake/segmentation/restart_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ def get_ongoing_motion_in_range(start_ts, end_ts, motion_df):

motion_df_start_idx=motion_df.ts.searchsorted(start_ts,side='left')
motion_df_end_idx=motion_df.ts.searchsorted(end_ts,side='right')
motion_list=motion_df.iloc[motion_df_start_idx:motion_df_end_idx]
filtered_motion_df=motion_df.iloc[motion_df_start_idx:motion_df_end_idx]
logging.debug("Found %s motion_activity entries in range %s -> %s" %
(len(motion_list),start_ts, end_ts))
(len(filtered_motion_df),start_ts, end_ts))
# logging.debug("sample activities are %s" % motion_list[0:5])
return motion_list
return filtered_motion_df

def _is_tracking_restarted_android(transition_df):
"""
Expand Down
8 changes: 4 additions & 4 deletions emission/analysis/intake/segmentation/trip_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def segment_current_trips(user_id):
if len(filters_in_df) == 1:
# Common case - let's make it easy

segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(loc_df,transition_df,motion_df,
segmentation_points = filter_methods[filters_in_df[0]].segment_into_trips(transition_df,motion_df,ts,
time_query)
else:
segmentation_points = get_combined_segmentation_points(ts, loc_df, time_query,
filters_in_df,
filter_methods)
filter_methods,transition_df,motion_df)
# Create and store trips and places based on the segmentation points
if segmentation_points is None:
epq.mark_segmentation_failed(user_id)
Expand All @@ -120,7 +120,7 @@ def segment_current_trips(user_id):
logging.exception("Trip generation failed for user %s" % user_id)
epq.mark_segmentation_failed(user_id)

def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods):
def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filter_methods,transition_df,motion_df):
"""
We can have mixed filters in a particular time range for multiple reasons.
a) user switches phones from one platform to another
Expand Down Expand Up @@ -159,7 +159,7 @@ def get_combined_segmentation_points(ts, loc_df, time_query, filters_in_df, filt
time_query.endTs = loc_df.iloc[endIndex+1].ts
logging.debug("for filter %s, startTs = %d and endTs = %d" %
(curr_filter, time_query.startTs, time_query.endTs))
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(ts, time_query)
segmentation_map[time_query.startTs] = filter_methods[curr_filter].segment_into_trips(transition_df,motion_df,ts, time_query)
logging.debug("After filtering, segmentation_map has keys %s" % list(segmentation_map.keys()))
sortedStartTsList = sorted(segmentation_map.keys())
segmentation_points = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
self.point_threshold = point_threshold
self.distance_threshold = distance_threshold

def segment_into_trips(self, timeseries, time_query):
def segment_into_trips(self,transition_df,motion_df,timeseries, time_query):
"""
Examines the timeseries database for a specific range and returns the
segmentation points. Note that the input is the entire timeseries and
Expand All @@ -48,7 +48,7 @@ def segment_into_trips(self, timeseries, time_query):
"""
self.filtered_points_df = timeseries.get_data_df("background/filtered_location", time_query)
self.filtered_points_df.loc[:,"valid"] = True
self.transition_df = timeseries.get_data_df("statemachine/transition", time_query)
self.transition_df = transition_df
if len(self.transition_df) > 0:
logging.debug("self.transition_df = %s" % self.transition_df[["fmt_time", "transition"]])
else:
Expand Down Expand Up @@ -88,7 +88,7 @@ def segment_into_trips(self, timeseries, time_query):
# So we reset_index upstream and use it here.
last10Points_df = self.filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1]
lastPoint = self.find_last_valid_point(idx)
if self.has_trip_ended(lastPoint, currPoint, timeseries):
if self.has_trip_ended(lastPoint, currPoint, timeseries,motion_df):
last_trip_end_point = lastPoint
logging.debug("Appending last_trip_end_point %s with index %s " %
(last_trip_end_point, idx-1))
Expand Down Expand Up @@ -144,7 +144,7 @@ def segment_into_trips(self, timeseries, time_query):
logging.debug("Found %d transitions after last point, not ending trip..." % len(stopped_moving_after_last))
return segmentation_points

def has_trip_ended(self, lastPoint, currPoint, timeseries):
def has_trip_ended(self, lastPoint, currPoint, timeseries,motion_df):
# So we must not have been moving for the last _time filter_
# points. So the trip must have ended
# Since this is a distance filter, we detect that the last
Expand Down Expand Up @@ -173,14 +173,14 @@ def has_trip_ended(self, lastPoint, currPoint, timeseries):
# for this kind of test
speedThreshold = old_div(float(self.distance_threshold * 2), (old_div(self.time_threshold, 2)))

if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, timeseries):
if eaisr.is_tracking_restarted_in_range(lastPoint.ts, currPoint.ts, self.transition_df):
logging.debug("tracking was restarted, ending trip")
return True

# In general, we get multiple locations between each motion activity. If we see a bunch of motion activities
# between two location points, and there is a large gap between the last location and the first
# motion activity as well, let us just assume that there was a restart
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, timeseries)
ongoing_motion_in_range = eaisr.get_ongoing_motion_in_range(lastPoint.ts, currPoint.ts, motion_df)
ongoing_motion_check = len(ongoing_motion_in_range) > 0
if timeDelta > self.time_threshold and not ongoing_motion_check:
logging.debug("lastPoint.ts = %s, currPoint.ts = %s, threshold = %s, large gap = %s, ongoing_motion_in_range = %s, ending trip" %
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ def __init__(self, time_threshold, point_threshold, distance_threshold):
self.point_threshold = point_threshold
self.distance_threshold = distance_threshold

def segment_into_trips(self, filtered_points_pre_ts_diff_df,transition_df,timeseries, time_query):
def segment_into_trips(self,transition_df,motion_df,timeseries, time_query):
"""
Examines the timeseries database for a specific range and returns the
segmentation points. Note that the input is the entire timeseries and
the time range. This allows algorithms to use whatever combination of
data that they want from the sensor streams in order to determine the
segmentation points.
"""
filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query)
# Sometimes, we can get bogus points because data.ts and
# metadata.write_ts are off by a lot. If we don't do this, we end up
# appearing to travel back in time
Expand Down Expand Up @@ -129,7 +130,7 @@ def segment_into_trips(self, filtered_points_pre_ts_diff_df,transition_df,timese
logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" %
(last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold))

if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df):
if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df,motion_df):
(ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df,
last10Points_df, last5MinsPoints_df)
segmentation_points.append((curr_trip_start_point, last_trip_end_point))
Expand Down Expand Up @@ -198,7 +199,7 @@ def continue_just_ended(self, idx, currPoint, filtered_points_df):
else:
return False

def has_trip_ended(self, prev_point, curr_point, motion_df, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df):
def has_trip_ended(self, prev_point, curr_point, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes,transition_df,motion_df):
# Another mismatch between phone and server. Phone stops tracking too soon,
# so the distance is still greater than the threshold at the end of the trip.
# But then the next point is a long time away, so we can split again (similar to a distance filter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def is_huge_invalid_ts_offset(filterMethod, lastPoint, currPoint, timeseries,
ecwm.MotionTypes.NONE.value,
ecwm.MotionTypes.STOPPED_WHILE_IN_VEHICLE.value]

non_still_motions = [ma for ma in motionInRange if ma["data"]["type"] not in ignore_modes_list and ma["data"]["confidence"] == 100]
logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions])
non_still_motions=motionInRange[~motionInRange['type'].isin(ignore_modes_list) & (motionInRange['confidence'] ==100)]
#logging.debug("non_still_motions = %s" % [(ecwm.MotionTypes(ma["data"]["type"]), ma["data"]["confidence"], ma["data"]["fmt_time"]) for ma in non_still_motions])

non_still_motions_rate = len(non_still_motions) / (currPoint.ts - lastPoint.ts)

Expand Down
20 changes: 16 additions & 4 deletions emission/tests/analysisTests/intakeTests/TestTripSegmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import uuid
import emission.storage.json_wrappers as esj
import os

from timeit import default_timer as timer
# Our imports
import emission.core.get_database as edb
import emission.storage.timeseries.timequery as estt
Expand Down Expand Up @@ -68,10 +68,12 @@ def testEmptyCall(self):
def testSegmentationPointsDwellSegmentationTimeFilter(self):
ts = esta.TimeSeries.get_time_series(self.androidUUID)
tq = estt.TimeQuery("metadata.write_ts", 1440658800, 1440745200)
transition_df = ts.get_data_df("statemachine/transition", tq)
motion_df = ts.get_data_df("background/motion_activity",tq)
dstfsm = dstf.DwellSegmentationTimeFilter(time_threshold = 5 * 60, # 5 mins
point_threshold = 10,
distance_threshold = 100) # 100 m
segmentation_points = dstfsm.segment_into_trips(ts, tq)
segmentation_points = dstfsm.segment_into_trips(transition_df,motion_df,ts, tq)
for (start, end) in segmentation_points:
logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts))
self.assertIsNotNone(segmentation_points)
Expand All @@ -86,10 +88,12 @@ def testSegmentationPointsDwellSegmentationTimeFilter(self):
def testSegmentationPointsDwellSegmentationDistFilter(self):
ts = esta.TimeSeries.get_time_series(self.iosUUID)
tq = estt.TimeQuery("metadata.write_ts", 1446796800, 1446847600)
transition_df = ts.get_data_df("statemachine/transition", tq)
motion_df = ts.get_data_df("background/motion_activity",tq)
dstdsm = dsdf.DwellSegmentationDistFilter(time_threshold = 10 * 60, # 5 mins
point_threshold = 10,
distance_threshold = 100) # 100 m
segmentation_points = dstdsm.segment_into_trips(ts, tq)
segmentation_points = dstdsm.segment_into_trips(transition_df,motion_df,ts, tq)
for (start, end) in segmentation_points:
logging.debug("trip is from %s (%f) -> %s (%f)" % (start.fmt_time, start.ts, end.fmt_time, end.ts))
self.assertIsNotNone(segmentation_points)
Expand All @@ -101,7 +105,10 @@ def testSegmentationPointsDwellSegmentationDistFilter(self):


def testSegmentationWrapperAndroid(self):
start=timer()
eaist.segment_current_trips(self.androidUUID)
end=timer()
logging.debug(f"ElapsedAndroid{end-start}")
# The previous line should have created places and trips and stored
# them into the database. Now, we want to query to ensure that they
# were created correctly.
Expand Down Expand Up @@ -142,7 +149,10 @@ def testSegmentationWrapperAndroid(self):
self.assertIsNotNone(place0.data.location)

def testSegmentationWrapperIOS(self):
start=timer()
eaist.segment_current_trips(self.iosUUID)
end=timer()
logging.debug(f"ElapsedIos{end-start}")
# The previous line should have created places and trips and stored
# them into the database. Now, we want to query to ensure that they
# were created correctly.
Expand Down Expand Up @@ -193,8 +203,10 @@ def testSegmentationWrapperCombined(self):

# Now, segment the data for the combined UUID, which will include both
# android and ios
start=timer()
eaist.segment_current_trips(self.androidUUID)

end=timer()
logging.debug(f"ElapsedCOmbined{end-start}")
tq_place = estt.TimeQuery("data.enter_ts", 1440658800, 1446847600)
created_places_entries = esda.get_entries(esda.RAW_PLACE_KEY,
self.androidUUID, tq_place)
Expand Down

0 comments on commit 752a4d5

Please sign in to comment.