From fac1cb280dd7f7c20d070cc3cf017e2b4c32ee70 Mon Sep 17 00:00:00 2001 From: Shankari Date: Tue, 15 Aug 2023 15:41:51 -0700 Subject: [PATCH] :zap: :bug: Mark the pipeline range right after the composite trips are generated + remove computing the cached diary objects using the OUTPUT_GEN pipeline state + diary screen is gone; we don't even use the cached values properly any more + no need to waste time iterating through trips, sections and trajectories, and fill in documents that nobody uses. This is a minimal change that should help get us to production. Removing all vestiges of OUTPUT_GEN from the codebase is left as a task for a future intern. Testing done: ``` ./e-mission-py.bash emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py ---------------------------------------------------------------------- Ran 3 tests in 12.238s OK ``` --- emission/pipeline/intake_stage.py | 8 -------- .../analysisTests/intakeTests/TestPipelineCornerCases.py | 2 +- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/emission/pipeline/intake_stage.py b/emission/pipeline/intake_stage.py index 6213e66e8..203c25357 100644 --- a/emission/pipeline/intake_stage.py +++ b/emission/pipeline/intake_stage.py @@ -202,14 +202,6 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data): esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name, time.time(), crt.elapsed) - with ect.Timer() as ogt: - logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) - print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10) - uh.storeViewsToCache() - - esds.store_pipeline_time(uuid, ecwp.PipelineStages.OUTPUT_GEN.name, - time.time(), ogt.elapsed) - _get_and_store_range(uuid, "analysis/composite_trip") def _get_and_store_range(user_id, trip_key): diff --git a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py index 32b0d3eae..56075b83b 100644 --- a/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py +++ b/emission/tests/analysisTests/intakeTests/TestPipelineCornerCases.py @@ -83,7 +83,7 @@ def testSkipPipelineNoNewEntries(self): ewps.PipelineStages.OUTPUT_GEN] test_run_states = list([pse.value for pse in filter(lambda pse: pse not in stages_skipped_in_testing, - ewps.PipelineStages.__iter__())]) + [ewps.PipelineStages.OUTPUT_GEN.value] + ewps.PipelineStages.__iter__())]) curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID, all_pipeline_states)) self.assertEqual(len(curr_user_states), 0)