Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscriptions): Use RPC in subscriptions pipeline #6499

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
fb123b4
feat(subscriptions): Add create subscriptions RPC
shruthilayaj Nov 4, 2024
0f862e9
create subscriptions file
shruthilayaj Nov 4, 2024
0e90ffc
fix typing
shruthilayaj Nov 4, 2024
efa8aaa
tests
shruthilayaj Nov 6, 2024
69cfc87
subscription data test
shruthilayaj Nov 6, 2024
01f952c
merge conflicts
shruthilayaj Nov 7, 2024
f532342
fix typing
shruthilayaj Nov 7, 2024
2cd4713
codec tests
shruthilayaj Nov 8, 2024
6c05e65
Merge branch 'master' into shruthi/feat/add-create-subscriptions-endp…
shruthilayaj Nov 8, 2024
958926d
task result encoder
shruthilayaj Nov 8, 2024
9a8f16e
fix typing
shruthilayaj Nov 8, 2024
67372da
not deterministic
shruthilayaj Nov 8, 2024
c16e31b
fix test
shruthilayaj Nov 8, 2024
d152941
remove breakpoint
shruthilayaj Nov 8, 2024
76a7f0d
feat(subscriptions): Add a create subscription rpc endpoint
shruthilayaj Nov 18, 2024
5726ade
move validate to base class
shruthilayaj Nov 18, 2024
66270bf
inspect what's stored in redis in the test
shruthilayaj Nov 18, 2024
baaa03c
Merge branch 'master' into shruthi/feat/add-create-subscriptions-endp…
shruthilayaj Nov 18, 2024
7191833
merge conflicts
shruthilayaj Nov 18, 2024
c99d49c
serialize what's stored in redis
shruthilayaj Nov 19, 2024
17a49fe
add a comment
shruthilayaj Nov 19, 2024
83f1693
add more validation
shruthilayaj Nov 20, 2024
49e6e9b
merge
shruthilayaj Nov 20, 2024
e7fa5fb
fix tests
shruthilayaj Nov 20, 2024
15d29f6
minimize diff
shruthilayaj Nov 20, 2024
df861f5
remove rpc helper file
shruthilayaj Nov 20, 2024
356d368
update test subscriptions to be valid
shruthilayaj Nov 20, 2024
baf5566
fix typing
shruthilayaj Nov 20, 2024
6e9424a
fix merge conflcits
shruthilayaj Nov 20, 2024
6321e0c
remove rpc helper file
shruthilayaj Nov 21, 2024
f7f8448
return none, not 0 when no data is found
shruthilayaj Nov 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ sqlparse==0.5.0
google-api-python-client==2.88.0
sentry-usage-accountant==0.0.10
freezegun==1.2.2
sentry-protos==0.1.31
sentry-protos==0.1.34
1 change: 1 addition & 0 deletions snuba/datasets/slicing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
should be stored. These do not require individual physical partitions but allow
for repartitioning with less code changes per physical change.
"""

from snuba.clusters.storage_sets import StorageSetKey

SENTRY_LOGICAL_PARTITIONS = 256
Expand Down
31 changes: 27 additions & 4 deletions snuba/subscriptions/codecs.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import base64
import json
from datetime import datetime
from typing import cast

import rapidjson
from arroyo.backends.kafka import KafkaPayload
from google.protobuf.message import Message as ProtobufMessage
from sentry_kafka_schemas.schema_types import events_subscription_results_v1

from snuba.datasets.entities.entity_key import EntityKey
from snuba.query.exceptions import InvalidQueryException
from snuba.subscriptions.data import (
RPCSubscriptionData,
ScheduledSubscriptionTask,
SnQLSubscriptionData,
Subscription,
SubscriptionData,
SubscriptionIdentifier,
SubscriptionTaskResult,
SubscriptionType,
SubscriptionWithMetadata,
)
from snuba.utils.codecs import Codec, Encoder
Expand All @@ -33,6 +37,9 @@ def decode(self, value: bytes) -> SubscriptionData:
except json.JSONDecodeError:
raise InvalidQueryException("Invalid JSON")

if data.get("subscription_type") == SubscriptionType.RPC.value:
return RPCSubscriptionData.from_dict(data, self.entity_key)

return SnQLSubscriptionData.from_dict(data, self.entity_key)


Expand All @@ -42,11 +49,22 @@ def encode(self, value: SubscriptionTaskResult) -> KafkaPayload:
subscription_id = str(subscription.identifier)
request, result = value.result

if isinstance(request, ProtobufMessage):
original_body = {
"request": base64.b64encode(request.SerializeToString()).decode(
"utf-8"
),
"request_name": request.__class__.__name__,
"request_version": request.__class__.__module__.split(".", 3)[2],
}
else:
original_body = {**request.original_body}

data: events_subscription_results_v1.SubscriptionResult = {
"version": 3,
"payload": {
"subscription_id": subscription_id,
"request": {**request.original_body},
"request": original_body,
"result": {
"data": result["data"],
"meta": result["meta"],
Expand Down Expand Up @@ -98,15 +116,20 @@ def decode(self, value: KafkaPayload) -> ScheduledSubscriptionTask:

entity_key = EntityKey(scheduled_subscription_dict["entity"])

data = scheduled_subscription_dict["task"]["data"]
subscription: SubscriptionData
if data.get("subscription_type") == SubscriptionType.RPC.value:
subscription = RPCSubscriptionData.from_dict(data, entity_key)
else:
subscription = SnQLSubscriptionData.from_dict(data, entity_key)

return ScheduledSubscriptionTask(
datetime.fromisoformat(scheduled_subscription_dict["timestamp"]),
SubscriptionWithMetadata(
entity_key,
Subscription(
SubscriptionIdentifier.from_string(subscription_identifier),
SnQLSubscriptionData.from_dict(
scheduled_subscription_dict["task"]["data"], entity_key
),
subscription,
),
scheduled_subscription_dict["tick_upper_offset"],
),
Expand Down
Loading
Loading