diff --git a/types/src/shared/utils/streams.ts b/types/src/shared/utils/streams.ts index b7e6181cbdc1..12dfd8728c84 100644 --- a/types/src/shared/utils/streams.ts +++ b/types/src/shared/utils/streams.ts @@ -43,9 +43,10 @@ export function readableStreamToReadable(readableStream: ReadableStream) { const reader = readableStream.getReader(); let reading = false; - return new Readable({ + const nodeReadable = new Readable({ async read() { if (reading) { + console.log("already reading"); return; } reading = true; @@ -53,15 +54,28 @@ export function readableStreamToReadable(readableStream: ReadableStream) { try { const { done, value } = await reader.read(); if (done) { + console.log("done"); this.push(null); } else { + console.log("not done"); reading = !this.push(value); // Pause reading if push() returns false + console.log("set reading=", reading); } } catch (error) { + console.log("error", error); this.destroy(error as Error); - } finally { - reading = false; } }, }); + + nodeReadable.on("readable", () => { + console.log("readable"); + reading = false; + + if (!reading) { + nodeReadable.read(); // Resume reading when the stream is ready for more data + } + }); + + return nodeReadable; }