From 3120dcb27dc32d7ae05d3bbde52f4c742f37b06d Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Thu, 19 Sep 2024 09:10:50 +0300 Subject: [PATCH] fix queue --- packages/livekit-rtc/src/audio_source.ts | 20 ++++++------- packages/livekit-rtc/src/napi/native.d.ts | 13 ++++----- packages/livekit-rtc/src/utils.ts | 35 +++++++++++------------ 3 files changed, 32 insertions(+), 36 deletions(-) diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 089f9e47..761c32c4 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -8,9 +8,9 @@ import type { AudioSourceInfo, CaptureAudioFrameCallback, CaptureAudioFrameResponse, + ClearAudioBufferResponse, NewAudioSourceResponse, - - ClearAudioBufferResponse} from './proto/audio_frame_pb.js'; +} from './proto/audio_frame_pb.js'; import { AudioSourceType, CaptureAudioFrameRequest, @@ -84,25 +84,25 @@ export class AudioSource { async waitForPlayout() { await this.releaseQueue.get().then(() => { - this.lastCapture = 0 + this.lastCapture = 0; this.currentQueueSize = 0; - }) + }); } async captureFrame(frame: AudioFrame) { - const now = Date.now() + const now = Date.now(); const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture; - this.currentQueueSize += frame.samplesPerChannel / frame.sampleRate - elapsed + this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000; // remove 50ms to account for processing time (e.g. using wait_for_playout for very small chunks) - this.currentQueueSize -= 0.05 - this.lastCapture = now + this.currentQueueSize -= 50; + this.lastCapture = now; if (this.timeout) { clearTimeout(this.timeout); } - setTimeout(this.releaseQueue.put, this.currentQueueSize) - + setTimeout(this.releaseQueue.put, this.currentQueueSize); + const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, buffer: frame.protoInfo(), diff --git a/packages/livekit-rtc/src/napi/native.d.ts b/packages/livekit-rtc/src/napi/native.d.ts index f3ba192f..0355e240 100644 --- a/packages/livekit-rtc/src/napi/native.d.ts +++ b/packages/livekit-rtc/src/napi/native.d.ts @@ -3,14 +3,11 @@ /* auto-generated by NAPI-RS */ -export declare function livekitInitialize( - callback: (data: Uint8Array) => void, - captureLogs: boolean, -): void; -export declare function livekitFfiRequest(data: Uint8Array): Uint8Array; -export declare function livekitRetrievePtr(handle: Uint8Array): bigint; -export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array; -export declare function livekitDispose(): Promise; +export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void; +export function livekitFfiRequest(data: Uint8Array): Uint8Array; +export function livekitRetrievePtr(handle: Uint8Array): bigint; +export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array; +export function livekitDispose(): Promise; export declare class FfiHandle { constructor(handle: bigint); dispose(): void; diff --git a/packages/livekit-rtc/src/utils.ts b/packages/livekit-rtc/src/utils.ts index c6167d66..aa97e63f 100644 --- a/packages/livekit-rtc/src/utils.ts +++ b/packages/livekit-rtc/src/utils.ts @@ -1,33 +1,32 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 - -import EventEmitter, { once } from "events"; +import EventEmitter, { once } from 'events'; /** @internal */ export class Queue { - #items: T[] = []; - #limit?: number; - #events = new EventEmitter(); + items: T[] = []; + limit?: number; + events = new EventEmitter(); constructor(limit?: number) { - this.#limit = limit; + this.limit = limit; } - async get(): Promise { - if (this.#items.length === 0) { - await once(this.#events, 'put'); + get = async (): Promise => { + if (this.items.length === 0) { + await once(this.events, 'put'); } - const item = this.#items.shift()!; - this.#events.emit('get'); + const item = this.items.shift()!; + this.events.emit('get'); return item; - } + }; - async put(item: T) { - if (this.#limit && this.#items.length >= this.#limit) { - await once(this.#events, 'get'); + put = async (item: T) => { + if (this.limit && this.items.length >= this.limit) { + await once(this.events, 'get'); } - this.#items.push(item); - this.#events.emit('put'); - } + this.items.push(item); + this.events.emit('put'); + }; }