Test charts #262

Draft · wants to merge 24 commits into base: dev

3 changes: 2 additions & 1 deletion .env_file
@@ -20,4 +20,5 @@ CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1
CLICKHOUSE_PASSWORD=pass
SUPERSET_USERNAME=superset
SUPERSET_PASSWORD=superset
-SUPERSET_METADATA_PORT=5433
+SUPERSET_METADATA_PORT=5433
+ENVIRONMENT=test
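
The ENVIRONMENT variable introduced here is read in dff/utils/docker/superset_config_docker.py below, where it gates whether Superset's metadata store points at the dashboard-metadata Postgres service.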
4 changes: 3 additions & 1 deletion dff/stats/utils.py
@@ -109,6 +109,8 @@ def get_superset_session(args: Namespace, base_url: str = "http://localhost:8088

    :return: Authorized session - authorization headers tuple.
    """
+    username = args.username
+    password = args.password
    healthcheck_url = parse.urljoin(base_url, "/healthcheck")
    login_url = parse.urljoin(base_url, "/api/v1/security/login")
    csrf_url = parse.urljoin(base_url, "/api/v1/security/csrf_token/")
@@ -121,7 +123,7 @@ def get_superset_session(args: Namespace, base_url: str = "http://localhost:8088
    access_request = session.post(
        login_url,
        headers={"Content-Type": "application/json", "Accept": "*/*"},
-        data=json.dumps({"username": args.username, "password": args.password, "refresh": True, "provider": "db"}),
+        data=json.dumps({"username": username, "password": password, "refresh": True, "provider": "db"}),
    )
    access_token = access_request.json()["access_token"]
    # get csrf_token
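
For reference, a minimal usage sketch of the refactored helper, mirroring how tests/stats/test_charts.py below calls it (the literal credentials here are placeholders):

    from argparse import Namespace

    from dff.stats.utils import get_superset_session

    # Placeholder credentials; the tests read SUPERSET_USERNAME / SUPERSET_PASSWORD instead.
    args = Namespace(username="superset", password="superset")
    session, headers = get_superset_session(args, "http://localhost:8088")

    # The returned session is already authorized; `headers` carries the tokens
    # obtained from the login and csrf_token endpoints.
    charts = session.get("http://localhost:8088/api/v1/chart", headers=headers)
    charts.raise_for_status()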
1 change: 1 addition & 0 deletions dff/utils/docker/dockerfile_stats
@@ -1,6 +1,7 @@
FROM apache/superset:2.1.0rc1
USER root
RUN cd /app && pip install .[clickhouse]
+COPY health_stats.sh /app/docker/
COPY entrypoint_stats.sh /app/docker/
COPY --chown=superset superset_config_docker.py /app/pythonpath/
ENV SUPERSET_CONFIG_PATH /app/pythonpath/superset_config_docker.py
2 changes: 1 addition & 1 deletion dff/utils/docker/entrypoint_stats.sh
@@ -2,7 +2,7 @@
export SERVER_THREADS_AMOUNT=8
set -m
nohup /bin/bash /usr/bin/run-server.sh &
-sleep 5
+/bin/bash /app/docker/health_stats.sh http://localhost:8088/health
superset fab create-admin --firstname superset --lastname admin --username $SUPERSET_USERNAME --email [email protected] --password $SUPERSET_PASSWORD
superset db upgrade
superset init
19 changes: 19 additions & 0 deletions dff/utils/docker/health_stats.sh
@@ -0,0 +1,19 @@
#!/bin/bash
# Poll the given healthcheck URL until the response contains "OK", retrying up to 10 times.
if [[ $# = 0 ]] ; then printf "Specify healthcheck url;\n"; exit 1; fi;
for itr in {1..10}
do
    healthcheck=$(curl -X GET "${1}" | grep "OK")
    healthcheck=$?  # exit status of the curl | grep pipeline
    if [ "$healthcheck" -ne 0 ] ; then
        echo "Healthcheck failed. Sleeping for 5 secs."
        sleep 5
        echo "Iteration ${itr}"
        if [ "$itr" -eq 10 ]; then
            # No retries left; report failure (the script still exits 0 so startup continues).
            echo "Healthcheck suite unsuccessful."
        fi
    else
        echo "Healthcheck suite successful."
        exit 0
    fi
done
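
The same poll-until-OK pattern recurs in the makefile target and the tests below; for illustration, a rough Python equivalent of this script (the function name is hypothetical):

    import time

    import requests

    def wait_until_healthy(url: str, attempts: int = 10, delay: float = 5.0) -> bool:
        """Poll url until the response body contains "OK", as health_stats.sh does."""
        for _ in range(attempts):
            try:
                if "OK" in requests.get(url, timeout=5).text:
                    return True
            except requests.RequestException:
                pass  # server not accepting connections yet
            time.sleep(delay)  # healthcheck failed; retry after a pause
        return False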
13 changes: 7 additions & 6 deletions dff/utils/docker/superset_config_docker.py
@@ -24,9 +24,10 @@
#
import os

-SQLALCHEMY_DATABASE_URI = "postgresql+psycopg2://{0}:{1}@dashboard-metadata:{2}/{3}".format(
-    os.getenv("POSTGRES_USERNAME"),
-    os.getenv("POSTGRES_PASSWORD"),
-    os.getenv("SUPERSET_METADATA_PORT"),
-    os.getenv("POSTGRES_DB"),
-)
+if os.getenv("ENVIRONMENT") == "prod":
+    SQLALCHEMY_DATABASE_URI = "postgresql+psycopg2://{0}:{1}@dashboard-metadata:{2}/{3}".format(
+        os.getenv("POSTGRES_USERNAME"),
+        os.getenv("POSTGRES_PASSWORD"),
+        os.getenv("SUPERSET_METADATA_PORT"),
+        os.getenv("POSTGRES_DB"),
+    )
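
Guarding the assignment means the Postgres-backed metadata URI is only applied in production. With ENVIRONMENT=test from .env_file above, the branch is skipped, and Superset presumably falls back to its built-in default metadata database.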
1 change: 1 addition & 0 deletions makefile
@@ -55,6 +55,7 @@ docker_up:

wait_db: docker_up
	while ! docker-compose exec psql pg_isready; do sleep 1; done > /dev/null
+	while ! docker-compose exec dashboard /bin/bash -c "curl localhost:8088/health | grep OK"; do sleep 1; done > /dev/null
	while ! docker-compose exec mysql bash -c 'mysql -u $$MYSQL_USERNAME -p$$MYSQL_PASSWORD -e "select 1;"'; do sleep 1; done &> /dev/null
.PHONY: wait_db

134 changes: 134 additions & 0 deletions tests/stats/chart_data.py
@@ -0,0 +1,134 @@
# %%
import random
import asyncio
from tqdm import tqdm
from dff.script import Context, Message, RESPONSE, TRANSITIONS
from dff.script import conditions as cnd
from dff.pipeline import Pipeline, ACTOR, Service
from dff.stats import OtelInstrumentor, default_extractors

# %%
# instrumentation code
dff_instrumentor = OtelInstrumentor.from_url("grpc://localhost:4317", insecure=True)
dff_instrumentor.instrument()


def numbered_flow_factory(number: int):
    return {
        f"node_{str(n)}": {
            RESPONSE: Message(text=f"node_{str(number)}_{str(n)}"),
            TRANSITIONS: {f"node_{str(n+1)}": cnd.true()} if n != 4 else {("root", "fallback"): cnd.true()},
        }
        for n in range(5)
    }


numbered_script = {
    "root": {
        "start": {
            RESPONSE: Message(text="Hi"),
            TRANSITIONS: {
                lambda ctx, pipeline: (f"flow_{random.choice(range(1, 11))}", "node_1", 1): cnd.true(),
            },
        },
        "fallback": {RESPONSE: Message(text="Oops")},
    },
    **{f"flow_{str(n)}": numbered_flow_factory(n) for n in range(1, 11)},
}

transitions_script = {
    "root": {
        "start": {
            RESPONSE: Message(text="Hi"),
            TRANSITIONS: {
                ("flow_1", "node"): cnd.true(),
            },
        },
        "fallback": {RESPONSE: Message(text="Oops")},
    },
    **{
        f"flow_{str(num)}": {
            "node": {
                RESPONSE: Message(text="Message."),
                TRANSITIONS: {(f"flow_{str(num+1)}", "node"): cnd.true()}
                if num != 100
                else {("root", "fallback"): cnd.true()},
            }
        }
        for num in range(1, 101)
    },
}


transition_test_pipeline = Pipeline.from_dict(
    {
        "script": transitions_script,
        "start_label": ("root", "start"),
        "fallback_label": ("root", "fallback"),
        "components": [
            Service(
                handler=ACTOR,
                after_handler=[
                    default_extractors.get_current_label,
                ],
            ),
        ],
    }
)

numbered_test_pipeline = Pipeline.from_dict(
    {
        "script": numbered_script,
        "start_label": ("root", "start"),
        "fallback_label": ("root", "fallback"),
        "components": [
            Service(
                handler=ACTOR,
                after_handler=[
                    default_extractors.get_current_label,
                ],
            ),
        ],
    }
)


# %%
async def worker(pipeline: Pipeline, queue: asyncio.Queue):
    """
    Worker function for dispatching one client message.
    The client message is chosen randomly from a predetermined set of options.
    It simulates pauses in between messages by calling the sleep function.

    The function also starts a new dialog as a new user if the current dialog
    ended in the fallback node.

    :param pipeline: Pipeline to run the simulated message through.
    :param queue: Queue for sharing context variables.
    """
    ctx: Context = await queue.get()
    in_message = Message(text="Hi")
    await asyncio.sleep(random.random() * 3)
    ctx = await pipeline._run_pipeline(in_message, ctx.id)
    await asyncio.sleep(random.random() * 3)
    await queue.put(ctx)


# %%
# main loop
async def loop(pipeline: Pipeline, n_iterations: int = 10, n_workers: int = 10):
    """
    The main loop that runs one or more worker coroutines in parallel.

    :param pipeline: Pipeline to run the simulated messages through.
    :param n_iterations: Total number of coroutine runs.
    :param n_workers: Number of parallelized coroutine runs.
    """
    ctxs = asyncio.Queue()
    parallel_iterations = n_iterations // n_workers
    for _ in range(n_workers):
        await ctxs.put(Context())
    for _ in tqdm(range(parallel_iterations)):
        await asyncio.gather(*(worker(pipeline, ctxs) for _ in range(n_workers)))


if __name__ == "__main__":
    asyncio.run(loop(numbered_test_pipeline))
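
With the defaults above (n_iterations=10, n_workers=10), loop creates exactly ten Context objects and runs a single parallel batch that reuses them, so ten distinct dialogs are produced; the COUNT_DISTINCT(context_id) == 10 assertion in tests/stats/test_charts.py below depends on this.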
136 changes: 136 additions & 0 deletions tests/stats/test_charts.py
@@ -0,0 +1,136 @@
import os
import random
import json
import asyncio
from argparse import Namespace
from urllib import parse
import pytest

try:
    from requests import Session
    import omegaconf  # noqa: F401
    import tqdm  # noqa: F401
    from dff.stats.utils import get_superset_session
    from dff.stats.cli import DEFAULT_SUPERSET_URL
    from aiochclient import ChClient
    from httpx import AsyncClient
except ImportError:
    pytest.skip(reason="`OmegaConf` dependency missing.", allow_module_level=True)

from tests.stats.chart_data import numbered_test_pipeline, transition_test_pipeline, loop
from tests.context_storages.test_dbs import ping_localhost
from tests.test_utils import get_path_from_tests_to_current_dir

random.seed(42)
dot_path_to_addon = get_path_from_tests_to_current_dir(__file__)

SUPERSET_ACTIVE = ping_localhost(8088)
COLLECTOR_AVAILABLE = ping_localhost(4317)
CLICKHOUSE_AVAILABLE = ping_localhost(8123)
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD")
CLICKHOUSE_DB = os.getenv("CLICKHOUSE_DB")
SUPERSET_USERNAME = os.getenv("SUPERSET_USERNAME")
SUPERSET_PASSWORD = os.getenv("SUPERSET_PASSWORD")


async def transitions_data_test(session: Session, headers: dict, base_url=DEFAULT_SUPERSET_URL):
    charts_url = parse.urljoin(base_url, "/api/v1/chart")

    result = session.get(charts_url, headers=headers)
    result.raise_for_status()
    result_json = result.json()

    target_chart_id = [item for item in result_json["result"] if item["slice_name"] == "Transition counts"][0]["id"]
    target_url = parse.urljoin(base_url, f"api/v1/chart/{target_chart_id}/data/")
    result_status = 404
    attempts = 0
    while result_status != 200 and attempts < 10:  # poll until the chart data endpoint is ready
        attempts += 1
        data_result = session.get(target_url, headers=headers)
        result_status = data_result.status_code
        await asyncio.sleep(1)

    data_result_json = data_result.json()
    data = data_result_json["result"][0]["data"]
    assert len(data) > 0
    assert "COUNT_DISTINCT(context_id)" in data[0]
    assert data[0]["COUNT_DISTINCT(context_id)"] == 10
    session.close()


async def numbered_data_test(session: Session, headers: dict, base_url=DEFAULT_SUPERSET_URL):
    charts_url = parse.urljoin(base_url, "/api/v1/chart")

    result = session.get(charts_url, headers=headers)
    result.raise_for_status()
    result_json = result.json()

    target_chart_id = [item for item in result_json["result"] if item["slice_name"] == "Table"][0]["id"]
    target_url = parse.urljoin(base_url, f"api/v1/chart/{target_chart_id}/data/")
    result_status = 404
    attempts = 0
    while result_status != 200 and attempts < 10:  # poll until the chart data endpoint is ready
        attempts += 1
        data_result = session.get(target_url, headers=headers)
        result_status = data_result.status_code
        await asyncio.sleep(2)

    data_result_json = data_result.json()
    grouped_dict = dict()
    data = data_result_json["result"][0]["data"]
    assert len(data) > 0
    for item in data:
        if item["context_id"] not in grouped_dict:
            grouped_dict[item["context_id"]] = [item]
        else:
            grouped_dict[item["context_id"]].append(item)
    unique_flows = list(map(lambda x: set(map(lambda y: json.loads(y["data"])["flow"], x)), grouped_dict.values()))
    assert all(map(lambda x: len(x) == 1, unique_flows))
    session.close()


@pytest.mark.skipif(not SUPERSET_ACTIVE, reason="Superset server not active")
@pytest.mark.skipif(not CLICKHOUSE_AVAILABLE, reason="Clickhouse unavailable.")
@pytest.mark.skipif(not COLLECTOR_AVAILABLE, reason="OTLP collector unavailable.")
@pytest.mark.skipif(
    not all([CLICKHOUSE_USER, CLICKHOUSE_PASSWORD, CLICKHOUSE_DB]), reason="Clickhouse credentials missing"
)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ["pipeline", "func"],
    [
        (numbered_test_pipeline, numbered_data_test),
        (transition_test_pipeline, transitions_data_test),
    ],
)
@pytest.mark.docker
async def test_charts(pipeline, func, otlp_log_exp_provider, otlp_trace_exp_provider):
    _, tracer_provider = otlp_trace_exp_provider
    _, logger_provider = otlp_log_exp_provider

    table = "otel_logs"
    http_client = AsyncClient()
    ch_client = ChClient(http_client, user=CLICKHOUSE_USER, password=CLICKHOUSE_PASSWORD, database=CLICKHOUSE_DB)
    await ch_client.execute(f"TRUNCATE {table}")
    await loop(pipeline=pipeline)  # run with a test-specific pipeline
    tracer_provider.force_flush()
    logger_provider.force_flush()
    num_records = 0

    attempts = 0
    while num_records < 10 and attempts < 10:  # wait for the exported records to land in Clickhouse
        attempts += 1
        await asyncio.sleep(2)
        num_records = await ch_client.fetchval(f"SELECT COUNT(*) FROM {table}")

    os.system(
        f"dff.stats tutorials/stats/example_config.yaml \
        -U {SUPERSET_USERNAME} \
        -P {SUPERSET_PASSWORD} \
        -dP {CLICKHOUSE_PASSWORD}"
    )
    session, headers = get_superset_session(
        Namespace(**{"username": SUPERSET_USERNAME, "password": SUPERSET_PASSWORD}), DEFAULT_SUPERSET_URL
    )
    await func(session, headers)  # run with a test-specific function with equal signature
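
End to end, the test truncates the otel_logs table, drives traffic through the instrumented test pipeline, flushes the OTLP trace and log exporters, polls ClickHouse until the records arrive, imports the dashboard configuration through the dff.stats CLI, and finally checks the rendered chart data through the Superset API with the parametrized checker.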
4 changes: 0 additions & 4 deletions tests/stats/test_main.py
@@ -52,7 +52,6 @@ def dashboard_display_test(args: Namespace, session, headers, base_url: str):
    dashboard_res = session.get(dashboard_url, headers=headers)
    assert dashboard_res.status_code == 200
    dashboard_json = dashboard_res.json()
-    print(dashboard_json["result"]["charts"])
    assert sorted(dashboard_json["result"]["charts"]) == [
        "Current topic [time series bar chart]",
        "Current topic slot [bar chart]",
@@ -77,8 +76,6 @@ def dashboard_display_test(args: Namespace, session, headers, base_url: str):
    datasets_result = session.get(datasets_url, headers=headers)
    datasets_json = datasets_result.json()
    assert datasets_json["count"] == 3
-    assert datasets_json["ids"] == [1, 2, 3]
-    assert [item["id"] for item in datasets_json["result"]] == [1, 2, 3]
    assert sorted([item["table_name"] for item in datasets_json["result"]]) == [
        "dff_final_nodes",
        "dff_node_stats",
@@ -87,7 +84,6 @@ def dashboard_display_test(args: Namespace, session, headers, base_url: str):
    charts_result = session.get(charts_url, headers=headers)
    charts_json = charts_result.json()
    assert charts_json["count"] == 17
-    assert sorted(charts_json["ids"]) == list(range(1, 18))
    session.close()