Merge pull request #931 from shankari/scalability_improvements
⚡ Do nothing well (2023 server edition)
shankari authored Aug 15, 2023
2 parents 8761f4a + 2099f9b commit f92b00a
Showing 12 changed files with 157 additions and 53 deletions.
20 changes: 20 additions & 0 deletions bin/build_label_model.py
@@ -5,6 +5,8 @@

import argparse
import uuid
import arrow
import pymongo

import emission.pipeline.reset as epr
import emission.core.get_database as edb
@@ -47,6 +49,9 @@ def _email_2_user_list(email_list):
level=logging.DEBUG)

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--ignore_older_than_weeks", type=int,
help="skip model build if last trip is older than this")

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-a", "--all", action="store_true", default=False,
help="build the model for all users")
@@ -66,6 +71,21 @@ def _email_2_user_list(email_list):
logging.info("building model for user %s" % user_id)
# these can come from the application config as default values

if args.ignore_older_than_weeks:
ts = esta.TimeSeries.get_time_series(user_id)
last_confirmed_trip_end = arrow.get(
ts.get_first_value_for_field("analysis/confirmed_trip",
"data.end_ts", pymongo.DESCENDING))

week_limit = arrow.utcnow().shift(weeks=-args.ignore_older_than_weeks)
if last_confirmed_trip_end.timestamp() < week_limit.timestamp():
logging.debug("last trip was at %s, three weeks ago was %s, gap = %s, skipping model building..." %
(last_confirmed_trip_end, week_limit, last_confirmed_trip_end.humanize(week_limit)))
continue
else:
logging.debug("last trip was at %s, three weeks ago was %s, gap is %s, building model..." %
(last_confirmed_trip_end, week_limit, last_confirmed_trip_end.humanize(week_limit)))

model_type = eamtc.get_model_type()
model_storage = eamtc.get_model_storage()
min_trips = eamtc.get_minimum_trips()
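For context, a minimal standalone sketch of the freshness check this hunk adds, assuming only the arrow library; `should_skip_model_build` and its arguments are illustrative names, and the 3-week cutoff is just an example value:

```python
import arrow

def should_skip_model_build(last_trip_end_ts, ignore_older_than_weeks):
    """Return True when the newest confirmed trip predates the cutoff."""
    last_trip_end = arrow.get(last_trip_end_ts)
    week_limit = arrow.utcnow().shift(weeks=-ignore_older_than_weeks)
    # humanize() against the cutoff yields a readable gap for the debug log
    print("gap is %s" % last_trip_end.humanize(week_limit))
    return last_trip_end.timestamp() < week_limit.timestamp()

# a trip from ~4 weeks ago fails a 3-week freshness requirement
print(should_skip_model_build(arrow.utcnow().shift(weeks=-4).timestamp(), 3))  # True
```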
4 changes: 3 additions & 1 deletion bin/intake_multiprocess.py
@@ -20,6 +20,8 @@
intake_log_config = json.load(open("conf/log/intake.conf.sample", "r"))

parser = argparse.ArgumentParser()
parser.add_argument("--skip_if_no_new_data", action="store_true",
help="skip the pipeline if there is no new data")
parser.add_argument("n_workers", type=int,
help="the number of worker processors to use")
args = parser.parse_args()
@@ -32,4 +34,4 @@

split_lists = eps.get_split_uuid_lists(args.n_workers)
logging.info("Finished generating split lists %s" % split_lists)
eps.dispatch(split_lists)
eps.dispatch(split_lists, args.skip_if_no_new_data)
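As a sanity check on the new CLI surface, a short sketch that mirrors the two arguments above; the parsed argument vectors are example inputs:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--skip_if_no_new_data", action="store_true",
                    help="skip the pipeline if there is no new data")
parser.add_argument("n_workers", type=int,
                    help="the number of worker processors to use")

# store_true defaults to False, so existing invocations keep today's behavior
args = parser.parse_args(["4"])
assert (args.n_workers, args.skip_if_no_new_data) == (4, False)

args = parser.parse_args(["4", "--skip_if_no_new_data"])
assert (args.n_workers, args.skip_if_no_new_data) == (4, True)
```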
19 changes: 10 additions & 9 deletions emission/analysis/userinput/matcher.py
@@ -50,7 +50,7 @@ def match_incoming_inputs(user_id, timerange):
handle_multi_non_deleted_match(confirmed_obj, ui)
else:
assert False, f"Found weird key {ui.metadata.key} that was not in the search list"
update_confirmed_and_composite(confirmed_obj)
update_confirmed_and_composite(ts, confirmed_obj)
else:
logging.warn("No match found for single user input %s, moving forward anyway" % ui)
lastInputProcessed = ui
@@ -98,7 +98,7 @@ def create_confirmed_objects(user_id):
if last_confirmed_place is not None:
logging.debug("last confirmed_place %s was already in database, updating with linked trip info... and %s additions" %
(last_confirmed_place["_id"], len(last_confirmed_place["data"]["additions"])))
update_confirmed_and_composite(last_confirmed_place)
update_confirmed_and_composite(ts, last_confirmed_place)

if confirmed_tl is not None and not confirmed_tl.is_empty():
tl_list = list(confirmed_tl)
@@ -200,8 +200,7 @@ def get_section_summary(ts, cleaned_trip, section_key):
cleaned_trip: the cleaned trip object associated with the sections
section_key: 'inferred_section' or 'cleaned_section'
"""
import emission.core.get_database as edb

logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) called")
sections = esdt.get_sections_for_trip(key = section_key,
user_id = cleaned_trip["user_id"], trip_id = cleaned_trip["_id"])
if len(sections) == 0:
@@ -214,11 +213,13 @@ def get_section_summary(ts, cleaned_trip, section_key):
if section_key == "analysis/cleaned_section" else inferred_section_mapper
sections_df["sensed_mode_str"] = sections_df["sensed_mode"].apply(sel_section_mapper)
grouped_section_df = sections_df.groupby("sensed_mode_str")
return {
retVal = {
"distance": grouped_section_df.distance.sum().to_dict(),
"duration": grouped_section_df.duration.sum().to_dict(),
"count": grouped_section_df.trip_id.count().to_dict()
}
logging.debug(f"get_section_summary({cleaned_trip['_id']}, {section_key}) returning {retVal}")
return retVal

def create_confirmed_entry(ts, tce, confirmed_key, input_key_list):
# Copy the entry and fill in the new values
@@ -271,7 +272,7 @@ def link_trip_end(confirmed_trip, confirmed_end_place):
confirmed_trip["data"]["end_place"] = confirmed_end_place.get_id()
confirmed_end_place["data"]["ending_trip"] = confirmed_trip.get_id()

def update_confirmed_and_composite(confirmed_obj):
def update_confirmed_and_composite(ts, confirmed_obj):
import emission.storage.timeseries.builtin_timeseries as estbt
import emission.core.get_database as edb
estbt.BuiltinTimeSeries.update(confirmed_obj)
@@ -281,10 +282,10 @@ def update_confirmed_and_composite(confirmed_obj):
# if we don't find a matching composite trip, we don't need to do anything
# since it has not been created yet and will be created with updated values when we get to that stage
if confirmed_obj["metadata"]["key"] in [esda.CONFIRMED_TRIP_KEY, esda.CONFIRMED_UNTRACKED_KEY]:
composite_trip = edb.get_analysis_timeseries_db().find_one({"data.confirmed_trip": confirmed_obj.get_id()})
composite_trip = ts.get_entry_at_ts("analysis/composite_trip", "data.start_ts", confirmed_obj.data.start_ts)
if composite_trip is not None:
# copy over all the fields other than the excluded ones below
EXCLUDED_FIELDS = ["end_confirmed_place"]
EXCLUDED_FIELDS = ["end_confirmed_place", "start_ts", "end_ts", "start_loc", "end_loc"]
for k in confirmed_obj["data"].keys():
if k not in EXCLUDED_FIELDS:
composite_trip["data"][k] = confirmed_obj["data"][k]
@@ -293,7 +294,7 @@ def update_confirmed_and_composite(confirmed_obj):
logging.debug("No composite trip matching confirmed trip %s, nothing to update" % confirmed_obj["_id"])

if confirmed_obj["metadata"]["key"] == esda.CONFIRMED_PLACE_KEY:
composite_trip = edb.get_analysis_timeseries_db().find_one({"data.end_confirmed_place._id": confirmed_obj["_id"]})
composite_trip = ts.get_entry_at_ts("analysis/composite_trip", "data.end_ts", confirmed_obj['data']['enter_ts'])
if composite_trip is not None:
composite_trip["data"]["end_confirmed_place"] = confirmed_obj
estbt.BuiltinTimeSeries.update(ecwe.Entry(composite_trip))
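To make the return shape of get_section_summary concrete, a toy, self-contained version of the same groupby; column names follow the diff, while the sample rows are invented:

```python
import pandas as pd

sections_df = pd.DataFrame({
    "trip_id":         ["t1", "t1", "t2"],
    "sensed_mode_str": ["WALKING", "BICYCLING", "WALKING"],
    "distance":        [500.0, 2000.0, 750.0],   # meters
    "duration":        [600.0, 480.0, 900.0],    # seconds
})
grouped_section_df = sections_df.groupby("sensed_mode_str")
summary = {
    "distance": grouped_section_df.distance.sum().to_dict(),
    "duration": grouped_section_df.duration.sum().to_dict(),
    "count":    grouped_section_df.trip_id.count().to_dict(),
}
# {'distance': {'BICYCLING': 2000.0, 'WALKING': 1250.0},
#  'duration': {'BICYCLING': 480.0, 'WALKING': 1500.0},
#  'count':    {'BICYCLING': 1, 'WALKING': 2}}
```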
27 changes: 4 additions & 23 deletions emission/net/api/pipeline.py
@@ -10,35 +10,16 @@

import emission.storage.pipeline_queries as esp
import emission.storage.timeseries.abstract_timeseries as esta
import emission.core.wrapper.user as ecwu

def get_complete_ts(user_id):
complete_ts = esp.get_complete_ts(user_id)
logging.debug("Returning complete_ts = %s" % complete_ts)
return complete_ts

def get_range(user_id):
ts = esta.TimeSeries.get_time_series(user_id)
start_ts = ts.get_first_value_for_field("analysis/composite_trip", "data.start_ts", pymongo.ASCENDING)
if start_ts == -1:
start_ts = ts.get_first_value_for_field("analysis/confirmed_trip", "data.start_ts", pymongo.ASCENDING)
if start_ts == -1:
start_ts = ts.get_first_value_for_field("analysis/cleaned_trip", "data.start_ts", pymongo.ASCENDING)
if start_ts == -1:
start_ts = None

end_ts = ts.get_first_value_for_field("analysis/composite_trip", "data.end_ts", pymongo.DESCENDING)
if start_ts == -1:
end_ts = ts.get_first_value_for_field("analysis/confirmed_trip", "data.end_ts", pymongo.DESCENDING)
if end_ts == -1:
end_ts = ts.get_first_value_for_field("analysis/cleaned_trip", "data.end_ts", pymongo.DESCENDING)
if end_ts == -1:
end_ts = None

complete_ts = get_complete_ts(user_id)
if complete_ts is not None and end_ts is not None\
and (end_ts != (complete_ts - esp.END_FUZZ_AVOID_LTE)):
logging.exception("end_ts %s != complete_ts no fuzz %s" %
(end_ts, (complete_ts - esp.END_FUZZ_AVOID_LTE)))

user_profile = ecwu.User(user_id).getProfile()
start_ts = user_profile.get("pipeline_range", {}).get("start_ts", None)
end_ts = user_profile.get("pipeline_range", {}).get("end_ts", None)
logging.debug("Returning range (%s, %s)" % (start_ts, end_ts))
return (start_ts, end_ts)
2 changes: 2 additions & 0 deletions emission/net/usercache/abstract_usercache_handler.py
@@ -28,6 +28,8 @@ def moveToLongTerm(self):
"""
Moves all messages that have arrived for the current user into long-term
storage, after converting into a platform-independent format.
Returns the number of entries that were moved so that we can return
early if this is zero.
"""
pass

3 changes: 2 additions & 1 deletion emission/net/usercache/builtin_usercache_handler.py
@@ -58,7 +58,7 @@ def moveToLongTerm(self):
# Since we didn't get the current time range, there is no current
# state, so we don't need to mark it as done
# esp.mark_usercache_done(None)
return
return 0

time_query = esp.get_time_range_for_usercache(self.user_id)

@@ -99,6 +99,7 @@ def moveToLongTerm(self):
logging.debug("Deleting all entries for query %s" % time_query)
uc.clearProcessedMessages(time_query)
esp.mark_usercache_done(self.user_id, last_ts_processed)
return len(unified_entry_list)

def storeViewsToCache(self):
"""
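The count returned here feeds the early-exit gate in intake_stage.py below; a self-contained sketch of that contract, with stand-in functions rather than the real handler API:

```python
def move_to_long_term_stub(pending_entries):
    # stand-in for moveToLongTerm(): process everything, report how much arrived
    return len(pending_entries)

def run_stage(pending_entries, skip_if_no_new_data):
    new_entry_count = move_to_long_term_stub(pending_entries)
    if skip_if_no_new_data and new_entry_count == 0:
        return "skipped rest of pipeline"
    return "ran full pipeline"

assert run_stage([], True) == "skipped rest of pipeline"
assert run_stage(["entry"], True) == "ran full pipeline"
assert run_stage([], False) == "ran full pipeline"  # old behavior preserved
```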
32 changes: 28 additions & 4 deletions emission/pipeline/intake_stage.py
@@ -12,6 +12,7 @@
import arrow
from uuid import UUID
import time
import pymongo

import emission.core.get_database as edb
import emission.core.timer as ect
@@ -37,8 +38,9 @@

import emission.storage.decorations.stats_queries as esds

import emission.core.wrapper.user as ecwu

def run_intake_pipeline(process_number, uuid_list):
def run_intake_pipeline(process_number, uuid_list, skip_if_no_new_data=False):
"""
Run the intake pipeline with the specified process number and uuid list.
Note that the process_number is only really used to customize the log file name
@@ -75,23 +77,30 @@ def run_intake_pipeline(process_number, uuid_list):
continue

try:
run_intake_pipeline_for_user(uuid)
run_intake_pipeline_for_user(uuid, skip_if_no_new_data)
except Exception as e:
esds.store_pipeline_error(uuid, "WHOLE_PIPELINE", time.time(), None)
logging.exception("Found error %s while processing pipeline "
"for user %s, skipping" % (e, uuid))

def run_intake_pipeline_for_user(uuid):
def run_intake_pipeline_for_user(uuid, skip_if_no_new_data):
uh = euah.UserCacheHandler.getUserCacheHandler(uuid)

with ect.Timer() as uct:
logging.info("*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: moving to long term" % uuid + "*" * 10)
uh.moveToLongTerm()
new_entry_count = uh.moveToLongTerm()

esds.store_pipeline_time(uuid, ecwp.PipelineStages.USERCACHE.name,
time.time(), uct.elapsed)

if skip_if_no_new_data and new_entry_count == 0:
print("No new entries, and skip_if_no_new_data = %s, skipping the rest of the pipeline" % skip_if_no_new_data)
return
else:
print("New entry count == %s and skip_if_no_new_data = %s, continuing" %
(new_entry_count, skip_if_no_new_data))

with ect.Timer() as uit:
logging.info("*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10)
print(str(arrow.now()) + "*" * 10 + "UUID %s: updating incoming user inputs" % uuid + "*" * 10)
@@ -200,3 +209,18 @@ def run_intake_pipeline_for_user(uuid):

esds.store_pipeline_time(uuid, ecwp.PipelineStages.OUTPUT_GEN.name,
time.time(), ogt.elapsed)

_get_and_store_range(uuid, "analysis/composite_trip")

def _get_and_store_range(user_id, trip_key):
ts = esta.TimeSeries.get_time_series(user_id)
start_ts = ts.get_first_value_for_field(trip_key, "data.start_ts", pymongo.ASCENDING)
if start_ts == -1:
start_ts = None
end_ts = ts.get_first_value_for_field(trip_key, "data.end_ts", pymongo.DESCENDING)
if end_ts == -1:
end_ts = None

user = ecwu.User(user_id)
user.update({"pipeline_range": {"start_ts": start_ts, "end_ts": end_ts}})
logging.debug("After updating, new profiles is %s" % user.getProfile())
4 changes: 2 additions & 2 deletions emission/pipeline/scheduler.py
@@ -60,13 +60,13 @@ def get_split_uuid_lists(n_splits):
logging.debug("Split values are %s" % ret_splits)
return ret_splits

def dispatch(split_lists):
def dispatch(split_lists, skip_if_no_new_data):
ctx = mp.get_context('spawn')
process_list = []
for i, uuid_list in enumerate(split_lists):
logging.debug("Dispatching list %s" % uuid_list)
pid = i
p = ctx.Process(target=epi.run_intake_pipeline, args=(pid, uuid_list))
p = ctx.Process(target=epi.run_intake_pipeline, args=(pid, uuid_list, skip_if_no_new_data))
logging.info("Created process %s to process %s list of size %s" %
(p, i, len(uuid_list)))
p.start()
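A minimal sketch of how the flag rides along to each spawned worker; the worker function and uuid lists here are illustrative, not the real intake pipeline:

```python
import multiprocessing as mp

def run_intake(pid, uuid_list, skip_if_no_new_data):
    print("worker %s: %s users, skip_if_no_new_data=%s"
          % (pid, len(uuid_list), skip_if_no_new_data))

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    process_list = []
    for i, uuid_list in enumerate([["u1", "u2"], ["u3"]]):
        p = ctx.Process(target=run_intake, args=(i, uuid_list, True))
        p.start()
        process_list.append(p)
    for p in process_list:
        p.join()
```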
@@ -13,11 +13,12 @@
import emission.core.common as ecc
import emission.core.wrapper.pipelinestate as ewps
import emission.storage.pipeline_queries as epq
import emission.pipeline.intake_stage as epi

# Test imports
import emission.tests.common as etc

class TestPipelineRealData(unittest.TestCase):
class TestPipelineCornerCases(unittest.TestCase):
def setUp(self):
self.testUUID = uuid.uuid4()
logging.info("setUp complete")
@@ -69,6 +70,72 @@ def testAllDefinedPipelineStates(self):
self.assertIsNotNone(ps['user_id'])
self.assertIn(ps['pipeline_stage'], valid_states)

def testSkipPipelineNoNewEntries(self):
# we start with no pipeline states for this user
all_pipeline_states = edb.get_pipeline_state_db().find()
stages_skipped_in_testing = [
ewps.PipelineStages.USERCACHE,
ewps.PipelineStages.TRIP_MODEL,
ewps.PipelineStages.TOUR_MODEL,
ewps.PipelineStages.ALTERNATIVES,
ewps.PipelineStages.USER_MODEL,
ewps.PipelineStages.RECOMMENDATION,
ewps.PipelineStages.OUTPUT_GEN]
test_run_states = [pse.value for pse in ewps.PipelineStages
if pse not in stages_skipped_in_testing] + [ewps.PipelineStages.OUTPUT_GEN.value]
curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID,
all_pipeline_states))
self.assertEqual(len(curr_user_states), 0)
# next, we run the real pipeline, and end up with no entries
# and we have an early return in that case
print("-" * 10, "Running real pipeline on empty DB, expecting no change", "-" * 10)
epi.run_intake_pipeline_for_user(self.testUUID, True)
all_pipeline_states = edb.get_pipeline_state_db().find()
curr_user_states = list(filter(lambda ps: ps["user_id"] == self.testUUID,
all_pipeline_states))
self.assertEqual(len(curr_user_states), 0)
# Now we load some data and run the test pipeline
# which will generate some pipeline states
print("-" * 10, "Running test pipeline on real data, expecting states to be set", "-" * 10)
etc.setupRealExample(self, "emission/tests/data/real_examples/shankari_2016-07-25")
epi.run_intake_pipeline_for_user(self.testUUID, False)

all_pipeline_states_after_run = edb.get_pipeline_state_db().find()
curr_user_states_after_run = list(filter(lambda ps: ps["user_id"] == self.testUUID,
all_pipeline_states_after_run))
states_set = [ps['pipeline_stage'] for ps in curr_user_states_after_run]
print("-" * 10, "test run stages are ", test_run_states, "-" * 10)
print("-" * 10, "states that are set are ", states_set, "-" * 10)
self.assertGreater(len(curr_user_states_after_run), 0)
self.assertEqual(sorted(states_set), sorted(test_run_states))
# then we run the real pipeline again
# We expect to see no changes between the first and the second run
# because of the usercache skip
epi.run_intake_pipeline_for_user(self.testUUID, True)
all_pipeline_states_after_test_run = edb.get_pipeline_state_db().find()
curr_user_states_after_test_run = list(filter(lambda ps: ps["user_id"] == self.testUUID,
all_pipeline_states_after_test_run))
get_last_processed = lambda ps: ps['last_processed_ts']
get_last_run = lambda ps: ps['last_ts_run']
self.assertEqual(list(map(get_last_processed, curr_user_states_after_run)),
list(map(get_last_processed, curr_user_states_after_test_run)))
self.assertEqual(list(map(get_last_run, curr_user_states_after_run)),
list(map(get_last_run, curr_user_states_after_test_run)))

# then we run the real pipeline again with skip=False
# We expect last_processed_ts to be unchanged since there is no new data,
# but last_ts_run to change since the stages actually ran this time
epi.run_intake_pipeline_for_user(self.testUUID, False)
all_pipeline_states_after_test_run = edb.get_pipeline_state_db().find()
curr_user_states_after_test_run = list(filter(lambda ps: ps["user_id"] == self.testUUID,
all_pipeline_states_after_test_run))
get_last_processed = lambda ps: ps['last_processed_ts']
get_last_run = lambda ps: ps['last_ts_run']
self.assertEqual(list(map(get_last_processed, curr_user_states_after_run)),
list(map(get_last_processed, curr_user_states_after_test_run)))
self.assertNotEqual(list(map(get_last_run, curr_user_states_after_run)),
list(map(get_last_run, curr_user_states_after_test_run)))

if __name__ == '__main__':
etc.configLogging()