Commit

Removing changes made to original export PR scripts.
Mahadik, Mukul Chandrakant authored and committed on Jul 31, 2024
1 parent 6a990cb commit 2d73ef9
Showing 3 changed files with 17 additions and 46 deletions.
bin/debug/extract_timeline_for_day_range_and_user.py (5 changes: 3 additions & 2 deletions)

```diff
@@ -54,8 +54,8 @@ def export_timeline_for_users(user_id_list, args):
     for curr_uuid in user_id_list:
         if curr_uuid != '':
             logging.info("=" * 50)
-            export_timeline(user_id=curr_uuid, start_day_str=args.start_day,
-                            end_day_str= args.end_day, timezone=args.timezone,
+            export_timeline(user_id=curr_uuid, start_day_str=args.start_day,
+                            end_day_str= args.end_day, timezone=args.timezone,
                             file_name=args.file_prefix)
@@ -67,6 +67,7 @@ def export_timeline_for_users(user_id_list, args):
     group.add_argument("-u", "--user_uuid", nargs="+")
     group.add_argument("-a", "--all", action="store_true")
     group.add_argument("-f", "--file")
+
     parser.add_argument("--timezone", default="UTC")
     parser.add_argument("start_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
     parser.add_argument("end_day", help="start day in utc - e.g. 'YYYY-MM-DD'" )
```
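For orientation, the argument layout this hunk touches can be exercised standalone. A minimal sketch, assuming `required=True` on the mutually exclusive group and a trailing `file_prefix` positional (both inferred rather than shown: the group creation line is outside the displayed context, and `file_prefix` comes from the `args.file_prefix` reference in the first hunk):

```python
import argparse

# Standalone sketch of the parser built by
# extract_timeline_for_day_range_and_user.py; comments mark assumptions.
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)  # required=True is assumed
group.add_argument("-u", "--user_uuid", nargs="+")      # one or more UUIDs
group.add_argument("-a", "--all", action="store_true")  # export every user
group.add_argument("-f", "--file")                      # file listing UUIDs
parser.add_argument("--timezone", default="UTC")
parser.add_argument("start_day", help="start day in utc - e.g. 'YYYY-MM-DD'")
parser.add_argument("end_day", help="end day in utc - e.g. 'YYYY-MM-DD'")
parser.add_argument("file_prefix", help="prefix for the output file(s)")  # assumed

args = parser.parse_args(["-a", "2024-07-01", "2024-07-31", "/tmp/timeline"])
print(args.all, args.start_day, args.end_day, args.file_prefix)
```

Exactly one of `-u`, `-a`, or `-f` selects the users; the positionals bound the UTC date range and name the output prefix.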
emission/export/export.py (56 changes: 13 additions & 43 deletions)

```diff
@@ -39,54 +39,35 @@ def get_with_retry(retrieve_call, in_query):
             query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]]
     return list_so_far
 
-def get_from_all_three_sources_with_retry(user_id, in_query, databases=None):
+def get_from_all_three_sources_with_retry(user_id, in_query):
     import emission.storage.timeseries.builtin_timeseries as estb
 
     ts = estb.BuiltinTimeSeries(user_id)
     uc = enua.UserCache.getUserCache(user_id)
 
     sort_key = ts._get_sort_key(in_query)
-    source_db_calls = []
-
-    logging.info("In get_from_all_three_sources_with_retry: Databases = %s" % databases)
-
-    if databases is None or 'timeseries_db' in databases:
-        logging.info("Fetching from timeseries_db")
-        base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq,
-            geo_query=None, extra_query_list=None, sort_key = sort_key)
-        source_db_calls.append(base_ts_call)
-
-    if databases is None or 'analysis_timeseries_db' in databases:
-        logging.info("Fetching from analysis_timeseries_db")
-        analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq,
-            geo_query=None, extra_query_list=None, sort_key = sort_key)
-        source_db_calls.append(analysis_ts_call)
-
-    if databases is None or 'usercache_db' in databases:
-        logging.info("Fetching from usercache_db")
-        uc_ts_call = lambda tq: (uc.getMessageCount(None, tq), uc.getMessage(None, tq))
-        source_db_calls.append(uc_ts_call)
-
-    retry_lists = []
-    for source_db_call in source_db_calls:
-        retry_lists = retry_lists + get_with_retry(source_db_call, in_query)
-
-    return retry_lists
-
-def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None):
-    logging.info("In export: Databases = %s" % databases)
+    base_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.timeseries_db, None, tq,
+        geo_query=None, extra_query_list=None, sort_key = sort_key)
+    analysis_ts_call = lambda tq: ts._get_entries_for_timeseries(ts.analysis_timeseries_db, None, tq,
+        geo_query=None, extra_query_list=None, sort_key = sort_key)
+    uc_ts_call = lambda tq: (uc.getMessageCount(None, tq), uc.getMessage(None, tq))
+
+    return get_with_retry(base_ts_call, in_query) + \
+        get_with_retry(analysis_ts_call, in_query) + \
+        get_with_retry(uc_ts_call, in_query)
+
+def export(user_id, ts, start_ts, end_ts, file_name, ma_bool):
     logging.info("Extracting timeline for user %s day %s -> %s and saving to file %s" %
                  (user_id, start_ts, end_ts, file_name))
 
     loc_time_query = estt.TimeQuery("data.ts", start_ts, end_ts)
-    loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query, databases)
+    loc_entry_list = get_from_all_three_sources_with_retry(user_id, loc_time_query)
     # Changing to estcs so that we will read the manual entries, which have data.start_ts and data.enter_ts
     # from the usercache as well
     trip_time_query = estt.TimeQuery("data.start_ts", start_ts, end_ts)
-    trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query, databases)
+    trip_entry_list = get_from_all_three_sources_with_retry(user_id, trip_time_query)
     place_time_query = estt.TimeQuery("data.enter_ts", start_ts, end_ts)
-    place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query, databases)
+    place_entry_list = get_from_all_three_sources_with_retry(user_id, place_time_query)
     # Handle the case of the first place, which has no enter_ts and won't be
     # matched by the default query
     first_place_extra_query = {'$and': [{'data.enter_ts': {'$exists': False}},
@@ -106,20 +87,9 @@ def export(user_id, ts, start_ts, end_ts, file_name, ma_bool, databases=None):
         logging.info("No entries found in range for user %s, skipping save" % user_id)
     else:
         combined_filename = "%s.gz" % (file_name)
-        logging.info("Combined list:")
-        logging.info(len(combined_list))
         with gzip.open(combined_filename, "wt") as gcfd:
             json.dump(combined_list,
                 gcfd, default=esj.wrapped_default, allow_nan=False, indent=4)
 
-    # Returning these queries that were used to fetch the data entries that were exported.
-    # Need these for use in the purge_user_timeseries.py script so that we only delete those entries that were exported
-    return {
-        'trip_time_query': { 'query': trip_time_query, 'type': "time" },
-        'place_time_query': { 'query': place_time_query, 'type': "time" },
-        'loc_time_query': { 'query': loc_time_query, 'type': "time" },
-        'first_place_extra_query': { 'query': first_place_extra_query, 'type': "extra" }
-    }
 
 
 def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
```
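After this revert, `export()` no longer accepts a `databases` filter and no longer returns the queries it ran (the dictionary that the purge script consumed). For orientation, a minimal invocation sketch; the `esta.TimeSeries.get_time_series` accessor is assumed from the same codebase, and the UUID string and epoch bounds are placeholders:

```python
import emission.export.export as eee
import emission.storage.timeseries.abstract_timeseries as esta

# Hypothetical driver for the reverted export(); the UUID and the
# epoch-second bounds are illustrative placeholders, and ma_bool toggles
# whether motion-activity entries are included.
user_id = "replace-with-a-real-user-uuid"
start_ts = 1719792000  # 2024-07-01 00:00 UTC
end_ts = 1722470400    # 2024-08-01 00:00 UTC

ts = esta.TimeSeries.get_time_series(user_id)
eee.export(user_id, ts, start_ts, end_ts,
           "/tmp/export_%s" % user_id, ma_bool=False)
# Output lands in /tmp/export_<uuid>.gz, per the "%s.gz" % file_name line above.
```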
emission/pipeline/export_stage.py (2 changes: 1 addition & 1 deletion)

```diff
@@ -33,7 +33,7 @@ def run_export_pipeline(process_number, uuid_list):
     export_log_config["handlers"]["errors"]["filename"] = \
         export_log_config["handlers"]["errors"]["filename"].replace("export", "export_%s" % process_number)
 
-    # logging.config.dictConfig(export_log_config)
+    logging.config.dictConfig(export_log_config)
    np.random.seed(61297777)
 
     logging.info("processing UUID list = %s" % uuid_list)
```
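The re-enabled `dictConfig` call is what makes the per-process filename rewrite above take effect. A self-contained sketch of the pattern; the contents of `export_log_config` here are illustrative assumptions (the real config is loaded elsewhere in the pipeline), and only the filename rewrite mirrors the diff:

```python
import logging
import logging.config

# Illustrative stand-in for export_log_config; the handler layout is assumed.
export_log_config = {
    "version": 1,
    "handlers": {
        "errors": {
            "class": "logging.FileHandler",
            "filename": "/tmp/export.log",
            "level": "ERROR",
        },
    },
    "root": {"handlers": ["errors"], "level": "INFO"},
}

process_number = 3
# Same rewrite as the context lines above: each worker gets its own log file.
export_log_config["handlers"]["errors"]["filename"] = \
    export_log_config["handlers"]["errors"]["filename"].replace(
        "export", "export_%s" % process_number)
logging.config.dictConfig(export_log_config)  # the line this commit restores
logging.error("recorded in /tmp/export_3.log")
```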
