Skip to content

Commit

Permalink
Notify combining memory to memories page
Browse files Browse the repository at this point in the history
  • Loading branch information
beastoin committed Sep 28, 2024
1 parent 2dab63e commit f4b15bc
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 200 deletions.
4 changes: 0 additions & 4 deletions app/lib/backend/http/api/processing_memories.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ Future<UpdateProcessingMemoryResponse?> updateProcessingMemoryServer({
bodyData.addAll({"geolocation": geolocation.toJson()});
}

debugPrint(jsonEncode(bodyData));

var response = await makeApiCall(
url: '${Env.apiBaseUrl}v1/processing-memories/$id',
headers: {},
method: 'PATCH',
body: jsonEncode(bodyData),
);
if (response == null) return null;
debugPrint('updateProcessingMemoryServer: ${response.body}');
if (response.statusCode == 200) {
return UpdateProcessingMemoryResponse.fromJson(jsonDecode(response.body));
} else {
Expand All @@ -57,7 +54,6 @@ Future<ProcessingMemoryResponse?> fetchProcessingMemoryServer({
body: "",
);
if (response == null) return null;
debugPrint('updateProcessingMemoryServer: ${response.body}');
if (response.statusCode == 200) {
return ProcessingMemoryResponse.fromJson(jsonDecode(response.body));
} else {
Expand Down
53 changes: 34 additions & 19 deletions app/lib/providers/capture_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -186,46 +186,61 @@ class CaptureProvider extends ChangeNotifier
setHasTranscripts(segments.isNotEmpty);
}

// Notify combining
if (capturingProcessingMemory?.memoryId != null) {
memoryProvider?.onNewCombiningMemory(capturingProcessingMemory!);
}

// Update processing memory
_updateProcessingMemory();
}

void _setCapturingProcessingMemory(ServerProcessingMemory? pm) {
var now = DateTime.now();
debugPrint("${pm?.toJson()}");
if (pm != null &&
pm.status == ServerProcessingMemoryStatus.capturing &&
pm.capturingTo != null &&
pm.capturingTo!.isAfter(now)) {
capturingProcessingMemory = pm;
} else {
capturingProcessingMemory = null;
}
notifyListeners();

// End
void _trackCapturingProcessingMemory() {
if (capturingProcessingMemory == null) {
return;
}

// Or watch
var id = capturingProcessingMemory!.id;
var delayMs = capturingProcessingMemory?.capturingTo != null
? capturingProcessingMemory!.capturingTo!.millisecondsSinceEpoch - now.millisecondsSinceEpoch
var pm = capturingProcessingMemory!;

var delayMs = pm.capturingTo != null
? pm.capturingTo!.millisecondsSinceEpoch - DateTime.now().millisecondsSinceEpoch
: 2 * 60 * 1000; // 2m
if (delayMs > 0) {
_processingMemoryWatchTimer?.cancel();
_processingMemoryWatchTimer = Timer(Duration(milliseconds: delayMs), () async {
ProcessingMemoryResponse? result = await fetchProcessingMemoryServer(id: id);
ProcessingMemoryResponse? result = await fetchProcessingMemoryServer(id: pm.id);
if (result?.result == null) {
debugPrint("Can not fetch processing memory, result null");
return;
}

_setCapturingProcessingMemory(result?.result);
if (capturingProcessingMemory == null) {
// Force clean
_clean();
}
});
}
}

void _setCapturingProcessingMemory(ServerProcessingMemory? pm) {
if (pm != null &&
pm.status == ServerProcessingMemoryStatus.capturing &&
pm.capturingTo != null &&
pm.capturingTo!.isAfter(DateTime.now())) {
capturingProcessingMemory = pm;
_trackCapturingProcessingMemory();

notifyListeners();
return;
}

capturingProcessingMemory = null;
_processingMemoryWatchTimer?.cancel();

notifyListeners();
}

Future<void> _onMemoryCreated(ServerMessageEvent event) async {
if (event.memory == null) {
debugPrint("Memory is not found, processing memory ${event.processingMemoryId}");
Expand Down
52 changes: 46 additions & 6 deletions app/lib/providers/memory_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class MemoryProvider extends ChangeNotifier {
List<ServerProcessingMemory> processingMemories = [];
Timer? _processingMemoryWatchTimer;

void populateMemoriesWithDates() {
void _populateMemoriesWithDatesWithoutNotify() {
memoriesWithDates = [];
for (var i = 0; i < filteredMemories.length; i++) {
if (i == 0) {
Expand All @@ -34,12 +34,37 @@ class MemoryProvider extends ChangeNotifier {
memoriesWithDates.add(filteredMemories[i]);
}
}
}

void _filterMemoriesWithoutNotify(String query) {
filteredMemories = [];
filteredMemories = SharedPreferencesUtil().showDiscardedMemories
? memories
: memories.where((memory) => !memory.discarded || memory.isNew).toList();
filteredMemories = query.isEmpty
? filteredMemories
: filteredMemories
.where(
(memory) => (memory.getTranscript() + memory.structured.title + memory.structured.overview)
.toLowerCase()
.contains(query.toLowerCase()),
)
.toList();
if (query == '' && filteredMemories.isEmpty) {
filteredMemories = memories;
SharedPreferencesUtil().showDiscardedMemories = true;
hasNonDiscardedMemories = false;
}
}

void populateMemoriesWithDates() {
_populateMemoriesWithDatesWithoutNotify();
notifyListeners();
}

void initFilteredMemories() {
filterMemories('');
populateMemoriesWithDates();
_filterMemoriesWithoutNotify('');
_populateMemoriesWithDatesWithoutNotify();
notifyListeners();
}

Expand All @@ -62,7 +87,9 @@ class MemoryProvider extends ChangeNotifier {
SharedPreferencesUtil().showDiscardedMemories = true;
hasNonDiscardedMemories = false;
}
populateMemoriesWithDates();

_populateMemoriesWithDatesWithoutNotify();

notifyListeners();
}

Expand Down Expand Up @@ -112,6 +139,20 @@ class MemoryProvider extends ChangeNotifier {
return;
}

Future onNewCombiningMemory(ServerProcessingMemory pm) async {
if (pm.memoryId == null) {
debugPrint("Processing Memory Id is not found ${pm.id}");
return;
}
int idx = memories.indexWhere((m) => m.id == pm.memoryId);
if (idx < 0) {
return;
}
memories.removeAt(idx);

initFilteredMemories();
}

Future onNewProcessingMemory(ServerProcessingMemory processingMemory) async {
if (processingMemories.indexWhere((pm) => pm.id == processingMemory.id) >= 0) {
// existed
Expand Down Expand Up @@ -145,11 +186,10 @@ class MemoryProvider extends ChangeNotifier {
if (idx < 0) {
memories.insert(0, memory);
} else {
// TODO: thinh, remove
memories[idx] = memory;
}

// Warn: Too many notifying!
// Warn: Too many notifies!
initFilteredMemories();
}

Expand Down
168 changes: 0 additions & 168 deletions app/lib/services/sockets/pure_socket.dart
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:math';

import 'package:flutter/material.dart';
import 'package:friend_private/backend/preferences.dart';
import 'package:friend_private/backend/schema/bt_device.dart';
import 'package:friend_private/backend/schema/message_event.dart';
import 'package:friend_private/backend/schema/transcript_segment.dart';
import 'package:friend_private/env/env.dart';
import 'package:friend_private/services/notifications.dart';
import 'package:instabug_flutter/instabug_flutter.dart';
import 'package:internet_connection_checker_plus/internet_connection_checker_plus.dart';
import 'package:web_socket_channel/io.dart';
Expand Down Expand Up @@ -263,164 +256,3 @@ class PureSocket implements IPureSocket {
}
}
}

abstract interface class ITransctipSegmentSocketServiceListener {
void onMessageEventReceived(ServerMessageEvent event);
void onSegmentReceived(List<TranscriptSegment> segments);
void onError(Object err);
void onClosed();
}

class SpeechProfileTranscripSegmentSocketService extends TranscripSegmentSocketService {
SpeechProfileTranscripSegmentSocketService.create(super.sampleRate, super.codec)
: super.create(includeSpeechProfile: false, newMemoryWatch: false);
}

class MemoryTranscripSegmentSocketService extends TranscripSegmentSocketService {
MemoryTranscripSegmentSocketService.create(super.sampleRate, super.codec)
: super.create(includeSpeechProfile: true, newMemoryWatch: true);
}

enum SocketServiceState {
connected,
disconnected,
}

class TranscripSegmentSocketService implements IPureSocketListener {
late PureSocket _socket;
final Map<Object, ITransctipSegmentSocketServiceListener> _listeners = {};

SocketServiceState get state =>
_socket.status == PureSocketStatus.connected ? SocketServiceState.connected : SocketServiceState.disconnected;

int sampleRate;
BleAudioCodec codec;
bool includeSpeechProfile;
bool newMemoryWatch;

TranscripSegmentSocketService.create(
this.sampleRate,
this.codec, {
this.includeSpeechProfile = false,
this.newMemoryWatch = true,
}) {
final recordingsLanguage = SharedPreferencesUtil().recordingsLanguage;
var params = '?language=$recordingsLanguage&sample_rate=$sampleRate&codec=$codec&uid=${SharedPreferencesUtil().uid}'
'&include_speech_profile=$includeSpeechProfile&new_memory_watch=$newMemoryWatch&stt_service=${SharedPreferencesUtil().transcriptionModel}';
String url = '${Env.apiBaseUrl!.replaceAll('https', 'wss')}listen$params';

_socket = PureSocket(url);
_socket.setListener(this);
}

void subscribe(Object context, ITransctipSegmentSocketServiceListener listener) {
_listeners.remove(context.hashCode);
_listeners.putIfAbsent(context.hashCode, () => listener);
}

void unsubscribe(Object context) {
_listeners.remove(context.hashCode);
}

Future start() async {
bool ok = await _socket.connect();
if (!ok) {
debugPrint("Can not connect to websocket");
}
}

Future stop({String? reason}) async {
await _socket.stop();
_listeners.clear();

if (reason != null) {
debugPrint(reason);
}
}

Future send(dynamic message) async {
_socket.send(message);
return;
}

@override
void onClosed() {
_listeners.forEach((k, v) {
v.onClosed();
});
}

@override
void onError(Object err, StackTrace trace) {
_listeners.forEach((k, v) {
v.onError(err);
});
}

@override
void onMessage(event) {
if (event == 'ping') return;

// Decode json
dynamic jsonEvent;
try {
jsonEvent = jsonDecode(event);
} on FormatException catch (e) {
debugPrint(e.toString());
}
if (jsonEvent == null) {
debugPrint("Can not decode message event json $event");
return;
}

// Transcript segments
if (jsonEvent is List) {
var segments = jsonEvent;
if (segments.isEmpty) {
return;
}
_listeners.forEach((k, v) {
v.onSegmentReceived(segments.map((e) => TranscriptSegment.fromJson(e)).toList());
});
return;
}

// Message event
if (jsonEvent.containsKey("type")) {
var event = ServerMessageEvent.fromJson(jsonEvent);
_listeners.forEach((k, v) {
v.onMessageEventReceived(event);
});
return;
}

debugPrint(event.toString());
}

@override
void onInternetConnectionFailed() {
debugPrint("onInternetConnectionFailed");

// Send notification
NotificationService.instance.clearNotification(3);
NotificationService.instance.createNotification(
notificationId: 3,
title: 'Internet Connection Lost',
body: 'Your device is offline. Transcription is paused until connection is restored.',
);
}

@override
void onMaxRetriesReach() {
debugPrint("onMaxRetriesReach");

// Send notification
NotificationService.instance.clearNotification(2);
NotificationService.instance.createNotification(
notificationId: 2,
title: 'Connection Issue 🚨',
body: 'Unable to connect to the transcript service.'
' Please restart the app or contact support if the problem persists.',
);
}
}
3 changes: 0 additions & 3 deletions app/lib/services/sockets/transcription_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ class TranscripSegmentSocketService implements IPureSocketListener {

@override
void onMessage(event) {
debugPrint("[TranscriptSegmentService] onMessage ${event}");
if (event == 'ping') return;

// Decode json
Expand All @@ -386,8 +385,6 @@ class TranscripSegmentSocketService implements IPureSocketListener {
return;
}

debugPrint(event);

// Message event
if (jsonEvent.containsKey("type")) {
var event = ServerMessageEvent.fromJson(jsonEvent);
Expand Down

0 comments on commit f4b15bc

Please sign in to comment.