Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rtc: bump ffi to v0.10.2 #271

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/strong-dodos-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

bump ffi to v0.10.2
59 changes: 57 additions & 2 deletions packages/livekit-rtc/src/audio_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,48 @@ import type {
AudioSourceInfo,
CaptureAudioFrameCallback,
CaptureAudioFrameResponse,
ClearAudioBufferResponse,
NewAudioSourceResponse,
} from './proto/audio_frame_pb.js';
import {
AudioSourceType,
CaptureAudioFrameRequest,
ClearAudioBufferRequest,
NewAudioSourceRequest,
} from './proto/audio_frame_pb.js';
import { Queue } from './utils.js';

export class AudioSource {
/** @internal */
info: AudioSourceInfo;
/** @internal */
ffiHandle: FfiHandle;
/** @internal */
lastCapture: number;
/** @internal */
currentQueueSize: number;
/** @internal */
releaseQueue = new Queue<void>();
/** @internal */
timeout?: ReturnType<typeof setTimeout> = undefined;

sampleRate: number;
numChannels: number;
queueSize: number;

constructor(sampleRate: number, numChannels: number, enableQueue?: boolean) {
constructor(sampleRate: number, numChannels: number, queueSize = 1000) {
this.sampleRate = sampleRate;
this.numChannels = numChannels;
this.queueSize = queueSize;

this.lastCapture = 0;
this.currentQueueSize = 0;

const req = new NewAudioSourceRequest({
type: AudioSourceType.AUDIO_SOURCE_NATIVE,
sampleRate: sampleRate,
numChannels: numChannels,
enableQueue: enableQueue,
queueSizeMs: queueSize,
});

const res = FfiClient.instance.request<NewAudioSourceResponse>({
Expand All @@ -47,7 +63,46 @@ export class AudioSource {
this.ffiHandle = new FfiHandle(res.source.handle.id);
}

/**
 * Estimate (in milliseconds) of captured audio that is still queued for
 * playout, based on the running queue-size counter maintained by
 * `captureFrame` and the time elapsed since the last capture.
 */
get queuedDuration(): number {
  const playedSinceLastCapture = Date.now() - this.lastCapture;
  const remaining = this.currentQueueSize - playedSinceLastCapture;
  return remaining > 0 ? remaining : 0;
}

/**
 * Ask the FFI layer to drop any audio buffered on this source, then release
 * anyone blocked in `waitForPlayout`.
 */
clearQueue() {
  const request = new ClearAudioBufferRequest({
    sourceHandle: this.ffiHandle.handle,
  });

  FfiClient.instance.request<ClearAudioBufferResponse>({
    message: { case: 'clearAudioBuffer', value: request },
  });

  // Wake a pending waitForPlayout() caller immediately.
  this.releaseQueue.put();
}

/**
 * Resolve once the queued audio has been released (timer fires in
 * `captureFrame`, or `clearQueue` is called), then reset the bookkeeping
 * counters so the next capture starts from a clean state.
 */
async waitForPlayout() {
  await this.releaseQueue.get();
  this.lastCapture = 0;
  this.currentQueueSize = 0;
}

async captureFrame(frame: AudioFrame) {
const now = Date.now();
const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture;
this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000;

// remove 50ms to account for processing time (e.g. using wait_for_playout for very small chunks)
this.currentQueueSize -= 50;
this.lastCapture = now;

if (this.timeout) {
nbsp marked this conversation as resolved.
Show resolved Hide resolved
clearTimeout(this.timeout);
}
setTimeout(this.releaseQueue.put, this.currentQueueSize);

const req = new CaptureAudioFrameRequest({
sourceHandle: this.ffiHandle.handle,
buffer: frame.protoInfo(),
Expand Down
13 changes: 5 additions & 8 deletions packages/livekit-rtc/src/napi/native.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@

/* auto-generated by NAPI-RS */

export declare function livekitInitialize(
callback: (data: Uint8Array) => void,
captureLogs: boolean,
): void;
export declare function livekitFfiRequest(data: Uint8Array): Uint8Array;
export declare function livekitRetrievePtr(handle: Uint8Array): bigint;
export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export declare function livekitDispose(): Promise<void>;
export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void;
export function livekitFfiRequest(data: Uint8Array): Uint8Array;
export function livekitRetrievePtr(handle: Uint8Array): bigint;
export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export function livekitDispose(): Promise<void>;
export declare class FfiHandle {
constructor(handle: bigint);
dispose(): void;
Expand Down
74 changes: 71 additions & 3 deletions packages/livekit-rtc/src/proto/audio_frame_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
numChannels = 0;

/**
* @generated from field: optional bool enable_queue = 5;
* @generated from field: uint32 queue_size_ms = 5;
*/
enableQueue?: boolean;
queueSizeMs = 0;

constructor(data?: PartialMessage<NewAudioSourceRequest>) {
super();
Expand All @@ -292,7 +292,7 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
{ no: 2, name: "options", kind: "message", T: AudioSourceOptions, opt: true },
{ no: 3, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 4, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 5, name: "enable_queue", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true },
{ no: 5, name: "queue_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): NewAudioSourceRequest {
Expand Down Expand Up @@ -475,6 +475,74 @@ export class CaptureAudioFrameCallback extends Message<CaptureAudioFrameCallback
}
}

/**
 * @generated from message livekit.proto.ClearAudioBufferRequest
 *
 * NOTE(review): auto-generated protobuf-es message class — do not hand-edit;
 * regenerate from the livekit protocol definitions instead.
 */
export class ClearAudioBufferRequest extends Message<ClearAudioBufferRequest> {
/**
 * @generated from field: uint64 source_handle = 1;
 */
sourceHandle = protoInt64.zero;

constructor(data?: PartialMessage<ClearAudioBufferRequest>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "livekit.proto.ClearAudioBufferRequest";
// Field metadata consumed by the protobuf-es runtime for binary/JSON codecs.
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "source_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromJsonString(jsonString, options);
}

static equals(a: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined, b: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined): boolean {
return proto3.util.equals(ClearAudioBufferRequest, a, b);
}
}

/**
 * @generated from message livekit.proto.ClearAudioBufferResponse
 *
 * NOTE(review): auto-generated protobuf-es message class — empty message
 * (no fields); do not hand-edit, regenerate from the protocol definitions.
 */
export class ClearAudioBufferResponse extends Message<ClearAudioBufferResponse> {
constructor(data?: PartialMessage<ClearAudioBufferResponse>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "livekit.proto.ClearAudioBufferResponse";
// Empty field list: this response carries no payload.
static readonly fields: FieldList = proto3.util.newFieldList(() => [
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromJsonString(jsonString, options);
}

static equals(a: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined, b: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined): boolean {
return proto3.util.equals(ClearAudioBufferResponse, a, b);
}
}

/**
* Create a new AudioResampler
*
Expand Down
48 changes: 31 additions & 17 deletions packages/livekit-rtc/src/proto/ffi_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { Message, proto3, protoInt64 } from "@bufbuild/protobuf";
import { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js";
import { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, TrackEvent } from "./track_pb.js";
import { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js";
import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js";
import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js";
import { E2eeRequest, E2eeResponse } from "./e2ee_pb.js";

/**
Expand Down Expand Up @@ -239,25 +239,31 @@ export class FfiRequest extends Message<FfiRequest> {
case: "captureAudioFrame";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 28;
* @generated from field: livekit.proto.ClearAudioBufferRequest clear_audio_buffer = 28;
*/
value: ClearAudioBufferRequest;
case: "clearAudioBuffer";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 29;
*/
value: NewAudioResamplerRequest;
case: "newAudioResampler";
} | {
/**
* @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 29;
* @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 30;
*/
value: RemixAndResampleRequest;
case: "remixAndResample";
} | {
/**
* @generated from field: livekit.proto.E2eeRequest e2ee = 30;
* @generated from field: livekit.proto.E2eeRequest e2ee = 31;
*/
value: E2eeRequest;
case: "e2ee";
} | {
/**
* @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 31;
* @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 32;
*/
value: AudioStreamFromParticipantRequest;
case: "audioStreamFromParticipant";
Expand Down Expand Up @@ -297,10 +303,11 @@ export class FfiRequest extends Message<FfiRequest> {
{ no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamRequest, oneof: "message" },
{ no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceRequest, oneof: "message" },
{ no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameRequest, oneof: "message" },
{ no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" },
{ no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" },
{ no: 30, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" },
{ no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" },
{ no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferRequest, oneof: "message" },
{ no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" },
{ no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" },
{ no: 31, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" },
{ no: 32, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): FfiRequest {
Expand Down Expand Up @@ -495,25 +502,31 @@ export class FfiResponse extends Message<FfiResponse> {
case: "captureAudioFrame";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 28;
* @generated from field: livekit.proto.ClearAudioBufferResponse clear_audio_buffer = 28;
*/
value: ClearAudioBufferResponse;
case: "clearAudioBuffer";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 29;
*/
value: NewAudioResamplerResponse;
case: "newAudioResampler";
} | {
/**
* @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 29;
* @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 30;
*/
value: RemixAndResampleResponse;
case: "remixAndResample";
} | {
/**
* @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 30;
* @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 31;
*/
value: AudioStreamFromParticipantResponse;
case: "audioStreamFromParticipant";
} | {
/**
* @generated from field: livekit.proto.E2eeResponse e2ee = 31;
* @generated from field: livekit.proto.E2eeResponse e2ee = 32;
*/
value: E2eeResponse;
case: "e2ee";
Expand Down Expand Up @@ -553,10 +566,11 @@ export class FfiResponse extends Message<FfiResponse> {
{ no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamResponse, oneof: "message" },
{ no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceResponse, oneof: "message" },
{ no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameResponse, oneof: "message" },
{ no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" },
{ no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" },
{ no: 30, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" },
{ no: 31, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" },
{ no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferResponse, oneof: "message" },
{ no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" },
{ no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" },
{ no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" },
{ no: 32, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): FfiResponse {
Expand Down
32 changes: 32 additions & 0 deletions packages/livekit-rtc/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import EventEmitter, { once } from 'events';

/** @internal */
export class Queue<T> {
nbsp marked this conversation as resolved.
Show resolved Hide resolved
items: T[] = [];
limit?: number;
events = new EventEmitter();

constructor(limit?: number) {
this.limit = limit;
}

get = async (): Promise<T> => {
if (this.items.length === 0) {
await once(this.events, 'put');
}
const item = this.items.shift()!;
this.events.emit('get');
return item;
};

put = async (item: T) => {
if (this.limit && this.items.length >= this.limit) {
await once(this.events, 'get');
}
this.items.push(item);
this.events.emit('put');
};
}
Loading