From cd48f7d5e443d46c25460c7bbef9ad9d208fe961 Mon Sep 17 00:00:00 2001 From: kevvz <92408564+kevvz@users.noreply.github.com> Date: Fri, 27 Sep 2024 17:02:30 -0700 Subject: [PATCH] rebased storage files --- Friend/firmware/firmware_v1.0/src/storage.c | 70 ++++-- Friend/firmware/firmware_v1.0/src/transport.c | 54 +++-- app/lib/backend/preferences.dart | 8 + .../memories/widgets/processing_capture.dart | 32 +++ app/lib/pages/sdcard/page.dart | 111 +++++++++ app/lib/pages/settings/page.dart | 7 + app/lib/providers/capture_provider.dart | 165 +++++++++++-- .../services/devices/device_connection.dart | 6 +- .../services/devices/frame_connection.dart | 2 +- .../services/devices/friend_connection.dart | 13 +- app/lib/services/sockets/sdcard_socket.dart | 139 +++++++++++ backend/models/memory.py | 1 + backend/routers/sdcard.py | 228 +++++++++++------- 13 files changed, 677 insertions(+), 159 deletions(-) create mode 100644 app/lib/pages/sdcard/page.dart create mode 100644 app/lib/services/sockets/sdcard_socket.dart diff --git a/Friend/firmware/firmware_v1.0/src/storage.c b/Friend/firmware/firmware_v1.0/src/storage.c index 473e02b43..a7e26172f 100644 --- a/Friend/firmware/firmware_v1.0/src/storage.c +++ b/Friend/firmware/firmware_v1.0/src/storage.c @@ -22,6 +22,7 @@ LOG_MODULE_REGISTER(storage, CONFIG_LOG_DEFAULT_LEVEL); #define READ_COMMAND 0 #define DELETE_COMMAND 1 #define NUKE 2 +#define STOP_COMMAND 3 #define INVALID_FILE_SIZE 3 #define ZERO_FILE_SIZE 4 @@ -34,7 +35,7 @@ static struct bt_uuid_128 storage_write_uuid = BT_UUID_INIT_128(BT_UUID_128_ENCO static struct bt_uuid_128 storage_read_uuid = BT_UUID_INIT_128(BT_UUID_128_ENCODE(0x30295782, 0x4301, 0xEABD, 0x2904, 0x2849ADFEAE43)); static ssize_t storage_read_characteristic(struct bt_conn *conn, const struct bt_gatt_attr *attr, void *buf, uint16_t len, uint16_t offset); -K_THREAD_STACK_DEFINE(storage_stack, 2048); +K_THREAD_STACK_DEFINE(storage_stack, 4096); static struct k_thread storage_thread; extern uint8_t file_count; @@ -50,12 +51,13 @@ static struct bt_gatt_attr storage_service_attr[] = { }; -static struct bt_gatt_service storage_service = BT_GATT_SERVICE(storage_service_attr); +struct bt_gatt_service storage_service = BT_GATT_SERVICE(storage_service_attr); bool storage_is_on = false; -static void storage_config_changed_handler(const struct bt_gatt_attr *attr, uint16_t value) { +static void storage_config_changed_handler(const struct bt_gatt_attr *attr, uint16_t value) +{ storage_is_on = true; if (value == BT_GATT_CCC_NOTIFY) @@ -73,15 +75,14 @@ static void storage_config_changed_handler(const struct bt_gatt_attr *attr, uint } -static ssize_t storage_read_characteristic(struct bt_conn *conn, const struct bt_gatt_attr *attr, void *buf, uint16_t len, uint16_t offset) { +static ssize_t storage_read_characteristic(struct bt_conn *conn, const struct bt_gatt_attr *attr, void *buf, uint16_t len, uint16_t offset) +{ k_msleep(10); - // char amount[1] = {file_count}; uint32_t amount[50] = {0}; for (int i = 0; i < file_count; i++) { amount[i] = file_num_array[i]; } - - ssize_t result = bt_gatt_attr_read(conn, attr, buf, len, offset, amount, file_count * sizeof(uint32_t)); + ssize_t result = bt_gatt_attr_read(conn, attr, buf, len, offset, amount, 1 * sizeof(uint32_t)); return result; } @@ -96,7 +97,7 @@ static uint32_t offset = 0; static uint8_t index = 0; static uint8_t current_packet_size = 0; static uint8_t tx_buffer_size = 0; - +static uint8_t stop_started = 0; static uint8_t delete_started = 0; static uint8_t current_read_num = 1; uint32_t remaining_length = 0; @@ -190,6 +191,11 @@ static uint8_t parse_storage_command(void *buf,uint16_t len) { else if (command == NUKE) { nuke_started = 1; } + else if (command == STOP_COMMAND) + { + remaining_length = 80; + stop_started = 1; + } else { LOG_INF("invalid command \n"); return 6; @@ -245,6 +251,7 @@ static void write_to_gatt(struct bt_conn *conn) { void storage_write(void) { while (1) { + struct bt_conn *conn = get_current_connection(); if ( transport_started ) { LOG_INF("transpor started in side : %d",transport_started); @@ -259,9 +266,13 @@ void storage_write(void) { if (err) { printk("error clearing\n"); } - else{ - uint8_t result_buffer[1] = {100}; - bt_gatt_notify(get_current_connection(), &storage_service.attrs[1], &result_buffer,1); + else + { + uint8_t result_buffer[1] = {200}; + if (conn) + { + bt_gatt_notify(get_current_connection(), &storage_service.attrs[1], &result_buffer,1); + } } delete_started = 0; k_msleep(10); @@ -270,27 +281,42 @@ void storage_write(void) { clear_audio_directory(); nuke_started = 0; } + if (stop_started) + { + remaining_length = 0; + stop_started = 0; + + } if(remaining_length > 0 ) { - struct bt_conn *conn = get_current_connection(); + if (conn == NULL) { LOG_ERR("invalid connection"); + remaining_length = 0; k_yield(); } write_to_gatt(conn); transport_started = 0; - if (remaining_length == 0) { - LOG_INF("done. attempting to download more files\n"); - uint8_t stop_result[1] = {100}; - int err = bt_gatt_notify(conn, &storage_service.attrs[1], &stop_result,1); - k_sleep(K_MSEC(10)); - - } - - } - k_yield(); + if (remaining_length == 0 ) + { + if(stop_started) + { + stop_started = 0; + } + else + { + printk("done. attempting to download more files\n"); + uint8_t stop_result[1] = {100}; + int err = bt_gatt_notify(conn, &storage_service.attrs[1], &stop_result,1); + k_sleep(K_MSEC(10)); + } + + } + } + k_yield(); + } } diff --git a/Friend/firmware/firmware_v1.0/src/transport.c b/Friend/firmware/firmware_v1.0/src/transport.c index 0edb42c24..c56756c5c 100644 --- a/Friend/firmware/firmware_v1.0/src/transport.c +++ b/Friend/firmware/firmware_v1.0/src/transport.c @@ -609,8 +609,13 @@ bool write_to_storage(void) { static bool use_storage = true; #define MAX_FILES 10 #define MAX_AUDIO_FILE_SIZE 300000 +static int recent_file_size_updated = 0; - + void update_file_size() + { + file_num_array[0] = get_file_size(1); + printk("file size for file count %d %d\n",file_count,file_num_array[0]); + } void pusher(void) { @@ -624,7 +629,22 @@ void pusher(void) // struct bt_conn *conn = current_connection; - // bool use_gatt = true; + bool use_gatt = true; + //updating the most recent file size is expensive! + static bool file_size_updated = true; + static bool connection_was_true = false; + if (conn && !connection_was_true) { + k_msleep(100); + file_size_updated = false; + connection_was_true = true; + } else if (!conn) { + connection_was_true = false; + } + if (!file_size_updated) { + printk("updating file size\n"); + update_file_size(); + file_size_updated = true; + } if (conn) { conn = bt_conn_ref(conn); @@ -643,21 +663,19 @@ void pusher(void) valid = bt_gatt_is_subscribed(conn, &audio_service.attrs[1], BT_GATT_CCC_NOTIFY); // Check if subscribed } - if (!valid && !storage_is_on) { - - bool result = write_to_storage(); - // file_num_array[file_count-1] = get_file_size(file_count); - // printk("file size for file count %d %d\n",file_count,file_num_array[file_count-1]); - if (result) + if (!valid && !storage_is_on) { - // if (get_file_size(9) > MAX_AUDIO_FILE_SIZE) { - // printk("Audio file size limit reached, making new file\n"); - // // make_and_rebase_audio_file(get_info_file_length()+1); - // } - } - else { - k_sleep(K_MSEC(10)); - } + + bool result = write_to_storage(); + + if (result) + { + + } + else + { + k_sleep(K_MSEC(10)); + } } if (valid) @@ -678,7 +696,7 @@ void pusher(void) k_yield(); } } - +extern struct bt_gatt_service storage_service; // @@ -726,7 +744,7 @@ play_boot_sound(); #endif // Start advertising - + bt_gatt_service_register(&storage_service); bt_gatt_service_register(&audio_service); bt_gatt_service_register(&dfu_service); memset(storage_temp_data, 0, OPUS_PADDED_LENGTH * 4); diff --git a/app/lib/backend/preferences.dart b/app/lib/backend/preferences.dart index c5a1ac22a..c0e1c5606 100644 --- a/app/lib/backend/preferences.dart +++ b/app/lib/backend/preferences.dart @@ -402,4 +402,12 @@ class SharedPreferencesUtil { set locationPermissionRequested(bool value) => saveBool('locationPermissionRequested', value); bool get locationPermissionRequested => getBool('locationPermissionRequested') ?? false; + + int get currentStorageBytes => getInt('currentStorageBytes') ?? 0; + + set currentStorageBytes(int value) => saveInt('currentStorageBytes', value); + + int get previousStorageBytes => getInt('previousStorageBytes') ?? 0; + + set previousStorageBytes(int value) => saveInt('previousStorageBytes', value); } diff --git a/app/lib/pages/memories/widgets/processing_capture.dart b/app/lib/pages/memories/widgets/processing_capture.dart index 9526e9027..6228050c9 100644 --- a/app/lib/pages/memories/widgets/processing_capture.dart +++ b/app/lib/pages/memories/widgets/processing_capture.dart @@ -10,6 +10,7 @@ import 'package:friend_private/providers/device_provider.dart'; import 'package:friend_private/utils/analytics/mixpanel.dart'; import 'package:friend_private/utils/enums.dart'; import 'package:friend_private/utils/other/temp.dart'; +import 'package:friend_private/pages/sdcard/page.dart'; import 'package:friend_private/widgets/dialog.dart'; import 'package:provider/provider.dart'; @@ -39,6 +40,37 @@ class _MemoryCaptureWidgetState extends State { provider.recordingState == RecordingState.record || (provider.memoryCreating && deviceProvider.connectedDevice != null); + var storageBytes = provider.timeToSend ?? 1; + var totalTimeSent = provider.timeAlreadySent ?? 1; + var totalTimeRemaining = storageBytes - totalTimeSent; + String totalTimeRemainingString = totalTimeRemaining.toStringAsFixed(2); + + if(provider.storageIsReady) { + var banner = 'You have ' + totalTimeRemainingString + ' seconds of Storage Remaining. Click here to see'; + Future.delayed(Duration.zero, () { + ScaffoldMessenger.of(context).hideCurrentMaterialBanner(); + ScaffoldMessenger.of(context).showMaterialBanner( + MaterialBanner( + content: Text(banner), + backgroundColor: Colors.green, + actions: [ + TextButton( + onPressed: () { + ScaffoldMessenger.of(context).hideCurrentMaterialBanner(); + routeToPage(context, SdCardCapturePage()); + }, + child: const Text('Click here'), + ), + ], + onVisible: () => Future.delayed(const Duration(seconds: 15), () { + ScaffoldMessenger.of(context).hideCurrentMaterialBanner(); + }), + ), + ); + }); + provider.setStorageIsReady(false); + } + return (showPhoneMic || isConnected) ? GestureDetector( onTap: () async { diff --git a/app/lib/pages/sdcard/page.dart b/app/lib/pages/sdcard/page.dart new file mode 100644 index 000000000..852f44739 --- /dev/null +++ b/app/lib/pages/sdcard/page.dart @@ -0,0 +1,111 @@ +import 'dart:io'; + +import 'package:flutter/material.dart'; +import 'package:upgrader/upgrader.dart'; +import 'package:provider/provider.dart'; +import 'package:gradient_borders/gradient_borders.dart'; +import 'package:friend_private/providers/device_provider.dart'; +import 'package:friend_private/providers/capture_provider.dart'; +import 'package:friend_private/utils/other/temp.dart'; +class SdCardCapturePage extends StatefulWidget { + + const SdCardCapturePage({ + super.key + }); + + @override + State createState() => _SdCardCapturePageState(); + +} + +class _SdCardCapturePageState extends State { + late String _displayText; + @override + void initState() { + _displayText = 'hello there'; + super.initState(); + } + + @override + Widget build(BuildContext context) { + + return Consumer2(builder: (context, provider, deviceProvider, child) { + + var connectedDevice = deviceProvider.connectedDevice; + var totalStorageBytes = provider.totalStorageFileBytes ?? 1; // Avoid division by zero + var totalReceivedBytes = provider.totalBytesReceived ?? 1; + + var storageBytes = provider.timeToSend ?? 1; + var totalTimeSent = provider.timeAlreadySent ?? 1; + var totalTimeRemaining = storageBytes - totalTimeSent; + String totalTimeRemainingString = totalTimeRemaining.toStringAsFixed(2); + String percentRemaining = (totalReceivedBytes / totalStorageBytes * 100).toStringAsFixed(2); + _displayText = 'about ' + totalTimeRemainingString + ' seconds remaining\n' + percentRemaining + '% there'; + if (provider.isDone) { + _displayText = 'Done! Check back later for your memories.'; + } + // if(provider.sendNotification) { + // provider.sendNotification = false; + // } + + return Scaffold( + backgroundColor: Theme.of(context).colorScheme.primary, + appBar: AppBar( + backgroundColor: Theme.of(context).colorScheme.primary, + automaticallyImplyLeading: true, + title: const Text('SD Card'), + centerTitle: true, + leading: IconButton( + icon: const Icon(Icons.arrow_back_ios_new), + onPressed: () { + Navigator.pop(context); + }, + ), + elevation: 0, + ), + body: Center( + child: Column( + mainAxisAlignment: MainAxisAlignment.center, + + children: [ + LinearProgressIndicator( + value: totalTimeSent / storageBytes, + backgroundColor: Colors.grey, + color: Colors.green, + minHeight: 10, + ), + const SizedBox(height: 20), + Center( + child: Text(_displayText ?? 'Default Text'), + ), + const SizedBox(height: 20), + ElevatedButton( + style: ElevatedButton.styleFrom( + backgroundColor: Colors.white, + ), + onPressed: () { + setState(() { + _displayText = 'about' + totalTimeRemainingString + ' seconds remaining, about ' + percentRemaining + '% there'; + }); + // if (provider.totalStorageFileBytes == 0) { + // } + if (!provider.sdCardIsDownloading) { + provider.sendStorage(deviceProvider.connectedDevice!.id); + provider.setSdCardIsDownloading(true); + } + }, + child: const Text('Click to starting Importing Memories'), + ), + const SizedBox(height: 20), + const Text('This download may take a while. Exiting this while the download is in progress will halt all progress and some memories may be lost.'), + const SizedBox(height: 20), + const Text('Please ensure that you have good internet connection.'), + ], + ), + ), + ); + + }); + } + +} \ No newline at end of file diff --git a/app/lib/pages/settings/page.dart b/app/lib/pages/settings/page.dart index 29f458239..5c83cb439 100644 --- a/app/lib/pages/settings/page.dart +++ b/app/lib/pages/settings/page.dart @@ -4,6 +4,7 @@ import 'package:friend_private/backend/preferences.dart'; import 'package:friend_private/main.dart'; import 'package:friend_private/pages/home/support.dart'; import 'package:friend_private/pages/plugins/page.dart'; +import 'package:friend_private/pages/sdcard/page.dart'; import 'package:friend_private/pages/settings/about.dart'; import 'package:friend_private/pages/settings/calendar.dart'; import 'package:friend_private/pages/settings/developer.dart'; @@ -107,6 +108,12 @@ class _SettingsPageState extends State { icon: Icons.person, ), const SizedBox(height: 20), + getItemAddOn2( + 'SD Card Import', + () => routeToPage(context, const SdCardCapturePage()), + icon: Icons.sd_card, + ), + const SizedBox(height: 8), getItemAddOn2( 'Device Settings', () { diff --git a/app/lib/providers/capture_provider.dart b/app/lib/providers/capture_provider.dart index 9afd66a3e..8e9c90fd2 100644 --- a/app/lib/providers/capture_provider.dart +++ b/app/lib/providers/capture_provider.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:io'; import 'dart:math'; +import 'package:web_socket_channel/io.dart'; import 'package:collection/collection.dart'; import 'package:flutter/material.dart'; import 'package:flutter_foreground_task/flutter_foreground_task.dart'; @@ -21,7 +22,9 @@ import 'package:friend_private/pages/capture/logic/openglass_mixin.dart'; import 'package:friend_private/providers/memory_provider.dart'; import 'package:friend_private/providers/message_provider.dart'; import 'package:friend_private/services/services.dart'; +import 'package:friend_private/services/notifications.dart'; import 'package:friend_private/services/sockets/transcription_connection.dart'; +import 'package:friend_private/services/sockets/sdcard_socket.dart'; import 'package:friend_private/utils/analytics/growthbook.dart'; import 'package:friend_private/utils/analytics/mixpanel.dart'; import 'package:friend_private/utils/audio/wav_bytes.dart'; @@ -40,7 +43,7 @@ class CaptureProvider extends ChangeNotifier MemoryProvider? memoryProvider; MessageProvider? messageProvider; TranscripSegmentSocketService? _socket; - + SdCardSocketService sdCardSocket = SdCardSocketService(); Timer? _keepAliveTimer; void updateProviderInstances(MemoryProvider? mp, MessageProvider? p) { @@ -93,7 +96,28 @@ class CaptureProvider extends ChangeNotifier String? processingMemoryId; - String dateTimeStorageString = ""; + int totalStorageFileBytes = 0; + int totalBytesReceived = 0; + double timeToSend = 0.0; + double timeAlreadySent = 0.0; + bool isDone = false; + bool storageIsReady = false; + bool sdCardIsDownloading = false; + bool sendNotification = false; + String btConnectedTime = ""; + String dateTimeStorageString = ""; + + + + void setIsDone(bool value) { + isDone = value; + notifyListeners(); + } + + void setSdCardIsDownloading(bool value) { + sdCardIsDownloading = value; + notifyListeners(); + } void setHasTranscripts(bool value) { hasTranscripts = value; @@ -413,15 +437,29 @@ class CaptureProvider extends ChangeNotifier } Future sendStorage(String id) async { - storageUtil = StorageBytesUtil(); - + // storageUtil = StorageBytesUtil(); if (_storageStream != null) { _storageStream?.cancel(); } + if (totalStorageFileBytes == 0) { + return; + } + if (sdCardSocket.sdCardConnectionState != WebsocketConnectionStatus.connected) { + await sdCardSocket.setupSdCardWebSocket( + onMessageReceived: () { + debugPrint('onMessageReceived'); + memoryProvider?.getMoreMemoriesFromServer(); + _notifySdCardComplete(); + + return; + }, + btConnectedTime: btConnectedTime, + ); + } + // debugPrint('sd card connection state: ${sdCardSocketService?.sdCardConnectionState}'); _storageStream = await _getBleStorageBytesListener(id, onStorageBytesReceived: (List value) async { if (value.isEmpty) return; - storageUtil!.storeFrameStoragePacket(value); if (value.length == 1) { //result codes i guess debugPrint('returned $value'); @@ -435,42 +473,87 @@ class CaptureProvider extends ChangeNotifier } else if (value[0] == 4) { //file size is zero. debugPrint('file size is zero. going to next one....'); - getFileFromDevice(storageUtil.getFileNum() + 1); + // getFileFromDevice(storageUtil.getFileNum() + 1); } else if (value[0] == 100) { //valid end command + + isDone = true; + sdCardIsDownloading = false; debugPrint('done. sending to backend....trying to dl more'); - File storageFile = (await storageUtil.createWavFile(removeLastNSeconds: 0)).item1; - List result = await sendStorageToBackend(storageFile, dateTimeStorageString); - for (ServerMemory memory in result) { - memoryProvider?.addMemory(memory); - } + + sdCardSocket?.sdCardChannel?.sink.add(value); //replace + storageUtil.clearAudioBytes(); - //clear the file to indicate completion + SharedPreferencesUtil().currentStorageBytes = 0; + SharedPreferencesUtil().previousStorageBytes = 0; clearFileFromDevice(storageUtil.getFileNum()); - getFileFromDevice(storageUtil.getFileNum() + 1); + } else { //bad bit debugPrint('Error bit returned'); } } + else if (value.length == 83) { + totalBytesReceived += 80; + // storageUtil!.storeFrameStoragePacket(value); + if (sdCardSocket.sdCardConnectionState != WebsocketConnectionStatus.connected) { + debugPrint('websocket provider state: ${sdCardSocket.sdCardConnectionState}'); + //means we are disconnected, stop all transmission. attempt reconnection + if (!sdCardIsDownloading) + { + return; + } + pauseFileFromDevice(storageUtil.getFileNum()); + debugPrint('paused file from device'); + sdCardIsDownloading = false; + return; + + } + + sdCardSocket?.sdCardChannel?.sink.add(value); + timeAlreadySent = ( (totalBytesReceived.toDouble() / 80.0) / 100.0 ) * 2.2 ; + SharedPreferencesUtil().currentStorageBytes = totalBytesReceived; + } + notifyListeners(); }); - getFileFromDevice(storageUtil.getFileNum()); + getFileFromDevice(storageUtil.getFileNum(),totalBytesReceived); // notifyListeners(); } - Future getFileFromDevice(int fileNum) async { + Future getFileFromDevice(int fileNum,int offset) async { storageUtil.fileNum = fileNum; int command = 0; - _writeToStorage(_recordingDevice!.id, storageUtil.fileNum, command); + _writeToStorage(_recordingDevice!.id, storageUtil.fileNum, command,offset); } Future clearFileFromDevice(int fileNum) async { storageUtil.fileNum = fileNum; int command = 1; - _writeToStorage(_recordingDevice!.id, storageUtil.fileNum, command); + _writeToStorage(_recordingDevice!.id, storageUtil.fileNum, command,0); } + Future pauseFileFromDevice(int fileNum) async { + storageUtil.fileNum = fileNum; + int command = 3; + _writeToStorage(_recordingDevice!.id, storageUtil.fileNum, command,0); + } + + void _notifySdCardComplete() { + + NotificationService.instance.clearNotification(8); + NotificationService.instance.createNotification( + notificationId: 8, + title: 'Sd Card Processing Complete', + body: 'Your Sd Card data is now processed! Enter the app to see.', + ); + } + + void setStorageIsReady(bool value) { + storageIsReady = value; + notifyListeners(); + } + void clearTranscripts() { segments = []; setHasTranscripts(false); @@ -498,7 +581,7 @@ class CaptureProvider extends ChangeNotifier await startOpenGlass(); await _initiateFriendAudioStreaming(); // TODO: Commenting this for now as DevKit 2 is not yet used in production - // await initiateStorageBytesStreaming(); + await initiateStorageBytesStreaming(); notifyListeners(); } @@ -551,12 +634,12 @@ class CaptureProvider extends ChangeNotifier return connection.getBleAudioBytesListener(onAudioBytesReceived: onAudioBytesReceived); } - Future _writeToStorage(String deviceId, int numFile, int command) async { + Future _writeToStorage(String deviceId, int numFile, int command,int offset) async { var connection = await ServiceManager.instance().device.ensureConnection(deviceId); if (connection == null) { return Future.value(false); } - return connection.writeToStorage(numFile, command); + return connection.writeToStorage(numFile, command,offset); } Future> _getStorageList(String deviceId) async { @@ -622,12 +705,50 @@ class CaptureProvider extends ChangeNotifier notifyListeners(); } - Future initiateStorageBytesStreaming() async { +Future initiateStorageBytesStreaming() async { debugPrint('initiateStorageBytesStreaming'); + if (_recordingDevice == null) return; currentStorageFiles = await _getStorageList(_recordingDevice!.id); + if (currentStorageFiles.isEmpty) { + debugPrint('No storage files found'); + return; + } debugPrint('Storage files: $currentStorageFiles'); - await sendStorage(_recordingDevice!.id); + totalStorageFileBytes = currentStorageFiles.fold(0, (sum, fileSize) => sum + fileSize); + var previousStorageBytes = SharedPreferencesUtil().previousStorageBytes; + // SharedPreferencesUtil().previousStorageBytes = totalStorageFileBytes; + //check if new or old file + if (totalStorageFileBytes < previousStorageBytes) { + totalBytesReceived = 0; + SharedPreferencesUtil().currentStorageBytes = 0; + } + else { + totalBytesReceived = SharedPreferencesUtil().currentStorageBytes; + } + if (totalBytesReceived > totalStorageFileBytes) { + totalBytesReceived = 0; + } + SharedPreferencesUtil().previousStorageBytes = totalStorageFileBytes; + timeToSend = ( (totalStorageFileBytes.toDouble() / 80.0) / 100.0 ) * 2.2; + + debugPrint('totalBytesReceived in initiateStorageBytesStreaming: $totalBytesReceived'); + debugPrint('previousStorageBytes in initiateStorageBytesStreaming: $previousStorageBytes'); + btConnectedTime = DateTime.now().toUtc().toString(); + sdCardSocket.setupSdCardWebSocket( //replace + onMessageReceived: () { + debugPrint('onMessageReceived'); + memoryProvider?.getMemoriesFromServer(); + _notifySdCardComplete(); + return; + }, + btConnectedTime: btConnectedTime, + + ); + + if (totalStorageFileBytes > 0) { + storageIsReady = true; + } notifyListeners(); } diff --git a/app/lib/services/devices/device_connection.dart b/app/lib/services/devices/device_connection.dart index aea17630e..dd9a460e1 100644 --- a/app/lib/services/devices/device_connection.dart +++ b/app/lib/services/devices/device_connection.dart @@ -199,11 +199,11 @@ abstract class DeviceConnection { Future> performGetStorageList(); - Future performWriteToStorage(int numFile, int command); + Future performWriteToStorage(int numFile, int command,int offset); - Future writeToStorage(int numFile, int command) async { + Future writeToStorage(int numFile, int command,int offset) async { if (await isConnected()) { - return await performWriteToStorage(numFile, command); + return await performWriteToStorage(numFile, command,offset); } _showDeviceDisconnectedNotification(); return Future.value(false); diff --git a/app/lib/services/devices/frame_connection.dart b/app/lib/services/devices/frame_connection.dart index 94c33968b..2febd698b 100644 --- a/app/lib/services/devices/frame_connection.dart +++ b/app/lib/services/devices/frame_connection.dart @@ -443,7 +443,7 @@ class FrameDeviceConnection extends DeviceConnection { } @override - Future performWriteToStorage(int numFile, int command) { + Future performWriteToStorage(int numFile, int command,int offset) { return Future.value(false); } } diff --git a/app/lib/services/devices/friend_connection.dart b/app/lib/services/devices/friend_connection.dart index 986ce0032..b517d55a7 100644 --- a/app/lib/services/devices/friend_connection.dart +++ b/app/lib/services/devices/friend_connection.dart @@ -294,7 +294,7 @@ class FriendDeviceConnection extends DeviceConnection { return listener; } - Future performWriteToStorage(int numFile, int command) async { + Future performWriteToStorage(int numFile, int command,int offset) async { if (_storageService == null) { logServiceNotFoundError('Storage Write', deviceId); return false; @@ -307,7 +307,16 @@ class FriendDeviceConnection extends DeviceConnection { } debugPrint('About to write to storage bytes'); debugPrint('about to send $numFile'); - await storageDataStreamCharacteristic.write([command & 0xFF, numFile & 0xFF]); + debugPrint('about to send $command'); + debugPrint('about to send offset$offset'); + var offsetBytes = [ + (offset >> 24) & 0xFF, + (offset >> 16) & 0xFF, + (offset >> 8) & 0xFF, + offset & 0xFF, + ]; + + await storageDataStreamCharacteristic.write([command & 0xFF,numFile & 0xFF,offsetBytes[0],offsetBytes[1],offsetBytes[2],offsetBytes[3]]); return true; } // Future> performGetStorageList(); diff --git a/app/lib/services/sockets/sdcard_socket.dart b/app/lib/services/sockets/sdcard_socket.dart new file mode 100644 index 000000000..23bf3626b --- /dev/null +++ b/app/lib/services/sockets/sdcard_socket.dart @@ -0,0 +1,139 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:flutter/material.dart'; +import 'package:friend_private/backend/preferences.dart'; +import 'package:friend_private/backend/schema/bt_device.dart'; +import 'package:friend_private/backend/schema/message_event.dart'; +import 'package:friend_private/backend/schema/transcript_segment.dart'; +import 'package:friend_private/env/env.dart'; +import 'package:instabug_flutter/instabug_flutter.dart'; +import 'package:web_socket_channel/io.dart'; + +enum WebsocketConnectionStatus { notConnected, connected, failed, closed, error } + + + +class SdCardSocketService { + + IOWebSocketChannel? sdCardChannel; + WebsocketConnectionStatus sdCardConnectionState = WebsocketConnectionStatus.notConnected; + + SdCardSocketService(); + +Future setupSdCardWebSocket({required Function onMessageReceived, String? btConnectedTime}) async { + // IOWebSocketChannel? sdCardChannel; + try { + sdCardChannel = await openSdCardStream( + onMessageReceived: onMessageReceived, + onWebsocketConnectionSuccess: () { + sdCardConnectionState = WebsocketConnectionStatus.connected; + debugPrint('WebSocket connected successfully sd'); + // notifyListeners(); + }, + onWebsocketConnectionFailed: (err) { + sdCardConnectionState = WebsocketConnectionStatus.failed; + //reconnectSdCardWebSocket(onMessageReceived: onMessageReceived); + debugPrint('WebSocket connection failed sd: $err'); + // notifyListeners(); + }, + onWebsocketConnectionClosed: (int? closeCode, String? closeReason) { + sdCardConnectionState = WebsocketConnectionStatus.closed; + // //reconnectSdCardWebSocket(onMessageReceived: onMessageReceived); + debugPrint('WebSocket connection closed2 sd: code ~ $closeCode, reason ~ $closeReason'); + // notifyListeners(); + }, + onWebsocketConnectionError: (err) { + sdCardConnectionState = WebsocketConnectionStatus.error; + //reconnectSdCardWebSocket(onMessageReceived: onMessageReceived); + debugPrint('WebSocket connection error sd: $err'); + // notifyListeners(); + }, + btConnectedTime: btConnectedTime, + ); + } catch (e) { + debugPrint('Error in initWebSocket sd: $e'); + + // notifyListeners(); + } + + } + +Future openSdCardStream({ + required VoidCallback onWebsocketConnectionSuccess, + required void Function(dynamic) onWebsocketConnectionFailed, + required void Function(int?, String?) onWebsocketConnectionClosed, + required void Function(dynamic) onWebsocketConnectionError, + required Function onMessageReceived, + String? btConnectedTime, +}) async { + debugPrint('Websocket Opening sd card'); + final recordingsLanguage = SharedPreferencesUtil().recordingsLanguage; + // var params = '?language=$recordingsLanguage&sample_rate=$sampleRate&codec=$codec&uid=${SharedPreferencesUtil().uid}' + // '&include_speech_profile=$includeSpeechProfile&new_memory_watch=$newMemoryWatch&stt_service=${SharedPreferencesUtil().transcriptionModel}'; + var params = '?uid=${SharedPreferencesUtil().uid}&bt_connected_time=$btConnectedTime'; + debugPrint('btConnectedTime: $btConnectedTime'); + IOWebSocketChannel channel = IOWebSocketChannel.connect( + Uri.parse('${Env.apiBaseUrl!.replaceAll('https', 'wss')}sdcard_stream$params'), + // headers: {'Authorization': await getAuthHeader()}, + ); + + await channel.ready.then((v) { + channel.stream.listen( + (event) { + debugPrint('sdcard stream event'); + if (event == 'ping') return; + + final jsonEvent = jsonDecode(event); + + // segment + if (jsonEvent is List) { + var segments = jsonEvent; + if (segments.isEmpty) return; + // onMessageReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList()); + return; + } + + // debugPrint(event); + + // object message event + if (jsonEvent.containsKey("type")) { + var messageEvent = ServerMessageEvent.fromJson(jsonEvent); + onMessageReceived(); + // if (onMessageEventReceived != null) { + // // onMessageEventReceived(messageEvent); + // return; + // } + } + + debugPrint(event.toString()); + }, + onError: (err, stackTrace) { + onWebsocketConnectionError(err); // error during connection + CrashReporting.reportHandledCrash(err!, stackTrace, level: NonFatalExceptionLevel.warning); + }, + onDone: (() { + debugPrint('Websocket connection onDone sd'); // FIXME + onWebsocketConnectionClosed(channel.closeCode, channel.closeReason); + }), + cancelOnError: true, // TODO: is this correct? + ); + }).onError((err, stackTrace) { + // no closing reason or code + print(err); + debugPrint('Websocket connection failed sd: $err'); + CrashReporting.reportHandledCrash(err!, stackTrace, level: NonFatalExceptionLevel.warning); + onWebsocketConnectionFailed(err); // initial connection failed + }); + + try { + await channel.ready; + debugPrint('Websocket Opened in sd card'); + onWebsocketConnectionSuccess(); + } catch (err) { + print(err); + } + return channel; +} + +} \ No newline at end of file diff --git a/backend/models/memory.py b/backend/models/memory.py index 69618efa5..a76b64894 100644 --- a/backend/models/memory.py +++ b/backend/models/memory.py @@ -105,6 +105,7 @@ class MemorySource(str, Enum): openglass = 'openglass' screenpipe = 'screenpipe' workflow = 'workflow' + sdcard = 'sdcard' class MemoryVisibility(str, Enum): diff --git a/backend/routers/sdcard.py b/backend/routers/sdcard.py index de24c925f..337962156 100644 --- a/backend/routers/sdcard.py +++ b/backend/routers/sdcard.py @@ -5,108 +5,154 @@ import requests from fastapi import APIRouter, FastAPI,Depends, HTTPException, UploadFile from utils.memories.process_memory import process_memory, process_user_emotion -from utils.other.storage import upload_sdcard_audio +from utils.other.storage import upload_sdcard_audio,create_signed_postprocessing_audio_url from utils.other import endpoints as auth import datetime - +import uuid +from utils.stt.pre_recorded import fal_whisperx, fal_postprocessing +from utils.audio import create_wav_from_bytes +from utils.stt.vad import vad_is_empty +from fastapi.websockets import WebSocketDisconnect, WebSocket +from starlette.websockets import WebSocketState +from utils.stt.vad import VADIterator, model +import asyncio +import opuslib +from pydub import AudioSegment +import time router = APIRouter() +@router.websocket("/sdcard_stream") +async def sdcard_streaming_endpoint( + websocket: WebSocket, uid: str,bt_connected_time: str +): + + bt_connected_time_dt = datetime.datetime.strptime(bt_connected_time, '%Y-%m-%d %H:%M:%S.%fZ').replace(tzinfo=datetime.timezone.utc) + + try: + await websocket.accept() + except RuntimeError as e: + print(e) + return + + #activate the websocket + websocket_active = True + session_id = str(uuid.uuid4()) + big_file_path = f"_temp/_temp{session_id}.wav" + first_packet_flag=False + data_packet_length=83 + packet_count = 0 + seconds_until_timeout = 10.0 + audio_frames = [] + + try: + while websocket_active: + if first_packet_flag: + data = await asyncio.wait_for(websocket.receive_bytes(), timeout=seconds_until_timeout) + + else: + data = await websocket.receive_bytes() -url = "https://api.deepgram.com/v1/listen" -headers = { - "Authorization": "Token 19e6bff945ec7b8346da429548a80b40973704e4", - "Content-Type": "audio/*", - } + if (len(data) == data_packet_length): #valid packet size + if not first_packet_flag: + first_packet_flag = True + print('first valid packet received') + if data == 100: #valid code + print('done.') + websocket_active = False + break + amount = int(data[3]) + frame_to_decode = bytes(list(data[4:4+amount])) + audio_frames.append(frame_to_decode) -params = { - "model": "whisper" - } -@router.post("/sdcard_memory", response_model=List[Memory], tags=['memories']) -async def download_wav( - file: UploadFile, #, uid: str = Depends(auth.get_current_user_uid) - date_time: str, - uid: str = Depends(auth.get_current_user_uid) -): - #save file here? - # print(uid) - file_path = f"_temp/_{file.filename}" - with open(file_path, 'wb') as f: - f.write(file.file.read()) + except WebSocketDisconnect: + print("websocket gone") + except asyncio.TimeoutError: + print('timeout condition, exitting') + websocket_active = False + except Exception as e: + print('somethign went wrong') + finally: + websocket_active = False + duration_of_file = len(audio_frames) / 100.0 + if duration_of_file < 5.0:#seconds + print('audio file too small') + return + + create_wav_from_bytes(big_file_path, audio_frames, "opus", 16000, 1, 2) - temp_url = upload_sdcard_audio(file_path) - # print(date_time) - datetime_now = datetime.datetime.now(datetime.timezone.utc) #save the current time. will be used to determine elapsed time - #start of audio to transcription stage - try: - f_ = open(file_path,'rb') - response = requests.post(url, headers=headers, params = params,data=f_.read()) - f_.close() - except: - print("eror parsing") + try: + temp_file_path = f"_temp/{session_id}"#+file_num .wav + current_file_num = 1 + temp_file_name = 'temp' + str(current_file_num) + + + vad_segments = vad_is_empty(big_file_path, return_segments=True) + print(vad_segments) + temp_file_list = [] + vad_segments_combined = [] + if vad_segments: + vad_segments_combined = combine_val_segments(vad_segments) + for segments in vad_segments_combined: + + start = segments['start'] + end = segments['end'] + aseg = AudioSegment.from_wav(big_file_path) + aseg = aseg[max(0, (start - 1) * 1000):min((end + 1) * 1000, aseg.duration_seconds * 1000)] + temp_file_name = temp_file_path + str(current_file_num) + '.wav' + temp_file_list.append(temp_file_name) + aseg.export(temp_file_name, format="wav") + current_file_num+=1 + + else: + print('nothing worth using memory for') return - response2 = response.json() - #end of audio to transcriptoin stage - print(response2) - if response2['metadata']['duration'] == 0.0: - return 400 - if not response2['results']['channels']: - return 400 - #this part is for more accurate time measurement - # date_string = "2024-09-14T14:43:46.560643" #the true start of transcription is the time of download + file duration - # format_string = "%Y-%m-%dT%H:%M:%S.%f" - # datetime_object = datetime.datetime.strptime( date_string, format_string) - file_duration = response2['metadata']['duration'] - approximate_file_delay = file_duration * 2.2 #based on empirical observations of ble download speed. 2.2 is tunable - #partitioning stage - partitioned_transcripts = partition_transcripts(response2) - memory_list = [] - print('length of list:',len(partitioned_transcripts)) - for partitions in partitioned_transcripts: - #weed out the transcripts here - if not partitions: #empty list - continue - if len(partitions[0].text.split()) < 8: #too small - continue - temp_start_time = partitions[0].start - temp_end_time = partitions[0].end - partitions[0].start = 0.0 - partitions[0].end = temp_end_time-temp_start_time + + for file, segments in zip(temp_file_list,vad_segments_combined): + signed_url = upload_sdcard_audio(file) + aseg = AudioSegment.from_wav(file) + words = fal_whisperx(signed_url, 1, 2) + duration_entire_process = datetime.datetime.now(datetime.timezone.utc) + time_to_subtract = (duration_entire_process - bt_connected_time_dt).total_seconds() + zero_base = duration_of_file + fal_segments = fal_postprocessing(words, aseg.duration_seconds) + print(fal_segments) + if not fal_segments: + print('failed to get fal segments') + continue temp_memory = CreateMemory( - started_at= datetime_now - datetime.timedelta(seconds=(file_duration - temp_start_time)) - datetime.timedelta(seconds=approximate_file_delay), - finished_at= datetime_now - datetime.timedelta(seconds=(file_duration - temp_end_time)) - datetime.timedelta(seconds=approximate_file_delay), - transcript_segments = partitions, + started_at= datetime.datetime.now(datetime.timezone.utc)-datetime.timedelta(seconds=time_to_subtract+zero_base-segments['start']), + finished_at= datetime.datetime.now(datetime.timezone.utc)-datetime.timedelta(seconds=time_to_subtract+zero_base-segments['end']), + + transcript_segments = fal_segments, + source= MemorySource.sdcard, language = 'en' ) result: Memory = process_memory(uid , temp_memory.language, temp_memory, force_process=True) - print(temp_memory.transcript_segments) + await websocket.send_json({"type": "done"}) - memory_list.append(result) - if not memory_list: - return 400 - - return memory_list + except Exception as e: + print('error bruf') + print(e) + return + + print('finished') + return -def partition_transcripts(json_file): - #may be transcription dervice dependant - transcript_list =json_file['results']['channels'][0]['alternatives'][0]['words'] - list_of_conversations = [] - previous_end_time = 0.0 - tr_threshold = 30.0 - current_transcript_string = '' - current_start_num = 0.0 - for words in transcript_list: - word_ = words['word'] - end = words['end'] - start = words['start'] - if (start - previous_end_time > tr_threshold): - test1 = TranscriptSegment(text=current_transcript_string,is_user = True,start = current_start_num,end = previous_end_time) - current_start_num = words['start'] - list_of_conversations.append([test1]) - current_transcript_string= '' - #TODO:partition within segment for different speakers - #if different speaker: do this.... - current_transcript_string = current_transcript_string + word_ + ' ' - previous_end_time = end - final_conv = TranscriptSegment(text=current_transcript_string,is_user=True ,start = current_start_num,end = previous_end_time) - list_of_conversations.append([final_conv]) - return list_of_conversations +def combine_val_segments(val_segments): + if len(val_segments) == 1: + return val_segments + segments_result = [] + temp_segment = None + for i in range(len(val_segments)): + if not temp_segment: + temp_segment = val_segments[i] + continue + else: + if (val_segments[i]['start'] - val_segments[i-1]['end']) > 120.0: + segments_result.append(temp_segment) + temp_segment = None + else: + temp_segment['end'] = val_segments[i]['end'] + if temp_segment is not None: + segments_result.append(temp_segment) + return segments_result