Skip to content

Commit

Permalink
ref: bump sentry-arroyo to 2.14.13 (#4887)
Browse files Browse the repository at this point in the history
Co-authored-by: getsentry-bot <[email protected]>
Co-authored-by: lynnagara <[email protected]>
  • Loading branch information
3 people authored Oct 18, 2023
1 parent f18a9a8 commit 40d2510
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 97 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pytest-watch==4.2.0
python-dateutil==2.8.2
python-rapidjson==1.8
redis==4.3.4
sentry-arroyo==2.14.12
sentry-arroyo==2.14.13
sentry-kafka-schemas==0.1.31
sentry-redis-tools==0.1.7
sentry-relay==0.8.27
Expand Down
12 changes: 10 additions & 2 deletions snuba/consumers/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,11 @@ def close(self) -> None:
for partition, (offset, timestamp) in self.__offsets_to_produce.items():
payload = commit_codec.encode(
CommitLogCommit(
self.__commit_log_config.group_id, partition, offset, timestamp
self.__commit_log_config.group_id,
partition,
offset,
datetime.timestamp(timestamp),
None,
)
)
self.__commit_log_config.producer.produce(
Expand Down Expand Up @@ -447,7 +451,11 @@ def close(self) -> None:
for partition, (offset, timestamp) in self.__offsets_to_produce.items():
payload = commit_codec.encode(
CommitLogCommit(
self.__commit_log_config.group_id, partition, offset, timestamp
self.__commit_log_config.group_id,
partition,
offset,
datetime.timestamp(timestamp),
None,
)
)
self.__commit_log_config.producer.produce(
Expand Down
4 changes: 2 additions & 2 deletions snuba/subscriptions/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ def find(self, tick: Tick) -> Iterator[ScheduledSubscriptionTask]:
subscriptions = self.__get_subscriptions()

for timestamp in range(
math.ceil(interval.lower.timestamp()),
math.ceil(interval.upper.timestamp()),
math.ceil(interval.lower),
math.ceil(interval.upper),
):
for subscription in subscriptions:
task = self.__builder.get_task(
Expand Down
8 changes: 4 additions & 4 deletions snuba/subscriptions/scheduler_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from datetime import datetime, timedelta
from datetime import timedelta
from typing import Callable, Mapping, MutableMapping, NamedTuple, Optional, Sequence

from arroyo.backends.abstract import Consumer, Producer
Expand Down Expand Up @@ -38,10 +38,10 @@

class MessageDetails(NamedTuple):
offset: int
orig_message_ts: datetime
orig_message_ts: float
# The timestamp the message was first received by Sentry (Relay)
# It is optional since it is not currently present on all topics
received_ts: Optional[datetime]
received_ts: Optional[float]


class CommitLogTickConsumer(Consumer[Tick]):
Expand Down Expand Up @@ -104,7 +104,7 @@ def __init__(
self.__followed_consumer_group = followed_consumer_group
self.__previous_messages: MutableMapping[Partition, MessageDetails] = {}
self.__metrics = metrics
self.__time_shift = time_shift if time_shift is not None else timedelta()
self.__time_shift = time_shift.total_seconds() if time_shift is not None else 0

def subscribe(
self,
Expand Down
5 changes: 2 additions & 3 deletions snuba/subscriptions/scheduler_processing_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def __update_offset_high_watermark(self, value: BrokerValue[Tick]) -> None:
# Record the lag between the fastest and slowest partition
self.__metrics.timing(
"partition_lag_ms",
(fastest - slowest).total_seconds() * 1000,
(fastest - slowest) * 1000,
)

if (
Expand Down Expand Up @@ -441,8 +441,7 @@ def submit(self, message: Message[CommittableTick]) -> None:

if (
self.__stale_threshold_seconds is not None
and time.time() - datetime.timestamp(tick.timestamps.lower)
> self.__stale_threshold_seconds
and time.time() - tick.timestamps.lower > self.__stale_threshold_seconds
):
encoded_tasks = []
else:
Expand Down
5 changes: 2 additions & 3 deletions snuba/subscriptions/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import NamedTuple

Expand All @@ -18,9 +17,9 @@ class SchedulingWatermarkMode(Enum):
class Tick(NamedTuple):
partition: int
offsets: Interval[int]
timestamps: Interval[datetime]
timestamps: Interval[float]

def time_shift(self, delta: timedelta) -> Tick:
def time_shift(self, delta: float) -> Tick:
"""
Returns a new ``Tick`` instance that has had the bounds of its time
interval shifted by the provided delta.
Expand Down
4 changes: 2 additions & 2 deletions tests/subscriptions/test_combined_scheduler_executor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time
import uuid
from contextlib import closing
from datetime import datetime, timedelta
from datetime import datetime
from unittest import mock

import pytest
Expand Down Expand Up @@ -79,7 +79,7 @@ def test_combined_scheduler_and_executor(tmpdir: LocalPath) -> None:
Tick(
0,
offsets=Interval(1, 3),
timestamps=Interval(epoch, epoch + timedelta(seconds=60)),
timestamps=Interval(epoch.timestamp(), epoch.timestamp() + 60),
),
partition,
4,
Expand Down
9 changes: 8 additions & 1 deletion tests/subscriptions/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ def build_subscription(self, resolution: timedelta) -> Subscription:
)

def build_tick(self, lower: timedelta, upper: timedelta) -> Tick:
return Tick(1, Interval(1, 5), Interval(self.now + lower, self.now + upper))
return Tick(
1,
Interval(1, 5),
Interval(
self.now.timestamp() + lower.total_seconds(),
self.now.timestamp() + upper.total_seconds(),
),
)

def sort_key(self, task: ScheduledSubscriptionTask) -> Tuple[datetime, uuid.UUID]:
return task.timestamp, task.task.subscription.identifier.uuid
Expand Down
71 changes: 40 additions & 31 deletions tests/subscriptions/test_scheduler_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
"events",
Partition(commit_log_topic, partition),
offset,
orig_message_ts,
orig_message_ts.timestamp(),
None,
)
),
)
Expand All @@ -144,11 +145,11 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
def test_tick_time_shift() -> None:
partition = 0
offsets = Interval(0, 1)
tick = Tick(
partition, offsets, Interval(datetime(1970, 1, 1), datetime(1970, 1, 2))
)
assert tick.time_shift(timedelta(hours=24)) == Tick(
partition, offsets, Interval(datetime(1970, 1, 2), datetime(1970, 1, 3))
tick = Tick(partition, offsets, Interval(0, 60 * 60 * 24))
assert tick.time_shift(timedelta(hours=24).total_seconds()) == Tick(
partition,
offsets,
Interval(datetime(1970, 1, 2).timestamp(), datetime(1970, 1, 3).timestamp()),
)


Expand Down Expand Up @@ -176,7 +177,11 @@ def test_tick_consumer(time_shift: Optional[timedelta]) -> None:
for offset in offsets:
payload = commit_codec.encode(
Commit(
followed_consumer_group, Partition(topic, partition), offset, epoch
followed_consumer_group,
Partition(topic, partition),
offset,
epoch.timestamp(),
None,
)
)
producer.produce(Partition(topic, 0), payload).result()
Expand Down Expand Up @@ -212,9 +217,11 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:

# consume 0, 1
assert consumer.poll() == BrokerValue(
Tick(0, offsets=Interval(0, 1), timestamps=Interval(epoch, epoch)).time_shift(
time_shift
),
Tick(
0,
offsets=Interval(0, 1),
timestamps=Interval(epoch.timestamp(), epoch.timestamp()),
).time_shift(time_shift.total_seconds()),
Partition(topic, 0),
1,
epoch,
Expand All @@ -226,9 +233,11 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:

# consume 0, 2
assert consumer.poll() == BrokerValue(
Tick(0, offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)).time_shift(
time_shift
),
Tick(
0,
offsets=Interval(1, 2),
timestamps=Interval(epoch.timestamp(), epoch.timestamp()),
).time_shift(time_shift.total_seconds()),
Partition(topic, 0),
2,
epoch,
Expand Down Expand Up @@ -267,9 +276,11 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:

# consume 0, 2
assert consumer.poll() == BrokerValue(
Tick(0, offsets=Interval(1, 2), timestamps=Interval(epoch, epoch)).time_shift(
time_shift
),
Tick(
0,
offsets=Interval(1, 2),
timestamps=Interval(epoch.timestamp(), epoch.timestamp()),
).time_shift(time_shift.total_seconds()),
Partition(topic, 0),
2,
epoch,
Expand Down Expand Up @@ -313,15 +324,17 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:

producer.produce(
partition,
commit_codec.encode(Commit(followed_consumer_group, partition, 0, epoch)),
commit_codec.encode(
Commit(followed_consumer_group, partition, 0, epoch.timestamp(), None)
),
).result()

clock.sleep(1)

producer.produce(
partition,
commit_codec.encode(
Commit(followed_consumer_group, partition, 1, epoch + timedelta(seconds=1))
Commit(followed_consumer_group, partition, 1, epoch.timestamp() + 1, None)
),
).result()

Expand All @@ -335,7 +348,7 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
Tick(
0,
offsets=Interval(0, 1),
timestamps=Interval(epoch, epoch + timedelta(seconds=1)),
timestamps=Interval(epoch.timestamp(), epoch.timestamp() + 1),
),
partition,
1,
Expand All @@ -346,7 +359,9 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:

producer.produce(
partition,
commit_codec.encode(Commit(followed_consumer_group, partition, 2, epoch)),
commit_codec.encode(
Commit(followed_consumer_group, partition, 2, epoch.timestamp(), None)
),
).result()

with assert_changes(consumer.tell, {partition: 2}, {partition: 3}):
Expand All @@ -357,7 +372,7 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
producer.produce(
partition,
commit_codec.encode(
Commit(followed_consumer_group, partition, 3, epoch + timedelta(seconds=2))
Commit(followed_consumer_group, partition, 3, epoch.timestamp() + 2, None)
),
).result()

Expand All @@ -366,9 +381,7 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
Tick(
0,
offsets=Interval(1, 3),
timestamps=Interval(
epoch + timedelta(seconds=1), epoch + timedelta(seconds=2)
),
timestamps=Interval(epoch.timestamp() + 1, epoch.timestamp() + 2),
),
partition,
3,
Expand Down Expand Up @@ -425,20 +438,16 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:
followed_consumer_group,
partition,
5,
now,
now.timestamp(),
None,
)
),
).result()

producer.produce(
partition,
commit_codec.encode(
Commit(
followed_consumer_group,
partition,
4,
now - timedelta(seconds=2),
)
Commit(followed_consumer_group, partition, 4, now.timestamp() - 2, None)
),
).result()

Expand Down
Loading

0 comments on commit 40d2510

Please sign in to comment.