Skip to content

Commit

Permalink
first
Browse files Browse the repository at this point in the history
  • Loading branch information
toxazhl committed Mar 13, 2024
0 parents commit 4d55d3c
Show file tree
Hide file tree
Showing 15 changed files with 1,144 additions and 0 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Publish to PyPI

on:
release:
types: [published]

jobs:
publish:
runs-on: ubuntu-latest
defaults:
run:
shell: bash
environment:
name: pypi
url: https://pypi.org/project/fastmqtt
permissions:
id-token: write
steps:
- name: Checkout repository
uses: actions/checkout@v3
with:
fetch-depth: 0 # Fetch entire history for poetry-dynamic-versioning, see: https://github.com/mtkennerly/poetry-dynamic-versioning/issues/55
- name: Set up Python
uses: actions/setup-python@v4 # Uses the Python version in .python-version
- name: Install poetry
uses: snok/install-poetry@v1
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true
- name: Setup dynamic versioning
run: poetry self add "poetry-dynamic-versioning[plugin]"
- name: Build package
run: poetry build
- name: Publish to PyPI # Uses trusted publishing and thus doesn't need a token
uses: pypa/gh-action-pypi-publish@release/v1
26 changes: 26 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Project
.idea/
.vscode/
.venv/
.tests/
.env
.DS_Store
venv/

# Cache
__pycache__/
*.py[cod]
.cache/
.ruff_cache/
.mypy_cache/
.pytest_cache/
.coverage/

# Build
env/
build/
_build/
dist/
site/
*.egg-info/
*.egg
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 nullmatawasoradesu

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
66 changes: 66 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
**README.md**

# FastMQTT

A performant, flexible, and user-friendly MQTT client library built on top of aiomqtt. FastMQTT simplifies message handling, advanced subscriptions, and convenient request-response patterns within the MQTT protocol.

**Key Features**

* **Efficient Message Handling:** Streamlined asynchronous message processing.
* **Robust Router:** Define topic-based message routing with QoS, no_local, retain options.
* **Subscription Management:** Effortlessly manage subscriptions, including retained messages.
* **Request-Response Patterns:** Convenient `ResponseContext` for request-response communication over MQTT.
* **Correlation Tracking:** Automatic correlation ID generation to match responses with their requests.
* **aiomqtt Foundation:** Built upon the reliable aiomqtt library for core MQTT functionality.

**Installation**

```bash
pip install fastmqtt
```

**Basic Usage**

```python
import asyncio
from fastmqtt import FastMQTT, MQTTRouter

router = MQTTRouter()

@router.on_message("my/topic") # Subscribe and handle incoming messages
async def message_handler(message: Message, properties: dict):
print(f"Message received: {message.payload.decode()} on topic {message.topic}")

async def main():
client = FastMQTT("mqtt.example.com", routers=[router])

async with client: # Connect and automatically handle subscriptions
await client.publish("my/topic", "Hello from FastMQTT!")
await asyncio.sleep(5) # Keep running for a bit

if __name__ == "__main__":
asyncio.run(main())
```

**Request-Response Example**

```python
@router.on_message("temperature/request")
async def temp_request_handler(message: Message, properties: dict):
# Simulate getting a temperature reading
return 25 # Return the temperature

async def main():
client = FastMQTT("mqtt.example.com", routers=[router])

async with client:
async with client.response_context(f"temperature/response/{client.identifier}") as ctx:
response = await ctx.request("temperature/request")
print(f"Temperature: {response.payload.decode()}")
```

**Contributions**

We welcome contributions to improve FastMQTT! Please open issues for bug reports or feature suggestions, and fork the repository to submit pull requests.

Let me know if you'd like modifications or have specific aspects you want to emphasize in the README!
52 changes: 52 additions & 0 deletions example/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
import json
from typing import Any

from aiomqtt import Message

from fastmqtt import FastMQTT, MQTTRouter, Retain

router = MQTTRouter()


@router.on_message("test/fastmqtt/print_message", retain_handling=Retain.DO_NOT_SEND)
async def on_print_message(message: Message, properties: dict[str, Any]):
print(f"Received message: {message.payload.decode()}")


@router.on_message("test/fastmqtt/process/substruction", retain_handling=Retain.DO_NOT_SEND)
async def on_process_substruction(message: Message, properties: dict[str, Any]):
payload = json.loads(message.payload)
return payload["a"] - payload["b"]


@router.on_message("test/fastmqtt/process/multiplication", retain_handling=Retain.DO_NOT_SEND)
async def on_process_multiplication(message: Message, properties: dict[str, Any]):
payload = json.loads(message.payload)
return payload["a"] * payload["b"]


async def main():
# fastmqtt = FastMQTT("test.mosquitto.org")
# fastmqtt.include_router(router)
# async with fastmqtt:
# OR
async with FastMQTT("test.mosquitto.org", routers=[router]) as fastmqtt:
await fastmqtt.publish("test/fastmqtt/print_message", "Hello, world!")

async with fastmqtt.response_context(
f"test/fastmqtt/process/response/{fastmqtt.identifier}"
) as ctx:
response = await ctx.request(
"test/fastmqtt/process/substruction", json.dumps({"a": 10, "b": 5})
)
print(f"Substruction result: {response.payload.decode()}")

response = await ctx.request(
"test/fastmqtt/process/multiplication", json.dumps({"a": 20, "b": 30})
)
print(f"Multiplication result: {response.payload.decode()}")


if __name__ == "__main__":
asyncio.run(main())
12 changes: 12 additions & 0 deletions fastmqtt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from .exceptions import FastMQTTError
from .fastmqtt import FastMQTT
from .router import MQTTRouter
from .subscription_manager import Retain, Subscription

__all__ = [
"FastMQTT",
"MQTTRouter",
"FastMQTTError",
"Retain",
"Subscription",
]
2 changes: 2 additions & 0 deletions fastmqtt/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class FastMQTTError(Exception):
pass
159 changes: 159 additions & 0 deletions fastmqtt/fastmqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import asyncio
import logging
import ssl
from typing import Any, Callable, Iterable, Sequence

import paho.mqtt.client as mqtt
from aiomqtt import Client as MQTTClient
from aiomqtt import (
Message,
ProtocolVersion,
ProxySettings,
TLSParameters,
Will,
)
from aiomqtt.types import PayloadType, SocketOption
from paho.mqtt.packettypes import PacketTypes

from .message_handler import MessageHandler
from .response import ResponseContext
from .router import MQTTRouter
from .subscription_manager import Subscription, SubscriptionManager
from .utils import properties_from_dict

WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]


class FastMQTT(MQTTRouter):
def __init__(
self,
hostname: str,
port: int = 1883,
*,
username: str | None = None,
password: str | None = None,
logger: logging.Logger | None = None,
identifier: str | None = None,
queue_type: type[asyncio.Queue[Message]] | None = None,
will: Will | None = None,
clean_session: bool | None = None,
transport: str = "tcp",
timeout: float | None = None,
keepalive: int = 60,
bind_address: str = "",
bind_port: int = 0,
clean_start: int = mqtt.MQTT_CLEAN_START_FIRST_ONLY,
max_queued_incoming_messages: int | None = None,
max_queued_outgoing_messages: int | None = None,
max_inflight_messages: int | None = None,
max_concurrent_outgoing_calls: int | None = None,
properties: mqtt.Properties | None = None,
tls_context: ssl.SSLContext | None = None,
tls_params: TLSParameters | None = None,
tls_insecure: bool | None = None,
proxy: ProxySettings | None = None,
socket_options: Iterable[SocketOption] | None = None,
websocket_path: str | None = None,
websocket_headers: WebSocketHeaders | None = None,
routers: Sequence[MQTTRouter] | None = None,
):
super().__init__()
self.client = MQTTClient(
hostname,
port,
username=username,
password=password,
logger=logger,
identifier=identifier,
queue_type=queue_type,
protocol=ProtocolVersion.V5,
will=will,
clean_session=clean_session,
transport=transport,
timeout=timeout,
keepalive=keepalive,
bind_address=bind_address,
bind_port=bind_port,
clean_start=clean_start,
max_queued_incoming_messages=max_queued_incoming_messages,
max_queued_outgoing_messages=max_queued_outgoing_messages,
max_inflight_messages=max_inflight_messages,
max_concurrent_outgoing_calls=max_concurrent_outgoing_calls,
properties=properties,
tls_context=tls_context,
tls_params=tls_params,
tls_insecure=tls_insecure,
proxy=proxy,
socket_options=socket_options,
websocket_path=websocket_path,
websocket_headers=websocket_headers,
)
self.subscriptions_map: dict[int, Subscription] = {}
self.sub_manager = SubscriptionManager(self)
self.message_handler = MessageHandler(self)

if routers is not None:
for router in routers:
self.include_router(router)

@property
def identifier(self) -> str:
return self.client.identifier

def include_router(self, router: MQTTRouter) -> None:
included_subscriptions = self.subscriptions.copy()
for router_sub in router.subscriptions:
for included_sub in included_subscriptions:
if included_sub.topic == router_sub.topic:
included_sub.callbacks.extend(router_sub.callbacks)
break
else:
self.subscriptions.append(router_sub)

async def __aenter__(self):
await self.client.__aenter__()
await self.message_handler.__aenter__()
await self.subscribe_all()
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self.client.__aexit__(exc_type, exc_value, traceback)
await self.message_handler.__aexit__(exc_type, exc_value, traceback)

async def subscribe_all(self) -> None:
await self.sub_manager.subscribe_all()

async def publish(
self,
topic: str,
payload: PayloadType = None,
qos: int = 0,
retain: bool = False,
properties: dict[str, Any] | None = None,
) -> None:
mqtt_properties = None
if properties is not None:
mqtt_properties = properties_from_dict(properties, PacketTypes.PUBLISH)

await self.client.publish(
topic=topic,
payload=payload,
qos=qos,
retain=retain,
properties=mqtt_properties,
)

def response_context(
self,
response_topic: str,
qos: int = 0,
default_timeout: float | None = 60,
**kwargs,
) -> ResponseContext:
return ResponseContext(
self,
response_topic,
qos,
default_timeout,
**kwargs,
)
Loading

0 comments on commit 4d55d3c

Please sign in to comment.