Rachel/bug #6576

Draft
wants to merge 3 commits into master
3 changes: 3 additions & 0 deletions snuba/pipeline/stages/query_execution.py
@@ -74,6 +74,7 @@ def get_cluster(
def _process_data(
self, pipe_input: QueryPipelineData[ClickhouseQuery | CompositeQuery[Table]]
) -> QueryResult:
print("whereeeee query_execution.py")
cluster = self.get_cluster(pipe_input.data, pipe_input.query_settings)
if pipe_input.query_settings.get_dry_run():
return _dry_run_query_runner(
@@ -147,6 +148,7 @@ def _run_and_apply_column_names(
concurrent_queries_gauge,
cluster_name,
)
print("query_execution/result", result)

alias_name_mapping: MutableMapping[str, list[str]] = {}
for select_col in clickhouse_query.get_selected_columns():
@@ -194,6 +196,7 @@ def _format_storage_query_and_run(
formatted_query = format_query(clickhouse_query)

formatted_sql = formatted_query.get_sql()
print("formatted_sql print", formatted_sql)
query_size_bytes = len(formatted_sql.encode("utf-8"))
span.set_data(
"query", textwrap.wrap(formatted_sql, 100, break_long_words=False)
2 changes: 2 additions & 0 deletions snuba/pipeline/stages/query_processing.py
@@ -29,6 +29,7 @@ class EntityProcessingStage(
def _process_data(
self, pipe_input: QueryPipelineData[Request]
) -> ClickhouseQuery | CompositeQuery[Table]:
print("whereeeee query_processing.py")
query = pipe_input.data.query
translated_storage_query = try_translate_storage_query(query)
if translated_storage_query:
@@ -65,6 +66,7 @@ def _apply_default_subscriptable_mapping(
def _process_data(
self, pipe_input: QueryPipelineData[ClickhouseQuery | CompositeQuery[Table]]
) -> ClickhouseQuery | CompositeQuery[Table]:
print("whereeeee query_processing.py/StorageProcessingStage")
self._apply_default_subscriptable_mapping(pipe_input.data)
if isinstance(pipe_input.data, ClickhouseQuery):
query_plan = build_best_plan(pipe_input.data, pipe_input.query_settings, [])
2 changes: 2 additions & 0 deletions snuba/query/mql/parser.py
@@ -1376,6 +1376,7 @@ def _process_data(
tuple[str, Dataset, dict[str, Any], QuerySettings | None]
],
) -> LogicalQuery:
print("whereeeee mql/parser.py")
mql_str, dataset, mql_context_dict, settings = pipe_input.data

with sentry_sdk.start_span(op="parser", description="parse_mql_query_initial"):
@@ -1419,6 +1420,7 @@ def _process_data(
]
],
) -> LogicalQuery:
print("whereeeee query/mql/parser.py")
query, settings, custom_processing = pipe_input.data
with sentry_sdk.start_span(op="processor", description="post_processors"):
_post_process(
1 change: 1 addition & 0 deletions snuba/query/snql/parser.py
@@ -1587,6 +1587,7 @@ def _process_data(
]
],
) -> LogicalQuery | CompositeQuery[LogicalDataSource]:
print("whereeeee snql/parser.py")
query, dataset, custom_processing = pipe_input.data
settings = pipe_input.query_settings

3 changes: 3 additions & 0 deletions snuba/web/db_query.py
@@ -182,6 +182,8 @@ def execute_query(
with_totals=clickhouse_query.has_totals(),
robust=robust,
)
print("formatted_query", formatted_query)
print("reader.execute result", result)

timer.mark("execute")
stats.update(
@@ -674,6 +676,7 @@ def db_query(
trace_id,
robust,
)
print("db_query/result", result)
except AllocationPolicyViolations as e:
update_query_metadata_and_stats(
query=clickhouse_query,
1 change: 1 addition & 0 deletions snuba/web/query.py
@@ -57,6 +57,7 @@ def _run_query_pipeline(
robust=robust,
concurrent_queries_gauge=concurrent_queries_gauge,
).execute(clickhouse_query)
print("query.py/res", res)
if res.error:
raise res.error
elif res.data:
4 changes: 3 additions & 1 deletion snuba/web/rpc/__init__.py
@@ -129,7 +129,8 @@ def run_rpc_handler(
name: str, version: str, data: bytes
) -> ProtobufMessage | ErrorProto:
try:
endpoint = RPCEndpoint.get_from_name(name, version)() # type: ignore
endpoint = RPCEndpoint.get_from_name(name, version)()  # type: ignore
print("endpoint", endpoint)
except (AttributeError, InvalidConfigKeyError) as e:
return convert_rpc_exception_to_proto(
RPCRequestException(
@@ -140,6 +141,7 @@

try:
deserialized_protobuf = endpoint.parse_from_string(data)
print("deserialized_protobuf", deserialized_protobuf)
except DecodeError as e:
return convert_rpc_exception_to_proto(
RPCRequestException(
7 changes: 7 additions & 0 deletions snuba/web/rpc/common/common.py
@@ -228,6 +228,13 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:

# End of special handling, just send to the appropriate bucket
if attr_key.type == AttributeKey.Type.TYPE_STRING:
# return f.CAST(
# SubscriptableReference(
# alias=None, column=column("attr_str"), key=literal(attr_key.name)
# ),
# "String",
# alias=alias,
# )
return SubscriptableReference(
alias=alias, column=column("attr_str"), key=literal(attr_key.name)
)
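For context on the toggled return value in attribute_key_to_expression above, a minimal standalone sketch of the two alternatives: the plain attr_str subscript versus the commented-out CAST-to-String wrapper. The import paths and the alias/attribute names below are assumptions for illustration, not taken from this hunk.

from snuba.query.dsl import Functions as f, column, literal  # assumed import path
from snuba.query.expressions import SubscriptableReference

alias = "attr_foo"   # hypothetical output alias
attr_name = "foo"    # hypothetical attribute name

# Current behaviour: attr_str['foo'] AS attr_foo
plain = SubscriptableReference(
    alias=alias, column=column("attr_str"), key=literal(attr_name)
)

# Commented-out alternative: CAST(attr_str['foo'], 'String') AS attr_foo
casted = f.CAST(
    SubscriptableReference(
        alias=None, column=column("attr_str"), key=literal(attr_name)
    ),
    "String",
    alias=alias,
)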
1 change: 1 addition & 0 deletions snuba/web/rpc/v1/endpoint_trace_item_table.py
@@ -251,6 +251,7 @@ def _execute(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
uuid.uuid4()
)
snuba_request = _build_snuba_request(in_msg)
print("snuba_request", snuba_request)
res = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
request=snuba_request,
1 change: 1 addition & 0 deletions snuba/web/views.py
@@ -287,6 +287,7 @@ def rpc(*, name: str, version: str) -> Response:
if isinstance(result_proto, ErrorProto):
return Response(result_proto.SerializeToString(), status=result_proto.code)
else:
print("views.py/result_proto", result_proto)
return Response(result_proto.SerializeToString(), status=200)


@@ -84,7 +84,7 @@ def gen_message(
"origin": "auto.http.django",
"project_id": 1,
"received": 1721319572.877828,
"retention_days": 90,
"retention_days": 91,
"segment_id": "8873a98879faf06d",
"sentry_tags": {
"category": "http",
@@ -841,6 +841,7 @@ def test_cast_bug(self, setup_teardown: Any) -> None:
err_msg = ParseDict(err_req, TraceItemTableRequest())
# just ensuring it doesn't raise an exception
EndpointTraceItemTable().execute(err_msg)
assert False


class TestUtils:
231 changes: 231 additions & 0 deletions tests/web/rpc/v1/test_endpoint_trace_item_table/test_rachel.py
@@ -0,0 +1,231 @@
import json
from datetime import UTC, datetime, timedelta, timezone
from typing import Any, TypedDict, Union
from uuid import uuid4

import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.timestamp_pb2 import Timestamp as ProtobufTimestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Column,
TraceItemTableRequest,
)
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
AttributeKey,
VirtualColumnContext,
)
from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter

from tests.base import BaseApiTest


def before_now(**kwargs: float) -> datetime:
date = datetime.now(UTC) - timedelta(**kwargs)
return date - timedelta(microseconds=date.microsecond % 1000)


BASE_TIME = datetime.now(timezone.utc).replace(
minute=0, second=0, microsecond=0
) - timedelta(minutes=180)


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
class TestRachel(BaseApiTest):
def create_span(
self,
extra_data: dict[str, Any] | None = None,
start_ts: datetime | None = None,
duration: int = 1000,
measurements: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Create span json, not required for store_span, but with no params passed should just work out of the box"""
if start_ts is None:
start_ts = datetime.now() - timedelta(days=30)
if extra_data is None:
extra_data = {}
span: dict = {
"is_segment": False,
"measurements": {},
"retention_days": 90,
"sentry_tags": {},
"tags": {},
}
# Load some defaults
span.update(
{
"event_id": uuid4().hex,
"organization_id": 4555051977080832,
"project_id": 4555051977211905,
"trace_id": uuid4().hex,
"span_id": uuid4().hex[:16],
"parent_span_id": uuid4().hex[:16],
"segment_id": uuid4().hex[:16],
"group_raw": uuid4().hex[:16],
"profile_id": uuid4().hex,
# Multiply by 1000 because it needs to be ms
"start_timestamp_ms": int(start_ts.timestamp() * 1000),
"start_timestamp_precise": start_ts.timestamp(),
"end_timestamp_precise": start_ts.timestamp() + duration / 1000,
"timestamp": int(start_ts.timestamp() * 1000),
"received": start_ts.timestamp(),
"duration_ms": duration,
"exclusive_time_ms": duration,
}
)
# Load any specific custom data
span.update(extra_data)
# coerce to string
for tag, value in dict(span["tags"]).items():
span["tags"][tag] = str(value)
if measurements:
span["measurements"] = measurements
return span

def store_spans(self, spans: list[dict[str, Any]], is_eap: bool = False) -> None:
for span in spans:
span["ingest_in_eap"] = is_eap
assert (
self.app.post(
f"/tests/entities/{'eap_' if is_eap else ''}spans/insert",
data=json.dumps(spans),
).status_code
== 200
)

def test_rachel(self) -> None:
self.store_spans(
[
self.create_span(
{
"description": "foo",
"sentry_tags": {"status": "success"},
"tags": {"foo": "five"},
},
measurements={"foo": {"value": 5}},
start_ts=before_now(minutes=10),
),
],
is_eap=True,
)

# response = self.do_request(
# {
# "field": ["description", "tags[foo,number]", "tags[foo,string]", "tags[foo]"],
# "query": "",
# "orderby": "description",
# "project": 4555051977080832,
# "dataset": "spans",
# }
# )

req = TraceItemTableRequest(
meta=RequestMeta(
organization_id=4555051977080832,
referrer="api.organization-events",
project_ids=[4555051977211905],
start_timestamp=Timestamp(
seconds=int(
datetime(
year=2024,
month=8,
day=17,
hour=19,
minute=19,
second=13,
microsecond=417691,
tzinfo=UTC,
).timestamp()
)
),
end_timestamp=Timestamp(
seconds=int(
datetime(
year=2024,
month=11,
day=15,
hour=19,
minute=29,
second=13,
microsecond=417691,
tzinfo=UTC,
).timestamp()
)
),
),
columns=[
Column(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.name"),
label="description",
),
Column(
key=AttributeKey(type=AttributeKey.TYPE_INT, name="foo"),
label="tags[foo,number]",
),
Column(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="foo"),
label="tags[foo,string]",
),
Column(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="foo"),
label="tags[foo]",
),
Column(
key=AttributeKey(
type=AttributeKey.TYPE_STRING, name="sentry.span_id"
),
label="id",
),
Column(
key=AttributeKey(
type=AttributeKey.TYPE_STRING, name="project.name"
),
label="project.name",
),
],
order_by=[
TraceItemTableRequest.OrderBy(
column=Column(
key=AttributeKey(
type=AttributeKey.TYPE_STRING, name="sentry.name"
),
label="description",
)
)
],
group_by=[
AttributeKey(type="TYPE_STRING", name="sentry.name"),
AttributeKey(type="TYPE_INT", name="foo"),
AttributeKey(type="TYPE_STRING", name="foo"),
AttributeKey(type="TYPE_STRING", name="foo"),
AttributeKey(type="TYPE_STRING", name="sentry.span_id"),
AttributeKey(type="TYPE_STRING", name="project.name"),
],
virtual_column_contexts=[
VirtualColumnContext(
from_column_name="sentry.project_id",
to_column_name="project.name",
value_map={"4555051977211905": "bar"},
)
],
)

response = self.app.post(
"/rpc/EndpointTraceItemTable/v1",
data=req.SerializeToString(),
headers={"referer": "api.organization-events"},
)
#
# print("jdflksflkslf")
# print(response.data)
# #print(json.loads(response.data))
# print(response.status_code)
# print(response.json)

assert False
assert response.status_code == 200, response.data
datata = data["data"]
assert datata[0]["data"]["tags[foo,number]"] == 5
assert datata[0]["tags[foo,string]"] == "five"
assert datata[0]["tags[foo]"] == "five"