Skip to content

Commit

Permalink
Merge pull request #87 from B1ob/convert_to_bleak
Browse files Browse the repository at this point in the history
Converted from the bluepy library to bleak library for bluetooth access. Solves incompatibility with HA 2022.7 + home-assistant/core#73830
  • Loading branch information
MartyTremblay authored Jul 14, 2022
2 parents 6fcef04 + 304cfee commit ef4d7ab
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 114 deletions.
234 changes: 127 additions & 107 deletions custom_components/airthings_wave/airthings.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,46 @@
# Copyright (c) 2021 Martin Tremblay, Mark McCans
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import struct
import time
from collections import namedtuple

import logging
from datetime import datetime

import bluepy.btle as btle
from bleak import BleakClient
from bleak import BleakScanner
import asyncio

from uuid import UUID

_LOGGER = logging.getLogger(__name__)

# Use full UUID since we do not use UUID from bluepy.btle
CHAR_UUID_CCCD = btle.UUID('2902') # Client Characteristic Configuration Descriptor (CCCD)
# Use full UUID since we do not use UUID from bluetooth library
CHAR_UUID_MANUFACTURER_NAME = UUID('00002a29-0000-1000-8000-00805f9b34fb')
CHAR_UUID_SERIAL_NUMBER_STRING = UUID('00002a25-0000-1000-8000-00805f9b34fb')
CHAR_UUID_MODEL_NUMBER_STRING = UUID('00002a24-0000-1000-8000-00805f9b34fb')
CHAR_UUID_DEVICE_NAME = UUID('00002a00-0000-1000-8000-00805f9b34fb')
CHAR_UUID_FIRMWARE_REV = UUID('00002a26-0000-1000-8000-00805f9b34fb')
CHAR_UUID_HARDWARE_REV = UUID('00002a27-0000-1000-8000-00805f9b34fb')

CHAR_UUID_DATETIME = UUID('00002a08-0000-1000-8000-00805f9b34fb')
CHAR_UUID_TEMPERATURE = UUID('00002a6e-0000-1000-8000-00805f9b34fb')
CHAR_UUID_HUMIDITY = UUID('00002a6f-0000-1000-8000-00805f9b34fb')
Expand All @@ -34,18 +58,22 @@
device_info_characteristics = [manufacturer_characteristics,
Characteristic(CHAR_UUID_SERIAL_NUMBER_STRING, 'serial_nr', "utf-8"),
Characteristic(CHAR_UUID_MODEL_NUMBER_STRING, 'model_nr', "utf-8"),
Characteristic(CHAR_UUID_DEVICE_NAME, 'device_name', "utf-8")]
Characteristic(CHAR_UUID_DEVICE_NAME, 'device_name', "utf-8"),
Characteristic(CHAR_UUID_FIRMWARE_REV, 'firmware_rev', "utf-8"),
Characteristic(CHAR_UUID_HARDWARE_REV, 'hardware_rev', "utf-8")]

class AirthingsDeviceInfo:
def __init__(self, manufacturer='', serial_nr='', model_nr='', device_name=''):
def __init__(self, manufacturer='', serial_nr='', model_nr='', device_name='', firmware_rev='', hardware_rev=''):
self.manufacturer = manufacturer
self.serial_nr = serial_nr
self.model_nr = model_nr
self.device_name = device_name
self.firmware_rev = firmware_rev
self.hardware_rev = hardware_rev

def __str__(self):
return "Manufacturer: {} Model: {} Serial: {} Device:{}".format(
self.manufacturer, self.model_nr, self.serial_nr, self.device_name)
return "Manufacturer: {} Model: {} Serial: {} Device: {} Firmware: {} Hardware Rev.: {}".format(
self.manufacturer, self.model_nr, self.serial_nr, self.device_name, self.firmware_rev, self.hardware_rev)


sensors_characteristics_uuid = [CHAR_UUID_DATETIME, CHAR_UUID_TEMPERATURE, CHAR_UUID_HUMIDITY, CHAR_UUID_RADON_1DAYAVG,
Expand Down Expand Up @@ -157,20 +185,6 @@ def decode_data(self, raw_data):

return res


class MyDelegate(btle.DefaultDelegate):
def __init__(self):
btle.DefaultDelegate.__init__(self)
# ... initialise here
self.data = None

def handleNotification(self, cHandle, data):
if self.data is None:
self.data = data
else:
self.data = self.data + data


sensor_decoders = {str(CHAR_UUID_WAVE_PLUS_DATA):WavePlussDecode(name="Pluss", format_type='BBBBHHHHHHHH', scale=0),
str(CHAR_UUID_DATETIME):WaveDecodeDate(name="date_time", format_type='HBBBBB', scale=0),
str(CHAR_UUID_HUMIDITY):BaseDecode(name="humidity", format_type='H', scale=1.0/100.0),
Expand All @@ -192,152 +206,158 @@ def __init__(self, scan_interval, mac=None):
self.scan_interval = scan_interval
self.last_scan = -1
self._dev = None

def _parse_serial_number(self, manufacturer_data):
try:
(ID, SN, _) = struct.unpack("<HLH", manufacturer_data)
except Exception as e: # Return None for non-Airthings devices
return None
else: # Executes only if try-block succeeds
if ID == 0x0334:
return SN

def find_devices(self, scans=50, timeout=0.1):
self._command_data = None

def notification_handler(self, sender, data):
_LOGGER.debug("Notification handler: {0}: {1}".format(sender, data))
self._command_data = data
self._event.set()

async def find_devices(self, scans=2, timeout=5):
# Search for devices, scan for BLE devices scans times for timeout seconds
# Get manufacturer data and try to match it to airthings ID.
scanner = btle.Scanner()

_LOGGER.debug("Scanning for airthings devices")
for _count in range(scans):
advertisements = scanner.scan(timeout)
advertisements = await BleakScanner.discover(timeout)
for adv in advertisements:
sn = self._parse_serial_number(adv.getValue(btle.ScanEntry.MANUFACTURER))
if sn is not None:
if adv.addr not in self.airthing_devices:
self.airthing_devices.append(adv.addr)
if 820 in adv.metadata["manufacturer_data"]: # TODO: Not sure if this is the best way to identify Airthings devices
if adv.address not in self.airthing_devices:
self.airthing_devices.append(adv.address)

_LOGGER.debug("Found {} airthings devices".format(len(self.airthing_devices)))
return len(self.airthing_devices)

def connect(self, mac, retries=10):
async def connect(self, mac, retries=10):
_LOGGER.debug("Connecting to {}".format(mac))
await self.disconnect()
tries = 0
self.disconnect()
while (tries < retries):
tries += 1
try:
self._dev = btle.Peripheral(mac.lower())
self.delgate = MyDelegate()
self._dev.withDelegate( self.delgate )
break
self._dev = BleakClient(mac.lower())
ret = await self._dev.connect()
if ret:
_LOGGER.debug("Connected to {}".format(mac))
break
except Exception as e:
print(e)
if tries == retries:
_LOGGER.info("Not able to connect to {}".format(mac))
pass
else:
_LOGGER.debug("Retrying {}".format(mac))

def disconnect(self):
async def disconnect(self):
if self._dev is not None:
self._dev.disconnect()
await self._dev.disconnect()
self._dev = None

def get_info(self):
async def get_info(self):
# Try to get some info from the discovered airthings devices
self.devices = {}
for mac in self.airthing_devices:
self.connect(mac)
if self._dev is not None:
device = AirthingsDeviceInfo(serial_nr=mac)
for characteristic in device_info_characteristics:
try:
char = self._dev.getCharacteristics(uuid=characteristic.uuid)[0]
data = char.read()
setattr(device, characteristic.name, data.decode(characteristic.format))
except btle.BTLEDisconnectError:
_LOGGER.exception("Disconnected")
self._dev = None

self.devices[mac] = device
self.disconnect()
await self.connect(mac)
if self._dev is not None and self._dev.is_connected:
try:
if self._dev is not None and self._dev.is_connected:
device = AirthingsDeviceInfo(serial_nr=mac)
for characteristic in device_info_characteristics:
try:
data = await self._dev.read_gatt_char(characteristic.uuid)
setattr(device, characteristic.name, data.decode(characteristic.format))
except:
_LOGGER.exception("Error getting info")
self._dev = None
self.devices[mac] = device
except:
_LOGGER.exception("Error getting device info.")
await self.disconnect()
else:
_LOGGER.error("Not getting device info because failed to connect to device.")
return self.devices

def get_sensors(self):
async def get_sensors(self):
self.sensors = {}
for mac in self.airthing_devices:
self.connect(mac)
if self._dev is not None:
try:
characteristics = self._dev.getCharacteristics()
sensor_characteristics = []
for characteristic in characteristics:
await self.connect(mac)
if self._dev is not None and self._dev.is_connected:
sensor_characteristics = []
svcs = await self._dev.get_services()
for service in svcs:
for characteristic in service.characteristics:
_LOGGER.debug(characteristic)
if characteristic.uuid in sensors_characteristics_uuid_str:
sensor_characteristics.append(characteristic)
self.sensors[mac] = sensor_characteristics
except btle.BTLEDisconnectError:
_LOGGER.exception("Disconnected")
self._dev = None
self.disconnect()
self.sensors[mac] = sensor_characteristics
await self.disconnect()
return self.sensors

def get_sensor_data(self):
async def get_sensor_data(self):
if time.monotonic() - self.last_scan > self.scan_interval or self.last_scan == -1:
self.last_scan = time.monotonic()
for mac, characteristics in self.sensors.items():
self.connect(mac)
if self._dev is not None:
await self.connect(mac)
if self._dev is not None and self._dev.is_connected:
try:
for characteristic in characteristics:
sensor_data = None
if str(characteristic.uuid) in sensor_decoders:
char = self._dev.getCharacteristics(uuid=characteristic.uuid)[0]
data = char.read()
data = await self._dev.read_gatt_char(characteristic.uuid)
sensor_data = sensor_decoders[str(characteristic.uuid)].decode_data(data)
_LOGGER.debug("{} Got sensordata {}".format(mac, sensor_data))

if str(characteristic.uuid) in command_decoders:
self.delgate.data = None # Clear the delegate so it is ready for new data.
char = self._dev.getCharacteristics(uuid=characteristic.uuid)[0]
# Do these steps to get notification to work, I do not know how it works, this link should explain it
# https://devzone.nordicsemi.com/guides/short-range-guides/b/bluetooth-low-energy/posts/ble-characteristics-a-beginners-tutorial
desc, = char.getDescriptors(forUUID=CHAR_UUID_CCCD)
desc.write(struct.pack('<H', 1), True)
char.write(command_decoders[str(characteristic.uuid)].cmd)
for i in range(3):
if self._dev.waitForNotifications(0.1):
_LOGGER.debug("Received notification, total data received len {}".format(len(self.delgate.data)))

sensor_data = command_decoders[str(characteristic.uuid)].decode_data(self.delgate.data)
_LOGGER.debug("{} Got cmddata {}".format(mac, sensor_data))
_LOGGER.debug("command characteristic: {}".format(characteristic.uuid))
# Create an Event object.
self._event = asyncio.Event()
# Set up the notification handlers
await self._dev.start_notify(characteristic.uuid, self.notification_handler)
# send command to this 'indicate' characteristic
await self._dev.write_gatt_char(characteristic.uuid, command_decoders[str(characteristic.uuid)].cmd)
# Wait for up to one second to see if a callblack comes in.
try:
await asyncio.wait_for(self._event.wait(), 1)
except asyncio.TimeoutError:
_LOGGER.warn("Timeout getting command data.")
if self._command_data is not None:
sensor_data = command_decoders[str(characteristic.uuid)].decode_data(self._command_data)
self._command_data = None
# Stop notification handler
await self._dev.stop_notify(characteristic.uuid)

if sensor_data is not None:
if self.sensordata.get(mac) is None:
self.sensordata[mac] = sensor_data
else:
self.sensordata[mac].update(sensor_data)

except btle.BTLEDisconnectError:
_LOGGER.exception("Disconnected")
self.sensordata[mac].update(sensor_data)
except:
_LOGGER.exception("Error getting sensor data.")
self._dev = None
self.disconnect()

return self.sensordata
await self.disconnect()

return self.sensordata

if __name__ == "__main__":
async def main():
logging.basicConfig()
_LOGGER.setLevel(logging.DEBUG)
ad = AirthingsWaveDetect(0)
num_dev_found = ad.find_devices()
num_dev_found = await ad.find_devices()
if num_dev_found > 0:
devices = ad.get_info()
devices = await ad.get_info()
for mac, dev in devices.items():
_LOGGER.info("{}: {}".format(mac, dev))
_LOGGER.info("Device: {}: {}".format(mac, dev))

devices_sensors = ad.get_sensors()
devices_sensors = await ad.get_sensors()
for mac, sensors in devices_sensors.items():
for sensor in sensors:
_LOGGER.info("{}: {}".format(mac, sensor))
_LOGGER.info("Sensor: {}: {}".format(mac, sensor))

sensordata = ad.get_sensor_data()
sensordata = await ad.get_sensor_data()
for mac, data in sensordata.items():
for name, val in data.items():
_LOGGER.info("{}: {}: {}".format(mac, name, val))
_LOGGER.info("Sensor data: {}: {}: {}".format(mac, name, val))


if __name__ == "__main__":
asyncio.run(main())
4 changes: 2 additions & 2 deletions custom_components/airthings_wave/manifest.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"domain": "airthings_wave",
"name": "Airthings Wave",
"version": "3.1.0",
"version": "4.0.0",
"documentation": "https://github.com/custom-components/sensor.airthings_wave/",
"issue_tracker": "https://github.com/custom-components/sensor.airthings_wave/issues",
"dependencies": [],
"codeowners": ["@MartyTremblay","@sverrham"],
"requirements": [
"bluepy==1.3.0"
"bleak==0.14.3"
]
}
Loading

0 comments on commit ef4d7ab

Please sign in to comment.