Revert "Revert "ref: Deprecate TOPIC_PARTITION_COUNTS setting (#5868)"" #5942

Open · wants to merge 4 commits into master
18 changes: 12 additions & 6 deletions snuba/datasets/table_storage.py
```diff
@@ -1,6 +1,8 @@
+from functools import cached_property
 from typing import Any, Mapping, Optional, Sequence
 
 from arroyo.backends.kafka import KafkaPayload
+from confluent_kafka.admin import AdminClient, _TopicCollection
 
 from snuba import settings
 from snuba.clickhouse.http import InsertStatement, JSONRow
@@ -22,6 +24,7 @@
 from snuba.subscriptions.utils import SchedulingWatermarkMode
 from snuba.utils.metrics import MetricsBackend
 from snuba.utils.schemas import ReadOnly
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
 from snuba.utils.streams.topics import Topic, get_topic_creation_config
 from snuba.writer import BatchWriter
@@ -57,13 +60,16 @@ def get_physical_topic_name(self, slice_id: Optional[int] = None) -> str:
 
         return physical_topic
 
-    @property
+    @cached_property
     def partitions_number(self) -> int:
-        return settings.TOPIC_PARTITION_COUNTS.get(self.__topic.value, 1)
-
-    @property
-    def replication_factor(self) -> int:
-        return 1
+        config = get_default_kafka_configuration(self.__topic, None)
+        client = AdminClient(config)
+        topic_name = self.get_physical_topic_name()
+        return len(
+            client.describe_topics(_TopicCollection([topic_name]))[topic_name]
+            .result()
+            .partitions
+        )
 
     @property
     def topic_creation_config(self) -> Mapping[str, str]:
```
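Rather than reading a hard-coded count from settings, `partitions_number` now asks the Kafka cluster directly through the admin API, and `@cached_property` limits the round trip to one per `KafkaTopicSpec` instance. A minimal standalone sketch of the same lookup, assuming a local broker and an existing `events` topic (`TopicCollection` is the public name of the `_TopicCollection` alias the diff imports):

```python
from confluent_kafka import TopicCollection
from confluent_kafka.admin import AdminClient

# Assumed broker address; Snuba builds this dict with
# get_default_kafka_configuration() instead of hard-coding it.
client = AdminClient({"bootstrap.servers": "127.0.0.1:9092"})

# describe_topics() returns {topic_name: Future[TopicDescription]}.
futures = client.describe_topics(TopicCollection(["events"]))
description = futures["events"].result()  # blocks until the broker replies

# TopicDescription.partitions is a list of TopicPartitionInfo;
# its length is the partition count the new property reports.
print(len(description.partitions))
```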
8 changes: 5 additions & 3 deletions snuba/utils/manage_topics.py
```diff
@@ -10,16 +10,18 @@
 logger = logging.getLogger(__name__)
 
 
-def create_topics(client: AdminClient, topics: Sequence[Topic]) -> None:
+def create_topics(
+    client: AdminClient, topics: Sequence[Topic], num_partitions: int = 1
+) -> None:
     topics_to_create = {}
 
     for topic in topics:
         topic_spec = KafkaTopicSpec(topic)
         logger.debug("Adding topic %s to creation list", topic_spec.topic_name)
         topics_to_create[topic_spec.topic_name] = NewTopic(
             topic_spec.topic_name,
-            num_partitions=topic_spec.partitions_number,
-            replication_factor=topic_spec.replication_factor,
+            num_partitions=num_partitions,
+            replication_factor=1,
             config=topic_spec.topic_creation_config,
         )
```
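Since topic specs no longer carry a partition count (and the removed `replication_factor` property always returned 1 anyway), `create_topics` now takes the desired count as an explicit argument, defaulting to a single partition. A sketch of how the updated tests below invoke it:

```python
from confluent_kafka.admin import AdminClient

from snuba.utils.manage_topics import create_topics
from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
from snuba.utils.streams.topics import Topic

client = AdminClient(get_default_kafka_configuration())

# Create "events" with two partitions; omitting the argument keeps the
# single-partition default.
create_topics(client, [Topic.EVENTS], 2)
create_topics(client, [Topic.COMMIT_LOG])
```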
15 changes: 15 additions & 0 deletions tests/datasets/test_table_storage.py
```diff
@@ -1,6 +1,13 @@
+from confluent_kafka.admin import AdminClient
+
 from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.datasets.table_storage import KafkaTopicSpec
 from snuba.settings import SLICED_KAFKA_TOPIC_MAP
+from snuba.utils.manage_topics import create_topics
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
 from snuba.utils.streams.topics import Topic
+from snuba.utils.streams.topics import Topic as SnubaTopic
@@ -19,3 +26,11 @@ def test_get_physical_topic_name(monkeypatch) -> None:  # type: ignore
     physical_topic_name = default_topic_spec.get_physical_topic_name(slice_id=2)
 
     assert physical_topic_name == "ingest-replay-events-2"
+
+
+def test_partitions_number() -> None:
+    admin_client = AdminClient(get_default_kafka_configuration())
+    create_topics(admin_client, [SnubaTopic.REPLAYEVENTS])
+
+    topic_spec = KafkaTopicSpec(Topic.REPLAYEVENTS)
+    assert topic_spec.partitions_number == 1
```
tests/subscriptions/… (file header not captured in this view)

```diff
@@ -2,6 +2,7 @@
 from uuid import UUID
 
 import pytest
+from confluent_kafka.admin import AdminClient
 
 from snuba.datasets.entities.entity_key import EntityKey
 from snuba.datasets.entities.factory import get_entity, get_entity_name
@@ -13,7 +14,10 @@
 from snuba.subscriptions.data import PartitionId, SubscriptionData
 from snuba.subscriptions.store import RedisSubscriptionDataStore
 from snuba.subscriptions.subscription import SubscriptionCreator
+from snuba.utils.manage_topics import create_topics
 from snuba.utils.metrics.timer import Timer
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
+from snuba.utils.streams.topics import Topic as SnubaTopic
 
 dataset = get_dataset("generic_metrics")
 entity = get_entity(EntityKey.GENERIC_METRICS_SETS)
@@ -50,6 +54,9 @@ def subscription_data_builder() -> SubscriptionData:
 @pytest.mark.clickhouse_db
 @pytest.mark.redis_db
 def test_entity_subscriptions_data() -> None:
+    admin_client = AdminClient(get_default_kafka_configuration())
+    create_topics(admin_client, [SnubaTopic.GENERIC_METRICS])
+
     subscription_data = subscription_data_builder()
 
     subscription_identifier = SubscriptionCreator(dataset, entity_key).create(
```
6 changes: 3 additions & 3 deletions tests/subscriptions/test_partitioner.py
```diff
@@ -1,6 +1,5 @@
 import pytest
 
-from snuba import settings
 from snuba.datasets.entities.entity_key import EntityKey
 from snuba.datasets.entities.factory import get_entity
 from snuba.datasets.table_storage import KafkaTopicSpec
@@ -44,7 +43,8 @@ class TestBuildRequest(BaseSubscriptionTest):
     @pytest.mark.parametrize("subscription", TESTS)
     @pytest.mark.clickhouse_db
     def test(self, subscription: SubscriptionData) -> None:
-        settings.TOPIC_PARTITION_COUNTS = {"events": 64}
-        partitioner = TopicSubscriptionDataPartitioner(KafkaTopicSpec(Topic.EVENTS))
+        kafka_topic_spec = KafkaTopicSpec(Topic.EVENTS)
+        kafka_topic_spec.partitions_number = 64
+        partitioner = TopicSubscriptionDataPartitioner(kafka_topic_spec)
 
         assert partitioner.build_partition_id(subscription) == 18
```
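Assigning to `partitions_number` works because `functools.cached_property` is a non-data descriptor: it defines no `__set__`, so the assignment lands in the instance `__dict__` and shadows the descriptor, meaning the getter (which would otherwise contact Kafka) never runs. A self-contained illustration:

```python
from functools import cached_property


class Spec:
    @cached_property
    def partitions_number(self) -> int:
        # Stands in for the Kafka admin lookup in the real KafkaTopicSpec.
        raise RuntimeError("getter should not run when the cache is pre-seeded")


spec = Spec()
spec.partitions_number = 64  # stored in spec.__dict__, bypassing the getter
assert spec.partitions_number == 64
```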
12 changes: 8 additions & 4 deletions tests/subscriptions/test_scheduler_consumer.py
```diff
@@ -39,11 +39,15 @@
 
 @pytest.mark.redis_db
 def test_scheduler_consumer(tmpdir: LocalPath) -> None:
-    settings.TOPIC_PARTITION_COUNTS = {"events": 2}
+    settings.KAFKA_TOPIC_MAP = {
+        "events": "events-test",
+        "snuba-commit-log": "snuba-commit-log-test",
+    }
     importlib.reload(scheduler_consumer)
 
     admin_client = AdminClient(get_default_kafka_configuration())
-    create_topics(admin_client, [SnubaTopic.COMMIT_LOG])
+    create_topics(admin_client, [SnubaTopic.EVENTS], 2)
+    create_topics(admin_client, [SnubaTopic.COMMIT_LOG], 1)
 
     metrics_backend = TestingMetricsBackend()
     entity_name = "events"
@@ -52,7 +56,7 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
     assert storage is not None
     stream_loader = storage.get_table_writer().get_stream_loader()
 
-    commit_log_topic = Topic("snuba-commit-log")
+    commit_log_topic = Topic("snuba-commit-log-test")
 
     mock_scheduler_producer = mock.Mock()
@@ -138,7 +142,7 @@ def test_scheduler_consumer(tmpdir: LocalPath) -> None:
     assert (tmpdir / "health.txt").check()
     assert mock_scheduler_producer.produce.call_count == 2
 
-    settings.TOPIC_PARTITION_COUNTS = {}
+    settings.KAFKA_TOPIC_MAP = {}
 
 
 def test_tick_time_shift() -> None:
```
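With the hard-coded count gone, this test provisions a real two-partition `events` topic and routes traffic to disposable `*-test` topics through `settings.KAFKA_TOPIC_MAP`, which `get_physical_topic_name` consults when translating logical names to physical ones. A sketch of that translation, assuming `Topic.COMMIT_LOG` resolves to the logical name `snuba-commit-log` as the diff suggests:

```python
from snuba import settings
from snuba.datasets.table_storage import KafkaTopicSpec
from snuba.utils.streams.topics import Topic

# Mirrors the test's remap of logical -> physical topic names.
settings.KAFKA_TOPIC_MAP = {"snuba-commit-log": "snuba-commit-log-test"}

spec = KafkaTopicSpec(Topic.COMMIT_LOG)
# The logical topic is unchanged; only the physical name is remapped.
assert spec.get_physical_topic_name() == "snuba-commit-log-test"
```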
9 changes: 9 additions & 0 deletions tests/test_api.py
```diff
@@ -9,6 +9,7 @@
 
 import pytest
 import simplejson as json
+from confluent_kafka.admin import AdminClient
 from dateutil.parser import parse as parse_datetime
 from sentry_sdk import Client, Hub
 
@@ -23,6 +24,9 @@
 from snuba.processor import InsertBatch, InsertEvent, ReplacementType
 from snuba.redis import RedisClientKey, RedisClientType, get_redis_client
 from snuba.subscriptions.store import RedisSubscriptionDataStore
+from snuba.utils.manage_topics import create_topics
+from snuba.utils.streams.configuration_builder import get_default_kafka_configuration
+from snuba.utils.streams.topics import Topic as SnubaTopic
 from tests.base import BaseApiTest
 from tests.conftest import SnubaSetConfig
 from tests.helpers import write_processed_messages
@@ -2149,6 +2153,9 @@ class TestCreateSubscriptionApi(BaseApiTest):
     entity_key = "events"
 
     def test(self) -> None:
+        admin_client = AdminClient(get_default_kafka_configuration())
+        create_topics(admin_client, [SnubaTopic.EVENTS])
+
         expected_uuid = uuid.uuid1()
 
         with patch("snuba.subscriptions.subscription.uuid1") as uuid4:
@@ -2219,6 +2226,8 @@ def test_selected_entity_is_used(self) -> None:
         Test that ensures that the passed entity is the selected one, not the dataset's default
         entity
         """
+        admin_client = AdminClient(get_default_kafka_configuration())
+        create_topics(admin_client, [SnubaTopic.METRICS])
 
         expected_uuid = uuid.uuid1()
         entity_key = EntityKey.METRICS_COUNTERS
```