diff --git a/bin/build_label_model.py b/bin/build_label_model.py
index 3afd62901..46a7f2adf 100644
--- a/bin/build_label_model.py
+++ b/bin/build_label_model.py
@@ -5,6 +5,8 @@
 import argparse
 import uuid
+import arrow
+import pymongo
 
 import emission.pipeline.reset as epr
 import emission.core.get_database as edb
@@ -47,6 +49,9 @@ def _email_2_user_list(email_list):
                         level=logging.DEBUG)
 
     parser = argparse.ArgumentParser()
+    parser.add_argument("-i", "--ignore_older_than_weeks", type=int,
+        help="skip model build if last trip is older than this")
+
     group = parser.add_mutually_exclusive_group(required=True)
     group.add_argument("-a", "--all", action="store_true", default=False,
         help="build the model for all users")
@@ -66,6 +71,21 @@ def _email_2_user_list(email_list):
         logging.info("building model for user %s" % user_id)
         # these can come from the application config as default values
 
+        if args.ignore_older_than_weeks:
+            ts = esta.TimeSeries.get_time_series(user_id)
+            last_confirmed_trip_end = arrow.get(
+                ts.get_first_value_for_field("analysis/confirmed_trip",
+                    "data.end_ts", pymongo.DESCENDING))
+
+            week_limit = arrow.utcnow().shift(weeks=-args.ignore_older_than_weeks)
+            if last_confirmed_trip_end.timestamp() < week_limit.timestamp():
+                logging.debug("last trip was at %s, cutoff was %s, gap = %s, skipping model building..." %
+                    (last_confirmed_trip_end, week_limit, last_confirmed_trip_end.humanize(week_limit)))
+                continue
+            else:
+                logging.debug("last trip was at %s, cutoff was %s, gap is %s, building model..." %
+                    (last_confirmed_trip_end, week_limit, last_confirmed_trip_end.humanize(week_limit)))
+
         model_type = eamtc.get_model_type()
         model_storage = eamtc.get_model_storage()
         min_trips = eamtc.get_minimum_trips()
diff --git a/bin/intake_multiprocess.py b/bin/intake_multiprocess.py
index 033c047d1..f90326a6f 100644
--- a/bin/intake_multiprocess.py
+++ b/bin/intake_multiprocess.py
@@ -20,6 +20,8 @@
     intake_log_config = json.load(open("conf/log/intake.conf.sample", "r"))
 
     parser = argparse.ArgumentParser()
+    parser.add_argument("--skip_if_no_new_data", action="store_true",
+        help="skip the pipeline if there is no new data")
    parser.add_argument("n_workers", type=int,
                        help="the number of worker processors to use")
    args = parser.parse_args()
@@ -32,4 +34,4 @@
     split_lists = eps.get_split_uuid_lists(args.n_workers)
     logging.info("Finished generating split lists %s" % split_lists)
 
-    eps.dispatch(split_lists)
+    eps.dispatch(split_lists, args.skip_if_no_new_data)
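The `--ignore_older_than_weeks` gate above boils down to a single timestamp comparison. A minimal standalone sketch of the same check (the `is_stale` helper name is hypothetical; the `arrow` usage mirrors the diff):

```python
import arrow

def is_stale(last_trip_end_ts, max_age_weeks):
    # A user is "stale" if their most recent confirmed trip ended
    # before the cutoff, i.e. more than max_age_weeks ago.
    cutoff = arrow.utcnow().shift(weeks=-max_age_weeks)
    return arrow.get(last_trip_end_ts).timestamp() < cutoff.timestamp()

# e.g. with -i 3, users with no confirmed trips in the last three weeks are skipped
print(is_stale(arrow.utcnow().shift(weeks=-4).timestamp(), 3))  # True
print(is_stale(arrow.utcnow().timestamp(), 3))                  # False
```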
and %s additions" % (last_confirmed_place["_id"], len(last_confirmed_place["data"]["additions"]))) - update_confirmed_and_composite(last_confirmed_place) + update_confirmed_and_composite(ts, last_confirmed_place) if confirmed_tl is not None and not confirmed_tl.is_empty(): tl_list = list(confirmed_tl) @@ -200,8 +200,7 @@ def get_section_summary(ts, cleaned_trip, section_key): cleaned_trip: the cleaned trip object associated with the sections section_key: 'inferred_section' or 'cleaned_section' """ - import emission.core.get_database as edb - + logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) called") sections = esdt.get_sections_for_trip(key = section_key, user_id = cleaned_trip["user_id"], trip_id = cleaned_trip["_id"]) if len(sections) == 0: @@ -214,11 +213,13 @@ def get_section_summary(ts, cleaned_trip, section_key): if section_key == "analysis/cleaned_section" else inferred_section_mapper sections_df["sensed_mode_str"] = sections_df["sensed_mode"].apply(sel_section_mapper) grouped_section_df = sections_df.groupby("sensed_mode_str") - return { + retVal = { "distance": grouped_section_df.distance.sum().to_dict(), "duration": grouped_section_df.duration.sum().to_dict(), "count": grouped_section_df.trip_id.count().to_dict() } + logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) returning {retVal}") + return retVal def create_confirmed_entry(ts, tce, confirmed_key, input_key_list): # Copy the entry and fill in the new values @@ -271,7 +272,7 @@ def link_trip_end(confirmed_trip, confirmed_end_place): confirmed_trip["data"]["end_place"] = confirmed_end_place.get_id() confirmed_end_place["data"]["ending_trip"] = confirmed_trip.get_id() -def update_confirmed_and_composite(confirmed_obj): +def update_confirmed_and_composite(ts, confirmed_obj): import emission.storage.timeseries.builtin_timeseries as estbt import emission.core.get_database as edb estbt.BuiltinTimeSeries.update(confirmed_obj) @@ -281,10 +282,10 @@ def update_confirmed_and_composite(confirmed_obj): # if we don't find a matching composite trip, we don't need to do anything # since it has not been created yet and will be created with updated values when we get to that stage if confirmed_obj["metadata"]["key"] in [esda.CONFIRMED_TRIP_KEY, esda.CONFIRMED_UNTRACKED_KEY]: - composite_trip = edb.get_analysis_timeseries_db().find_one({"data.confirmed_trip": confirmed_obj.get_id()}) + composite_trip = ts.get_entry_at_ts("analysis/composite_trip", "data.start_ts", confirmed_obj.data.start_ts) if composite_trip is not None: # copy over all the fields other than the end_confimed_place - EXCLUDED_FIELDS = ["end_confirmed_place"] + EXCLUDED_FIELDS = ["end_confirmed_place", "start_ts", "end_ts", "start_loc", "end_loc"] for k in confirmed_obj["data"].keys(): if k not in EXCLUDED_FIELDS: composite_trip["data"][k] = confirmed_obj["data"][k] @@ -293,7 +294,7 @@ def update_confirmed_and_composite(confirmed_obj): logging.debug("No composite trip matching confirmed trip %s, nothing to update" % confirmed_obj["_id"]) if confirmed_obj["metadata"]["key"] == esda.CONFIRMED_PLACE_KEY: - composite_trip = edb.get_analysis_timeseries_db().find_one({"data.end_confirmed_place._id": confirmed_obj["_id"]}) + composite_trip = ts.get_entry_at_ts("analysis/composite_trip", "data.end_ts", confirmed_obj['data']['enter_ts']) if composite_trip is not None: composite_trip["data"]["end_confirmed_place"] = confirmed_obj estbt.BuiltinTimeSeries.update(ecwe.Entry(composite_trip)) diff --git a/emission/net/api/pipeline.py 
diff --git a/emission/net/api/pipeline.py b/emission/net/api/pipeline.py
index e942e0431..cd75a9c8c 100644
--- a/emission/net/api/pipeline.py
+++ b/emission/net/api/pipeline.py
@@ -10,6 +10,7 @@
 import emission.storage.pipeline_queries as esp
 import emission.storage.timeseries.abstract_timeseries as esta
+import emission.core.wrapper.user as ecwu
 
 def get_complete_ts(user_id):
     complete_ts = esp.get_complete_ts(user_id)
@@ -17,28 +18,8 @@ def get_complete_ts(user_id):
     return complete_ts
 
 def get_range(user_id):
-    ts = esta.TimeSeries.get_time_series(user_id)
-    start_ts = ts.get_first_value_for_field("analysis/composite_trip", "data.start_ts", pymongo.ASCENDING)
-    if start_ts == -1:
-        start_ts = ts.get_first_value_for_field("analysis/confirmed_trip", "data.start_ts", pymongo.ASCENDING)
-    if start_ts == -1:
-        start_ts = ts.get_first_value_for_field("analysis/cleaned_trip", "data.start_ts", pymongo.ASCENDING)
-    if start_ts == -1:
-        start_ts = None
-
-    end_ts = ts.get_first_value_for_field("analysis/composite_trip", "data.end_ts", pymongo.DESCENDING)
-    if start_ts == -1:
-        end_ts = ts.get_first_value_for_field("analysis/confirmed_trip", "data.end_ts", pymongo.DESCENDING)
-    if end_ts == -1:
-        end_ts = ts.get_first_value_for_field("analysis/cleaned_trip", "data.end_ts", pymongo.DESCENDING)
-    if end_ts == -1:
-        end_ts = None
-
-    complete_ts = get_complete_ts(user_id)
-    if complete_ts is not None and end_ts is not None\
-        and (end_ts != (complete_ts - esp.END_FUZZ_AVOID_LTE)):
-        logging.exception("end_ts %s != complete_ts no fuzz %s" %
-            (end_ts, (complete_ts - esp.END_FUZZ_AVOID_LTE)))
-
+    user_profile = ecwu.User(user_id).getProfile()
+    start_ts = user_profile.get("pipeline_range", {}).get("start_ts", None)
+    end_ts = user_profile.get("pipeline_range", {}).get("end_ts", None)
     logging.debug("Returning range (%s, %s)" % (start_ts, end_ts))
     return (start_ts, end_ts)
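`get_range` now serves the cached `pipeline_range` straight from the user profile instead of issuing up to six timeseries queries per call (note that the removed code also carried a copy-paste bug, checking `start_ts == -1` where it meant `end_ts`). Because the chained `.get()` calls default to an empty dict and then `None`, a user whose pipeline has never run falls back cleanly, which is what the tests below rely on. A quick illustration of that fallback:

```python
# Fresh user: _get_and_store_range has never run, so there is no
# pipeline_range key and the chained .get() calls return None.
profile = {}
assert profile.get("pipeline_range", {}).get("start_ts", None) is None

# After an intake run, the cached range is returned directly
# (values taken from TestPipeline below).
profile = {"pipeline_range": {"start_ts": 1440688739.672,
                              "end_ts": 1440729142.709}}
assert profile.get("pipeline_range", {}).get("end_ts", None) == 1440729142.709
```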
""" pass diff --git a/emission/net/usercache/builtin_usercache_handler.py b/emission/net/usercache/builtin_usercache_handler.py index 61243a844..f61d5469c 100644 --- a/emission/net/usercache/builtin_usercache_handler.py +++ b/emission/net/usercache/builtin_usercache_handler.py @@ -58,7 +58,7 @@ def moveToLongTerm(self): # Since we didn't get the current time range, there is no current # state, so we don't need to mark it as done # esp.mark_usercache_done(None) - return + return 0 time_query = esp.get_time_range_for_usercache(self.user_id) @@ -99,6 +99,7 @@ def moveToLongTerm(self): logging.debug("Deleting all entries for query %s" % time_query) uc.clearProcessedMessages(time_query) esp.mark_usercache_done(self.user_id, last_ts_processed) + return len(unified_entry_list) def storeViewsToCache(self): """ diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 2d1f89889..6213e66e8 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -12,6 +12,7 @@ import arrow from uuid import UUID import time +import pymongo import emission.core.get_database as edb import emission.core.timer as ect @@ -37,8 +38,9 @@ import emission.storage.decorations.stats_queries as esds +import emission.core.wrapper.user as ecwu -def run_intake_pipeline(process_number, uuid_list): +def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False): """ Run the intake pipeline with the specified process number and uuid list. Note that the process_number is only really used to customize the log file name @@ -75,23 +77,30 @@ def run_intake_pipeline(process_number, uuid_list): continue try: - run_intake_pipeline_for_user(uuid) + run_intake_pipeline_for_user(uuid, skip_if_no_new_data) except Exception as e: esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None) logging.exception("Found error %s while processing pipeline " "for user %s, skipping" % (e, uuid)) -def run_intake_pipeline_for_user(uuid): +def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): uh = euah.UserCacheHandler.getUserCacheHandler(uuid) with ect.Timer() as uct: logging.info("*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10) - uh.moveToLongTerm() + new_entry_count = uh.moveToLongTerm() esds.store_pipeline_time(uuid, ecwp.PipelineStages.USERCACHE.name, time.time(), uct.elapsed) + if skip_if_no_new_data and new_entry_count == 0: + print("No new entries, and skip_if_no_new_data = %s, skipping the rest of the pipeline" % skip_if_no_new_data) + return + else: + print("New entry count == %s and skip_if_no_new_data = %s, continuing" % + (new_entry_count, skip_if_no_new_data)) + with ect.Timer() as uit: logging.info("*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10) print(str(arrow.now()) + "*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10) @@ -200,3 +209,18 @@ def run_intake_pipeline_for_user(uuid): esds.store_pipeline_time(uuid, ecwp.PipelineStages.OUTPUT_GEN.name, time.time(), ogt.elapsed) + + _get_and_store_range(uuid, "analysis/composite_trip") + +def _get_and_store_range(user_id, trip_key): + ts = esta.TimeSeries.get_time_series(user_id) + start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING) + if start_ts == -1: + start_ts = None + end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING) + if end_ts == -1: + end_ts = None + + user = ecwu.User(user_id) + 
user.update({"pipeline_range": {"start_ts": start_ts, "end_ts": end_ts}}) + logging.debug("After updating, new profiles is %s" % user.getProfile()) diff --git a/emission/pipeline/scheduler.py b/emission/pipeline/scheduler.py index 9e810ea1b..c25c1ebbb 100644 --- a/emission/pipeline/scheduler.py +++ b/emission/pipeline/scheduler.py @@ -60,13 +60,13 @@ def get_split_uuid_lists(n_splits): logging.debug("Split values are %s" % ret_splits) return ret_splits -def dispatch(split_lists): +def dispatch(split_lists, skip_if_no_new_data): ctx = mp.get_context('spawn') process_list = [] for i, uuid_list in enumerate(split_lists): logging.debug("Dispatching list %s" % uuid_list) pid = i - p = ctx.Process(target=epi.run_intake_pipeline, args=(pid, uuid_list)) + p = ctx.Process(target=epi.run_intake_pipeline, args=(pid, uuid_list, skip_if_no_new_data)) logging.info("Created process %s to process %s list of size %s" % (p, i, len(uuid_list))) p.start() diff --git a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py index 803f1a446..32b0d3eae 100644 --- a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py +++ b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py @@ -13,11 +13,12 @@ import emission.core.common as ecc import emission.core.wrapper.pipelinestate as ewps import emission.storage.pipeline_queries as epq +import emission.pipeline.intake_stage as epi # Test imports import emission.tests.common as etc -class TestPipelineRealData(unittest.TestCase): +class TestPipelineCornerCases(unittest.TestCase): def setUp(self): self.testUUID = uuid.uuid4() logging.info("setUp complete") @@ -69,6 +70,72 @@ def testAllDefinedPipelineStates(self): self.assertIsNotNone(ps['user_id']) self.assertIn(ps['pipeline_stage'], valid_states) + def testSkipPipelineNoNewEntries(self): + # we start with no pipeline states for this user + all_pipeline_states = edb.get_pipeline_state_db().find() + stages_skipped_in_testing = [ + ewps.PipelineStages.USERCACHE, + ewps.PipelineStages.TRIP_MODEL, + ewps.PipelineStages.TOUR_MODEL, + ewps.PipelineStages.ALTERNATIVES, + ewps.PipelineStages.USER_MODEL, + ewps.PipelineStages.RECOMMENDATION, + ewps.PipelineStages.OUTPUT_GEN] + test_run_states = list([pse.value for pse in + filter(lambda pse: pse not in stages_skipped_in_testing, + ewps.PipelineStages.__iter__())]) + [ewps.PipelineStages.OUTPUT_GEN.value] + curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID, + all_pipeline_states)) + self.assertEqual(len(curr_user_states), 0) + # next, we run the real pipeline, and end up with no entries + # and we have an early return in that case + print("-" * 10, "Running real pipeline on empty DB, expecting no change", "-" * 10) + epi.run_intake_pipeline_for_user(self.testUUID, True) + all_pipeline_states = edb.get_pipeline_state_db().find() + curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID, + all_pipeline_states)) + self.assertEqual(len(curr_user_states), 0) + # Now we load some data and run the test pipeline + # which will generate some pipeline states + print("-" * 10, "Running test pipeline on real data, expecting states to be set", "-" * 10) + etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2016-07-25") + epi.run_intake_pipeline_for_user(self.testUUID, False) + + all_pipeline_states_after_run = edb.get_pipeline_state_db().find() + curr_user_states_after_run = list(filter(lambda ps: ps["user_id"] == self.testUUID, 
diff --git a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py
index 803f1a446..32b0d3eae 100644
--- a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py
+++ b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py
@@ -13,11 +13,12 @@
 import emission.core.common as ecc
 import emission.core.wrapper.pipelinestate as ewps
 import emission.storage.pipeline_queries as epq
+import emission.pipeline.intake_stage as epi
 
 # Test imports
 import emission.tests.common as etc
 
-class TestPipelineRealData(unittest.TestCase):
+class TestPipelineCornerCases(unittest.TestCase):
     def setUp(self):
         self.testUUID = uuid.uuid4()
         logging.info("setUp complete")
@@ -69,6 +70,72 @@ def testAllDefinedPipelineStates(self):
             self.assertIsNotNone(ps['user_id'])
             self.assertIn(ps['pipeline_stage'], valid_states)
 
+    def testSkipPipelineNoNewEntries(self):
+        # we start with no pipeline states for this user
+        all_pipeline_states = edb.get_pipeline_state_db().find()
+        stages_skipped_in_testing = [
+            ewps.PipelineStages.USERCACHE,
+            ewps.PipelineStages.TRIP_MODEL,
+            ewps.PipelineStages.TOUR_MODEL,
+            ewps.PipelineStages.ALTERNATIVES,
+            ewps.PipelineStages.USER_MODEL,
+            ewps.PipelineStages.RECOMMENDATION,
+            ewps.PipelineStages.OUTPUT_GEN]
+        test_run_states = list([pse.value for pse in
+            filter(lambda pse: pse not in stages_skipped_in_testing,
+                ewps.PipelineStages.__iter__())]) + [ewps.PipelineStages.OUTPUT_GEN.value]
+        curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID,
+            all_pipeline_states))
+        self.assertEqual(len(curr_user_states), 0)
+        # next, we run the real pipeline, and end up with no entries
+        # and we have an early return in that case
+        print("-" * 10, "Running real pipeline on empty DB, expecting no change", "-" * 10)
+        epi.run_intake_pipeline_for_user(self.testUUID, True)
+        all_pipeline_states = edb.get_pipeline_state_db().find()
+        curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID,
+            all_pipeline_states))
+        self.assertEqual(len(curr_user_states), 0)
+        # Now we load some data and run the test pipeline
+        # which will generate some pipeline states
+        print("-" * 10, "Running test pipeline on real data, expecting states to be set", "-" * 10)
+        etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2016-07-25")
+        epi.run_intake_pipeline_for_user(self.testUUID, False)
+
+        all_pipeline_states_after_run = edb.get_pipeline_state_db().find()
+        curr_user_states_after_run = list(filter(lambda ps: ps["user_id"] == self.testUUID,
+            all_pipeline_states_after_run))
+        states_set = [ps['pipeline_stage'] for ps in curr_user_states_after_run]
+        print("-" * 10, "test run stages are ", test_run_states, "-" * 10)
+        print("-" * 10, "states that are set are ", states_set, "-" * 10)
+        self.assertGreater(len(curr_user_states_after_run), 0)
+        self.assertEqual(sorted(states_set), sorted(test_run_states))
+        # then we run the real pipeline again with skip=True
+        # We expect to see no changes between the first and the second run
+        # because of the usercache skip
+        epi.run_intake_pipeline_for_user(self.testUUID, True)
+        all_pipeline_states_after_test_run = edb.get_pipeline_state_db().find()
+        curr_user_states_after_test_run = list(filter(lambda ps: ps["user_id"] == self.testUUID,
+            all_pipeline_states_after_test_run))
+        get_last_processed = lambda ps: ps['last_processed_ts']
+        get_last_run = lambda ps: ps['last_ts_run']
+        self.assertEqual(list(map(get_last_processed, curr_user_states_after_run)),
+            list(map(get_last_processed, curr_user_states_after_test_run)))
+        self.assertEqual(list(map(get_last_run, curr_user_states_after_run)),
+            list(map(get_last_run, curr_user_states_after_test_run)))
+
+        # then we run the real pipeline again with skip=False
+        # We expect last_processed_ts to stay unchanged (there is still no
+        # new data) but last_ts_run to change, since the stages actually run this time
+        epi.run_intake_pipeline_for_user(self.testUUID, False)
+        all_pipeline_states_after_test_run = edb.get_pipeline_state_db().find()
+        curr_user_states_after_test_run = list(filter(lambda ps: ps["user_id"] == self.testUUID,
+            all_pipeline_states_after_test_run))
+        self.assertEqual(list(map(get_last_processed, curr_user_states_after_run)),
+            list(map(get_last_processed, curr_user_states_after_test_run)))
+        self.assertNotEqual(list(map(get_last_run, curr_user_states_after_run)),
+            list(map(get_last_run, curr_user_states_after_test_run)))
 
 if __name__ == '__main__':
     etc.configLogging()
diff --git a/emission/tests/analysisTests/plottingTests/TestCompositeTripCreation.py b/emission/tests/analysisTests/plottingTests/TestCompositeTripCreation.py
index 25b595584..8892d9fdf 100644
--- a/emission/tests/analysisTests/plottingTests/TestCompositeTripCreation.py
+++ b/emission/tests/analysisTests/plottingTests/TestCompositeTripCreation.py
@@ -79,15 +79,15 @@ def _testUpdateConfirmedTripProperties(self):
         # set a couple of confirmed trip properties
         tripSacrifice = confirmedTrips[0]
-        tripSacrifice["data"]["start_ts"] = 1000
-        tripSacrifice["data"]["start_fmt_time"] = "I want to go back in time"
-        eaum.update_confirmed_and_composite(ecwe.Entry(tripSacrifice))
+        tripSacrifice["data"]["some_field"] = 1000
+        tripSacrifice["data"]["user_input"] = "I want to go back in time"
+        eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(tripSacrifice))
 
         compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"]))
         tripExpected = compositeTrips[0]
 
-        self.assertEqual(tripExpected["data"]["start_ts"], 1000)
-        self.assertEqual(tripExpected["data"]["start_fmt_time"], "I want to go back in time")
+        self.assertEqual(tripExpected["data"]["some_field"], 1000)
+        self.assertEqual(tripExpected["data"]["user_input"], "I want to go back in time")
 
         print("testUpdateConfirmedTripProperties DONE")
 
@@ -100,7 +100,7 @@ def _testSetConfirmedTripAdditions(self):
         tripSacrifice = confirmedTrips[1]
         ADDITIONS = ["mimi", "fifi", "gigi", "bibi"]
         tripSacrifice["data"]["additions"] = ADDITIONS
-        eaum.update_confirmed_and_composite(ecwe.Entry(tripSacrifice))
+        eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(tripSacrifice))
 
         compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"]))
         tripExpected = compositeTrips[1]
@@ -117,7 +117,7 @@ def _testSetConfirmedTripUserInput(self):
         tripSacrifice = confirmedTrips[2]
         USERINPUT = {"mimi": 1, "fifi": 100, "gigi": 200, "bibi": 300}
         tripSacrifice["data"]["user_input"] = USERINPUT
-        eaum.update_confirmed_and_composite(ecwe.Entry(tripSacrifice))
+        eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(tripSacrifice))
 
         compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"]))
         tripExpected = compositeTrips[2]
@@ -135,7 +135,7 @@ def _testUpdateConfirmedPlaceProperties(self):
         placeSacrifice = confirmedPlaces[2]
         placeSacrifice["data"]["exit_ts"] = 1000
         placeSacrifice["data"]["exit_fmt_time"] = "I want to go back in time"
-        eaum.update_confirmed_and_composite(ecwe.Entry(placeSacrifice))
+        eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(placeSacrifice))
 
         compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"]))
         tripExpected = compositeTrips[1]
@@ -154,7 +154,7 @@ def _testSetConfirmedPlaceAdditions(self):
         placeSacrifice = confirmedPlaces[5]
         ADDITIONS = ["mimi", "fifi", "gigi", "bibi"]
         placeSacrifice["data"]["additions"] = ADDITIONS
-        eaum.update_confirmed_and_composite(ecwe.Entry(placeSacrifice))
+        eaum.update_confirmed_and_composite(self.testTs, ecwe.Entry(placeSacrifice))
 
         compositeTrips = list(self.testTs.find_entries(["analysis/composite_trip"]))
         tripExpected = compositeTrips[4]
diff --git a/emission/tests/analysisTests/userInputTests/TestConfirmedObjectFakeData.py b/emission/tests/analysisTests/userInputTests/TestConfirmedObjectFakeData.py
index b82dcb194..7ae4d4634 100644
--- a/emission/tests/analysisTests/userInputTests/TestConfirmedObjectFakeData.py
+++ b/emission/tests/analysisTests/userInputTests/TestConfirmedObjectFakeData.py
@@ -113,7 +113,7 @@ def testInvalidInput(self):
         with self.assertRaises(KeyError) as ke:
             fake_ct = ecwe.Entry({"metadata": {"key": "analysis/cleaned_trip"}, "data": {}})
             eaum.get_section_summary(self.test_ts, fake_ct, "foobar")
-        self.assertEqual(str(ke.exception), "'user_id'")
+        self.assertEqual(str(ke.exception), "'_id'")
 
         fake_ct = ecwe.Entry({"metadata": {"key": "analysis/cleaned_trip"}, "data": {}})
         fake_ct_id = self.test_ts.insert(fake_ct)
edb.get_timeseries_db().delete_many({"user_id": self.testUUID1}) @@ -33,8 +37,10 @@ def testNoAnalysisResults(self): def testAnalysisResults(self): self.assertEqual(pipeline.get_range(self.testUUID), (None, None)) - etc.runIntakePipeline(self.testUUID) - self.assertAlmostEqual(pipeline.get_range(self.testUUID), (1440688739.672, 1440729142.709)) + epi.run_intake_pipeline_for_user(self.testUUID, skip_if_no_new_data = False) + pr = pipeline.get_range(self.testUUID) + self.assertAlmostEqual(pr[0], 1440688739.672) + self.assertAlmostEqual(pr[1], 1440729142.709) if __name__ == '__main__': import emission.tests.common as etc