From 7e5aacb6af1f0b7cce2a2e537c1e227ae23ce64f Mon Sep 17 00:00:00 2001 From: Shankari Date: Sun, 13 Aug 2023 16:20:28 -0700 Subject: [PATCH 01/11] :zap: Skip the future pipeline stages if we have new data Without this change, even if we have no new entries, we run through the rest of the pipeline. Some of the queries for the rest of the pipeline can be fairly slow - taking ~ 30 secs to complete. With ~ 100 people per program, even if we had no new data, we will take 100 * 30 = 3000 secs = 50 minutes to process all the data. Instead, let's do an early return if we have no new data. This allows us to hopefully cut that runtime down significantly. This is a partial fix for https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1674108246 Testing done: - Added new test case which: - runs the pipeline on an empty dataset and checks that none of the states are updated - runs the pipeline on a non-empty dataset with no new data and checks that none of the states are updated - Test passes ``` ---------------------------------------------------------------------- Ran 1 test in 2.361s OK ``` --- .../usercache/abstract_usercache_handler.py | 2 + .../usercache/builtin_usercache_handler.py | 3 +- emission/pipeline/intake_stage.py | 8 ++- .../intakeTests/TestPipelineCornerCases.py | 54 ++++++++++++++++++- 4 files changed, 64 insertions(+), 3 deletions(-) diff --git a/emission/net/usercache/abstract_usercache_handler.py b/emission/net/usercache/abstract_usercache_handler.py index 58c559d14..9f4f835f5 100644 --- a/emission/net/usercache/abstract_usercache_handler.py +++ b/emission/net/usercache/abstract_usercache_handler.py @@ -28,6 +28,8 @@ def moveToLongTerm(self): """ Moves all messages that have arrived for the current user into long-term storage, after converting into a platform-independent format. + Returns the number of entries that were moved so that we can return + early if this is zero. 
""" pass diff --git a/emission/net/usercache/builtin_usercache_handler.py b/emission/net/usercache/builtin_usercache_handler.py index 61243a844..f61d5469c 100644 --- a/emission/net/usercache/builtin_usercache_handler.py +++ b/emission/net/usercache/builtin_usercache_handler.py @@ -58,7 +58,7 @@ def moveToLongTerm(self): # Since we didn't get the current time range, there is no current # state, so we don't need to mark it as done # esp.mark_usercache_done(None) - return + return 0 time_query = esp.get_time_range_for_usercache(self.user_id) @@ -99,6 +99,7 @@ def moveToLongTerm(self): logging.debug("Deleting all entries for query %s" % time_query) uc.clearProcessedMessages(time_query) esp.mark_usercache_done(self.user_id, last_ts_processed) + return len(unified_entry_list) def storeViewsToCache(self): """ diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 2d1f89889..ec6643ff7 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -87,11 +87,17 @@ def run_intake_pipeline_for_user(uuid): with ect.Timer() as uct: logging.info("*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) - uh.moveToLongTerm() + new_entry_count = uh.moveToLongTerm() esds.store_pipeline_time(uuid, ecwp.PipelineStages.USERCACHE.name, time.time(), uct.elapsed) + if new_entry_count == 0: + print("No new entries, skipping the rest of the pipeline") + return + else: + print("New entry count == %s, continuing" % new_entry_count) + with ect.Timer() as uit: logging.info("*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10) diff --git a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py index 803f1a446..05911986a 100644 --- 
a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py +++ b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py @@ -13,11 +13,12 @@ import emission.core.common as ecc import emission.core.wrapper.pipelinestate as ewps import emission.storage.pipeline_queries as epq +import emission.pipeline.intake_stage as epi # Test imports import emission.tests.common as etc -class TestPipelineRealData(unittest.TestCase): +class TestPipelineCornerCases(unittest.TestCase): def setUp(self): self.testUUID = uuid.uuid4() logging.info("setUp complete") @@ -69,6 +70,57 @@ def testAllDefinedPipelineStates(self): self.assertIsNotNone(ps['user_id']) self.assertIn(ps['pipeline_stage'], valid_states) + def testSkipPipelineNoNewEntries(self): + # we start with no pipeline states for this user + all_pipeline_states = edb.get_pipeline_state_db().find() + stages_skipped_in_testing = [ + ewps.PipelineStages.USERCACHE, + ewps.PipelineStages.TRIP_MODEL, + ewps.PipelineStages.TOUR_MODEL, + ewps.PipelineStages.ALTERNATIVES, + ewps.PipelineStages.USER_MODEL, + ewps.PipelineStages.RECOMMENDATION, + ewps.PipelineStages.OUTPUT_GEN] + test_run_states = list([pse.value for pse in + filter(lambda pse: pse not in stages_skipped_in_testing, + ewps.PipelineStages.__iter__())]) + curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID, + all_pipeline_states)) + self.assertEqual(len(curr_user_states), 0) + # next, we run the real pipeline, and end up with no entries + # and we have an early return in that case + print("-" * 10, "Running real pipeline on empty DB, expecting no change", "-" * 10) + epi.run_intake_pipeline_for_user(self.testUUID) + all_pipeline_states = edb.get_pipeline_state_db().find() + curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID, + all_pipeline_states)) + self.assertEqual(len(curr_user_states), 0) + # Now we load some data and run the test pipeline + # which will generate some pipeline states + print("-" * 10, 
"Running test pipeline on real data, expecting states to be set", "-" * 10) + etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2016-07-25") + etc.runIntakePipeline(self.testUUID) + + all_pipeline_states_after_run = edb.get_pipeline_state_db().find() + curr_user_states_after_run = list(filter(lambda ps: ps["user_id"] == self.testUUID, + all_pipeline_states_after_run)) + states_set = [ps['pipeline_stage'] for ps in curr_user_states_after_run] + print("-" * 10, "test run stages are ", test_run_states, "-" * 10) + print("-" * 10, "states that are set are ", states_set, "-" * 10) + self.assertGreater(len(curr_user_states_after_run), 0) + self.assertEqual(sorted(states_set), sorted(test_run_states)) + # then we run the real pipeline again + # We expect to see no changes between the first and the second run + # because of the usercache skip + all_pipeline_states_after_test_run = edb.get_pipeline_state_db().find() + curr_user_states_after_test_run = list(filter(lambda ps: ps["user_id"] == self.testUUID, + all_pipeline_states_after_test_run)) + get_last_processed = lambda ps: ps['last_processed_ts'] + get_last_run = lambda ps: ps['last_ts_run'] + self.assertEqual(list(map(get_last_processed, curr_user_states_after_run)), + list(map(get_last_processed, curr_user_states_after_test_run))) + self.assertEqual(list(map(get_last_run, curr_user_states_after_run)), + list(map(get_last_run, curr_user_states_after_test_run))) if __name__ == '__main__': etc.configLogging() From 63fb9c8012294dbd12b43e4137a7e05506f895bb Mon Sep 17 00:00:00 2001 From: Shankari Date: Sun, 13 Aug 2023 18:12:51 -0700 Subject: [PATCH 02/11] :wrench: add a command line option to decide whether to end early if there is no new data In 7e5aacb6af1f0b7cce2a2e537c1e227ae23ce64f, we configured the pipeline to return early if there is no new data. 
That is good for performance, but can have some limitations if we make changes to the pipeline to address errors - the modified stage will not run until there is new data. Just to be on the safe side, add a command line option to control this feature, and plumb it through the script ('intake_multiprocess' -> 'scheduler' -> 'intake') with a default of 'False' to retain the original behavior. This is a partial fix of https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1674108246 We were then able to change the test to run the pipeline with 'False' instead of running the test pipeline. This added the 'OUTPUT_GEN' step to the set of valid states. After adding that, the test passes ''' ---------------------------------------------------------------------- Ran 1 test in 3.825s OK ''' --- bin/intake_multiprocess.py | 4 +++- emission/pipeline/intake_stage.py | 13 ++++++------ emission/pipeline/scheduler.py | 4 ++-- .../intakeTests/TestPipelineCornerCases.py | 21 ++++++++++++++++--- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/bin/intake_multiprocess.py b/bin/intake_multiprocess.py index 033c047d1..6f22b9221 100644 --- a/bin/intake_multiprocess.py +++ b/bin/intake_multiprocess.py @@ -20,6 +20,8 @@ intake_log_config = json.load(open("conf/log/intake.conf.sample", "r")) parser = argparse.ArgumentParser() + parser.add_argument("--skip_if_no_new_data", action="store_true", + help="skip the pipeline if there is no new data") parser.add_argument("n_workers", type=int, help="the number of worker processors to use") args = parser.parse_args() @@ -32,4 +34,4 @@ split_lists = eps.get_split_uuid_lists(args.n_workers) logging.info("Finished generating split lists %s" % split_lists) - eps.dispatch(split_lists) + eps.dispatch(split_lists, args.skip_if_no_new_data) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index ec6643ff7..6a2a1a1cb 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py 
@@ -38,7 +38,7 @@ import emission.storage.decorations.stats_queries as esds -def run_intake_pipeline(process_number, uuid_list): +def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): """ Run the intake pipeline with the specified process number and uuid list. Note that the process_number is only really used to customize the log file name @@ -75,13 +75,13 @@ def run_intake_pipeline(process_number, uuid_list): continue try: - run_intake_pipeline_for_user(uuid) + run_intake_pipeline_for_user(uuid, skip_if_no_new_data) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " "for user %s, skipping" % (e, uuid)) -def run_intake_pipeline_for_user(uuid): +def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): uh = euah.UserCacheHandler.getUserCacheHandler(uuid) with ect.Timer() as uct: @@ -92,11 +92,12 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.USERCACHE.name, time.time(), uct.elapsed) - if new_entry_count == 0: - print("No new entries, skipping the rest of the pipeline") + if skip_if_no_new_data and new_entry_count == 0: + print("No new entries, and skip_if_no_new_data = %s, skipping the rest of the pipeline" % skip_if_no_new_data) return else: - print("New entry count == %s, continuing" % new_entry_count) + print("New entry count == %s and skip_if_no_new_data = %s, continuing" % + (new_entry_count, skip_if_no_new_data)) with ect.Timer() as uit: logging.info("*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10) diff --git a/emission/pipeline/scheduler.py b/emission/pipeline/scheduler.py index 9e810ea1b..c25c1ebbb 100644 --- a/emission/pipeline/scheduler.py +++ b/emission/pipeline/scheduler.py @@ -60,13 +60,13 @@ def get_split_uuid_lists(n_splits): logging.debug("Split values are %s" % ret_splits) return ret_splits -def dispatch(split_lists): +def dispatch(split_lists, 
skip_if_no_new_data): ctx = mp.get_context('spawn') process_list = [] for i, uuid_list in enumerate(split_lists): logging.debug("Dispatching list %s" % uuid_list) pid = i - p = ctx.Process(target=epi.run_intake_pipeline, args=(pid, uuid_list)) + p = ctx.Process(target=epi.run_intake_pipeline, args=(pid, uuid_list, skip_if_no_new_data)) logging.info("Created process %s to process %s list of size %s" % (p, i, len(uuid_list))) p.start() diff --git a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py index 05911986a..32b0d3eae 100644 --- a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py +++ b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py @@ -83,14 +83,14 @@ def testSkipPipelineNoNewEntries(self): ewps.PipelineStages.OUTPUT_GEN] test_run_states = list([pse.value for pse in filter(lambda pse: pse not in stages_skipped_in_testing, - ewps.PipelineStages.__iter__())]) + ewps.PipelineStages.__iter__())]) + [ewps.PipelineStages.OUTPUT_GEN.value] curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID, all_pipeline_states)) self.assertEqual(len(curr_user_states), 0) # next, we run the real pipeline, and end up with no entries # and we have an early return in that case print("-" * 10, "Running real pipeline on empty DB, expecting no change", "-" * 10) - epi.run_intake_pipeline_for_user(self.testUUID) + epi.run_intake_pipeline_for_user(self.testUUID, True) all_pipeline_states = edb.get_pipeline_state_db().find() curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID, all_pipeline_states)) @@ -99,7 +99,7 @@ def testSkipPipelineNoNewEntries(self): # which will generate some pipeline states print("-" * 10, "Running test pipeline on real data, expecting states to be set", "-" * 10) etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2016-07-25") - etc.runIntakePipeline(self.testUUID) + 
epi.run_intake_pipeline_for_user(self.testUUID, False) all_pipeline_states_after_run = edb.get_pipeline_state_db().find() curr_user_states_after_run = list(filter(lambda ps: ps["user_id"] == self.testUUID, @@ -112,6 +112,7 @@ def testSkipPipelineNoNewEntries(self): # then we run the real pipeline again # We expect to see no changes between the first and the second run # because of the usercache skip + epi.run_intake_pipeline_for_user(self.testUUID, True) all_pipeline_states_after_test_run = edb.get_pipeline_state_db().find() curr_user_states_after_test_run = list(filter(lambda ps: ps["user_id"] == self.testUUID, all_pipeline_states_after_test_run)) @@ -122,6 +123,20 @@ def testSkipPipelineNoNewEntries(self): self.assertEqual(list(map(get_last_run, curr_user_states_after_run)), list(map(get_last_run, curr_user_states_after_test_run))) + # then we run the real pipeline again with skip=False + # We expect to see no changes between the first and the second run + # because of the usercache skip + epi.run_intake_pipeline_for_user(self.testUUID, False) + all_pipeline_states_after_test_run = edb.get_pipeline_state_db().find() + curr_user_states_after_test_run = list(filter(lambda ps: ps["user_id"] == self.testUUID, + all_pipeline_states_after_test_run)) + get_last_processed = lambda ps: ps['last_processed_ts'] + get_last_run = lambda ps: ps['last_ts_run'] + self.assertEqual(list(map(get_last_processed, curr_user_states_after_run)), + list(map(get_last_processed, curr_user_states_after_test_run))) + self.assertNotEqual(list(map(get_last_run, curr_user_states_after_run)), + list(map(get_last_run, curr_user_states_after_test_run))) + if __name__ == '__main__': etc.configLogging() unittest.main() From 3cfda257b35ea617773765f7c18318fbca1d968b Mon Sep 17 00:00:00 2001 From: Shankari Date: Sun, 13 Aug 2023 22:35:08 -0700 Subject: [PATCH 03/11] :zap: Stop running the trip model for people who have stopped contributing trips We currently build the trip model for every single user, 
regardless of whether they are actively contributing data or not. But if they have stopped contributing data, their model is the same, we don't need to rebuild it. So we skip the model building if their last trip's end is too old This is a partial fix for: https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1674136328 Testing done: - Commented out the actual model - Loaded the Denver CASR data - Spot checked both decisions ``` 2023-08-13 19:23:11,719:INFO:building model for user U1 2023-08-13 19:23:11,722:DEBUG:last trip was at 2022-10-22T15:51:49.993526+00:00, three weeks ago was 2023-07-24T02:23:11.722757+00:00, gap = 9 months ago, skipping model building... 2023-08-13 19:23:11,722:INFO:building model for user U2 2023-08-13 19:23:11,726:DEBUG:last trip was at 2023-08-01T01:46:10.019082+00:00, three weeks ago was 2023-07-24T02:23:11.726590+00:00, gap is in a week, building model... ``` --- bin/build_label_model.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/bin/build_label_model.py b/bin/build_label_model.py index 3afd62901..d98ac6a88 100644 --- a/bin/build_label_model.py +++ b/bin/build_label_model.py @@ -5,6 +5,8 @@ import argparse import uuid +import arrow +import pymongo import emission.pipeline.reset as epr import emission.core.get_database as edb @@ -66,6 +68,20 @@ def _email_2_user_list(email_list): logging.info("building model for user %s" % user_id) # these can come from the application config as default values + ts = esta.TimeSeries.get_time_series(user_id) + last_confirmed_trip_end = arrow.get( + ts.get_first_value_for_field("analysis/confirmed_trip", + "data.end_ts", pymongo.DESCENDING)) + + three_weeks_ago = arrow.utcnow().shift(weeks=-3) + if last_confirmed_trip_end.timestamp() < three_weeks_ago.timestamp(): + logging.debug("last trip was at %s, three weeks ago was %s, gap = %s, skipping model building..." 
% + (last_confirmed_trip_end, three_weeks_ago, last_confirmed_trip_end.humanize(three_weeks_ago))) + continue + else: + logging.debug("last trip was at %s, three weeks ago was %s, gap is %s, building model..." % + (last_confirmed_trip_end, three_weeks_ago, last_confirmed_trip_end.humanize(three_weeks_ago))) + model_type = eamtc.get_model_type() model_storage = eamtc.get_model_storage() min_trips = eamtc.get_minimum_trips() From ec968ccf2e90b2d95f6a2c9bf5e5581f89b1a492 Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Aug 2023 09:28:36 -0700 Subject: [PATCH 04/11] :wrench: Wrap the `ignore_if_older_than` functionality into a command line flag Add a new optional argument for `ignore_older_than_weeks` Wrap the skip functionality so that it is called only if `ignore_older_than_weeks` is specified. Testing done: - similar to 3cfda257b35ea617773765f7c18318fbca1d968b, turned off the actual model building - `./e-mission-py.bash bin/build_label_model.py -a`, all models were run - `./e-mission-py.bash bin/build_label_model.py -a -i 3`, some models were skipped ``` 2023-08-14 09:27:04,394:INFO:building model for user ... 2023-08-14 09:27:04,402:DEBUG:last trip was at 2022-10-23T17:18:28.999999+00:00, three weeks ago was 2023-07-24T16:27:04.402425+00:00, gap = 9 months ago, skipping model building... 2023-08-14 09:27:04,402:INFO:building model for user ... 2023-08-14 09:27:04,425:DEBUG:last trip was at 2023-08-04T00:50:24.999804+00:00, three weeks ago was 2023-07-24T16:27:04.425763+00:00, gap is in a week, building model... 
``` --- bin/build_label_model.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/bin/build_label_model.py b/bin/build_label_model.py index d98ac6a88..46a7f2adf 100644 --- a/bin/build_label_model.py +++ b/bin/build_label_model.py @@ -49,6 +49,9 @@ def _email_2_user_list(email_list): level=logging.DEBUG) parser = argparse.ArgumentParser() + parser.add_argument("-i", "--ignore_older_than_weeks", type=int, + help="skip model build if last trip is older than this") + group = parser.add_mutually_exclusive_group(required=True) group.add_argument("-a", "--all", action="store_true", default=False, help="build the model for all users") @@ -68,19 +71,20 @@ def _email_2_user_list(email_list): logging.info("building model for user %s" % user_id) # these can come from the application config as default values - ts = esta.TimeSeries.get_time_series(user_id) - last_confirmed_trip_end = arrow.get( - ts.get_first_value_for_field("analysis/confirmed_trip", - "data.end_ts", pymongo.DESCENDING)) + if args.ignore_older_than_weeks: + ts = esta.TimeSeries.get_time_series(user_id) + last_confirmed_trip_end = arrow.get( + ts.get_first_value_for_field("analysis/confirmed_trip", + "data.end_ts", pymongo.DESCENDING)) - three_weeks_ago = arrow.utcnow().shift(weeks=-3) - if last_confirmed_trip_end.timestamp() < three_weeks_ago.timestamp(): - logging.debug("last trip was at %s, three weeks ago was %s, gap = %s, skipping model building..." % - (last_confirmed_trip_end, three_weeks_ago, last_confirmed_trip_end.humanize(three_weeks_ago))) - continue - else: - logging.debug("last trip was at %s, three weeks ago was %s, gap is %s, building model..." 
% - (last_confirmed_trip_end, three_weeks_ago, last_confirmed_trip_end.humanize(three_weeks_ago))) + week_limit = arrow.utcnow().shift(weeks=-args.ignore_older_than_weeks) + if last_confirmed_trip_end.timestamp() < week_limit.timestamp(): + logging.debug("last trip was at %s, three weeks ago was %s, gap = %s, skipping model building..." % + (last_confirmed_trip_end, week_limit, last_confirmed_trip_end.humanize(week_limit))) + continue + else: + logging.debug("last trip was at %s, three weeks ago was %s, gap is %s, building model..." % + (last_confirmed_trip_end, week_limit, last_confirmed_trip_end.humanize(week_limit))) model_type = eamtc.get_model_type() model_storage = eamtc.get_model_storage() min_trips = eamtc.get_minimum_trips() From 343fddd6d541b701648e135ed11694b46ebf8517 Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Aug 2023 13:55:33 -0700 Subject: [PATCH 05/11] :zap: Match confirmed and composite trips using an indexed value When we match the user inputs to existing confirmed trips, we "fill in" the inputs and update the confirmed trip. However, now that we retrieve the composite trips, we need to propagate those changes downstream as well. We used to find the matching composite trip for a confirmed trip by searching for the entry whose `data.confirmed_trip` field was the id of the confirmed trip. However, the `data.confirmed_trip` field is not indexed in the database. So on larger data collection efforts, such as Denver CASR, composite trip to confirmed trip mapping was taking 10 **minutes**. Since the confirmed and composite trips will always have the same start and end timestamps we can find the entry that has the same timestamp instead to find the match. This drops the access time from 10 minutes to ~ 2 minutes https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1677998639 Once we resolve the slow indexing on DocumentDB, this has the potential to drop even further. 
Side changes (also documented in https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1678001429) - we don't override the start and end timestamps and locations anymore (since we don't anticipate filling them in anyway) - changed the unit tests, which were changing the `start_ts` to change other fields instead since we now lookup by `start_ts` Testing done: `TestCompositeTripCreation.py` passes with the changes --- emission/analysis/userinput/matcher.py | 12 ++++++------ .../plottingTests/TestCompositeTripCreation.py | 18 +++++++++--------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/emission/analysis/userinput/matcher.py b/emission/analysis/userinput/matcher.py index 5d60e7975..1af0c6de2 100644 --- a/emission/analysis/userinput/matcher.py +++ b/emission/analysis/userinput/matcher.py @@ -50,7 +50,7 @@ def match_incoming_inputs(user_id, timerange): handle_multi_non_deleted_match(confirmed_obj, ui) else: assert False, "Found weird key {ui.metadata.key} that was not in the search list" - update_confirmed_and_composite(confirmed_obj) + update_confirmed_and_composite(ts, confirmed_obj) else: logging.warn("No match found for single user input %s, moving forward anyway" % ui) lastInputProcessed = ui @@ -98,7 +98,7 @@ def create_confirmed_objects(user_id): if last_confirmed_place is not None: logging.debug("last confirmed_place %s was already in database, updating with linked trip info... 
and %s additions" % (last_confirmed_place["_id"], len(last_confirmed_place["data"]["additions"]))) - update_confirmed_and_composite(last_confirmed_place) + update_confirmed_and_composite(ts, last_confirmed_place) if confirmed_tl is not None and not confirmed_tl.is_empty(): tl_list = list(confirmed_tl) @@ -271,7 +271,7 @@ def link_trip_end(confirmed_trip, confirmed_end_place): confirmed_trip["data"]["end_place"] = confirmed_end_place.get_id() confirmed_end_place["data"]["ending_trip"] = confirmed_trip.get_id() -def update_confirmed_and_composite(confirmed_obj): +def update_confirmed_and_composite(ts, confirmed_obj): import emission.storage.timeseries.builtin_timeseries as estbt import emission.core.get_database as edb estbt.BuiltinTimeSeries.update(confirmed_obj) @@ -281,10 +281,10 @@ def update_confirmed_and_composite(confirmed_obj): # if we don't find a matching composite trip, we don't need to do anything # since it has not been created yet and will be created with updated values when we get to that stage if confirmed_obj["metadata"]["key"] in [esda.CONFIRMED_TRIP_KEY, esda.CONFIRMED_UNTRACKED_KEY]: - composite_trip = edb.get_analysis_timeseries_db().find_one({"data.confirmed_trip": confirmed_obj.get_id()}) + composite_trip = ts.get_entry_at_ts("analysis/composite_trip", "data.start_ts", confirmed_obj.data.start_ts) if composite_trip is not None: # copy over all the fields other than the end_confimed_place - EXCLUDED_FIELDS = ["end_confirmed_place"] + EXCLUDED_FIELDS = ["end_confirmed_place", "start_ts", "end_ts", "start_loc", "end_loc"] for k in confirmed_obj["data"].keys(): if k not in EXCLUDED_FIELDS: composite_trip["data"][k] = confirmed_obj["data"][k] @@ -293,7 +293,7 @@ def update_confirmed_and_composite(confirmed_obj): logging.debug("No composite trip matching confirmed trip %s, nothing to update" % confirmed_obj["_id"]) if confirmed_obj["metadata"]["key"] == esda.CONFIRMED_PLACE_KEY: - composite_trip = 
edb.get_analysis_timeseries_db().find_one({"data.end_confirmed_place._id": confirmed_obj["_id"]}) + composite_trip = ts.get_entry_at_ts("analysis/composite_trip", "data.end_ts", confirmed_obj['data']['enter_ts']) if composite_trip is not None: composite_trip["data"]["end_confirmed_place"] = confirmed_obj estbt.BuiltinTimeSeries.update(ecwe.Entry(composite_trip)) diff --git a/emission/tests/analysisTests/plottingTests/TestCompositeTripCreation.py b/emission/tests/analysisTests/plottingTests/TestCompositeTripCreation.py index 25b595584..8892d9fdf 100644 --- a/emission/tests/analysisTests/plottingTests/TestCompositeTripCreation.py +++ b/emission/tests/analysisTests/plottingTests/TestCompositeTripCreation.py @@ -79,15 +79,15 @@ def _testUpdateConfirmedTripProperties(self): # set a couple of confirmed trip properties tripSacrifice = confirmedTrips[0] - tripSacrifice["data"]["start_ts"] = 1000 - tripSacrifice["data"]["start_fmt_time"] = "I want to go back in time" - eaum.update_confirmed_and_composite(ecwe.Entry(tripSacrifice)) + tripSacrifice["data"]["some_field"] = 1000 + tripSacrifice["data"]["user_input"] = "I want to go back in time" + eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(tripSacrifice)) compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"])) tripExpected = compositeTrips[0] - self.assertEqual(tripExpected["data"]["start_ts"], 1000) - self.assertEqual(tripExpected["data"]["start_fmt_time"], "I want to go back in time") + self.assertEqual(tripExpected["data"]["some_field"], 1000) + self.assertEqual(tripExpected["data"]["user_input"], "I want to go back in time") print("testUpdateConfirmedTripProperties DONE") @@ -100,7 +100,7 @@ def _testSetConfirmedTripAdditions(self): tripSacrifice = confirmedTrips[1] ADDITIONS = ["mimi", "fifi", "gigi", "bibi"] tripSacrifice["data"]["additions"] = ADDITIONS - eaum.update_confirmed_and_composite(ecwe.Entry(tripSacrifice)) + eaum.update_confirmed_and_composite(self.testTs, 
ecwe.Entry(tripSacrifice)) compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"])) tripExpected = compositeTrips[1] @@ -117,7 +117,7 @@ def _testSetConfirmedTripUserInput(self): tripSacrifice = confirmedTrips[2] USERINPUT = {"mimi": 1, "fifi": 100, "gigi": 200, "bibi": 300} tripSacrifice["data"]["user_input"] = USERINPUT - eaum.update_confirmed_and_composite(ecwe.Entry(tripSacrifice)) + eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(tripSacrifice)) compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"])) tripExpected = compositeTrips[2] @@ -135,7 +135,7 @@ def _testUpdateConfirmedPlaceProperties(self): placeSacrifice = confirmedPlaces[2] placeSacrifice["data"]["exit_ts"] = 1000 placeSacrifice["data"]["exit_fmt_time"] = "I want to go back in time" - eaum.update_confirmed_and_composite(ecwe.Entry(placeSacrifice)) + eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(placeSacrifice)) compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"])) tripExpected = compositeTrips[1] @@ -154,7 +154,7 @@ def _testSetConfirmedPlaceAdditions(self): placeSacrifice = confirmedPlaces[5] ADDITIONS = ["mimi", "fifi", "gigi", "bibi"] placeSacrifice["data"]["additions"] = ADDITIONS - eaum.update_confirmed_and_composite(ecwe.Entry(placeSacrifice)) + eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(placeSacrifice)) compositeTrips= list(self.testTs.find_entries(["analysis/composite_trip"])) tripExpected = compositeTrips[4] From 0403ae0ecc8985911451f16fac085fcf0d6328de Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Aug 2023 20:07:27 -0700 Subject: [PATCH 06/11] :zap: Precompute the pipeline range ts to avoid determining the range on the fly Before this, we used to compute the pipeline range while retrieving it to display the label screen. 
However, as we can see from https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1677985567 the call to get the oldest trip can take up to 10 seconds (!!) on certain deployments. However, retrieving data from the pipeline or profile databases is very fast So we compute the range at the end of the pipeline run and store it in the user profile. We can then retrieve the data directly from the user profile when requested. https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1678027101 Additional changes: - remove the fallback to the cleaned and confirmed trips since we can't display them in the UI anyway, and the fallback can lead to a _hole_ in the displayed trips https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1678111381 - remove the check against the complete_ts since we have not encountered it in also removed the check against the complete_ts since that was also a backwards compat check, and we haven't run into it even once over 1.5 years of `open_access` and `nrel_commute` https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1678406843 Testing done: - After making the changes, tried to load trips from a test user with real data loaded - range was None, None, no trips were returned ``` 2023-08-14 14:59:04,432:DEBUG:123145673592832:START POST /pipeline/get_range_ts 2023-08-14 14:59:04,433:DEBUG:123145673592832:methodName = skip, returning 2023-08-14 14:59:04,434:DEBUG:123145673592832:Using the skip method to verify id token nrelop_dev-emulator-pro gram_default_testuser of length 44 2023-08-14 14:59:04,437:DEBUG:123145673592832:retUUID = .... 2023-08-14 14:59:04,440:DEBUG:123145673592832:Returning range (None, None) 2023-08-14 14:59:04,440:DEBUG:123145673592832:END POST /pipeline/get_range_ts .... 
0.008764982223510742 ``` - Reset and re-ran the pipeline - range was set and trips were returned and displayed correctly ``` 2023-08-14 15:19:22,953:DEBUG:123145807908864:START POST /pipeline/get_range_ts 2023-08-14 15:19:22,954:DEBUG:123145807908864:methodName = skip, returning 2023-08-14 15:19:22,955:DEBUG:123145807908864:Using the skip method to verify id token nrelop_dev-emulator-pro gram_default_testuser of length 44 2023-08-14 15:19:22,962:DEBUG:123145807908864:retUUID = ... 2023-08-14 15:19:22,967:DEBUG:123145807908864:Returning range (1688845113.167196, 1691156058.2945251) 2023-08-14 15:19:22,968:DEBUG:123145807908864:END POST /pipeline/get_range_ts ... 0.016937971115112305 ``` --- emission/net/api/pipeline.py | 27 ++++----------------------- emission/pipeline/intake_stage.py | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/emission/net/api/pipeline.py b/emission/net/api/pipeline.py index e942e0431..cd75a9c8c 100644 --- a/emission/net/api/pipeline.py +++ b/emission/net/api/pipeline.py @@ -10,6 +10,7 @@ import emission.storage.pipeline_queries as esp import emission.storage.timeseries.abstract_timeseries as esta +import emission.core.wrapper.user as ecwu def get_complete_ts(user_id): complete_ts = esp.get_complete_ts(user_id) @@ -17,28 +18,8 @@ def get_complete_ts(user_id): return complete_ts def get_range(user_id): - ts = esta.TimeSeries.get_time_series(user_id) - start_ts = ts.get_first_value_for_field("analysis/composite_trip", "data.start_ts", pymongo.ASCENDING) - if start_ts == -1: - start_ts = ts.get_first_value_for_field("analysis/confirmed_trip", "data.start_ts", pymongo.ASCENDING) - if start_ts == -1: - start_ts = ts.get_first_value_for_field("analysis/cleaned_trip", "data.start_ts", pymongo.ASCENDING) - if start_ts == -1: - start_ts = None - - end_ts = ts.get_first_value_for_field("analysis/composite_trip", "data.end_ts", pymongo.DESCENDING) - if start_ts == -1: - end_ts = 
ts.get_first_value_for_field("analysis/confirmed_trip", "data.end_ts", pymongo.DESCENDING) - if end_ts == -1: - end_ts = ts.get_first_value_for_field("analysis/cleaned_trip", "data.end_ts", pymongo.DESCENDING) - if end_ts == -1: - end_ts = None - - complete_ts = get_complete_ts(user_id) - if complete_ts is not None and end_ts is not None\ - and (end_ts != (complete_ts - esp.END_FUZZ_AVOID_LTE)): - logging.exception("end_ts %s != complete_ts no fuzz %s" % - (end_ts, (complete_ts - esp.END_FUZZ_AVOID_LTE))) - + user_profile = ecwu.User(user_id).getProfile() + start_ts = user_profile.get("pipeline_range", {}).get("start_ts", None) + end_ts = user_profile.get("pipeline_range", {}).get("end_ts", None) logging.debug("Returning range (%s, %s)" % (start_ts, end_ts)) return (start_ts, end_ts) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 6a2a1a1cb..7a2cfd9a7 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -12,6 +12,7 @@ import arrow from uuid import UUID import time +import pymongo import emission.core.get_database as edb import emission.core.timer as ect @@ -37,6 +38,7 @@ import emission.storage.decorations.stats_queries as esds +import emission.core.wrapper.user as ecwu def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): """ @@ -207,3 +209,17 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): esds.store_pipeline_time(uuid, ecwp.PipelineStages.OUTPUT_GEN.name, time.time(), ogt.elapsed) + + _get_and_store_range(uuid, "analysis/composite_trip") + +def _get_and_store_range(user_id, trip_key): + ts = esta.TimeSeries.get_time_series(user_id) + start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) + if start_ts == -1: + start_ts = None + end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) + if end_ts == -1: + end_ts = None + + user = ecwu.User(user_id) + user.update({"pipeline_range": 
{"start_ts": start_ts, "end_ts": end_ts}}) From 73414b9919d21fb9ff9db110ed981f7d36aa57b6 Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Aug 2023 21:41:37 -0700 Subject: [PATCH 07/11] :loud_sound: Add additional logs to determine how long the section summaries take Since they also make calls using fields with no index (`data.trip_id`), and may have performance issues. https://github.com/e-mission/e-mission-docs/issues/942#issuecomment-1678410111 These will help us determine whether they do actually have performance issues and how much we can improve by changing the implementation Testing done: - Ran the pipeline with the change - Saw logs ``` 2023-08-14 21:38:08,150:DEBUG:140704350807680:get_section_summary(64db00a82032f67e94165626, analysis/cleaned_s ection) called 2023-08-14 21:38:08,488:DEBUG:140704350807680:get_section_summary(64db00a82032f67e94165626, analysis/cleaned_s 2023-08-14 21:38:09,679:DEBUG:140704350807680:get_section_summary(64db00aa2032f67e94165644, analysis/cleaned_section) called 2023-08-14 21:38:09,983:DEBUG:140704350807680:get_section_summary(64db00aa2032f67e94165644, analysis/cleaned_section) returning {'distance': {'BICYCLING': 5789.554643480443}, 'duration': {'BICYCLING': 1226.4553627967834}, 'count': {'BICYCLING': 1}} ... 
``` --- emission/analysis/userinput/matcher.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/emission/analysis/userinput/matcher.py b/emission/analysis/userinput/matcher.py index 1af0c6de2..ee4365261 100644 --- a/emission/analysis/userinput/matcher.py +++ b/emission/analysis/userinput/matcher.py @@ -200,8 +200,7 @@ def get_section_summary(ts, cleaned_trip, section_key): cleaned_trip: the cleaned trip object associated with the sections section_key: 'inferred_section' or 'cleaned_section' """ - import emission.core.get_database as edb - + logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) called") sections = esdt.get_sections_for_trip(key = section_key, user_id = cleaned_trip["user_id"], trip_id = cleaned_trip["_id"]) if len(sections) == 0: @@ -214,11 +213,13 @@ def get_section_summary(ts, cleaned_trip, section_key): if section_key == "analysis/cleaned_section" else inferred_section_mapper sections_df["sensed_mode_str"] = sections_df["sensed_mode"].apply(sel_section_mapper) grouped_section_df = sections_df.groupby("sensed_mode_str") - return { + retVal = { "distance": grouped_section_df.distance.sum().to_dict(), "duration": grouped_section_df.duration.sum().to_dict(), "count": grouped_section_df.trip_id.count().to_dict() } + logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) returning {retVal}") + return retVal def create_confirmed_entry(ts, tce, confirmed_key, input_key_list): # Copy the entry and fill in the new values From 29fd72d3b85cb0f05e347a6ccaaa17f8f2af1d34 Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Aug 2023 22:29:51 -0700 Subject: [PATCH 08/11] :bug: Fix formatting + pass in the argument correctly Fix errors in https://github.com/e-mission/e-mission-server/pull/931 to fix https://github.com/e-mission/e-mission-docs/issues/942 Testing done: - Ran ``` ./e-mission-py.bash bin/intake_multiprocess.py 1 --skip_if_no_new_data ``` - Several users were skipped ``` 
2023-08-14T22:23:23.331295-07:00**********UUID : moving to long term********** No new entries, and skip_if_no_new_data = True, skipping the rest of the pipeline 2023-08-14T22:23:23.525582-07:00**********UUID : moving to long term********** No new entries, and skip_if_no_new_data = True, skipping the rest of the pipeline 2023-08-14T22:23:23.748683-07:00**********UUID : moving to long term********** No new entries, and skip_if_no_new_data = True, skipping the rest of the pipeline ``` - And several other users were not ``` 2023-08-14T22:24:30.000684-07:00**********UUID : moving to long term********** New entry count == 207 and skip_if_no_new_data = True, continuing 2023-08-14T22:24:30.830030-07:00**********UUID : updating incoming user inputs********** 2023-08-14T22:24:31.705677-07:00**********UUID : filter accuracy if needed********** ``` --- bin/intake_multiprocess.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/intake_multiprocess.py b/bin/intake_multiprocess.py index 6f22b9221..f90326a6f 100644 --- a/bin/intake_multiprocess.py +++ b/bin/intake_multiprocess.py @@ -20,7 +20,7 @@ intake_log_config = json.load(open("conf/log/intake.conf.sample", "r")) parser = argparse.ArgumentParser() - parser.add_argument("--skip_if_no_new_data", action="store_true" + parser.add_argument("--skip_if_no_new_data", action="store_true", help="skip the pipeline if there is no new data") parser.add_argument("n_workers", type=int, help="the number of worker processors to use") @@ -34,4 +34,4 @@ split_lists = eps.get_split_uuid_lists(args.n_workers) logging.info("Finished generating split lists %s" % split_lists) - eps.dispatch(split_lists, skip_if_no_new_data) + eps.dispatch(split_lists, args.skip_if_no_new_data) From 97929278bb8e4ffd0c8bff040726671e2db11960 Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Aug 2023 23:16:34 -0700 Subject: [PATCH 09/11] :loud_sound: Add logging to display the profile after the pipeline range is saved We can remove this 
later if we don't need this level of verbosity --- emission/pipeline/intake_stage.py | 1 + 1 file changed, 1 insertion(+) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 7a2cfd9a7..6213e66e8 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -223,3 +223,4 @@ def _get_and_store_range(user_id, trip_key): user = ecwu.User(user_id) user.update({"pipeline_range": {"start_ts": start_ts, "end_ts": end_ts}}) + logging.debug("After updating, new profiles is %s" % user.getProfile()) From 0e4fe47cab91c3bf3c93036ce1407f66b39a1fbe Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Aug 2023 23:17:33 -0700 Subject: [PATCH 10/11] =?UTF-8?q?=E2=9C=85=20Fix=20the=20exception=20to=20?= =?UTF-8?q?expect=20from=20`add=5Fsection=5Fsummary`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In 73414b9919d21fb9ff9db110ed981f7d36aa57b6, we added logging to track how long the section summaries take. The log prints out the trip id as the first line of the function ``` + logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) called") ``` So the error when we pass in a fake trip is now that the `_id` doesn't exist, not that the `user_id` doesn't exist. 
Fixed by changing the expected exception Testing done: ``` ---------------------------------------------------------------------- Ran 1 test in 0.247s OK ``` --- .../analysisTests/userInputTests/TestConfirmedObjectFakeData.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/emission/tests/analysisTests/userInputTests/TestConfirmedObjectFakeData.py b/emission/tests/analysisTests/userInputTests/TestConfirmedObjectFakeData.py index b82dcb194..7ae4d4634 100644 --- a/emission/tests/analysisTests/userInputTests/TestConfirmedObjectFakeData.py +++ b/emission/tests/analysisTests/userInputTests/TestConfirmedObjectFakeData.py @@ -113,7 +113,7 @@ def testInvalidInput(self): with self.assertRaises(KeyError) as ke: fake_ct = ecwe.Entry({"metadata": {"key": "analysis/cleaned_trip"}, "data": {}}) eaum.get_section_summary(self.test_ts, fake_ct, "foobar") - self.assertEqual(str(ke.exception), "'user_id'") + self.assertEqual(str(ke.exception), "'_id'") fake_ct = ecwe.Entry({"metadata": {"key": "analysis/cleaned_trip"}, "data": {}}) fake_ct_id = self.test_ts.insert(fake_ct) From 2099f9b3b9da7e49f6224384a68355205a558dde Mon Sep 17 00:00:00 2001 From: Shankari Date: Mon, 14 Aug 2023 23:21:41 -0700 Subject: [PATCH 11/11] =?UTF-8?q?=E2=9C=85=20Register=20the=20users=20befo?= =?UTF-8?q?re=20running=20the=20pipeline=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In 0403ae0ecc8985911451f16fac085fcf0d6328de, we start precomputing the pipeline stage and storing it in this profile. This means that we need the user to have a profile to begin with. In the real world, users will always have profiles since we register them during `profile/create` In this test, we set the `testEmail` so that we register the users as part of the test as well. 
This implies that we also: - need to delete the profile_db() as part of cleanup - use the version of the pipeline in `epi` so that it will run the pre-population code Once we do that, the tests pass. However, when they were failing, we realized that we can't compare two tuples with `assertAlmostEqual` since the assertion can't subtract tuples to get the difference. Instead, we separate out the start and end and compare them independently with `assertAlmostEqual` Testing done: both tests pass ``` ---------------------------------------------------------------------- Ran 2 tests in 33.884s OK ``` --- emission/tests/netTests/TestPipeline.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/emission/tests/netTests/TestPipeline.py b/emission/tests/netTests/TestPipeline.py index f98964e60..54f81ba8e 100644 --- a/emission/tests/netTests/TestPipeline.py +++ b/emission/tests/netTests/TestPipeline.py @@ -6,14 +6,17 @@ import emission.core.get_database as edb import emission.core.wrapper.localdate as ecwl import emission.tests.common as etc +import emission.pipeline.intake_stage as epi from emission.net.api import pipeline class TestPipeline(unittest.TestCase): def setUp(self): + self.testEmail="foo@foo.com" etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-21") self.testUUID1 = self.testUUID + self.testEmail="bar@bar.com" etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2015-aug-27") @@ -22,6 +25,7 @@ def tearDown(self): def clearRelatedDb(self): edb.get_timeseries_db().delete_many({"user_id": self.testUUID}) + edb.get_profile_db().delete_many({"user_id": self.testUUID}) edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID}) edb.get_pipeline_state_db().delete_many({"user_id": self.testUUID}) edb.get_timeseries_db().delete_many({"user_id": self.testUUID1}) @@ -33,8 +37,10 @@ def testNoAnalysisResults(self): def testAnalysisResults(self): 
self.assertEqual(pipeline.get_range(self.testUUID), (None, None)) - etc.runIntakePipeline(self.testUUID) - self.assertAlmostEqual(pipeline.get_range(self.testUUID), (1440688739.672, 1440729142.709)) + epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) + pr = pipeline.get_range(self.testUUID) + self.assertAlmostEqual(pr[0], 1440688739.672) + self.assertAlmostEqual(pr[1], 1440729142.709) if __name__ == '__main__': import emission.tests.common as etc