Skip to content

Commit

Permalink
[tools] tcat: extend TCAT command list.
Browse files Browse the repository at this point in the history
Add commands for:
 -WiFi connection management,
 -Perform reboot

Signed-off-by: Marcin Gasiorek <[email protected]>
  • Loading branch information
MarGasiorek committed Mar 25, 2024
1 parent 47894d4 commit 811add6
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 3 deletions.
10 changes: 9 additions & 1 deletion tools/tcat_ble_client/ble/ble_stream_secure.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,14 @@ async def recv(self, buffersize, timeout=1):

self.incoming.write(data)
while True:
decode = []
try:
decode = self.ssl_object.read(4096)
# try decode all available data
while True:
decode.append(self.ssl_object.read(4096))

if self.incoming.pending <= 0:
break
break
# if recv called before entire message was received from the link
except ssl.SSLWantReadError:
Expand All @@ -113,6 +119,8 @@ async def recv(self, buffersize, timeout=1):
await asyncio.sleep(0.1)
more = await self.ble_stream.recv(buffersize)
self.incoming.write(more)
if len(decode) == 1:
return decode[0]
return decode

async def send_with_resp(self, bytes):
Expand Down
195 changes: 194 additions & 1 deletion tools/tcat_ble_client/cli/base_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from tlv.tcat_tlv import TcatTLVType
from cli.command import Command, CommandResultNone, CommandResultTLV
from dataset.dataset import ThreadDataset
from utils import select_device_by_user_input
from utils import select_device_by_user_input, get_int_in_range
from os import path


Expand Down Expand Up @@ -163,3 +163,196 @@ async def execute_default(self, args, context):
await ble_sstream.do_handshake(hostname=SERVER_COMMON_NAME)
print('Done')
context['ble_sstream'] = ble_sstream


class TbrWiFiScanSubCmd(Command):

def get_help_string(self) -> str:
return 'Find available Wi-Fi access points and connect to one of them.'

def user_confirmation(self):
while True:
try:
userAnswer = str(input('Do you want to continue? [Y]es/[n]o\n> '))
if userAnswer.lower() == 'y' or userAnswer.lower() == 'yes':
return True
elif userAnswer.lower() == 'n' or userAnswer.lower() == 'no':
return False
else:
print('\nTry again.')
except KeyboardInterrupt:
print('\nInterrupted by user.')
return None

def get_password(self, networkName):
while True:
try:
password = str(input(f'\nPassword for \"{networkName}\"\n> '))
if len(password) > 0:
return password
else:
print('\nThe password is too short. Try again.')
except KeyboardInterrupt:
print('\nInterrupted by user.')
return None

async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']

print('Wi-Fi scanning...\n')
cmd = TLV(TcatTLVType.APPLICATION.value, bytes('wifi_scan', 'ascii')).to_bytes()

response = await bless.send_with_resp(cmd)
if not response:
return

tlv_response = TLV.from_bytes(response)
if tlv_response.value.decode("ascii").find("RESP_NOT_SUPP") != -1:
print('Command not supported\n')
return
elif tlv_response.value.decode("ascii").find("RESP_FAIL") != -1:
print('Wi-Fi status get fail\n')
return

wifiNetworks = []
wifiNetworksDictKeys = ["id", "ssid", "chan", "band", "rssi", "security"]

while True:
response = await bless.recv(buffersize=4096, timeout=15)
if not response:
return
for _resp in response:
tlv_response = TLV.from_bytes(_resp)
if tlv_response.value.decode("ascii").find("RESP_FAIL") != -1:
print('Wi-Fi status get fail\n')
break
elif tlv_response.value.decode("ascii").find("RESP_OK") != -1:
break
wifiNetworks.append(dict(zip(wifiNetworksDictKeys, tlv_response.value.decode("ascii").split('|'))))
break

if len(wifiNetworks) > 0:
print('Found Wi-Fi networks:\n')
print(f"{'Num' :<4}| {'SSID' :<30}| {'Chan' :<5}| {'Band' :<7}| {'RSSI' :<5}")
for _net in wifiNetworks:
print(f"{_net['id'] :<4}| {_net['ssid'] :<30}| {_net['chan'] :<5}| {_net['band'] :<7}| {_net['rssi'] :<5}")
else:
print('\nNo Wi-Fi networks found.')
return None

print("""\nConnect to Wi-Fi network?\n"""
"""After you continue, the network is saved as default """
"""and TBR automatically connects to this network.""")
conf = self.user_confirmation()
if not conf:
return

print('\nSelect Wi-Fi network number to connect to it')
selected = get_int_in_range(1, len(wifiNetworks))
ssid = wifiNetworks[selected - 1]['ssid']
sec = wifiNetworks[selected - 1]['security']

pwd = self.get_password(ssid)
if pwd is None:
return

print('Store Wi-Fi network...\n')
cmd = TLV(TcatTLVType.APPLICATION.value, bytes('wifi_add ' + ssid + ' ' + pwd + ' ' + sec, 'ascii')).to_bytes()
response = await bless.send_with_resp(cmd)

if not response:
return

tlv_response = TLV.from_bytes(response)
if tlv_response.value.decode("ascii").find("RESP_FAIL") != -1:
print('\nConnection fail\n')
print('Wi-Fi network is saved')
return


class TbrWiFiStatusSubCmd(Command):

def get_help_string(self) -> str:
return 'Get Wi-Fi connection status for TBR.'

async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']

print('Wi-Fi status get...\n')
command = TLV(TcatTLVType.APPLICATION.value, bytes('wifi_status', 'ascii')).to_bytes()
await bless.send(command)

wifiStatus = {}
wifiStatusDictKeys = ['state', 'ssid', 'rssi']

while True:
response = await bless.recv(buffersize=4096, timeout=15)
if not response:
return
for _resp in response:
tlv_response = TLV.from_bytes(_resp)
if tlv_response.value.decode("ascii").find("RESP_NOT_SUPP") != -1:
print('Command not supported\n')
break
elif tlv_response.value.decode("ascii").find("RESP_FAIL") != -1:
print('Wi-Fi status get fail\n')
break
elif tlv_response.value.decode("ascii").find("RESP_OK") != -1:
break
wifiStatus = dict(zip(wifiStatusDictKeys, tlv_response.value.decode("ascii").split('|')))
break

if 'state' in wifiStatus:
if wifiStatus['state'] == 'COMPLETED':
print(f'\tWi-Fi connected to: \"{wifiStatus["ssid"]}\" [RSSI: {wifiStatus["rssi"]}]')
else:
print(f'\tWi-Fi {wifiStatus["state"].lower()}')

return


class TbrWiFiCommand(Command):

def __init__(self):
self._subcommands = {'scan': TbrWiFiScanSubCmd(), 'status': TbrWiFiStatusSubCmd()}

def get_help_string(self) -> str:
return 'Manage Wi-Fi network connection for TBR.'

async def execute_default(self, args, context):
print('Invalid usage. Provide a subcommand.')
return CommandResultNone()


class TbrRebootSubCmd(Command):

def get_help_string(self) -> str:
return 'Reboot the TBR.'

async def execute_default(self, args, context):
bless: BleStreamSecure = context['ble_sstream']

cmd = TLV(TcatTLVType.APPLICATION.value, bytes('reboot', 'ascii')).to_bytes()

response = await bless.send_with_resp(cmd)
if not response:
return

tlv_response = TLV.from_bytes(response)
if tlv_response.value.decode("ascii").find("RESP_FAIL") != -1:
print('Command execution fail\n')

return CommandResultTLV(tlv_response)


class TbrCommand(Command):

def __init__(self):
self._subcommands = {'reboot': TbrRebootSubCmd()}

def get_help_string(self) -> str:
return 'Manage TBR.'

async def execute_default(self, args, context):
print('Invalid usage. Provide a subcommand.')
return CommandResultNone()
4 changes: 3 additions & 1 deletion tools/tcat_ble_client/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import readline
import shlex
from ble.ble_stream_secure import BleStreamSecure
from cli.base_commands import (HelpCommand, HelloCommand, CommissionCommand, ThreadStateCommand, ScanCommand)
from cli.base_commands import (HelpCommand, HelloCommand, CommissionCommand, ThreadStateCommand, ScanCommand, TbrWiFiCommand, TbrCommand)
from cli.dataset_commands import (DatasetCommand)
from dataset.dataset import ThreadDataset
from typing import Optional
Expand All @@ -45,6 +45,8 @@ def __init__(self, dataset: ThreadDataset, ble_sstream: Optional[BleStreamSecure
'dataset': DatasetCommand(),
'thread': ThreadStateCommand(),
'scan': ScanCommand(),
'wifi': TbrWiFiCommand(),
'tbr' : TbrCommand(),
}
self._context = {'ble_sstream': ble_sstream, 'dataset': dataset, 'commands': self._commands}
readline.set_completer(self.completer)
Expand Down

0 comments on commit 811add6

Please sign in to comment.