Skip to content

Commit

Permalink
RSDK-2379 Add getStream to stream api (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximpertsov authored Aug 1, 2023
1 parent 00d36f4 commit 49bc38d
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 53 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ clean: clean-js clean-buf clean-docs
test: $(node_modules) build-buf
npm run test

.PHONY: test-watch
test-watch: $(node_modules) build-buf
npm run test:watch

.PHONY: lint
lint: $(node_modules) build-buf
npm run lint
Expand Down
2 changes: 1 addition & 1 deletion examples/teleop-elm/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 7 additions & 12 deletions examples/teleop-elm/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ async function connectWebRTC() {
});
}

function onTrack(event) {
const eventStream = event.streams[0];
if (!eventStream) {
throw new Error('expected event stream to exist');
}
function injectMediaStream(eventStream) {
console.debug('got media stream');

const streamName = eventStream.id;
const streamContainers = document.querySelectorAll(
Expand Down Expand Up @@ -53,17 +50,18 @@ connectWebRTC()
.then((client) => {
const base = new VIAM.BaseClient(client, 'viam_base');
const wifi = new VIAM.SensorClient(client, 'wifi');
const streams = new VIAM.StreamClient(client);
const accel = new VIAM.MovementSensorClient(client, 'accelerometer');

const app = Elm.Main.init({
node: document.getElementById('main'),
flags: {},
});

// streams

const streams = new VIAM.StreamClient(client);
streams.on('track', onTrack);
console.debug('requested media stream');
streams.getStream('cam').then((mediaStream) => {
injectMediaStream(mediaStream);
});

app.ports.sendBaseSetPower.subscribe(async ({ linear, angular }) => {
const linearVec = { x: 0, y: linear, z: 0 };
Expand All @@ -85,9 +83,6 @@ connectWebRTC()
const readings = await accel.getLinearAcceleration();
app.ports.recvAccelReading.send(readings);
});

// Add stream from camera
streams.add('cam');
})
.catch((err) => {
console.error('something went wrong');
Expand Down
2 changes: 1 addition & 1 deletion examples/teleop-react/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 0 additions & 32 deletions examples/teleop-react/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,35 +52,3 @@ export const getStreamClient = (client: RobotClient): StreamClient => {
export const getBaseClient = (client: RobotClient): BaseClient => {
return new BaseClient(client, 'viam_base');
};

/**
* Get a stream by name from a StreamClient.
*
* @param streamClient - The connected StreamClient.
* @param name - The name of the camera.
* @returns A MediaStream object that can be used in a <video>.
*/
export const getStream = async (
streamClient: StreamClient,
name: string
): Promise<MediaStream> => {
const streamPromise = new Promise<MediaStream>((resolve, reject) => {
const handleTrack = (event: RTCTrackEvent) => {
const stream = event.streams[0];

if (!stream) {
streamClient.off('track', handleTrack as (args: unknown) => void);
reject(new Error('Recieved track event with no streams'));
} else if (stream.id === name) {
streamClient.off('track', handleTrack as (args: unknown) => void);
resolve(stream);
}
};

streamClient.on('track', handleTrack as (args: unknown) => void);
});

await streamClient.add(name);

return streamPromise;
};
3 changes: 2 additions & 1 deletion examples/teleop-react/src/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ export const useStream = (
if (streamClient && okToConnectRef.current) {
okToConnectRef.current = false;

getStream(streamClient, cameraName)
streamClient
.getStream(cameraName)
.then((mediaStream) => setStream(mediaStream))
.catch((error: unknown) => {
console.warn(`Unable to connect to camera ${cameraName}`, error);
Expand Down
101 changes: 96 additions & 5 deletions src/extra/stream/client.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,113 @@
// @vitest-environment happy-dom

import { describe, expect, test } from 'vitest';
import { vi, beforeEach, afterEach, describe, expect, test } from 'vitest';
import { RobotClient } from '../../robot';
vi.mock('../../robot/client');

import { events } from '../../events';
import { StreamServiceClient } from '../../gen/proto/stream/v1/stream_pb_service';
vi.mock('../../gen/proto/stream/v1/stream_pb_service');

import { StreamClient } from './client';

let streamClient: StreamClient;

describe('StreamClient', () => {
beforeEach(() => {
vi.useFakeTimers();

const fakehost = 'fakehost';
RobotClient.prototype.createServiceClient = vi
.fn()
.mockImplementation(() => new StreamServiceClient(fakehost));

const robotClient = new RobotClient(fakehost);
streamClient = new StreamClient(robotClient);
});

afterEach(() => {
vi.useRealTimers();
});

test('webrtc track will cause the client to emit an event', () =>
new Promise<void>((done) => {
const host = 'fakeServiceHost';
const client = new RobotClient(host);
const streamClient = new StreamClient(client);

streamClient.on('track', (data) => {
expect((data as { mock: true }).mock).eq(true);
done();
});

events.emit('track', { mock: true });
}));

test('getStream creates and returns a new stream', async () => {
const fakeCamName = 'fakecam';
const fakeStream = { id: fakeCamName };
StreamServiceClient.prototype.addStream = vi
.fn()
.mockImplementation((_req, _md, cb) => {
cb(null, {});
streamClient.emit('track', { streams: [fakeStream] });
});

const addStream = vi.spyOn(streamClient, 'add');
await expect(streamClient.getStream(fakeCamName)).resolves.toStrictEqual(
fakeStream
);
expect(addStream).toHaveBeenCalledOnce();
expect(addStream).toHaveBeenCalledWith(fakeCamName);
});

test('getStream fails when add stream fails', async () => {
const fakeCamName = 'fakecam';
const error = new Error('could not add stream');
StreamServiceClient.prototype.addStream = vi
.fn()
.mockImplementation((_req, _md, cb) => {
cb(error);
});

const addStream = vi.spyOn(streamClient, 'add');
await expect(streamClient.getStream(fakeCamName)).rejects.toThrow(error);
expect(addStream).toHaveBeenCalledOnce();
expect(addStream).toHaveBeenCalledWith(fakeCamName);
});

test('getStream fails when timeout exceeded', async () => {
const fakeCamName = 'fakecam';
StreamServiceClient.prototype.addStream = vi
.fn()
.mockImplementation((_req, _md, cb) => {
cb(null, {});
});

const addStream = vi.spyOn(streamClient, 'add');
const promise = streamClient.getStream(fakeCamName);
vi.runAllTimers();
await expect(promise).rejects.toThrowError(
'Did not receive a stream after 5000 ms'
);
expect(addStream).toHaveBeenCalledOnce();
expect(addStream).toHaveBeenCalledWith(fakeCamName);
});

test('getStream can add the same stream twice', async () => {
const fakeCamName = 'fakecam';
const fakeStream = { id: fakeCamName };
StreamServiceClient.prototype.addStream = vi
.fn()
.mockImplementation((_req, _md, cb) => {
cb(null, {});
streamClient.emit('track', { streams: [fakeStream] });
});

const addStream = vi.spyOn(streamClient, 'add');
await expect(streamClient.getStream(fakeCamName)).resolves.toStrictEqual(
fakeStream
);
await expect(streamClient.getStream(fakeCamName)).resolves.toStrictEqual(
fakeStream
);
expect(addStream).toHaveBeenCalledTimes(2);
expect(addStream).toHaveBeenCalledWith(fakeCamName);
});
});
37 changes: 37 additions & 0 deletions src/extra/stream/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,41 @@ export class StreamClient extends EventDispatcher implements Stream {
this.streams.delete(name);
}
}

private STREAM_TIMEOUT = 5000;

/**
* Get a stream by name from a StreamClient. Will time out if stream is not
* received within 5 seconds.
*
* @param name - The name of a camera component.
*/
getStream = async (name: string): Promise<MediaStream> => {
const streamPromise = new Promise<MediaStream>((resolve, reject) => {
const handleTrack = (event: RTCTrackEvent) => {
const [stream] = event.streams;

if (!stream) {
this.off('track', handleTrack as (args: unknown) => void);
reject(new Error('Recieved track event with no streams'));
} else if (stream.id === name) {
this.off('track', handleTrack as (args: unknown) => void);
resolve(stream);
}
};

this.on('track', handleTrack as (args: unknown) => void);

setTimeout(() => {
this.off('track', handleTrack as (args: unknown) => void);
reject(
new Error(`Did not receive a stream after ${this.STREAM_TIMEOUT} ms`)
);
}, this.STREAM_TIMEOUT);
});

await this.add(name);

return streamPromise;
};
}
2 changes: 1 addition & 1 deletion src/robot/robot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export interface Robot {
* Blocks on the specified operation on the robot. This function will only
* return when the specific operation has finished or has been cancelled.
*
* @param id (str) - ID of operation to block on.
* @param id - ID of operation to block on.
* @group Operations
* @alpha
*/
Expand Down
4 changes: 4 additions & 0 deletions vite.config.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/// <reference types="vitest" />
import path from 'node:path';
import { defineConfig } from 'vite';

Expand Down Expand Up @@ -35,4 +36,7 @@ export default defineConfig({
},
},
},
test: {
mockReset: true,
},
});

0 comments on commit 49bc38d

Please sign in to comment.