Skip to content

Commit

Permalink
ref: fix types for sentry.utils.services
Browse files Browse the repository at this point in the history
  • Loading branch information
asottile-sentry committed Nov 21, 2024
1 parent fcdc458 commit c56941b
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ module = [
"sentry.testutils.helpers.notifications",
"sentry.utils.auth",
"sentry.utils.committers",
"sentry.utils.services",
"sentry.web.forms.accounts",
"sentry.web.frontend.auth_login",
"sentry.web.frontend.auth_organization_login",
Expand Down Expand Up @@ -496,6 +495,7 @@ module = [
"sentry.utils.redis",
"sentry.utils.redis_metrics",
"sentry.utils.sentry_apps.*",
"sentry.utils.services",
"sentry.utils.sms",
"sentry.utils.snowflake",
"sentry.utils.urls",
Expand Down
7 changes: 4 additions & 3 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from sentry.conf.types.role_dict import RoleDict
from sentry.conf.types.sdk_config import ServerSdkConfig
from sentry.conf.types.sentry_config import SentryMode
from sentry.conf.types.service_options import ServiceOptions
from sentry.utils import json # NOQA (used in getsentry config)
from sentry.utils.celery import crontab_with_minute_jitter, make_split_task_queues
from sentry.utils.types import Type, type_from_value
Expand Down Expand Up @@ -3161,12 +3162,12 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
SENTRY_SNOWFLAKE_EPOCH_START = datetime(2022, 8, 8, 0, 0).timestamp()
SENTRY_USE_SNOWFLAKE = False

SENTRY_DEFAULT_LOCKS_BACKEND_OPTIONS = {
SENTRY_DEFAULT_LOCKS_BACKEND_OPTIONS: ServiceOptions = {
"path": "sentry.utils.locking.backends.redis.RedisLockBackend",
"options": {"cluster": "default"},
}

SENTRY_POST_PROCESS_LOCKS_BACKEND_OPTIONS = {
SENTRY_POST_PROCESS_LOCKS_BACKEND_OPTIONS: ServiceOptions = {
"path": "sentry.utils.locking.backends.redis.RedisLockBackend",
"options": {"cluster": "default"},
}
Expand Down Expand Up @@ -3244,7 +3245,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
# lost as a result of toggling this setting.
SENTRY_REPLAYS_ATTEMPT_LEGACY_FILESTORE_LOOKUP = True

SENTRY_FEATURE_ADOPTION_CACHE_OPTIONS = {
SENTRY_FEATURE_ADOPTION_CACHE_OPTIONS: ServiceOptions = {
"path": "sentry.models.featureadoption.FeatureAdoptionRedisBackend",
"options": {"cluster": "default"},
}
Expand Down
9 changes: 9 additions & 0 deletions src/sentry/conf/types/service_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import annotations

from typing import TypedDict


class ServiceOptions(TypedDict, total=False):
path: str
options: dict[str, object]
executor: ServiceOptions
40 changes: 23 additions & 17 deletions src/sentry/utils/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@
from contextlib import contextmanager
from queue import Full, PriorityQueue
from time import time
from typing import Generic, NamedTuple, TypeVar
from typing import Any, NamedTuple

import sentry_sdk
import sentry_sdk.scope

logger = logging.getLogger(__name__)

T = TypeVar("T")


def execute(function: Callable[..., T], daemon=True):
def execute[T](function: Callable[..., T], daemon=True) -> Future[T]:
future: Future[T] = Future()

def run():
Expand All @@ -41,7 +39,7 @@ def run():


@functools.total_ordering
class PriorityTask(NamedTuple, Generic[T]):
class PriorityTask[T](NamedTuple):
priority: int
item: tuple[sentry_sdk.Scope, sentry_sdk.Scope, Callable[[], T], Future[T]]

Expand All @@ -52,7 +50,7 @@ def __lt__(self, b):
return self.priority < b.priority


class TimedFuture(Future[T]):
class TimedFuture[T](Future[T]):
_condition: threading.Condition
_state: str

Expand Down Expand Up @@ -124,7 +122,7 @@ def set_exception(self, *args, **kwargs):
return super().set_exception(*args, **kwargs)


class Executor(Generic[T]):
class Executor:
"""
This class provides an API for executing tasks in different contexts
(immediately, or asynchronously.)
Expand All @@ -136,9 +134,15 @@ class Executor(Generic[T]):
to allow controlling whether or not queue insertion should be blocking.
"""

Future = TimedFuture

def submit(self, callable, priority=0, block=True, timeout=None) -> TimedFuture[T]:
def submit[
T
](
self,
callable: Callable[[], T],
priority: int = 0,
block: bool = True,
timeout=None,
) -> TimedFuture[T]:
"""
Enqueue a task to be executed, returning a ``TimedFuture``.

Expand All @@ -149,7 +153,7 @@ def submit(self, callable, priority=0, block=True, timeout=None) -> TimedFuture[
raise NotImplementedError


class SynchronousExecutor(Executor[T]):
class SynchronousExecutor(Executor):
"""
This executor synchronously executes callables in the current thread.
Expand All @@ -160,11 +164,11 @@ class SynchronousExecutor(Executor[T]):
# TODO: The ``Future`` implementation here could be replaced with a
# lock-free future for efficiency.

def submit(self, callable, *args, **kwargs):
def submit[T](self, callable: Callable[[], T], *args, **kwargs) -> TimedFuture[T]:
"""
Immediately execute a callable, returning a ``TimedFuture``.
"""
future: Future[T] = self.Future()
future: TimedFuture[T] = TimedFuture()
assert future.set_running_or_notify_cancel()
try:
result = callable()
Expand All @@ -175,7 +179,7 @@ def submit(self, callable, *args, **kwargs):
return future


class ThreadedExecutor(Executor[T]):
class ThreadedExecutor(Executor):
"""\
This executor provides a method of executing callables in a threaded worker
pool. The number of outstanding requests can be limited by the ``maxsize``
Expand All @@ -190,7 +194,7 @@ def __init__(self, worker_count=1, maxsize=0):
self.__worker_count = worker_count
self.__workers = set()
self.__started = False
self.__queue: PriorityQueue[PriorityTask[T]] = PriorityQueue(maxsize)
self.__queue: PriorityQueue[PriorityTask[Any]] = PriorityQueue(maxsize)
self.__lock = threading.Lock()

def __worker(self):
Expand Down Expand Up @@ -223,7 +227,9 @@ def start(self):

self.__started = True

def submit(self, callable, priority=0, block=True, timeout=None):
def submit[
T
](self, callable: Callable[[], T], priority=0, block=True, timeout=None) -> TimedFuture[T]:
"""\
Enqueue a task to be executed, returning a ``TimedFuture``.

Expand All @@ -237,7 +243,7 @@ def submit(self, callable, priority=0, block=True, timeout=None):
if not self.__started:
self.start()

future: Future[T] = self.Future()
future: TimedFuture[T] = TimedFuture()
task = PriorityTask(
priority,
(
Expand Down
9 changes: 5 additions & 4 deletions src/sentry/utils/locking/backends/migration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections.abc import Callable, Mapping
from typing import Any, Optional, Union
from collections.abc import Callable
from typing import Optional, Union

from sentry.conf.types.service_options import ServiceOptions
from sentry.utils.locking.backends import LockBackend
from sentry.utils.services import build_instance_from_options_of_type, resolve_callable

Expand Down Expand Up @@ -53,8 +54,8 @@ def selector_func(key, routing_key, backend_new, backend_old):

def __init__(
self,
backend_new_config: Mapping[str, Any],
backend_old_config: Mapping[str, Any],
backend_new_config: ServiceOptions,
backend_old_config: ServiceOptions,
selector_func_path: str | SelectorFncType | None = None,
):
self.backend_new = build_instance_from_options_of_type(LockBackend, backend_new_config)
Expand Down
51 changes: 23 additions & 28 deletions src/sentry/utils/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,34 @@
import logging
import threading
from collections.abc import Callable, Mapping, Sequence
from typing import Any, TypeVar
from typing import Any

from rest_framework.request import Request
from django.http.request import HttpRequest

from sentry import options
from sentry.conf.types.service_options import ServiceOptions
from sentry.utils.concurrent import Executor, FutureSet, ThreadedExecutor, TimedFuture

# TODO: adjust modules to import from new location -- the weird `as` syntax is for mypy
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper as LazyServiceWrapper # noqa: F401
from sentry.utils.lazy_service_wrapper import Service as Service

from .imports import import_string
from .types import AnyCallable

logger = logging.getLogger(__name__)

T = TypeVar("T")

CallableT = TypeVar("CallableT", bound=Callable[..., object])


def resolve_callable(value: str | CallableT) -> CallableT:
if callable(value):
return value
elif isinstance(value, str):
def resolve_callable[CallableT: Callable[..., object]](value: str | CallableT) -> CallableT:
if isinstance(value, str):
return import_string(value)
elif callable(value):
return value
else:
raise TypeError("Expected callable or string")


class Context:
def __init__(self, request: Request, backends: dict[type[Service | None], Service]):
def __init__(self, request: HttpRequest | None, backends: dict[type[Service | None], Service]):
self.request = request
self.backends = backends

Expand All @@ -51,7 +47,7 @@ def copy(self) -> Context:
]

Callback = Callable[
[Context, str, Mapping[str, Any], Sequence[str], Sequence[TimedFuture]],
[Context, str, Mapping[str, Any], Sequence[str], Sequence[TimedFuture[Any] | None]],
None,
]

Expand Down Expand Up @@ -253,7 +249,7 @@ def call_backend_method(context: Context, backend: Service, is_primary: bool) ->
# executed before the primary request is queued. This is such a
# strange usage pattern that I don't think it's worth optimizing
# for.)
results = [None] * len(selected_backend_names)
results: list[TimedFuture[Any] | None] = [None] * len(selected_backend_names)
for i, backend_name in enumerate(selected_backend_names[1:], 1):
try:
backend, executor = self.backends[backend_name]
Expand All @@ -276,7 +272,7 @@ def call_backend_method(context: Context, backend: Service, is_primary: bool) ->
# calling thread. (We don't have to protect this from ``KeyError``
# since we already ensured that the primary backend exists.)
backend, executor = self.backends[selected_backend_names[0]]
results[0] = executor.submit(
result = results[0] = executor.submit(
functools.partial(call_backend_method, context.copy(), backend, is_primary=True),
priority=0,
block=True,
Expand All @@ -289,14 +285,13 @@ def call_backend_method(context: Context, backend: Service, is_primary: bool) ->
)
)

result: TimedFuture = results[0]
return result.result()

return execute


def build_instance_from_options(
options: Mapping[str, object],
options: ServiceOptions,
*,
default_constructor: Callable[..., object] | None = None,
) -> object:
Expand All @@ -313,9 +308,11 @@ def build_instance_from_options(
return constructor(**options.get("options", {}))


def build_instance_from_options_of_type(
def build_instance_from_options_of_type[
T
](
tp: type[T],
options: Mapping[str, object],
options: ServiceOptions,
*,
default_constructor: Callable[..., T] | None = None,
) -> T:
Expand Down Expand Up @@ -364,17 +361,17 @@ class ServiceDelegator(Delegator, Service):
def __init__(
self,
backend_base: str,
backends: Mapping[str, Mapping[str, Any]],
selector_func: str | AnyCallable,
callback_func: str | AnyCallable | None = None,
backends: Mapping[str, ServiceOptions],
selector_func: str | Selector,
callback_func: str | Callback | None = None,
):
super().__init__(
import_string(backend_base),
{
name: (
build_instance_from_options(options),
build_instance_from_options(
options.get("executor", {}), default_constructor=ThreadedExecutor
build_instance_from_options_of_type(Service, options),
build_instance_from_options_of_type(
Executor, options.get("executor", {}), default_constructor=ThreadedExecutor
),
)
for name, options in backends.items()
Expand Down Expand Up @@ -435,9 +432,7 @@ def selector(context: Context, method: str, callargs: Mapping[str, Any]) -> list
else:
intkey = key

if not isinstance(intkey, int):
logger.error("make_writebehind_selector.invalid", extra={"received_type": type(intkey)})
return [move_from]
assert isinstance(intkey, int), intkey

if rollout_rate < 0:
if (intkey % 10000) / 10000 < rollout_rate * -1.0:
Expand Down
6 changes: 3 additions & 3 deletions tests/sentry/utils/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def test_timed_future_cancel():


def test_synchronous_executor():
executor: SynchronousExecutor[object] = SynchronousExecutor()
executor = SynchronousExecutor()

assert executor.submit(lambda: mock.sentinel.RESULT).result() is mock.sentinel.RESULT

Expand All @@ -204,7 +204,7 @@ def callable():


def test_threaded_same_priority_Tasks():
executor: ThreadedExecutor[None] = ThreadedExecutor(worker_count=1)
executor = ThreadedExecutor(worker_count=1)

def callable():
pass
Expand All @@ -215,7 +215,7 @@ def callable():


def test_threaded_executor():
executor: ThreadedExecutor[int] = ThreadedExecutor(worker_count=1, maxsize=3)
executor = ThreadedExecutor(worker_count=1, maxsize=3)

def waiter(ready, waiting, result):
ready.set()
Expand Down
15 changes: 1 addition & 14 deletions tests/sentry/utils/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def apply(self, x: int, y: int) -> int:

@pytest.fixture
def delegator_fixture() -> tuple[Delegator, Mock, Mock]:
executor: SynchronousExecutor[object] = SynchronousExecutor()
executor = SynchronousExecutor()
selector = Mock()
callback = Mock()
delegator = Delegator(
Expand Down Expand Up @@ -187,16 +187,3 @@ def test_make_writebehind_selector_int_key(register_option):
with override_options({"feature.rollout": 1.0}):
result = selector(context, "do_thing", {})
assert result == ["new", "old"]


def test_make_writebehind_selector_invalid_key(register_option):
context = Mock()
selector = make_writebehind_selector(
option_name="feature.rollout",
move_to="new",
move_from="old",
key_fetch=lambda *args: {"lol": "nope"}, # type: ignore[arg-type]
)
with override_options({"feature.rollout": 1.0}):
result = selector(context, "do_thing", {})
assert result == ["old"]

0 comments on commit c56941b

Please sign in to comment.