Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Basic Api Key connection #311

Merged
merged 19 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ably/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ably.rest.rest import AblyRest
from ably.realtime.realtime import AblyRealtime
QuintinWillison marked this conversation as resolved.
Show resolved Hide resolved
from ably.rest.auth import Auth
from ably.rest.push import Push
from ably.types.capability import Capability
Expand Down
Empty file added ably/realtime/__init__.py
Empty file.
89 changes: 89 additions & 0 deletions ably/realtime/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import logging
import asyncio
import websockets
import json
from ably.util.exceptions import AblyAuthException
from enum import Enum, IntEnum

log = logging.getLogger(__name__)


class ConnectionState(Enum):
INITIALIZED = 'initialized'
CONNECTING = 'connecting'
CONNECTED = 'connected'
CLOSING = 'closing'
CLOSED = 'closed'
FAILED = 'failed'


class ProtocolMessageAction(IntEnum):
CONNECTED = 4
ERROR = 9


class RealtimeConnection:
def __init__(self, realtime):
self.options = realtime.options
self.__ably = realtime
self.__state = ConnectionState.INITIALIZED
self.__connected_future = None
self.__websocket = None

async def connect(self):
if self.__state == ConnectionState.CONNECTED:
return

if self.__state == ConnectionState.CONNECTING:
if self.__connected_future is None:
log.fatal('Connection state is CONNECTING but connected_future does not exits')
return
await self.__connected_future
else:
self.__state = ConnectionState.CONNECTING
self.__connected_future = asyncio.Future()
asyncio.create_task(self.connect_impl())
await self.__connected_future
self.__state = ConnectionState.CONNECTED

async def close(self):
self.__state = ConnectionState.CLOSING
if self.__websocket:
await self.__websocket.close()
else:
log.warn('Connection.closed called while connection already closed')
self.__state = ConnectionState.CLOSED

async def connect_impl(self):
async with websockets.connect(f'wss://{self.options.realtime_host}?key={self.ably.key}') as websocket:
self.__websocket = websocket
task = asyncio.create_task(self.ws_read_loop())
await task

async def ws_read_loop(self):
while True:
raw = await self.__websocket.recv()
msg = json.loads(raw)
action = msg['action']
if action == ProtocolMessageAction.CONNECTED: # CONNECTED
if self.__connected_future:
self.__connected_future.set_result(None)
self.__connected_future = None
else:
log.warn('CONNECTED message received but connected_future not set')
if action == ProtocolMessageAction.ERROR: # ERROR
error = msg["error"]
if error['nonfatal'] is False:
self.__state = ConnectionState.FAILED
if self.__connected_future:
self.__connected_future.set_exception(
AblyAuthException(error["message"], error["statusCode"], error["code"]))
self.__connected_future = None

@property
def ably(self):
return self.__ably

@property
def state(self):
return self.__state
48 changes: 48 additions & 0 deletions ably/realtime/realtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging
from ably.realtime.connection import RealtimeConnection
from ably.rest.auth import Auth
from ably.types.options import Options


log = logging.getLogger(__name__)


class AblyRealtime:
"""Ably Realtime Client"""

def __init__(self, key=None, **kwargs):
"""Create an AblyRealtime instance.

:Parameters:
**Credentials**
- `key`: a valid ably key string
"""

if key is not None:
options = Options(key=key, **kwargs)
else:
raise ValueError("Key is missing. Provide an API key.")

self.__auth = Auth(self, options)
self.__options = options
self.key = key
self.__connection = RealtimeConnection(self)

async def connect(self):
await self.connection.connect()

async def close(self):
await self.connection.close()

@property
def auth(self):
return self.__auth

@property
def options(self):
return self.__options

@property
def connection(self):
"""Establish realtime connection"""
return self.__connection
3 changes: 3 additions & 0 deletions ably/types/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None,
from ably import api_version
idempotent_rest_publishing = api_version >= '1.2'

if realtime_host is None:
realtime_host = Defaults.realtime_host

self.__client_id = client_id
self.__log_level = log_level
self.__tls = tls
Expand Down
94 changes: 76 additions & 18 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ h2 = "^4.0.0"
# Optional dependencies
pycrypto = { version = "^2.6.1", optional = true }
pycryptodome = { version = "*", optional = true }
websockets = "^10.3"

[tool.poetry.extras]
oldcrypto = ["pycrypto"]
Expand Down
Loading