diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8e27bae439f8e3..80e02379edab6b 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -589,18 +589,24 @@ class ReadableStream { 'DataCloneError'); } + const { port1, port2 } = this[kState].transfer; + + this[kState].transfer.port2 = undefined; + + const transfer = lazyTransfer(); const { writable, + source, promise, - } = lazyTransfer().newCrossRealmWritableSink( - this, - this[kState].transfer.port1); + } = transfer.newCrossRealmWritableSink(this, port1); + + transfer.closeRegistry().register(port2, source); this[kState].transfer.writable = writable; this[kState].transfer.promise = promise; return { - data: { port: this[kState].transfer.port2 }, + data: { port: port2 }, deserializeInfo: 'internal/webstreams/readablestream:TransferredReadableStream', }; diff --git a/lib/internal/webstreams/transfer.js b/lib/internal/webstreams/transfer.js index 136b0d81a99464..f016b73e9f8cbd 100644 --- a/lib/internal/webstreams/transfer.js +++ b/lib/internal/webstreams/transfer.js @@ -4,6 +4,7 @@ const { ObjectDefineProperties, PromiseResolve, ReflectConstruct, + SafeFinalizationRegistry, } = primordials; const { @@ -261,10 +262,18 @@ class CrossRealmTransformWritableSink { } } +let finalizer = null; + +function closeRegistry() { + return (finalizer ??= new SafeFinalizationRegistry((source) => { + source[kState].port.close(); + })); +} + function newCrossRealmReadableStream(writable, port) { - const readable = - new ReadableStream( - new CrossRealmTransformReadableSource(port)); + const source = new CrossRealmTransformReadableSource(port); + + const readable = new ReadableStream(source); const promise = readableStreamPipeTo(readable, writable, false, false, false); @@ -273,19 +282,21 @@ function newCrossRealmReadableStream(writable, port) { return { readable, + source, promise, }; } function newCrossRealmWritableSink(readable, port) { - const writable = - new WritableStream( - new CrossRealmTransformWritableSink(port)); + const source = new CrossRealmTransformWritableSink(port); + + const writable = new WritableStream(source); const promise = readableStreamPipeTo(readable, writable, false, false, false); setPromiseHandled(promise); return { writable, + source, promise, }; } @@ -297,4 +308,5 @@ module.exports = { CrossRealmTransformReadableSource, CloneableDOMException, InternalCloneableDOMException, + closeRegistry, }; diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 2115aba36e927b..4bd5e03363649a 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -282,20 +282,27 @@ class WritableStream { 'DataCloneError'); } + const { port1, port2 } = this[kState].transfer; + + this[kState].transfer.port2 = undefined; + + const transfer = lazyTransfer(); const { readable, + source, promise, - } = lazyTransfer().newCrossRealmReadableStream( + } = transfer.newCrossRealmReadableStream( this, - this[kState].transfer.port1); + port1); + + transfer.closeRegistry().register(port2, source); this[kState].transfer.readable = readable; this[kState].transfer.promise = promise; - setPromiseHandled(this[kState].transfer.promise); return { - data: { port: this[kState].transfer.port2 }, + data: { port: port2 }, deserializeInfo: 'internal/webstreams/writablestream:TransferredWritableStream', }; diff --git a/test/parallel/test-webstreams-using-structuredclone-exit-process.js b/test/parallel/test-webstreams-using-structuredclone-exit-process.js new file mode 100644 index 00000000000000..7018c2d34445fb --- /dev/null +++ b/test/parallel/test-webstreams-using-structuredclone-exit-process.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common'); +const { Worker } = require('worker_threads'); +const { once } = require('node:events'); + +const worker = new Worker( + ` + const { ReadableStream } = require('stream/web'); + const rs = new ReadableStream(); + const cloned = structuredClone(rs, { transfer: [rs] }); + `, + { eval: true }, +); + +const worker2 = new Worker( + ` + const { WritableStream } = require('stream/web'); + const ws = new WritableStream(); + const cloned = structuredClone(ws, { transfer: [ws] }); + `, + { eval: true }, +); + +(async () => { + // A timer is used here to detect the end of a process. + const timer = setTimeout(common.mustNotCall(), common.platformTimeout(10000)); + + await Promise.all([once(worker, 'exit'), once(worker2, 'exit')]); + + clearTimeout(timer); +})().then(common.mustCall());