-
Notifications
You must be signed in to change notification settings - Fork 1
/
bgapi.py
751 lines (637 loc) · 29 KB
/
bgapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
from __future__ import print_function
# for Python 2/3 compatibility
try:
import queue
except ImportError:
import Queue as queue
import logging
import serial
import time
import threading
from binascii import hexlify, unhexlify
from uuid import UUID
from enum import Enum
from collections import defaultdict
from pygatt.exceptions import NotConnectedError
from pygatt.backends import BLEBackend, Characteristic, BLEAddressType
from pygatt.util import uuid16_to_uuid
from . import bglib, constants
from .exceptions import BGAPIError, ExpectedResponseTimeout
from .device import BGAPIBLEDevice
from .bglib import EventPacketType, ResponsePacketType
from .packets import BGAPICommandPacketBuilder as CommandBuilder
from .error_codes import get_return_message
from .util import find_usb_serial_devices
try:
import termios
except ImportError:
# Running in Windows (not Linux/OS X/Cygwin)
serial_exception = RuntimeError
else:
serial_exception = termios.error
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# USB vendor/product IDs handed to find_usb_serial_devices() when the serial
# port of a BLED112 adapter has to be auto-detected.
BLED112_VENDOR_ID = 0x2458
BLED112_PRODUCT_ID = 0x0001

# Default number of attempts _open_serial_port() makes before giving up.
MAX_CONNECTION_ATTEMPTS = 10

# Classification of a UUID seen during attribute discovery; values of this
# enum are returned by BGAPIBackend._get_uuid_type().
UUIDType = Enum('UUIDType', ['custom', 'service', 'attribute',
                             'descriptor', 'characteristic'])
def _timed_out(start_time, timeout):
return time.time() - start_time > timeout
def bgapi_address_to_hex(address):
    """
    Format a sequence of byte values as an upper-case, colon-separated hex
    string, with the byte order reversed.

    address -- iterable of byte values (e.g. a list of ints or a bytearray).

    Returns a string such as "01:23:45:67:89:AB"; an empty input gives "".
    """
    flipped = bytearray(list(reversed(address)))
    digits = hexlify(flipped).upper().decode('ascii')
    # Every byte contributes exactly two hex digits, so step through in pairs.
    return ':'.join(digits[i:i + 2] for i in range(0, len(digits), 2))
class AdvertisingAndScanInfo(object):
    """
    Record of the advertising and scan response packet data received from
    the device at one address.
    """

    def __init__(self):
        # Nothing is known about the device yet; every field starts empty.
        self.name = ""
        self.address = ""
        self.rssi = None
        # Maps scan_response_packet_type[xxx] -> data_dictionary.
        self.packet_data = dict()
class BGAPIBackend(BLEBackend):
"""
A BLE backend for a BGAPI compatible USB adapter.
"""
def __init__(self, serial_port=None, receive_queue_timeout=0.1):
    """
    Prepare the backend's bookkeeping without opening the USB connection;
    .start() must be called before the backend can be used.

    serial_port -- The name of the serial port for the BGAPI-compatible
                   USB interface. If not provided, the port is
                   auto-detected when the connection is opened.
    receive_queue_timeout -- seconds each poll of the received-packet queue
                   blocks while waiting for a packet.
    """
    self._lib = bglib.BGLib()
    self._serial_port = serial_port
    self._receive_queue_timeout = receive_queue_timeout

    # Serial handle, receiver thread and its "keep running" event are all
    # created later on.
    self._ser = None
    self._receiver = None
    self._running = None
    self._lock = threading.Lock()

    # Packets read by the receiver thread wait here until processed.
    self._receiver_queue = queue.Queue()

    # Adapter state
    self._num_bonds = 0  # number of bonds stored on the adapter
    self._stored_bonds = []  # bond handles stored on the adapter
    # 'address' -> AdvertisingAndScanInfo, where the address is formatted
    # like "01:23:45:67:89:AB"
    self._devices_discovered = {}
    self._characteristics = defaultdict(dict)
    self._connections = {}
    self._current_characteristic = None  # used in char/descriptor discovery

    # Callbacks run on a decoded packet of the given type.
    evt = EventPacketType
    self._packet_handlers = {
        ResponsePacketType.sm_get_bonds: self._ble_rsp_sm_get_bonds,
        evt.attclient_attribute_value:
            self._ble_evt_attclient_attribute_value,
        evt.attclient_find_information_found:
            self._ble_evt_attclient_find_information_found,
        evt.connection_status: self._ble_evt_connection_status,
        evt.connection_disconnected: self._ble_evt_connection_disconnected,
        evt.gap_scan_response: self._ble_evt_gap_scan_response,
        evt.sm_bond_status: self._ble_evt_sm_bond_status,
    }

    log.info("Initialized new BGAPI backend")
def _detect_device_port(self):
    """
    Look up the serial port of an attached BLED112 by its USB vendor and
    product IDs.

    Returns the port name of the first matching device.
    Raises BGAPIError if no matching device is found.
    """
    log.info("Auto-detecting serial port for BLED112")
    candidates = find_usb_serial_devices(
        vendor_id=BLED112_VENDOR_ID,
        product_id=BLED112_PRODUCT_ID)
    if not candidates:
        raise BGAPIError("Unable to auto-detect BLED112 serial port")

    port_name = candidates[0].port_name
    log.info("Found BLED112 on serial port %s", port_name)
    return port_name
def _open_serial_port(self,
                      max_connection_attempts=MAX_CONNECTION_ATTEMPTS):
    """
    Open a connection to the named serial port, or auto-detect the first
    port matching the BLED device. A read is issued on the freshly opened
    port before returning, so that the device has had a chance to finish
    booting.

    max_connection_attempts -- Max number of times to retry
                               detecting and connecting to a device.

    Raises a NotConnectedError if the device cannot be opened within
    max_connection_attempts attempts; there is a 0.25s pause after each
    failed attempt.
    """
    for attempt_number in range(1, max_connection_attempts + 1):
        log.debug("Opening connection to serial port (attempt %d)",
                  attempt_number)
        try:
            port_name = self._serial_port or self._detect_device_port()
            # Forget any previous handle first so that a failure while
            # opening leaves nothing stale behind for the cleanup below.
            self._ser = None
            self._ser = serial.Serial(port_name, baudrate=115200,
                                      timeout=0.25)
            # Wait until we can actually read from the device
            self._ser.read()
        except (BGAPIError, serial.serialutil.SerialException,
                serial_exception):
            log.debug("Failed to open serial port", exc_info=True)
            if self._ser:
                self._ser.close()
            self._ser = None
            time.sleep(0.25)
        else:
            # Port is open and readable; we are done.
            return

    raise NotConnectedError("Unable to reconnect with USB "
                            "device after rebooting")
def start(self):
    """
    Connect to the USB adapter, reset its state and start a background
    receiver thread.

    If the backend is already running, it is stopped first.
    """
    if self._running is not None and self._running.is_set():
        self.stop()

    # One attempt only: fail immediately if no device is attached rather
    # than waiting for one to be plugged in.
    self._open_serial_port(max_connection_attempts=1)

    log.info("Resetting and reconnecting to device for a clean environment")
    # Reset the remote state machine so that we start from a known state.
    # Note: this could be made conditional on a parameter if it turns out
    # to be too slow on some systems.
    # The zero argument requests a normal restart rather than a firmware
    # update restart.
    reset_command = CommandBuilder.system_reset(0)
    self.send_command(reset_command)
    self._ser.flush()
    self._ser.close()

    # The port was closed for the reset; reopen it with the default number
    # of retries.
    self._open_serial_port()

    receiver = threading.Thread(target=self._receive)
    receiver.daemon = True
    self._receiver = receiver
    running = threading.Event()
    running.set()
    self._running = running
    receiver.start()

    self.disable_advertising()
    self.set_bondable(False)

    # Stop any ongoing procedure
    log.debug("Stopping any outstanding GAP procedure")
    self.send_command(CommandBuilder.gap_end_procedure())
    try:
        self.expect(ResponsePacketType.gap_end_procedure)
    except BGAPIError:
        # Ignore any errors if there was no GAP procedure running
        pass
def stop(self):
for device in self._connections.values():
try:
device.disconnect()
except NotConnectedError:
pass
if self._running:
if self._running.is_set():
log.info('Stopping')
self._running.clear()
if self._receiver:
self._receiver.join()
self._receiver = None
if self._ser:
self._ser.close()
self._ser = None
def set_bondable(self, bondable):
    """
    Send the sm_set_bondable_mode command and wait for its response.

    bondable -- truthy selects constants.bondable['yes'], falsy selects
                constants.bondable['no'].
    """
    mode_key = 'yes' if bondable else 'no'
    command = CommandBuilder.sm_set_bondable_mode(
        constants.bondable[mode_key])
    self.send_command(command)
    self.expect(ResponsePacketType.sm_set_bondable_mode)
def disable_advertising(self):
    """
    Put the adapter in non-discoverable, non-connectable GAP mode and wait
    for the gap_set_mode response.
    """
    log.info("Disabling advertising")
    discoverable = constants.gap_discoverable_mode['non_discoverable']
    connectable = constants.gap_connectable_mode['non_connectable']
    command = CommandBuilder.gap_set_mode(discoverable, connectable)
    self.send_command(command)
    self.expect(ResponsePacketType.gap_set_mode)
def send_command(self, *args, **kwargs):
with self._lock:
if self._ser is None:
log.warn("Unexpectedly not connected to USB device")
raise NotConnectedError()
return self._lib.send_command(self._ser, *args, **kwargs)
def clear_bond(self, address=None):
    """
    Delete the bonds stored on the adapter.

    address -- currently NOT used to select a bond: every bond stored on
               the adapter is deleted whether or not an address is given.
               A warning is logged when an address is passed so that the
               caller is not misled.

    Note: this does not delete the corresponding bond stored on the remote
    device.
    """
    if address is not None:
        log.warning("Deleting the bond of a single device is not "
                    "supported; erasing all stored bonds (requested "
                    "address: %s)", address)

    # Find bonds
    log.info("Fetching existing bonds for devices")
    self._stored_bonds = []
    self.send_command(CommandBuilder.sm_get_bonds())

    try:
        self.expect(ResponsePacketType.sm_get_bonds)
    except NotConnectedError:
        # Best effort: carry on with whatever bond count is known.
        log.debug("Not connected while fetching the bond count",
                  exc_info=True)
    if self._num_bonds == 0:
        return

    # One sm_bond_status event is expected per stored bond; the handler
    # for that event appends each bond handle to self._stored_bonds.
    while len(self._stored_bonds) < self._num_bonds:
        self.expect(EventPacketType.sm_bond_status)

    for b in reversed(self._stored_bonds):
        log.info("Deleting bond %s", b)

        self.send_command(CommandBuilder.sm_delete_bonding(b))
        self.expect(ResponsePacketType.sm_delete_bonding)
def scan(self, timeout=10, scan_interval=75, scan_window=50, active=True,
         discover_mode=constants.gap_discover_mode['observation'],
         **kwargs):
    """
    Scan for BLE devices and report what was discovered.

    timeout -- the number of seconds this scan should last.
    scan_interval -- the number of milliseconds until scanning is
                     restarted.
    scan_window -- the number of milliseconds the scanner will listen on
                   one frequency for advertisement packets.
    active -- True --> ask sender for scan response data. False --> don't.
    discover_mode -- one of the gap_discover_mode constants.

    Any additional keyword arguments are accepted and ignored.

    Returns a list with one dict per discovered device, holding the keys
    'address', 'name', 'rssi' and 'packet_data'. The record of discovered
    devices is emptied before returning.
    """
    scan_kind = "active" if active else "passive"
    parameters = 1 if active else 0

    # NOTE: the documentation seems to say that the times are in units of
    # 625us but the ranges it gives correspond to units of 1ms....
    scan_setup = CommandBuilder.gap_set_scan_parameters(
        scan_interval, scan_window, parameters)
    self.send_command(scan_setup)
    self.expect(ResponsePacketType.gap_set_scan_parameters)

    log.info("Starting an %s scan", scan_kind)
    self.send_command(CommandBuilder.gap_discover(discover_mode))
    self.expect(ResponsePacketType.gap_discover)

    log.info("Pausing for %ds to allow scan to complete", timeout)
    time.sleep(timeout)

    log.info("Stopping scan")
    self.send_command(CommandBuilder.gap_end_procedure())
    self.expect(ResponsePacketType.gap_end_procedure)

    devices = [
        {
            'address': address,
            'name': info.name,
            'rssi': info.rssi,
            'packet_data': info.packet_data
        }
        for address, info in self._devices_discovered.items()
    ]
    log.info("Discovered %d devices: %s", len(devices), devices)
    self._devices_discovered = {}
    return devices
def _end_procedure(self):
    """
    Send the gap_end_procedure command and wait for its response.
    """
    stop_command = CommandBuilder.gap_end_procedure()
    self.send_command(stop_command)
    self.expect(ResponsePacketType.gap_end_procedure)
def connect(self, address, timeout=5,
            address_type=BLEAddressType.public,
            interval_min=60, interval_max=76, supervision_timeout=100,
            latency=0):
    """
    Connect directly to a device given its BLE address.

    address -- the device address as a string of hex digit pairs,
               optionally separated by colons.
    timeout -- number of seconds to wait for the connection status event.
    address_type -- one of BLEAddressType's values, either public or
                    random.
    interval_min, interval_max, supervision_timeout, latency -- passed
                    through unchanged to the gap_connect_direct command.

    Returns the BGAPIBLEDevice for the connection. If a device in
    self._connections already matches the address, that device is
    returned and no new connection is attempted. Returns None when a
    connection status event arrives without the 'connected' flag.

    Raises NotConnectedError if no connection status event arrives in
    time; other errors raised while sending or waiting propagate.
    """
    address_bytes = bytearray(unhexlify(address.replace(":", "")))
    # NOTE(review): bgapi_address_to_hex() reverses byte order, while
    # stored device addresses come from the adapter's packets -- confirm
    # that this comparison can actually match an existing connection.
    wanted = bgapi_address_to_hex(address_bytes)
    for existing in self._connections.values():
        if existing._address == wanted:
            return existing

    log.info("Connecting to device at address %s (timeout %ds)",
             address, timeout)
    self.set_bondable(False)

    if address_type == BLEAddressType.public:
        type_key = 'gap_address_type_public'
    else:
        type_key = 'gap_address_type_random'
    addr_type = constants.ble_address_type[type_key]

    connect_command = CommandBuilder.gap_connect_direct(
        address_bytes, addr_type, interval_min, interval_max,
        supervision_timeout, latency)
    self.send_command(connect_command)

    try:
        self.expect(ResponsePacketType.gap_connect_direct)
        _, packet = self.expect(EventPacketType.connection_status,
                                timeout=timeout)
        # TODO what do we do if the status isn't 'connected'? Retry?
        # Raise an exception? Should also check the address matches the
        # expected one. TODO when reconnecting to the same MAC, a
        # connection status of "disconnected" has been seen that is
        # picked up here as "connected", after which nothing else
        # arrives.
        flags = packet['flags']
        if not self._connection_status_flag(
                flags, constants.connection_status_flag['connected']):
            return None

        peer_address = bgapi_address_to_hex(packet['address'])
        handle = packet['connection_handle']
        device = BGAPIBLEDevice(peer_address, handle, self)
        if self._connection_status_flag(
                flags, constants.connection_status_flag['encrypted']):
            device.encrypted = True
        self._connections[handle] = device
        log.info("Connected to %s", address)
        return device
    except ExpectedResponseTimeout:
        # If we never get the connection status, it is likely that the
        # device isn't there. The pending connect command then has to be
        # stopped manually.
        self._end_procedure()
        exc = NotConnectedError()
        # Drop the timeout from the exception chain.
        exc.__cause__ = None
        raise exc
def discover_characteristics(self, connection_handle):
    """
    Run attribute discovery over the whole handle range of a connection
    and log what was found.

    connection_handle -- handle of the connection to query.

    Returns the dictionary of characteristics recorded for that
    connection, keyed by UUID.
    """
    first_handle = 0x0001  # first valid handle
    last_handle = 0xFFFF  # last valid handle

    log.info("Fetching characteristics for connection %d",
             connection_handle)
    find_command = CommandBuilder.attclient_find_information(
        connection_handle, first_handle, last_handle)
    self.send_command(find_command)

    self.expect(ResponsePacketType.attclient_find_information)
    self.expect(EventPacketType.attclient_procedure_completed,
                timeout=10)

    found = self._characteristics[connection_handle]
    for char_uuid_str, char_obj in found.items():
        log.info("Characteristic 0x%s is handle 0x%x",
                 char_uuid_str, char_obj.handle)
        for desc_uuid_str, desc_handle in char_obj.descriptors.items():
            log.info("Characteristic descriptor 0x%s is handle 0x%x",
                     desc_uuid_str, desc_handle)
    return found
@staticmethod
def _connection_status_flag(flags, flag_to_find):
"""
Is the given flag in the connection status flags?
flags -- the 'flags' parameter returned by ble_evt_connection_status.
flag_to_find -- the flag to look for in flags.
Returns true if flag_to_find is in flags. Returns false otherwise.
"""
return (flags & flag_to_find) == flag_to_find
@staticmethod
def _get_uuid_type(uuid):
    """
    Classify a UUID as custom, service, attribute, descriptor or
    characteristic.

    uuid -- the UUID as a bytearray.

    Returns a UUIDType: `custom` for any 16-byte (128-bit) UUID, otherwise
    the type of the first constants table that contains the UUID. Returns
    None (after logging a warning) when the UUID is in none of the tables.
    """
    if len(uuid) == 16:  # 128-bit --> 16 byte
        return UUIDType.custom

    # Checked in this order; the first table containing the UUID wins.
    known_types = (
        (constants.gatt_service_uuid, UUIDType.service),
        (constants.gatt_attribute_type_uuid, UUIDType.attribute),
        (constants.gatt_characteristic_descriptor_uuid,
         UUIDType.descriptor),
        (constants.gatt_characteristic_type_uuid,
         UUIDType.characteristic),
    )
    for table, uuid_type in known_types:
        if uuid in table.values():
            return uuid_type

    # Logger.warn is a deprecated alias that newer Python versions no
    # longer provide; use Logger.warning.
    log.warning("UUID %s is of unknown type", hexlify(uuid))
    return None
def _scan_rsp_data(self, data):
"""
Parse scan response data.
Note: the data will come in a format like the following:
[data_length, data_type, data..., data_length, data_type, data...]
data -- the args['data'] list from _ble_evt_scan_response.
Returns a name and a dictionary containing the parsed data in pairs of
field_name': value.
"""
# Result stored here
data_dict = {
# 'name': value,
}
bytes_left_in_field = 0
field_name = None
field_value = []
# Iterate over data bytes to put in field
dev_name = ""
for b in data:
if bytes_left_in_field == 0:
# New field
bytes_left_in_field = b
field_value = []
else:
field_value.append(b)
bytes_left_in_field -= 1
if bytes_left_in_field == 0:
# End of field
field_name = (
constants.scan_response_data_type[field_value[0]])
field_value = field_value[1:]
# Field type specific formats
if (field_name == 'complete_local_name' or
field_name == 'shortened_local_name'):
dev_name = bytearray(field_value).decode("utf-8")
data_dict[field_name] = dev_name
elif (field_name ==
'complete_list_128-bit_service_class_uuids'):
if len(field_value) % 16 == 0: # 16 bytes
data_dict[field_name] = []
for i in range(0, int(len(field_value) / 16)):
service_uuid = (
"0x%s" %
bgapi_address_to_hex(
field_value[i * 16:i * 16 + 16]))
data_dict[field_name].append(service_uuid)
else:
log.warning("Expected a service class UUID of 16\
bytes. Instead received %d bytes",
len(field_value))
else:
data_dict[field_name] = bytearray(field_value)
return dev_name, data_dict
def expect(self, expected, *args, **kargs):
return self.expect_any([expected], *args, **kargs)
def expect_any(self, expected_packet_choices, timeout=None,
               assert_return_success=True):
    """
    Process packets until a packet of one of the expected types is found.

    expected_packet_choices -- a list of BGLib.PacketType.xxxxx. Upon
                               processing a packet of a type contained in
                               the list, this function will return.
    timeout -- maximum time in seconds to wait for packets. A falsy value
               (None or 0) means one second.
    assert_return_success -- accepted, but not consulted by this
                             implementation.

    Every packet taken from the queue is passed to its registered handler
    (if any), whether or not it is of an expected type.

    Returns a (packet_type, response) tuple for the matching packet.

    Raises an ExpectedResponseTimeout if none of the expected responses is
    received within the time limit. Note that the time limit is only
    checked when the receive queue is found empty.
    """
    timeout = timeout or 1
    log.debug("Expecting a response of one of %s within %fs",
              expected_packet_choices, timeout)
    start_time = time.time()

    while True:
        try:
            packet = self._receiver_queue.get(
                timeout=self._receive_queue_timeout)
        except queue.Empty:
            if _timed_out(start_time, timeout):
                exc = ExpectedResponseTimeout(
                    expected_packet_choices, timeout)
                # Drop the queue.Empty from the exception chain.
                exc.__cause__ = None
                raise exc
            # Nothing yet, but still within the time limit: poll again.
            continue

        if packet is None:
            raise ExpectedResponseTimeout(expected_packet_choices, timeout)

        packet_type, response = self._lib.decode_packet(packet)
        return_code = response.get('result', 0)
        log.debug("Received a %s packet: %s",
                  packet_type, get_return_message(return_code))

        handler = self._packet_handlers.get(packet_type)
        if handler is not None:
            handler(response)

        if packet_type in expected_packet_choices:
            return packet_type, response
def _receive(self):
    """
    Body of the receiver thread: read from the serial port, assemble
    packets and enqueue every complete packet for expect_any().

    Attribute value events are additionally delivered straight to the
    device that owns the connection, as a notification.

    Stops if the self._running event is not set.
    """
    log.info("Running receiver")
    while self._running.is_set():
        packet = self._lib.parse_byte(self._ser.read())
        if packet is None:
            # No complete packet yet.
            continue

        packet_type, args = self._lib.decode_packet(packet)
        if packet_type == EventPacketType.attclient_attribute_value:
            # The connection may no longer (or not yet) be registered; an
            # unguarded lookup would raise KeyError and silently kill this
            # thread, so that no packet is ever received again.
            device = self._connections.get(args['connection_handle'])
            if device is None:
                log.debug("Ignoring notification for unknown connection "
                          "handle %s", args['connection_handle'])
            else:
                device.receive_notification(args['atthandle'],
                                            bytearray(args['value']))
        self._receiver_queue.put(packet)
    log.info("Stopping receiver")
def _ble_evt_attclient_attribute_value(self, args):
    """
    Handles the event for values of characteristics by logging them at
    debug level.

    args -- dictionary containing the attribute handle ('atthandle'),
            attribute type ('type'), and attribute value ('value')
    """
    numeric_fields = (
        ("attribute handle = %x", 'atthandle'),
        ("attribute type = %x", 'type'),
    )
    for message, key in numeric_fields:
        log.debug(message, args[key])

    value_hex = hexlify(bytearray(args['value']))
    log.debug("attribute value = 0x%s", value_hex)
def _ble_evt_attclient_find_information_found(self, args):
    """
    Handles the event for characteristic discovery.

    Either records a new characteristic for the connection, or attaches a
    descriptor to the characteristic that was recorded most recently. The
    events are expected to arrive in an order similar to the following:
    1) primary service uuid
    2) 0 or more descriptors
    3) characteristic uuid
    4) 0 or more descriptors
    5) repeat steps 3-4

    args -- dictionary containing the characteristic handle ('chrhandle'),
            the connection handle ('connection_handle') and the
            characteristic UUID ('uuid')
    """
    raw_uuid = bytearray(reversed(args['uuid']))
    uuid_type = self._get_uuid_type(raw_uuid)
    if uuid_type == UUIDType.custom:
        uuid = UUID(bytes=bytes(raw_uuid))
    else:
        short_hex = bgapi_address_to_hex(args['uuid']).replace(':', '')
        uuid = uuid16_to_uuid(int(short_hex, 16))

    # TODO is there a way to get the characteristic from the packet instead
    # of having to track the "current" characteristic?
    is_descriptor = uuid_type == UUIDType.descriptor
    if is_descriptor and self._current_characteristic is not None:
        self._current_characteristic.add_descriptor(uuid, args['chrhandle'])
        return

    if uuid_type not in (UUIDType.custom, UUIDType.characteristic):
        # Services, attributes and unknown UUIDs are not recorded.
        return

    if uuid_type == UUIDType.custom:
        log.info("Found custom characteristic %s" % uuid)
    else:
        log.info("Found approved characteristic %s" % uuid)

    new_char = Characteristic(uuid, args['chrhandle'])
    self._current_characteristic = new_char
    self._characteristics[args['connection_handle']][uuid] = new_char
def _ble_evt_connection_disconnected(self, args):
"""
Handles the event for the termination of a connection.
"""
self._connections.pop(args['connection_handle'], None)
def _ble_evt_connection_status(self, args):
    """
    Handles the event for reporting connection status.

    Forgets the device registered for the connection when the status does
    not carry the 'connected' flag, then logs the reported status.

    args -- dictionary containing the connection handle
            ('connection_handle'), connection status flags ('flags'),
            device address ('address'), device address type
            ('address_type'), connection interval ('conn_interval'),
            connection timeout ('timeout'), device latency ('latency'),
            device bond handle ('bonding')
    """
    connection_handle = args['connection_handle']
    if not self._connection_status_flag(
            args['flags'],
            constants.connection_status_flag['connected']):
        # Disconnected
        self._connections.pop(connection_handle, None)

    # The second value must be the flags: the address used to be passed
    # for the "flags=" placeholder by mistake.
    log.info("Connection status: handle=0x%x, flags=%s, address=0x%s, "
             "connection interval=%fms, timeout=%d, "
             "latency=%d intervals, bonding=0x%x",
             connection_handle,
             args['flags'],
             hexlify(bytearray(args['address'])),
             args['conn_interval'] * 1.25,
             args['timeout'] * 10,
             args['latency'],
             args['bonding'])
def _ble_evt_gap_scan_response(self, args):
    """
    Handles the event for reporting the contents of an advertising or scan
    response packet, recording what it says about the sending device.

    This event will occur during device discovery but not direct connection.

    args -- dictionary containing the RSSI value ('rssi'), packet type
            ('packet_type'), address of packet sender ('sender'), address
            type ('address_type'), existing bond handle ('bond'), and
            scan response data list ('data')
    """
    # Parse packet
    packet_type = constants.scan_response_packet_type[args['packet_type']]
    address = bgapi_address_to_hex(args['sender'])
    name, data_dict = self._scan_rsp_data(args['data'])

    # Fetch the record for this device, creating it on first sight.
    dev = self._devices_discovered.get(address)
    if dev is None:
        dev = AdvertisingAndScanInfo()
        self._devices_discovered[address] = dev

    # Name and address are only filled in while still empty.
    if dev.name == "":
        dev.name = name
    if dev.address == "":
        dev.address = address

    # Per packet type, keep whichever data dictionary has more fields.
    previous = dev.packet_data.get(packet_type)
    if previous is None or len(previous) < len(data_dict):
        dev.packet_data[packet_type] = data_dict
    dev.rssi = args['rssi']

    log.debug("Received a scan response from %s with rssi=%d dBM "
              "and data=%s", address, args['rssi'], data_dict)
def _ble_evt_sm_bond_status(self, args):
"""
Handles the event for reporting a stored bond.
Adds the stored bond to the list of bond handles.
args -- dictionary containing the bond handle ('bond'), encryption key
size used in the long-term key ('keysize'), was man in the
middle used ('mitm'), keys stored for bonding ('keys')
"""
# Add to list of stored bonds found or set flag
self._stored_bonds.append(args['bond'])
def _ble_rsp_sm_delete_bonding(self, args):
"""
Handles the response for the deletion of a stored bond.
args -- dictionary containing the return code ('result')
"""
result = args['result']
if result == 0:
self._stored_bonds.pop()
return result
def _ble_rsp_sm_get_bonds(self, args):
    """
    Handles the response for the start of stored bond enumeration by
    recording the number of stored bonds in self._num_bonds.

    args -- dictionary containing the number of stored bonds ('bonds')
    """
    bond_count = args['bonds']
    self._num_bonds = bond_count
    log.debug("num bonds = %d", bond_count)