From 89adc5e8df02509dafe513f2e936b223c1a728f8 Mon Sep 17 00:00:00 2001 From: Alexey Zverev <74251321+alexeyzverev@users.noreply.github.com> Date: Mon, 27 May 2024 20:10:23 +0100 Subject: [PATCH 1/9] Update recon_ob_cross_stream.py add meta to aggr_cross recon --- recon_lw/matching/recon_ob_cross_stream.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/recon_lw/matching/recon_ob_cross_stream.py b/recon_lw/matching/recon_ob_cross_stream.py index 7f84da4..e3d030f 100644 --- a/recon_lw/matching/recon_ob_cross_stream.py +++ b/recon_lw/matching/recon_ob_cross_stream.py @@ -232,7 +232,9 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e "book_id": match[0]["body"]["book_id"], "time_of_event": match[0]["body"]["time_of_event"], "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"], - "errors": comp_res})) + "errors": comp_res, + "full_meta" : match[0]["body"].get("meta"), + "aggr_meta" : match[1]["body"].get("meta")})) else: events_to_store.append(create_event( "23:match", @@ -244,7 +246,9 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e "aggr_book_scope": match[1]["scope"], "book_id": match[0]["body"]["book_id"], "time_of_event": match[0]["body"]["time_of_event"], - "top_v2": match[0]["body"]["aggr_seq"]["top_v2"]})) + "top_v2": match[0]["body"]["aggr_seq"]["top_v2"], + "full_meta" : match[0]["body"].get("meta"), + "aggr_meta" : match[1]["body"].get("meta")})) save_events(events_to_store) elif match[0] is not None: tech_info = ob_compare_get_timestamp_key1_key2_aggr(match[0], custom_settings) @@ -257,6 +261,7 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e "time_of_event": match[0]["body"]["time_of_event"], "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"], "sessionId": match[0]["body"]["sessionId"], + "full_meta" : match[0]["body"].get("meta"), "tech_info": tech_info}) save_events([error_event]) elif match[1] is not None: @@ -269,6 +274,7 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e "book_id": match[1]["body"]["book_id"], "time_of_event": match[1]["body"]["time_of_event"], "limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"], + "aggr_meta" : match[1]["body"].get("meta"), "sessionId": match[1]["body"]["sessionId"]}) save_events([error_event]) From e967fb83adbd6841994c1aafc11399db981e78ce Mon Sep 17 00:00:00 2001 From: Alexey Zverev <74251321+alexeyzverev@users.noreply.github.com> Date: Wed, 29 May 2024 21:08:22 +0100 Subject: [PATCH 2/9] Update recon_ob.py add l to l3 book objects --- recon_lw/matching/recon_ob.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/recon_lw/matching/recon_ob.py b/recon_lw/matching/recon_ob.py index b124868..81fb4c4 100644 --- a/recon_lw/matching/recon_ob.py +++ b/recon_lw/matching/recon_ob.py @@ -161,6 +161,7 @@ def process_operations_batch(operations_batch, events, book_id, book, check_book obs[-1]["aggr_seq"]["limit_delta"] = 0 if limit_not_affected and obs[-1]["aggr_seq"][ "limit_delta"] == 0 else 1 obs[-1]["aggr_seq"]["limit_v2"] = updated_v2 + obs[-1]["aggr_seq"]["l"] = min(ob["aggr_seq"]["l"] for ob in obs) else: updated_limit_v2 = 0 updated_top_v2 = 0 @@ -498,18 +499,18 @@ def ob_copy(b): def init_aggr_seq(order_book: dict) -> None: - order_book["aggr_seq"] = {"top_delta": 0, "limit_delta": 0} + order_book["aggr_seq"] = {"top_delta": 0, "limit_delta": 0, "l": -1} order_book["implied_only"] = False def reset_aggr_seq(order_book): - order_book["aggr_seq"].update({"top_delta": 0, "limit_delta": 0}) + order_book["aggr_seq"].update({"top_delta": 0, "limit_delta": 0, "l": -1}) order_book["implied_only"] = False def reflect_price_update_in_version(side: str, price: float, str_time_of_event, order_book: dict): level = get_price_level(side, price, order_book) - + order_book["aggr_seq"]["l"] = level max_levels = order_book["aggr_max_levels"] if level <= max_levels: order_book["aggr_seq"]["limit_delta"] = 1 @@ -657,6 +658,8 @@ def ob_clean_book(str_time_of_event, order_book: dict) -> tuple: update_time_and_version(str_time_of_event, order_book) order_book["aggr_seq"]["limit_delta"] = 1 order_book["aggr_seq"]["top_delta"] = 1 + order_book["aggr_seq"]["l"] = 1 + # return {}, [copy.deepcopy(order_book)] return {}, [ob_copy(order_book)] @@ -672,6 +675,7 @@ def ob_change_status(new_status: str, str_time_of_event, condition: str, order_b update_time_and_version(str_time_of_event, order_book) order_book["aggr_seq"]["limit_delta"] = 1 order_book["aggr_seq"]["top_delta"] = 1 + order_book["aggr_seq"]["l"] = 1 # return {}, [copy.deepcopy(order_book)] return {}, [ob_copy(order_book)] From e857170572d2c13440dd616019b92eb180cc3701 Mon Sep 17 00:00:00 2001 From: Alexey Zverev <74251321+alexeyzverev@users.noreply.github.com> Date: Wed, 29 May 2024 22:20:05 +0100 Subject: [PATCH 3/9] Update recon_ob_cross_stream.py add limit_level to ob_cross --- recon_lw/matching/recon_ob_cross_stream.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/recon_lw/matching/recon_ob_cross_stream.py b/recon_lw/matching/recon_ob_cross_stream.py index e3d030f..a805776 100644 --- a/recon_lw/matching/recon_ob_cross_stream.py +++ b/recon_lw/matching/recon_ob_cross_stream.py @@ -171,7 +171,11 @@ def compare_full_vs_top(full_book: dict, top_book: dict): def ob_compare_get_timestamp_key1_key2_aggr(o, custom_settings): if o["body"]["aggr_seq"]["limit_delta"] not in [1, 2]: return None, None, None + if o["body"]["sessionId"] == custom_settings["full_session"]: + if 'level_limit' in custom_settings: + if o["body"]["aggr_seq"]["l"] > custom_settings['level_limit']: + return None, None, None return o["body"]["timestamp"], "{0}_{1}_{2}".format(o["body"]["book_id"], o["body"]["time_of_event"], o["body"]["aggr_seq"]["limit_v2"]), None @@ -418,11 +422,14 @@ def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: path full_session = rule_params["full_session"] if "aggr_session" in rule_params: aggr_session = rule_params["aggr_session"] + custom_settings = {"full_session": full_session, "comp_session": aggr_session} + if 'level_limit' in rule_params: + custom_settings['level_limit'] = rule_params['level_limit'] processor_aggr = TimeCacheMatcher( rule_params["horizon_delay"], ob_compare_get_timestamp_key1_key2_aggr, ob_compare_interpret_match_aggr, - {"full_session": full_session, "comp_session": aggr_session}, + custom_settings, lambda name, ev_type, ok, body: events_saver.create_event( name, ev_type, ok, body, parentId=rule_root_event["eventId"]), lambda ev_batch: events_saver.save_events(ev_batch) From 0f5236e76f0f028c8cf4982a6e5caa20857cc1ac Mon Sep 17 00:00:00 2001 From: "denis.plotnikov" Date: Mon, 3 Jun 2024 13:14:50 +0300 Subject: [PATCH 4/9] Optional initial snapshot generator for StateStream --- recon_lw/core/StateStream.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/recon_lw/core/StateStream.py b/recon_lw/core/StateStream.py index 8c995b0..37b1e02 100644 --- a/recon_lw/core/StateStream.py +++ b/recon_lw/core/StateStream.py @@ -19,7 +19,8 @@ def __init__(self, state_transition_func, events_saver: IEventsSaver, combine_instantenious_snapshots=True, - get_next_update_func2 = None + get_next_update_func2 = None, + initial_snapshots_func = None ) -> None: """ @@ -37,8 +38,13 @@ def __init__(self, self._combine_instantenious_snapshots = combine_instantenious_snapshots self._events_saver = events_saver self._get_next_update_func2 = get_next_update_func2 + self._initial_snapshots_func = initial_snapshots_func def state_updates(self, stream: Iterable, snapshots_collection): + if self._initial_snapshots_func is not None: + for key, ts, state in self._initial_snapshots_func(): + yield (key, ts, 'c', state) + if self._get_next_update_func2 is None: for o in stream: key, ts, action, state = self._get_next_update_func(o) From 21b8078547bb5f76900d994f8fd71e31711a63b4 Mon Sep 17 00:00:00 2001 From: Alexey Zverev <74251321+alexeyzverev@users.noreply.github.com> Date: Mon, 3 Jun 2024 14:40:05 +0100 Subject: [PATCH 5/9] Update recon_ob_cross_stream.py add streams marker to output --- recon_lw/matching/recon_ob_cross_stream.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/recon_lw/matching/recon_ob_cross_stream.py b/recon_lw/matching/recon_ob_cross_stream.py index a805776..efc91e3 100644 --- a/recon_lw/matching/recon_ob_cross_stream.py +++ b/recon_lw/matching/recon_ob_cross_stream.py @@ -223,6 +223,7 @@ def ob_compare_get_timestamp_key1_key2_top_aggr(o, custom_settings): def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_events): + streams = f'{custom_settings["aggr_session"]}:{custom_settings["full_session"]}' if match[0] is not None and match[1] is not None: comp_res = compare_full_vs_aggr(match[0]["body"], match[1]["body"]) events_to_store = [] @@ -237,6 +238,7 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e "time_of_event": match[0]["body"]["time_of_event"], "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"], "errors": comp_res, + "streams": streams, "full_meta" : match[0]["body"].get("meta"), "aggr_meta" : match[1]["body"].get("meta")})) else: @@ -250,7 +252,8 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e "aggr_book_scope": match[1]["scope"], "book_id": match[0]["body"]["book_id"], "time_of_event": match[0]["body"]["time_of_event"], - "top_v2": match[0]["body"]["aggr_seq"]["top_v2"], + "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"], + "streams": streams, "full_meta" : match[0]["body"].get("meta"), "aggr_meta" : match[1]["body"].get("meta")})) save_events(events_to_store) @@ -264,6 +267,7 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e "book_id": match[0]["body"]["book_id"], "time_of_event": match[0]["body"]["time_of_event"], "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"], + "streams": streams, "sessionId": match[0]["body"]["sessionId"], "full_meta" : match[0]["body"].get("meta"), "tech_info": tech_info}) @@ -278,6 +282,7 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e "book_id": match[1]["body"]["book_id"], "time_of_event": match[1]["body"]["time_of_event"], "limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"], + "streams": streams, "aggr_meta" : match[1]["body"].get("meta"), "sessionId": match[1]["body"]["sessionId"]}) save_events([error_event]) From 501aaf80595cdcac50947283c62d858e59ebd2fe Mon Sep 17 00:00:00 2001 From: Alexey Zverev <74251321+alexeyzverev@users.noreply.github.com> Date: Mon, 3 Jun 2024 15:03:46 +0100 Subject: [PATCH 6/9] Update recon_ob_cross_stream.py fix streams marker --- recon_lw/matching/recon_ob_cross_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/recon_lw/matching/recon_ob_cross_stream.py b/recon_lw/matching/recon_ob_cross_stream.py index efc91e3..5c4196a 100644 --- a/recon_lw/matching/recon_ob_cross_stream.py +++ b/recon_lw/matching/recon_ob_cross_stream.py @@ -223,7 +223,7 @@ def ob_compare_get_timestamp_key1_key2_top_aggr(o, custom_settings): def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_events): - streams = f'{custom_settings["aggr_session"]}:{custom_settings["full_session"]}' + streams = f'{custom_settings["comp_session"]}:{custom_settings["full_session"]}' if match[0] is not None and match[1] is not None: comp_res = compare_full_vs_aggr(match[0]["body"], match[1]["body"]) events_to_store = [] From a95b0aa49fe1dbda6a41fb9354a3fab44889ad61 Mon Sep 17 00:00:00 2001 From: "denis.plotnikov" Date: Wed, 12 Jun 2024 13:56:03 +0300 Subject: [PATCH 7/9] Add ability to keep multiple message cache copies when multiple keys used --- package_info.json | 2 +- recon_lw/core/utility/recon_utils.py | 29 +++++++++++++++++++++++++++- recon_lw/matching/old/matching.py | 25 ++++++++++++++++-------- recon_lw/matching/old/utils.py | 11 ++++++++--- 4 files changed, 54 insertions(+), 13 deletions(-) diff --git a/package_info.json b/package_info.json index bb2ec45..8315f93 100644 --- a/package_info.json +++ b/package_info.json @@ -1,5 +1,5 @@ { "package_name": "recon-lw", - "package_version": "3.1.0" + "package_version": "3.1.1" } diff --git a/recon_lw/core/utility/recon_utils.py b/recon_lw/core/utility/recon_utils.py index 33cf65c..e3d642b 100644 --- a/recon_lw/core/utility/recon_utils.py +++ b/recon_lw/core/utility/recon_utils.py @@ -1,3 +1,5 @@ +from dataclasses import dataclass + from th2_data_services.config import options from datetime import datetime from recon_lw.core.message_utils import message_to_dict @@ -8,6 +10,30 @@ from os import listdir from os import path + +@dataclass +class CachedMessage: + message: dict + count: int = 1 + +def message_cache_add_with_copies(m, message_cache): + mid = options.mfr.get_id(m) + if mid not in message_cache: + message_cache[mid] = CachedMessage(m, 1) + else: + message_cache[mid].count += 1 + +def message_cache_pop_with_copies(m_id, message_cache): + if m_id is None: + return None + r = message_cache.get(m_id) + if not r: + return None + if r.count == 1: + return message_cache.pop(m_id).message + r.count -= 1 + return r.message + def time_index_add(key, m, time_index): time_index.add((options.mfr.get_timestamp(m), key)) @@ -32,6 +58,7 @@ def create_event( body=None, parentId=None, recon_name='', + attached_message_ids=[] ): # TODO - description is required. ts = datetime.now() @@ -43,7 +70,7 @@ def create_event( "body": body, "parentEventId": parentId, "startTimestamp": {"epochSecond": int(ts.timestamp()), "nano": ts.microsecond * 1000}, - "attachedMessageIds": []} + "attachedMessageIds": attached_message_ids} return e def simplify_message(m): diff --git a/recon_lw/matching/old/matching.py b/recon_lw/matching/old/matching.py index eed986c..629395e 100644 --- a/recon_lw/matching/old/matching.py +++ b/recon_lw/matching/old/matching.py @@ -4,7 +4,7 @@ from th2_data_services.config import options from recon_lw.core.ts_converters import time_stamp_key -from recon_lw.core.utility import time_index_add, message_cache_add +from recon_lw.core.utility import time_index_add, message_cache_add, message_cache_add_with_copies from recon_lw.matching.old.utils import rule_flush @@ -29,7 +29,8 @@ def flush_matcher(ts, rule_settings, event_sequence: dict, save_events_func): event_sequence, save_events_func, rule_settings["rule_root_event"], - rule_settings["live_orders_cache"] if "live_orders_cache" in rule_settings else None) + rule_settings["live_orders_cache"] if "live_orders_cache" in rule_settings else None, + keep_copies_for_same_m_id=rule_settings.get('keep_copies_for_same_m_id', False)) def one_many_match(next_batch, rule_dict): """ @@ -56,6 +57,12 @@ def one_many_match(next_batch, rule_dict): message_cache = rule_dict["message_cache"] first_key_func = rule_dict["first_key_func"] second_key_func = rule_dict["second_key_func"] + trace_duplicate_messages = rule_dict.get('trace_duplicate_messages', True) + keep_copies_for_same_m_id = rule_dict.get('keep_copies_for_same_m_id', False) + + message_cache_add_func = message_cache_add + if keep_copies_for_same_m_id: + message_cache_add_func = message_cache_add_with_copies n_duplicates = 0 for m in next_batch: @@ -64,10 +71,12 @@ def one_many_match(next_batch, rule_dict): if first_keys is not None: match_index_element = [message_id, None] for first_key in first_keys: + if keep_copies_for_same_m_id: + match_index_element = [message_id, None] if first_key not in match_index: match_index[first_key] = match_index_element time_index_add(first_key, m, time_index) - message_cache_add(m, message_cache) + message_cache_add_func(m, message_cache) continue else: existing = match_index[first_key] @@ -75,25 +84,25 @@ def one_many_match(next_batch, rule_dict): n_duplicates += 1 else: existing[0] = message_id - message_cache_add(m, message_cache) + message_cache_add_func(m, message_cache) continue second_key = second_key_func(m) if second_key is not None: if second_key not in match_index: match_index[second_key] = [None, message_id] time_index_add(second_key, m, time_index) - message_cache_add(m, message_cache) + message_cache_add_func(m, message_cache) else: existing = match_index[second_key] if existing[1] is None: # existing[1] - stream 2 message ID existing[1] = message_id # match_index[second_key] = [existing[0], message_id] - message_cache_add(m, message_cache) + message_cache_add_func(m, message_cache) else: existing.append(message_id) - message_cache_add(m, message_cache) + message_cache_add_func(m, message_cache) - if n_duplicates > 0: + if n_duplicates > 0 and trace_duplicate_messages: print(n_duplicates, " duplicates detected") def pair_one_match(next_batch, rule_dict): diff --git a/recon_lw/matching/old/utils.py b/recon_lw/matching/old/utils.py index ea54c30..c4299ef 100644 --- a/recon_lw/matching/old/utils.py +++ b/recon_lw/matching/old/utils.py @@ -1,5 +1,5 @@ from recon_lw.core.ts_converters import time_stamp_key -from recon_lw.core.utility import message_cache_pop +from recon_lw.core.utility import message_cache_pop, message_cache_pop_with_copies def flush_old(current_ts, horizon_delay, time_index): @@ -21,8 +21,13 @@ def flush_old(current_ts, horizon_delay, time_index): def rule_flush(current_ts, horizon_delay, match_index: dict, time_index, message_cache, interpret_func, event_sequence: dict, send_events_func, - parent_event, live_orders_cache): + parent_event, live_orders_cache, keep_copies_for_same_m_id=False): old_keys = flush_old(current_ts, horizon_delay, time_index) + + cache_pop_func = message_cache_pop + if keep_copies_for_same_m_id: + cache_pop_func = message_cache_pop_with_copies + events = [] for match_key in old_keys: elem = match_index.pop(match_key) # elem -- can have 2 or 3 elements inside @@ -35,7 +40,7 @@ def rule_flush(current_ts, horizon_delay, match_index: dict, time_index, message # arg1 - ?? # arg2 - EventSequence results = interpret_func( - [message_cache_pop(item, message_cache) for item in elem], + [cache_pop_func(item, message_cache) for item in elem], live_orders_cache, event_sequence ) From 18abd25fce09a19848f5dd94fcab567a9116ca6c Mon Sep 17 00:00:00 2001 From: "denis.plotnikov" Date: Wed, 12 Jun 2024 14:25:21 +0300 Subject: [PATCH 8/9] Using json dumps only for dicts --- recon_lw/reporting/match_diff/categorizer/basic.py | 1 + .../reporting/match_diff/viewer/category_displayer.py | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/recon_lw/reporting/match_diff/categorizer/basic.py b/recon_lw/reporting/match_diff/categorizer/basic.py index ddf2090..0e1d405 100644 --- a/recon_lw/reporting/match_diff/categorizer/basic.py +++ b/recon_lw/reporting/match_diff/categorizer/basic.py @@ -103,6 +103,7 @@ def process_event( # TODO: # event['body']['diff'] -- diff here is actually `diffs` - list of diff + print('here') for diff in event['body']['diff']: category = self.error_extractor_strategy.diff_category_extractor( recon_name, diff, event) diff --git a/recon_lw/reporting/match_diff/viewer/category_displayer.py b/recon_lw/reporting/match_diff/viewer/category_displayer.py index becfa71..2380e23 100644 --- a/recon_lw/reporting/match_diff/viewer/category_displayer.py +++ b/recon_lw/reporting/match_diff/viewer/category_displayer.py @@ -150,9 +150,14 @@ def _get_example_td(example_data: MatchDiffExampleData, item_id: str): if isinstance(example_data.message_content, list): code_mc = '' for mc in example_data.message_content: - code_mc += f'
{json.dumps(mc, indent=4)}
' - else: + if isinstance(mc, dict): + code_mc += f'
{json.dumps(mc, indent=4)}
' + else: + code_mc += f'
{mc}
' + elif isinstance(example_data.message_content, dict): code_mc = f'{json.dumps(example_data.message_content, indent=4)}' + else: + code_mc = f'{example_data.message_content}' return f''' From cf165ab9dbdf9975af8eb1c3a43bd05e39fa2fad Mon Sep 17 00:00:00 2001 From: "denis.plotnikov" Date: Sun, 16 Jun 2024 21:50:53 +0300 Subject: [PATCH 9/9] Remove print statement --- recon_lw/reporting/match_diff/categorizer/basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/recon_lw/reporting/match_diff/categorizer/basic.py b/recon_lw/reporting/match_diff/categorizer/basic.py index 0e1d405..2624615 100644 --- a/recon_lw/reporting/match_diff/categorizer/basic.py +++ b/recon_lw/reporting/match_diff/categorizer/basic.py @@ -103,7 +103,7 @@ def process_event( # TODO: # event['body']['diff'] -- diff here is actually `diffs` - list of diff - print('here') + for diff in event['body']['diff']: category = self.error_extractor_strategy.diff_category_extractor( recon_name, diff, event)