Skip to content

Commit

Permalink
Use consumer/deliver to control the transcript pushing/audiobytes pushing even better (#1208)
Browse files Browse the repository at this point in the history
  • Loading branch information
beastoin authored Oct 29, 2024
2 parents 0863830 + 9fd5512 commit 9a372b6
Showing 1 changed file with 45 additions and 46 deletions.
91 changes: 45 additions & 46 deletions backend/routers/transcribe_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,75 +235,67 @@ async def create_memory_on_segment_received_task(finished_at: datetime):
memory_creation_task = asyncio.create_task(
_trigger_create_memory_with_delay(memory_creation_timeout, finished_at))

async def create_pusher_task_handler(loop):
async def create_pusher_task_handler():
nonlocal websocket_active

# Transcript
transcript_ws = await connect_to_transcript_pusher(uid)
last_sync_transcript = time.time()
segment_buffers = []

async def transcript_send_async(segments):
# print(f"transcript_send ${len(segments)}")
try:
await transcript_ws.send(json.dumps(segments))
except websockets.exceptions.ConnectionClosed as e:
print(f"Pusher Transcript Connection closed: {e}")

# Debounce 1s
def transcript_send(segments, force: bool = False):
def transcript_send(segments):
nonlocal segment_buffers
nonlocal last_sync_transcript

segment_buffers.extend(segments)
if time.time() - last_sync_transcript >= 1 or force:
last_sync_transcript = time.time()
asyncio.run_coroutine_threadsafe(transcript_send_async(segment_buffers), loop)
segment_buffers = []

async def transcript_consume():
nonlocal websocket_active
nonlocal segment_buffers
while websocket_active:
await asyncio.sleep(1)
if len(segment_buffers) > 0:
try:
await transcript_ws.send(json.dumps(segment_buffers))
segment_buffers = []
except websockets.exceptions.ConnectionClosed as e:
print(f"Pusher transcripts Connection closed: {e}")
except Exception as e:
print(f"Pusher transcripts failed: {e}")

# Audio bytes
audio_bytes_ws = None
last_sync_audio_bytes = time.time()
audio_buffers = []
audio_buffers = bytearray()
audio_bytes_webhook_delay_seconds = get_audio_bytes_webhook_seconds(uid)
if audio_bytes_webhook_delay_seconds:
audio_bytes_ws = await connect_to_audio_bytes_pusher(uid, sample_rate)

async def audio_bytes_send_async(audio_buffer):
# print(f"audio_bytes_send ${len(audio_buffer)}")
if audio_bytes_ws:
try:
await audio_bytes_ws.send(audio_buffer)
except websockets.exceptions.ConnectionClosed as e:
print(f"Pusher Audio Bytes Connection closed: {e}")

# Debounce 1s
def audio_bytes_send(audio_bytes, force: bool = False):
# print(f"audio_bytes_send ${len(audio_buffer)}")
def audio_bytes_send(audio_bytes):
nonlocal audio_buffers
nonlocal last_sync_audio_bytes

audio_buffers.extend(audio_bytes)
if time.time() - last_sync_audio_bytes >= 1 or force:
last_sync_audio_bytes = time.time()
asyncio.run_coroutine_threadsafe(audio_bytes_send_async(audio_buffers), loop)
audio_buffers = []

async def close(code: int = 1000):
# Flush
if len(segment_buffers) > 0:
transcript_send([], True)
if len(audio_buffers) > 0:
audio_bytes_send([], True)
async def audio_bytes_consume():
nonlocal websocket_active
nonlocal audio_buffers
while websocket_active:
await asyncio.sleep(1)
if len(audio_buffers) > 0:
try:
await audio_bytes_ws.send(audio_buffers.copy())
audio_buffers = bytearray()
except websockets.exceptions.ConnectionClosed as e:
print(f"Pusher audio_bytes Connection closed: {e}")
except Exception as e:
print(f"Pusher audio_bytes failed: {e}")

# Close
async def close(code: int = 1000):
    """Close every pusher connection that was opened, using close code ``code``.

    The transcript connection is closed first, then the audio-bytes
    connection; a connection that is unset (falsy) is skipped.
    """
    for pusher_ws in (transcript_ws, audio_bytes_ws):
        if pusher_ws:
            await pusher_ws.close(code)

return (close, transcript_send, audio_bytes_send if audio_bytes_ws else None)
return (close, transcript_send, transcript_consume,
audio_bytes_send if audio_bytes_ws else None, audio_bytes_consume if audio_bytes_ws else None)

pusher_close, transcript_send, audio_bytes_send = await create_pusher_task_handler(loop)
pusher_close, transcript_send, transcript_consume, audio_bytes_send, audio_bytes_consume = await create_pusher_task_handler()

def stream_transcript(segments):
nonlocal websocket
Expand Down Expand Up @@ -492,7 +484,14 @@ async def send_heartbeat():
receive_audio(deepgram_socket, deepgram_socket2, soniox_socket, speechmatics_socket)
)
heartbeat_task = asyncio.create_task(send_heartbeat())
await asyncio.gather(receive_task, heartbeat_task)

# consumer
consume_tasks = [asyncio.create_task(transcript_consume())]
if audio_bytes_consume:
consume_tasks.append(asyncio.create_task(audio_bytes_consume()))

tasks = [receive_task, heartbeat_task] + consume_tasks
await asyncio.gather(*tasks)

except Exception as e:
print(f"Error during WebSocket operation: {e}")
Expand Down

0 comments on commit 9a372b6

Please sign in to comment.