Merge pull request #189 from reportportal/EPMRPP-96427_component_update_and_fixes

[EPMRPP-96427] Component update and fixes
HardNorth authored Nov 27, 2024
2 parents 8c56f90 + 92e9377 commit 54c8a40
Showing 68 changed files with 1,120 additions and 863 deletions.
6 changes: 3 additions & 3 deletions app/commons/clusterizer.py
@@ -23,7 +23,7 @@
from app.commons import logging
from app.utils import utils, text_processing

-logger = logging.getLogger("analyzerApp.clusterizer")
+LOGGER = logging.getLogger("analyzerApp.clusterizer")


class Clusterizer:
@@ -65,7 +65,7 @@ def find_groups_by_similarity(
rearranged_groups[cluster].append(real_id)
new_group_id += 1
group_id = new_group_id
logger.debug("Time for finding groups: %.2f s", time() - start_time)
LOGGER.debug("Time for finding groups: %.2f s", time() - start_time)
return rearranged_groups

def similarity_groupping(
@@ -125,7 +125,7 @@ def unite_groups_by_hashes(self, messages: list[str], threshold: float = 0.95) -
if cluster not in rearranged_groups:
rearranged_groups[cluster] = []
rearranged_groups[cluster].append(key)
logger.debug("Time for finding hash groups: %.2f s", time() - start_time)
LOGGER.debug("Time for finding hash groups: %.2f s", time() - start_time)
return rearranged_groups

def perform_light_deduplication(self, messages: list[str]) -> tuple[list[str], dict[int, list[int]]]:
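The only change in this file renames the module-level logger from logger to LOGGER, constant-style naming for a module-level global. A minimal sketch of the resulting pattern, assuming the time import shown below; the helper function and its body are illustrative only, not code from the PR:

from time import time

from app.commons import logging

LOGGER = logging.getLogger("analyzerApp.clusterizer")


def timed_step(items: list[str]) -> list[str]:
    # Illustrative only: time a processing step and report it through the module-level
    # logger, as the Clusterizer methods in the diff above do.
    start_time = time()
    result = [item for item in items if item.strip()]
    LOGGER.debug("Time for finding groups: %.2f s", time() - start_time)
    return result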
12 changes: 3 additions & 9 deletions app/commons/esclient.py
@@ -25,11 +25,9 @@
from urllib3.exceptions import InsecureRequestWarning

from app.amqp.amqp import AmqpClient
-from app.commons import logging
+from app.commons import logging, request_factory, log_merger
from app.commons.model.launch_objects import ApplicationConfig, Response, Launch, TestItem, BulkResponse
from app.commons.model.ml import TrainInfo, ModelType
-from app.commons.log_merger import LogMerger
-from app.commons.log_requests import LogRequests
from app.utils import utils, text_processing

logger = logging.getLogger("analyzerApp.esclient")
@@ -40,16 +38,12 @@ class EsClient:
app_config: ApplicationConfig
es_client: elasticsearch.Elasticsearch
host: str
-log_requests: LogRequests
-log_merger: LogMerger
tables_to_recreate: list[str]

def __init__(self, app_config: ApplicationConfig, es_client: elasticsearch.Elasticsearch = None):
self.app_config = app_config
self.host = app_config.esHost
self.es_client = es_client or self.create_es_client(app_config)
-self.log_requests = LogRequests()
-self.log_merger = LogMerger()
self.tables_to_recreate = ["rp_aa_stats", "rp_model_train_stats", "rp_suggestions_info_metrics"]

def create_es_client(self, app_config: ApplicationConfig) -> elasticsearch.Elasticsearch:
@@ -220,7 +214,7 @@ def _to_index_bodies(
if log.logLevel < utils.ERROR_LOGGING_LEVEL or not log.message.strip():
continue

-bodies.append(LogRequests._prepare_log(launch, test_item, log, project_with_prefix))
+bodies.append(request_factory.prepare_log(launch, test_item, log, project_with_prefix))
logs_added = True
if logs_added:
test_item_ids.append(str(test_item.testItemId))
@@ -276,7 +270,7 @@ def _merge_logs(self, test_item_ids, project):
test_items_dict[test_item_id] = []
test_items_dict[test_item_id].append(r)
for test_item_id in test_items_dict:
-merged_logs, _ = self.log_merger.decompose_logs_merged_and_without_duplicates(
+merged_logs, _ = log_merger.decompose_logs_merged_and_without_duplicates(
test_items_dict[test_item_id])
for log in merged_logs:
if log["_source"]["is_merged"]:
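The esclient.py changes replace the instance-held LogRequests and LogMerger helpers with the module-level request_factory and log_merger functions from app.commons. A minimal sketch of the new call-site pattern, assuming only the two call signatures visible in this diff; both wrapper functions and their arguments are placeholders rather than code from the PR:

from app.commons import log_merger, request_factory


def index_bodies_for_item(launch, test_item, logs, project_with_prefix):
    # Before this PR the call went through an instance attribute (LogRequests._prepare_log);
    # now the stateless module-level helper is called directly.
    return [request_factory.prepare_log(launch, test_item, log, project_with_prefix) for log in logs]


def merge_item_logs(log_documents):
    # Likewise, decompose_logs_merged_and_without_duplicates is now a plain function on the
    # log_merger module instead of a method on a LogMerger instance.
    merged_logs, _ = log_merger.decompose_logs_merged_and_without_duplicates(log_documents)
    return merged_logs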
254 changes: 135 additions & 119 deletions app/commons/log_merger.py
@@ -17,125 +17,141 @@

from app.utils import text_processing

-class LogMerger:
-    fields_to_clean: list[str]
-    fields_to_merge: list[str]
-
-    def __init__(self):
-        self.fields_to_clean = ["message", "detected_message",
-                                "detected_message_with_numbers", "stacktrace",
-                                "detected_message_extended",
-                                "stacktrace_extended", "message_extended",
-                                "message_without_params_extended",
-                                "message_without_params_and_brackets",
-                                "detected_message_without_params_and_brackets"]
-        self.fields_to_merge = ["message", "found_exceptions", "potential_status_codes",
-                                "found_tests_and_methods", "only_numbers", "urls",
-                                "paths", "message_params", "detected_message_without_params_extended",
-                                "whole_message"]
-
-    def merge_big_and_small_logs(
-            self, logs: list[dict[str, Any]], log_level_ids_to_add: dict[int, list[int]],
-            log_level_messages: dict[str, dict[int, str]], log_level_ids_merged: dict[int, dict[str, Any]],
-            logs_ids_in_merged_logs: dict[int, list[int]]) -> tuple[list[dict[str, Any]], dict[str, list[int]]]:
-        """Merge big message logs with small ones."""
-        new_logs = []
-        for log in logs:
-            if not log["_source"]["message"].strip():
-                continue
-            log_level = log["_source"]["log_level"]
-
-            if log["_id"] in log_level_ids_to_add[log_level]:
-                merged_small_logs = text_processing.compress(log_level_messages["message"][log_level])
-                new_logs.append(self.prepare_new_log(log, log["_id"], False, merged_small_logs))
-
-        log_ids_for_merged_logs = {}
-        for log_level in log_level_messages["message"]:
-            if not log_level_ids_to_add[log_level] and log_level_messages["message"][log_level].strip():
-                log = log_level_ids_merged[log_level]
-                merged_logs_id = str(log["_id"]) + "_m"
-                new_log = self.prepare_new_log(
-                    log, merged_logs_id, True, text_processing.compress(log_level_messages["message"][log_level]),
-                    fields_to_clean=self.fields_to_clean)
-                log_ids_for_merged_logs[merged_logs_id] = logs_ids_in_merged_logs[log_level]
-                for field in log_level_messages:
-                    if field in ["message"]:
-                        continue
-                    if field in ["whole_message"]:
-                        new_log["_source"][field] = log_level_messages[field][log_level]
-                    else:
-                        new_log["_source"][field] = text_processing.compress(
-                            log_level_messages[field][log_level])
-                new_log["_source"]["found_exceptions_extended"] = text_processing.compress(
-                    text_processing.enrich_found_exceptions(log_level_messages["found_exceptions"][log_level]))
-
-                new_logs.append(new_log)
-        return new_logs, log_ids_for_merged_logs
-
-    def decompose_logs_merged_and_without_duplicates(
-            self, logs: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], dict[str, list[int]]]:
-        """Merge big logs with small ones without duplicates."""
-        log_level_messages = {}
-        for field in self.fields_to_merge:
-            log_level_messages[field] = {}
-        log_level_ids_to_add = {}
-        log_level_ids_merged = {}
-        logs_unique_log_level = {}
-        logs_ids_in_merged_logs = {}
-
-        for log in logs:
-            if not log["_source"]["message"].strip():
-                continue
-
-            log_level = log["_source"]["log_level"]
-
-            for field in log_level_messages:
-                if log_level not in log_level_messages[field]:
-                    log_level_messages[field][log_level] = ""
-            if log_level not in log_level_ids_to_add:
-                log_level_ids_to_add[log_level] = []
-            if log_level not in logs_unique_log_level:
-                logs_unique_log_level[log_level] = set()
-
-            if log["_source"]["original_message_lines"] <= 2 and \
-                    log["_source"]["original_message_words_number"] <= 100:
-                if log_level not in log_level_ids_merged:
-                    log_level_ids_merged[log_level] = log
-                if log_level not in logs_ids_in_merged_logs:
-                    logs_ids_in_merged_logs[log_level] = []
-                logs_ids_in_merged_logs[log_level].append(log["_id"])
-
-                log_level_representative = log_level_ids_merged[log_level]
-                current_log_word_num = log["_source"]["original_message_words_number"]
-                main_log_word_num = log_level_representative["_source"]["original_message_words_number"]
-                if current_log_word_num > main_log_word_num:
-                    log_level_ids_merged[log_level] = log
-
-                normalized_msg = " ".join(log["_source"]["message"].strip().lower().split())
-                if normalized_msg not in logs_unique_log_level[log_level]:
-                    logs_unique_log_level[log_level].add(normalized_msg)
-
-                    for field in log_level_messages:
-                        if field in log["_source"]:
-                            splitter = "\n" if field in ["message", "whole_message"] else " "
-                            log_level_messages[field][log_level] = \
-                                log_level_messages[field][log_level] + log["_source"][field] + splitter
-
-            else:
-                log_level_ids_to_add[log_level].append(log["_id"])
-
-        return self.merge_big_and_small_logs(
-            logs, log_level_ids_to_add, log_level_messages, log_level_ids_merged, logs_ids_in_merged_logs)
-
-    def prepare_new_log(self, old_log: dict[str, Any], new_id, is_merged: bool, merged_small_logs: str,
-                        fields_to_clean: Optional[list[str]] = None) -> dict[str, Any]:
-        """Prepare updated log"""
-        merged_log = copy.deepcopy(old_log)
-        merged_log["_source"]["is_merged"] = is_merged
-        merged_log["_id"] = new_id
-        merged_log["_source"]["merged_small_logs"] = merged_small_logs
-        if fields_to_clean:
-            for field in fields_to_clean:
-                merged_log["_source"][field] = ""
-        return merged_log
+FIELDS_TO_CLEAN = ["message", "detected_message", "detected_message_with_numbers", "detected_message_extended",
+                   "message_extended", "message_without_params_extended", "message_without_params_and_brackets",
+                   "detected_message_without_params_and_brackets"]
+FIELDS_TO_MERGE = ["message", "found_exceptions", "potential_status_codes", "found_tests_and_methods", "only_numbers",
+                   "urls", "paths", "message_params", "detected_message_without_params_extended", "whole_message"]
+
+
+def _prepare_new_log(old_log: dict[str, Any], new_id, is_merged: bool, merged_small_logs: str,
+                     fields_to_clean: Optional[list[str]] = None) -> dict[str, Any]:
+    """Prepare updated log"""
+    merged_log = copy.deepcopy(old_log)
+    merged_log["_source"]["is_merged"] = is_merged
+    merged_log["_id"] = new_id
+    merged_log["_source"]["merged_small_logs"] = merged_small_logs
+    if fields_to_clean:
+        for field in fields_to_clean:
+            merged_log["_source"][field] = ""
+    return merged_log
+
+
+def merge_big_and_small_logs(
+        logs: list[dict[str, Any]], log_level_ids_to_add: dict[int, list[int]],
+        log_level_messages: dict[str, dict[int, str]], log_level_ids_merged: dict[int, dict[str, Any]],
+        logs_ids_in_merged_logs: dict[int, list[int]]) -> tuple[list[dict[str, Any]], dict[str, list[int]]]:
+    """Merge big message logs with small ones."""
+    new_logs = []
+    for log in logs:
+        if not log["_source"]["message"].strip():
+            continue
+        log_level = log["_source"]["log_level"]
+
+        if log["_id"] in log_level_ids_to_add[log_level]:
+            merged_small_logs = text_processing.compress(log_level_messages["message"][log_level])
+            new_logs.append(_prepare_new_log(log, log["_id"], False, merged_small_logs))
+
+    log_ids_for_merged_logs = {}
+    for log_level in log_level_messages["message"]:
+        if not log_level_ids_to_add[log_level] and log_level_messages["message"][log_level].strip():
+            log = log_level_ids_merged[log_level]
+            merged_logs_id = str(log["_id"]) + "_m"
+            new_log = _prepare_new_log(
+                log, merged_logs_id, True, text_processing.compress(log_level_messages["message"][log_level]),
+                fields_to_clean=FIELDS_TO_CLEAN)
+            log_ids_for_merged_logs[merged_logs_id] = logs_ids_in_merged_logs[log_level]
+            for field in log_level_messages:
+                if field == "message":
+                    continue
+                if field == "whole_message":
+                    new_log["_source"][field] = log_level_messages[field][log_level]
+                else:
+                    new_log["_source"][field] = text_processing.compress(
+                        log_level_messages[field][log_level])
+            new_log["_source"]["found_exceptions_extended"] = text_processing.compress(
+                text_processing.enrich_found_exceptions(log_level_messages["found_exceptions"][log_level]))
+
+            new_logs.append(new_log)
+    return new_logs, log_ids_for_merged_logs
+
+
+def decompose_logs_merged_and_without_duplicates(
+        logs: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], dict[str, list[int]]]:
+    """Merge big logs with small ones without duplicates."""
+    log_level_messages = {}
+    for field in FIELDS_TO_MERGE:
+        log_level_messages[field] = {}
+    log_level_ids_to_add = {}
+    log_level_ids_merged = {}
+    logs_unique_log_level = {}
+    logs_ids_in_merged_logs = {}
+
+    for log in logs:
+        source = log['_source']
+        if not source["message"].strip():
+            continue
+
+        log_level = source["log_level"]
+
+        for field in log_level_messages:
+            if log_level not in log_level_messages[field]:
+                log_level_messages[field][log_level] = ""
+        if log_level not in log_level_ids_to_add:
+            log_level_ids_to_add[log_level] = []
+        if log_level not in logs_unique_log_level:
+            logs_unique_log_level[log_level] = set()
+
+        if source['message_lines'] <= 2 and source['message_words_number'] <= 100:
+            if log_level not in log_level_ids_merged:
+                log_level_ids_merged[log_level] = log
+            if log_level not in logs_ids_in_merged_logs:
+                logs_ids_in_merged_logs[log_level] = []
+            logs_ids_in_merged_logs[log_level].append(log["_id"])
+
+            log_level_representative = log_level_ids_merged[log_level]
+            current_log_word_num = source["message_words_number"]
+            main_log_word_num = log_level_representative["_source"]["message_words_number"]
+            if current_log_word_num > main_log_word_num:
+                log_level_ids_merged[log_level] = log
+
+            normalized_msg = " ".join(source["message"].strip().lower().split())
+            if normalized_msg not in logs_unique_log_level[log_level]:
+                logs_unique_log_level[log_level].add(normalized_msg)
+
+                for field in log_level_messages:
+                    if field in source:
+                        splitter = "\n" if field in {"message", "whole_message"} else " "
+                        log_level_messages[field][log_level] = \
+                            log_level_messages[field][log_level] + source[field] + splitter
+
+        else:
+            log_level_ids_to_add[log_level].append(log["_id"])
+
+    return merge_big_and_small_logs(
+        logs, log_level_ids_to_add, log_level_messages, log_level_ids_merged, logs_ids_in_merged_logs)
+
+
+def merge_logs(
+        logs: list[list[dict[str, Any]]], number_of_lines: int,
+        clean_numbers: bool) -> tuple[list[str], dict[int, dict[str, Any]], dict[str, list[int]]]:
+    full_log_ids_for_merged_logs = {}
+    log_messages = []
+    log_dict = {}
+    ind = 0
+    for prepared_log in logs:
+        merged_logs, log_ids_for_merged_logs = decompose_logs_merged_and_without_duplicates(prepared_log)
+        for _id, merged_list in log_ids_for_merged_logs.items():
+            full_log_ids_for_merged_logs[_id] = merged_list
+        for log in merged_logs:
+            number_of_log_lines = number_of_lines
+            if log["_source"]["is_merged"]:
+                number_of_log_lines = -1
+            log_message = text_processing.prepare_message_for_clustering(
+                log["_source"]["whole_message"], number_of_log_lines, clean_numbers)
+            if not log_message.strip():
+                continue
+            log_messages.append(log_message)
+            log_dict[ind] = log
+            ind += 1
+    return log_messages, log_dict, full_log_ids_for_merged_logs
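For orientation, a toy usage sketch of the new module-level log_merger API. The document shape (an _id plus an _source with message, whole_message, log_level, message_lines and message_words_number) is inferred from the fields read in this diff; real documents carry more fields, and the numeric log level is only an assumption:

from app.commons import log_merger

toy_logs = [
    {"_id": 1, "_source": {"message": "Connection refused", "whole_message": "Connection refused",
                           "log_level": 40000, "message_lines": 1, "message_words_number": 2}},
    {"_id": 2, "_source": {"message": "Connection refused", "whole_message": "Connection refused",
                           "log_level": 40000, "message_lines": 1, "message_words_number": 2}},
]

# Both documents fall under the "small log" branch (<= 2 lines and <= 100 words), so they are
# collapsed into one synthetic merged document per log level; the second return value maps the
# synthetic "<id>_m" document back to the original log ids it absorbed.
merged, ids_for_merged = log_merger.decompose_logs_merged_and_without_duplicates(toy_logs)

# merge_logs goes one step further and produces clustering-ready messages for batches of
# already prepared documents (one list per test item).
messages, index_to_log, ids_map = log_merger.merge_logs([toy_logs], number_of_lines=2, clean_numbers=True)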