Skip to content

Commit

Permalink
segments aligning seconds duration
Browse files Browse the repository at this point in the history
  • Loading branch information
josancamon19 committed Oct 5, 2024
1 parent fa4747d commit ad6940a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 69 deletions.
13 changes: 2 additions & 11 deletions backend/routers/memories.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from database.vector_db import delete_vector
from models.memory import *
from routers.speech_profile import expand_speech_profile
from routers.transcribe_v2 import retrieve_in_progress_memory
from utils.memories.location import get_google_maps_location
from utils.memories.process_memory import process_memory
from utils.other import endpoints as auth
Expand Down Expand Up @@ -68,17 +69,7 @@ def create_memory(

@router.post("/v2/memories", response_model=CreateMemoryResponse, tags=['memories'])
def process_in_progress_memory(uid: str = Depends(auth.get_current_user_uid)):
memory_id = redis_db.get_in_progress_memory_id(uid)
memory = None

if memory_id:
memory = memories_db.get_memory(uid, memory_id)
if memory and memory['status'] != 'in_progress':
memory = None

if not memory:
memory = memories_db.get_in_progress_memory(uid)

memory = retrieve_in_progress_memory(uid)
if not memory:
raise HTTPException(status_code=404, detail="Memory in progress not found")

Expand Down
102 changes: 44 additions & 58 deletions backend/routers/transcribe_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,24 @@ def get_model_name(value):
return 'speechmatics_streaming'


def retrieve_in_progress_memory(uid):
    """Return the in-progress memory for ``uid``.

    The memory id tracked by ``redis_db.get_in_progress_memory_id`` is tried
    first: the record is loaded with ``memories_db.get_memory`` and used only
    when its ``'status'`` is ``'in_progress'``. In every other case (no tracked
    id, no record, or a record in another status) the result of
    ``memories_db.get_in_progress_memory(uid)`` is returned instead.
    """
    tracked_id = redis_db.get_in_progress_memory_id(uid)
    if tracked_id:
        candidate = memories_db.get_memory(uid, tracked_id)
        # Only trust the tracked record while it is still in progress.
        if candidate and candidate['status'] == 'in_progress':
            return candidate

    # Fall back to querying the database for an in-progress memory.
    return memories_db.get_in_progress_memory(uid)


async def _websocket_util(
websocket: WebSocket, uid: str, language: str = 'en', sample_rate: int = 8000, codec: str = 'pcm8',
channels: int = 1, include_speech_profile: bool = True):
channels: int = 1, include_speech_profile: bool = True
):
print('websocket_endpoint', uid, language, sample_rate, codec, include_speech_profile)

# Not when it comes from the phone, and only Friend's with 1.0.4
Expand All @@ -95,47 +110,12 @@ async def _websocket_util(

# Stream transcript
loop = asyncio.get_event_loop()

# Soft timeout, should < MODAL_TIME_OUT - 3m
timeout_seconds = 420 # 7m
started_at = time.time()
memory_creation_timeout = 15

def _get_in_progress_memory(segments: List[dict] = None):

memory_id = redis_db.get_in_progress_memory_id(uid)
existing = None

if memory_id:
existing = memories_db.get_memory(uid, memory_id)
if existing and existing['status'] != 'in_progress':
existing = None

if not existing:
existing = memories_db.get_in_progress_memory(uid)

if not segments: # memory_created event, just return in progress, without considering follow-up checks
return existing

if existing:
if time.time() - existing['finished_at'].timestamp() > memory_creation_timeout:
memory = Memory(**existing)
memories_db.update_memory_status(uid, memory.id, MemoryStatus.processing)
memory = process_memory(uid, memory.language, memory)
memories_db.update_memory_status(uid, memory.id, MemoryStatus.completed)
memory = None
existing = None

if existing:
print('_get_in_progress_memory existing', existing['id'])
def _get_or_create_in_progress_memory(segments: List[dict]):
if existing := retrieve_in_progress_memory(uid):
print('_get_or_create_in_progress_memory existing', existing['id'])
memory = Memory(**existing)
# latest_segment = memory.transcript_segments[-1]
#
# if segments and segments[-1]['start'] < latest_segment.end:
# for segment in segments:
# segment['start'] += latest_segment.end
# segment['end'] += latest_segment.end

memory.transcript_segments = TranscriptSegment.combine_segments(
memory.transcript_segments, [TranscriptSegment(**segment) for segment in segments]
)
Expand Down Expand Up @@ -170,6 +150,14 @@ async def memory_creation_timer():

memory_creation_task = None
seconds_to_trim = None
seconds_to_add = None

existing_memory = retrieve_in_progress_memory(uid)
if existing_memory:
dt = datetime.fromisoformat(existing_memory['started_at'].isoformat())
seconds_to_add = (datetime.now(timezone.utc) - dt).total_seconds()
# TODO: validate is not more than duration? 120 seconds, and start processing
print('seconds_to_add', seconds_to_add)

def stream_transcript(segments, _):
nonlocal websocket
Expand All @@ -188,22 +176,20 @@ def stream_transcript(segments, _):
memory_creation_task = asyncio.create_task(memory_creation_timer())

# Segments aligning duration seconds.
# last_segment_seconds = redis_db.get_in_progress_memory_id_last_segment_seconds(uid)
# if segments[0]['start'] < last_segment_seconds:
# seconds_to_trim = 0
# for i, segment in enumerate(segments):
# segment["start"] += last_segment_seconds
# segment["end"] += last_segment_seconds
# segments[i] = segment
# else:
for i, segment in enumerate(segments):
segment["start"] -= seconds_to_trim
segment["end"] -= seconds_to_trim
segments[i] = segment
if seconds_to_add:
for i, segment in enumerate(segments):
segment["start"] += seconds_to_add
segment["end"] += seconds_to_add
segments[i] = segment
elif seconds_to_trim:
for i, segment in enumerate(segments):
segment["start"] -= seconds_to_trim
segment["end"] -= seconds_to_trim
segments[i] = segment

asyncio.run_coroutine_threadsafe(websocket.send_json(segments), loop)

memory = _get_in_progress_memory(segments) # can trigger race condition? increase soniox utterance?
memory = _get_or_create_in_progress_memory(segments) # can trigger race condition? increase soniox utterance?
memories_db.update_memory_segments(uid, memory.id, [s.dict() for s in memory.transcript_segments])
memories_db.update_memory_finished_at(uid, memory.id, datetime.now(timezone.utc))

Expand Down Expand Up @@ -324,6 +310,9 @@ async def receive_audio(dg_socket1, dg_socket2, soniox_socket, speechmatics_sock
await speechmatics_socket.close()

# heart beat
started_at = time.time()
timeout_seconds = 420 # 7m # Soft timeout, should < MODAL_TIME_OUT - 3m

async def send_heartbeat():
nonlocal websocket_active
nonlocal websocket_close_code
Expand Down Expand Up @@ -362,15 +351,12 @@ async def _send_message_event(msg: MessageEvent):

return False

# Create processing memory

# Create memory
async def _create_memory():
print("_create_memory")
nonlocal seconds_to_trim
seconds_to_trim = None

memory = _get_in_progress_memory()
memory = retrieve_in_progress_memory(uid)
if not memory or not memory.transcript_segments:
raise Exception('FAILED')

Expand All @@ -387,9 +373,9 @@ async def _create_memory():

try:
receive_task = asyncio.create_task(
receive_audio(deepgram_socket, deepgram_socket2, soniox_socket, speechmatics_socket))
receive_audio(deepgram_socket, deepgram_socket2, soniox_socket, speechmatics_socket)
)
heartbeat_task = asyncio.create_task(send_heartbeat())

await asyncio.gather(receive_task, heartbeat_task)

except Exception as e:
Expand Down

0 comments on commit ad6940a

Please sign in to comment.