Skip to content

Commit

Permalink
Merge pull request #932 from shankari/scalability_improvements
Browse files Browse the repository at this point in the history
⚡ 🐛 Mark the pipeline range right after the composite trips a…
  • Loading branch information
shankari authored Aug 15, 2023
2 parents f92b00a + fac1cb2 commit 94e7478
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 9 deletions.
8 changes: 0 additions & 8 deletions emission/pipeline/intake_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,6 @@ def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
esds.store_pipeline_time(uuid, ecwp.PipelineStages.CREATE_COMPOSITE_OBJECTS.name,
time.time(), crt.elapsed)

with ect.Timer() as ogt:
logging.info("*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: storing views to cache" % uuid + "*" * 10)
uh.storeViewsToCache()

esds.store_pipeline_time(uuid, ecwp.PipelineStages.OUTPUT_GEN.name,
time.time(), ogt.elapsed)

_get_and_store_range(uuid, "analysis/composite_trip")

def _get_and_store_range(user_id, trip_key):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def testSkipPipelineNoNewEntries(self):
ewps.PipelineStages.OUTPUT_GEN]
test_run_states = list([pse.value for pse in
filter(lambda pse: pse not in stages_skipped_in_testing,
ewps.PipelineStages.__iter__())]) + [ewps.PipelineStages.OUTPUT_GEN.value]
ewps.PipelineStages.__iter__())])
curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID,
all_pipeline_states))
self.assertEqual(len(curr_user_states), 0)
Expand Down

0 comments on commit 94e7478

Please sign in to comment.