Skip to content

Commit

Permalink
Merge pull request #965 from JGreenlee/ble_matching
Browse files Browse the repository at this point in the history
🎇 BLE matching in the pipeline
  • Loading branch information
shankari authored May 5, 2024
2 parents bc2d7b6 + 2059f9a commit cc20ac4
Show file tree
Hide file tree
Showing 12 changed files with 475 additions and 333 deletions.
30 changes: 30 additions & 0 deletions emission/analysis/configs/dynamic_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import sys
import os
import logging

import json
import requests

STUDY_CONFIG = os.getenv('STUDY_CONFIG', "stage-program")

dynamic_config = None
def get_dynamic_config():
global dynamic_config
if dynamic_config is not None:
logging.debug("Returning cached dynamic config for %s at version %s" % (STUDY_CONFIG, dynamic_config['version']))
return dynamic_config
logging.debug("No cached dynamic config for %s, downloading from server" % STUDY_CONFIG)
download_url = "https://raw.githubusercontent.com/e-mission/nrel-openpath-deploy-configs/main/configs/" + STUDY_CONFIG + ".nrel-op.json"
logging.debug("About to download config from %s" % download_url)
r = requests.get(download_url)
if r.status_code != 200:
logging.debug(f"Unable to download study config, status code: {r.status_code}")
# sys.exit(1)
# TODO what to do here? What if Github is down or something?
# If we terminate, will the pipeline just try again later?
else:
dynamic_config = json.loads(r.text)
logging.debug(f"Successfully downloaded config with version {dynamic_config['version']} "\
f"for {dynamic_config['intro']['translated_text']['en']['deployment_name']} "\
f"and data collection URL {dynamic_config['server']['connectUrl']}")
return dynamic_config
17 changes: 15 additions & 2 deletions emission/analysis/intake/segmentation/section_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging

# Our imports
import emission.analysis.configs.dynamic_config as eadc
import emission.storage.pipeline_queries as epq
import emission.storage.decorations.analysis_timeseries_queries as esda

Expand All @@ -22,6 +23,7 @@
import emission.core.wrapper.entry as ecwe

import emission.core.common as ecc
import emcommon.bluetooth.ble_matching as emcble

class SectionSegmentationMethod(object):
def segment_into_sections(self, timeseries, distance_from_place, time_query):
Expand Down Expand Up @@ -64,6 +66,7 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
ts = esta.TimeSeries.get_time_series(user_id)
time_query = esda.get_time_query_for_trip_like(esda.RAW_TRIP_KEY, trip_entry.get_id())
distance_from_place = _get_distance_from_start_place_to_end(trip_entry)
ble_entries_during_trip = ts.find_entries(["background/bluetooth_ble"], time_query)

if (trip_source == "DwellSegmentationTimeFilter"):
import emission.analysis.intake.segmentation.section_segmentation_methods.smoothed_high_confidence_motion as shcm
Expand Down Expand Up @@ -118,7 +121,16 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
# Particularly in this case, if we don't do this, then the trip end may overshoot the section end
end_loc = trip_end_loc

fill_section(section, start_loc, end_loc, sensed_mode)
# ble_sensed_mode represents the vehicle that was sensed via BLE beacon during the section.
# For now, we are going to rely on the current segmentation implementation and then fill in
# ble_sensed_mode by looking at scans within the timestamp range of the section.
# Later, we may want to actually use BLE sensor data as part of the basis for segmentation
dynamic_config = eadc.get_dynamic_config()
ble_sensed_mode = emcble.get_ble_sensed_vehicle_for_section(
ble_entries_during_trip, start_loc.ts, end_loc.ts, dynamic_config
)

fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode)
# We create the entry after filling in the section so that we know
# that the data is included properly
section_entry = ecwe.Entry.create_entry(user_id, esda.RAW_SECTION_KEY,
Expand All @@ -143,7 +155,7 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
prev_section_entry = section_entry


def fill_section(section, start_loc, end_loc, sensed_mode):
def fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode=None):
section.start_ts = start_loc.ts
section.start_local_dt = start_loc.local_dt
section.start_fmt_time = start_loc.fmt_time
Expand All @@ -161,6 +173,7 @@ def fill_section(section, start_loc, end_loc, sensed_mode):
section.duration = end_loc.ts - start_loc.ts
section.source = "SmoothedHighConfidenceMotion"
section.sensed_mode = sensed_mode
section.ble_sensed_mode = ble_sensed_mode


def stitch_together(ending_section_entry, stop_entry, starting_section_entry):
Expand Down
18 changes: 11 additions & 7 deletions emission/analysis/userinput/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,13 @@ def create_and_link_timeline(ts, timeline, last_confirmed_place):
confirmed_places, confirmed_trips)
return confirmed_tl

def get_section_summary(ts, cleaned_trip, section_key):
def get_section_summary(ts, cleaned_trip, section_key, mode_prop="sensed_mode"):
"""
Returns the proportions of the distance, duration and count for each mode
in this trip. Note that sections are unimodal by definition.
cleaned_trip: the cleaned trip object associated with the sections
section_key: 'inferred_section' or 'cleaned_section'
mode_prop: the section property used as the basis for mode: 'sensed_mode' or 'ble_sensed_mode'.
"""
logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) called")
sections = esdt.get_sections_for_trip(key = section_key,
Expand All @@ -207,12 +208,14 @@ def get_section_summary(ts, cleaned_trip, section_key):
logging.warning("While getting section summary, section length = 0. This should never happen, but let's not crash if it does")
return {"distance": {}, "duration": {}, "count": {}}
sections_df = ts.to_data_df(section_key, sections)
cleaned_section_mapper = lambda sm: ecwm.MotionTypes(sm).name
inferred_section_mapper = lambda sm: ecwmp.PredictedModeTypes(sm).name
sel_section_mapper = cleaned_section_mapper \
if section_key == "analysis/cleaned_section" else inferred_section_mapper
sections_df["sensed_mode_str"] = sections_df["sensed_mode"].apply(sel_section_mapper)
grouped_section_df = sections_df.groupby("sensed_mode_str")
if mode_prop == "ble_sensed_mode":
mapper = lambda bsm: bsm['baseMode'] if bsm is not None else ecwm.MotionTypes.UNKNOWN.name
elif section_key == "analysis/cleaned_section":
mapper = lambda sm: ecwm.MotionTypes(sm).name
else:
mapper = lambda sm: ecwmp.PredictedModeTypes(sm).name
sections_df[mode_prop + "_str"] = sections_df[mode_prop].apply(mapper)
grouped_section_df = sections_df.groupby(mode_prop + "_str")
retVal = {
"distance": grouped_section_df.distance.sum().to_dict(),
"duration": grouped_section_df.duration.sum().to_dict(),
Expand All @@ -233,6 +236,7 @@ def create_confirmed_entry(ts, tce, confirmed_key, input_key_list):
tce["data"]["cleaned_trip"])
confirmed_object_data['inferred_section_summary'] = get_section_summary(ts, cleaned_trip, "analysis/inferred_section")
confirmed_object_data['cleaned_section_summary'] = get_section_summary(ts, cleaned_trip, "analysis/cleaned_section")
confirmed_object_data['ble_sensed_summary'] = get_section_summary(ts, cleaned_trip, "analysis/inferred_section", mode_prop="ble_sensed_mode")
elif (confirmed_key == esda.CONFIRMED_PLACE_KEY):
confirmed_object_data["cleaned_place"] = tce.get_id()
confirmed_object_data["user_input"] = \
Expand Down
1 change: 1 addition & 0 deletions emission/core/wrapper/confirmedtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Confirmedtrip(ecwt.Trip):
"expected_trip": ecwb.WrapperBase.Access.WORM,
"inferred_section_summary": ecwb.WrapperBase.Access.WORM,
"cleaned_section_summary": ecwb.WrapperBase.Access.WORM,
"ble_sensed_summary": ecwb.WrapperBase.Access.WORM,
# the user input will have all `manual/*` entries
# let's make that be somewhat flexible instead of hardcoding into the data model
"user_input": ecwb.WrapperBase.Access.WORM,
Expand Down
1 change: 1 addition & 0 deletions emission/core/wrapper/section.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Section(ecwb.WrapperBase):
"end_loc": ecwb.WrapperBase.Access.WORM, # location of end point in geojson format
"duration": ecwb.WrapperBase.Access.WORM, # duration of the trip in secs
"sensed_mode": ecwb.WrapperBase.Access.WORM, # the sensed mode used for the segmentation
"ble_sensed_mode": ecwb.WrapperBase.Access.WORM, # the mode sensed from BLE beacon scans
"source": ecwb.WrapperBase.Access.WORM} # the method used to generate this trip

enums = {"sensed_mode": ecwm.MotionTypes}
Expand Down
2 changes: 2 additions & 0 deletions emission/tests/analysisTests/userInputTests/TestUserInput.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def compare_confirmed_objs_result(self, result_dicts, expect_dicts, manual_keys
self.assertEqual(rt.data["inferred_section_summary"], et.data["inferred_section_summary"])
if "cleaned_section_summary" in et.data:
self.assertEqual(rt.data["cleaned_section_summary"], et.data["cleaned_section_summary"])
if 'ble_sensed_summary' in et.data:
self.assertEqual(rt.data["ble_sensed_summary"], et.data["ble_sensed_summary"])
logging.debug(20 * "=")

def compare_section_result(self, result, expect):
Expand Down
Loading

0 comments on commit cc20ac4

Please sign in to comment.