Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Praneeth Bedapudi <[email protected]>
  • Loading branch information
bedapudi6788 committed Dec 12, 2023
1 parent 80580ec commit b9ce9f6
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 318 deletions.
4 changes: 2 additions & 2 deletions smartdash/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
EMAIL = "[email protected]"
AUTHOR = "BEDAPUDI PRANEETH"
REQUIRES_PYTHON = ">=3.6.0"
VERSION = "0.0.1.dev20"
VERSION = "0.0.1.dev21"

# What packages are required for this module to be executed?
REQUIRED = ["requests", "liteindex==0.0.1.dev25", "streamlit", "plotly", "gevent", "falcon", "gunicorn"]
REQUIRED = ["requests", "liteindex", "streamlit", "plotly", "gevent", "falcon", "gunicorn"]

# What packages are optional?
EXTRAS = {
Expand Down
212 changes: 29 additions & 183 deletions smartdash/smartdash/smartdash_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import json
import uuid
import pickle
import falcon
import logging
import requests
Expand All @@ -17,49 +18,34 @@
db_path = os.path.join(os.getenv("SMARTDASH_SAVE_DIR", "./"), "smartdash.db")

LOG_INDEX = DefinedIndex(
"log_index",
{
"app_name": "",
"u_id": "",
"level": "",
"messages": [],
"timestamp": 0,
"stage": "",
"tags": [],
},
db_path=db_path,
auto_key=True,
)

ML_INPUTS_OUTPUTS_INDEX = DefinedIndex(
"ml_inputs_outputs_index",
{
"app_name": "",
"u_id": "",
"inputs": [],
"outputs": [],
"model_type": "",
"timestamp": 0,
"stage": "",
"tags": [],
"logs",
schema={
"app_name": "string",
"u_id": "string",
"stage": "string",
"level": "string",
"messages": "json",
"time": "number",
"tags": "json",
},
db_path=db_path,
auto_key=True,
)

METRICS_INDEX = DefinedIndex(
f"metrics_index",
KV_INDEX = DefinedIndex(
"key_value",
schema={
"app_name": "",
"u_id": "",
"metric": "",
"value": 0,
"timestamp": 0,
"stage": "",
"tags": [],
"app_name": "string",
"u_id": "string",
"key": "string",
"num_value": "number",
"str_value": "string",
"other_value": "other",
"name": "string",
"timestamp": "number",
"stage": "string",
"tags": "json",
},
db_path=db_path,
auto_key=True,
)


Expand All @@ -68,156 +54,19 @@ def on_get(self, req, resp):
resp.media = {"status": "ok"}


class GetDashMetrics(object):
def on_get(self, req, resp):
eight_hours_ago = (
time.time() - float(req.params.get("last_n_hours", 8)) * 60 * 60
)
long_running_if_greater_than = (
time.time() - float(req.params.get("long_running_n_hours", 1)) * 60 * 60
)

# Fetch logs from the last 8 hours

data_by_id = {}

for _, log in LOG_INDEX.search(
{
"app_name": req.params["app_name"],
"timestamp": {"$gte": eight_hours_ago},
},
sort_by="timestamp",
page=1,
page_size=1000,
):
if log["u_id"] not in data_by_id:
data_by_id[log["u_id"]] = {
"logs": [],
"metrics": [],
"ml_inputs_outputs": [],
"stage_wise_times": {},
"success": None,
"failed": None,
"in_process": None,
"long_running": False,
}

data_by_id[log["u_id"]]["logs"].append(log)

# Fetch metrics from the last 8 hours

for _, metric in METRICS_INDEX.search(
{
"app_name": req.params["app_name"],
"timestamp": {"$gte": eight_hours_ago},
},
sort_by="timestamp",
page=1,
page_size=1000,
):
if metric["u_id"] not in data_by_id:
data_by_id[metric["u_id"]] = {
"logs": [],
"metrics": [],
"ml_inputs_outputs": [],
"stage_wise_times": {},
"success": None,
"failed": None,
"in_process": None,
"long_running": False,
}
data_by_id[metric["u_id"]]["metrics"].append(metric)

for _, ml_inputs_outputs in ML_INPUTS_OUTPUTS_INDEX.search(
{
"app_name": req.params["app_name"],
"timestamp": {"$gte": eight_hours_ago},
},
sort_by="timestamp",
page=1,
page_size=1000,
):
if ml_inputs_outputs["u_id"] not in data_by_id:
data_by_id[log["u_id"]] = {
"logs": [],
"metrics": [],
"ml_inputs_outputs": [],
"stage_wise_times": {},
"success": None,
"failed": None,
"in_process": None,
"long_running": False,
}

data_by_id[log["u_id"]]["ml_inputs_outputs"].append(ml_inputs_outputs)

# Calculate stage wise timers
for u_id, data in data_by_id.items():
stage_timers = {}
logs = data["logs"]
for log in logs:
stage = log["stage"]
if stage not in stage_timers:
stage_timers[stage] = {
"start": log["timestamp"],
"end": log["timestamp"],
}
else:
stage_timers[stage]["end"] = log["timestamp"]

if logs:
if logs[-1]["level"] == "ERROR":
data_by_id[u_id]["failed"] = True
elif logs[-1]["messages"][0] == "Stage succeeded":
data_by_id[u_id]["success"] = True
elif time.time() - logs[-1]["timestamp"] > long_running_if_greater_than:
data_by_id[u_id]["long_running"] = True
else:
data_by_id[u_id]["in_process"] = True

if stage_timers:
data_by_id[u_id]["stage_wise_times"] = stage_timers

resp.media = {"success": True, "data_by_uid": data_by_id}
resp.status = falcon.HTTP_200


class AppNames(object):
def on_get(self, req, resp):
resp.media = {
"success": True,
"app_names": list(
set(
LOG_INDEX.distinct("app_name")
+ METRICS_INDEX.distinct("app_name")
+ ML_INPUTS_OUTPUTS_INDEX.distinct("app_name")
)
),
}
resp.status = falcon.HTTP_200


class AddLogs(object):
def on_post(self, req, resp):
data = req.media
id = LOG_INDEX.add(data)
resp.media = {"success": True, "id": id}
resp.status = falcon.HTTP_200
LOG_INDEX.update(pickle.loads(req.stream.read()))


class AddMetrics(object):
def on_post(self, req, resp):
data = req.media
id = METRICS_INDEX.add(data)
resp.media = {"success": True, "id": id}
resp.media = {"success": True}
resp.status = falcon.HTTP_200


class AddMLInputsOutputs(object):
class AddKeyValues(object):
def on_post(self, req, resp):
data = req.media
id = ML_INPUTS_OUTPUTS_INDEX.add(data)
resp.media = {"success": True, "id": id}
KV_INDEX.update(pickle.loads(req.stream.read()))

resp.media = {"success": True}
resp.status = falcon.HTTP_200


Expand All @@ -229,10 +78,7 @@ def main(port=8080):
)

app.add_route("/logs", AddLogs())
app.add_route("/metrics", AddMetrics())
app.add_route("/ml_inputs_outputs", AddMLInputsOutputs())
app.add_route("/app_names", AppNames())
app.add_route("/get_dash_metrics", GetDashMetrics())
app.add_route("/key_values", AddKeyValues())
app.add_route("/health", HealthCheck())

import gunicorn.app.base
Expand Down
4 changes: 2 additions & 2 deletions smartlog/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
EMAIL = "[email protected]"
AUTHOR = "BEDAPUDI PRANEETH"
REQUIRES_PYTHON = ">=3.6.0"
VERSION = "0.0.1.dev22"
VERSION = "0.0.1.dev23"

# What packages are required for this module to be executed?
REQUIRED = ["requests", "liteindex==0.0.1.dev25"]
REQUIRED = ["requests", "liteindex"]

# What packages are optional?
EXTRAS = {
Expand Down
Loading

0 comments on commit b9ce9f6

Please sign in to comment.