From e98fb503cd060e9e2858b08d6b7ac1ebaee2ebc4 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Wed, 18 Sep 2024 13:14:31 -0700 Subject: [PATCH 01/11] rtc: bump ffi to v0.10.2 --- packages/livekit-rtc/rust-sdks | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/livekit-rtc/rust-sdks b/packages/livekit-rtc/rust-sdks index 346cc226..1b87c1af 160000 --- a/packages/livekit-rtc/rust-sdks +++ b/packages/livekit-rtc/rust-sdks @@ -1 +1 @@ -Subproject commit 346cc22618d30296192e936f26c2c968aab2ec80 +Subproject commit 1b87c1aff6fb84284ab15242843ff7222ed7dbf4 From 8a544e6b288a242d46ca891078fab3670805ed64 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Wed, 18 Sep 2024 13:15:43 -0700 Subject: [PATCH 02/11] Create strong-dodos-smash.md --- .changeset/strong-dodos-smash.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/strong-dodos-smash.md diff --git a/.changeset/strong-dodos-smash.md b/.changeset/strong-dodos-smash.md new file mode 100644 index 00000000..ad66452b --- /dev/null +++ b/.changeset/strong-dodos-smash.md @@ -0,0 +1,5 @@ +--- +"@livekit/rtc-node": patch +--- + +bump ffi to v0.10.2 From 99abc28e0441daa11da1b572ce310ce897917b87 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Wed, 18 Sep 2024 14:35:49 -0700 Subject: [PATCH 03/11] add queueing to audiosource --- packages/livekit-rtc/src/audio_source.ts | 56 +++++++++++++- .../livekit-rtc/src/proto/audio_frame_pb.ts | 74 ++++++++++++++++++- packages/livekit-rtc/src/proto/ffi_pb.ts | 48 +++++++----- packages/livekit-rtc/src/utils.ts | 33 +++++++++ 4 files changed, 188 insertions(+), 23 deletions(-) create mode 100644 packages/livekit-rtc/src/utils.ts diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 0af142cb..7d0024f9 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -9,31 +9,45 @@ import type { CaptureAudioFrameCallback, CaptureAudioFrameResponse, NewAudioSourceResponse, -} from './proto/audio_frame_pb.js'; + + ClearAudioBufferResponse} from './proto/audio_frame_pb.js'; import { AudioSourceType, CaptureAudioFrameRequest, + ClearAudioBufferRequest, NewAudioSourceRequest, } from './proto/audio_frame_pb.js'; +import { Queue } from './utils.js'; export class AudioSource { /** @internal */ info: AudioSourceInfo; /** @internal */ ffiHandle: FfiHandle; + /** @internal */ + lastCapture: number; + /** @internal */ + currentQueueSize: number; + /** @internal */ + releaseQueue = new Queue(); sampleRate: number; numChannels: number; + queueSize: number; - constructor(sampleRate: number, numChannels: number, enableQueue?: boolean) { + constructor(sampleRate: number, numChannels: number, queueSize = 1000) { this.sampleRate = sampleRate; this.numChannels = numChannels; + this.queueSize = queueSize; + + this.lastCapture = 0; + this.currentQueueSize = 0; const req = new NewAudioSourceRequest({ type: AudioSourceType.AUDIO_SOURCE_NATIVE, sampleRate: sampleRate, numChannels: numChannels, - enableQueue: enableQueue, + queueSizeMs: queueSize, }); const res = FfiClient.instance.request({ @@ -47,7 +61,43 @@ export class AudioSource { this.ffiHandle = new FfiHandle(res.source.handle.id); } + get queuedDuration(): number { + return Math.max(this.currentQueueSize - Date.now() + this.lastCapture, 0); + } + + clearQueue() { + const req = new ClearAudioBufferRequest({ + sourceHandle: this.ffiHandle.handle, + }); + + FfiClient.instance.request({ + message: { + case: 'clearAudioBuffer', + value: req, + }, + }); + + this.releaseQueue.put(); + } + + async waitForPlayout() { + await this.releaseQueue.get().then(() => { + this.lastCapture = 0 + this.currentQueueSize = 0; + }) + } + async captureFrame(frame: AudioFrame) { + const now = Date.now() + const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture; + this.currentQueueSize += frame.samplesPerChannel / frame.sampleRate - elapsed + + // remove 50ms to account for processing time (e.g. using wait_for_playout for very small chunks) + this.currentQueueSize -= 0.05 + this.lastCapture = now + + setTimeout(this.releaseQueue.put, this.currentQueueSize) + const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, buffer: frame.protoInfo(), diff --git a/packages/livekit-rtc/src/proto/audio_frame_pb.ts b/packages/livekit-rtc/src/proto/audio_frame_pb.ts index c680608b..597dbdd4 100644 --- a/packages/livekit-rtc/src/proto/audio_frame_pb.ts +++ b/packages/livekit-rtc/src/proto/audio_frame_pb.ts @@ -276,9 +276,9 @@ export class NewAudioSourceRequest extends Message { numChannels = 0; /** - * @generated from field: optional bool enable_queue = 5; + * @generated from field: uint32 queue_size_ms = 5; */ - enableQueue?: boolean; + queueSizeMs = 0; constructor(data?: PartialMessage) { super(); @@ -292,7 +292,7 @@ export class NewAudioSourceRequest extends Message { { no: 2, name: "options", kind: "message", T: AudioSourceOptions, opt: true }, { no: 3, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */ }, { no: 4, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */ }, - { no: 5, name: "enable_queue", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true }, + { no: 5, name: "queue_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */ }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): NewAudioSourceRequest { @@ -475,6 +475,74 @@ export class CaptureAudioFrameCallback extends Message { + /** + * @generated from field: uint64 source_handle = 1; + */ + sourceHandle = protoInt64.zero; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "livekit.proto.ClearAudioBufferRequest"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "source_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ClearAudioBufferRequest { + return new ClearAudioBufferRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ClearAudioBufferRequest { + return new ClearAudioBufferRequest().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ClearAudioBufferRequest { + return new ClearAudioBufferRequest().fromJsonString(jsonString, options); + } + + static equals(a: ClearAudioBufferRequest | PlainMessage | undefined, b: ClearAudioBufferRequest | PlainMessage | undefined): boolean { + return proto3.util.equals(ClearAudioBufferRequest, a, b); + } +} + +/** + * @generated from message livekit.proto.ClearAudioBufferResponse + */ +export class ClearAudioBufferResponse extends Message { + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "livekit.proto.ClearAudioBufferResponse"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ClearAudioBufferResponse { + return new ClearAudioBufferResponse().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ClearAudioBufferResponse { + return new ClearAudioBufferResponse().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ClearAudioBufferResponse { + return new ClearAudioBufferResponse().fromJsonString(jsonString, options); + } + + static equals(a: ClearAudioBufferResponse | PlainMessage | undefined, b: ClearAudioBufferResponse | PlainMessage | undefined): boolean { + return proto3.util.equals(ClearAudioBufferResponse, a, b); + } +} + /** * Create a new AudioResampler * diff --git a/packages/livekit-rtc/src/proto/ffi_pb.ts b/packages/livekit-rtc/src/proto/ffi_pb.ts index 5646abf8..e74b4809 100644 --- a/packages/livekit-rtc/src/proto/ffi_pb.ts +++ b/packages/livekit-rtc/src/proto/ffi_pb.ts @@ -22,7 +22,7 @@ import { Message, proto3, protoInt64 } from "@bufbuild/protobuf"; import { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js"; import { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, TrackEvent } from "./track_pb.js"; import { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js"; -import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js"; +import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js"; import { E2eeRequest, E2eeResponse } from "./e2ee_pb.js"; /** @@ -239,25 +239,31 @@ export class FfiRequest extends Message { case: "captureAudioFrame"; } | { /** - * @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 28; + * @generated from field: livekit.proto.ClearAudioBufferRequest clear_audio_buffer = 28; + */ + value: ClearAudioBufferRequest; + case: "clearAudioBuffer"; + } | { + /** + * @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 29; */ value: NewAudioResamplerRequest; case: "newAudioResampler"; } | { /** - * @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 29; + * @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 30; */ value: RemixAndResampleRequest; case: "remixAndResample"; } | { /** - * @generated from field: livekit.proto.E2eeRequest e2ee = 30; + * @generated from field: livekit.proto.E2eeRequest e2ee = 31; */ value: E2eeRequest; case: "e2ee"; } | { /** - * @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 31; + * @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 32; */ value: AudioStreamFromParticipantRequest; case: "audioStreamFromParticipant"; @@ -297,10 +303,11 @@ export class FfiRequest extends Message { { no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamRequest, oneof: "message" }, { no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceRequest, oneof: "message" }, { no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameRequest, oneof: "message" }, - { no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" }, - { no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" }, - { no: 30, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" }, - { no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" }, + { no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferRequest, oneof: "message" }, + { no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" }, + { no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" }, + { no: 31, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" }, + { no: 32, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): FfiRequest { @@ -495,25 +502,31 @@ export class FfiResponse extends Message { case: "captureAudioFrame"; } | { /** - * @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 28; + * @generated from field: livekit.proto.ClearAudioBufferResponse clear_audio_buffer = 28; + */ + value: ClearAudioBufferResponse; + case: "clearAudioBuffer"; + } | { + /** + * @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 29; */ value: NewAudioResamplerResponse; case: "newAudioResampler"; } | { /** - * @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 29; + * @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 30; */ value: RemixAndResampleResponse; case: "remixAndResample"; } | { /** - * @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 30; + * @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 31; */ value: AudioStreamFromParticipantResponse; case: "audioStreamFromParticipant"; } | { /** - * @generated from field: livekit.proto.E2eeResponse e2ee = 31; + * @generated from field: livekit.proto.E2eeResponse e2ee = 32; */ value: E2eeResponse; case: "e2ee"; @@ -553,10 +566,11 @@ export class FfiResponse extends Message { { no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamResponse, oneof: "message" }, { no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceResponse, oneof: "message" }, { no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameResponse, oneof: "message" }, - { no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" }, - { no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" }, - { no: 30, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" }, - { no: 31, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" }, + { no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferResponse, oneof: "message" }, + { no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" }, + { no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" }, + { no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" }, + { no: 32, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): FfiResponse { diff --git a/packages/livekit-rtc/src/utils.ts b/packages/livekit-rtc/src/utils.ts new file mode 100644 index 00000000..c6167d66 --- /dev/null +++ b/packages/livekit-rtc/src/utils.ts @@ -0,0 +1,33 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +import EventEmitter, { once } from "events"; + +/** @internal */ +export class Queue { + #items: T[] = []; + #limit?: number; + #events = new EventEmitter(); + + constructor(limit?: number) { + this.#limit = limit; + } + + async get(): Promise { + if (this.#items.length === 0) { + await once(this.#events, 'put'); + } + const item = this.#items.shift()!; + this.#events.emit('get'); + return item; + } + + async put(item: T) { + if (this.#limit && this.#items.length >= this.#limit) { + await once(this.#events, 'get'); + } + this.#items.push(item); + this.#events.emit('put'); + } +} From cc6347836df2abf4c9d685b142ca557b9c26e672 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Wed, 18 Sep 2024 14:52:40 -0700 Subject: [PATCH 04/11] refresh timeout if not done --- packages/livekit-rtc/src/audio_source.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 7d0024f9..089f9e47 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -30,6 +30,8 @@ export class AudioSource { currentQueueSize: number; /** @internal */ releaseQueue = new Queue(); + /** @internal */ + timeout?: ReturnType = undefined; sampleRate: number; numChannels: number; @@ -96,6 +98,9 @@ export class AudioSource { this.currentQueueSize -= 0.05 this.lastCapture = now + if (this.timeout) { + clearTimeout(this.timeout); + } setTimeout(this.releaseQueue.put, this.currentQueueSize) const req = new CaptureAudioFrameRequest({ From 3120dcb27dc32d7ae05d3bbde52f4c742f37b06d Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Thu, 19 Sep 2024 09:10:50 +0300 Subject: [PATCH 05/11] fix queue --- packages/livekit-rtc/src/audio_source.ts | 20 ++++++------- packages/livekit-rtc/src/napi/native.d.ts | 13 ++++----- packages/livekit-rtc/src/utils.ts | 35 +++++++++++------------ 3 files changed, 32 insertions(+), 36 deletions(-) diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 089f9e47..761c32c4 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -8,9 +8,9 @@ import type { AudioSourceInfo, CaptureAudioFrameCallback, CaptureAudioFrameResponse, + ClearAudioBufferResponse, NewAudioSourceResponse, - - ClearAudioBufferResponse} from './proto/audio_frame_pb.js'; +} from './proto/audio_frame_pb.js'; import { AudioSourceType, CaptureAudioFrameRequest, @@ -84,25 +84,25 @@ export class AudioSource { async waitForPlayout() { await this.releaseQueue.get().then(() => { - this.lastCapture = 0 + this.lastCapture = 0; this.currentQueueSize = 0; - }) + }); } async captureFrame(frame: AudioFrame) { - const now = Date.now() + const now = Date.now(); const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture; - this.currentQueueSize += frame.samplesPerChannel / frame.sampleRate - elapsed + this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000; // remove 50ms to account for processing time (e.g. using wait_for_playout for very small chunks) - this.currentQueueSize -= 0.05 - this.lastCapture = now + this.currentQueueSize -= 50; + this.lastCapture = now; if (this.timeout) { clearTimeout(this.timeout); } - setTimeout(this.releaseQueue.put, this.currentQueueSize) - + setTimeout(this.releaseQueue.put, this.currentQueueSize); + const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, buffer: frame.protoInfo(), diff --git a/packages/livekit-rtc/src/napi/native.d.ts b/packages/livekit-rtc/src/napi/native.d.ts index f3ba192f..0355e240 100644 --- a/packages/livekit-rtc/src/napi/native.d.ts +++ b/packages/livekit-rtc/src/napi/native.d.ts @@ -3,14 +3,11 @@ /* auto-generated by NAPI-RS */ -export declare function livekitInitialize( - callback: (data: Uint8Array) => void, - captureLogs: boolean, -): void; -export declare function livekitFfiRequest(data: Uint8Array): Uint8Array; -export declare function livekitRetrievePtr(handle: Uint8Array): bigint; -export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array; -export declare function livekitDispose(): Promise; +export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void; +export function livekitFfiRequest(data: Uint8Array): Uint8Array; +export function livekitRetrievePtr(handle: Uint8Array): bigint; +export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array; +export function livekitDispose(): Promise; export declare class FfiHandle { constructor(handle: bigint); dispose(): void; diff --git a/packages/livekit-rtc/src/utils.ts b/packages/livekit-rtc/src/utils.ts index c6167d66..aa97e63f 100644 --- a/packages/livekit-rtc/src/utils.ts +++ b/packages/livekit-rtc/src/utils.ts @@ -1,33 +1,32 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 - -import EventEmitter, { once } from "events"; +import EventEmitter, { once } from 'events'; /** @internal */ export class Queue { - #items: T[] = []; - #limit?: number; - #events = new EventEmitter(); + items: T[] = []; + limit?: number; + events = new EventEmitter(); constructor(limit?: number) { - this.#limit = limit; + this.limit = limit; } - async get(): Promise { - if (this.#items.length === 0) { - await once(this.#events, 'put'); + get = async (): Promise => { + if (this.items.length === 0) { + await once(this.events, 'put'); } - const item = this.#items.shift()!; - this.#events.emit('get'); + const item = this.items.shift()!; + this.events.emit('get'); return item; - } + }; - async put(item: T) { - if (this.#limit && this.#items.length >= this.#limit) { - await once(this.#events, 'get'); + put = async (item: T) => { + if (this.limit && this.items.length >= this.limit) { + await once(this.events, 'get'); } - this.#items.push(item); - this.#events.emit('put'); - } + this.items.push(item); + this.events.emit('put'); + }; } From 23511a9b6b91cb7b4146ab0164e3cb326650e185 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Wed, 18 Sep 2024 23:57:19 -0700 Subject: [PATCH 06/11] use process.hrtime.bigint --- packages/livekit-rtc/src/audio_source.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 761c32c4..c298ce10 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -64,7 +64,10 @@ export class AudioSource { } get queuedDuration(): number { - return Math.max(this.currentQueueSize - Date.now() + this.lastCapture, 0); + return Math.max( + this.currentQueueSize - Number(process.hrtime.bigint() / 1000000n) + this.lastCapture, + 0, + ); } clearQueue() { @@ -90,7 +93,7 @@ export class AudioSource { } async captureFrame(frame: AudioFrame) { - const now = Date.now(); + const now = Number(process.hrtime.bigint() / 1000000n); const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture; this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000; From 2b24e1439b343dd3c7759f5550c3bdb3763f7611 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Thu, 19 Sep 2024 10:50:33 -0700 Subject: [PATCH 07/11] { -> @livekit/}typed-emitter --- packages/livekit-rtc/package.json | 2 +- packages/livekit-rtc/src/audio_stream.ts | 2 +- packages/livekit-rtc/src/ffi_client.ts | 2 +- packages/livekit-rtc/src/room.ts | 2 +- packages/livekit-rtc/src/video_stream.ts | 2 +- pnpm-lock.yaml | 26 ++++++++---------------- 6 files changed, 13 insertions(+), 23 deletions(-) diff --git a/packages/livekit-rtc/package.json b/packages/livekit-rtc/package.json index 6bf684ec..dd0cc4ee 100644 --- a/packages/livekit-rtc/package.json +++ b/packages/livekit-rtc/package.json @@ -34,7 +34,7 @@ }, "dependencies": { "@bufbuild/protobuf": "^1.4.2", - "typed-emitter": "^2.1.0" + "@livekit/typed-emitter": "^3.0.0" }, "devDependencies": { "@napi-rs/cli": "^2.18.0", diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 7c44bbc4..8399e0c9 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -1,8 +1,8 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; -import type TypedEmitter from 'typed-emitter'; import { AudioFrame } from './audio_frame.js'; import type { FfiEvent } from './ffi_client.js'; import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; diff --git a/packages/livekit-rtc/src/ffi_client.ts b/packages/livekit-rtc/src/ffi_client.ts index 577d99f1..148c58b2 100644 --- a/packages/livekit-rtc/src/ffi_client.ts +++ b/packages/livekit-rtc/src/ffi_client.ts @@ -2,8 +2,8 @@ // // SPDX-License-Identifier: Apache-2.0 import type { PartialMessage } from '@bufbuild/protobuf'; +import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; -import type TypedEmitter from 'typed-emitter'; import { FfiHandle, livekitCopyBuffer, diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 2c50981e..fa2ae9f2 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -1,8 +1,8 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; -import type TypedEmitter from 'typed-emitter'; import type { E2EEOptions } from './e2ee.js'; import { E2EEManager } from './e2ee.js'; import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; diff --git a/packages/livekit-rtc/src/video_stream.ts b/packages/livekit-rtc/src/video_stream.ts index d6a9ed45..45799cef 100644 --- a/packages/livekit-rtc/src/video_stream.ts +++ b/packages/livekit-rtc/src/video_stream.ts @@ -1,8 +1,8 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; -import type TypedEmitter from 'typed-emitter'; import type { FfiEvent } from './ffi_client.js'; import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; import type { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 04faab37..924b5393 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -149,9 +149,9 @@ importers: '@bufbuild/protobuf': specifier: ^1.4.2 version: 1.10.0 - typed-emitter: - specifier: ^2.1.0 - version: 2.1.0 + '@livekit/typed-emitter': + specifier: ^3.0.0 + version: 3.0.0 optionalDependencies: '@livekit/rtc-node-darwin-arm64': specifier: workspace:* @@ -722,6 +722,9 @@ packages: '@livekit/protocol@1.20.0': resolution: {integrity: sha512-2RJQwzBa+MfUoy0zBWuyj8S2MTBxeTgREeG0r/1bNmkAFiBhsdgr87gIvblyqJxffUxJpALMu1Ee0M1XHX+9Ug==} + '@livekit/typed-emitter@3.0.0': + resolution: {integrity: sha512-9bl0k4MgBPZu3Qu3R3xy12rmbW17e3bE9yf4YY85gJIQ3ezLEj/uzpKHWBsLaDoL5Mozz8QCgggwIBudYQWeQg==} + '@manypkg/find-root@1.1.0': resolution: {integrity: sha512-mki5uBvhHzO8kYYix/WRy2WX8S3B5wdVSc9D6KcU5lQNglP2yt58/VfLuAK49glRXChosY8ap2oJ1qgma3GUVA==} @@ -2525,9 +2528,6 @@ packages: run-parallel@1.2.0: resolution: {integrity: sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==} - rxjs@7.8.1: - resolution: {integrity: sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==} - safe-array-concat@1.1.2: resolution: {integrity: sha512-vj6RsCsWBCf19jIeHEfkRMw8DPiBb+DMXklQ/1SGDHOMlHdPUkZXFQ2YdplS23zESTijAcurb1aSgJA3AgMu1Q==} engines: {node: '>=0.4'} @@ -2838,9 +2838,6 @@ packages: resolution: {integrity: sha512-/OxDN6OtAk5KBpGb28T+HZc2M+ADtvRxXrKKbUwtsLgdoxgX13hyy7ek6bFRl5+aBs2yZzB0c4CnQfAtVypW/g==} engines: {node: '>= 0.4'} - typed-emitter@2.1.0: - resolution: {integrity: sha512-g/KzbYKbH5C2vPkaXGu8DJlHrGKHLsM25Zg9WuC9pMGfuvT+X25tZQWo5fK1BjBm8+UrVE9LDCvaY0CQk+fXDA==} - typedoc@0.26.6: resolution: {integrity: sha512-SfEU3SH3wHNaxhFPjaZE2kNl/NFtLNW5c1oHsg7mti7GjmUj1Roq6osBQeMd+F4kL0BoRBBr8gQAuqBlfFu8LA==} engines: {node: '>= 18'} @@ -3496,6 +3493,8 @@ snapshots: dependencies: '@bufbuild/protobuf': 1.10.0 + '@livekit/typed-emitter@3.0.0': {} + '@manypkg/find-root@1.1.0': dependencies: '@babel/runtime': 7.24.7 @@ -5568,11 +5567,6 @@ snapshots: dependencies: queue-microtask: 1.2.3 - rxjs@7.8.1: - dependencies: - tslib: 2.7.0 - optional: true - safe-array-concat@1.1.2: dependencies: call-bind: 1.0.7 @@ -5880,10 +5874,6 @@ snapshots: is-typed-array: 1.1.13 possible-typed-array-names: 1.0.0 - typed-emitter@2.1.0: - optionalDependencies: - rxjs: 7.8.1 - typedoc@0.26.6(typescript@5.5.4): dependencies: lunr: 2.3.9 From 7f01628330d0edf0c3dcfd40570a56649d73ae6e Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Thu, 19 Sep 2024 10:55:47 -0700 Subject: [PATCH 08/11] set the timeout --- packages/livekit-rtc/src/audio_source.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index c298ce10..e66481d5 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -104,7 +104,7 @@ export class AudioSource { if (this.timeout) { clearTimeout(this.timeout); } - setTimeout(this.releaseQueue.put, this.currentQueueSize); + this.timeout = setTimeout(this.releaseQueue.put, this.currentQueueSize); const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, From 39d29a2b493d0cf60c2661a308eee4ca0b3e48ab Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Thu, 19 Sep 2024 11:54:08 -0700 Subject: [PATCH 09/11] fix 50ms and update examples --- examples/publish-wav/index.ts | 6 +++--- packages/livekit-rtc/src/audio_source.ts | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/publish-wav/index.ts b/examples/publish-wav/index.ts index 32af721a..3db75df4 100644 --- a/examples/publish-wav/index.ts +++ b/examples/publish-wav/index.ts @@ -44,12 +44,11 @@ const track = LocalAudioTrack.createAudioTrack('audio', source); const options = new TrackPublishOptions(); const buffer = new Int16Array(sample.buffer); options.source = TrackSource.SOURCE_MICROPHONE; -await room.localParticipant.publishTrack(track, options); -await new Promise((resolve) => setTimeout(resolve, 1000)); // wait a bit so the start doesn't cut off +await room.localParticipant.publishTrack(track, options).then((pub) => pub.waitForSubscription()); let written = 44; // start of WAVE data stream const FRAME_DURATION = 1; // write 1s of audio at a time -const numSamples = sampleRate / FRAME_DURATION; +const numSamples = sampleRate * FRAME_DURATION; while (written < dataSize) { const available = dataSize - written; const frameSize = Math.min(numSamples, available); @@ -61,6 +60,7 @@ while (written < dataSize) { Math.trunc(frameSize / channels), ); await source.captureFrame(frame); + await source.waitForPlayout(); written += frameSize; } diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index e66481d5..33224f22 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -97,14 +97,15 @@ export class AudioSource { const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture; this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000; - // remove 50ms to account for processing time (e.g. using wait_for_playout for very small chunks) - this.currentQueueSize -= 50; this.lastCapture = now; if (this.timeout) { clearTimeout(this.timeout); } - this.timeout = setTimeout(this.releaseQueue.put, this.currentQueueSize); + + // remove 50ms to account for processing time + // (e.g. using wait_for_playout for very small chunks) + this.timeout = setTimeout(this.releaseQueue.put, this.currentQueueSize - 50); const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, From 66ec836e050468f4fd26cacae5e35e07dcd06e25 Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Thu, 19 Sep 2024 12:22:22 -0700 Subject: [PATCH 10/11] wip --- packages/livekit-rtc/src/audio_source.ts | 16 +++++++----- packages/livekit-rtc/src/utils.ts | 32 ------------------------ 2 files changed, 10 insertions(+), 38 deletions(-) delete mode 100644 packages/livekit-rtc/src/utils.ts diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 33224f22..2171c56e 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -17,7 +17,6 @@ import { ClearAudioBufferRequest, NewAudioSourceRequest, } from './proto/audio_frame_pb.js'; -import { Queue } from './utils.js'; export class AudioSource { /** @internal */ @@ -29,7 +28,8 @@ export class AudioSource { /** @internal */ currentQueueSize: number; /** @internal */ - releaseQueue = new Queue(); + release = () => {}; + waitForPlayout = this.newPromise(); /** @internal */ timeout?: ReturnType = undefined; @@ -82,13 +82,17 @@ export class AudioSource { }, }); - this.releaseQueue.put(); + this.release(); } - async waitForPlayout() { - await this.releaseQueue.get().then(() => { + /** @internal */ + async newPromise() { + return new Promise((resolve) => { + this.release = resolve; + }).then(() => { this.lastCapture = 0; this.currentQueueSize = 0; + this.waitForPlayout = this.newPromise(); }); } @@ -105,7 +109,7 @@ export class AudioSource { // remove 50ms to account for processing time // (e.g. using wait_for_playout for very small chunks) - this.timeout = setTimeout(this.releaseQueue.put, this.currentQueueSize - 50); + this.timeout = setTimeout(this.release, this.currentQueueSize - 50); const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, diff --git a/packages/livekit-rtc/src/utils.ts b/packages/livekit-rtc/src/utils.ts deleted file mode 100644 index aa97e63f..00000000 --- a/packages/livekit-rtc/src/utils.ts +++ /dev/null @@ -1,32 +0,0 @@ -// SPDX-FileCopyrightText: 2024 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -import EventEmitter, { once } from 'events'; - -/** @internal */ -export class Queue { - items: T[] = []; - limit?: number; - events = new EventEmitter(); - - constructor(limit?: number) { - this.limit = limit; - } - - get = async (): Promise => { - if (this.items.length === 0) { - await once(this.events, 'put'); - } - const item = this.items.shift()!; - this.events.emit('get'); - return item; - }; - - put = async (item: T) => { - if (this.limit && this.items.length >= this.limit) { - await once(this.events, 'get'); - } - this.items.push(item); - this.events.emit('put'); - }; -} From b66e488bd6e870b4d58168259d805a637e0f4def Mon Sep 17 00:00:00 2001 From: aoife cassidy Date: Thu, 19 Sep 2024 12:47:07 -0700 Subject: [PATCH 11/11] use normal promises --- examples/publish-wav/index.ts | 3 +-- packages/livekit-rtc/src/audio_source.ts | 10 +++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/examples/publish-wav/index.ts b/examples/publish-wav/index.ts index 3db75df4..56722969 100644 --- a/examples/publish-wav/index.ts +++ b/examples/publish-wav/index.ts @@ -60,10 +60,9 @@ while (written < dataSize) { Math.trunc(frameSize / channels), ); await source.captureFrame(frame); - await source.waitForPlayout(); - written += frameSize; } +await source.waitForPlayout(); await room.disconnect(); await dispose(); diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 2171c56e..5d620ab5 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -29,7 +29,7 @@ export class AudioSource { currentQueueSize: number; /** @internal */ release = () => {}; - waitForPlayout = this.newPromise(); + promise = this.newPromise(); /** @internal */ timeout?: ReturnType = undefined; @@ -89,10 +89,14 @@ export class AudioSource { async newPromise() { return new Promise((resolve) => { this.release = resolve; - }).then(() => { + }); + } + + async waitForPlayout() { + return this.promise.then(() => { this.lastCapture = 0; this.currentQueueSize = 0; - this.waitForPlayout = this.newPromise(); + this.promise = this.newPromise(); }); }