diff --git a/package_info.json b/package_info.json
index bb2ec45..8315f93 100644
--- a/package_info.json
+++ b/package_info.json
@@ -1,5 +1,5 @@
 {
   "package_name": "recon-lw",
-  "package_version": "3.1.0"
+  "package_version": "3.1.1"
 }
diff --git a/recon_lw/core/StateStream.py b/recon_lw/core/StateStream.py
index d382a86..2010616 100644
--- a/recon_lw/core/StateStream.py
+++ b/recon_lw/core/StateStream.py
@@ -19,7 +19,8 @@ def __init__(self,
                  state_transition_func,
                  events_saver: IEventsSaver,
                  combine_instantenious_snapshots=True,
-                 get_next_update_func2=None
+                 get_next_update_func2=None,
+                 initial_snapshots_func=None
                  ) -> None:
         """
@@ -43,8 +44,13 @@ def __init__(self,
         self._combine_instantenious_snapshots = combine_instantenious_snapshots
         self._events_saver = events_saver
         self._get_next_update_func2 = get_next_update_func2
+        self._initial_snapshots_func = initial_snapshots_func

     def state_updates(self, stream: Iterable, snapshots_collection):
+        if self._initial_snapshots_func is not None:
+            for key, ts, state in self._initial_snapshots_func():
+                yield (key, ts, 'c', state)
+
         if self._get_next_update_func2 is None:
             for o in stream:
                 key, ts, action, state = self._get_next_update_func(o)
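Reviewer note on the `StateStream` change: the patch implies that `initial_snapshots_func` is a zero-argument callable yielding `(key, ts, state)` triples, which `state_updates` replays as `'c'` (create) actions before consuming the live stream. A minimal sketch of a conforming callable; the key format and state payload are illustrative, not part of the API:

```python
def warm_start_snapshots():
    """Hypothetical warm-start source replaying state saved by a prior run."""
    saved = {"book-1": (1_700_000_000, {"status": "open", "levels": []})}
    for key, (ts, state) in saved.items():
        # Each triple is re-emitted by state_updates as (key, ts, 'c', state).
        yield key, ts, state

# stream = StateStream(..., initial_snapshots_func=warm_start_snapshots)
```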
diff --git a/recon_lw/core/utility/recon_utils.py b/recon_lw/core/utility/recon_utils.py
index 33cf65c..e3d642b 100644
--- a/recon_lw/core/utility/recon_utils.py
+++ b/recon_lw/core/utility/recon_utils.py
@@ -1,3 +1,5 @@
+from dataclasses import dataclass
+
 from th2_data_services.config import options
 from datetime import datetime
 from recon_lw.core.message_utils import message_to_dict
@@ -8,6 +10,30 @@
 from os import listdir
 from os import path

+
+@dataclass
+class CachedMessage:
+    message: dict
+    count: int = 1
+
+def message_cache_add_with_copies(m, message_cache):
+    mid = options.mfr.get_id(m)
+    if mid not in message_cache:
+        message_cache[mid] = CachedMessage(m, 1)
+    else:
+        message_cache[mid].count += 1
+
+def message_cache_pop_with_copies(m_id, message_cache):
+    if m_id is None:
+        return None
+    r = message_cache.get(m_id)
+    if not r:
+        return None
+    if r.count == 1:
+        return message_cache.pop(m_id).message
+    r.count -= 1
+    return r.message
+
 def time_index_add(key, m, time_index):
     time_index.add((options.mfr.get_timestamp(m), key))
@@ -32,6 +58,7 @@ def create_event(
         body=None,
         parentId=None,
         recon_name='',
+        attached_message_ids=[]
 ):
     # TODO - description is required.
     ts = datetime.now()
@@ -43,7 +70,7 @@ def create_event(
          "body": body,
          "parentEventId": parentId,
          "startTimestamp": {"epochSecond": int(ts.timestamp()), "nano": ts.microsecond * 1000},
-         "attachedMessageIds": []}
+         "attachedMessageIds": attached_message_ids}
     return e

 def simplify_message(m):
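Usage sketch for the new reference-counted cache helpers, imported the same way `matching.py` does below. It assumes `options.mfr.get_id` is configured to resolve the `"id"` field of these plain-dict messages; in real use the IDs come from th2 message metadata. Adding the same ID again only bumps the counter, and each pop returns the stored message while only the last pop evicts the entry:

```python
from recon_lw.core.utility import (
    message_cache_add_with_copies,
    message_cache_pop_with_copies,
)

cache = {}
m = {"id": "m1", "payload": "ExecutionReport"}  # illustrative message dict

message_cache_add_with_copies(m, cache)  # new entry, count == 1
message_cache_add_with_copies(m, cache)  # same id again, count == 2

assert message_cache_pop_with_copies("m1", cache) is m  # count 2 -> 1, entry kept
assert message_cache_pop_with_copies("m1", cache) is m  # last copy, entry evicted
assert message_cache_pop_with_copies("m1", cache) is None
```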
diff --git a/recon_lw/matching/old/matching.py b/recon_lw/matching/old/matching.py
index eed986c..629395e 100644
--- a/recon_lw/matching/old/matching.py
+++ b/recon_lw/matching/old/matching.py
@@ -4,7 +4,7 @@
 from th2_data_services.config import options

 from recon_lw.core.ts_converters import time_stamp_key
-from recon_lw.core.utility import time_index_add, message_cache_add
+from recon_lw.core.utility import time_index_add, message_cache_add, message_cache_add_with_copies
 from recon_lw.matching.old.utils import rule_flush
@@ -29,7 +29,8 @@ def flush_matcher(ts, rule_settings, event_sequence: dict, save_events_func):
                event_sequence,
                save_events_func,
                rule_settings["rule_root_event"],
-               rule_settings["live_orders_cache"] if "live_orders_cache" in rule_settings else None)
+               rule_settings["live_orders_cache"] if "live_orders_cache" in rule_settings else None,
+               keep_copies_for_same_m_id=rule_settings.get('keep_copies_for_same_m_id', False))

 def one_many_match(next_batch, rule_dict):
     """
@@ -56,6 +57,12 @@ def one_many_match(next_batch, rule_dict):
     message_cache = rule_dict["message_cache"]
     first_key_func = rule_dict["first_key_func"]
     second_key_func = rule_dict["second_key_func"]
+    trace_duplicate_messages = rule_dict.get('trace_duplicate_messages', True)
+    keep_copies_for_same_m_id = rule_dict.get('keep_copies_for_same_m_id', False)
+
+    message_cache_add_func = message_cache_add
+    if keep_copies_for_same_m_id:
+        message_cache_add_func = message_cache_add_with_copies

     n_duplicates = 0
     for m in next_batch:
@@ -64,10 +71,12 @@
         if first_keys is not None:
             match_index_element = [message_id, None]
             for first_key in first_keys:
+                if keep_copies_for_same_m_id:
+                    match_index_element = [message_id, None]
                 if first_key not in match_index:
                     match_index[first_key] = match_index_element
                     time_index_add(first_key, m, time_index)
-                    message_cache_add(m, message_cache)
+                    message_cache_add_func(m, message_cache)
                     continue
                 else:
                     existing = match_index[first_key]
@@ -75,25 +84,25 @@
                         n_duplicates += 1
                     else:
                         existing[0] = message_id
-                        message_cache_add(m, message_cache)
+                        message_cache_add_func(m, message_cache)
                     continue
         second_key = second_key_func(m)
         if second_key is not None:
             if second_key not in match_index:
                 match_index[second_key] = [None, message_id]
                 time_index_add(second_key, m, time_index)
-                message_cache_add(m, message_cache)
+                message_cache_add_func(m, message_cache)
             else:
                 existing = match_index[second_key]
                 if existing[1] is None:  # existing[1] - stream 2 message ID
                     existing[1] = message_id
                     # match_index[second_key] = [existing[0], message_id]
-                    message_cache_add(m, message_cache)
+                    message_cache_add_func(m, message_cache)
                 else:
                     existing.append(message_id)
-                    message_cache_add(m, message_cache)
+                    message_cache_add_func(m, message_cache)

-    if n_duplicates > 0:
+    if n_duplicates > 0 and trace_duplicate_messages:
         print(n_duplicates, " duplicates detected")

 def pair_one_match(next_batch, rule_dict):
diff --git a/recon_lw/matching/old/utils.py b/recon_lw/matching/old/utils.py
index ea54c30..c4299ef 100644
--- a/recon_lw/matching/old/utils.py
+++ b/recon_lw/matching/old/utils.py
@@ -1,5 +1,5 @@
 from recon_lw.core.ts_converters import time_stamp_key
-from recon_lw.core.utility import message_cache_pop
+from recon_lw.core.utility import message_cache_pop, message_cache_pop_with_copies


 def flush_old(current_ts, horizon_delay, time_index):
@@ -21,8 +21,13 @@
 def rule_flush(current_ts, horizon_delay, match_index: dict, time_index,
                message_cache, interpret_func, event_sequence: dict, send_events_func,
-               parent_event, live_orders_cache):
+               parent_event, live_orders_cache, keep_copies_for_same_m_id=False):
     old_keys = flush_old(current_ts, horizon_delay, time_index)
+
+    cache_pop_func = message_cache_pop
+    if keep_copies_for_same_m_id:
+        cache_pop_func = message_cache_pop_with_copies
+
     events = []
     for match_key in old_keys:
         elem = match_index.pop(match_key)  # elem -- can have 2 or 3 elements inside
@@ -35,7 +40,7 @@
         # arg1 - ??
         # arg2 - EventSequence
         results = interpret_func(
-            [message_cache_pop(item, message_cache) for item in elem],
+            [cache_pop_func(item, message_cache) for item in elem],
             live_orders_cache,
             event_sequence
         )
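Both switches are read with `.get`, so existing rule dictionaries keep their old behaviour. A hypothetical fragment enabling them; all other keys follow the existing `one_many_match` contract and carry placeholder values here:

```python
rule_dict = {
    # existing one_many_match wiring (placeholders)
    "match_index": {},
    "time_index": ...,           # sorted container of (timestamp, key) pairs
    "message_cache": {},
    "first_key_func": lambda m: ...,
    "second_key_func": lambda m: ...,
    # new in 3.1.1
    "keep_copies_for_same_m_id": True,   # cache one copy per occurrence of an id
    "trace_duplicate_messages": False,   # suppress the "N duplicates detected" print
}
```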
diff --git a/recon_lw/matching/recon_ob.py b/recon_lw/matching/recon_ob.py
index b124868..81fb4c4 100644
--- a/recon_lw/matching/recon_ob.py
+++ b/recon_lw/matching/recon_ob.py
@@ -161,6 +161,7 @@ def process_operations_batch(operations_batch, events, book_id, book, check_book
                 obs[-1]["aggr_seq"]["limit_delta"] = 0 if limit_not_affected and obs[-1]["aggr_seq"][
                     "limit_delta"] == 0 else 1
                 obs[-1]["aggr_seq"]["limit_v2"] = updated_v2
+                obs[-1]["aggr_seq"]["l"] = min(ob["aggr_seq"]["l"] for ob in obs)
             else:
                 updated_limit_v2 = 0
                 updated_top_v2 = 0
@@ -498,18 +499,18 @@ def ob_copy(b):


 def init_aggr_seq(order_book: dict) -> None:
-    order_book["aggr_seq"] = {"top_delta": 0, "limit_delta": 0}
+    order_book["aggr_seq"] = {"top_delta": 0, "limit_delta": 0, "l": -1}
     order_book["implied_only"] = False


 def reset_aggr_seq(order_book):
-    order_book["aggr_seq"].update({"top_delta": 0, "limit_delta": 0})
+    order_book["aggr_seq"].update({"top_delta": 0, "limit_delta": 0, "l": -1})
     order_book["implied_only"] = False


 def reflect_price_update_in_version(side: str, price: float, str_time_of_event, order_book: dict):
     level = get_price_level(side, price, order_book)
-
+    order_book["aggr_seq"]["l"] = level
     max_levels = order_book["aggr_max_levels"]
     if level <= max_levels:
         order_book["aggr_seq"]["limit_delta"] = 1
@@ -657,6 +658,8 @@ def ob_clean_book(str_time_of_event, order_book: dict) -> tuple:
     update_time_and_version(str_time_of_event, order_book)
     order_book["aggr_seq"]["limit_delta"] = 1
     order_book["aggr_seq"]["top_delta"] = 1
+    order_book["aggr_seq"]["l"] = 1
+
     # return {}, [copy.deepcopy(order_book)]
     return {}, [ob_copy(order_book)]
@@ -672,6 +675,7 @@ def ob_change_status(new_status: str, str_time_of_event, condition: str, order_b
     update_time_and_version(str_time_of_event, order_book)
     order_book["aggr_seq"]["limit_delta"] = 1
     order_book["aggr_seq"]["top_delta"] = 1
+    order_book["aggr_seq"]["l"] = 1
     # return {}, [copy.deepcopy(order_book)]
     return {}, [ob_copy(order_book)]
diff --git a/recon_lw/matching/recon_ob_cross_stream.py b/recon_lw/matching/recon_ob_cross_stream.py
index 7f84da4..5c4196a 100644
--- a/recon_lw/matching/recon_ob_cross_stream.py
+++ b/recon_lw/matching/recon_ob_cross_stream.py
@@ -171,7 +171,11 @@ def compare_full_vs_top(full_book: dict, top_book: dict):

 def ob_compare_get_timestamp_key1_key2_aggr(o, custom_settings):
     if o["body"]["aggr_seq"]["limit_delta"] not in [1, 2]:
         return None, None, None
+    if o["body"]["sessionId"] == custom_settings["full_session"]:
+        if 'level_limit' in custom_settings:
+            if o["body"]["aggr_seq"]["l"] > custom_settings['level_limit']:
+                return None, None, None
     return o["body"]["timestamp"], "{0}_{1}_{2}".format(o["body"]["book_id"],
         o["body"]["time_of_event"], o["body"]["aggr_seq"]["limit_v2"]), None
@@ -219,6 +223,7 @@ def ob_compare_get_timestamp_key1_key2_top_aggr(o, custom_settings):

 def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_events):
+    streams = f'{custom_settings["comp_session"]}:{custom_settings["full_session"]}'
     if match[0] is not None and match[1] is not None:
         comp_res = compare_full_vs_aggr(match[0]["body"], match[1]["body"])
         events_to_store = []
@@ -232,7 +237,10 @@
                     "book_id": match[0]["body"]["book_id"],
                     "time_of_event": match[0]["body"]["time_of_event"],
                     "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
-                    "errors": comp_res}))
+                    "errors": comp_res,
+                    "streams": streams,
+                    "full_meta": match[0]["body"].get("meta"),
+                    "aggr_meta": match[1]["body"].get("meta")}))
         else:
             events_to_store.append(create_event(
                 "23:match",
@@ -244,7 +252,10 @@
                     "aggr_book_scope": match[1]["scope"],
                     "book_id": match[0]["body"]["book_id"],
                     "time_of_event": match[0]["body"]["time_of_event"],
-                    "top_v2": match[0]["body"]["aggr_seq"]["top_v2"]}))
+                    "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
+                    "streams": streams,
+                    "full_meta": match[0]["body"].get("meta"),
+                    "aggr_meta": match[1]["body"].get("meta")}))
         save_events(events_to_store)
     elif match[0] is not None:
         tech_info = ob_compare_get_timestamp_key1_key2_aggr(match[0], custom_settings)
@@ -256,7 +267,9 @@
                 "book_id": match[0]["body"]["book_id"],
                 "time_of_event": match[0]["body"]["time_of_event"],
                 "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
+                "streams": streams,
                 "sessionId": match[0]["body"]["sessionId"],
+                "full_meta": match[0]["body"].get("meta"),
                 "tech_info": tech_info})
         save_events([error_event])
     elif match[1] is not None:
@@ -269,6 +282,8 @@
                 "book_id": match[1]["body"]["book_id"],
                 "time_of_event": match[1]["body"]["time_of_event"],
                 "limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
+                "streams": streams,
+                "aggr_meta": match[1]["body"].get("meta"),
                 "sessionId": match[1]["body"]["sessionId"]})
         save_events([error_event])
@@ -412,11 +427,14 @@ def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: path
         full_session = rule_params["full_session"]
         if "aggr_session" in rule_params:
             aggr_session = rule_params["aggr_session"]
+            custom_settings = {"full_session": full_session, "comp_session": aggr_session}
+            if 'level_limit' in rule_params:
+                custom_settings['level_limit'] = rule_params['level_limit']
             processor_aggr = TimeCacheMatcher(
                 rule_params["horizon_delay"],
                 ob_compare_get_timestamp_key1_key2_aggr,
                 ob_compare_interpret_match_aggr,
-                {"full_session": full_session, "comp_session": aggr_session},
+                custom_settings,
                 lambda name, ev_type, ok, body: events_saver.create_event(
                     name, ev_type, ok, body, parentId=rule_root_event["eventId"]),
                 lambda ev_batch: events_saver.save_events(ev_batch)
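With `aggr_seq["l"]` now carrying the price level touched by an update (initialised to -1), a cross-stream rule can skip full-book updates that happen deeper than the aggregated feed publishes. A hypothetical `rule_params` fragment for `ob_compare_streams`; the session aliases and horizon value are illustrative:

```python
rule_params = {
    "full_session": "md_full_book",   # illustrative session aliases
    "aggr_session": "md_aggr_book",
    "horizon_delay": 30,
    # New, optional: full-book updates whose touched level is deeper than 10
    # are excluded from matching against a 10-level aggregated book.
    "level_limit": 10,
}
```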
diff --git a/recon_lw/reporting/match_diff/categorizer/basic.py b/recon_lw/reporting/match_diff/categorizer/basic.py
index ddf2090..2624615 100644
--- a/recon_lw/reporting/match_diff/categorizer/basic.py
+++ b/recon_lw/reporting/match_diff/categorizer/basic.py
@@ -103,6 +103,7 @@ def process_event(
         # TODO:
         #   event['body']['diff'] -- diff here is actually `diffs` - list of diff
+
         for diff in event['body']['diff']:
             category = self.error_extractor_strategy.diff_category_extractor(
                 recon_name, diff, event)
diff --git a/recon_lw/reporting/match_diff/viewer/category_displayer.py b/recon_lw/reporting/match_diff/viewer/category_displayer.py
index becfa71..2380e23 100644
--- a/recon_lw/reporting/match_diff/viewer/category_displayer.py
+++ b/recon_lw/reporting/match_diff/viewer/category_displayer.py
@@ -150,9 +150,14 @@ def _get_example_td(example_data: MatchDiffExampleData, item_id: str):
     if isinstance(example_data.message_content, list):
         code_mc = ''
         for mc in example_data.message_content:
-            code_mc += f'<pre><code>{json.dumps(mc, indent=4)}</code></pre>'
-    else:
+            if isinstance(mc, dict):
+                code_mc += f'<pre><code>{json.dumps(mc, indent=4)}</code></pre>'
+            else:
+                code_mc += f'<pre><code>{mc}</code></pre>'
+    elif isinstance(example_data.message_content, dict):
         code_mc = f'{json.dumps(example_data.message_content, indent=4)}'
+    else:
+        code_mc = f'{example_data.message_content}'
     return f'''
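For completeness, the rendering fallback introduced above, restated standalone (this helper is not part of the library): dict payloads are pretty-printed as JSON, anything else is interpolated verbatim, so example lists may now mix parsed messages with raw strings:

```python
import json

def render_item(mc):
    # Mirrors the new branch in _get_example_td: dicts -> pretty JSON,
    # everything else -> plain string interpolation.
    return json.dumps(mc, indent=4) if isinstance(mc, dict) else f'{mc}'

for mc in [{"side": "buy", "price": 10.5}, "raw, unparsed payload"]:
    print(render_item(mc))
```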