diff --git a/package-lock.json b/package-lock.json index 7d9df5a8f..ad05b3e5a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18473,9 +18473,9 @@ } }, "node_modules/interruptor": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/interruptor/-/interruptor-1.0.1.tgz", - "integrity": "sha512-/dScvVlMrCyKwHvYHpgAj5OBFDK79LmqdXcPwcWCnk80pR/ldP4JFD4vU7HdvdtLeR6kHGR+efPwuzjuTlj/zg==", + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/interruptor/-/interruptor-1.0.2.tgz", + "integrity": "sha512-sn7EmHLEsE0sFn/0xShWiyd7SUDQaZV9MIWpdm2jnnC1vGRAmZeywv2m+AOtZde6zUTZFLaQnlf1SCIGu7kQWA==", "hasInstallScript": true, "dependencies": { "bindings": "^1.5.0" @@ -29622,6 +29622,11 @@ "node": ">= 8" } }, + "node_modules/web-worker": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/web-worker/-/web-worker-1.3.0.tgz", + "integrity": "sha512-BSR9wyRsy/KOValMgd5kMyr3JzpdeoR9KVId8u5GVlTTAtNChlsE4yTxeY7zMdNSyOmoKBv8NH2qeRY9Tg+IaA==" + }, "node_modules/webidl-conversions": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-7.0.0.tgz", @@ -31460,7 +31465,8 @@ "license": "Apache-2.0", "dependencies": { "interruptor": "^1.0.1", - "system-ca": "^2.0.1" + "system-ca": "^2.0.1", + "web-worker": "^1.3.0" }, "devDependencies": { "@mongodb-js/eslint-config-mongosh": "^1.0.0", @@ -37399,6 +37405,7 @@ "postmsg-rpc": "^2.4.0", "prettier": "^2.8.8", "system-ca": "^2.0.1", + "web-worker": "^1.3.0", "webpack-merge": "^5.8.0" } }, @@ -46845,9 +46852,9 @@ "dev": true }, "interruptor": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/interruptor/-/interruptor-1.0.1.tgz", - "integrity": "sha512-/dScvVlMrCyKwHvYHpgAj5OBFDK79LmqdXcPwcWCnk80pR/ldP4JFD4vU7HdvdtLeR6kHGR+efPwuzjuTlj/zg==", + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/interruptor/-/interruptor-1.0.2.tgz", + "integrity": "sha512-sn7EmHLEsE0sFn/0xShWiyd7SUDQaZV9MIWpdm2jnnC1vGRAmZeywv2m+AOtZde6zUTZFLaQnlf1SCIGu7kQWA==", "requires": { "bindings": "^1.5.0" } @@ -55400,6 +55407,11 @@ "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.3.3.tgz", "integrity": "sha512-d2JWLCivmZYTSIoge9MsgFCZrt571BikcWGYkjC1khllbTeDlGqZ2D8vD8E/lJa8WGWbb7Plm8/XJYV7IJHZZw==" }, + "web-worker": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/web-worker/-/web-worker-1.3.0.tgz", + "integrity": "sha512-BSR9wyRsy/KOValMgd5kMyr3JzpdeoR9KVId8u5GVlTTAtNChlsE4yTxeY7zMdNSyOmoKBv8NH2qeRY9Tg+IaA==" + }, "webidl-conversions": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-7.0.0.tgz", diff --git a/packages/node-runtime-worker-thread/__fixtures__/script-that-throws.js b/packages/node-runtime-worker-thread/__fixtures__/script-that-throws.js deleted file mode 100644 index fbe4be2c2..000000000 --- a/packages/node-runtime-worker-thread/__fixtures__/script-that-throws.js +++ /dev/null @@ -1 +0,0 @@ -throw new Error("Nope, I'm not starting"); diff --git a/packages/node-runtime-worker-thread/package.json b/packages/node-runtime-worker-thread/package.json index 50fbd219c..676522539 100644 --- a/packages/node-runtime-worker-thread/package.json +++ b/packages/node-runtime-worker-thread/package.json @@ -17,7 +17,7 @@ "node": ">=14.15.1" }, "scripts": { - "test": "cross-env TS_NODE_PROJECT=./tsconfig.test.json mocha -r \"../../scripts/import-expansions.js\" --timeout 15000 -r ts-node/register \"./src/**/*.spec.ts\"", + "test": "cross-env TS_NODE_PROJECT=./tsconfig.test.json mocha -r \"../../scripts/import-expansions.js\" -r \"./tests/register-worker.js \" --timeout 15000 -r ts-node/register \"./src/**/*.spec.ts\"", "pretest-ci": "node ../../scripts/run-if-package-requested.js npm run webpack-build -- --no-stats --no-devtool", "test-ci": "node ../../scripts/run-if-package-requested.js npm test", "test-coverage": "nyc --no-clean --cwd ../.. --reporter=none npm run test", @@ -52,6 +52,7 @@ }, "dependencies": { "interruptor": "^1.0.1", - "system-ca": "^2.0.1" + "system-ca": "^2.0.1", + "web-worker": "^1.3.0" } } diff --git a/packages/node-runtime-worker-thread/src/child-process-proxy.spec.ts b/packages/node-runtime-worker-thread/src/child-process-proxy.spec.ts deleted file mode 100644 index 7c32db268..000000000 --- a/packages/node-runtime-worker-thread/src/child-process-proxy.spec.ts +++ /dev/null @@ -1,84 +0,0 @@ -import path from 'path'; -import type { ChildProcess } from 'child_process'; -import { fork, spawn } from 'child_process'; -import type { Caller } from './rpc'; -import { cancel, createCaller } from './rpc'; -import { expect } from 'chai'; -import type { WorkerRuntime } from './worker-runtime'; -import { once } from 'events'; -import { promisify } from 'util'; -import { dummyOptions } from './index.spec'; - -const childProcessModulePath = path.resolve( - __dirname, - '..', - 'dist', - 'child-process-proxy.js' -); - -describe('child process worker proxy', function () { - let caller: Caller; - let childProcess: ChildProcess; - - afterEach(function () { - if (caller) { - caller[cancel](); - caller = null; - } - - if (childProcess) { - childProcess.disconnect(); - childProcess = null; - } - }); - - it('should start worker runtime and proxy calls', async function () { - childProcess = fork(childProcessModulePath); - caller = createCaller(['init', 'evaluate'], childProcess); - await caller.init('mongodb://nodb/', dummyOptions, { nodb: true }); - const result = await caller.evaluate('1 + 1'); - expect(result.printable).to.equal(2); - }); - - it('should exit on its own when the parent process disconnects', async function () { - const intermediateProcess = spawn( - process.execPath, - [ - '-e', - `require("child_process") - .fork(${JSON.stringify(childProcessModulePath)}) - .on("message", function(m) { console.log("message " + m + " from " + this.pid) })`, - ], - { stdio: ['pipe', 'pipe', 'inherit'] } - ); - - // Make sure the outer child process runs and has created the inner child process - const [message] = await once( - intermediateProcess.stdout.setEncoding('utf8'), - 'data' - ); - const match = message.trim().match(/^message ready from (?\d+)$/); - expect(match).to.not.equal(null); - - // Make sure the inner child process runs - const childPid = +match.groups.pid; - process.kill(childPid, 0); - - // Kill the intermediate process and wait for the inner child process to also close - intermediateProcess.kill('SIGTERM'); - let innerChildHasStoppedRunning = false; - for (let i = 0; i < 200; i++) { - try { - process.kill(childPid, 0); - } catch (err) { - if (err.code === 'ESRCH') { - innerChildHasStoppedRunning = true; - break; - } - throw err; - } - await promisify(setTimeout)(10); - } - expect(innerChildHasStoppedRunning).to.equal(true); - }); -}); diff --git a/packages/node-runtime-worker-thread/src/child-process-proxy.ts b/packages/node-runtime-worker-thread/src/child-process-proxy.ts deleted file mode 100644 index 633878869..000000000 --- a/packages/node-runtime-worker-thread/src/child-process-proxy.ts +++ /dev/null @@ -1,124 +0,0 @@ -/* istanbul ignore file */ -/* ^^^ we test the dist directly, so isntanbul can't calculate the coverage correctly */ - -/** - * This proxy is needed as a workaround for the old electron verison "bug" where - * due to the electron runtime being a chromium, not just node (even with - * `ELECTRON_RUN_AS_NODE` enabled), SIGINT doesn't break code execution. This is - * fixed in the later versions of electron/node but we are still on the older - * one, we have to have this proxy in place - * - * @todo as soon as we update electron version in compass, we can get rid of - * this part of the worker runtime as it becomes redundant - * - * @see {@link https://github.com/nodejs/node/pull/36344} - */ -import { once } from 'events'; -import { SHARE_ENV, Worker } from 'worker_threads'; -import path from 'path'; -import { exposeAll, createCaller } from './rpc'; -import type { InterruptHandle } from 'interruptor'; -import { interrupt as nativeInterrupt } from 'interruptor'; - -const workerRuntimeSrcPath = - process.env.WORKER_RUNTIME_SRC_PATH_DO_NOT_USE_THIS_EXCEPT_FOR_TESTING || - path.resolve(__dirname, 'worker-runtime.js'); - -const workerProcess = new Worker(workerRuntimeSrcPath, { env: SHARE_ENV }); - -const workerReadyPromise: Promise = (async () => { - const waitForReadyMessage = async () => { - let msg: string; - while (([msg] = await once(workerProcess, 'message'))) { - if (msg === 'ready') return; - } - }; - - const waitForError = async () => { - const [err] = await once(workerProcess, 'error'); - if (err) { - err.message = `Worker thread failed to start with the following error: ${err.message}`; - throw err; - } - }; - - await Promise.race([waitForReadyMessage(), waitForError()]); -})(); - -// We expect the amount of listeners to be more than the default value of 10 but -// probably not more than ~25 (all exposed methods on -// ChildProcessEvaluationListener and ChildProcessMongoshBus + any concurrent -// in-flight calls on ChildProcessRuntime) at once -process.setMaxListeners(25); -workerProcess.setMaxListeners(25); - -let interruptHandle: InterruptHandle | null = null; - -const { interrupt } = createCaller(['interrupt'], workerProcess); - -const worker = Object.assign( - createCaller( - ['init', 'evaluate', 'getCompletions', 'getShellPrompt'], - workerProcess - ), - { - interrupt(): boolean { - if (interruptHandle) { - nativeInterrupt(interruptHandle); - return true; - } - - return interrupt(); - }, - } -); - -function waitForWorkerReadyProxy(fn: T): T { - return new Proxy(fn, { - async apply(target, thisArg, argumentsList) { - await workerReadyPromise; - return target.call(thisArg, ...Array.from(argumentsList)); - }, - }); -} - -// Every time parent process wants to request something from worker through -// proxy, we want to make sure worker process is ready -(Object.keys(worker) as (keyof typeof worker)[]).forEach((key) => { - worker[key] = waitForWorkerReadyProxy(worker[key]); -}); - -exposeAll(worker, process); - -const evaluationListener = Object.assign( - createCaller( - [ - 'onPrint', - 'onPrompt', - 'getConfig', - 'setConfig', - 'resetConfig', - 'validateConfig', - 'listConfigOptions', - 'onClearCommand', - 'onExit', - ], - process - ), - { - onRunInterruptible(handle: InterruptHandle | null) { - interruptHandle = handle; - }, - } -); - -exposeAll(evaluationListener, workerProcess); - -const messageBus = createCaller(['emit', 'on'], process); - -exposeAll(messageBus, workerProcess); - -process.once('disconnect', () => process.exit()); -process.nextTick(() => { - process.send?.('ready'); -}); diff --git a/packages/node-runtime-worker-thread/src/index.spec.ts b/packages/node-runtime-worker-thread/src/index.spec.ts index 7037a812f..bd0fd7446 100644 --- a/packages/node-runtime-worker-thread/src/index.spec.ts +++ b/packages/node-runtime-worker-thread/src/index.spec.ts @@ -1,4 +1,3 @@ -import path from 'path'; import chai, { expect } from 'chai'; import sinon from 'sinon'; import sinonChai from 'sinon-chai'; @@ -24,7 +23,7 @@ function sleep(ms: number) { } describe('WorkerRuntime', function () { - let runtime: WorkerRuntime; + let runtime: WorkerRuntime | null = null; afterEach(async function () { if (runtime) { @@ -33,69 +32,6 @@ describe('WorkerRuntime', function () { } }); - describe('spawn errors', function () { - const brokenScript = path.resolve( - __dirname, - '..', - '__fixtures__', - 'script-that-throws.js' - ); - - afterEach(function () { - delete process - .env.CHILD_PROCESS_PROXY_SRC_PATH_DO_NOT_USE_THIS_EXCEPT_FOR_TESTING; - }); - - it('should return init error if child process failed to spawn', async function () { - process.env.CHILD_PROCESS_PROXY_SRC_PATH_DO_NOT_USE_THIS_EXCEPT_FOR_TESTING = - brokenScript; - - runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { - nodb: true, - }); - - let err; - - try { - await runtime.evaluate('1+1'); - } catch (e: any) { - err = e; - } - - expect(err).to.be.instanceof(Error); - expect(err) - .to.have.property('message') - .match(/Child process failed to start/); - }); - - it('should return init error if worker in child process failed to spawn', async function () { - runtime = new WorkerRuntime( - 'mongodb://nodb/', - dummyOptions, - { nodb: true }, - { - env: { - WORKER_RUNTIME_SRC_PATH_DO_NOT_USE_THIS_EXCEPT_FOR_TESTING: - brokenScript, - }, - } - ); - - let err; - - try { - await runtime.evaluate('1+1'); - } catch (e: any) { - err = e; - } - - expect(err).to.be.instanceof(Error); - expect(err) - .to.have.property('message') - .match(/Worker thread failed to start/); - }); - }); - describe('evaluate', function () { it('should evaluate and return basic values', async function () { runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { @@ -267,32 +203,16 @@ describe('WorkerRuntime', function () { }); describe('terminate', function () { - function isRunning(pid: number): boolean { - try { - process.kill(pid, 0); - return true; - } catch (e: any) { - return false; - } - } - // We will be testing a bunch of private props that can be accessed only with // strings to make TS happy it('should terminate child process', async function () { const runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { nodb: true, }); + await runtime.waitForRuntimeToBeReady(); + const terminateSpy = sinon.spy(runtime['workerProcess'], 'terminate'); await runtime.terminate(); - expect(runtime['childProcess']).to.have.property('killed', true); - expect(isRunning(runtime['childProcess'].pid)).to.equal(false); - }); - - it('should remove all listeners from childProcess', async function () { - const runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { - nodb: true, - }); - await runtime.terminate(); - expect(runtime['childProcess'].listenerCount('message')).to.equal(0); + expect(terminateSpy.calledOnce).to.be.true; }); it('should cancel any in-flight runtime calls', async function () { @@ -318,85 +238,166 @@ describe('WorkerRuntime', function () { }); describe('interrupt', function () { - it('should interrupt in-flight async tasks', async function () { - runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { - nodb: true, + context('async tasks', function () { + it('should interrupt in-flight tasks', async function () { + runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { + nodb: true, + }); + + await runtime.waitForRuntimeToBeReady(); + + let err: Error; + + try { + await Promise.all([ + runtime.evaluate('sleep(1000000)'), + (async () => { + // This is flaky when not enough time is given to the worker to + // finish the sync part of the work. If it causes too much issues + // it would be okay to disable this test completely + await sleep(5000); + await runtime.interrupt(); + })(), + ]); + } catch (e: any) { + err = e; + } + + expect(err).to.be.instanceof(Error); + expect(err) + .to.have.property('message') + .match(/Async script execution was interrupted/); }); - await runtime.waitForRuntimeToBeReady(); + it('should allow to evaluate again after interruption', async function () { + runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { + nodb: true, + }); - let err: Error; + await runtime.waitForRuntimeToBeReady(); - try { - await Promise.all([ - runtime.evaluate('sleep(1000000)'), - (async () => { - // This is flaky when not enought time given to the worker to - // finish the sync part of the work. If it causes too much issues - // it would be okay to disable this test completely - await sleep(5000); - await runtime.interrupt(); - })(), - ]); - } catch (e: any) { - err = e; - } + try { + await Promise.all([ + runtime.evaluate('sleep(1000000)'), + (async () => { + await sleep(200); + await runtime.interrupt(); + })(), + ]); + } catch (e: any) { + // ignore + } - expect(err).to.be.instanceof(Error); - expect(err) - .to.have.property('message') - .match(/Async script execution was interrupted/); - }); + const result = await runtime.evaluate('1+1'); - it('should interrupt in-flight synchronous tasks', async function () { - runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { - nodb: true, + expect(result).to.have.property('printable', 2); }); - await runtime.waitForRuntimeToBeReady(); + it('should preserve the context after interruption', async function () { + runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { + nodb: true, + }); - let err: Error; + await runtime.waitForRuntimeToBeReady(); - try { - await Promise.all([ - runtime.evaluate('while(true){}'), - (async () => { - await sleep(200); - await runtime.interrupt(); - })(), - ]); - } catch (e: any) { - err = e; - } + await runtime.evaluate('let x = 1'); + await runtime.evaluate('x = x + 2'); - expect(err).to.be.instanceof(Error); - expect(err) - .to.have.property('message') - .match(/Script execution was interrupted/); + try { + await Promise.all([ + runtime.evaluate('sleep(1000000)'), + (async () => { + await sleep(200); + await runtime.interrupt(); + })(), + ]); + } catch (e: any) { + // ignore + } + + const result = await runtime.evaluate('x + 3'); + + expect(result).to.have.property('printable', 6); + }); }); - it('should allow to evaluate again after interruption', async function () { - runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { - nodb: true, + context('sync tasks', function () { + it('should interrupt in-flight tasks', async function () { + runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { + nodb: true, + }); + + await runtime.waitForRuntimeToBeReady(); + + let err: Error; + + try { + await Promise.all([ + runtime.evaluate('while(true){}'), + (async () => { + await sleep(200); + await runtime.interrupt(); + })(), + ]); + } catch (e: any) { + err = e; + } + + expect(err).to.be.instanceof(Error); + expect(err) + .to.have.property('message') + .match(/Script execution was interrupted/); }); - await runtime.waitForRuntimeToBeReady(); + it('should allow to evaluate again after interruption', async function () { + runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { + nodb: true, + }); - try { - await Promise.all([ - runtime.evaluate('while(true){}'), - (async () => { - await sleep(200); - await runtime.interrupt(); - })(), - ]); - } catch (e: any) { - // ignore - } + await runtime.waitForRuntimeToBeReady(); - const result = await runtime.evaluate('1+1'); + try { + await Promise.all([ + runtime.evaluate('while(true){}'), + (async () => { + await sleep(200); + await runtime.interrupt(); + })(), + ]); + } catch (e: any) { + // ignore + } + + const result = await runtime.evaluate('1+1'); + + expect(result).to.have.property('printable', 2); + }); + + it('should preserve the context after interruption', async function () { + runtime = new WorkerRuntime('mongodb://nodb/', dummyOptions, { + nodb: true, + }); + + await runtime.waitForRuntimeToBeReady(); - expect(result).to.have.property('printable', 2); + await runtime.evaluate('let x = 1'); + await runtime.evaluate('x = x + 2'); + + try { + await Promise.all([ + runtime.evaluate('while(true){}'), + (async () => { + await sleep(200); + await runtime.interrupt(); + })(), + ]); + } catch (e: any) { + // ignore + } + + const result = await runtime.evaluate('x + 3'); + expect(result).to.have.property('printable', 6); + }); }); }); }); diff --git a/packages/node-runtime-worker-thread/src/index.ts b/packages/node-runtime-worker-thread/src/index.ts index 818ad57f5..5c417977c 100644 --- a/packages/node-runtime-worker-thread/src/index.ts +++ b/packages/node-runtime-worker-thread/src/index.ts @@ -1,8 +1,6 @@ /* istanbul ignore file */ -/* ^^^ we test the dist directly, so isntanbul can't calculate the coverage correctly */ +/* ^^^ we test the dist directly, so istanbul can't calculate the coverage correctly */ -import type { ChildProcess, SpawnOptionsWithoutStdio } from 'child_process'; -import { spawn } from 'child_process'; import type { Runtime, RuntimeEvaluationListener, @@ -11,184 +9,162 @@ import type { import type { MongoshBus } from '@mongosh/types'; import path from 'path'; import { EventEmitter, once } from 'events'; -import { kill } from './spawn-child-from-source'; +import { pathToFileURL } from 'url'; import type { Caller } from './rpc'; -import { createCaller, cancel } from './rpc'; -import { ChildProcessEvaluationListener } from './child-process-evaluation-listener'; +import { createCaller, cancel, exposeAll } from './rpc'; import type { WorkerRuntime as WorkerThreadWorkerRuntime } from './worker-runtime'; import { deserializeEvaluationResult, serializeConnectOptions, } from './serializer'; -import { ChildProcessMongoshBus } from './child-process-mongosh-bus'; import type { CompassServiceProvider } from '@mongosh/service-provider-server'; +import type { InterruptHandle } from 'interruptor'; +import { interrupt as nativeInterrupt } from 'interruptor'; +import { WorkerThreadEvaluationListener } from './worker-thread-evaluation-listener'; +import { WorkerProcessMongoshBus } from './worker-process-mongosh-bus'; type DevtoolsConnectOptions = Parameters< (typeof CompassServiceProvider)['connect'] >[1]; -type ChildProcessRuntime = Caller; - -function parseStderrToError(str: string): Error | null { - const [, errorMessageWithStack] = str - .split(/^\s*\^\s*$/m) - .map((part) => part.trim()); - - if (errorMessageWithStack) { - const e = new Error(); - const errorHeader = - errorMessageWithStack.substring( - 0, - errorMessageWithStack.search(/^\s*at/m) - ) || errorMessageWithStack; - - const [name, ...message] = errorHeader.split(': '); - - e.name = name; - e.message = message.join(': ').trim(); - e.stack = errorMessageWithStack; - - return e; - } - - return null; -} +type WorkerThreadRuntime = Caller; class WorkerRuntime implements Runtime { private initOptions: { uri: string; driverOptions: DevtoolsConnectOptions; cliOptions: { nodb?: boolean }; - spawnOptions: SpawnOptionsWithoutStdio; + workerOptions: WorkerOptions; }; evaluationListener: RuntimeEvaluationListener | null = null; private eventEmitter: MongoshBus; - private childProcessMongoshBus!: ChildProcessMongoshBus; + private workerProcess!: Worker; - private childProcessEvaluationListener!: ChildProcessEvaluationListener; + private workerProcessRuntime!: WorkerThreadRuntime; - private childProcess!: ChildProcess; + private initWorkerPromise: Promise; - private childProcessRuntime!: ChildProcessRuntime; + private workerThreadEvaluationListener!: WorkerThreadEvaluationListener; - private initWorkerPromise: Promise; + private workerProcessMongoshBus!: WorkerProcessMongoshBus; - private childProcessProxySrcPath: string = - process.env - .CHILD_PROCESS_PROXY_SRC_PATH_DO_NOT_USE_THIS_EXCEPT_FOR_TESTING || - path.resolve(__dirname, 'child-process-proxy.js'); + private workerProcessPath = path.resolve(__dirname, 'worker-runtime.js'); constructor( uri: string, driverOptions: DevtoolsConnectOptions, cliOptions: { nodb?: boolean } = {}, - spawnOptions: SpawnOptionsWithoutStdio = {}, + workerOptions: WorkerOptions = {}, eventEmitter: MongoshBus = new EventEmitter() ) { - this.initOptions = { uri, driverOptions, cliOptions, spawnOptions }; + this.initOptions = { uri, driverOptions, cliOptions, workerOptions }; this.eventEmitter = eventEmitter; this.initWorkerPromise = this.initWorker(); } private async initWorker() { - const { uri, driverOptions, cliOptions, spawnOptions } = this.initOptions; - - this.childProcess = spawn( - process.execPath, - [this.childProcessProxySrcPath], - { - stdio: ['inherit', 'inherit', 'pipe', 'ipc'], - ...spawnOptions, - } + const workerProcess = new Worker( + pathToFileURL(this.workerProcessPath).href, + this.initOptions.workerOptions ); - const waitForReadyMessage = async () => { - let msg: string; - while (([msg] = await once(this.childProcess, 'message'))) { - if (msg === 'ready') return; - } - }; - - let spawnError = ''; + const workerReadyPromise = async (): Promise => { + const waitForReadyMessage = async () => { + let msg: { + data: string; + }; + while (([msg] = await once(workerProcess, 'message'))) { + if (msg?.data === 'ready') return; + } + }; + + const waitForError = async () => { + const [err] = await once(workerProcess, 'error'); + if (err) { + err.message = `Worker thread failed to start with error: ${ + (err as Error).message + }`; + throw err; + } + }; - this.childProcess?.stderr?.setEncoding('utf8')?.on('data', (chunk) => { - spawnError += chunk; - }); + await Promise.race([waitForReadyMessage(), waitForError()]); + }; - const waitForError = async () => { - const [exitCode] = await once(this.childProcess, 'exit'); + await workerReadyPromise(); - if (exitCode) { - let error = parseStderrToError(spawnError); + const { interrupt } = createCaller(['interrupt'], workerProcess); - if (error) { - error.message = `Child process failed to start with the following error: ${error.message}`; - } else { - error = new Error( - `Worker runtime failed to start: child process exited with code ${ - exitCode as number | string - }` - ); - } + let interruptHandle: InterruptHandle | null = null; - throw error; + this.workerProcessRuntime = Object.assign( + createCaller( + [ + 'init', + 'evaluate', + 'getCompletions', + 'getShellPrompt', + 'setEvaluationListener', + 'interrupt', + ], + workerProcess + ), + { + interrupt(): boolean { + if (interruptHandle) { + nativeInterrupt(interruptHandle); + return true; + } + + return interrupt(); + }, } - }; - - await Promise.race([waitForReadyMessage(), waitForError()]); - - // We expect the amount of listeners to be more than the default value of 10 - // but probably not more than ~25 (all exposed methods on - // ChildProcessEvaluationListener and ChildProcessMongoshBus + any - // concurrent in-flight calls on ChildProcessRuntime) at once - this.childProcess.setMaxListeners(25); - - this.childProcessRuntime = createCaller( - [ - 'init', - 'evaluate', - 'getCompletions', - 'setEvaluationListener', - 'getShellPrompt', - 'interrupt', - ], - this.childProcess ); - this.childProcessEvaluationListener = new ChildProcessEvaluationListener( + this.workerThreadEvaluationListener = new WorkerThreadEvaluationListener( this, - this.childProcess + workerProcess + ); + + exposeAll( + { + onRunInterruptible(handle: InterruptHandle | null) { + interruptHandle = handle; + }, + }, + workerProcess ); - this.childProcessMongoshBus = new ChildProcessMongoshBus( + this.workerProcessMongoshBus = new WorkerProcessMongoshBus( this.eventEmitter, - this.childProcess + workerProcess ); - await this.childProcessRuntime.init( - uri, - serializeConnectOptions(driverOptions), - cliOptions + await this.workerProcessRuntime.init( + this.initOptions.uri, + serializeConnectOptions(this.initOptions.driverOptions), + this.initOptions.cliOptions ); + this.workerProcess = workerProcess; } async evaluate(code: string): Promise { await this.initWorkerPromise; return deserializeEvaluationResult( - await this.childProcessRuntime.evaluate(code) + await this.workerProcessRuntime.evaluate(code) ); } async getCompletions(code: string) { await this.initWorkerPromise; - return await this.childProcessRuntime.getCompletions(code); + return await this.workerProcessRuntime.getCompletions(code); } async getShellPrompt() { await this.initWorkerPromise; - return await this.childProcessRuntime.getShellPrompt(); + return await this.workerProcessRuntime.getShellPrompt(); } setEvaluationListener(listener: RuntimeEvaluationListener | null) { @@ -201,28 +177,30 @@ class WorkerRuntime implements Runtime { try { await this.initWorkerPromise; } catch { - // In case child process encountered an error during init we still want - // to clean up whatever possible + // In case the worker thread encountered an error during init + // we still want to clean up whatever possible. } - await kill(this.childProcess); + if (this.workerProcessRuntime) { + this.workerProcessRuntime[cancel](); + } - if (this.childProcessRuntime) { - this.childProcessRuntime[cancel](); + if (this.workerProcess) { + this.workerProcess.terminate(); } - if (this.childProcessEvaluationListener) { - this.childProcessEvaluationListener.terminate(); + if (this.workerThreadEvaluationListener) { + this.workerThreadEvaluationListener.terminate(); } - if (this.childProcessMongoshBus) { - this.childProcessMongoshBus.terminate(); + if (this.workerProcessMongoshBus) { + this.workerProcessMongoshBus.terminate(); } } async interrupt() { await this.initWorkerPromise; - return this.childProcessRuntime.interrupt(); + return this.workerProcessRuntime.interrupt(); } async waitForRuntimeToBeReady() { diff --git a/packages/node-runtime-worker-thread/src/rpc.spec.ts b/packages/node-runtime-worker-thread/src/rpc.spec.ts index fc329b468..ac7880299 100644 --- a/packages/node-runtime-worker-thread/src/rpc.spec.ts +++ b/packages/node-runtime-worker-thread/src/rpc.spec.ts @@ -1,58 +1,24 @@ import { expect } from 'chai'; import { EventEmitter } from 'events'; -import type { Caller, Exposed } from './rpc'; -import { - createCaller, - exposeAll, - close, - cancel, - serialize, - deserialize, - removeTrailingUndefined, -} from './rpc'; - -function createMockRpcMesageBus() { - const bus = new (class Bus extends EventEmitter { - send(data: any) { - this.emit('message', data); - } - })(); - return bus; +import type { Caller, Exposed, RPCMessageBus } from './rpc'; +import { createCaller, exposeAll, close, cancel } from './rpc'; + +function createMockRpcMesageBus(): RPCMessageBus { + const ee = new EventEmitter(); + return { + addEventListener: ee.on.bind(ee), + removeEventListener: ee.off.bind(ee), + postMessage: (data: unknown) => ee.emit('message', data), + }; } function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } -describe('rpc helpers', function () { - describe('serialize', function () { - it('returns base64 representation of an input', function () { - expect(serialize('Hello')).to.match(/data:;base64,\/w[08]iBUhlbGxv/); - }); - }); - - describe('deserialize', function () { - it("converts base64 representation of input back to it's original form", function () { - expect(deserialize(serialize('Hello'))).to.equal('Hello'); - }); - - it("returns original string if it's not a base64 data uri", function () { - expect(deserialize('Hi')).to.equal('Hi'); - }); - }); - - describe('removeTrailingUndefined', function () { - it('removes trailing undefineds from an array', function () { - expect( - removeTrailingUndefined([1, 2, 3, undefined, undefined, undefined]) - ).to.deep.equal([1, 2, 3]); - }); - }); -}); - describe('rpc', function () { - let messageBus: EventEmitter; + let messageBus: RPCMessageBus | null; let caller: Caller<{ meow(...args: any[]): string; throws(...args: any[]): never; @@ -61,11 +27,10 @@ describe('rpc', function () { woof(...args: any[]): string; neverResolves(...args: any[]): void; }>; - let exposed: Exposed; + let exposed: Exposed | null; afterEach(function () { if (messageBus) { - messageBus.removeAllListeners(); messageBus = null; } @@ -126,7 +91,7 @@ describe('rpc', function () { .match(/TypeError: Uh-oh, error!\r?\n\s+at throws/); }); - it('throws on client if arguments are not serializable', async function () { + it('allows undefined response', async function () { messageBus = createMockRpcMesageBus(); caller = createCaller(['callMe'], messageBus); @@ -139,21 +104,11 @@ describe('rpc', function () { messageBus ); - let err: Error; - - try { - await caller.callMe((a: number, b: number) => a + b); - } catch (e: any) { - err = e; - } - - expect(err).to.be.instanceof(Error); - expect(err) - .to.have.property('message') - .match(/could not be cloned/); + expect(await caller.callMe((a: number, b: number) => a + b)).to.be + .undefined; }); - it('throws on client if retured value from the server is not serializable', async function () { + it('allows function response', async function () { messageBus = createMockRpcMesageBus(); caller = createCaller(['returnsFunction'], messageBus); @@ -166,18 +121,7 @@ describe('rpc', function () { messageBus ); - let err: Error; - - try { - await caller.returnsFunction(); - } catch (e: any) { - err = e; - } - - expect(err).to.be.instanceof(Error); - expect(err) - .to.have.property('message') - .match(/could not be cloned/); + expect(await caller.returnsFunction()).to.be.instanceof(Function); }); describe('createCaller', function () { @@ -192,7 +136,7 @@ describe('rpc', function () { messageBus = createMockRpcMesageBus(); caller = createCaller(['meow'], messageBus); - messageBus.on('message', (data) => { + messageBus.addEventListener('message', (data) => { expect(data).to.have.property('func', 'meow'); done(); }); @@ -238,7 +182,7 @@ describe('rpc', function () { messageBus ); - messageBus.on('message', (data: any) => { + messageBus.addEventListener('message', (data: any) => { // Due to how our mocks implemented we have to introduce an if here to // skip our own message being received by the message bus if (data.sender === 'postmsg-rpc/server') { @@ -251,21 +195,11 @@ describe('rpc', function () { } }); - messageBus.emit('message', { + messageBus.postMessage({ sender: 'postmsg-rpc/client', func: 'meow', id: '123abc', }); }); - - describe('close', function () { - it('disables all exposed listeners', function () { - messageBus = createMockRpcMesageBus(); - exposed = exposeAll({ doSomething() {} }, messageBus); - expect(messageBus.listenerCount('message')).to.equal(1); - exposed[close](); - expect(messageBus.listenerCount('message')).to.equal(0); - }); - }); }); }); diff --git a/packages/node-runtime-worker-thread/src/rpc.ts b/packages/node-runtime-worker-thread/src/rpc.ts index d99131ec8..53bf50a56 100644 --- a/packages/node-runtime-worker-thread/src/rpc.ts +++ b/packages/node-runtime-worker-thread/src/rpc.ts @@ -1,30 +1,12 @@ -import v8 from 'v8'; import { expose, caller } from 'postmsg-rpc'; import { deserializeError, serializeError } from './serializer'; -import type { - MessageData, - PostmsgRpcOptions, - ServerMessageData, - ClientMessageData, -} from 'postmsg-rpc'; +import type { PostmsgRpcOptions } from 'postmsg-rpc'; -export function serialize(data: unknown): string { - return `data:;base64,${v8.serialize(data).toString('base64')}`; -} - -export function deserialize(str: string): T | string { - if (/^data:;base64,.+/.test(str)) { - return v8.deserialize( - Buffer.from(str.replace('data:;base64,', ''), 'base64') - ); - } - return str; -} - -type RPCMessageBus = { on: Function; off: Function } & ( - | { postMessage: Function; send?: never } - | { postMessage?: never; send?: Function } -); +export type RPCMessageBus = { + postMessage: Function; + addEventListener: Function; + removeEventListener: Function; +}; enum RPCMessageTypes { Message, @@ -47,81 +29,15 @@ function isRPCError(data: any): data is RPCError { ); } -function isMessageData(data: any): data is MessageData { - return data && typeof data === 'object' && 'id' in data && 'sender' in data; -} - -function isServerMessageData(data: any): data is ServerMessageData { - return isMessageData(data) && data.sender === 'postmsg-rpc/server'; -} - -function isClientMessageData(data: any): data is ClientMessageData { - return isMessageData(data) && data.sender === 'postmsg-rpc/client'; -} - -export function removeTrailingUndefined(arr: unknown[]): unknown[] { - if (Array.isArray(arr)) { - arr = [...arr]; - while (arr.length > 0 && arr[arr.length - 1] === undefined) { - arr.pop(); - } - } - return arr; -} - -function send(messageBus: RPCMessageBus, data: any): void { - if ( - 'postMessage' in messageBus && - typeof messageBus.postMessage === 'function' - ) { - messageBus.postMessage(data); - } - - if ('send' in messageBus && typeof messageBus.send === 'function') { - messageBus.send(data); - } -} - function getRPCOptions(messageBus: RPCMessageBus): PostmsgRpcOptions { return { - addListener: messageBus.on.bind(messageBus), - removeListener: messageBus.off.bind(messageBus), + addListener: messageBus.addEventListener.bind(messageBus), + removeListener: messageBus.removeEventListener.bind(messageBus), postMessage(data) { - if (isClientMessageData(data) && Array.isArray(data.args)) { - data.args = serialize(removeTrailingUndefined(data.args)); - } - - if (isServerMessageData(data)) { - // If serialization of the response failed for some reason (e.g., the - // value is not serializable) we want to propagate the error back to the - // client that issued the remote call instead of throwing on the server - // that was executing the method. - try { - data.res = serialize(data.res); - } catch (e: any) { - data.res = serialize({ - type: RPCMessageTypes.Error, - payload: serializeError(e), - }); - } - } - - return send(messageBus, data); + return messageBus.postMessage(data); }, - getMessageData(data) { - if ( - isClientMessageData(data) && - data.args && - typeof data.args === 'string' - ) { - data.args = deserialize(data.args); - } - - if (isServerMessageData(data) && typeof data.res === 'string') { - data.res = deserialize(data.res); - } - - return data; + getMessageData(event) { + return (event as { data: unknown }).data ?? event; }, }; } @@ -151,7 +67,7 @@ export function exposeAll(obj: O, messageBus: RPCMessageBus): Exposed { }, getRPCOptions(messageBus) ); - (val as any).close = close; + val.close = close; }); Object.defineProperty(obj, close, { enumerable: false, @@ -171,7 +87,7 @@ export type Caller< Keys extends keyof Impl = keyof Impl > = CancelableMethods> & { [cancel]: () => void }; -export function createCaller( +export function createCaller( methodNames: Extract[], messageBus: RPCMessageBus, processors: Partial< diff --git a/packages/node-runtime-worker-thread/src/spawn-child-from-source.spec.ts b/packages/node-runtime-worker-thread/src/spawn-child-from-source.spec.ts deleted file mode 100644 index 719cea0c1..000000000 --- a/packages/node-runtime-worker-thread/src/spawn-child-from-source.spec.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { expect } from 'chai'; -import type { ChildProcess } from 'child_process'; -import childProcess from 'child_process'; -import { once } from 'events'; -import spawnChildFromSource, { kill } from './spawn-child-from-source'; - -describe('spawnChildFromSource', function () { - let spawned: ChildProcess; - - afterEach(async function () { - if (spawned) { - await kill(spawned, 'SIGKILL'); - spawned = null; - } - }); - - it('should throw if stdin is missing', async function () { - let err: Error; - - try { - spawned = await spawnChildFromSource('console.log("Hi")', { - // Making istanbul happy by passing stuff that's not allowed - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - stdio: 'ignore', - }); - } catch (e: any) { - err = e; - } - - expect(err).to.be.instanceof(Error); - expect(err) - .to.have.property('message') - .match(/missing stdin/); - }); - - it('should resolve with a child process', async function () { - spawned = await spawnChildFromSource(''); - expect(spawned).to.be.instanceof((childProcess as any).ChildProcess); - }); - - it('should spawn a process with an ipc channel open', async function () { - spawned = await spawnChildFromSource( - 'process.on("message", (data) => process.send(data))' - ); - spawned.send('Hi!'); - const [message] = await once(spawned, 'message'); - expect(message).to.equal('Hi!'); - }); - - it('should fail if process exited before successfully starting', async function () { - let err: Error; - - try { - spawned = await spawnChildFromSource( - 'throw new Error("Whoops!")', - {}, - undefined, - 'ignore', - 'ignore' - ); - } catch (e: any) { - err = e; - } - - expect(err).to.be.instanceof(Error); - expect(err.message).to.match( - /Child process exited with error before starting/ - ); - }); - - it('should fail if a timeout exceeded before the process is "ready"', async function () { - let err: Error; - - try { - spawned = await spawnChildFromSource( - 'let i = 0; while(++i < 10000000000){};', - {}, - 10 - ); - } catch (e: any) { - err = e; - } - - expect(err).to.be.instanceof(Error); - expect(err.message).to.match( - /Timed out while waiting for child process to start/ - ); - }); -}); diff --git a/packages/node-runtime-worker-thread/src/spawn-child-from-source.ts b/packages/node-runtime-worker-thread/src/spawn-child-from-source.ts deleted file mode 100644 index 5d35128f5..000000000 --- a/packages/node-runtime-worker-thread/src/spawn-child-from-source.ts +++ /dev/null @@ -1,102 +0,0 @@ -import type { - ChildProcess, - Serializable, - SpawnOptions, - StdioNull, - StdioPipe, -} from 'child_process'; -import { spawn } from 'child_process'; -import { once } from 'events'; - -export async function kill( - childProcess: ChildProcess, - code: NodeJS.Signals | number = 'SIGTERM' -) { - childProcess.kill(code); - if (childProcess.exitCode === null && childProcess.signalCode === null) { - await once(childProcess, 'exit'); - } -} - -export default function spawnChildFromSource( - src: string, - spawnOptions: Omit = {}, - timeoutMs?: number, - _stdout: StdioNull | StdioPipe = 'inherit', - _stderr: StdioNull | StdioPipe = 'inherit' -): Promise { - return new Promise((resolve, reject) => { - const readyToken = Date.now().toString(32); - - const childProcess = spawn(process.execPath, { - stdio: ['pipe', _stdout, _stderr, 'ipc'], - ...spawnOptions, - }); - - if (!childProcess.stdin) { - kill(childProcess) - .then(() => { - reject( - new Error("Can't write src to the spawned process, missing stdin") - ); - }) - .catch((err: any) => { - reject(err); - }); - return; - } - - // eslint-disable-next-line prefer-const - let timeoutId: NodeJS.Timeout | null; - - function cleanupListeners() { - if (timeoutId) { - clearTimeout(timeoutId); - } - if (childProcess.stdin) { - childProcess.stdin.off('error', onWriteError); - } - childProcess.off('message', onMessage); - childProcess.off('exit', onExit); - } - - function onExit(exitCode: number | null) { - if (exitCode && exitCode > 0) { - cleanupListeners(); - reject(new Error('Child process exited with error before starting')); - } - } - - /* really hard to reproduce in tests and coverage is not happy */ - /* istanbul ignore next */ - async function onWriteError(error: Error) { - cleanupListeners(); - await kill(childProcess); - reject(error); - } - - async function onTimeout() { - cleanupListeners(); - await kill(childProcess); - reject(new Error('Timed out while waiting for child process to start')); - } - - function onMessage(data: Serializable) { - if (data === readyToken) { - cleanupListeners(); - resolve(childProcess); - } - } - - childProcess.on('message', onMessage); - childProcess.on('exit', onExit); - childProcess.stdin.on('error', onWriteError); - - childProcess.stdin.write(src); - childProcess.stdin.write(`;process.send(${JSON.stringify(readyToken)})`); - childProcess.stdin.end(); - - timeoutId = - timeoutMs !== undefined ? setTimeout(onTimeout, timeoutMs) : null; - }); -} diff --git a/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts b/packages/node-runtime-worker-thread/src/worker-process-mongosh-bus.ts similarity index 59% rename from packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts rename to packages/node-runtime-worker-thread/src/worker-process-mongosh-bus.ts index c09812bec..083f20139 100644 --- a/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts +++ b/packages/node-runtime-worker-thread/src/worker-process-mongosh-bus.ts @@ -1,25 +1,24 @@ -import type { ChildProcess } from 'child_process'; import type { MongoshBus } from '@mongosh/types'; import type { Exposed } from './rpc'; import { exposeAll, close } from './rpc'; -export class ChildProcessMongoshBus { +export class WorkerProcessMongoshBus { exposedEmitter: Exposed; - constructor(eventEmitter: MongoshBus, childProcess: ChildProcess) { + constructor(eventEmitter: MongoshBus, worker: Worker) { const exposedEmitter: Exposed = exposeAll( { emit(...args) { eventEmitter.emit(...args); }, on() { - throw new Error("Can't use `on` method on ChildProcessMongoshBus"); + throw new Error("Can't use `on` method on WorkerProcessMongoshBus"); }, once() { - throw new Error("Can't use `once` method on ChildProcessMongoshBus"); + throw new Error("Can't use `once` method on WorkerProcessMongoshBus"); }, }, - childProcess + worker ); this.exposedEmitter = exposedEmitter; } diff --git a/packages/node-runtime-worker-thread/src/worker-runtime.spec.ts b/packages/node-runtime-worker-thread/src/worker-runtime.spec.ts index feb286204..41dc7a361 100644 --- a/packages/node-runtime-worker-thread/src/worker-runtime.spec.ts +++ b/packages/node-runtime-worker-thread/src/worker-runtime.spec.ts @@ -1,6 +1,6 @@ import path from 'path'; import { once } from 'events'; -import { Worker } from 'worker_threads'; +import Worker from 'web-worker'; import chai, { expect } from 'chai'; import sinonChai from 'sinon-chai'; import sinon from 'sinon'; @@ -13,6 +13,7 @@ import type { WorkerRuntime } from './worker-runtime'; import type { RuntimeEvaluationResult } from '@mongosh/browser-runtime-core'; import { interrupt } from 'interruptor'; import { dummyOptions } from './index.spec'; +import { pathToFileURL } from 'url'; chai.use(sinonChai); @@ -28,12 +29,12 @@ function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } -describe('worker', function () { - let worker: Worker; +describe('worker-runtime', function () { + let worker: any; let caller: Caller; beforeEach(async function () { - worker = new Worker(workerThreadModule); + worker = new Worker(pathToFileURL(workerThreadModule).href); await once(worker, 'message'); caller = createCaller( @@ -55,20 +56,9 @@ describe('worker', function () { }; }); - afterEach(async function () { + afterEach(function () { if (worker) { - // There is a Node.js bug that causes worker process to still be ref-ed - // after termination. To work around that, we are unrefing worker manually - // *immediately* after terminate method is called even though it should - // not be necessary. If this is not done in rare cases our test suite can - // get stuck. Even though the issue is fixed we would still need to keep - // this workaround for compat reasons. - // - // See: https://github.com/nodejs/node/pull/37319 - const terminationPromise = worker.terminate(); - worker.unref(); - await terminationPromise; - worker = null; + worker.terminate(); } if (caller) { @@ -171,7 +161,7 @@ describe('worker', function () { describe('shell-api results', function () { const testServer = startSharedTestServer(); const db = `test-db-${Date.now().toString(16)}`; - let exposed: Exposed; + let exposed: Exposed | null; afterEach(function () { if (exposed) { @@ -333,7 +323,7 @@ describe('worker', function () { .forEach((testCase) => { const [commands, resultType, printable] = testCase; - let command: string; + let command: string | undefined; let prepare: undefined | string[]; if (Array.isArray(commands)) { @@ -522,7 +512,7 @@ describe('worker', function () { return evalListener; }; - let exposed: Exposed; + let exposed: Exposed | null; afterEach(function () { if (exposed) { diff --git a/packages/node-runtime-worker-thread/src/worker-runtime.ts b/packages/node-runtime-worker-thread/src/worker-runtime.ts index 9e12acfcb..8176ca4fa 100644 --- a/packages/node-runtime-worker-thread/src/worker-runtime.ts +++ b/packages/node-runtime-worker-thread/src/worker-runtime.ts @@ -1,7 +1,6 @@ /* istanbul ignore file */ -/* ^^^ we test the dist directly, so isntanbul can't calculate the coverage correctly */ +/* ^^^ we test the dist directly, so istanbul can't calculate the coverage correctly */ -import { parentPort, isMainThread } from 'worker_threads'; import type { Completion, Runtime, @@ -22,14 +21,16 @@ import { Lock } from './lock'; import type { InterruptHandle } from 'interruptor'; import { runInterruptible } from 'interruptor'; +const mainMessageBus = { + addEventListener: self.addEventListener.bind(self), + removeEventListener: self.removeEventListener.bind(self), + postMessage: self.postMessage.bind(self), +}; + type DevtoolsConnectOptions = Parameters< (typeof CompassServiceProvider)['connect'] >[1]; -if (!parentPort || isMainThread) { - throw new Error('Worker runtime can be used only in a worker thread'); -} - let runtime: Runtime | null = null; let provider: ServiceProvider | null = null; @@ -62,7 +63,7 @@ const evaluationListener = createCaller( 'onExit', 'onRunInterruptible', ], - parentPort, + mainMessageBus, { onPrint: function ( results: RuntimeEvaluationResult[] @@ -76,8 +77,8 @@ const evaluationListener = createCaller( } ); -const messageBus: MongoshBus = Object.assign( - createCaller(['emit'], parentPort), +const mongoshBus: MongoshBus = Object.assign( + createCaller(['emit'], mainMessageBus), { on() { throw new Error("Can't call `on` method on worker runtime MongoshBus"); @@ -112,13 +113,13 @@ const workerRuntime: WorkerRuntime = { // TS2589: Type instantiation is excessively deep and possibly infinite. // I could not figure out why exactly that was the case, so 'as any' // will have to do for now. - provider = await (CompassServiceProvider as any).connect( + provider = await CompassServiceProvider.connect( uri, deserializeConnectOptions(driverOptions), cliOptions, - messageBus + mongoshBus ); - runtime = new ElectronRuntime(provider as ServiceProvider, messageBus); + runtime = new ElectronRuntime(provider, mongoshBus); runtime.setEvaluationListener(evaluationListener); }, @@ -197,16 +198,8 @@ const workerRuntime: WorkerRuntime = { }, }; -// We expect the amount of listeners to be more than the default value of 10 but -// probably not more than ~25 (all exposed methods on -// ChildProcessEvaluationListener and ChildProcessMongoshBus + any concurrent -// in-flight calls on ChildProcessRuntime) at once -parentPort.setMaxListeners(25); - -exposeAll(workerRuntime, parentPort); +exposeAll(workerRuntime, mainMessageBus); process.nextTick(() => { - if (parentPort) { - parentPort.postMessage('ready'); - } + mainMessageBus.postMessage('ready'); }); diff --git a/packages/node-runtime-worker-thread/src/child-process-evaluation-listener.ts b/packages/node-runtime-worker-thread/src/worker-thread-evaluation-listener.ts similarity index 90% rename from packages/node-runtime-worker-thread/src/child-process-evaluation-listener.ts rename to packages/node-runtime-worker-thread/src/worker-thread-evaluation-listener.ts index 6ed6a7441..071588a74 100644 --- a/packages/node-runtime-worker-thread/src/child-process-evaluation-listener.ts +++ b/packages/node-runtime-worker-thread/src/worker-thread-evaluation-listener.ts @@ -1,18 +1,17 @@ -import type { ChildProcess } from 'child_process'; import type { Exposed } from './rpc'; import { exposeAll, close } from './rpc'; import type { WorkerRuntime } from './index'; import { deserializeEvaluationResult } from './serializer'; import type { RuntimeEvaluationListener } from '@mongosh/browser-runtime-core'; -export class ChildProcessEvaluationListener { +export class WorkerThreadEvaluationListener { exposedListener: Exposed< Required< Omit > >; - constructor(workerRuntime: WorkerRuntime, childProcess: ChildProcess) { + constructor(workerRuntime: WorkerRuntime, worker: Worker) { this.exposedListener = exposeAll( { onPrompt(question, type) { @@ -58,7 +57,7 @@ export class ChildProcessEvaluationListener { ); }, }, - childProcess + worker ); } diff --git a/packages/node-runtime-worker-thread/tests/register-worker.js b/packages/node-runtime-worker-thread/tests/register-worker.js new file mode 100644 index 000000000..30f83f327 --- /dev/null +++ b/packages/node-runtime-worker-thread/tests/register-worker.js @@ -0,0 +1 @@ +global.Worker = require('web-worker'); diff --git a/packages/node-runtime-worker-thread/tsconfig.json b/packages/node-runtime-worker-thread/tsconfig.json index 2e3d788fa..0d127bd01 100644 --- a/packages/node-runtime-worker-thread/tsconfig.json +++ b/packages/node-runtime-worker-thread/tsconfig.json @@ -2,7 +2,8 @@ "extends": "@mongodb-js/tsconfig-mongosh/tsconfig.common.json", "compilerOptions": { "outDir": "./dist", - "allowJs": true + "allowJs": true, + "lib": ["WebWorker"] }, "files": ["./src/index.d.ts"], "include": ["src/**/*"], diff --git a/packages/node-runtime-worker-thread/webpack.config.js b/packages/node-runtime-worker-thread/webpack.config.js index 7bc44fe66..6bc769896 100644 --- a/packages/node-runtime-worker-thread/webpack.config.js +++ b/packages/node-runtime-worker-thread/webpack.config.js @@ -22,9 +22,7 @@ const config = { }, }; -module.exports = ['index', 'child-process-proxy', 'worker-runtime'].map( - (entry) => ({ - entry: { [entry]: path.resolve(__dirname, 'src', `${entry}.ts`) }, - ...merge(baseWebpackConfig, config), - }) -); +module.exports = ['index', 'worker-runtime'].map((entry) => ({ + entry: { [entry]: path.resolve(__dirname, 'src', `${entry}.ts`) }, + ...merge(baseWebpackConfig, config), +})); diff --git a/packages/shell-api/src/database.ts b/packages/shell-api/src/database.ts index 58751a3a0..137782ef0 100644 --- a/packages/shell-api/src/database.ts +++ b/packages/shell-api/src/database.ts @@ -280,7 +280,7 @@ export default class Database extends ShellApiWithMongoClass { // are not going to differ from fresh ones, and even if they do, a // subsequent autocompletion request will almost certainly have at least // the new cached results. - await new Promise((resolve) => setTimeout(resolve, 200).unref()); + await new Promise((resolve) => setTimeout(resolve, 200)?.unref?.()); return this._cachedCollectionNames; })(), ]); diff --git a/packages/shell-api/src/mongo.ts b/packages/shell-api/src/mongo.ts index a57fd1b08..af82f5703 100644 --- a/packages/shell-api/src/mongo.ts +++ b/packages/shell-api/src/mongo.ts @@ -345,7 +345,7 @@ export default class Mongo extends ShellApiClass { (async () => { // See the comment in _getCollectionNamesForCompletion/database.ts // for the choice of 200 ms. - await new Promise((resolve) => setTimeout(resolve, 200).unref()); + await new Promise((resolve) => setTimeout(resolve, 200)?.unref?.()); return this._cachedDatabaseNames; })(), ]);