diff --git a/packages/quick-tsr/src/tsrHandler.ts b/packages/quick-tsr/src/tsrHandler.ts index 3055eeba3..d1a95cb4b 100644 --- a/packages/quick-tsr/src/tsrHandler.ts +++ b/packages/quick-tsr/src/tsrHandler.ts @@ -8,7 +8,6 @@ import { Datastore, DeviceStatus, SlowSentCommandInfo, - DeviceOptionsBase, SlowFulfilledCommandInfo, DeviceType, CasparCGDevice, @@ -17,9 +16,7 @@ import { } from 'timeline-state-resolver' import { ThreadedClass } from 'threadedclass' -import * as _ from 'underscore' import { TSRSettings } from './index' -import { BaseRemoteDeviceIntegration } from 'timeline-state-resolver/dist/service/remoteDeviceInstance' /** * Represents a connection between Gateway and TSR @@ -27,13 +24,9 @@ import { BaseRemoteDeviceIntegration } from 'timeline-state-resolver/dist/servic export class TSRHandler { private tsr!: Conductor - private _multiThreaded: boolean | null = null - // private _timeline: TSRTimeline // private _mappings: Mappings - private _devices: { [deviceId: string]: BaseRemoteDeviceIntegration> } = {} - constructor() { // nothing } @@ -76,6 +69,29 @@ export class TSRHandler { // todo ? }) + this.tsr.connectionManager.on('connectionEvent:connectionChanged', (deviceId: string, status: DeviceStatus) => { + console.log(`Device ${deviceId} status changed: ${JSON.stringify(status)}`) + }) + this.tsr.connectionManager.on( + 'connectionEvent:slowSentCommand', + (_deviceId: string, _info: SlowSentCommandInfo) => { + // console.log(`Device ${device.deviceId} slow sent command: ${_info}`) + } + ) + this.tsr.connectionManager.on( + 'connectionEvent:slowFulfilledCommand', + (_deviceId: string, _info: SlowFulfilledCommandInfo) => { + // console.log(`Device ${device.deviceId} slow fulfilled command: ${_info}`) + } + ) + this.tsr.connectionManager.on('connectionEvent:commandReport', (deviceId: string, command: any) => { + console.log(`Device ${deviceId} command: ${JSON.stringify(command)}`) + }) + this.tsr.connectionManager.on('connectionEvent:debug', (deviceId: string, ...args: any[]) => { + const data = args.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : arg)) + console.log(`Device ${deviceId} debug: ${data}`) + }) + await this.tsr.init() // this._initialized = true @@ -90,7 +106,7 @@ export class TSRHandler { else return Promise.resolve() } async logMediaList(): Promise { - for (const deviceContainer of this.tsr.getDevices()) { + for (const deviceContainer of this.tsr.connectionManager.getConnections()) { if (deviceContainer.deviceType === DeviceType.CASPARCG) { const device = deviceContainer.device as ThreadedClass @@ -118,94 +134,6 @@ export class TSRHandler { this.tsr.setDatastore(store) } public async setDevices(devices: { [deviceId: string]: DeviceOptionsAny }): Promise { - const ps: Array> = [] - - _.each(devices, (deviceOptions: DeviceOptionsAny, deviceId: string) => { - const oldDevice = this.tsr.getDevice(deviceId) - - if (!oldDevice) { - if (deviceOptions.options) { - console.log('Initializing device: ' + deviceId) - ps.push(this._addDevice(deviceId, deviceOptions)) - } - } else { - if (this._multiThreaded !== null && deviceOptions.isMultiThreaded === undefined) { - deviceOptions.isMultiThreaded = this._multiThreaded - } - if (deviceOptions.options) { - let anyChanged = false - - // let oldOptions = (oldDevice.deviceOptions).options || {} - - if (!_.isEqual(oldDevice.deviceOptions, deviceOptions)) { - anyChanged = true - } - - if (anyChanged) { - console.log('Re-initializing device: ' + deviceId) - ps.push(this._removeDevice(deviceId).then(async () => this._addDevice(deviceId, deviceOptions))) - } - } - } - }) - - _.each(this.tsr.getDevices(), (oldDevice: BaseRemoteDeviceIntegration>) => { - const deviceId = oldDevice.deviceId - if (!devices[deviceId]) { - console.log('Un-initializing device: ' + deviceId) - ps.push(this._removeDevice(deviceId)) - } - }) - - await Promise.all(ps) - } - private async _addDevice(deviceId: string, options: DeviceOptionsAny) { - // console.log('Adding device ' + deviceId) - - if (!options.limitSlowSentCommand) options.limitSlowSentCommand = 40 - if (!options.limitSlowFulfilledCommand) options.limitSlowFulfilledCommand = 100 - - try { - const device = await this.tsr.addDevice(deviceId, options) - - this._devices[deviceId] = device - - await device.device.on('connectionChanged', ((status: DeviceStatus) => { - console.log(`Device ${device.deviceId} status changed: ${JSON.stringify(status)}`) - }) as () => void) - await device.device.on('slowSentCommand', ((_info: SlowSentCommandInfo) => { - // console.log(`Device ${device.deviceId} slow sent command: ${_info}`) - }) as () => void) - await device.device.on('slowFulfilledCommand', ((_info: SlowFulfilledCommandInfo) => { - // console.log(`Device ${device.deviceId} slow fulfilled command: ${_info}`) - }) as () => void) - await device.device.on('commandReport', ((command: any) => { - console.log(`Device ${device.deviceId} command: ${JSON.stringify(command)}`) - }) as () => void) - await device.device.on('debug', (...args: any[]) => { - const data = args.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : arg)) - console.log(`Device ${device.deviceId} debug: ${data}`) - }) - // also ask for the status now, and update: - // onConnectionChanged(await device.device.getStatus()) - } catch (e) { - console.error(`Error when adding device "${deviceId}"`, e) - } - } - private async _removeDevice(deviceId: string) { - try { - await this.tsr.removeDevice(deviceId) - } catch (e) { - console.error('Error when removing tsr device: ' + e) - } - - if (this._devices[deviceId]) { - try { - await this._devices[deviceId].device.terminate() - } catch (e) { - console.error('Error when removing device: ' + e) - } - } - delete this._devices[deviceId] + this.tsr.connectionManager.setConnections(devices) } } diff --git a/packages/timeline-state-resolver/examples/CasparcgVideoPlayES6example.js b/packages/timeline-state-resolver/examples/CasparcgVideoPlayES6example.js index 2516772cf..4d64e6ffb 100644 --- a/packages/timeline-state-resolver/examples/CasparcgVideoPlayES6example.js +++ b/packages/timeline-state-resolver/examples/CasparcgVideoPlayES6example.js @@ -14,11 +14,13 @@ tsrConductor .init() .then(() => { // Add devices to the TSR-conductor: - return tsrConductor.addDevice('casparcg0', { - type: DeviceType.CASPARCG, - options: { - host: 'localhost', - port: 5250, + return tsrConductor.connectionManager.setConnections({ + casparcg0: { + type: DeviceType.CASPARCG, + options: { + host: 'localhost', + port: 5250, + }, }, }) }) diff --git a/packages/timeline-state-resolver/examples/playVideoInCaspar.ts b/packages/timeline-state-resolver/examples/playVideoInCaspar.ts index 0ce4c4372..c5f3e562b 100644 --- a/packages/timeline-state-resolver/examples/playVideoInCaspar.ts +++ b/packages/timeline-state-resolver/examples/playVideoInCaspar.ts @@ -12,11 +12,13 @@ tsr.on('debug', (deviceId, cmd) => console.log('debug', deviceId, cmd)) const a = async function () { await tsr.init() - await tsr.addDevice('casparcg0', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', - // port: 5250 + tsr.connectionManager.setConnections({ + casparcg0: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + // port: 5250 + }, }, }) diff --git a/packages/timeline-state-resolver/examples/testChangeTimelineQuickly.ts b/packages/timeline-state-resolver/examples/testChangeTimelineQuickly.ts index c75082fa3..863a9e8c1 100644 --- a/packages/timeline-state-resolver/examples/testChangeTimelineQuickly.ts +++ b/packages/timeline-state-resolver/examples/testChangeTimelineQuickly.ts @@ -10,11 +10,13 @@ tsr.on('error', (e) => console.log('error', e)) const a = async function () { await tsr.init() - await tsr.addDevice('casparcg0', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', - // port: 5250 + tsr.connectionManager.setConnections({ + casparcg0: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + // port: 5250 + }, }, }) diff --git a/packages/timeline-state-resolver/src/__mocks__/osc.ts b/packages/timeline-state-resolver/src/__mocks__/osc.ts index 411de3936..803225f2e 100644 --- a/packages/timeline-state-resolver/src/__mocks__/osc.ts +++ b/packages/timeline-state-resolver/src/__mocks__/osc.ts @@ -9,7 +9,7 @@ export class UDPPort extends EventEmitter { this.emit('ready') } - send({ address }) { + send({ address }: { address: string }) { orgSetTimeout(() => { if (MockOSC.connectionIsGood) { if (address === '/state/full') { @@ -21,11 +21,18 @@ export class UDPPort extends EventEmitter { value: `{ "channel": [{ "faderLevel": 0.75, "pgmOn": false, - "pstOn": false + "pstOn": false, + "showChannel": true }, { "faderLevel": 0.75, "pgmOn": false, - "pstOn": false + "pstOn": false, + "showChannel": true + }, { + "faderLevel": 0.75, + "pgmOn": false, + "pstOn": false, + "showChannel": true }] }`, }, ], diff --git a/packages/timeline-state-resolver/src/__tests__/conductor.spec.ts b/packages/timeline-state-resolver/src/__tests__/conductor.spec.ts index 984d97067..0beef2e66 100644 --- a/packages/timeline-state-resolver/src/__tests__/conductor.spec.ts +++ b/packages/timeline-state-resolver/src/__tests__/conductor.spec.ts @@ -12,7 +12,7 @@ import { } from 'timeline-state-resolver-types' import { MockTime } from './mockTime' import { ThreadedClass } from 'threadedclass' -import { getMockCall } from './lib' +import { addConnections, getMockCall, removeConnections } from './lib' import { setupAllMocks } from '../__mocks__/_setup-all-mocks' import { Commands } from 'casparcg-connection' import { MockDeviceInstanceWrapper, ConstructedMockDevices, DiscardAllMockDevices } from './mockDeviceInstanceWrapper' @@ -38,11 +38,11 @@ describe('Conductor', () => { DiscardAllMockDevices() }) - async function getMockDeviceWrapper(conductor: Conductor, deviceId: string): Promise { - const deviceContainer = conductor.getDevice(deviceId) + async function getMockDeviceWrapper(conductor: Conductor, connectionId: string): Promise { + const deviceContainer = conductor.connectionManager.getConnection(connectionId) expect(deviceContainer).toBeTruthy() - const mockDevice = ConstructedMockDevices[deviceId] + const mockDevice = ConstructedMockDevices[connectionId] expect(mockDevice).toBeTruthy() return mockDevice } @@ -76,13 +76,15 @@ describe('Conductor', () => { try { await conductor.init() - await conductor.addDevice('device0', { - type: DeviceType.ABSTRACT, - options: {}, - }) - await conductor.addDevice('device1', { - type: DeviceType.ABSTRACT, - options: {}, + await addConnections(conductor.connectionManager, { + device0: { + type: DeviceType.ABSTRACT, + options: {}, + }, + device1: { + type: DeviceType.ABSTRACT, + options: {}, + }, }) // add something that will play in a seconds time @@ -197,13 +199,18 @@ describe('Conductor', () => { ) // Remove the device - await conductor.removeDevice('device1') - expect(conductor.getDevice('device1')).toBeFalsy() + await removeConnections( + conductor.connectionManager, + { + device0: { + type: DeviceType.ABSTRACT, + options: {}, + }, + }, + ['device1'] + ) + expect(conductor.connectionManager.getConnection('device1')).toBeFalsy() expect(ConstructedMockDevices['device1']).toBeFalsy() - - // Re-add a device - const addedDevice = await conductor.addDevice('device1', { type: DeviceType.ABSTRACT, options: {} }) - expect(addedDevice).toBeTruthy() } finally { await conductor.destroy() } @@ -226,9 +233,11 @@ describe('Conductor', () => { try { await conductor.init() - await conductor.addDevice('device0', { - type: DeviceType.ABSTRACT, - options: {}, + await addConnections(conductor.connectionManager, { + device0: { + type: DeviceType.ABSTRACT, + options: {}, + }, }) // add something that will play "now" @@ -393,13 +402,15 @@ describe('Conductor', () => { try { await conductor.init() - await conductor.addDevice('device0', { - type: DeviceType.ABSTRACT, - options: {}, - }) - await conductor.addDevice('device1', { - type: DeviceType.HTTPSEND, - options: {}, + await addConnections(conductor.connectionManager, { + device0: { + type: DeviceType.ABSTRACT, + options: {}, + }, + device1: { + type: DeviceType.ABSTRACT, + options: {}, + }, }) const device0 = await getMockDeviceWrapper(conductor, 'device0') @@ -443,15 +454,17 @@ describe('Conductor', () => { try { await conductor.init() - await conductor.addDevice('device0', { - type: DeviceType.ABSTRACT, - options: {}, - isMultiThreaded: true, + await addConnections(conductor.connectionManager, { + device0: { + type: DeviceType.ABSTRACT, + options: {}, + isMultiThreaded: true, + }, }) conductor.setTimelineAndMappings([], myLayerMapping) - const device = conductor.getDevice('device0')!.device - expect(await device.getCurrentTime()).toBeTruthy() + const connection = conductor.connectionManager.getConnection('device0')!.device + expect(await connection.getCurrentTime()).toBeTruthy() } finally { await conductor.destroy() } @@ -481,12 +494,14 @@ describe('Conductor', () => { }) await conductor.init() - await conductor.addDevice('device0', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(conductor.connectionManager, { + device0: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) conductor.setTimelineAndMappings([], myLayerMapping) @@ -511,7 +526,7 @@ describe('Conductor', () => { const timeline: TSRTimeline = [video0] - const device0Container = conductor.getDevice('device0') + const device0Container = conductor.connectionManager.getConnection('device0') const device0 = device0Container!.device as ThreadedClass expect(device0).toBeTruthy() diff --git a/packages/timeline-state-resolver/src/__tests__/lib.ts b/packages/timeline-state-resolver/src/__tests__/lib.ts index 5aec3bc7d..9416bcafb 100644 --- a/packages/timeline-state-resolver/src/__tests__/lib.ts +++ b/packages/timeline-state-resolver/src/__tests__/lib.ts @@ -1,9 +1,73 @@ +import { DeviceOptionsAnyInternal } from '../conductor' +import { ConnectionManager } from '../service/ConnectionManager' +import { MockTime } from './mockTime' + /** * Just a wrapper to :any type, to be used in tests only */ export function getMockCall(fcn: jest.Mock, callIndex: number, paramIndex: number): any { return fcn.mock.calls[callIndex][paramIndex] } +export async function addConnections( + connManager: ConnectionManager, + connections: Record, + waitForInit = true +): Promise { + const connectionIds = Object.keys(connections) + const addedConns: string[] = [] + + let resolveAdded: undefined | (() => void) = undefined + const psAdded = new Promise((resolveCb) => (resolveAdded = resolveCb)) + const cb = (id: string) => { + addedConns.push(id) + + if (resolveAdded && addedConns.length === connectionIds.length) { + resolveAdded() + if (waitForInit) { + connManager.removeListener('connectionInitialised', cb) + } else { + connManager.removeListener('connectionAdded', cb) + } + } + } + if (waitForInit) { + connManager.on('connectionInitialised', cb) + } else { + connManager.on('connectionAdded', cb) + } + + connManager.setConnections(connections) + + await psAdded +} + +export async function removeConnections( + connManager: ConnectionManager, + connections: Record, + toBeRemoved: string[] +): Promise { + const addedConns: string[] = [] + + let resolveAdded: undefined | (() => void) = undefined + const psAdded = new Promise((resolveCb) => (resolveAdded = resolveCb)) + connManager.on('connectionRemoved', (id) => { + addedConns.push(id) + + if (resolveAdded && addedConns.length === toBeRemoved.length) resolveAdded() + }) + + connManager.setConnections(connections) + + await psAdded +} + +export async function awaitNextRemoval(connManager: ConnectionManager): Promise { + return new Promise((resolve) => { + connManager.once('connectionRemoved', () => { + resolve() + }) + }) +} // Excend jest.expect in functionality and typings expect.extend({ @@ -22,3 +86,55 @@ declare global { } } } +/** setTimeout (not affected by jest.fakeTimers) */ +const setTimeoutOrg = setTimeout +/** Sleep for a */ +export async function waitTime(ms: number): Promise { + return new Promise((resolve) => setTimeoutOrg(resolve, ms)) +} + +/** The current time, not affected by jest.fakeTimers */ +export const realTimeNow = Date.now.bind(Date) +/** + * Executes {expectFcn} intermittently until it doesn't throw anymore. + * Waits up to {maxWaitTime} ms, then throws the latest error. + * Useful in unit-tests as a way to wait until a predicate is fulfilled. + */ +export async function waitUntil( + expectFcn: () => void | Promise, + maxWaitTime: number, + mockTime?: MockTime +): Promise { + const startTime = realTimeNow() + + const previousErrors: string[] = [] + + // eslint-disable-next-line no-constant-condition + while (true) { + await mockTime?.advanceTimeTicks(100) + await waitTime(100) + try { + await Promise.resolve(expectFcn()) + return + } catch (err) { + const errorStr = `${err}` + if (previousErrors.length) { + const previousError = previousErrors[previousErrors.length - 1] + if (errorStr !== previousError) { + previousErrors.push(errorStr) + } + } else { + previousErrors.push(errorStr) + } + + const waitedTime = realTimeNow() - startTime + if (waitedTime > maxWaitTime) { + console.log(`waitUntil: waited for ${waitedTime} ms, giving up (maxWaitTime: ${maxWaitTime}).`) + console.log(`Previous errors: \n${previousErrors.join('\n')}`) + + throw err + } + // else ignore error and try again later + } + } +} diff --git a/packages/timeline-state-resolver/src/conductor.ts b/packages/timeline-state-resolver/src/conductor.ts index ad04f35c5..07b940d0f 100644 --- a/packages/timeline-state-resolver/src/conductor.ts +++ b/packages/timeline-state-resolver/src/conductor.ts @@ -1,7 +1,7 @@ import * as _ from 'underscore' import { getResolvedState, ResolvedTimeline, ResolvedTimelineObjectInstance, TimelineObject } from 'superfly-timeline' import { EventEmitter } from 'eventemitter3' -import { MemUsageReport, threadedClass, ThreadedClass, ThreadedClassConfig, ThreadedClassManager } from 'threadedclass' +import { MemUsageReport, threadedClass, ThreadedClass, ThreadedClassManager } from 'threadedclass' import PQueue from 'p-queue' import * as PAll from 'p-all' import PTimeout from 'p-timeout' @@ -41,18 +41,17 @@ import { import { DoOnTime } from './devices/doOnTime' import { AsyncResolver } from './AsyncResolver' -import { assertNever, endTrace, fillStateFromDatastore, FinishedTrace, startTrace } from './lib' +import { endTrace, fillStateFromDatastore, FinishedTrace, startTrace } from './lib' import { CommandWithContext } from './devices/device' import { DeviceContainer } from './devices/deviceContainer' -import { CasparCGDevice, DeviceOptionsCasparCGInternal } from './integrations/casparCG' -import { SisyfosMessageDevice, DeviceOptionsSisyfosInternal } from './integrations/sisyfos' -import { VMixDevice, DeviceOptionsVMixInternal } from './integrations/vmix' -import { VizMSEDevice, DeviceOptionsVizMSEInternal } from './integrations/vizMSE' -import { BaseRemoteDeviceIntegration, RemoteDeviceInstance } from './service/remoteDeviceInstance' -import type { ImplementedServiceDeviceTypes } from './service/devices' -import { DeviceEvents } from './service/device' +import { DeviceOptionsCasparCGInternal } from './integrations/casparCG' +import { DeviceOptionsSisyfosInternal } from './integrations/sisyfos' +import { DeviceOptionsVMixInternal } from './integrations/vmix' +import { DeviceOptionsVizMSEInternal } from './integrations/vizMSE' +import { BaseRemoteDeviceIntegration } from './service/remoteDeviceInstance' +import { ConnectionManager } from './service/ConnectionManager' export { DeviceContainer } export { CommandWithContext } @@ -65,8 +64,6 @@ export const MINTIMEUNIT = 1 // Minimum unit of time /** When resolving and the timeline has repeating objects, only resolve this far into the future */ const RESOLVE_LIMIT_TIME = 10 * 1000 -const FREEZE_LIMIT = 5000 // how long to wait before considering the child to be unresponsive - export type TimelineTriggerTimeResult = Array<{ id: string; time: number }> export { Device } from './devices/device' @@ -97,7 +94,6 @@ interface TimelineCallback { } type TimelineCallbacks = { [key: string]: TimelineCallback } const CALLBACK_WAIT_TIME = 50 -const REMOVE_TIMEOUT = 5000 interface CallbackInstance { playing: boolean | undefined @@ -170,7 +166,7 @@ export class Conductor extends EventEmitter { private _options: ConductorOptions - private devices = new Map>>() + public readonly connectionManager = new ConnectionManager() private _getCurrentTime?: () => number @@ -239,6 +235,9 @@ export class Conductor extends EventEmitter { this.emit('error', 'Error during auto-init: ', e) }) } + + this.connectionManager.on('error', (e) => this.emit('error', e)) + this.connectionManager.on('connectionEvent:resyncStates', (deviceId: string) => this.resyncDeviceStates(deviceId)) } /** * Initializates the resolver, with optional multithreading @@ -332,318 +331,16 @@ export class Conductor extends EventEmitter { this._estimateResolveTimeMultiplier = value } - public getDevices(includeUninitialized = false): Array>> { - if (includeUninitialized) { - return Array.from(this.devices.values()) - } else { - return Array.from(this.devices.values()).filter((device) => device.initialized === true) - } - } - - public getDevice( - deviceId: string, - includeUninitialized = false - ): BaseRemoteDeviceIntegration> | undefined { - if (includeUninitialized) { - return this.devices.get(deviceId) - } else { - const device = this.devices.get(deviceId) - if (device?.initialized === true) { - return device - } else { - return undefined - } - } - } - /** - * Adds a device that can be referenced by the timeline and mappings. - * NOTE: use this with caution! if a device fails to initialise (i.e. because the hardware is turned off) this may never resolve. It is preferred to use createDevice and initDevice separately for this reason. - * @param deviceId Id used by the mappings to reference the device. - * @param deviceOptions The options used to initalize the device - * @returns A promise that resolves with the created device, or rejects with an error message. - */ - public async addDevice( - deviceId: string, - deviceOptions: DeviceOptionsAnyInternal, - activeRundownPlaylistId?: string - ): Promise>> { - const newDevice = await this.createDevice(deviceId, deviceOptions) - - try { - // Temporary listening to events, these are removed after the devide has been initiated. - const instanceId = newDevice.instanceId - const onDeviceInfo: any = (...args: DeviceEvents['info']) => { - this.emit('info', instanceId, ...args) - } - const onDeviceWarning: any = (...args: DeviceEvents['warning']) => { - this.emit('warning', instanceId, ...args) - } - const onDeviceError: any = (...args: DeviceEvents['error']) => { - this.emit('error', instanceId, ...args) - } - const onDeviceDebug: any = (...args: DeviceEvents['debug']) => { - this.emit('debug', instanceId, ...args) - } - const onDeviceDebugState: any = (...args: DeviceEvents['debugState']) => { - this.emit('debugState', args) - } - - newDevice.device.on('info', onDeviceInfo).catch(console.error) - newDevice.device.on('warning', onDeviceWarning).catch(console.error) - newDevice.device.on('error', onDeviceError).catch(console.error) - newDevice.device.on('debug', onDeviceDebug).catch(console.error) - newDevice.device.on('debugState', onDeviceDebugState).catch(console.error) - - const device = await this.initDevice(deviceId, deviceOptions, activeRundownPlaylistId) - - // Remove listeners, expect consumer to subscribe to them now. - newDevice.device.removeListener('info', onDeviceInfo).catch(console.error) - newDevice.device.removeListener('warning', onDeviceWarning).catch(console.error) - newDevice.device.removeListener('error', onDeviceError).catch(console.error) - newDevice.device.removeListener('debug', onDeviceDebug).catch(console.error) - newDevice.device.removeListener('debugState', onDeviceDebugState).catch(console.error) - - return device - } catch (e) { - await this.terminateUnwantedDevice(newDevice) - this.devices.delete(deviceId) - this.emit('error', 'conductor.addDevice', e) - return Promise.reject(e) - } - } - - /** - * Creates an uninitialised device that can be referenced by the timeline and mappings. - * @param deviceId Id used by the mappings to reference the device. - * @param deviceOptions The options used to initalize the device - * @param options Additional options - * @returns A promise that resolves with the created device, or rejects with an error message. - */ - public async createDevice( - deviceId: string, - deviceOptions: DeviceOptionsAnyInternal, - options?: { signal?: AbortSignal } - ): Promise>> { - let newDevice: BaseRemoteDeviceIntegration> | undefined - try { - const throwIfAborted = () => this.throwIfAborted(options?.signal, deviceId, 'creation') - if (this.devices.has(deviceId)) { - throw new Error(`Device "${deviceId}" already exists when creating device`) - } - throwIfAborted() - - const threadedClassOptions: ThreadedClassConfig = { - threadUsage: deviceOptions.threadUsage || 1, - autoRestart: false, - disableMultithreading: !deviceOptions.isMultiThreaded, - instanceName: deviceId, - freezeLimit: FREEZE_LIMIT, - } - - const getCurrentTime = () => { - return this.getCurrentTime() - } - - const newDevicePromise = this.createDeviceContainer(deviceOptions, deviceId, getCurrentTime, threadedClassOptions) - - if (!newDevicePromise) { - const type: any = deviceOptions.type - throw new Error(`No matching device type for "${type}" ("${DeviceType[type]}") found in conductor`) - } - - newDevice = await makeImmediatelyAbortable(async () => { - throwIfAborted() - const newDevice = await newDevicePromise - if (options?.signal?.aborted) { - // if the promise above did not resolve before aborted, - // this executes some time after raceAbortable rejects, serving as a cleanup - await this.terminateUnwantedDevice(newDevice) - throw new AbortError(`Device "${deviceId}" creation aborted`) - } - return newDevice - }, options?.signal) - - newDevice.device.on('resetResolver', () => this.resetResolver()).catch(console.error) - newDevice.on('error', (context, e) => { - this.emit('error', `deviceContainer for "${newDevice?.deviceId}" emitted an error: ${context}, ${e}`) - }) - - // Double check that it hasnt been created while we were busy waiting - if (this.devices.has(deviceId)) { - throw new Error(`Device "${deviceId}" already exists when creating device`) - } - throwIfAborted() - } catch (e) { - await this.terminateUnwantedDevice(newDevice) - - this.emit('error', 'conductor.createDevice', e) - throw e - } - - this.devices.set(deviceId, newDevice) - - return newDevice - } - - private throwIfAborted(signal: AbortSignal | undefined, deviceId: string, action: string) { - if (signal?.aborted) { - throw new AbortError(`Device "${deviceId}" ${action} aborted`) - } - } - - private createDeviceContainer( - deviceOptions: DeviceOptionsAnyInternal, - deviceId: string, - getCurrentTime: () => number, - threadedClassOptions: ThreadedClassConfig - ): Promise>> | null { - switch (deviceOptions.type) { - case DeviceType.CASPARCG: - return DeviceContainer.create( - '../../dist/integrations/casparCG/index.js', - 'CasparCGDevice', - deviceId, - deviceOptions, - getCurrentTime, - threadedClassOptions - ) - case DeviceType.SISYFOS: - return DeviceContainer.create( - '../../dist/integrations/sisyfos/index.js', - 'SisyfosMessageDevice', - deviceId, - deviceOptions, - getCurrentTime, - threadedClassOptions - ) - case DeviceType.VIZMSE: - return DeviceContainer.create( - '../../dist/integrations/vizMSE/index.js', - 'VizMSEDevice', - deviceId, - deviceOptions, - getCurrentTime, - threadedClassOptions - ) - case DeviceType.VMIX: - return DeviceContainer.create( - '../../dist/integrations/vmix/index.js', - 'VMixDevice', - deviceId, - deviceOptions, - getCurrentTime, - threadedClassOptions - ) - case DeviceType.ABSTRACT: - case DeviceType.ATEM: - case DeviceType.HTTPSEND: - case DeviceType.HTTPWATCHER: - case DeviceType.HYPERDECK: - case DeviceType.LAWO: - case DeviceType.OBS: - case DeviceType.OSC: - case DeviceType.MULTI_OSC: - case DeviceType.PANASONIC_PTZ: - case DeviceType.PHAROS: - case DeviceType.SHOTOKU: - case DeviceType.SINGULAR_LIVE: - case DeviceType.SOFIE_CHEF: - case DeviceType.TCPSEND: - case DeviceType.TELEMETRICS: - case DeviceType.TRICASTER: - case DeviceType.QUANTEL: { - ensureIsImplementedAsService(deviceOptions.type) - - // presumably this device is implemented in the new service handler - return RemoteDeviceInstance.create(deviceId, deviceOptions, getCurrentTime, threadedClassOptions) - } - default: - assertNever(deviceOptions) - return null - } - } - - private async terminateUnwantedDevice(newDevice: BaseRemoteDeviceIntegration> | undefined) { - await newDevice - ?.terminate() - .catch((e) => this.emit('error', `Cleanup failed of aborted device "${newDevice.deviceId}": ${e}`)) - } - - /** - * Initialises an existing device that can be referenced by the timeline and mappings. - * @param deviceId Id used by the mappings to reference the device. - * @param deviceOptions The options used to initalize the device - * @param activeRundownPlaylistId Id of the current rundown playlist - * @param options Additional options - * @returns A promise that resolves with the initialised device, or rejects with an error message. - */ - public async initDevice( - deviceId: string, - deviceOptions: DeviceOptionsAnyInternal, - activeRundownPlaylistId?: string, - options?: { signal?: AbortSignal } - ): Promise>> { - const throwIfAborted = () => this.throwIfAborted(options?.signal, deviceId, 'initialisation') - - throwIfAborted() - - const newDevice = this.devices.get(deviceId) - - if (!newDevice) { - throw new Error('Could not find device ' + deviceId + ', has it been created?') - } - - if (newDevice.initialized === true) { - throw new Error('Device ' + deviceId + ' is already initialized!') - } - this.emit( - 'info', - `Initializing device ${newDevice.deviceId} (${newDevice.instanceId}) of type ${DeviceType[deviceOptions.type]}...` - ) - return makeImmediatelyAbortable(async () => { - throwIfAborted() - await newDevice.init(deviceOptions.options, activeRundownPlaylistId) - throwIfAborted() - await newDevice.reloadProps() - throwIfAborted() - this.emit('info', `Device ${newDevice.deviceId} (${newDevice.instanceId}) initialized!`) - return newDevice - }, options?.signal) - } - - /** - * Safely remove a device - * @param deviceId The id of the device to be removed - */ - public async removeDevice(deviceId: string): Promise { - const device = this.devices.get(deviceId) - if (device) { - try { - await Promise.race([ - device.device.terminate(), - new Promise((_, reject) => setTimeout(() => reject('Timeout'), REMOVE_TIMEOUT)), - ]) - } catch (e) { - // An error while terminating is probably not that important, since we'll kill the instance anyway - this.emit('warning', `Error when terminating device ${e}`) - } - await device.terminate() - this.devices.delete(deviceId) - } else { - return Promise.reject('No device found') - } - } - - /** - * Remove all devices + * Remove all connections */ public async destroy(): Promise { clearTimeout(this._interval) if (this._triggerSendStartStopCallbacksTimeout) clearTimeout(this._triggerSendStartStopCallbacksTimeout) - await this._mapAllDevices(true, async (d) => this.removeDevice(d.deviceId)) + // remove all connections: + this.connectionManager.setConnections({}) } /** @@ -682,7 +379,7 @@ export class Conductor extends EventEmitter { }` ) await this._actionQueue.add(async () => { - await this._mapAllDevices(false, async (d) => + await this._mapAllConnections(false, async (d) => PTimeout( (async () => { const trace = startTrace('conductor:makeReady:' + d.deviceId) @@ -707,7 +404,7 @@ export class Conductor extends EventEmitter { this.activationId = undefined this.emit('debug', `devicesStandDown, ${okToDestroyStuff ? 'okToDestroyStuff' : 'undefined'}`) await this._actionQueue.add(async () => { - await this._mapAllDevices(false, async (d) => + await this._mapAllConnections(false, async (d) => PTimeout( (async () => { const trace = startTrace('conductor:standDown:' + d.deviceId) @@ -725,14 +422,12 @@ export class Conductor extends EventEmitter { return ThreadedClassManager.getThreadsMemoryUsage() } - private async _mapAllDevices( + private async _mapAllConnections( includeUninitialized: boolean, fcn: (d: BaseRemoteDeviceIntegration>) => Promise ): Promise { return PAll( - this.getDevices(true) - .filter((d) => includeUninitialized || d.initialized === true) - .map((d) => async () => fcn(d)), + this.connectionManager.getConnections(includeUninitialized).map((d) => async () => fcn(d)), { stopOnError: false, } @@ -840,8 +535,8 @@ export class Conductor extends EventEmitter { // TODO - the PAll way of doing this provokes https://github.com/nrkno/tv-automation-state-timeline-resolver/pull/139 // The doOnTime calls fire before this, meaning we cleanup the state for a time we have already sent commands for const pPrepareForHandleStates: Promise = Promise.all( - Array.from(this.devices.values()) - .filter((d) => d.initialized === true) + this.connectionManager + .getConnections(false) .map(async (device: BaseRemoteDeviceIntegration>): Promise => { await device.device.prepareForHandleState(resolveTime) }) @@ -926,11 +621,11 @@ export class Conductor extends EventEmitter { const layersPerDevice = this.filterLayersPerDevice( tlState.layers as Timeline.StateInTime, - Array.from(this.devices.values()).filter((d) => d.initialized === true) + this.connectionManager.getConnections(false) ) // Push state to the right device: - await this._mapAllDevices( + await this._mapAllConnections( false, async (device: BaseRemoteDeviceIntegration>): Promise => { if (this._options.optimizeForProduction) { @@ -963,7 +658,7 @@ export class Conductor extends EventEmitter { if (!nextEventTime && tlState.time < this._resolved.validTo) { // There's nothing ahead in the timeline (as far as we can see, ref: this._resolved.validTo) // Tell the devices that the future is clear: - await this._mapAllDevices(true, async (device: BaseRemoteDeviceIntegration>) => { + await this._mapAllConnections(true, async (device: BaseRemoteDeviceIntegration>) => { try { await device.device.clearFuture(tlState.time) } catch (e) { @@ -1118,7 +813,7 @@ export class Conductor extends EventEmitter { const filledState = fillStateFromDatastore(state, this._datastore) // send the filled state to the device handler - return this.getDevice(deviceId)?.device.handleState(filledState, mappings) + return this.connectionManager.getConnection(deviceId)?.device.handleState(filledState, mappings) } setDatastore(newStore: Datastore) { @@ -1150,7 +845,8 @@ export class Conductor extends EventEmitter { for (const s of toBeFilled) { const filledState = fillStateFromDatastore(s.state, this._datastore) - this.getDevice(deviceId) + this.connectionManager + .getConnection(deviceId) ?.device.handleState(filledState, s.mappings) .catch((e) => this.emit('error', 'resolveTimeline' + e + '\nStack: ' + (e as Error).stack)) } @@ -1161,6 +857,29 @@ export class Conductor extends EventEmitter { }) } + private resyncDeviceStates(deviceId: string) { + this._actionQueue + .add(() => { + const toBeFilled = _.compact([ + // shallow clone so we don't reverse the array in place + [...this._deviceStates[deviceId]].reverse().find((s) => s.time <= this.getCurrentTime()), // one state before now + ...this._deviceStates[deviceId].filter((s) => s.time > this.getCurrentTime()), // all states after now + ]) + + for (const s of toBeFilled) { + const filledState = fillStateFromDatastore(s.state, this._datastore) + + this.connectionManager + .getConnection(deviceId) + ?.device.handleState(filledState, s.mappings) + .catch((e) => this.emit('error', 'resolveTimeline' + e + '\nStack: ' + (e as Error).stack)) + } + }) + .catch((e) => { + this.emit('error', 'Caught error in resyncDeviceStates' + e) + }) + } + getTimelineSize(): number { if (this._timelineSize === undefined) { // Update the cache: @@ -1495,45 +1214,3 @@ function removeParentFromState( } return o } - -/** - * If aborted, rejects as soon as possible, but lets the wraped function safely resolve or reject on its own - * @param func async function to wrap - * @param abortSignal the AbortSignal - * @returns Promise of the same type as `func` - */ -async function makeImmediatelyAbortable( - func: (abortSignal?: AbortSignal) => Promise, - abortSignal?: AbortSignal -): Promise { - const mainPromise = func(abortSignal) - if (!abortSignal) { - return mainPromise - } - let resolveAbortPromise: Function - const abortPromise = new Promise((resolve, reject) => { - resolveAbortPromise = () => { - resolve() - abortSignal.removeEventListener('abort', rejectPromise) - } - const rejectPromise = () => { - reject(new AbortError()) - } - abortSignal.addEventListener('abort', rejectPromise, { once: true }) - }) - return Promise.race([mainPromise, abortPromise]) - .then((result) => { - // only mainPromise could have resolved, so the result must be T - resolveAbortPromise() - return result as T - }) - .catch((reason) => { - // mainPromise or abortPromise might have rejected; calling resolveAbortPromise in the latter case is safe - resolveAbortPromise() - throw reason - }) -} - -function ensureIsImplementedAsService(_type: ImplementedServiceDeviceTypes): void { - // This is a type check -} diff --git a/packages/timeline-state-resolver/src/integrations/casparCG/__tests__/casparcg.spec.ts b/packages/timeline-state-resolver/src/integrations/casparCG/__tests__/casparcg.spec.ts index edb05af02..a65b5c87a 100644 --- a/packages/timeline-state-resolver/src/integrations/casparCG/__tests__/casparcg.spec.ts +++ b/packages/timeline-state-resolver/src/integrations/casparCG/__tests__/casparcg.spec.ts @@ -13,7 +13,7 @@ import { MappingCasparCGType, } from 'timeline-state-resolver-types' import { MockTime } from '../../../__tests__/mockTime' -import { getMockCall } from '../../../__tests__/lib' +import { addConnections, getMockCall } from '../../../__tests__/lib' import { Commands } from 'casparcg-connection' // usage logCalls(commandReceiver0) @@ -51,13 +51,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10100) @@ -129,13 +131,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10100) @@ -196,14 +200,16 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', - fps: 50, + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + fps: 50, + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10100) @@ -278,13 +284,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) myConductor.setTimelineAndMappings( @@ -351,13 +359,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) // await mockTime.advanceTimeToTicks(10050) @@ -440,13 +450,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10050) @@ -519,13 +531,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10050) @@ -599,18 +613,20 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) - const deviceContainer = myConductor.getDevice('myCCG') - const device = deviceContainer!.device - await device['_ccgState'] + const connContainer = myConductor.connectionManager.getConnection('myCCG') + const conn = connContainer!.device + await conn['_ccgState'] await mockTime.advanceTimeToTicks(10050) expect(commandReceiver0).toHaveBeenCalledTimes(0) @@ -715,13 +731,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) // Check that no commands has been sent: @@ -834,15 +852,19 @@ describe('CasparCG', () => { console.warn(msg) }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) + await mockTime.advanceTimeTicks(100) // let the device settle + // Check that no commands has been sent: expect(commandReceiver0).toHaveBeenCalledTimes(0) @@ -972,13 +994,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) expect(mockTime.getCurrentTime()).toEqual(10000) @@ -1077,13 +1101,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) expect(mockTime.getCurrentTime()).toEqual(10000) @@ -1181,13 +1207,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10050) @@ -1281,13 +1309,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10050) @@ -1370,13 +1400,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10100) @@ -1454,14 +1486,16 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', - retryInterval: undefined, // disable retries explicitly, we will manually trigger them + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + retryInterval: undefined, // disable retries explicitly, we will manually trigger them + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) myConductor.setTimelineAndMappings([], myLayerMapping) await mockTime.advanceTimeToTicks(10100) @@ -1470,8 +1504,8 @@ describe('CasparCG', () => { commandReceiver0.mockClear() - const deviceContainer = myConductor.getDevice('myCCG') - const device = deviceContainer!.device + const connContainer = myConductor.connectionManager.getConnection('myCCG') + const conn = connContainer!.device myConductor.setTimelineAndMappings([ { @@ -1513,7 +1547,7 @@ describe('CasparCG', () => { // advance to half way await mockTime.advanceTimeToTicks(10700) // call the retry mechanism - await (device as any)._assertIntendedState() + await (conn as any)._assertIntendedState() await mockTime.advanceTimeToTicks(10800) expect(commandReceiver0).toHaveBeenCalledTimes(2) @@ -1530,13 +1564,13 @@ describe('CasparCG', () => { // apply command to internal ccg-state const resCommand = getMockCall(commandReceiver0, 1, 1) // @ts-ignore - await device._changeTrackedStateFromCommand( + await conn._changeTrackedStateFromCommand( resCommand, { responseCode: 202, command: resCommand.command }, mockTime.getCurrentTime() ) // trigger retry mechanism - await (device as any)._assertIntendedState() + await (conn as any)._assertIntendedState() await mockTime.advanceTimeToTicks(10900) // no retries done expect(commandReceiver0).toHaveBeenCalledTimes(2) @@ -1553,7 +1587,7 @@ describe('CasparCG', () => { // advance time to after clip: await mockTime.advanceTimeToTicks(11700) // call the retry mechanism - await (device as any)._assertIntendedState() + await (conn as any)._assertIntendedState() await mockTime.advanceTimeToTicks(11800) // no retries issued expect(commandReceiver0).toHaveBeenCalledTimes(3) @@ -1581,14 +1615,16 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', - retryInterval: undefined, // disable retries explicitly, we will manually trigger them + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + retryInterval: undefined, // disable retries explicitly, we will manually trigger them + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) myConductor.setTimelineAndMappings([], myLayerMapping) await mockTime.advanceTimeToTicks(10100) @@ -1597,8 +1633,8 @@ describe('CasparCG', () => { commandReceiver0.mockClear() - const deviceContainer = myConductor.getDevice('myCCG') - const device = deviceContainer!.device + const connContainer = myConductor.connectionManager.getConnection('myCCG') + const conn = connContainer!.device myConductor.setTimelineAndMappings([ { @@ -1658,7 +1694,7 @@ describe('CasparCG', () => { // @ts-ignore const resCommand = getMockCall(commandReceiver0, 0, 1) // @ts-ignore - await device._changeTrackedStateFromCommand( + await conn._changeTrackedStateFromCommand( resCommand, { responseCode: 202, command: resCommand.command }, mockTime.getCurrentTime() @@ -1672,7 +1708,7 @@ describe('CasparCG', () => { // advance to half way await mockTime.advanceTimeToTicks(10700) // call the retry mechanism - await (device as any)._assertIntendedState() + await (conn as any)._assertIntendedState() // still no retries as empty always plays expect(commandReceiver0).toHaveBeenCalledTimes(1) @@ -1681,7 +1717,7 @@ describe('CasparCG', () => { // advance time to after clip: await mockTime.advanceTimeToTicks(20700) // call the retry mechanism - await (device as any)._assertIntendedState() + await (conn as any)._assertIntendedState() await mockTime.advanceTimeToTicks(20800) // no retries issued expect(commandReceiver0).toHaveBeenCalledTimes(1) @@ -1710,13 +1746,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10100) @@ -1806,13 +1844,15 @@ describe('CasparCG', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('myCCG', { - type: DeviceType.CASPARCG, - options: { - host: '127.0.0.1', + await addConnections(myConductor.connectionManager, { + myCCG: { + type: DeviceType.CASPARCG, + options: { + host: '127.0.0.1', + }, + commandReceiver: commandReceiver0, + skipVirginCheck: true, }, - commandReceiver: commandReceiver0, - skipVirginCheck: true, }) await mockTime.advanceTimeToTicks(10100) diff --git a/packages/timeline-state-resolver/src/integrations/casparCG/index.ts b/packages/timeline-state-resolver/src/integrations/casparCG/index.ts index 1d7a951f1..101b3655b 100644 --- a/packages/timeline-state-resolver/src/integrations/casparCG/index.ts +++ b/packages/timeline-state-resolver/src/integrations/casparCG/index.ts @@ -123,6 +123,7 @@ export class CasparCGDevice extends DeviceWithState { this.makeReady(false) // always make sure timecode is correct, setting it can never do bad @@ -130,8 +131,6 @@ export class CasparCGDevice extends DeviceWithState { - if (this.deviceOptions.skipVirginCheck) return false - // a "virgin server" was just restarted (so it is cleared & black). // Otherwise it was probably just a loss of connection @@ -143,18 +142,29 @@ export class CasparCGDevice extends DeviceWithState>[] = [] const channelLength: number = response?.data?.['length'] ?? 0 - // Issue commands - for (let i = 1; i <= channelLength; i++) { - // 1-based index for channels + for (let i = 0; i < channelLength; i++) { + const obj = response.data[i] + + if (!this._currentState.channels[i]) { + this._currentState.channels[obj.channel] = { + channelNo: obj.channel, + videoMode: this.getVideMode(obj), + fps: obj.frameRate, + layers: {}, + } + } + + if (this.deviceOptions.skipVirginCheck) continue + // Issue command const { error, request } = await this._ccg.executeCommand({ command: Commands.InfoChannel, - params: { channel: i }, + params: { channel: obj.channel }, }) if (error) { // We can't return here, as that will leave anything in channelPromises as potentially unhandled channelPromises.push(Promise.reject('execute failed')) - break + continue } channelPromises.push(request) } @@ -178,10 +188,11 @@ export class CasparCGDevice extends DeviceWithState { @@ -197,25 +208,6 @@ export class CasparCGDevice extends DeviceWithState { - this._currentState.channels[obj.channel] = { - channelNo: obj.channel, - videoMode: this.getVideMode(obj), - fps: obj.frameRate, - layers: {}, - } - }) - } else { - return false // not being able to get channel count is a problem for us - } - if (typeof initOptions.retryInterval === 'number' && initOptions.retryInterval >= 0) { this._retryTime = initOptions.retryInterval || MEDIA_RETRY_INTERVAL this._retryTimeout = setTimeout(() => this._assertIntendedState(), this._retryTime) @@ -235,7 +227,7 @@ export class CasparCGDevice extends DeviceWithState { resolve() this._ccg.removeAllListeners() diff --git a/packages/timeline-state-resolver/src/integrations/lawo/index.ts b/packages/timeline-state-resolver/src/integrations/lawo/index.ts index 380bd90da..6e5065c6a 100644 --- a/packages/timeline-state-resolver/src/integrations/lawo/index.ts +++ b/packages/timeline-state-resolver/src/integrations/lawo/index.ts @@ -23,7 +23,6 @@ export class LawoDevice extends Device this.context.logger.error('Lawo.LawoConnection', e)) this._lawo.on('debug', (...debug) => this.context.logger.debug('Lawo.LawoConnection', ...debug)) - this._lawo.on('debug', (...debug) => console.log('Lawo.LawoConnection', ...debug)) this._lawo.on('connected', (firstConnection) => { if (firstConnection) { // reset state diff --git a/packages/timeline-state-resolver/src/integrations/multiOsc/index.ts b/packages/timeline-state-resolver/src/integrations/multiOsc/index.ts index 46135ad59..56480cbff 100644 --- a/packages/timeline-state-resolver/src/integrations/multiOsc/index.ts +++ b/packages/timeline-state-resolver/src/integrations/multiOsc/index.ts @@ -76,6 +76,11 @@ export class MultiOSCMessageDevice extends Device [id, {}]))) + .catch((e) => this.context.logger.warning('Failed to reset state: ' + e)) + return true } diff --git a/packages/timeline-state-resolver/src/integrations/osc/__tests__/osc.spec.ts b/packages/timeline-state-resolver/src/integrations/osc/__tests__/osc.spec.ts index d2dfa725d..66f86b745 100644 --- a/packages/timeline-state-resolver/src/integrations/osc/__tests__/osc.spec.ts +++ b/packages/timeline-state-resolver/src/integrations/osc/__tests__/osc.spec.ts @@ -24,6 +24,12 @@ jest.mock('osc', () => { on: (event: string, listener: (...args: any[]) => void) => { SOCKET_EVENTS.set(event, listener) }, + once: (event: string, listener: (...args: any[]) => void) => { + SOCKET_EVENTS.set(event, (...args: any[]) => { + SOCKET_EVENTS.delete(event) + return listener(...args) + }) + }, close: jest.fn(), } }), diff --git a/packages/timeline-state-resolver/src/integrations/osc/index.ts b/packages/timeline-state-resolver/src/integrations/osc/index.ts index aa791a11b..13bb71a1f 100644 --- a/packages/timeline-state-resolver/src/integrations/osc/index.ts +++ b/packages/timeline-state-resolver/src/integrations/osc/index.ts @@ -56,9 +56,21 @@ export class OscDevice extends Device { this._oscClientStatus = 'connected' - this.context.connectionChanged(this.getStatus()) + if (firstConnect) { + // note - perhaps we could resend the commands every time we reconnect? or that could be a device option + firstConnect = false + this.context.connectionChanged(this.getStatus()) + this.context + .resetToState({}) + .catch((e) => + this.context.logger.warning( + 'Failed to reset to state after first connection, device may be in unknown state (reason: ' + e + ')' + ) + ) + } }) client.socket.on('close', () => { this._oscClientStatus = 'disconnected' @@ -74,6 +86,15 @@ export class OscDevice extends Device { + this.context + .resetToState({}) + .catch((e) => + this.context.logger.warning( + 'Failed to reset to state after first connection, device may be in unknown state (reason: ' + e + ')' + ) + ) + }) this._oscClient.open() } else { assertNever(options.type) diff --git a/packages/timeline-state-resolver/src/integrations/pharos/index.ts b/packages/timeline-state-resolver/src/integrations/pharos/index.ts index c1e025075..7ff4a0004 100644 --- a/packages/timeline-state-resolver/src/integrations/pharos/index.ts +++ b/packages/timeline-state-resolver/src/integrations/pharos/index.ts @@ -58,6 +58,7 @@ export class PharosDevice extends Device { this.context.logger.info(`Current project: ${info.name}`) + this.context.resetToState({}).catch((e) => this.context.logger.error('Failed to reset state', e)) }) .catch((e) => this.context.logger.error('Failed to query project', e)) }) diff --git a/packages/timeline-state-resolver/src/integrations/quantel/__tests__/quantelGatewayMock.ts b/packages/timeline-state-resolver/src/integrations/quantel/__tests__/quantelGatewayMock.ts index c9ce6754e..74206c5ea 100644 --- a/packages/timeline-state-resolver/src/integrations/quantel/__tests__/quantelGatewayMock.ts +++ b/packages/timeline-state-resolver/src/integrations/quantel/__tests__/quantelGatewayMock.ts @@ -29,9 +29,8 @@ export function setupQuantelGatewayMock() { }, } - // @ts-ignore: not logging const onRequest = jest.fn((_type: string, _url: string) => { - // console.log('onRequest', type, url) + // nothing }) const onRequestRaw = jest.fn((type: string, url: string) => { @@ -223,7 +222,6 @@ async function handleRequest( message: `ISA URL not provided`, stack: '', } - // console.log(type, resource) urlRoute(type, resource, { // @ts-ignore: no need for params @@ -837,7 +835,6 @@ async function handleRequest( }, }) .then((body) => { - // console.log('got responding:', type, resource, body) resolve({ statusCode: quantelServer.requestReturnsOK ? 200 : 500, // body: JSON.stringify(body) diff --git a/packages/timeline-state-resolver/src/integrations/quantel/index.ts b/packages/timeline-state-resolver/src/integrations/quantel/index.ts index 3b3b95dcd..6bdc0e090 100644 --- a/packages/timeline-state-resolver/src/integrations/quantel/index.ts +++ b/packages/timeline-state-resolver/src/integrations/quantel/index.ts @@ -74,16 +74,31 @@ export class QuantelDevice extends Device { this._quantel.monitorServerStatus((connected: boolean) => { - if (!this._disconnectedSince && connected === false && options.suppressDisconnectTime) { + if (!this._disconnectedSince && connected === false) { this._disconnectedSince = Date.now() - // trigger another update after debounce - setTimeout(() => { - if (!this._quantel.connected) { - this.context.connectionChanged(this.getStatus()) - } - }, options.suppressDisconnectTime) + if (options.suppressDisconnectTime) { + // trigger another update after debounce + setTimeout(() => { + if (!this._quantel.connected) { + this.context.connectionChanged(this.getStatus()) + } + }, options.suppressDisconnectTime) + } } else if (connected === true) { + if (!this._disconnectedSince) { + // this must be our first time connecting, so let's resend any commands we missed + this.context + .resetToState({ time: 0, port: {} }) + .catch((e) => + this.context.logger.warning( + 'Failed to reset to state after first connection, device may be in unknown state (reason: ' + + e + + ')' + ) + ) + } + this._disconnectedSince = undefined } diff --git a/packages/timeline-state-resolver/src/integrations/shotoku/index.ts b/packages/timeline-state-resolver/src/integrations/shotoku/index.ts index 73842947a..a0cac93d7 100644 --- a/packages/timeline-state-resolver/src/integrations/shotoku/index.ts +++ b/packages/timeline-state-resolver/src/integrations/shotoku/index.ts @@ -48,6 +48,15 @@ export class ShotokuDevice extends Device { + this.context + .resetToState({ shots: {}, sequences: {} }) + .catch((e) => + this.context.logger.warning( + 'Failed to reset to state after first connection, device may be in unknown state (reason: ' + e + ')' + ) + ) + }) .catch((e) => this.context.logger.debug('Shotoku device failed initial connection attempt', e)) return true diff --git a/packages/timeline-state-resolver/src/integrations/sisyfos/__tests__/sisyfos.spec.ts b/packages/timeline-state-resolver/src/integrations/sisyfos/__tests__/sisyfos.spec.ts index f4014276f..90c9a0055 100644 --- a/packages/timeline-state-resolver/src/integrations/sisyfos/__tests__/sisyfos.spec.ts +++ b/packages/timeline-state-resolver/src/integrations/sisyfos/__tests__/sisyfos.spec.ts @@ -13,7 +13,7 @@ const MockOSC = OSC.MockOSC import { MockTime } from '../../../__tests__/mockTime' import { ThreadedClass } from 'threadedclass' import { SisyfosMessageDevice } from '../../../integrations/sisyfos' -import { getMockCall } from '../../../__tests__/lib' +import { addConnections, getMockCall, waitUntil } from '../../../__tests__/lib' describe('Sisyfos', () => { jest.mock('osc', () => OSC) @@ -82,18 +82,20 @@ describe('Sisyfos', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('mySisyfos', { - type: DeviceType.SISYFOS, - options: { - host: '192.168.0.10', - port: 8900, + await addConnections(myConductor.connectionManager, { + mySisyfos: { + type: DeviceType.SISYFOS, + options: { + host: '192.168.0.10', + port: 8900, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) myConductor.setTimelineAndMappings([], myChannelMapping) await mockTime.advanceTimeToTicks(10100) - const deviceContainer = myConductor.getDevice('mySisyfos') + const deviceContainer = myConductor.connectionManager.getConnection('mySisyfos') const device = deviceContainer!.device as ThreadedClass // Check that no commands has been scheduled: @@ -309,18 +311,20 @@ describe('Sisyfos', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('mySisyfos', { - type: DeviceType.SISYFOS, - options: { - host: '192.168.0.10', - port: 8900, + await addConnections(myConductor.connectionManager, { + mySisyfos: { + type: DeviceType.SISYFOS, + options: { + host: '192.168.0.10', + port: 8900, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) myConductor.setTimelineAndMappings([], myChannelMapping) await mockTime.advanceTimeToTicks(10100) - const deviceContainer = myConductor.getDevice('mySisyfos') + const deviceContainer = myConductor.connectionManager.getConnection('mySisyfos') const device = deviceContainer!.device as ThreadedClass // Check that no commands has been scheduled: @@ -535,18 +539,20 @@ describe('Sisyfos', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('mySisyfos', { - type: DeviceType.SISYFOS, - options: { - host: '192.168.0.10', - port: 8900, + await addConnections(myConductor.connectionManager, { + mySisyfos: { + type: DeviceType.SISYFOS, + options: { + host: '192.168.0.10', + port: 8900, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) myConductor.setTimelineAndMappings([], myChannelMapping) await mockTime.advanceTimeToTicks(10100) - const deviceContainer = myConductor.getDevice('mySisyfos') + const deviceContainer = myConductor.connectionManager.getConnection('mySisyfos') const device = deviceContainer!.device as ThreadedClass // Check that no commands has been scheduled: @@ -673,18 +679,20 @@ describe('Sisyfos', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('mySisyfos', { - type: DeviceType.SISYFOS, - options: { - host: '192.168.0.10', - port: 8900, + await addConnections(myConductor.connectionManager, { + mySisyfos: { + type: DeviceType.SISYFOS, + options: { + host: '192.168.0.10', + port: 8900, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) myConductor.setTimelineAndMappings([], myChannelMapping) await mockTime.advanceTimeToTicks(10100) - const deviceContainer = myConductor.getDevice('mySisyfos') + const deviceContainer = myConductor.connectionManager.getConnection('mySisyfos') const device = deviceContainer!.device as ThreadedClass // Check that no commands has been scheduled: @@ -891,18 +899,20 @@ describe('Sisyfos', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() // we cannot do an await, because setTimeout will never call without jest moving on. - await myConductor.addDevice('mySisyfos', { - type: DeviceType.SISYFOS, - options: { - host: '192.168.0.10', - port: 8900, + await addConnections(myConductor.connectionManager, { + mySisyfos: { + type: DeviceType.SISYFOS, + options: { + host: '192.168.0.10', + port: 8900, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) myConductor.setTimelineAndMappings([], myChannelMapping) await mockTime.advanceTimeToTicks(10100) - const deviceContainer = myConductor.getDevice('mySisyfos') + const deviceContainer = myConductor.connectionManager.getConnection('mySisyfos') const device = deviceContainer!.device as ThreadedClass // Check that no commands has been scheduled: @@ -1041,17 +1051,19 @@ describe('Sisyfos', () => { }) // myConductor.setTimelineAndMappings([], myChannelMapping) await myConductor.init() - await myConductor.addDevice('mySisyfos', { - type: DeviceType.SISYFOS, - options: { - host: '127.0.0.1', - port: 1234, + await addConnections(myConductor.connectionManager, { + mySisyfos: { + type: DeviceType.SISYFOS, + options: { + host: '192.168.0.10', + port: 8900, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) await mockTime.advanceTimeToTicks(10100) - const deviceContainer = myConductor.getDevice('mySisyfos') + const deviceContainer = myConductor.connectionManager.getConnection('mySisyfos') const device = deviceContainer!.device as ThreadedClass const onConnectionChanged = jest.fn() @@ -1059,19 +1071,29 @@ describe('Sisyfos', () => { // Check that no commands has been scheduled: expect(await device.queue).toHaveLength(0) - expect(await device.connected).toEqual(true) - expect(onConnectionChanged).toHaveBeenCalledTimes(0) + + // Wait for the connection to be initialized: + await waitUntil( + async () => { + expect(await device.connected).toEqual(true) + }, + 1000, + mockTime + ) // Simulate a connection loss: MockOSC.connectionIsGood = false + // Wait for the OSC timeout to trigger: await mockTime.advanceTimeTicks(3000) await wait(1) await mockTime.advanceTimeTicks(3000) await wait(1) expect(await device.connected).toEqual(false) - expect(onConnectionChanged).toHaveBeenCalledTimes(1) + + expect(onConnectionChanged.mock.calls.length).toBeGreaterThanOrEqual(1) + onConnectionChanged.mockClear() // Simulate a connection regain: MockOSC.connectionIsGood = true @@ -1081,6 +1103,6 @@ describe('Sisyfos', () => { await wait(1) expect(await device.connected).toEqual(true) - expect(onConnectionChanged).toHaveBeenCalledTimes(4) + expect(onConnectionChanged.mock.calls.length).toBeGreaterThanOrEqual(1) }) }) diff --git a/packages/timeline-state-resolver/src/integrations/sisyfos/index.ts b/packages/timeline-state-resolver/src/integrations/sisyfos/index.ts index 759f02dd1..88803b36d 100644 --- a/packages/timeline-state-resolver/src/integrations/sisyfos/index.ts +++ b/packages/timeline-state-resolver/src/integrations/sisyfos/index.ts @@ -85,10 +85,14 @@ export class SisyfosMessageDevice extends DeviceWithState { this._sisyfos.once('initialized', () => { this.setState(this.getDeviceState(false), this.getCurrentTime()) - this.emit('resetResolver') + this.emit('resyncStates') }) - return this._sisyfos.connect(initOptions.host, initOptions.port).then(() => true) + this._sisyfos + .connect(initOptions.host, initOptions.port) + .catch((e) => this.emit('error', 'Failed to initialise Sisyfos connection', e)) + + return true } /** Called by the Conductor a bit before a .handleState is called */ prepareForHandleState(newStateTime: number) { @@ -190,7 +194,7 @@ export class SisyfosMessageDevice extends DeviceWithState { + this._sisyfos.once('initialized', () => { if (resync) { this._resyncing = false const targetState = this.getState(this.getCurrentTime()) @@ -200,7 +204,7 @@ export class SisyfosMessageDevice extends DeviceWithState { // This is where we would do initialization, like connecting to the devices, etc this.initOptions = initOptions - await this._setupWSConnection() + + this._setupWSConnection() + .then(() => { + // assume empty state on start (would be nice if we could get the url for each window on connection) + this.context.resetToState({ windows: {} }).catch((e) => this.context.logger.error('Failed to reset state', e)) + }) + .catch((e) => this.context.logger.error('Failed to initialise Sofie Chef connection', e)) + return true } private async _setupWSConnection() { diff --git a/packages/timeline-state-resolver/src/integrations/tcpSend/index.ts b/packages/timeline-state-resolver/src/integrations/tcpSend/index.ts index 1a5e2dad1..b4fb99f03 100644 --- a/packages/timeline-state-resolver/src/integrations/tcpSend/index.ts +++ b/packages/timeline-state-resolver/src/integrations/tcpSend/index.ts @@ -30,6 +30,17 @@ export class TcpSendDevice extends Device { + this.tcpConnection.once('connectionChanged', (connected) => { + if (connected) { + this.context + .resetState() + .catch((e) => + this.context.logger.warning( + 'Failed to reset state after first connection, device may be in unknown state (reason: ' + e + ')' + ) + ) + } + }) this.tcpConnection.activate(options) return true } diff --git a/packages/timeline-state-resolver/src/integrations/vizMSE/__tests__/vizMSE.spec.ts b/packages/timeline-state-resolver/src/integrations/vizMSE/__tests__/vizMSE.spec.ts index 19c095974..a5283e05c 100644 --- a/packages/timeline-state-resolver/src/integrations/vizMSE/__tests__/vizMSE.spec.ts +++ b/packages/timeline-state-resolver/src/integrations/vizMSE/__tests__/vizMSE.spec.ts @@ -7,13 +7,13 @@ import { SomeMappingVizMSE, TimelineContentTypeVizMSE, VIZMSETransitionType, - VizMSEOptions, VIZMSEPlayoutItemContentExternal, VIZMSEPlayoutItemContentInternal, + VizMSEOptions, } from 'timeline-state-resolver-types' import { MockTime } from '../../../__tests__/mockTime' import { ThreadedClass } from 'threadedclass' -import { getMockCall } from '../../../__tests__/lib' +import { addConnections, awaitNextRemoval, getMockCall } from '../../../__tests__/lib' import { VizMSEDevice } from '..' import * as vConnection from '../../../__mocks__/v-connection' import * as net from '../../../__mocks__/net' @@ -64,18 +64,20 @@ async function setupDevice() { myConductor.setTimelineAndMappings([], myChannelMapping) await myConductor.init() - await myConductor.addDevice('myViz', { - type: DeviceType.VIZMSE, - options: { - host: '127.0.0.1', - preloadAllElements: true, - playlistID: 'my-super-playlist-id', - profile: 'profile9999', - showDirectoryPath: 'SOFIE', + await addConnections(myConductor.connectionManager, { + myViz: { + type: DeviceType.VIZMSE, + options: { + host: '127.0.0.1', + preloadAllElements: true, + playlistID: 'my-super-playlist-id', + profile: 'profile9999', + showDirectoryPath: 'SOFIE', + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) - const deviceContainer = myConductor.getDevice('myViz') + const deviceContainer = myConductor.connectionManager.getConnection('myViz') device = deviceContainer!.device as ThreadedClass return { device, myConductor, onError, commandReceiver0 } @@ -482,7 +484,7 @@ describe('vizMSE', () => { expect(onError).toHaveBeenCalledTimes(0) }) - test('bad init options & basic functionality', async () => { + test('basic functionality', async () => { const myConductor = new Conductor({ multiThreadedResolver: false, getCurrentTime: mockTime.getCurrentTime, @@ -494,37 +496,48 @@ describe('vizMSE', () => { await myConductor.init() - await expect( - myConductor.addDevice('myViz', { - type: DeviceType.VIZMSE, - options: literal>({ - // host: '127.0.0.1', - profile: 'myProfile', - }) as any, - }) - ).rejects.toMatch(/bad option/) - await expect( - // @ts-ignore - myConductor.addDevice('myViz', { - type: DeviceType.VIZMSE, - options: { - host: '127.0.0.1', - // profile: 'myProfile' + await addConnections( + myConductor.connectionManager, + { + myViz: { + type: DeviceType.VIZMSE, + options: literal>({ + // host: '127.0.0.1', + profile: 'myProfile', + }) as any, }, - }) - ).rejects.toMatch(/bad option/) + }, + false + ) + await awaitNextRemoval(myConductor.connectionManager) + await addConnections( + myConductor.connectionManager, + { + myViz: { + type: DeviceType.VIZMSE, + options: literal>({ + host: '127.0.0.1', + // profile: 'myProfile', + }) as any, + }, + }, + false + ) + await awaitNextRemoval(myConductor.connectionManager) expect(onError).toHaveBeenCalledTimes(2) onError.mockClear() - const deviceContainer = await myConductor.addDevice('myViz', { - type: DeviceType.VIZMSE, - options: { - host: '127.0.0.1', - profile: 'myProfile', + await addConnections(myConductor.connectionManager, { + myViz: { + type: DeviceType.VIZMSE, + options: { + host: '127.0.0.1', + profile: 'myProfile', + }, }, }) - const device = deviceContainer.device + const device = myConductor.connectionManager.getConnection('myViz')!.device const connectionChanged = jest.fn() await device.on('connectionChanged', connectionChanged) @@ -587,22 +600,24 @@ describe('vizMSE', () => { }) myConductor.setTimelineAndMappings([], myChannelMapping) await myConductor.init() - await myConductor.addDevice('myViz', { - type: DeviceType.VIZMSE, - options: { - host: '127.0.0.1', - preloadAllElements: true, - playlistID: 'my-super-playlist-id', - profile: 'profile9999', - clearAllTemplateName: 'clear_all_of_them', - clearAllCommands: ['RENDERER*FRONT_LAYER SET_OBJECT ', 'RENDERER SET_OBJECT '], - showDirectoryPath: 'SOFIE', + await addConnections(myConductor.connectionManager, { + myViz: { + type: DeviceType.VIZMSE, + options: { + host: '127.0.0.1', + preloadAllElements: true, + playlistID: 'my-super-playlist-id', + profile: 'profile9999', + clearAllTemplateName: 'clear_all_of_them', + clearAllCommands: ['RENDERER*FRONT_LAYER SET_OBJECT ', 'RENDERER SET_OBJECT '], + showDirectoryPath: 'SOFIE', + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }) await mockTime.advanceTimeToTicks(10100) - const deviceContainer = myConductor.getDevice('myViz') + const deviceContainer = myConductor.connectionManager.getConnection('myViz') const device = deviceContainer!.device as ThreadedClass await device.ignoreWaitsInTests() @@ -663,7 +678,7 @@ describe('vizMSE', () => { expect(getMockCall(commandReceiver0, 0, 1)).toMatchObject({ timelineObjId: 'obj0', - time: 10105, + time: 10100, content: { instanceName: expect.stringContaining('myInternalElement'), templateName: 'myInternalElement', @@ -1040,20 +1055,22 @@ describe('vizMSE', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myViz', { - type: DeviceType.VIZMSE, - options: { - host: '127.0.0.1', - preloadAllElements: true, - playlistID: 'my-super-playlist-id', - profile: PROFILE_NAME, - clearAllOnMakeReady: true, - clearAllTemplateName: 'clear_all_of_them', - clearAllCommands: [CLEAR_COMMAND], + await addConnections(myConductor.connectionManager, { + myViz: { + type: DeviceType.VIZMSE, + options: { + host: '127.0.0.1', + preloadAllElements: true, + playlistID: 'my-super-playlist-id', + profile: PROFILE_NAME, + clearAllOnMakeReady: true, + clearAllTemplateName: 'clear_all_of_them', + clearAllCommands: [CLEAR_COMMAND], + }, }, }) - const deviceContainer = myConductor.getDevice('myViz') + const deviceContainer = myConductor.connectionManager.getConnection('myViz') const device = deviceContainer!.device as ThreadedClass await device.ignoreWaitsInTests() @@ -1111,20 +1128,22 @@ describe('vizMSE', () => { getCurrentTime: mockTime.getCurrentTime, }) await myConductor.init() - await myConductor.addDevice('myViz', { - type: DeviceType.VIZMSE, - options: { - host: '127.0.0.1', - preloadAllElements: true, - playlistID: 'my-super-playlist-id', - profile: PROFILE_NAME, - clearAllOnMakeReady: false, - clearAllTemplateName: 'clear_all_of_them', - clearAllCommands: [CLEAR_COMMAND], + await addConnections(myConductor.connectionManager, { + myViz: { + type: DeviceType.VIZMSE, + options: { + host: '127.0.0.1', + preloadAllElements: true, + playlistID: 'my-super-playlist-id', + profile: PROFILE_NAME, + clearAllOnMakeReady: false, + clearAllTemplateName: 'clear_all_of_them', + clearAllCommands: [CLEAR_COMMAND], + }, }, }) - const deviceContainer = myConductor.getDevice('myViz') + const deviceContainer = myConductor.connectionManager.getConnection('myViz') const device = deviceContainer!.device as ThreadedClass await device.ignoreWaitsInTests() diff --git a/packages/timeline-state-resolver/src/integrations/vizMSE/index.ts b/packages/timeline-state-resolver/src/integrations/vizMSE/index.ts index b965bb03c..80957c513 100644 --- a/packages/timeline-state-resolver/src/integrations/vizMSE/index.ts +++ b/packages/timeline-state-resolver/src/integrations/vizMSE/index.ts @@ -142,7 +142,14 @@ export class VizMSEDevice extends DeviceWithState this.emit('error', 'VizMSE', typeof e === 'string' ? new Error(e) : e)) this._vizmseManager.on('debug', (...args) => this.emitDebug(...args)) - await this._vizmseManager.initializeRundown(activeRundownPlaylistId) + this._vizmseManager + .initializeRundown(activeRundownPlaylistId) + .then(() => { + // reset any states we had to re-enforce them + this.clearStates() + this.emit('resyncStates') + }) + .catch((e) => this.emit('error', 'Failed to initialise Viz Rundown', e)) return true } diff --git a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmix.spec.ts b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmix.spec.ts index 83efaf278..3a8bf0699 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmix.spec.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmix.spec.ts @@ -26,9 +26,9 @@ import { import { ThreadedClass } from 'threadedclass' import { VMixDevice } from '..' import { MockTime } from '../../../__tests__/mockTime' -import '../../../__tests__/lib' import { CommandContext } from '../vMixCommands' import { prefixAddedInput } from './mockState' +import { addConnections } from '../../../__tests__/lib' const orgSetTimeout = setTimeout @@ -94,19 +94,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -221,7 +223,7 @@ describe('vMix', () => { expect(commandReceiver0).toHaveBeenCalledTimes(1) expect(commandReceiver0).toHaveBeenNthCalledWith( 1, - 17000, + 17001, expect.objectContaining({ command: { command: VMixCommand.REMOVE_INPUT, @@ -273,19 +275,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -585,19 +589,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -829,19 +835,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -1070,7 +1078,7 @@ describe('vMix', () => { ) expect(commandReceiver0).toHaveBeenNthCalledWith( 5, - 17000, + 17001, expect.objectContaining({ command: { command: VMixCommand.REMOVE_INPUT, @@ -1144,19 +1152,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -1495,19 +1505,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -1722,19 +1734,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -1851,19 +1865,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -1977,19 +1993,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -2103,19 +2121,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -2230,19 +2250,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -2361,19 +2383,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -2493,19 +2517,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -2619,19 +2645,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 8099, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 8099, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -2747,19 +2775,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 9999, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 9999, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -2852,19 +2882,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 9999, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 9999, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -2977,19 +3009,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 9999, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 9999, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -3056,19 +3090,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 9999, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 9999, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -3157,19 +3193,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 9999, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 9999, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -3259,19 +3297,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 9999, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 9999, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -3378,19 +3418,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 9999, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 9999, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) @@ -3507,19 +3549,21 @@ describe('vMix', () => { await myConductor.init() await runPromise( - myConductor.addDevice('myvmix', { - type: DeviceType.VMIX, - options: { - host: '127.0.0.1', - port: 9999, - pollInterval: 0, + addConnections(myConductor.connectionManager, { + myvmix: { + type: DeviceType.VMIX, + options: { + host: '127.0.0.1', + port: 9999, + pollInterval: 0, + }, + commandReceiver: commandReceiver0, }, - commandReceiver: commandReceiver0, }), mockTime ) - const deviceContainer = myConductor.getDevice('myvmix') + const deviceContainer = myConductor.connectionManager.getConnection('myvmix') device = deviceContainer!.device as ThreadedClass const deviceErrorHandler = jest.fn((...args) => console.log('Error in device', ...args)) device.on('error', deviceErrorHandler) diff --git a/packages/timeline-state-resolver/src/integrations/vmix/index.ts b/packages/timeline-state-resolver/src/integrations/vmix/index.ts index 7897cb60e..2b49fd98e 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/index.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/index.ts @@ -161,6 +161,10 @@ export class VMixDevice extends DeviceWithState>] + connectionInitialised: [id: string] + connectionRemoved: [id: string] +} +export type MappedDeviceEvents = { + [T in keyof DeviceInstanceEvents as `connectionEvent:${T}`]: [string, ...DeviceInstanceEvents[T]] +} + +export class ConnectionManager extends EventEmitter { + private _config: Map = new Map() + private _connections: Map> = new Map() + private _updating = false + + private _connectionAttempts = new Map() + private _nextAttempt: NodeJS.Timeout | undefined + + /** + * Set the config options for all connections + */ + public setConnections(connectionsConfig: Record) { + // run through and see if we need to reset any of the counters + this._config.forEach((conf, id) => { + const newConf = connectionsConfig[id] + if (newConf && configHasChanged(conf, newConf)) { + // new conf warrants an immediate retry + this._connectionAttempts.delete(id) + } + }) + + this._config = new Map(Object.entries(connectionsConfig)) + this._updateConnections() + } + + public getConnections(includeUninitialized = false): Array>> { + if (includeUninitialized) { + return Array.from(this._connections.values()) + } else { + return Array.from(this._connections.values()).filter((conn) => conn.initialized === true) + } + } + + public getConnection( + connectionId: string, + includeUninitialized = false + ): BaseRemoteDeviceIntegration> | undefined { + if (includeUninitialized) { + return this._connections.get(connectionId) + } else { + const connection = this._connections.get(connectionId) + if (connection?.initialized === true) { + return connection + } else { + return undefined + } + } + } + + /** + * Iterate over config and check that the existing connection has the right config, if + * not... recreate it + */ + private _updateConnections() { + if (this._updating) return + this._updating = true + + if (this._nextAttempt) { + clearTimeout(this._nextAttempt) + this._nextAttempt = undefined + } + + const operations: Operation[] = [] + + for (const [deviceId, config] of this._config.entries()) { + // find connection + const connection = this._connections.get(deviceId) + + if (connection) { + // see if it should be restarted because of an update + if (connectionConfigHasChanged(connection, config)) { + operations.push({ operation: 'update', id: deviceId }) + } else if ( + connection.deviceOptions.debug !== config.debug || + connection.deviceOptions.debugState !== config.debugState + ) { + // see if we should set the debug params + operations.push({ operation: 'setDebug', id: deviceId }) + } + } else { + // create + operations.push({ operation: 'create', id: deviceId }) + } + } + + for (const deviceId of this._connections.keys()) { + // find if still in config + const config = this._config.get(deviceId) + + if (!config) { + // not found, so it should be closed + operations.push({ operation: 'delete', id: deviceId }) + } + } + + const isAllowedOp = (op: Operation): boolean => { + if (op.operation !== 'create') return true // allow non-create ops + + const nextCreate = this._connectionAttempts.get(op.id) + if (!nextCreate || nextCreate.next < Date.now()) return true + + return false + } + const allowedOperations = operations.filter(isAllowedOp) + + if (operations.length === 0) { + // no operations needed, so stop the loop + this._updating = false + return + } else if (allowedOperations.length === 0) { + this._updating = false + + // wait until next + const nextTime = Array.from(this._connectionAttempts.values()).reduce((a, b) => (a.next < b.next ? a : b), { + last: Date.now(), // not used + next: Date.now() + 4000, // in 4 seconds + }) + this._nextAttempt = setTimeout(() => { + this._updateConnections() + }, nextTime.next - Date.now()) + + // there's nothing to execute right now so return + return + } + + Promise.allSettled(allowedOperations.map(async (op) => this.executeOperation(op))) + .then(() => { + this._updating = false + + // rerun the algorithm once to make sure we have no missed operations in the meanwhile + this._updateConnections() + }) + .catch((e) => { + this.emit('warning', 'Error encountered while updating connections: ' + e) + }) + } + + private async executeOperation({ operation, id }: Operation): Promise { + try { + switch (operation) { + case 'create': + await this.createConnection(id) + break + case 'delete': + await this.deleteConnection(id) + break + case 'update': + await this.deleteConnection(id) + await this.createConnection(id) + break + case 'setDebug': + await this.setDebugForConnection(id) + break + } + } catch { + this.emit('warning', `Failed to execute "${operation} for ${id}"`) + } + } + + private async createConnection(id: string): Promise { + const deviceOptions = this._config.get(id) + if (!deviceOptions) return // has been removed since, so do not create + + const lastAttempt = this._connectionAttempts.get(id) + const last = lastAttempt?.last ?? Date.now() + this._connectionAttempts.set(id, { + last: Date.now(), + next: Date.now() + Math.min(Math.max(Date.now() - last, 2000) * 2, 60 * 1000), + }) // first retry after 4secs, double it every try, max 60s + + const threadedClassOptions: ThreadedClassConfig = { + threadUsage: deviceOptions.threadUsage || 1, + autoRestart: false, + disableMultithreading: !deviceOptions.isMultiThreaded, + instanceName: id, + freezeLimit: FREEZE_LIMIT, + } + + const container = await createContainer(deviceOptions, id, () => Date.now(), threadedClassOptions) // we rely on threadedclass to timeout if this fails + + if (!container) { + this.emit('warning', 'Failed to create container for ' + id) + return + } + + // set up event handlers + await this._setupDeviceListeners(id, container) + + container.onChildClose = () => { + this.emit('error', 'Connection ' + id + ' closed') + this._connections.delete(id) + this.emit('connectionRemoved', id) + + container + .terminate() + .catch((e) => this.emit('warning', `Failed to initialise ${id} (${e})`)) + .finally(() => { + this._updateConnections() + }) + } + + this._connections.set(id, container) + this.emit('connectionAdded', id, container) + + // trigger connection init + this._handleConnectionInitialisation(id, container) + .then(() => { + this._connectionAttempts.delete(id) + this.emit('connectionInitialised', id) + }) + .catch((e) => { + this.emit('error', 'Connection ' + id + ' failed to initialise', e) + this._connections.delete(id) + this.emit('connectionRemoved', id) + + container + .terminate() + .catch((e) => this.emit('warning', `Failed to initialise ${id} (${e})`)) + .finally(() => { + this._updateConnections() + }) + }) + } + + private async deleteConnection(id: string): Promise { + const connection = this._connections.get(id) + if (!connection) return Promise.resolve() // already removed / never existed + + this._connections.delete(id) + this.emit('connectionRemoved', id) + + return new Promise((resolve) => + deferAsync( + async () => { + let finished = false + setTimeout(() => { + if (!finished) { + resolve() + this.emit('warning', 'Failed to delete connection in time') + + connection.terminate().catch((e) => this.emit('error', 'Failed to terminate connection: ' + e)) + } + }, 30000) + + try { + await connection.device.terminate() + await connection.device.removeAllListeners() + await connection.terminate() + } catch { + await connection.terminate() + } + + finished = true + resolve() + }, + (e) => { + this.emit('warning', 'Error encountered trying to delete connection: ' + e) + } + ) + ) + } + + private async setDebugForConnection(id: string): Promise { + const config = this._config.get(id) + const connection = this._connections.get(id) + if (!connection || !config) return + + try { + await connection.device.setDebugLogging(config.debug ?? false) + await connection.device.setDebugState(config.debugState ?? false) + } catch { + this.emit('warning', 'Failed to update debug values for ' + id) + } + } + + private async _handleConnectionInitialisation( + id: string, + container: BaseRemoteDeviceIntegration + ) { + const deviceOptions = this._config.get(id) + if (!deviceOptions) return // if the config has been removed, the connection should be removed as well so no need to init + + this.emit( + 'info', + `Initializing connection ${id} (${container.instanceId}) of type ${DeviceType[deviceOptions.type]}...` + ) + await container.init(deviceOptions.options, undefined) + await container.reloadProps() + this.emit('info', `Connection ${id} (${container.instanceId}) initialized!`) + } + + private async _setupDeviceListeners( + id: string, + container: BaseRemoteDeviceIntegration + ): Promise { + const passEvent = (ev: T) => { + const evHandler: any = (...args: DeviceInstanceEvents[T]) => + this.emit(('connectionEvent:' + ev) as `connectionEvent:${keyof DeviceInstanceEvents}`, id, ...args) + container.device + .on(ev, evHandler) + .catch((e) => this.emit('error', 'Failed to attach listener for device: ' + id + ' ' + ev, e)) + } + + passEvent('info') + passEvent('warning') + passEvent('error') + passEvent('debug') + passEvent('debugState') + passEvent('connectionChanged') + passEvent('resetResolver') + passEvent('slowCommand') + passEvent('slowSentCommand') + passEvent('slowFulfilledCommand') + passEvent('commandReport') + passEvent('commandError') + passEvent('updateMediaObject') + passEvent('clearMediaObjects') + passEvent('timeTrace') + } +} + +/** + * A config has changed if any of the options are no longer the same, taking default values into + * consideration. In addition, the debug logging flag should be ignored as that can be changed at runtime. + */ +function connectionConfigHasChanged( + connection: BaseRemoteDeviceIntegration>, + config: DeviceOptionsBase +): boolean { + const oldConfig = connection.deviceOptions + + // now check device specific options + return configHasChanged(oldConfig, config) +} +function configHasChanged(oldConfig: DeviceOptionsBase, config: DeviceOptionsBase): boolean { + // now check device specific options + return !_.isEqual(_.omit(oldConfig, 'debug', 'debugState'), _.omit(config, 'debug', 'debugState')) +} + +function createContainer( + deviceOptions: DeviceOptionsAnyInternal, + deviceId: string, + getCurrentTime: () => number, + threadedClassOptions: ThreadedClassConfig +): Promise>> | null { + switch (deviceOptions.type) { + case DeviceType.CASPARCG: + return DeviceContainer.create( + '../../dist/integrations/casparCG/index.js', + 'CasparCGDevice', + deviceId, + deviceOptions, + getCurrentTime, + threadedClassOptions + ) + case DeviceType.SISYFOS: + return DeviceContainer.create( + '../../dist/integrations/sisyfos/index.js', + 'SisyfosMessageDevice', + deviceId, + deviceOptions, + getCurrentTime, + threadedClassOptions + ) + case DeviceType.VIZMSE: + return DeviceContainer.create( + '../../dist/integrations/vizMSE/index.js', + 'VizMSEDevice', + deviceId, + deviceOptions, + getCurrentTime, + threadedClassOptions + ) + case DeviceType.VMIX: + return DeviceContainer.create( + '../../dist/integrations/vmix/index.js', + 'VMixDevice', + deviceId, + deviceOptions, + getCurrentTime, + threadedClassOptions + ) + case DeviceType.SINGULAR_LIVE: + case DeviceType.TELEMETRICS: + case DeviceType.PHAROS: + case DeviceType.ABSTRACT: + case DeviceType.ATEM: + case DeviceType.HTTPSEND: + case DeviceType.HTTPWATCHER: + case DeviceType.HYPERDECK: + case DeviceType.LAWO: + case DeviceType.MULTI_OSC: + case DeviceType.OBS: + case DeviceType.OSC: + case DeviceType.PANASONIC_PTZ: + case DeviceType.SHOTOKU: + case DeviceType.SOFIE_CHEF: + case DeviceType.TCPSEND: + case DeviceType.TRICASTER: + case DeviceType.QUANTEL: { + ensureIsImplementedAsService(deviceOptions.type) + + // presumably this device is implemented in the new service handler + return RemoteDeviceInstance.create(deviceId, deviceOptions, getCurrentTime, threadedClassOptions) + } + default: + assertNever(deviceOptions) + return null + } +} + +function ensureIsImplementedAsService(_type: ImplementedServiceDeviceTypes): void { + // This is a type check +} diff --git a/packages/timeline-state-resolver/src/service/DeviceInstance.ts b/packages/timeline-state-resolver/src/service/DeviceInstance.ts index 5e8554e88..71a2a5f6c 100644 --- a/packages/timeline-state-resolver/src/service/DeviceInstance.ts +++ b/packages/timeline-state-resolver/src/service/DeviceInstance.ts @@ -264,13 +264,13 @@ export class DeviceInstanceWrapper extends EventEmitter { resetState: async () => { await this._stateHandler.setCurrentState(undefined) await this._stateHandler.clearFutureStates() - this.emit('resetResolver') + this.emit('resyncStates') }, resetToState: async (state: any) => { await this._stateHandler.setCurrentState(state) await this._stateHandler.clearFutureStates() - this.emit('resetResolver') + this.emit('resyncStates') }, } } diff --git a/packages/timeline-state-resolver/src/service/__tests__/ConnectionManager.spec.ts b/packages/timeline-state-resolver/src/service/__tests__/ConnectionManager.spec.ts new file mode 100644 index 000000000..9c47cb149 --- /dev/null +++ b/packages/timeline-state-resolver/src/service/__tests__/ConnectionManager.spec.ts @@ -0,0 +1,50 @@ +import { DeviceType, OSCDeviceType } from 'timeline-state-resolver-types' +import { ConstructedMockDevices, MockDeviceInstanceWrapper } from '../../__tests__/mockDeviceInstanceWrapper' +import { ConnectionManager } from '../ConnectionManager' + +// Mock explicitly the 'dist' version, as that is what threadedClass is being told to load +jest.mock('../../../dist/service/DeviceInstance', () => ({ + DeviceInstanceWrapper: MockDeviceInstanceWrapper, +})) +jest.mock('../DeviceInstance', () => ({ + DeviceInstanceWrapper: MockDeviceInstanceWrapper, +})) + +describe('ConnectionManager', () => { + const connManager = new ConnectionManager() + + test('adding/removing a device', async () => { + let resolveAdded: undefined | (() => void) = undefined + const psAdded = new Promise((resolveCb) => (resolveAdded = resolveCb)) + connManager.on('connectionAdded', () => { + if (resolveAdded) resolveAdded() + }) + + let resolveRemoved: undefined | (() => void) = undefined + const psRemoved = new Promise((resolveCb) => (resolveRemoved = resolveCb)) + connManager.on('connectionRemoved', () => { + if (resolveRemoved) resolveRemoved() + }) + + connManager.setConnections({ + osc0: { + type: DeviceType.OSC, + options: { + host: '127.0.0.1', + port: 5250, + type: OSCDeviceType.UDP, + }, + }, + }) + + await psAdded + + expect(ConstructedMockDevices['osc0']).toBeTruthy() + + connManager.setConnections({}) + + await psRemoved + + expect(ConstructedMockDevices['osc0']).toBeFalsy() + }) +}) diff --git a/packages/timeline-state-resolver/src/service/device.ts b/packages/timeline-state-resolver/src/service/device.ts index 50fe277f4..1d01a185f 100644 --- a/packages/timeline-state-resolver/src/service/device.ts +++ b/packages/timeline-state-resolver/src/service/device.ts @@ -113,6 +113,8 @@ export interface DeviceEvents { connectionChanged: [status: Omit] /** A message to the resolver that something has happened that warrants a reset of the resolver (to re-run it again) */ resetResolver: [] + /** A message to the resolver that the device needs to receive all known states */ + resyncStates: [] /** @deprecated replaced by slowSentCommand & slowFulfilledCommand */ slowCommand: [commandInfo: string] diff --git a/packages/timeline-state-resolver/src/service/stateHandler.ts b/packages/timeline-state-resolver/src/service/stateHandler.ts index 25f99e70c..586051575 100644 --- a/packages/timeline-state-resolver/src/service/stateHandler.ts +++ b/packages/timeline-state-resolver/src/service/stateHandler.ts @@ -105,6 +105,9 @@ export class StateHandler { } } + /** + * Sets the current state and makes sure the commands to get to the next state are still corrects + **/ async setCurrentState(state: DeviceState | undefined) { this.currentState = { commands: [], @@ -115,6 +118,43 @@ export class StateHandler { await this.calculateNextStateChange() } + /** + * This takes in a DeviceState and then updates the commands such that the device + * will be put back into its intended state as designated by the timeline + * @todo: this may need to be tied into _executingStateChange variable + */ + async updateStateFromDeviceState(state: DeviceState | undefined) { + // update the current state to the state we received + const timelineState = this.currentState?.state || { + time: this.context.getCurrentTime(), + layers: {}, + nextEvents: [], + } + const currentMappings = this.currentState?.mappings || {} + + this.currentState = { + commands: [], + deviceState: state, + state: timelineState, + mappings: currentMappings, + } + + // calculate how to get to the timeline state (because the device state may have changed based on device config changes or something) + const trace = startTrace('device:convertTimelineStateToDeviceState', { deviceId: this.context.deviceId }) + const deviceState = this.device.convertTimelineStateToDeviceState(timelineState, currentMappings) // @todo - we should probably be recalculating all of these :x + this.context.emitTimeTrace(endTrace(trace)) + + // push a new state + this.stateQueue.unshift({ + deviceState: deviceState, + state: this.currentState?.state || { time: this.context.getCurrentTime(), layers: {}, nextEvents: [] }, + mappings: this.currentState?.mappings || {}, + }) + + // now we let it calculate commands to get into the right state, which should be executed immediately given this state is from the past + await this.calculateNextStateChange() + } + clearFutureAfterTimestamp(t: number) { this.stateQueue = this.stateQueue.filter((s) => s.state.time <= t) }