From 005f0d5723880d5a2a0900e54db220d09fa013ad Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Wed, 2 Aug 2023 09:48:52 +0100 Subject: [PATCH] fix: disconnect called before connection has completed #107 (#150) --- src/lib/atemSocket.ts | 45 ++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/lib/atemSocket.ts b/src/lib/atemSocket.ts index f6f3a4233..b63204a81 100644 --- a/src/lib/atemSocket.ts +++ b/src/lib/atemSocket.ts @@ -3,7 +3,7 @@ import { CommandParser } from './atemCommandParser' import exitHook = require('exit-hook') import { VersionCommand, ISerializableCommand, IDeserializedCommand } from '../commands' import { DEFAULT_PORT } from '../atem' -import { threadedClass, ThreadedClass, ThreadedClassManager, Promisify } from 'threadedclass' +import { threadedClass, ThreadedClass, ThreadedClassManager } from 'threadedclass' import type { AtemSocketChild } from './atemSocketChild' export interface AtemSocketOptions { @@ -30,9 +30,11 @@ export class AtemSocket extends EventEmitter { private readonly _commandParser: CommandParser = new CommandParser() private _nextCommandTrackingId = 0 + private _isDisconnecting = false private _address: string private _port: number = DEFAULT_PORT private _socketProcess: ThreadedClass | undefined + private _creatingSocket: Promise | undefined private _exitUnsubscribe?: () => void constructor(options: AtemSocketOptions) { @@ -45,6 +47,8 @@ export class AtemSocket extends EventEmitter { } public async connect(address?: string, port?: number): Promise { + this._isDisconnecting = false + if (address) { this._address = address } @@ -53,17 +57,24 @@ export class AtemSocket extends EventEmitter { } if (!this._socketProcess) { - this._socketProcess = await this._createSocketProcess() - this._exitUnsubscribe = exitHook(() => { - this.destroy().catch(() => null) - }) - } else { - await this._socketProcess.connect(this._address, this._port) + // cache the creation promise, in case `destroy` is called before it completes + this._creatingSocket = this._createSocketProcess() + await this._creatingSocket + + if (this._isDisconnecting || !this._socketProcess) { + throw new Error('Disconnecting') + } } + + await this._socketProcess.connect(this._address, this._port) } public async destroy(): Promise { await this.disconnect() + + // Ensure thread creation has finished if it was started + if (this._creatingSocket) await this._creatingSocket.catch(() => null) + if (this._socketProcess) { await ThreadedClassManager.destroy(this._socketProcess) this._socketProcess = undefined @@ -75,6 +86,8 @@ export class AtemSocket extends EventEmitter { } public async disconnect(): Promise { + this._isDisconnecting = true + if (this._socketProcess) { await this._socketProcess.disconnect() } @@ -91,7 +104,7 @@ export class AtemSocket extends EventEmitter { commands: Array<{ rawCommand: ISerializableCommand; trackingId: number }> ): Promise { if (this._socketProcess) { - const commands2 = commands.map((cmd) => { + const wrappedCommands = commands.map((cmd) => { if (typeof cmd.rawCommand.serialize !== 'function') { throw new Error(`Command ${cmd.rawCommand.constructor.name} is not serializable`) } @@ -107,14 +120,14 @@ export class AtemSocket extends EventEmitter { } }) - await this._socketProcess.sendCommands(commands2) + await this._socketProcess.sendCommands(wrappedCommands) } else { throw new Error('Socket process is not open') } } - private async _createSocketProcess(): Promise> { - const socketProcess = await threadedClass( + private async _createSocketProcess(): Promise { + this._socketProcess = await threadedClass( './atemSocketChild', 'AtemSocketChild', [ @@ -147,19 +160,19 @@ export class AtemSocket extends EventEmitter { } ) - ThreadedClassManager.onEvent(socketProcess, 'restarted', () => { + ThreadedClassManager.onEvent(this._socketProcess, 'restarted', () => { this.connect().catch((error) => { const errorMsg = `Failed to reconnect after respawning socket process: ${error}` this.emit('error', errorMsg) }) }) - ThreadedClassManager.onEvent(socketProcess, 'thread_closed', () => { + ThreadedClassManager.onEvent(this._socketProcess, 'thread_closed', () => { this.emit('disconnect') }) - await socketProcess.connect(this._address, this._port) - - return socketProcess + this._exitUnsubscribe = exitHook(() => { + this.destroy().catch(() => null) + }) } private _parseCommands(buffer: Buffer): IDeserializedCommand[] {