From 333f935c605bd0deb2b7f4b42218bc5579b78bbe Mon Sep 17 00:00:00 2001 From: bencmbrook Date: Sat, 21 Oct 2023 19:08:07 -0700 Subject: [PATCH] Finish rewrite to --- src/muxer.ts | 14 +++++++------- test/mux-web-streams.test.ts | 23 +++++++++++++++-------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/muxer.ts b/src/muxer.ts index 30bf660..185d807 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -46,7 +46,7 @@ function getNextReader( } // Filter out any readers that are busy - readers.filter((reader) => !reader.busy); + readers = readers.filter((reader) => !reader.busy); if (readers.length === 0) { return 'all-busy'; } @@ -186,12 +186,12 @@ export const muxer = ( } } else { // This incoming stream is finished - // Release our reader's lock to the incoming stream - currentReader.reader.releaseLock(); - // Mark this incoming stream as done, so we no longer attempt to read from it. currentReader.end = true; + // Release our reader's lock to the incoming stream + currentReader.reader.releaseLock(); + // Send one last chunk into the muxer's output to signal that this stream is done. const byteChunk = serializeChunk({ id: currentReader.id, @@ -205,9 +205,9 @@ export const muxer = ( // Cancel incoming streams if the muxer stream is canceled. cancel(reason) { - Object.values(readerById).forEach(({ reader }) => - reader.cancel(`The muxer stream was canceled: ${reason}`), - ); + Object.values(readerById).forEach(({ reader }) => { + reader.cancel(`The muxer stream was canceled: ${reason}`); + }); }, }); }; diff --git a/test/mux-web-streams.test.ts b/test/mux-web-streams.test.ts index 2acc01c..a2e0bac 100644 --- a/test/mux-web-streams.test.ts +++ b/test/mux-web-streams.test.ts @@ -9,7 +9,7 @@ import { import { demuxer, muxer } from '../src/index.js'; import type { Header, SerializableData } from '../src/types.js'; -const createStreamFromArray = (arr: any[] | Uint8Array): ReadableStream => { +const createStreamFromArray = (arr: readonly any[]): ReadableStream => { return new ReadableStream({ start(controller) { for (let item of arr) { @@ -17,12 +17,15 @@ const createStreamFromArray = (arr: any[] | Uint8Array): ReadableStream => { } controller.close(); }, + cancel(reason) { + console.error('Readable canceled', reason); + }, }); }; const readStreamToArray = async (stream: ReadableStream): Promise => { + const result: any[] = []; const reader = stream.getReader(); - const result = []; while (true) { const { value, done } = await reader.read(); @@ -34,7 +37,7 @@ const readStreamToArray = async (stream: ReadableStream): Promise => { }; const inputData: SerializableData[][] = [ - [1, 2, 3, 4, 5, 6, 7, 8, 912381, 12], + [1, 2, 3], ['a', 'b', 'c', 'd'], [{ a: 1 }, { b: 2 }, { c: 3 }], [1, 2], @@ -78,17 +81,21 @@ describe('mux/demux', () => { // Test async is not blocking test('async is not blocking - streams do not wait on slowest stream', async () => { // Create test data - const originalData = inputData.slice(0, 3); + const originalData = [ + [1, 2, 3], + ['a', 'b', 'c'], + [{ a: 1 }, { b: 2 }, { c: 3 }], + ] as const; const TIMEOUT = 1000; - const SLOW_STREAM_TIME = TIMEOUT * originalData[1]!.length; + const SLOW_STREAM_TIME = TIMEOUT * originalData[1].length; const originalStreams = [ - createStreamFromArray(originalData[0]!), + createStreamFromArray(originalData[0]), // Slow stream new ReadableStream({ async start(controller) { - for (let item of originalData[1]!) { + for (let item of originalData[1]) { // Wait first await new Promise((resolve) => { setTimeout(() => resolve(null), TIMEOUT); @@ -98,7 +105,7 @@ describe('mux/demux', () => { controller.close(); }, }), - createStreamFromArray(originalData[2]!), + createStreamFromArray(originalData[2]), ]; const startTime = Date.now();