diff --git a/.clang-format b/.clang-format new file mode 100644 index 000000000..f10704ca2 --- /dev/null +++ b/.clang-format @@ -0,0 +1,58 @@ +--- +Language: Cpp +# BasedOnStyle: Mozilla +AlignAfterOpenBracket: AlwaysBreak +AlignConsecutiveAssignments: false +AlignConsecutiveDeclarations: false +AlignEscapedNewlines: Right +AlignOperands: true +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: false +AllowShortBlocksOnASingleLine: false +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: Inline +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: false +BinPackArguments: false +BinPackParameters: false +BreakBeforeBinaryOperators: None +BreakBeforeBraces: Attach +BreakBeforeTernaryOperators: true +BreakStringLiterals: true +ColumnLimit: 120 +ContinuationIndentWidth: 4 +DerivePointerAlignment: false +IncludeBlocks: Preserve +IndentCaseLabels: true +IndentPPDirectives: AfterHash +IndentWidth: 4 +IndentWrappedFunctionNames: true +KeepEmptyLinesAtTheStartOfBlocks: true +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 100000 +PointerAlignment: Right +ReflowComments: true +SortIncludes: true +SpaceAfterCStyleCast: false +SpaceBeforeAssignmentOperators: true +SpaceBeforeParens: ControlStatements +SpaceInEmptyParentheses: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInParentheses: false +SpacesInSquareBrackets: false +Standard: Cpp11 +TabWidth: 4 +UseTab: Never +... + diff --git a/.clang-tidy b/.clang-tidy new file mode 100644 index 000000000..30a67f199 --- /dev/null +++ b/.clang-tidy @@ -0,0 +1,14 @@ +--- +Checks: 'clang-diagnostic-*,clang-analyzer-*,readability-*,modernize-*,bugprone-*,misc-*,google-runtime-int,llvm-header-guard,fuchsia-restrict-system-includes,-clang-analyzer-valist.Uninitialized,-clang-analyzer-security.insecureAPI.rand,-clang-analyzer-alpha.*' +WarningsAsErrors: '*' +HeaderFilterRegex: '\./*' +FormatStyle: 'file' +CheckOptions: + - key: readability-braces-around-statements.ShortStatementLines + value: '1' + - key: google-runtime-int.TypeSufix + value: '_t' + - key: fuchsia-restrict-system-includes.Includes + value: '*,-stdint.h,-stdbool.h' + +... diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..da271c75f --- /dev/null +++ b/.gitignore @@ -0,0 +1,530 @@ + +# Created by https://www.gitignore.io/api/git,c++,cmake,python,visualstudio,visualstudiocode + +### C++ ### +# Prerequisites +*.d + +# Compiled Object files +*.slo +*.lo +*.o +*.obj + +# Precompiled Headers +*.gch +*.pch + +# Compiled Dynamic libraries +*.so +*.dylib +*.dll + +# Fortran module files +*.mod +*.smod + +# Compiled Static libraries +*.lai +*.la +*.a +*.lib + +# Executables +*.exe +*.out +*.app + +### CMake ### +CMakeCache.txt +CMakeFiles +CMakeScripts +Testing +Makefile +cmake_install.cmake +install_manifest.txt +compile_commands.json +CTestTestfile.cmake + +### Git ### +# Created by git for backups. To disable backups in Git: +# $ git config --global mergetool.keepBackup false +*.orig + +# Created by git when using merge tools for conflicts +*.BACKUP.* +*.BASE.* +*.LOCAL.* +*.REMOTE.* +*_BACKUP_*.txt +*_BASE_*.txt +*_LOCAL_*.txt +*_REMOTE_*.txt + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +### Python Patch ### +.venv/ + +### Python.VirtualEnv Stack ### +# Virtualenv +# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/ +[Bb]in +[Ii]nclude +[Ll]ib +[Ll]ib64 +[Ll]ocal +[Ss]cripts +pyvenv.cfg +pip-selfcheck.json + +### VisualStudioCode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json + +### VisualStudio ### +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. +## +## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ + +# Visual Studio 2015/2017 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# Visual Studio 2017 auto generated files +Generated\ Files/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# Benchmark Results +BenchmarkDotNet.Artifacts/ + +# .NET Core +project.lock.json +project.fragment.lock.json +artifacts/ + +# StyleCop +StyleCopReport.xml + +# Files built by Visual Studio +*_i.c +*_p.c +*_h.h +*.ilk +*.meta +*.iobj +*.pdb +*.ipdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*_wpftmp.csproj +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# Visual Studio Trace Files +*.e2e + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# AxoCover is a Code Coverage Tool +.axoCover/* +!.axoCover/settings.json + +# Visual Studio code coverage results +*.coverage +*.coveragexml + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# Note: Comment the next line if you want to checkin your web deploy settings, +# but database connection strings (with potential passwords) will be unencrypted +*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# The packages folder can be ignored because of Package Restore +**/[Pp]ackages/* +# except build/, which is used as an MSBuild target. +!**/[Pp]ackages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/[Pp]ackages/repositories.config +# NuGet v3's project.json files produces more ignorable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt +*.appx + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +orleans.codegen.cs + +# Including strong name files can present a security risk +# (https://github.com/github/gitignore/pull/2483#issue-259490424) +#*.snk + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm +ServiceFabricBackup/ +*.rptproj.bak + +# SQL Server files +*.mdf +*.ldf +*.ndf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings +*.rptproj.rsuser + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat +node_modules/ + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) +*.vbw + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# JetBrains Rider +.idea/ +*.sln.iml + +# CodeRush personal settings +.cr/personal + +# Python Tools for Visual Studio (PTVS) +*.pyc + +# Cake - Uncomment if you are using it +# tools/** +# !tools/packages.config + +# Tabs Studio +*.tss + +# Telerik's JustMock configuration file +*.jmconfig + +# BizTalk build output +*.btp.cs +*.btm.cs +*.odx.cs +*.xsd.cs + +# OpenCover UI analysis results +OpenCover/ + +# Azure Stream Analytics local run output +ASALocalRun/ + +# MSBuild Binary and Structured Log +*.binlog + +# NVidia Nsight GPU debugger configuration file +*.nvuser + +# MFractors (Xamarin productivity tool) working folder +.mfractor/ + +# Local History for Visual Studio +.localhistory/ + + +# End of https://www.gitignore.io/api/git,c++,cmake,python,visualstudio,visualstudiocode diff --git a/aws_crt_python/__init__.py b/aws_crt_python/__init__.py new file mode 100644 index 000000000..cbdb327bf --- /dev/null +++ b/aws_crt_python/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://aws.amazon.com/apache2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +__all__ = ['mqtt', 'iot'] \ No newline at end of file diff --git a/aws_crt_python/iot.py b/aws_crt_python/iot.py new file mode 100644 index 000000000..6717b7ae0 --- /dev/null +++ b/aws_crt_python/iot.py @@ -0,0 +1,914 @@ +# Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://aws.amazon.com/apache2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +import aws_crt_python.mqtt + +import sys +import threading + +DROP_OLDEST = 0 +DROP_NEWEST = 1 + +class Message(object): + __slots__ = ['topic', 'payload'] + + def __init__(self, topic, payload): + self.topic = topic + self.payload = payload + +class Will(object): + def __init__(self, topic, payload, qos, retain): + self._topic = topic + self._payload = payload + self._qos = qos + self._retain = retain + +class AWSIoTMQTTClient(object): + + def __init__(self, clientID, useWebsocket=False, cleanSession=True): + """ + + The client class that connects to and accesses AWS IoT over MQTT v3.1/3.1.1. + + The following connection types are available: + + - TLSv1.2 Mutual Authentication + + X.509 certificate-based secured MQTT connection to AWS IoT + + - Websocket SigV4 + + IAM credential-based secured MQTT connection over Websocket to AWS IoT + + It provides basic synchronous MQTT operations in the classic MQTT publish-subscribe + model, along with configurations of on-top features: + + - Auto reconnect/resubscribe + + - Progressive reconnect backoff + + - Offline publish requests queueing with draining + + **Syntax** + + .. code:: python + + import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT + + # Create an AWS IoT MQTT Client using TLSv1.2 Mutual Authentication + myAWSIoTMQTTClient = AWSIoTPyMQTT.AWSIoTMQTTClient("testIoTPySDK") + # Create an AWS IoT MQTT Client using Websocket SigV4 + myAWSIoTMQTTClient = AWSIoTPyMQTT.AWSIoTMQTTClient("testIoTPySDK", useWebsocket=True) + + **Parameters** + + *clientID* - String that denotes the client identifier used to connect to AWS IoT. + If empty string were provided, client id for this connection will be randomly generated + n server side. + + *protocolType* - MQTT version in use for this connection. Could be :code:`AWSIoTPythonSDK.MQTTLib.MQTTv3_1` or :code:`AWSIoTPythonSDK.MQTTLib.MQTTv3_1_1` + + *useWebsocket* - Boolean that denotes enabling MQTT over Websocket SigV4 or not. + + **Returns** + + :code:`AWSIoTPythonSDK.MQTTLib.AWSIoTMQTTClient` object + + """ + assert(not useWebsocket) + self._cleanSession = cleanSession + self._useWebsocket = useWebsocket + self._alpnProtocol = None + + self._elg = aws_crt_python.mqtt.EventLoopGroup(1) + + self._client = aws_crt_python.mqtt.Client(self._elg, clientID) + + # Configuration APIs + def configureLastWill(self, topic, payload, QoS, retain=False): + """ + **Description** + + Used to configure the last will topic, payload and QoS of the client. Should be called before connect. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.configureLastWill("last/Will/Topic", "lastWillPayload", 0) + + **Parameters** + + *topic* - Topic name that last will publishes to. + + *payload* - Payload to publish for last will. + + *QoS* - Quality of Service. Could be 0 or 1. + + **Returns** + + None + + """ + self._will = Will(topic, payload, QoS, retain) + + def clearLastWill(self): + """ + **Description** + + Used to clear the last will configuration that is previously set through configureLastWill. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.clearLastWill() + + **Parameter** + + None + + **Returns** + + None + + """ + self._will = None + + def configureEndpoint(self, hostName, portNumber): + """ + **Description** + + Used to configure the host name and port number the client tries to connect to. Should be called + before connect. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.configureEndpoint("random.iot.region.amazonaws.com", 8883) + + **Parameters** + + *hostName* - String that denotes the host name of the user-specific AWS IoT endpoint. + + *portNumber* - Integer that denotes the port number to connect to. Could be :code:`8883` for + TLSv1.2 Mutual Authentication or :code:`443` for Websocket SigV4 and TLSv1.2 Mutual Authentication + with ALPN extension. + + **Returns** + + None + + """ + self._hostName = hostName + self._portNumber = portNumber + + if portNumber == 443 and not self._useWebsocket: + self._alpnProtocol = "x-amzn-mqtt-ca" + + def configureIAMCredentials(self, AWSAccessKeyID, AWSSecretAccessKey, AWSSessionToken=""): + """ + **Description** + + Used to configure/update the custom IAM credentials for Websocket SigV4 connection to + AWS IoT. Should be called before connect. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.configureIAMCredentials(obtainedAccessKeyID, obtainedSecretAccessKey, obtainedSessionToken) + + .. note:: + + Hard-coding credentials into custom script is NOT recommended. Please use AWS Cognito identity service + or other credential provider. + + **Parameters** + + *AWSAccessKeyID* - AWS Access Key Id from user-specific IAM credentials. + + *AWSSecretAccessKey* - AWS Secret Access Key from user-specific IAM credentials. + + *AWSSessionToken* - AWS Session Token for temporary authentication from STS. + + **Returns** + + None + + """ + raise NotImplementedError() + + def configureCredentials(self, CAFilePath, KeyPath="", CertificatePath=""): # Should be good for MutualAuth certs config and Websocket rootCA config + """ + **Description** + + Used to configure the rootCA, private key and certificate files. Should be called before connect. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.configureCredentials("PATH/TO/ROOT_CA", "PATH/TO/PRIVATE_KEY", "PATH/TO/CERTIFICATE") + + **Parameters** + + *CAFilePath* - Path to read the root CA file. Required for all connection types. + + *KeyPath* - Path to read the private key. Required for X.509 certificate based connection. + + *CertificatePath* - Path to read the certificate. Required for X.509 certificate based connection. + + **Returns** + + None + + """ + self._caPath = CAFilePath + self._keyPath = KeyPath + self._certPath = CertificatePath + + def configureAutoReconnectBackoffTime(self, baseReconnectQuietTimeSecond, maxReconnectQuietTimeSecond, stableConnectionTimeSecond): + """ + **Description** + + Used to configure the auto-reconnect backoff timing. Should be called before connect. + + **Syntax** + + .. code:: python + + # Configure the auto-reconnect backoff to start with 1 second and use 128 seconds as a maximum back off time. + # Connection over 20 seconds is considered stable and will reset the back off time back to its base. + myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 128, 20) + + **Parameters** + + *baseReconnectQuietTimeSecond* - The initial back off time to start with, in seconds. + Should be less than the stableConnectionTime. + + *maxReconnectQuietTimeSecond* - The maximum back off time, in seconds. + + *stableConnectionTimeSecond* - The number of seconds for a connection to last to be considered as stable. + Back off time will be reset to base once the connection is stable. + + **Returns** + + None + + """ + raise NotImplementedError() + + def configureOfflinePublishQueueing(self, queueSize, dropBehavior=DROP_NEWEST): + """ + **Description** + + Used to configure the queue size and drop behavior for the offline requests queueing. Should be + called before connect. Queueable offline requests include publish, subscribe and unsubscribe. + + **Syntax** + + .. code:: python + + import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT + + # Configure the offline queue for publish requests to be 20 in size and drop the oldest + request when the queue is full. + myAWSIoTMQTTClient.configureOfflinePublishQueueing(20, AWSIoTPyMQTT.DROP_OLDEST) + + **Parameters** + + *queueSize* - Size of the queue for offline publish requests queueing. + If set to 0, the queue is disabled. If set to -1, the queue size is set to be infinite. + + *dropBehavior* - the type of drop behavior when the queue is full. + Could be :code:`AWSIoTPythonSDK.core.util.enums.DropBehaviorTypes.DROP_OLDEST` or + :code:`AWSIoTPythonSDK.core.util.enums.DropBehaviorTypes.DROP_NEWEST`. + + **Returns** + + None + + """ + raise NotImplementedError() + + def configureDrainingFrequency(self, frequencyInHz): + """ + **Description** + + Used to configure the draining speed to clear up the queued requests when the connection is back. + Should be called before connect. + + **Syntax** + + .. code:: python + + # Configure the draining speed to be 2 requests/second + myAWSIoTMQTTClient.configureDrainingFrequency(2) + + .. note:: + + Make sure the draining speed is fast enough and faster than the publish rate. Slow draining + could result in inifinite draining process. + + **Parameters** + + *frequencyInHz* - The draining speed to clear the queued requests, in requests/second. + + **Returns** + + None + + """ + # self._mqtt_core.configure_draining_interval_sec(1/float(frequencyInHz)) + raise NotImplementedError() + + def configureConnectDisconnectTimeout(self, timeoutSecond): + """ + **Description** + + Used to configure the time in seconds to wait for a CONNACK or a disconnect to complete. + Should be called before connect. + + **Syntax** + + .. code:: python + + # Configure connect/disconnect timeout to be 10 seconds + myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) + + **Parameters** + + *timeoutSecond* - Time in seconds to wait for a CONNACK or a disconnect to complete. + + **Returns** + + None + + """ + raise NotImplementedError() + + def configureMQTTOperationTimeout(self, timeoutSecond): + """ + **Description** + + Used to configure the timeout in seconds for MQTT QoS 1 publish, subscribe and unsubscribe. + Should be called before connect. + + **Syntax** + + .. code:: python + + # Configure MQTT operation timeout to be 5 seconds + myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) + + **Parameters** + + *timeoutSecond* - Time in seconds to wait for a PUBACK/SUBACK/UNSUBACK. + + **Returns** + + None + + """ + raise NotImplementedError() + + def configureUsernamePassword(self, username, password=None): + """ + **Description** + + Used to configure the username and password used in CONNECT packet. + + **Syntax** + + .. code:: python + + # Configure user name and password + myAWSIoTMQTTClient.configureUsernamePassword("myUsername", "myPassword") + + **Parameters** + + *username* - Username used in the username field of CONNECT packet. + + *password* - Password used in the password field of CONNECT packet. + + **Returns** + + None + + """ + self._username = username + self._password = password + + def enableMetricsCollection(self): + """ + **Description** + + Used to enable SDK metrics collection. Username field in CONNECT packet will be used to append the SDK name + and SDK version in use and communicate to AWS IoT cloud. This metrics collection is enabled by default. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.enableMetricsCollection() + + **Parameters** + + None + + **Returns** + + None + + """ + raise NotImplementedError() + + def disableMetricsCollection(self): + """ + **Description** + + Used to disable SDK metrics collection. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.disableMetricsCollection() + + **Parameters** + + None + + **Returns** + + None + + """ + raise NotImplementedError() + + # MQTT functionality APIs + def connect(self, keepAliveIntervalSecond=600): + """ + **Description** + + Connect to AWS IoT, with user-specific keepalive interval configuration. + + **Syntax** + + .. code:: python + + # Connect to AWS IoT with default keepalive set to 600 seconds + myAWSIoTMQTTClient.connect() + # Connect to AWS IoT with keepalive interval set to 1200 seconds + myAWSIoTMQTTClient.connect(1200) + + **Parameters** + + *keepAliveIntervalSecond* - Time in seconds for interval of sending MQTT ping request. + Default set to 600 seconds. + + **Returns** + + True if the connect attempt succeeded. False if failed. + + """ + + connected = threading.Event() + + def _onConnectWrapper(return_code, session_present): + nonlocal connected + self.onOnline() + connected.set() + + def _onDisconnectWrapper(return_code): + self.onOffline() + + self._client.connect( + host_name=self._hostName, + port=self._portNumber, + ca_path=self._caPath, + key_path=self._keyPath, + certificate_path=self._certPath, + alpn=self._alpnProtocol, + keep_alive=keepAliveIntervalSecond, + on_connect=_onConnectWrapper, + on_disconnect=_onDisconnectWrapper, + ) + + connected.wait() + + return True + + def connectAsync(self, keepAliveIntervalSecond=600, ackCallback=None): + """ + **Description** + + Connect asynchronously to AWS IoT, with user-specific keepalive interval configuration and CONNACK callback. + + **Syntax** + + .. code:: python + + # Connect to AWS IoT with default keepalive set to 600 seconds and a custom CONNACK callback + myAWSIoTMQTTClient.connectAsync(ackCallback=my_connack_callback) + # Connect to AWS IoT with default keepalive set to 1200 seconds and a custom CONNACK callback + myAWSIoTMQTTClient.connectAsync(keepAliveInternvalSecond=1200, ackCallback=myConnackCallback) + + **Parameters** + + *keepAliveIntervalSecond* - Time in seconds for interval of sending MQTT ping request. + Default set to 600 seconds. + + *ackCallback* - Callback to be invoked when the client receives a CONNACK. Should be in form + :code:`customCallback(mid, data)`, where :code:`mid` is the packet id for the connect request + and :code:`data` is the connect result code. + + **Returns** + + Connect request packet id, for tracking purpose in the corresponding callback. + + """ + + def _onConnectWrapper(return_code, session_present): + self.onOnline() + + def _onDisconnectWrapper(return_code): + self.onOffline() + + self._client.connect( + host_name=self._hostName, + port=self._portNumber, + ca_path=self._caPath, + key_path=self._keyPath, + certificate_path=self._certPath, + alpn=self._alpnProtocol, + keep_alive=keepAliveIntervalSecond, + on_connect=_onConnectWrapper, + on_disconnect=_onDisconnectWrapper, + ) + + def disconnect(self): + """ + **Description** + + Disconnect from AWS IoT. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.disconnect() + + **Parameters** + + None + + **Returns** + + True if the disconnect attempt succeeded. False if failed. + + """ + + done = threading.Event() + + old_onOffline = self.onOffline + def new_onOffline(): + nonlocal done + old_onOffline() + done.set() + self.onOffline = new_onOffline + + self._client.disconnect() + + done.wait() + + def disconnectAsync(self, ackCallback=None): + """ + **Description** + + Disconnect asynchronously to AWS IoT. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.disconnectAsync(ackCallback=myDisconnectCallback) + + **Parameters** + + *ackCallback* - Callback to be invoked when the client finishes sending disconnect and internal clean-up. + Should be in form :code:`customCallback(mid, data)`, where :code:`mid` is the packet id for the disconnect + request and :code:`data` is the disconnect result code. + + **Returns** + + Disconnect request packet id, for tracking purpose in the corresponding callback. + + """ + return self._client.disconnect() + + def publish(self, topic, payload, QoS): + """ + **Description** + + Publish a new message to the desired topic with QoS. + + **Syntax** + + .. code:: python + + # Publish a QoS0 message "myPayload" to topic "myTopic" + myAWSIoTMQTTClient.publish("myTopic", "myPayload", 0) + # Publish a QoS1 message "myPayloadWithQos1" to topic "myTopic/sub" + myAWSIoTMQTTClient.publish("myTopic/sub", "myPayloadWithQos1", 1) + + **Parameters** + + *topic* - Topic name to publish to. + + *payload* - Payload to publish. + + *QoS* - Quality of Service. Could be 0 or 1. + + **Returns** + + True if the publish request has been sent to paho. False if the request did not reach paho. + + """ + done = threading.Event() + + def _puback_callback(): + nonlocal done + done.set() + + # Disable retain for publish by now + self._client.publish(topic, payload, QoS, False, _puback_callback) + + done.wait() + + return True + + def publishAsync(self, topic, payload, QoS, ackCallback=None): + """ + **Description** + + Publish a new message asynchronously to the desired topic with QoS and PUBACK callback. Note that the ack + callback configuration for a QoS0 publish request will be ignored as there are no PUBACK reception. + + **Syntax** + + .. code:: python + + # Publish a QoS0 message "myPayload" to topic "myTopic" + myAWSIoTMQTTClient.publishAsync("myTopic", "myPayload", 0) + # Publish a QoS1 message "myPayloadWithQos1" to topic "myTopic/sub", with custom PUBACK callback + myAWSIoTMQTTClient.publishAsync("myTopic/sub", "myPayloadWithQos1", 1, ackCallback=myPubackCallback) + + **Parameters** + + *topic* - Topic name to publish to. + + *payload* - Payload to publish. + + *QoS* - Quality of Service. Could be 0 or 1. + + *ackCallback* - Callback to be invoked when the client receives a PUBACK. Should be in form + :code:`customCallback(mid)`, where :code:`mid` is the packet id for the disconnect request. + + **Returns** + + Publish request packet id, for tracking purpose in the corresponding callback. + + """ + self._client.publish(topic, payload, QoS, False, ackCallback) + + def subscribe(self, topic, QoS, callback): + """ + **Description** + + Subscribe to the desired topic and register a callback. + + **Syntax** + + .. code:: python + + # Subscribe to "myTopic" with QoS0 and register a callback + myAWSIoTMQTTClient.subscribe("myTopic", 0, customCallback) + # Subscribe to "myTopic/#" with QoS1 and register a callback + myAWSIoTMQTTClient.subscribe("myTopic/#", 1, customCallback) + + **Parameters** + + *topic* - Topic name or filter to subscribe to. + + *QoS* - Quality of Service. Could be 0 or 1. + + *callback* - Function to be called when a new message for the subscribed topic + comes in. Should be in form :code:`customCallback(client, userdata, message)`, where + :code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are + here just to be aligned with the underneath Paho callback function signature. These fields are pending to be + deprecated and should not be depended on. + + **Returns** + + True if the subscribe attempt succeeded. False if failed. + + """ + done = threading.Event() + + def _suback_callback(): + nonlocal done + done.set() + + self._client.subscribe(topic, QoS, lambda topic, payload: callback(None, None, Message(topic, payload)), _suback_callback) + + done.wait() + + return True + + def subscribeAsync(self, topic, QoS, ackCallback=None, messageCallback=None): + """ + **Description** + + Subscribe to the desired topic and register a message callback with SUBACK callback. + + **Syntax** + + .. code:: python + + # Subscribe to "myTopic" with QoS0, custom SUBACK callback and a message callback + myAWSIoTMQTTClient.subscribe("myTopic", 0, ackCallback=mySubackCallback, messageCallback=customMessageCallback) + # Subscribe to "myTopic/#" with QoS1, custom SUBACK callback and a message callback + myAWSIoTMQTTClient.subscribe("myTopic/#", 1, ackCallback=mySubackCallback, messageCallback=customMessageCallback) + + **Parameters** + + *topic* - Topic name or filter to subscribe to. + + *QoS* - Quality of Service. Could be 0 or 1. + + *ackCallback* - Callback to be invoked when the client receives a SUBACK. Should be in form + :code:`customCallback(mid, data)`, where :code:`mid` is the packet id for the disconnect request and + :code:`data` is the granted QoS for this subscription. + + *messageCallback* - Function to be called when a new message for the subscribed topic + comes in. Should be in form :code:`customCallback(client, userdata, message)`, where + :code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are + here just to be aligned with the underneath Paho callback function signature. These fields are pending to be + deprecated and should not be depended on. + + **Returns** + + Subscribe request packet id, for tracking purpose in the corresponding callback. + + """ + return self._client.subscribe(topic, QoS, lambda message: messageCallback(None, None, message), ackCallback) + + def unsubscribe(self, topic): + """ + **Description** + + Unsubscribe to the desired topic. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.unsubscribe("myTopic") + + **Parameters** + + *topic* - Topic name or filter to unsubscribe to. + + **Returns** + + True if the unsubscribe attempt succeeded. False if failed. + + """ + done = threading.Event() + + def _unsuback_callback(): + nonlocal done + done.set() + + self._client.unsubscribe(topic, _unsuback_callback) + + done.wait() + + return True + + def unsubscribeAsync(self, topic, ackCallback=None): + """ + **Description** + + Unsubscribe to the desired topic with UNSUBACK callback. + + **Syntax** + + .. code:: python + + myAWSIoTMQTTClient.unsubscribe("myTopic", ackCallback=myUnsubackCallback) + + **Parameters** + + *topic* - Topic name or filter to unsubscribe to. + + *ackCallback* - Callback to be invoked when the client receives a UNSUBACK. Should be in form + :code:`customCallback(mid)`, where :code:`mid` is the packet id for the disconnect request. + + **Returns** + + Unsubscribe request packet id, for tracking purpose in the corresponding callback. + + """ + return self._client.unsubscribe(topic, ackCallback) + + def onOnline(self): + """ + **Description** + + Callback that gets called when the client is online. The callback registration should happen before calling + connect/connectAsync. + + **Syntax** + + .. code:: python + + # Register an onOnline callback + myAWSIoTMQTTClient.onOnline = myOnOnlineCallback + + **Parameters** + + None + + **Returns** + + None + + """ + pass + + def onOffline(self): + """ + **Description** + + Callback that gets called when the client is offline. The callback registration should happen before calling + connect/connectAsync. + + **Syntax** + + .. code:: python + + # Register an onOffline callback + myAWSIoTMQTTClient.onOffline = myOnOfflineCallback + + **Parameters** + + None + + **Returns** + + None + + """ + pass + + def onMessage(self, message): + """ + **Description** + + Callback that gets called when the client receives a new message. The callback registration should happen before + calling connect/connectAsync. This callback, if present, will always be triggered regardless of whether there is + any message callback registered upon subscribe API call. It is for the purpose to aggregating the processing of + received messages in one function. + + **Syntax** + + .. code:: python + + # Register an onMessage callback + myAWSIoTMQTTClient.onMessage = myOnMessageCallback + + **Parameters** + + *message* - Received MQTT message. It contains the source topic as :code:`message.topic`, and the payload as + :code:`message.payload`. + + **Returns** + + None + + """ + pass diff --git a/aws_crt_python/mqtt.py b/aws_crt_python/mqtt.py new file mode 100644 index 000000000..51e6c51ef --- /dev/null +++ b/aws_crt_python/mqtt.py @@ -0,0 +1,72 @@ +# Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://aws.amazon.com/apache2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +import _aws_crt_python + +def _default_on_connect(return_code, session_present): + pass +def _default_on_disconnect(return_code): + pass + +class EventLoopGroup(object): + __slots__ = ['_internal_elg'] + + def __init__(self, num_threads): + self._internal_elg = _aws_crt_python.io_new_event_loop_group(num_threads) + +class Client(object): + __slots__ = ['_internal_connection', 'elg', 'client_id'] + + def __init__(self, event_loop_group, client_id): + + assert isinstance(event_loop_group, EventLoopGroup) + + self.elg = event_loop_group + self.client_id = client_id + + def connect(self, + host_name, port, + ca_path, key_path, certificate_path, + on_connect=_default_on_connect, + on_disconnect=_default_on_disconnect, + use_websocket=False, alpn=None, + clean_session=True, keep_alive=0): + + assert use_websocket == False + + self._internal_connection = _aws_crt_python.mqtt_new_connection( + self.elg._internal_elg, + host_name, + port, + ca_path, + key_path, + certificate_path, + alpn, + self.client_id, + keep_alive, + on_connect, + on_disconnect, + ) + + def disconnect(self): + return _aws_crt_python.mqtt_disconnect(self._internal_connection) + + def subscribe(self, topic, qos, callback, suback_callback=None): + return _aws_crt_python.mqtt_subscribe(self._internal_connection, topic, qos, callback, suback_callback) + + def unsubscribe(self, topic, unsuback_callback=None): + return _aws_crt_python.mqtt_unsubscribe(self._internal_connection, topic, unsuback_callback) + + def publish(self, topic, payload, qos, retain=False, puback_callback=None): + _aws_crt_python.mqtt_publish(self._internal_connection, topic, payload, qos, retain, puback_callback) + diff --git a/setup.py b/setup.py new file mode 100644 index 000000000..6fb10f9e2 --- /dev/null +++ b/setup.py @@ -0,0 +1,51 @@ +import setuptools +import os +import sys + +from distutils.ccompiler import get_default_compiler +compiler_type = get_default_compiler() + +cflags = [] +ldflags = [] + +if compiler_type == 'msvc': + pass +else: + cflags += ['-O0', '-fsanitize=address'] + +if sys.platform == 'darwin': + ldflags += ['-framework Security'] + +os.environ['CFLAGS'] = ' '.join(cflags) +os.environ['LDFLAGS'] = ' '.join(ldflags) + +_aws_crt_python = setuptools.Extension( + '_aws_crt_python', + language = 'c', + define_macros = [ + ('MAJOR_VERSION', '1'), + ('MINOR_VERSION', '0'), + ], + include_dirs = ['/usr/local/include', os.getenv('AWS_C_INSTALL') + '/include'], + library_dirs = ['/usr/local/lib', os.getenv('AWS_C_INSTALL') + '/lib'], + libraries = ['aws-c-common', 'aws-c-io', 'aws-c-mqtt'], + sources = ['source/module.c', 'source/io.c', 'source/mqtt.c'], +) + +setuptools.setup( + name="aws_mqtt_python", + version="0.0.1", + author="Example Author", + author_email="author@example.com", + description="A common runtime for AWS Python projects", + long_description_content_type="text/markdown", + url="https://github.com/awslabs/aws-crt-python", + packages=setuptools.find_packages(), + classifiers=[ + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + ], + ext_modules = [_aws_crt_python], +) diff --git a/source/io.c b/source/io.c new file mode 100644 index 000000000..747881c86 --- /dev/null +++ b/source/io.c @@ -0,0 +1,50 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +#include "io.h" + +#include + +const char *s_capsule_name_elg = "aws_event_loop_group"; + +static void s_elg_destructor(PyObject *elg_capsule) { + + assert(PyCapsule_CheckExact(elg_capsule)); + + struct aws_event_loop_group *elg = PyCapsule_GetPointer(elg_capsule, s_capsule_name_elg); + assert(elg); + + aws_event_loop_group_clean_up(elg); + aws_mem_release(elg->allocator, elg); +} + +PyObject *io_new_event_loop_group(PyObject *self, PyObject *args) { + (void)self; + + struct aws_allocator *allocator = mqtt_get_python_allocator(); + + uint16_t num_threads = 0; + + if (!PyArg_ParseTuple(args, "H", &num_threads)) { + return NULL; + } + + struct aws_event_loop_group *elg = aws_mem_acquire(allocator, sizeof(struct aws_event_loop_group)); + + if (aws_event_loop_group_default_init(elg, allocator, num_threads)) { + return NULL; + } + + return PyCapsule_New(elg, s_capsule_name_elg, s_elg_destructor); +} diff --git a/source/io.h b/source/io.h new file mode 100644 index 000000000..bd74c3307 --- /dev/null +++ b/source/io.h @@ -0,0 +1,34 @@ +#ifndef AWS_CRT_PYTHON_IO_H +#define AWS_CRT_PYTHON_IO_H +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/** + * This file includes definitions for common aws-c-io functions. + */ + +#include "module.h" + +/** + * Name string for event_loop_group capsules. + */ +extern const char *s_capsule_name_elg; + +/** + * Create a new event_loop_group to be managed by a Python Capsule. + */ +PyObject *io_new_event_loop_group(PyObject *self, PyObject *args); + +#endif /* AWS_CRT_PYTHON_IO_H */ diff --git a/source/module.c b/source/module.c new file mode 100644 index 000000000..5dd1da83a --- /dev/null +++ b/source/module.c @@ -0,0 +1,112 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +#include "module.h" +#include "io.h" +#include "mqtt.h" + +/******************************************************************************* + * Allocator + ******************************************************************************/ + +static void *s_python_malloc(struct aws_allocator *allocator, size_t size) { + (void)allocator; + + PyGILState_STATE state = PyGILState_Ensure(); + void *memory = PyObject_Malloc(size); + PyGILState_Release(state); + + return memory; +} + +static void s_python_free(struct aws_allocator *allocator, void *ptr) { + (void)allocator; + + PyGILState_STATE state = PyGILState_Ensure(); + PyObject_Free(ptr); + PyGILState_Release(state); +} + +static void *s_python_realloc(struct aws_allocator *allocator, void *ptr, size_t oldsize, size_t newsize) { + (void)allocator; + (void)oldsize; + + PyGILState_STATE state = PyGILState_Ensure(); + void *memory = PyObject_Realloc(ptr, newsize); + PyGILState_Release(state); + + return memory; +} + +struct aws_allocator *mqtt_get_python_allocator(void) { + static struct aws_allocator python_allocator = { + .mem_acquire = s_python_malloc, + .mem_release = s_python_free, + .mem_realloc = s_python_realloc, + }; + + return &python_allocator; +} + +/******************************************************************************* + * Definitions + ******************************************************************************/ + +static PyMethodDef s_module_methods[] = { + /* IO */ + {"io_new_event_loop_group", io_new_event_loop_group, METH_VARARGS, NULL}, + + /* MQTT */ + {"mqtt_new_connection", mqtt_new_connection, METH_VARARGS, NULL}, + {"mqtt_publish", mqtt_publish, METH_VARARGS, NULL}, + {"mqtt_subscribe", mqtt_subscribe, METH_VARARGS, NULL}, + {"mqtt_unsubscribe", mqtt_unsubscribe, METH_VARARGS, NULL}, + {"mqtt_disconnect", mqtt_disconnect, METH_VARARGS, NULL}, + + {NULL, NULL, 0, NULL}, +}; + +static const char s_module_name[] = "_aws_crt_python"; +PyDoc_STRVAR(s_module_doc, "C extension for binding AWS implementations of MQTT, HTTP, and friends"); + +/******************************************************************************* + * Module Init + ******************************************************************************/ + +#if PY_MAJOR_VERSION == 3 +# define INIT_FN PyInit__aws_crt_python +#elif PY_MAJOR_VERSION == 2 +# define INIT_FN init_aws_crt_python +#endif /* PY_MAJOR_VERSION */ + +PyMODINIT_FUNC INIT_FN(void) { + +#if PY_MAJOR_VERSION == 3 + static struct PyModuleDef s_module_def = { + PyModuleDef_HEAD_INIT, + s_module_name, + s_module_doc, + -1, /* size of per-interpreter state of the module, or -1 if the module keeps state in global variables. */ + s_module_methods, + }; + PyObject *m = PyModule_Create(&s_module_def); +#elif PY_MAJOR_VERSION == 2 + PyObject *m = Py_InitModule3(s_module_name, s_module_methods, s_module_doc); + (void)m; +#endif /* PY_MAJOR_VERSION */ + +#if PY_MAJOR_VERSION == 3 + return m; +#endif /* PY_MAJOR_VERSION */ +} diff --git a/source/module.h b/source/module.h new file mode 100644 index 000000000..29df99189 --- /dev/null +++ b/source/module.h @@ -0,0 +1,38 @@ +#ifndef AWS_CRT_PYTHON_MODULE_H +#define AWS_CRT_PYTHON_MODULE_H +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/** + * This file contains general helpers. + */ + +#define PY_SSIZE_T_CLEAN 1 +#include + +#include + +#if PY_MAJOR_VERSION >= 3 +#define PyString_FromStringAndSize PyUnicode_FromStringAndSize +#endif /* PY_MAJOR_VERSION */ + +/* AWS Specific Helpers */ +#define PyBool_FromAwsResult(result) PyBool_FromLong((result) == AWS_OP_SUCCESS) +#define PyString_FromAwsByteCursor(cursor) PyString_FromStringAndSize((const char *)(cursor)->ptr, (cursor)->len) + +/* Allocator that calls into PyObject_[Malloc|Free|Realloc] */ +struct aws_allocator *mqtt_get_python_allocator(void); + +#endif /* AWS_CRT_PYTHON_MODULE_H */ diff --git a/source/mqtt.c b/source/mqtt.c new file mode 100644 index 000000000..006b10058 --- /dev/null +++ b/source/mqtt.c @@ -0,0 +1,462 @@ +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +#include "mqtt.h" +#include "io.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +const char *s_capsule_name_mqtt_client_connection = "aws_mqtt_client_connection"; + +/******************************************************************************* + * New Connection + ******************************************************************************/ + +struct mqtt_python_connection { + struct aws_socket_options socket_options; + struct aws_mqtt_client client; + struct aws_mqtt_client_connection *connection; + + PyObject *on_connect; + PyObject *on_disconnect; +}; + +static void s_mqtt_python_connection_destructor(PyObject *connection_capsule) { + + assert(PyCapsule_CheckExact(connection_capsule)); + + struct mqtt_python_connection *connection = + PyCapsule_GetPointer(connection_capsule, s_capsule_name_mqtt_client_connection); + assert(connection); + + Py_XDECREF(connection->on_connect); + Py_XDECREF(connection->on_disconnect); + + aws_mqtt_client_connection_disconnect(connection->connection); + aws_mqtt_client_clean_up(&connection->client); + + aws_mem_release(mqtt_get_python_allocator(), connection); +} + +static void s_on_connect_failed(struct aws_mqtt_client_connection *connection, int error_code, void *user_data) { + + (void)connection; + + struct mqtt_python_connection *py_connection = user_data; + + if (py_connection->on_disconnect) { + PyGILState_STATE state = PyGILState_Ensure(); + + PyObject *result = PyObject_CallFunction(py_connection->on_disconnect, "(I)", error_code); + Py_XDECREF(result); + + PyGILState_Release(state); + } +} + +static void s_on_connect( + struct aws_mqtt_client_connection *connection, + enum aws_mqtt_connect_return_code return_code, + bool session_present, + void *user_data) { + + (void)connection; + + struct mqtt_python_connection *py_connection = user_data; + + if (py_connection->on_connect) { + PyGILState_STATE state = PyGILState_Ensure(); + + PyObject *result = + PyObject_CallFunction(py_connection->on_connect, "(IN)", return_code, PyBool_FromLong(session_present)); + Py_XDECREF(result); + + PyGILState_Release(state); + } +} + +static void s_on_disconnect(struct aws_mqtt_client_connection *connection, int error_code, void *user_data) { + + (void)connection; + + struct mqtt_python_connection *py_connection = user_data; + + if (py_connection->on_disconnect) { + PyGILState_STATE state = PyGILState_Ensure(); + + PyObject *result = PyObject_CallFunction(py_connection->on_disconnect, "(I)", error_code); + Py_XDECREF(result); + + PyGILState_Release(state); + } +} + +PyObject *mqtt_new_connection(PyObject *self, PyObject *args) { + (void)self; + + struct aws_allocator *allocator = mqtt_get_python_allocator(); + + PyObject *elg_capsule = NULL; + const char *server_name = NULL; + Py_ssize_t server_name_len = 0; + uint16_t port_number = 0; + const char *ca_path = NULL; + const char *key_path = NULL; + const char *cert_path = NULL; + const char *alpn_protocol = NULL; + const char *client_id = NULL; + Py_ssize_t client_id_len = 0; + uint16_t keep_alive_time = 0; + PyObject *on_connect = NULL; + PyObject *on_disconnect = NULL; + + if (!PyArg_ParseTuple( + args, + "Os#Hsszzs#HOO", + &elg_capsule, + &server_name, + &server_name_len, + &port_number, + &ca_path, + &key_path, + &cert_path, + &alpn_protocol, + &client_id, + &client_id_len, + &keep_alive_time, + &on_connect, + &on_disconnect)) { + return NULL; + } + + struct mqtt_python_connection *connection = aws_mem_acquire(allocator, sizeof(struct mqtt_python_connection)); + if (!connection) { + return PyErr_NoMemory(); + } + AWS_ZERO_STRUCT(*connection); + + if (!elg_capsule || !PyCapsule_CheckExact(elg_capsule)) { + PyErr_SetNone(PyExc_ValueError); + return NULL; + } + struct aws_event_loop_group *elg = PyCapsule_GetPointer(elg_capsule, s_capsule_name_elg); + + if (on_connect && PyCallable_Check(on_connect)) { + Py_INCREF(on_connect); + connection->on_connect = on_connect; + } + + if (on_disconnect && PyCallable_Check(on_disconnect)) { + Py_INCREF(on_disconnect); + connection->on_disconnect = on_disconnect; + } + + struct aws_tls_ctx_options tls_ctx_opt; + aws_tls_ctx_options_init_client_mtls(&tls_ctx_opt, cert_path, key_path); + if (ca_path) { + aws_tls_ctx_options_override_default_trust_store(&tls_ctx_opt, NULL, ca_path); + } + if (alpn_protocol) { + aws_tls_ctx_options_set_alpn_list(&tls_ctx_opt, alpn_protocol); + } + + AWS_ZERO_STRUCT(connection->socket_options); + connection->socket_options.connect_timeout_ms = 3000; + connection->socket_options.type = AWS_SOCKET_STREAM; + connection->socket_options.domain = AWS_SOCKET_IPV4; + + struct aws_mqtt_client_connection_callbacks callbacks; + AWS_ZERO_STRUCT(callbacks); + callbacks.on_connection_failed = s_on_connect_failed; + callbacks.on_connack = s_on_connect; + callbacks.on_disconnect = s_on_disconnect; + callbacks.user_data = connection; + + aws_mqtt_client_init(&connection->client, allocator, elg); + + struct aws_byte_cursor server_name_cur = aws_byte_cursor_from_array(server_name, server_name_len); + + connection->connection = aws_mqtt_client_connection_new( + &connection->client, callbacks, &server_name_cur, port_number, &connection->socket_options, &tls_ctx_opt); + + if (!connection->connection) { + return PyErr_NoMemory(); + } + + struct aws_byte_cursor client_id_cur = aws_byte_cursor_from_array(client_id, client_id_len); + aws_mqtt_client_connection_connect(connection->connection, &client_id_cur, true, keep_alive_time); + + return PyCapsule_New(connection, s_capsule_name_mqtt_client_connection, s_mqtt_python_connection_destructor); +} + +/******************************************************************************* + * Publish + ******************************************************************************/ + +struct publish_complete_userdata { + Py_buffer payload; + PyObject *callback; +}; + +static void s_publish_complete(struct aws_mqtt_client_connection *connection, void *userdata) { + + struct publish_complete_userdata *metadata = userdata; + if (metadata) { + + if (metadata->callback) { + + PyGILState_STATE state = PyGILState_Ensure(); + + PyObject_CallFunction(metadata->callback, NULL); + Py_DECREF(metadata->callback); + + PyGILState_Release(state); + } + + PyBuffer_Release(&metadata->payload); + aws_mem_release(mqtt_get_python_allocator(), metadata); + } +} + +PyObject *mqtt_publish(PyObject *self, PyObject *args) { + (void)self; + + PyObject *impl_capsule = NULL; + const char *topic; + Py_ssize_t topic_len; + Py_buffer payload_stack; + AWS_ZERO_STRUCT(payload_stack); + uint8_t qos_val = AWS_MQTT_QOS_AT_MOST_ONCE; + PyObject *retain = NULL; + PyObject *puback_callback = NULL; + + if (!PyArg_ParseTuple( + args, "Os#s*bOO", &impl_capsule, &topic, &topic_len, &payload_stack, &qos_val, &retain, &puback_callback)) { + return NULL; + } + + if (!impl_capsule || !PyCapsule_CheckExact(impl_capsule)) { + PyErr_SetNone(PyExc_TypeError); + return NULL; + } + + struct mqtt_python_connection *connection = + PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt_client_connection); + + if (qos_val > 3) { + PyErr_SetNone(PyExc_ValueError); + return NULL; + } + if (puback_callback && PyCallable_Check(puback_callback)) { + Py_INCREF(puback_callback); + } else { + puback_callback = NULL; + } + + struct publish_complete_userdata *metadata = NULL; + + struct aws_byte_cursor payload_cursor; + AWS_ZERO_STRUCT(payload_cursor); + + /* Heap allocate payload so that it may persist */ + if (payload_stack.len > 0 || puback_callback) { + metadata = aws_mem_acquire(mqtt_get_python_allocator(), sizeof(struct publish_complete_userdata)); + if (!metadata) { + return PyErr_NoMemory(); + } + + memcpy(&metadata->payload, &payload_stack, sizeof(Py_buffer)); + metadata->callback = puback_callback; + + payload_cursor = aws_byte_cursor_from_array(metadata->payload.buf, metadata->payload.len); + } + + struct aws_byte_cursor topic_cursor = aws_byte_cursor_from_array(topic, topic_len); + enum aws_mqtt_qos qos = (enum aws_mqtt_qos)qos_val; + + aws_mqtt_client_connection_publish( + connection->connection, &topic_cursor, qos, retain == Py_True, &payload_cursor, s_publish_complete, metadata); + + Py_RETURN_NONE; +} + +/******************************************************************************* + * Callback + ******************************************************************************/ + +static void s_subscribe_callback( + struct aws_mqtt_client_connection *connection, + const struct aws_byte_cursor *topic, + const struct aws_byte_cursor *payload, + void *user_data) { + + (void)connection; + + PyGILState_STATE state = PyGILState_Ensure(); + + PyObject *callback = user_data; + + PyObject *result = PyObject_CallFunction( + callback, "(NN)", PyString_FromAwsByteCursor(topic), PyString_FromAwsByteCursor(payload)); + + if (!result) { + PyErr_WriteUnraisable(PyErr_Occurred()); + abort(); + } + Py_XDECREF(result); + + PyGILState_Release(state); +} + +static void s_suback_callback(struct aws_mqtt_client_connection *connection, void *userdata) { + (void)connection; + PyObject *unsuback_callback = userdata; + + if (unsuback_callback) { + + PyGILState_STATE state = PyGILState_Ensure(); + + PyObject_CallFunction(unsuback_callback, NULL); + Py_DECREF(unsuback_callback); + + PyGILState_Release(state); + } +} + +PyObject *mqtt_subscribe(PyObject *self, PyObject *args) { + (void)self; + + PyObject *impl_capsule = NULL; + const char *topic = NULL; + Py_ssize_t topic_len = 0; + uint8_t qos_val = 0; + PyObject *callback = NULL; + PyObject *suback_callback = NULL; + + if (!PyArg_ParseTuple(args, "Os#bOO", &impl_capsule, &topic, &topic_len, &qos_val, &callback, &suback_callback)) { + return NULL; + } + + if (!impl_capsule || !PyCapsule_CheckExact(impl_capsule)) { + PyErr_SetNone(PyExc_TypeError); + return NULL; + } + + if (!callback || !PyCallable_Check(callback)) { + PyErr_SetNone(PyExc_ValueError); + return NULL; + } + Py_INCREF(callback); + + if (suback_callback && PyCallable_Check(suback_callback)) { + Py_INCREF(suback_callback); + } else { + suback_callback = NULL; + } + + struct mqtt_python_connection *connection = + PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt_client_connection); + + if (qos_val > 3) { + PyErr_SetNone(PyExc_ValueError); + } + + struct aws_byte_cursor topic_filter = aws_byte_cursor_from_array(topic, topic_len); + int result = aws_mqtt_client_connection_subscribe( + connection->connection, + &topic_filter, + qos_val, + s_subscribe_callback, + callback, + s_suback_callback, + suback_callback); + + return PyBool_FromAwsResult(result); +} + +/******************************************************************************* + * Unsubscribe + ******************************************************************************/ + +PyObject *mqtt_unsubscribe(PyObject *self, PyObject *args) { + (void)self; + + PyObject *impl_capsule = NULL; + const char *topic = NULL; + Py_ssize_t topic_len = 0; + PyObject *unsuback_callback = NULL; + + if (!PyArg_ParseTuple(args, "Os#O", &impl_capsule, &topic, &topic_len, &unsuback_callback)) { + return NULL; + } + + if (!impl_capsule || !PyCapsule_CheckExact(impl_capsule)) { + PyErr_SetNone(PyExc_TypeError); + return NULL; + } + + if (unsuback_callback && PyCallable_Check(unsuback_callback)) { + Py_INCREF(unsuback_callback); + } else { + unsuback_callback = NULL; + } + + struct mqtt_python_connection *connection = + PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt_client_connection); + + struct aws_byte_cursor filter = aws_byte_cursor_from_array(topic, topic_len); + int result = + aws_mqtt_client_connection_unsubscribe(connection->connection, &filter, s_suback_callback, unsuback_callback); + + return PyBool_FromAwsResult(result); +} + +/******************************************************************************* + * Disconnect + ******************************************************************************/ + +PyObject *mqtt_disconnect(PyObject *self, PyObject *args) { + (void)self; + + PyObject *impl_capsule = NULL; + + if (!PyArg_ParseTuple(args, "O", &impl_capsule)) { + return NULL; + } + + if (!impl_capsule || !PyCapsule_CheckExact(impl_capsule)) { + PyErr_SetNone(PyExc_TypeError); + return NULL; + } + + struct mqtt_python_connection *connection = + PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt_client_connection); + + int result = aws_mqtt_client_connection_disconnect(connection->connection); + + return PyBool_FromAwsResult(result); +} diff --git a/source/mqtt.h b/source/mqtt.h new file mode 100644 index 000000000..6fc82ed72 --- /dev/null +++ b/source/mqtt.h @@ -0,0 +1,37 @@ +#ifndef AWS_CRT_PYTHON_MQTT_H +#define AWS_CRT_PYTHON_MQTT_H +/* + * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/** + * This file includes definitions for MQTT specific functions. + */ + +#include "module.h" + +#include + +/** + * Name string for client_connection capsules. + */ +extern const char *s_capsule_name_mqtt_client_connection; + +PyObject *mqtt_new_connection(PyObject *self, PyObject *args); +PyObject *mqtt_publish(PyObject *self, PyObject *args); +PyObject *mqtt_subscribe(PyObject *self, PyObject *args); +PyObject *mqtt_unsubscribe(PyObject *self, PyObject *args); +PyObject *mqtt_disconnect(PyObject *self, PyObject *args); + +#endif /* AWS_CRT_PYTHON_MQTT_H */ diff --git a/test.py b/test.py new file mode 100644 index 000000000..6febaad72 --- /dev/null +++ b/test.py @@ -0,0 +1,64 @@ +from aws_crt_python import mqtt, iot +from AWSIoTPythonSDK import MQTTLib +import time, threading +from timeit import default_timer as timer + +messages_recieved = 0 +messages_recieved_cv = threading.Condition() + +def iot_on_connect(): + print("iot connected") + +def iot_on_message(client, userdata, message): + global messages_recieved + + with messages_recieved_cv: + messages_recieved += 1 + messages_recieved_cv.notify() + +def iot_on_disconnect(): + print("iot disconnected") + +client = iot.AWSIoTMQTTClient +# client = MQTTLib.AWSIoTMQTTClient + +iot_client = client("coldens iot client") +iot_client.onOnline = iot_on_connect +iot_client.onOffline = iot_on_disconnect +iot_client.configureEndpoint("a1ba5f1mpna9k5-ats.iot.us-east-1.amazonaws.com", 8883) +iot_client.configureCredentials("AmazonRootCA1.pem", "9f0631f03a-private.pem.key", "9f0631f03a-certificate.pem.crt") + +print("connecting...") +iot_client.connect() + +# MQTT subscribes +print("subscribing...") +iot_client.subscribe("a", 1, iot_on_message) + +print("publishing...") + +begin_publish = timer() + +num_publishes = 100 +for i in range(0, num_publishes): + # Publish data to the mqtt client + iot_client.publishAsync("a", "REQUEST", 1) + +end_publish = timer() + +with messages_recieved_cv: + while messages_recieved < num_publishes: + messages_recieved_cv.wait() + +pubacks_gotten = timer() + +print("unsubscribing...") +iot_client.unsubscribe("a") + +print("disconnecting...") + +iot_client.disconnect() + +print("Publish time: {}\nTotal time: {}".format(end_publish - begin_publish, pubacks_gotten - begin_publish)) + +print("exiting...")