Skip to content

Commit

Permalink
Simplify subscription storage
Browse files Browse the repository at this point in the history
  • Loading branch information
nkiryanov committed Jun 7, 2024
1 parent 6de86d8 commit 3f0b0c9
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 174 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- name: Install dependencies
if: steps.cache-dependencies.outputs.cache-hit != 'true'
run: |
pip install uv ruff
pip install uv
uv venv
source .venv/bin/activate
make install-dev-deps
Expand Down
2 changes: 2 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ python-dotenv==1.0.1
# via pydantic-settings
respx==0.21.1
# via websockets-notification-py (pyproject.toml)
ruff==0.4.8
# via websockets-notification-py (pyproject.toml)
sentry-sdk==2.3.1
# via websockets-notification-py (pyproject.toml)
six==1.16.0
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dev = [
"respx",

"dotenv-linter",
"ruff",
"mypy",
]

Expand Down
29 changes: 11 additions & 18 deletions src/storage/storage_updaters/storage_user_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

from app.services import BaseService
from app.types import UserId
from app.types import Event
from storage.exceptions import StorageOperationException
from storage.subscription_storage import SubscriptionStorage
from storage.types import SubscriptionFullQualifiedIdentifier

logger = logging.getLogger(__name__)

Expand All @@ -17,7 +17,7 @@
class StorageUserSubscriber(BaseService):
storage: SubscriptionStorage
websocket: WebSocketServerProtocol
subscription_fqi: SubscriptionFullQualifiedIdentifier
event: Event

@cached_property
def websocket_user_id(self) -> UserId:
Expand All @@ -38,23 +38,16 @@ def validate_websocket_registered(self) -> UserId:
return self.websocket_user_id # access to property

def update_storage_subscriptions(self) -> None:
event, _, event_subscription_identifier = self.subscription_fqi
event_subscriptions: set[UserId] | None = self.storage.subscriptions.get(self.event)

# Create event with events_subscriptions_storage if it's the first subscription for the event
if event not in self.storage.subscriptions:
self.storage.subscriptions[event] = {}
logger.info("Event record for subscriptions created in storage. Event: '%s'", event)
if event_subscriptions is None:
event_subscriptions = set()
self.storage.subscriptions[self.event] = event_subscriptions
logger.info("Event record for subscriptions created in storage. Event: '%s'", self.event)

event_subscriptions = self.storage.subscriptions[event]

# Create subscription identifier in event subscription storage if it doesn't exist
if event_subscription_identifier not in event_subscriptions:
event_subscriptions[event_subscription_identifier] = set()
logger.info("Subscription identifier created for event. Event: '%s', Identifier '%s'", event, event_subscription_identifier)

event_subscriptions[event_subscription_identifier].add(self.websocket_user_id)
logger.info("User added to event subscribers. UserId: '%s', subscription_fqi: '%s'", self.websocket_user_id, self.subscription_fqi)
event_subscriptions.add(self.websocket_user_id)
logger.info("User added to event subscribers. UserId: '%s', Event: '%s'", self.websocket_user_id, self.event)

def update_user_subscriptions(self) -> None:
self.storage.user_connections[self.websocket_user_id].user_subscriptions.add(self.subscription_fqi)
logger.info("Subscription added to user: '%s', subscription_fqi: '%s'", self.websocket_user_id, self.subscription_fqi)
self.storage.user_connections[self.websocket_user_id].user_subscriptions.add(self.event)
logger.info("Subscription added to user: '%s', Event: '%s'", self.websocket_user_id, self.event)
32 changes: 13 additions & 19 deletions src/storage/storage_updaters/storage_user_unsubscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

from app.services import BaseService
from app.types import UserId
from app.types import Event
from storage.exceptions import StorageOperationException
from storage.subscription_storage import SubscriptionStorage
from storage.types import SubscriptionFullQualifiedIdentifier

logger = logging.getLogger(__name__)

Expand All @@ -17,7 +17,7 @@
class StorageUserUnsubscriber(BaseService):
storage: SubscriptionStorage
websocket: WebSocketServerProtocol
subscription_fqi: SubscriptionFullQualifiedIdentifier
event: Event

@cached_property
def websocket_user_id(self) -> UserId:
Expand All @@ -33,39 +33,33 @@ def act(self) -> None:
StorageUserSubscriptionRemover(
self.storage,
self.websocket_user_id,
self.subscription_fqi,
self.event,
)()

def is_user_has_subscription(self) -> bool:
return self.subscription_fqi in self.storage.user_connections[self.websocket_user_id].user_subscriptions
return self.event in self.storage.user_connections[self.websocket_user_id].user_subscriptions


@dataclass
class StorageUserSubscriptionRemover(BaseService):
storage: SubscriptionStorage
user_id: UserId
subscription_fqi: SubscriptionFullQualifiedIdentifier
event: Event

def act(self) -> None:
self.remove_storage_subscription()
self.update_user_subscriptions()

def remove_storage_subscription(self) -> None:
event, _, event_subscription_identifier = self.subscription_fqi
event_subscriptions = self.storage.subscriptions[self.event]

event_subscriptions = self.storage.subscriptions[event]
event_subscriptions.discard(self.user_id)
logger.info("User unsubscribed from event. User: '%s', Event '%s'", self.user_id, self.event)

event_subscriptions[event_subscription_identifier].discard(self.user_id)
logger.info("User unsubscribed from event. User: '%s', subscription_fqi '%s'", self.user_id, self.subscription_fqi)

if not event_subscriptions[event_subscription_identifier]:
del event_subscriptions[event_subscription_identifier]
logger.info("Subscription identifier removed from event in storage. Event: '%s', identifier: '%s'", event, event_subscription_identifier)

if not self.storage.subscriptions[event]:
del self.storage.subscriptions[event]
logger.info("Event from storage removed cause empty. Event: '%s'", event)
if not event_subscriptions:
del self.storage.subscriptions[self.event]
logger.info("Event record removed from storage cause has no subscribers. Event: '%s'", self.event)

def update_user_subscriptions(self) -> None:
self.storage.user_connections[self.user_id].user_subscriptions.discard(self.subscription_fqi)
logger.info("Subscription removed from user. User: '%s', subscription_fqi: '%s'", self.user_id, self.subscription_fqi)
self.storage.user_connections[self.user_id].user_subscriptions.discard(self.event)
logger.info("Subscription removed from user. User: '%s', event: '%s'", self.user_id, self.event)
8 changes: 3 additions & 5 deletions src/storage/subscription_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@
from app.types import UserId
from storage.types import ConnectedUserMeta
from storage.types import WebSocketMeta
from storage.types import EventSubscriptionIdentifier
from storage.types import EventSubscriptionsBucket


@dataclass
class SubscriptionStorage:
registered_websockets: dict[WebSocketServerProtocol, WebSocketMeta] = field(default_factory=dict)
subscriptions: dict[Event, EventSubscriptionsBucket] = field(default_factory=dict)
subscriptions: dict[Event, set[UserId]] = field(default_factory=dict)
user_connections: dict[UserId, ConnectedUserMeta] = field(default_factory=dict)

def is_websocket_registered(self, websocket: WebSocketServerProtocol) -> bool:
Expand All @@ -28,8 +26,8 @@ def get_websocket_user_id(self, websocket: WebSocketServerProtocol) -> UserId |
websocket_meta = self.registered_websockets.get(websocket)
return websocket_meta.user_id if websocket_meta else None

def get_event_subscribers_user_ids(self, event: Event, identifier: EventSubscriptionIdentifier) -> set[UserId]:
return self.subscriptions.get(event, {}).get(identifier, set())
def get_event_subscribers_user_ids(self, event: Event) -> set[UserId]:
return self.subscriptions.get(event) or set()

def is_event_active(self, event: Event) -> bool:
return event in self.subscriptions
Expand Down
29 changes: 7 additions & 22 deletions src/storage/tests/storage_updaters/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from app.testing import MockedWebSocketServerProtocol
from storage.storage_updaters.storage_connection_register import StorageConnectionRegister
from storage.storage_updaters.storage_user_subscriber import StorageUserSubscriber
from storage.types import SubscriptionFullQualifiedIdentifier


@pytest.fixture
Expand Down Expand Up @@ -48,41 +47,27 @@ def ws_registered(ws, valid_token, register_ws):

@pytest.fixture
def event():
return "omniNotification"
return "boobs"


@pytest.fixture
def event_subscription_key():
return ("userId",)


@pytest.fixture
def event_subscription_identifier():
return ("userX",)


@pytest.fixture
def subscription_fqi(event, event_subscription_key, event_subscription_identifier):
return SubscriptionFullQualifiedIdentifier(
event=event,
subscription_key=event_subscription_key,
subscription_identifier=event_subscription_identifier,
)
def ya_event():
return "boobs-boobs"


@pytest.fixture
def subscribe_ws(storage):
def subscribe(ws, subscription_fqi):
def subscribe(ws, event):
StorageUserSubscriber(
storage=storage,
websocket=ws,
subscription_fqi=subscription_fqi,
event=event,
)()

return subscribe


@pytest.fixture
def ws_subscribed(ws_registered, subscribe_ws, subscription_fqi):
subscribe_ws(ws_registered, subscription_fqi)
def ws_subscribed(ws_registered, subscribe_ws, event):
subscribe_ws(ws_registered, event)
return ws_registered
104 changes: 37 additions & 67 deletions src/storage/tests/storage_updaters/tests_storage_user_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from storage.exceptions import StorageOperationException
from storage.storage_updaters import StorageUserSubscriber
from storage.types import SubscriptionFullQualifiedIdentifier


@pytest.fixture
Expand All @@ -19,104 +18,75 @@ def ya_user_ws_registered(ya_ws, ya_user_valid_token, register_ws):

@pytest.fixture
def subscribe(storage):
def subscribe(ws, subscription_fqi):
StorageUserSubscriber(
storage=storage,
websocket=ws,
subscription_fqi=subscription_fqi,
)()
def subscribe(ws, event):
StorageUserSubscriber(storage=storage, websocket=ws, event=event)()

return subscribe


def test_create_subscription_in_storage_subscriptions(ws_registered, subscribe, storage, event, event_subscription_identifier, subscription_fqi):
subscribe(ws_registered, subscription_fqi)
def test_create_subscription_in_storage_subscriptions(ws_registered, subscribe, storage, event):
subscribe(ws_registered, event)

assert event in storage.subscriptions, "Event should be created in storage"
assert event_subscription_identifier in storage.subscriptions[event], "Identifier should be created"
assert storage.subscriptions[event][event_subscription_identifier] == {"user1"}, "User id should be added to subscription"
assert storage.user_connections["user1"].user_subscriptions == {subscription_fqi}, "Subscription should be added to user subscriptions"
assert storage.subscriptions[event] == {"user1"}, "User id should be added to subscription"
assert storage.user_connections["user1"].user_subscriptions == {event}, "Subscription should be added to user subscriptions"


def test_subscription_with_same_params_idempotent(ws_registered, subscribe, storage, subscription_fqi, event, event_subscription_identifier):
subscribe(ws_registered, subscription_fqi)
subscribe(ws_registered, subscription_fqi)
def test_subscription_with_same_params_idempotent(ws_registered, subscribe, storage, event):
subscribe(ws_registered, event)
subscribe(ws_registered, event)

assert len(storage.subscriptions) == 1
assert len(storage.subscriptions[event]) == 1
assert len(storage.subscriptions[event][event_subscription_identifier]) == 1
assert storage.subscriptions[event][event_subscription_identifier] == {"user1"}
assert len(storage.user_connections["user1"].user_subscriptions) == 1
assert storage.subscriptions[event] == {"user1"}
assert storage.user_connections["user1"].user_subscriptions == {event}


def test_subscription_same_user_other_websocket_idempotent(
ws_registered, same_user_ya_ws_registered, subscribe, storage, subscription_fqi, event, event_subscription_identifier
):
subscribe(ws_registered, subscription_fqi)
def test_subscription_same_user_other_websocket_idempotent(ws_registered, same_user_ya_ws_registered, subscribe, storage, event):
subscribe(ws_registered, event)

subscribe(same_user_ya_ws_registered, subscription_fqi)
subscribe(same_user_ya_ws_registered, event)

assert len(storage.subscriptions) == 1
assert len(storage.subscriptions[event]) == 1
assert storage.subscriptions[event][event_subscription_identifier] == {"user1"}
assert len(storage.user_connections["user1"].user_subscriptions) == 1
assert storage.subscriptions[event] == {"user1"}
assert storage.user_connections["user1"].user_subscriptions == {event}


def test_subscription_to_same_event_other_identifier(ws_registered, subscribe, storage, event, event_subscription_key, subscription_fqi):
same_event_ya_identifier_fqi = SubscriptionFullQualifiedIdentifier(event, event_subscription_key, ("userY",))
subscribe(ws_registered, subscription_fqi)
def test_subscription_other_event(ws_registered, subscribe, storage, event, ya_event):
subscribe(ws_registered, event)

subscribe(ws_registered, same_event_ya_identifier_fqi)

assert len(storage.subscriptions) == 1
assert len(storage.subscriptions[event]) == 2
assert storage.subscriptions[event][("userX",)] == {"user1"}
assert storage.subscriptions[event][("userY",)] == {"user1"}
assert storage.user_connections["user1"].user_subscriptions == {subscription_fqi, same_event_ya_identifier_fqi}


def test_subscription_two_different_events(ws_registered, subscribe, storage, subscription_fqi, event, event_subscription_identifier):
other_event_fqi = SubscriptionFullQualifiedIdentifier("classifierId", ("userId", "projectUrn"), ("user008", "some-urn"))
subscribe(ws_registered, subscription_fqi)

subscribe(ws_registered, other_event_fqi)
subscribe(ws_registered, ya_event)

assert len(storage.subscriptions) == 2
assert storage.subscriptions[event][event_subscription_identifier] == {"user1"}
assert storage.subscriptions["classifierId"][("user008", "some-urn")] == {"user1"}
assert storage.user_connections["user1"].user_subscriptions == {subscription_fqi, other_event_fqi}
assert storage.subscriptions[event] == {"user1"}
assert storage.subscriptions[ya_event] == {"user1"}
assert storage.user_connections["user1"].user_subscriptions == {event, ya_event}


def test_other_user_with_same_fqi_subscribe_ok(
ws_registered, ya_user_ws_registered, subscribe, storage, subscription_fqi, event, event_subscription_identifier
):
subscribe(ws_registered, subscription_fqi)
def test_other_user_with_same_fqi_subscribe_ok(ws_registered, ya_user_ws_registered, subscribe, storage, event):
subscribe(ws_registered, event)

subscribe(ya_user_ws_registered, subscription_fqi)
subscribe(ya_user_ws_registered, event)

assert storage.subscriptions[event][event_subscription_identifier] == {"user1", "user2"}
assert storage.user_connections["user1"].user_subscriptions == {subscription_fqi}
assert storage.user_connections["user2"].user_subscriptions == {subscription_fqi}
assert storage.subscriptions[event] == {"user1", "user2"}
assert storage.user_connections["user1"].user_subscriptions == {event}
assert storage.user_connections["user2"].user_subscriptions == {event}


def test_other_user_subscription_to_other_event_ok(
ws_registered, ya_user_ws_registered, subscribe, storage, subscription_fqi, event, event_subscription_identifier
):
ya_subscribe_fqi = SubscriptionFullQualifiedIdentifier("classifierId", ("entityId"), (100500,))
subscribe(ws_registered, subscription_fqi)
def test_other_user_subscription_to_other_event_ok(ws_registered, ya_user_ws_registered, subscribe, storage, event, ya_event):
subscribe(ws_registered, event)

subscribe(ya_user_ws_registered, ya_subscribe_fqi)
subscribe(ya_user_ws_registered, ya_event)

assert len(storage.subscriptions) == 2
assert storage.subscriptions[event][event_subscription_identifier] == {"user1"}
assert storage.subscriptions["classifierId"][(100500,)] == {"user2"}
assert storage.user_connections["user1"].user_subscriptions == {subscription_fqi}
assert storage.user_connections["user2"].user_subscriptions == {ya_subscribe_fqi}
assert storage.subscriptions[event] == {"user1"}
assert storage.subscriptions[ya_event] == {"user2"}
assert storage.user_connections["user1"].user_subscriptions == {event}
assert storage.user_connections["user2"].user_subscriptions == {ya_event}


def test_raise_if_ws_connection_not_registered(ws, subscribe, storage, subscription_fqi):
def test_raise_if_ws_connection_not_registered(ws, subscribe, storage, event):
with pytest.raises(StorageOperationException, match="The user is not registered"):
subscribe(ws, subscription_fqi)
subscribe(ws, event)

assert storage.subscriptions == {}
assert storage.user_connections == {}
Loading

0 comments on commit 3f0b0c9

Please sign in to comment.