Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rtc: bump ffi to v0.10.2 #271

Merged
merged 11 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/strong-dodos-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

bump ffi to v0.10.2
6 changes: 3 additions & 3 deletions examples/publish-wav/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ const track = LocalAudioTrack.createAudioTrack('audio', source);
const options = new TrackPublishOptions();
const buffer = new Int16Array(sample.buffer);
options.source = TrackSource.SOURCE_MICROPHONE;
await room.localParticipant.publishTrack(track, options);
await new Promise((resolve) => setTimeout(resolve, 1000)); // wait a bit so the start doesn't cut off
await room.localParticipant.publishTrack(track, options).then((pub) => pub.waitForSubscription());

let written = 44; // start of WAVE data stream
const FRAME_DURATION = 1; // write 1s of audio at a time
const numSamples = sampleRate / FRAME_DURATION;
const numSamples = sampleRate * FRAME_DURATION;
while (written < dataSize) {
const available = dataSize - written;
const frameSize = Math.min(numSamples, available);
Expand All @@ -61,6 +60,7 @@ while (written < dataSize) {
Math.trunc(frameSize / channels),
);
await source.captureFrame(frame);
await source.waitForPlayout();

written += frameSize;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"dependencies": {
"@bufbuild/protobuf": "^1.4.2",
"typed-emitter": "^2.1.0"
"@livekit/typed-emitter": "^3.0.0"
},
"devDependencies": {
"@napi-rs/cli": "^2.18.0",
Expand Down
63 changes: 61 additions & 2 deletions packages/livekit-rtc/src/audio_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,48 @@ import type {
AudioSourceInfo,
CaptureAudioFrameCallback,
CaptureAudioFrameResponse,
ClearAudioBufferResponse,
NewAudioSourceResponse,
} from './proto/audio_frame_pb.js';
import {
AudioSourceType,
CaptureAudioFrameRequest,
ClearAudioBufferRequest,
NewAudioSourceRequest,
} from './proto/audio_frame_pb.js';
import { Queue } from './utils.js';

export class AudioSource {
/** @internal */
info: AudioSourceInfo;
/** @internal */
ffiHandle: FfiHandle;
/** @internal */
lastCapture: number;
/** @internal */
currentQueueSize: number;
/** @internal */
releaseQueue = new Queue<void>();
/** @internal */
timeout?: ReturnType<typeof setTimeout> = undefined;

sampleRate: number;
numChannels: number;
queueSize: number;

constructor(sampleRate: number, numChannels: number, enableQueue?: boolean) {
constructor(sampleRate: number, numChannels: number, queueSize = 1000) {
this.sampleRate = sampleRate;
this.numChannels = numChannels;
this.queueSize = queueSize;

this.lastCapture = 0;
this.currentQueueSize = 0;

const req = new NewAudioSourceRequest({
type: AudioSourceType.AUDIO_SOURCE_NATIVE,
sampleRate: sampleRate,
numChannels: numChannels,
enableQueue: enableQueue,
queueSizeMs: queueSize,
});

const res = FfiClient.instance.request<NewAudioSourceResponse>({
Expand All @@ -47,7 +63,50 @@ export class AudioSource {
this.ffiHandle = new FfiHandle(res.source.handle.id);
}

get queuedDuration(): number {
return Math.max(
this.currentQueueSize - Number(process.hrtime.bigint() / 1000000n) + this.lastCapture,
0,
);
}

clearQueue() {
const req = new ClearAudioBufferRequest({
sourceHandle: this.ffiHandle.handle,
});

FfiClient.instance.request<ClearAudioBufferResponse>({
message: {
case: 'clearAudioBuffer',
value: req,
},
});

this.releaseQueue.put();
nbsp marked this conversation as resolved.
Show resolved Hide resolved
}

async waitForPlayout() {
await this.releaseQueue.get().then(() => {
this.lastCapture = 0;
this.currentQueueSize = 0;
});
}

async captureFrame(frame: AudioFrame) {
const now = Number(process.hrtime.bigint() / 1000000n);
const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture;
this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000;

this.lastCapture = now;

if (this.timeout) {
clearTimeout(this.timeout);
}

// remove 50ms to account for processing time
// (e.g. using wait_for_playout for very small chunks)
this.timeout = setTimeout(this.releaseQueue.put, this.currentQueueSize - 50);

const req = new CaptureAudioFrameRequest({
sourceHandle: this.ffiHandle.handle,
buffer: frame.protoInfo(),
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import type TypedEmitter from 'typed-emitter';
import { AudioFrame } from './audio_frame.js';
import type { FfiEvent } from './ffi_client.js';
import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/src/ffi_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { PartialMessage } from '@bufbuild/protobuf';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import type TypedEmitter from 'typed-emitter';
import {
FfiHandle,
livekitCopyBuffer,
Expand Down
13 changes: 5 additions & 8 deletions packages/livekit-rtc/src/napi/native.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@

/* auto-generated by NAPI-RS */

export declare function livekitInitialize(
callback: (data: Uint8Array) => void,
captureLogs: boolean,
): void;
export declare function livekitFfiRequest(data: Uint8Array): Uint8Array;
export declare function livekitRetrievePtr(handle: Uint8Array): bigint;
export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export declare function livekitDispose(): Promise<void>;
export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void;
export function livekitFfiRequest(data: Uint8Array): Uint8Array;
export function livekitRetrievePtr(handle: Uint8Array): bigint;
export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export function livekitDispose(): Promise<void>;
export declare class FfiHandle {
constructor(handle: bigint);
dispose(): void;
Expand Down
74 changes: 71 additions & 3 deletions packages/livekit-rtc/src/proto/audio_frame_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
numChannels = 0;

/**
* @generated from field: optional bool enable_queue = 5;
* @generated from field: uint32 queue_size_ms = 5;
*/
enableQueue?: boolean;
queueSizeMs = 0;

constructor(data?: PartialMessage<NewAudioSourceRequest>) {
super();
Expand All @@ -292,7 +292,7 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
{ no: 2, name: "options", kind: "message", T: AudioSourceOptions, opt: true },
{ no: 3, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 4, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 5, name: "enable_queue", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true },
{ no: 5, name: "queue_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): NewAudioSourceRequest {
Expand Down Expand Up @@ -475,6 +475,74 @@ export class CaptureAudioFrameCallback extends Message<CaptureAudioFrameCallback
}
}

/**
* @generated from message livekit.proto.ClearAudioBufferRequest
*/
/**
 * @generated from message livekit.proto.ClearAudioBufferRequest
 */
// NOTE(review): protoc-generated (protobuf-es) message class — do not hand-edit;
// regenerate from audio_frame.proto instead.
export class ClearAudioBufferRequest extends Message<ClearAudioBufferRequest> {
/**
 * @generated from field: uint64 source_handle = 1;
 */
// FFI handle of the AudioSource whose buffered audio should be dropped.
sourceHandle = protoInt64.zero;

constructor(data?: PartialMessage<ClearAudioBufferRequest>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "livekit.proto.ClearAudioBufferRequest";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "source_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromJsonString(jsonString, options);
}

static equals(a: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined, b: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined): boolean {
return proto3.util.equals(ClearAudioBufferRequest, a, b);
}
}

/**
* @generated from message livekit.proto.ClearAudioBufferResponse
*/
/**
 * @generated from message livekit.proto.ClearAudioBufferResponse
 */
// NOTE(review): protoc-generated (protobuf-es) empty response message — do not
// hand-edit; regenerate from audio_frame.proto instead.
export class ClearAudioBufferResponse extends Message<ClearAudioBufferResponse> {
constructor(data?: PartialMessage<ClearAudioBufferResponse>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "livekit.proto.ClearAudioBufferResponse";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromJsonString(jsonString, options);
}

static equals(a: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined, b: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined): boolean {
return proto3.util.equals(ClearAudioBufferResponse, a, b);
}
}

/**
* Create a new AudioResampler
*
Expand Down
48 changes: 31 additions & 17 deletions packages/livekit-rtc/src/proto/ffi_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { Message, proto3, protoInt64 } from "@bufbuild/protobuf";
import { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js";
import { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, TrackEvent } from "./track_pb.js";
import { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js";
import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js";
import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js";
import { E2eeRequest, E2eeResponse } from "./e2ee_pb.js";

/**
Expand Down Expand Up @@ -239,25 +239,31 @@ export class FfiRequest extends Message<FfiRequest> {
case: "captureAudioFrame";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 28;
* @generated from field: livekit.proto.ClearAudioBufferRequest clear_audio_buffer = 28;
*/
value: ClearAudioBufferRequest;
case: "clearAudioBuffer";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerRequest new_audio_resampler = 29;
*/
value: NewAudioResamplerRequest;
case: "newAudioResampler";
} | {
/**
* @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 29;
* @generated from field: livekit.proto.RemixAndResampleRequest remix_and_resample = 30;
*/
value: RemixAndResampleRequest;
case: "remixAndResample";
} | {
/**
* @generated from field: livekit.proto.E2eeRequest e2ee = 30;
* @generated from field: livekit.proto.E2eeRequest e2ee = 31;
*/
value: E2eeRequest;
case: "e2ee";
} | {
/**
* @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 31;
* @generated from field: livekit.proto.AudioStreamFromParticipantRequest audio_stream_from_participant = 32;
*/
value: AudioStreamFromParticipantRequest;
case: "audioStreamFromParticipant";
Expand Down Expand Up @@ -297,10 +303,11 @@ export class FfiRequest extends Message<FfiRequest> {
{ no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamRequest, oneof: "message" },
{ no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceRequest, oneof: "message" },
{ no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameRequest, oneof: "message" },
{ no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" },
{ no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" },
{ no: 30, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" },
{ no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" },
{ no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferRequest, oneof: "message" },
{ no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerRequest, oneof: "message" },
{ no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleRequest, oneof: "message" },
{ no: 31, name: "e2ee", kind: "message", T: E2eeRequest, oneof: "message" },
{ no: 32, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantRequest, oneof: "message" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): FfiRequest {
Expand Down Expand Up @@ -495,25 +502,31 @@ export class FfiResponse extends Message<FfiResponse> {
case: "captureAudioFrame";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 28;
* @generated from field: livekit.proto.ClearAudioBufferResponse clear_audio_buffer = 28;
*/
value: ClearAudioBufferResponse;
case: "clearAudioBuffer";
} | {
/**
* @generated from field: livekit.proto.NewAudioResamplerResponse new_audio_resampler = 29;
*/
value: NewAudioResamplerResponse;
case: "newAudioResampler";
} | {
/**
* @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 29;
* @generated from field: livekit.proto.RemixAndResampleResponse remix_and_resample = 30;
*/
value: RemixAndResampleResponse;
case: "remixAndResample";
} | {
/**
* @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 30;
* @generated from field: livekit.proto.AudioStreamFromParticipantResponse audio_stream_from_participant = 31;
*/
value: AudioStreamFromParticipantResponse;
case: "audioStreamFromParticipant";
} | {
/**
* @generated from field: livekit.proto.E2eeResponse e2ee = 31;
* @generated from field: livekit.proto.E2eeResponse e2ee = 32;
*/
value: E2eeResponse;
case: "e2ee";
Expand Down Expand Up @@ -553,10 +566,11 @@ export class FfiResponse extends Message<FfiResponse> {
{ no: 25, name: "new_audio_stream", kind: "message", T: NewAudioStreamResponse, oneof: "message" },
{ no: 26, name: "new_audio_source", kind: "message", T: NewAudioSourceResponse, oneof: "message" },
{ no: 27, name: "capture_audio_frame", kind: "message", T: CaptureAudioFrameResponse, oneof: "message" },
{ no: 28, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" },
{ no: 29, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" },
{ no: 30, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" },
{ no: 31, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" },
{ no: 28, name: "clear_audio_buffer", kind: "message", T: ClearAudioBufferResponse, oneof: "message" },
{ no: 29, name: "new_audio_resampler", kind: "message", T: NewAudioResamplerResponse, oneof: "message" },
{ no: 30, name: "remix_and_resample", kind: "message", T: RemixAndResampleResponse, oneof: "message" },
{ no: 31, name: "audio_stream_from_participant", kind: "message", T: AudioStreamFromParticipantResponse, oneof: "message" },
{ no: 32, name: "e2ee", kind: "message", T: E2eeResponse, oneof: "message" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): FfiResponse {
Expand Down
Loading
Loading