Skip to content

Commit

Permalink
fix queue
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed Sep 19, 2024
1 parent cc63478 commit 3120dcb
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 36 deletions.
20 changes: 10 additions & 10 deletions packages/livekit-rtc/src/audio_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import type {
AudioSourceInfo,
CaptureAudioFrameCallback,
CaptureAudioFrameResponse,
ClearAudioBufferResponse,
NewAudioSourceResponse,

ClearAudioBufferResponse} from './proto/audio_frame_pb.js';
} from './proto/audio_frame_pb.js';
import {
AudioSourceType,
CaptureAudioFrameRequest,
Expand Down Expand Up @@ -84,25 +84,25 @@ export class AudioSource {

async waitForPlayout() {
await this.releaseQueue.get().then(() => {
this.lastCapture = 0
this.lastCapture = 0;
this.currentQueueSize = 0;
})
});
}

async captureFrame(frame: AudioFrame) {
const now = Date.now()
const now = Date.now();
const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture;
this.currentQueueSize += frame.samplesPerChannel / frame.sampleRate - elapsed
this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000;

// remove 50ms to account for processing time (e.g. using wait_for_playout for very small chunks)
this.currentQueueSize -= 0.05
this.lastCapture = now
this.currentQueueSize -= 50;
this.lastCapture = now;

if (this.timeout) {
clearTimeout(this.timeout);
}
setTimeout(this.releaseQueue.put, this.currentQueueSize)
setTimeout(this.releaseQueue.put, this.currentQueueSize);

const req = new CaptureAudioFrameRequest({
sourceHandle: this.ffiHandle.handle,
buffer: frame.protoInfo(),
Expand Down
13 changes: 5 additions & 8 deletions packages/livekit-rtc/src/napi/native.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@

/* auto-generated by NAPI-RS */

export declare function livekitInitialize(
callback: (data: Uint8Array) => void,
captureLogs: boolean,
): void;
export declare function livekitFfiRequest(data: Uint8Array): Uint8Array;
export declare function livekitRetrievePtr(handle: Uint8Array): bigint;
export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export declare function livekitDispose(): Promise<void>;
export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void;
export function livekitFfiRequest(data: Uint8Array): Uint8Array;
export function livekitRetrievePtr(handle: Uint8Array): bigint;
export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export function livekitDispose(): Promise<void>;
export declare class FfiHandle {
constructor(handle: bigint);
dispose(): void;
Expand Down
35 changes: 17 additions & 18 deletions packages/livekit-rtc/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0

import EventEmitter, { once } from "events";
import EventEmitter, { once } from 'events';

/** @internal */
export class Queue<T> {
#items: T[] = [];
#limit?: number;
#events = new EventEmitter();
items: T[] = [];
limit?: number;
events = new EventEmitter();

constructor(limit?: number) {
this.#limit = limit;
this.limit = limit;
}

async get(): Promise<T> {
if (this.#items.length === 0) {
await once(this.#events, 'put');
get = async (): Promise<T> => {
if (this.items.length === 0) {
await once(this.events, 'put');
}
const item = this.#items.shift()!;
this.#events.emit('get');
const item = this.items.shift()!;
this.events.emit('get');
return item;
}
};

async put(item: T) {
if (this.#limit && this.#items.length >= this.#limit) {
await once(this.#events, 'get');
put = async (item: T) => {
if (this.limit && this.items.length >= this.limit) {
await once(this.events, 'get');
}
this.#items.push(item);
this.#events.emit('put');
}
this.items.push(item);
this.events.emit('put');
};
}

0 comments on commit 3120dcb

Please sign in to comment.