Fixed per feedback, removed decorator
TeachMeTW committed Oct 5, 2024
1 parent 553a45d commit d19392d
Showing 4 changed files with 179 additions and 46 deletions.
46 changes: 46 additions & 0 deletions conf/log/function_pipeline.conf
@@ -0,0 +1,46 @@
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"detailed": {
"class": "logging.Formatter",
"format": "%(asctime)s:%(levelname)s:%(thread)d:%(message)s"
}
},
"handlers": {
"errors": {
"class": "logging.handlers.RotatingFileHandler",
"level": "ERROR",
"formatter": "detailed",
"filename": "/var/tmp/function_pipeline-errors.log",
"mode": "a",
"maxBytes": 1073741824,
"backupCount": 2,
"encoding": "UTF-8"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"level": "DEBUG",
"formatter": "detailed",
"filename": "/var/tmp/function_pipeline.log",
"mode": "a",
"maxBytes": 1073741824,
"backupCount": 8,
"encoding": "UTF-8"
},
"console": {
"class": "logging.StreamHandler",
"level": "WARNING",
"formatter": "detailed",
"stream": "ext://sys.stdout"
}
},
"root": {
"handlers": [
"console",
"file",
"errors"
],
"level": "DEBUG"
}
}
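
This file follows the standard logging dict-config schema (version 1), so it is consumed with logging.config.dictConfig. A minimal sketch of the loading pattern, assuming the repository root is the working directory so the relative path resolves:

import json
import logging
import logging.config

# Install the handlers defined above: console at WARNING, the main
# rotating file at DEBUG, and the errors file at ERROR
with open("conf/log/function_pipeline.conf", "r") as cf:
    logging.config.dictConfig(json.load(cf))

logging.debug("reaches only /var/tmp/function_pipeline.log")
logging.error("reaches the console, the main file, and the errors file")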
82 changes: 82 additions & 0 deletions emission/functions/time_functions.py
@@ -0,0 +1,82 @@
import json
import logging
import logging.config
import time

import numpy as np
import arrow

import emission.core.timer as ect
import emission.storage.decorations.stats_queries as esds

def run_function_pipeline(process_number, function_list, skip_if_no_new_data=False):
"""
Run the function pipeline with the specified process number and function list.
    Note that process_number is only really used to customize the log file name.
    We could avoid passing it in by using the process id (os.getpid()) instead, but
    then we would lose the nice RotatingFileHandler properties, such as auto-deleting
    old files when there are too many. Maybe it would work properly with logrotate? Need to check.
:param process_number: id representing the process number. In range (0..n)
:param function_list: the list of functions that this process will handle
:param skip_if_no_new_data: flag to skip function execution based on custom logic
:return: None
"""
try:
with open("conf/log/function_pipeline.conf", "r") as cf:
pipeline_log_config = json.load(cf)
except FileNotFoundError:
with open("conf/log/function_pipeline.conf.sample", "r") as cf:
pipeline_log_config = json.load(cf)

# Customize log filenames with process number
pipeline_log_config["handlers"]["file"]["filename"] = \
pipeline_log_config["handlers"]["file"]["filename"].replace("function_pipeline", f"function_pipeline_{process_number}")
pipeline_log_config["handlers"]["errors"]["filename"] = \
pipeline_log_config["handlers"]["errors"]["filename"].replace("function_pipeline", f"function_pipeline_{process_number}")

logging.config.dictConfig(pipeline_log_config)
np.random.seed(61297777)

logging.info(f"Processing function list: { [func.__name__ for func in function_list] }")

    for func in function_list:
        # Guard against placeholder entries before touching func.__name__
        if func is None:
            continue
        func_name = func.__name__

        try:
            run_function_pipeline_step(func, skip_if_no_new_data)
        except Exception as e:
            # The function name is passed in the user_id slot, since these
            # stats are keyed by function rather than by user
            esds.store_function_error(func_name, "WHOLE_PIPELINE", time.time(), None)
            logging.exception(f"Found error {e} while processing pipeline for function {func_name}, skipping")

def run_function_pipeline_step(func, skip_if_no_new_data):
"""
Execute a single step in the function pipeline.
:param func: The function to execute
:param skip_if_no_new_data: Flag to determine if the function should be skipped based on custom logic
:return: None
"""
func_name = func.__name__

with ect.Timer() as timer:
logging.info(f"********** Function {func_name}: Starting execution **********")
print(f"{arrow.now()} ********** Function {func_name}: Starting execution **********")
result = func()

# Store the execution time
esds.store_function_time(func_name, "EXECUTION",
time.time(), timer.elapsed)

if skip_if_no_new_data and not result:
print(f"No new data for {func_name}, and skip_if_no_new_data = {skip_if_no_new_data}, skipping the rest of the pipeline")
return
else:
print(f"Function {func_name} executed with result = {result} and skip_if_no_new_data = {skip_if_no_new_data}, continuing")

logging.info(f"********** Function {func_name}: Completed execution **********")
48 changes: 2 additions & 46 deletions emission/storage/decorations/stats_queries.py
@@ -64,50 +64,6 @@ def store_function_time(user_id: str, stage_string: str, ts: float, reading: float):
store_stats_entry(user_id, "stats/function_time", stage_string, ts, reading)


def time_and_store_function(user_id: str):
"""
Decorator to measure execution time of functions and store the stats under 'stats/function_time'.
Parameters:
- user_id (str): The ID of the user associated with the stats.
Usage:
@time_and_store_function(user_id="user123")
def my_function(...):
...
"""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
print(f"Decorator invoked for {func.__name__}")
stage_string = func.__name__
ts = time.time()
logging.info(f"Starting '{stage_string}' execution.")
start_time = time.time()
try:
result = func(*args, **kwargs)
success = True
return result
except Exception as e:
success = False
logging.error(f"Error in '{stage_string}': {e}", exc_info=True)
raise
finally:
end_time = time.time()
duration_ms = (end_time - start_time) * 1000 # Convert to milliseconds
logging.info(f"Finished '{stage_string}' in {duration_ms:.2f} ms.")
# Store the timing stats
try:
store_function_time(
user_id=user_id,
stage_string=stage_string,
ts=ts,
reading=duration_ms
)
except Exception as storage_error:
logging.error(f"Failed to store timing stats for '{stage_string}': {storage_error}", exc_info=True)
if not success:
logging.warning(f"'{stage_string}' encountered an error.")
return wrapper
return decorator
def store_function_error(user_id, stage_string, ts, reading):
    # Errors get their own key, so they can be queried separately from timings
    store_stats_entry(user_id, "stats/function_error", stage_string, ts, reading)

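With the decorator removed, timing now happens at the call site: the pipeline above calls store_function_time and store_function_error directly. A condensed sketch of that calling pattern (the helper name timed_call is hypothetical; the function name is passed in the user_id slot, matching the calls in this diff):

import time
import logging

import emission.storage.decorations.stats_queries as esds

def timed_call(func):
    # Time a single zero-argument call and report success or failure
    ts = time.time()
    try:
        result = func()
        esds.store_function_time(func.__name__, "EXECUTION",
                                 ts, time.time() - ts)
        return result
    except Exception:
        esds.store_function_error(func.__name__, "EXECUTION", ts, None)
        logging.exception(f"{func.__name__} failed")
        raise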
49 changes: 49 additions & 0 deletions emission/tests/funcTests/TestFunctionTiming.py
@@ -0,0 +1,49 @@
# emission/tests/funcTests/TestFunctionTiming.py

import logging
import os
import time

# Import the run_function_pipeline function from time_functions.py
from emission.functions.time_functions import run_function_pipeline

# Define test functions
def test_function_1():
logging.info("Executing test_function_1")
time.sleep(1) # Simulate processing time
return True # Indicate successful execution

def test_function_2():
logging.info("Executing test_function_2")
time.sleep(1)
return True

def test_function_faulty():
logging.info("Executing test_function_faulty")
time.sleep(1)
raise ValueError("Simulated error in test_function_faulty")

def test_function_3():
logging.info("Executing test_function_3")
time.sleep(1)
return True

if __name__ == "__main__":
    # Ensure the logs directory exists (note: the rotating file handlers
    # themselves write to /var/tmp, per conf/log/function_pipeline.conf)
    os.makedirs("logs", exist_ok=True)

# Define the list of test functions, including the faulty one
function_list = [
test_function_1,
test_function_2,
test_function_faulty, # This will raise an exception
test_function_3 # This should execute normally after the faulty function
]

# Run the pipeline with process number 1 and skip_if_no_new_data set to True
run_function_pipeline(process_number=1, function_list=function_list, skip_if_no_new_data=True)
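
Assuming the repository root is the working directory (so that conf/log/function_pipeline.conf resolves), the script can be run with something like python -m emission.tests.funcTests.TestFunctionTiming. Based on the pipeline code above, the ValueError from test_function_faulty is caught inside run_function_pipeline, recorded via store_function_error, and logged, after which test_function_3 still executes; the per-process logs land in /var/tmp/function_pipeline_1.log and /var/tmp/function_pipeline_1-errors.log.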
