Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev3' into dev3
Browse files Browse the repository at this point in the history
# Conflicts:
#	recon_lw/core/StateStream.py
  • Loading branch information
ConnectDIY committed Jun 17, 2024
2 parents be8f789 + cf165ab commit e0ff9de
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 22 deletions.
2 changes: 1 addition & 1 deletion package_info.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"package_name": "recon-lw",
"package_version": "3.1.0"
"package_version": "3.1.1"
}

8 changes: 7 additions & 1 deletion recon_lw/core/StateStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def __init__(self,
state_transition_func,
events_saver: IEventsSaver,
combine_instantenious_snapshots=True,
get_next_update_func2=None
get_next_update_func2 = None,
initial_snapshots_func = None
) -> None:
"""
Expand All @@ -43,8 +44,13 @@ def __init__(self,
self._combine_instantenious_snapshots = combine_instantenious_snapshots
self._events_saver = events_saver
self._get_next_update_func2 = get_next_update_func2
self._initial_snapshots_func = initial_snapshots_func

def state_updates(self, stream: Iterable, snapshots_collection):
if self._initial_snapshots_func is not None:
for key, ts, state in self._initial_snapshots_func():
yield (key, ts, 'c', state)

if self._get_next_update_func2 is None:
for o in stream:
key, ts, action, state = self._get_next_update_func(o)
Expand Down
29 changes: 28 additions & 1 deletion recon_lw/core/utility/recon_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from dataclasses import dataclass

from th2_data_services.config import options
from datetime import datetime
from recon_lw.core.message_utils import message_to_dict
Expand All @@ -8,6 +10,30 @@
from os import listdir
from os import path


@dataclass
class CachedMessage:
    # Cache entry pairing a message with a reference count, so the same
    # message id can be added several times and handed back copy-by-copy
    # (see message_cache_add_with_copies / message_cache_pop_with_copies).
    message: dict
    # Number of copies currently held for this message id; starts at 1.
    count: int = 1

def message_cache_add_with_copies(m, message_cache):
    """Store message *m* in *message_cache*, counting repeated additions.

    The first time an id is seen a ``CachedMessage`` wrapper is created;
    subsequent additions of the same id only bump its reference count, so
    a later pop-with-copies can hand the message out once per addition.
    """
    mid = options.mfr.get_id(m)
    entry = message_cache.get(mid)
    if entry is None:
        message_cache[mid] = CachedMessage(m, 1)
    else:
        entry.count += 1

def message_cache_pop_with_copies(m_id, message_cache):
    """Hand back one copy of the message stored under *m_id*.

    Returns ``None`` when *m_id* is ``None`` or not cached.  Otherwise the
    entry's reference count is decremented and the wrapped message is
    returned; taking the final copy removes the cache slot entirely.
    """
    if m_id is None:
        return None
    entry = message_cache.get(m_id)
    if not entry:
        return None
    remaining = entry.count - 1
    if remaining == 0:
        # Last copy -- release the cache slot along with the message.
        del message_cache[m_id]
        return entry.message
    entry.count = remaining
    return entry.message

def time_index_add(key, m, time_index):
    """Register message *m* under *key* in the timestamp-ordered index."""
    entry = (options.mfr.get_timestamp(m), key)
    time_index.add(entry)

Expand All @@ -32,6 +58,7 @@ def create_event(
body=None,
parentId=None,
recon_name='',
attached_message_ids=[]
):
# TODO - description is required.
ts = datetime.now()
Expand All @@ -43,7 +70,7 @@ def create_event(
"body": body,
"parentEventId": parentId,
"startTimestamp": {"epochSecond": int(ts.timestamp()), "nano": ts.microsecond * 1000},
"attachedMessageIds": []}
"attachedMessageIds": attached_message_ids}
return e

def simplify_message(m):
Expand Down
25 changes: 17 additions & 8 deletions recon_lw/matching/old/matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from th2_data_services.config import options

from recon_lw.core.ts_converters import time_stamp_key
from recon_lw.core.utility import time_index_add, message_cache_add
from recon_lw.core.utility import time_index_add, message_cache_add, message_cache_add_with_copies
from recon_lw.matching.old.utils import rule_flush


Expand All @@ -29,7 +29,8 @@ def flush_matcher(ts, rule_settings, event_sequence: dict, save_events_func):
event_sequence,
save_events_func,
rule_settings["rule_root_event"],
rule_settings["live_orders_cache"] if "live_orders_cache" in rule_settings else None)
rule_settings["live_orders_cache"] if "live_orders_cache" in rule_settings else None,
keep_copies_for_same_m_id=rule_settings.get('keep_copies_for_same_m_id', False))

def one_many_match(next_batch, rule_dict):
"""
Expand All @@ -56,6 +57,12 @@ def one_many_match(next_batch, rule_dict):
message_cache = rule_dict["message_cache"]
first_key_func = rule_dict["first_key_func"]
second_key_func = rule_dict["second_key_func"]
trace_duplicate_messages = rule_dict.get('trace_duplicate_messages', True)
keep_copies_for_same_m_id = rule_dict.get('keep_copies_for_same_m_id', False)

message_cache_add_func = message_cache_add
if keep_copies_for_same_m_id:
message_cache_add_func = message_cache_add_with_copies

n_duplicates = 0
for m in next_batch:
Expand All @@ -64,36 +71,38 @@ def one_many_match(next_batch, rule_dict):
if first_keys is not None:
match_index_element = [message_id, None]
for first_key in first_keys:
if keep_copies_for_same_m_id:
match_index_element = [message_id, None]
if first_key not in match_index:
match_index[first_key] = match_index_element
time_index_add(first_key, m, time_index)
message_cache_add(m, message_cache)
message_cache_add_func(m, message_cache)
continue
else:
existing = match_index[first_key]
if existing[0] is not None:
n_duplicates += 1
else:
existing[0] = message_id
message_cache_add(m, message_cache)
message_cache_add_func(m, message_cache)
continue
second_key = second_key_func(m)
if second_key is not None:
if second_key not in match_index:
match_index[second_key] = [None, message_id]
time_index_add(second_key, m, time_index)
message_cache_add(m, message_cache)
message_cache_add_func(m, message_cache)
else:
existing = match_index[second_key]
if existing[1] is None: # existing[1] - stream 2 message ID
existing[1] = message_id
# match_index[second_key] = [existing[0], message_id]
message_cache_add(m, message_cache)
message_cache_add_func(m, message_cache)
else:
existing.append(message_id)
message_cache_add(m, message_cache)
message_cache_add_func(m, message_cache)

if n_duplicates > 0:
if n_duplicates > 0 and trace_duplicate_messages:
print(n_duplicates, " duplicates detected")

def pair_one_match(next_batch, rule_dict):
Expand Down
11 changes: 8 additions & 3 deletions recon_lw/matching/old/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from recon_lw.core.ts_converters import time_stamp_key
from recon_lw.core.utility import message_cache_pop
from recon_lw.core.utility import message_cache_pop, message_cache_pop_with_copies


def flush_old(current_ts, horizon_delay, time_index):
Expand All @@ -21,8 +21,13 @@ def flush_old(current_ts, horizon_delay, time_index):

def rule_flush(current_ts, horizon_delay, match_index: dict, time_index, message_cache,
interpret_func, event_sequence: dict, send_events_func,
parent_event, live_orders_cache):
parent_event, live_orders_cache, keep_copies_for_same_m_id=False):
old_keys = flush_old(current_ts, horizon_delay, time_index)

cache_pop_func = message_cache_pop
if keep_copies_for_same_m_id:
cache_pop_func = message_cache_pop_with_copies

events = []
for match_key in old_keys:
elem = match_index.pop(match_key) # elem -- can have 2 or 3 elements inside
Expand All @@ -35,7 +40,7 @@ def rule_flush(current_ts, horizon_delay, match_index: dict, time_index, message
# arg1 - ??
# arg2 - EventSequence
results = interpret_func(
[message_cache_pop(item, message_cache) for item in elem],
[cache_pop_func(item, message_cache) for item in elem],
live_orders_cache,
event_sequence
)
Expand Down
10 changes: 7 additions & 3 deletions recon_lw/matching/recon_ob.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def process_operations_batch(operations_batch, events, book_id, book, check_book
obs[-1]["aggr_seq"]["limit_delta"] = 0 if limit_not_affected and obs[-1]["aggr_seq"][
"limit_delta"] == 0 else 1
obs[-1]["aggr_seq"]["limit_v2"] = updated_v2
obs[-1]["aggr_seq"]["l"] = min(ob["aggr_seq"]["l"] for ob in obs)
else:
updated_limit_v2 = 0
updated_top_v2 = 0
Expand Down Expand Up @@ -498,18 +499,18 @@ def ob_copy(b):


def init_aggr_seq(order_book: dict) -> None:
    """Initialise the aggregation bookkeeping fields on *order_book*.

    The span contained leftover pre-image diff residue (the old dict
    literal immediately overwritten by the new one); only the current
    post-image assignment is kept.

    ``aggr_seq`` keys:
      * ``top_delta`` / ``limit_delta`` -- change flags, cleared to 0;
      * ``l`` -- deepest price level touched by the last update,
        -1 meaning "no level recorded yet".
    """
    order_book["aggr_seq"] = {"top_delta": 0, "limit_delta": 0, "l": -1}
    order_book["implied_only"] = False


def reset_aggr_seq(order_book):
    """Reset the aggregation flags on an already-initialised *order_book*.

    The span contained leftover pre-image diff residue (the old ``update``
    call immediately followed by the new one); only the current post-image
    call is kept.  Unlike ``init_aggr_seq`` this updates the existing
    ``aggr_seq`` dict in place, preserving any other keys it holds
    (e.g. ``limit_v2`` / ``top_v2`` version counters).
    """
    order_book["aggr_seq"].update({"top_delta": 0, "limit_delta": 0, "l": -1})
    order_book["implied_only"] = False


def reflect_price_update_in_version(side: str, price: float, str_time_of_event, order_book: dict):
level = get_price_level(side, price, order_book)

order_book["aggr_seq"]["l"] = level
max_levels = order_book["aggr_max_levels"]
if level <= max_levels:
order_book["aggr_seq"]["limit_delta"] = 1
Expand Down Expand Up @@ -657,6 +658,8 @@ def ob_clean_book(str_time_of_event, order_book: dict) -> tuple:
update_time_and_version(str_time_of_event, order_book)
order_book["aggr_seq"]["limit_delta"] = 1
order_book["aggr_seq"]["top_delta"] = 1
order_book["aggr_seq"]["l"] = 1

# return {}, [copy.deepcopy(order_book)]
return {}, [ob_copy(order_book)]

Expand All @@ -672,6 +675,7 @@ def ob_change_status(new_status: str, str_time_of_event, condition: str, order_b
update_time_and_version(str_time_of_event, order_book)
order_book["aggr_seq"]["limit_delta"] = 1
order_book["aggr_seq"]["top_delta"] = 1
order_book["aggr_seq"]["l"] = 1
# return {}, [copy.deepcopy(order_book)]
return {}, [ob_copy(order_book)]

Expand Down
24 changes: 21 additions & 3 deletions recon_lw/matching/recon_ob_cross_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ def compare_full_vs_top(full_book: dict, top_book: dict):
def ob_compare_get_timestamp_key1_key2_aggr(o, custom_settings):
if o["body"]["aggr_seq"]["limit_delta"] not in [1, 2]:
return None, None, None

if o["body"]["sessionId"] == custom_settings["full_session"]:
if 'level_limit' in custom_settings:
if o["body"]["aggr_seq"]["l"] > custom_settings['level_limit']:
return None, None, None
return o["body"]["timestamp"], "{0}_{1}_{2}".format(o["body"]["book_id"],
o["body"]["time_of_event"],
o["body"]["aggr_seq"]["limit_v2"]), None
Expand Down Expand Up @@ -219,6 +223,7 @@ def ob_compare_get_timestamp_key1_key2_top_aggr(o, custom_settings):


def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_events):
streams = f'{custom_settings["comp_session"]}:{custom_settings["full_session"]}'
if match[0] is not None and match[1] is not None:
comp_res = compare_full_vs_aggr(match[0]["body"], match[1]["body"])
events_to_store = []
Expand All @@ -232,7 +237,10 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
"errors": comp_res}))
"errors": comp_res,
"streams": streams,
"full_meta" : match[0]["body"].get("meta"),
"aggr_meta" : match[1]["body"].get("meta")}))
else:
events_to_store.append(create_event(
"23:match",
Expand All @@ -244,7 +252,10 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e
"aggr_book_scope": match[1]["scope"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"]}))
"limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
"streams": streams,
"full_meta" : match[0]["body"].get("meta"),
"aggr_meta" : match[1]["body"].get("meta")}))
save_events(events_to_store)
elif match[0] is not None:
tech_info = ob_compare_get_timestamp_key1_key2_aggr(match[0], custom_settings)
Expand All @@ -256,7 +267,9 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
"streams": streams,
"sessionId": match[0]["body"]["sessionId"],
"full_meta" : match[0]["body"].get("meta"),
"tech_info": tech_info})
save_events([error_event])
elif match[1] is not None:
Expand All @@ -269,6 +282,8 @@ def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_e
"book_id": match[1]["body"]["book_id"],
"time_of_event": match[1]["body"]["time_of_event"],
"limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
"streams": streams,
"aggr_meta" : match[1]["body"].get("meta"),
"sessionId": match[1]["body"]["sessionId"]})
save_events([error_event])

Expand Down Expand Up @@ -412,11 +427,14 @@ def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: path
full_session = rule_params["full_session"]
if "aggr_session" in rule_params:
aggr_session = rule_params["aggr_session"]
custom_settings = {"full_session": full_session, "comp_session": aggr_session}
if 'level_limit' in rule_params:
custom_settings['level_limit'] = rule_params['level_limit']
processor_aggr = TimeCacheMatcher(
rule_params["horizon_delay"],
ob_compare_get_timestamp_key1_key2_aggr,
ob_compare_interpret_match_aggr,
{"full_session": full_session, "comp_session": aggr_session},
custom_settings,
lambda name, ev_type, ok, body: events_saver.create_event(
name, ev_type, ok, body, parentId=rule_root_event["eventId"]),
lambda ev_batch: events_saver.save_events(ev_batch)
Expand Down
1 change: 1 addition & 0 deletions recon_lw/reporting/match_diff/categorizer/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def process_event(

# TODO:
# event['body']['diff'] -- diff here is actually `diffs` - list of diff

for diff in event['body']['diff']:
category = self.error_extractor_strategy.diff_category_extractor(
recon_name, diff, event)
Expand Down
9 changes: 7 additions & 2 deletions recon_lw/reporting/match_diff/viewer/category_displayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,14 @@ def _get_example_td(example_data: MatchDiffExampleData, item_id: str):
if isinstance(example_data.message_content, list):
code_mc = ''
for mc in example_data.message_content:
code_mc += f'<div><code id="code">{json.dumps(mc, indent=4)}</code></div>'
else:
if isinstance(mc, dict):
code_mc += f'<div><code id="code">{json.dumps(mc, indent=4)}</code></div>'
else:
code_mc += f'<div><code id="code">{mc}</code></div>'
elif isinstance(example_data.message_content, dict):
code_mc = f'<code id="code">{json.dumps(example_data.message_content, indent=4)}</code>'
else:
code_mc = f'<code id="code">{example_data.message_content}</code>'

return f'''
<td style="text-align: left; vertical-align: top">
Expand Down

0 comments on commit e0ff9de

Please sign in to comment.