Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rtc: bump ffi to v0.10.2 #271

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/strong-dodos-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

bump ffi to v0.10.2
7 changes: 3 additions & 4 deletions examples/publish-wav/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@ const track = LocalAudioTrack.createAudioTrack('audio', source);
const options = new TrackPublishOptions();
const buffer = new Int16Array(sample.buffer);
options.source = TrackSource.SOURCE_MICROPHONE;
await room.localParticipant.publishTrack(track, options);
await new Promise((resolve) => setTimeout(resolve, 1000)); // wait a bit so the start doesn't cut off
await room.localParticipant.publishTrack(track, options).then((pub) => pub.waitForSubscription());

let written = 44; // start of WAVE data stream
const FRAME_DURATION = 1; // write 1s of audio at a time
const numSamples = sampleRate / FRAME_DURATION;
const numSamples = sampleRate * FRAME_DURATION;
while (written < dataSize) {
const available = dataSize - written;
const frameSize = Math.min(numSamples, available);
Expand All @@ -61,9 +60,9 @@ while (written < dataSize) {
Math.trunc(frameSize / channels),
);
await source.captureFrame(frame);

written += frameSize;
}
await source.waitForPlayout();

await room.disconnect();
await dispose();
2 changes: 1 addition & 1 deletion packages/livekit-rtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
},
"dependencies": {
"@bufbuild/protobuf": "^1.4.2",
"typed-emitter": "^2.1.0"
"@livekit/typed-emitter": "^3.0.0"
},
"devDependencies": {
"@napi-rs/cli": "^2.18.0",
Expand Down
71 changes: 69 additions & 2 deletions packages/livekit-rtc/src/audio_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import type {
AudioSourceInfo,
CaptureAudioFrameCallback,
CaptureAudioFrameResponse,
ClearAudioBufferResponse,
NewAudioSourceResponse,
} from './proto/audio_frame_pb.js';
import {
AudioSourceType,
CaptureAudioFrameRequest,
ClearAudioBufferRequest,
NewAudioSourceRequest,
} from './proto/audio_frame_pb.js';

Expand All @@ -21,19 +23,33 @@ export class AudioSource {
info: AudioSourceInfo;
/** @internal */
ffiHandle: FfiHandle;
/** @internal */
lastCapture: number;
/** @internal */
currentQueueSize: number;
/** @internal */
release = () => {};
promise = this.newPromise();
/** @internal */
timeout?: ReturnType<typeof setTimeout> = undefined;

sampleRate: number;
numChannels: number;
queueSize: number;

constructor(sampleRate: number, numChannels: number, enableQueue?: boolean) {
constructor(sampleRate: number, numChannels: number, queueSize = 1000) {
this.sampleRate = sampleRate;
this.numChannels = numChannels;
this.queueSize = queueSize;

this.lastCapture = 0;
this.currentQueueSize = 0;

const req = new NewAudioSourceRequest({
type: AudioSourceType.AUDIO_SOURCE_NATIVE,
sampleRate: sampleRate,
numChannels: numChannels,
enableQueue: enableQueue,
queueSizeMs: queueSize,
});

const res = FfiClient.instance.request<NewAudioSourceResponse>({
Expand All @@ -47,7 +63,58 @@ export class AudioSource {
this.ffiHandle = new FfiHandle(res.source.handle.id);
}

/**
 * Milliseconds of captured audio still waiting to be played out, estimated
 * from the tracked queue size minus the wall-clock time elapsed since the
 * last capture. Never negative.
 */
get queuedDuration(): number {
  const nowMs = Number(process.hrtime.bigint() / 1000000n);
  const elapsedSinceCapture = nowMs - this.lastCapture;
  const remaining = this.currentQueueSize - elapsedSinceCapture;
  return Math.max(remaining, 0);
}

/**
 * Drops any audio still buffered on the FFI side and resolves the pending
 * playout promise so a waiter on waitForPlayout() is released immediately.
 */
clearQueue() {
  FfiClient.instance.request<ClearAudioBufferResponse>({
    message: {
      case: 'clearAudioBuffer',
      value: new ClearAudioBufferRequest({ sourceHandle: this.ffiHandle.handle }),
    },
  });

  this.release();
}

/** @internal */
async newPromise() {
return new Promise<void>((resolve) => {
this.release = resolve;
});
}

async waitForPlayout() {
return this.promise.then(() => {
this.lastCapture = 0;
this.currentQueueSize = 0;
this.promise = this.newPromise();
});
}

async captureFrame(frame: AudioFrame) {
const now = Number(process.hrtime.bigint() / 1000000n);
const elapsed = this.lastCapture === 0 ? 0 : now - this.lastCapture;
this.currentQueueSize += (frame.samplesPerChannel / frame.sampleRate - elapsed) * 1000;

this.lastCapture = now;

if (this.timeout) {
nbsp marked this conversation as resolved.
Show resolved Hide resolved
clearTimeout(this.timeout);
}

// remove 50ms to account for processing time
// (e.g. using wait_for_playout for very small chunks)
this.timeout = setTimeout(this.release, this.currentQueueSize - 50);

const req = new CaptureAudioFrameRequest({
sourceHandle: this.ffiHandle.handle,
buffer: frame.protoInfo(),
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import type TypedEmitter from 'typed-emitter';
import { AudioFrame } from './audio_frame.js';
import type { FfiEvent } from './ffi_client.js';
import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/src/ffi_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { PartialMessage } from '@bufbuild/protobuf';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import type TypedEmitter from 'typed-emitter';
import {
FfiHandle,
livekitCopyBuffer,
Expand Down
13 changes: 5 additions & 8 deletions packages/livekit-rtc/src/napi/native.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@

/* auto-generated by NAPI-RS */

export declare function livekitInitialize(
callback: (data: Uint8Array) => void,
captureLogs: boolean,
): void;
export declare function livekitFfiRequest(data: Uint8Array): Uint8Array;
export declare function livekitRetrievePtr(handle: Uint8Array): bigint;
export declare function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export declare function livekitDispose(): Promise<void>;
export function livekitInitialize(callback: (data: Uint8Array) => void, captureLogs: boolean): void;
export function livekitFfiRequest(data: Uint8Array): Uint8Array;
export function livekitRetrievePtr(handle: Uint8Array): bigint;
export function livekitCopyBuffer(ptr: bigint, len: number): Uint8Array;
export function livekitDispose(): Promise<void>;
export declare class FfiHandle {
constructor(handle: bigint);
dispose(): void;
Expand Down
74 changes: 71 additions & 3 deletions packages/livekit-rtc/src/proto/audio_frame_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
numChannels = 0;

/**
* @generated from field: optional bool enable_queue = 5;
* @generated from field: uint32 queue_size_ms = 5;
*/
enableQueue?: boolean;
queueSizeMs = 0;

constructor(data?: PartialMessage<NewAudioSourceRequest>) {
super();
Expand All @@ -292,7 +292,7 @@ export class NewAudioSourceRequest extends Message<NewAudioSourceRequest> {
{ no: 2, name: "options", kind: "message", T: AudioSourceOptions, opt: true },
{ no: 3, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 4, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 5, name: "enable_queue", kind: "scalar", T: 8 /* ScalarType.BOOL */, opt: true },
{ no: 5, name: "queue_size_ms", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): NewAudioSourceRequest {
Expand Down Expand Up @@ -475,6 +475,74 @@ export class CaptureAudioFrameCallback extends Message<CaptureAudioFrameCallback
}
}

/**
* @generated from message livekit.proto.ClearAudioBufferRequest
*/
// NOTE(review): generated protobuf binding — do not hand-edit; regenerate
// from the .proto definition instead.
export class ClearAudioBufferRequest extends Message<ClearAudioBufferRequest> {
/**
 * @generated from field: uint64 source_handle = 1;
 */
// FFI handle id of the AudioSource whose buffered audio should be dropped
// (see AudioSource.clearQueue, which passes this.ffiHandle.handle here).
sourceHandle = protoInt64.zero;

constructor(data?: PartialMessage<ClearAudioBufferRequest>) {
super();
proto3.util.initPartial(data, this);
}

// Runtime metadata used by the proto3 reflection machinery below.
static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "livekit.proto.ClearAudioBufferRequest";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "source_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */ },
]);

// Binary/JSON codecs and structural equality, all delegated to the
// @bufbuild/protobuf proto3 runtime.
static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferRequest {
return new ClearAudioBufferRequest().fromJsonString(jsonString, options);
}

static equals(a: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined, b: ClearAudioBufferRequest | PlainMessage<ClearAudioBufferRequest> | undefined): boolean {
return proto3.util.equals(ClearAudioBufferRequest, a, b);
}
}

/**
* @generated from message livekit.proto.ClearAudioBufferResponse
*/
// NOTE(review): generated protobuf binding — do not hand-edit; regenerate
// from the .proto definition instead. Empty acknowledgement message: it
// declares no fields.
export class ClearAudioBufferResponse extends Message<ClearAudioBufferResponse> {
constructor(data?: PartialMessage<ClearAudioBufferResponse>) {
super();
proto3.util.initPartial(data, this);
}

// Runtime metadata used by the proto3 reflection machinery below; the
// field list is intentionally empty.
static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "livekit.proto.ClearAudioBufferResponse";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
]);

// Binary/JSON codecs and structural equality, all delegated to the
// @bufbuild/protobuf proto3 runtime.
static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ClearAudioBufferResponse {
return new ClearAudioBufferResponse().fromJsonString(jsonString, options);
}

static equals(a: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined, b: ClearAudioBufferResponse | PlainMessage<ClearAudioBufferResponse> | undefined): boolean {
return proto3.util.equals(ClearAudioBufferResponse, a, b);
}
}

/**
* Create a new AudioResampler
*
Expand Down
Loading
Loading