Commit

Comprehensive test cases
bencmbrook committed Oct 22, 2023
1 parent f2fb177 commit cdf0823
Showing 4 changed files with 91 additions and 86 deletions.
3 changes: 2 additions & 1 deletion package.json
@@ -12,7 +12,8 @@
   ],
   "scripts": {
     "build": "del dist && tsc",
-    "test": "tsx test/mux-web-streams.test.ts",
+    "test": "node --loader tsx test/mux-web-streams.test.ts",
+    "test-only": "node --loader tsx --test-only test/mux-web-streams.test.ts",
     "lint:prettier": "prettier . --check",
     "prepublishOnly": "npm run build && npm run test && npm run lint:prettier"
   },
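The `test` script now runs the suite under Node's built-in test runner via the `tsx` loader, and the new `test-only` script adds Node's `--test-only` flag, which skips every test not marked `only`. A minimal sketch of how that flag behaves (the test names and assertions are illustrative, not from this repo):

```ts
// Illustrative sketch, not part of this commit.
import assert from 'node:assert';
import { test } from 'node:test';

// Under `node --test-only`, only tests marked `only: true` (or `test.only`) execute.
test('focused test', { only: true }, () => {
  assert.strictEqual(1 + 1, 2);
});

test('skipped under --test-only', () => {
  assert.ok(true);
});
```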
149 changes: 71 additions & 78 deletions src/muxer.ts
@@ -53,7 +53,13 @@ function getNextReader(
     return 'all-done';
   }
 
-  // Every stream is busy. Don't select another reader - just wait.
+  for (const candidate of readers) {
+    if (candidate.end) continue;
+    if (candidate.busy) continue;
+    return candidate;
+  }
+
+  // Every stream is busy or done. Don't select another reader - just wait.
   return 'all-busy';
 }
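The new loop makes the skip conditions explicit: a reader is passed over while a read is in flight (`busy`) or once its stream has finished (`end`). For context, a self-contained sketch of the round-robin selection this hunk belongs to — the `ReaderInfo` shape and the wrap-around rotation are inferred from the diff, not copied from src/muxer.ts:

```ts
// Hypothetical shape inferred from the diff; the real types live in src/muxer.ts.
interface ReaderInfo {
  id: number;
  reader: ReadableStreamDefaultReader<unknown>;
  busy: boolean; // a read() is currently in flight
  end: boolean; // the underlying stream has finished
}

// Start just after the last reader used, wrap around, and return the first
// reader that is neither busy nor ended.
function getNextReaderSketch(
  readers: ReaderInfo[],
  lastReaderId: number,
): ReaderInfo | 'all-done' | 'all-busy' {
  if (readers.every((r) => r.end)) return 'all-done';
  const start = (lastReaderId + 1) % readers.length;
  const rotated = [...readers.slice(start), ...readers.slice(0, start)];
  for (const candidate of rotated) {
    if (candidate.end) continue;
    if (candidate.busy) continue;
    return candidate;
  }
  return 'all-busy';
}
```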

@@ -132,90 +138,77 @@ export const muxer = (
   // Return a new ReadableStream that pulls from the individual stream readers in a round-robin fashion.
   return new ReadableStream<Uint8Array>({
     async pull(controller) {
-      if (downstreamIsReady(controller.desiredSize)) {
-        attemptNextRead();
-      }
-
-      /**
-       * Repeatedly pick the next available stream, read a chunk from it, and add that chunk to the multiplexed output
-       * This function is triggered by `pull()` AND by recursion whenever downstream readers are ready
-       */
-      function attemptNextRead() {
-        // Pick the next available stream reader
-        const currentReader =
-          lastReaderId === null
-            ? readerById[0]!
-            : getNextReader(readerById, lastReaderId)!;
-
-        // Base case. Every stream ended. We're done muxing!
-        if (currentReader === 'all-done') {
-          return controller.close();
-        }
-
-        // Corner case. Every stream is busy. Wait for the next call to `pull()`.
-        if (currentReader === 'all-busy') {
-          return;
-        }
-
-        // Get the reader details
-        const { id, reader } = currentReader;
-        lastReaderId = id;
-
-        /**
-         * Read from this stream, asynchronously.
-         *
-         * Important: We don't `await` this, because `reader.read()` may be a very slow promise.
-         * Waiting for the response would pause reading for ALL streams.
-         * Instead, we continue calling `attemptNextRead()` for streams which are available.
-         *
-         * Important: We need to continue calling the `attemptNextRead()` function while downstream readers are ready.
-         * Instead of running a while loop (which wouldn't work with this async function),
-         * this async function calls `attemptNextRead()` recursively.
-         */
-        (async () => {
-          // Read a chunk from the reader
-          currentReader.busy = true;
-          const result = await reader.read();
-          currentReader.busy = false;
-
-          if (!result.done) {
-            // This stream is not done and has a value we need to mux.
-            // Prepare the chunk for the muxed output. This serializes the data into a byte array, and prepends a metadata header.
-            const byteChunk = serializeChunk({
-              id,
-              end: result.done,
-              value: result.value,
-            });
-
-            // If the byteChunk is not empty (sometimes streams have empty chunks)
-            if (byteChunk) {
-              // Write it to the muxed output
-              controller.enqueue(byteChunk);
-            }
-
-            // If the downstream consumers are ready for more data
-            if (downstreamIsReady(controller.desiredSize)) {
-              // Recurse (read from the next available incoming stream)
-              attemptNextRead();
-            }
-          } else {
-            // This incoming stream is finished
-            // Release our reader's lock to the incoming stream
-            reader.releaseLock();
-
-            // Mark this incoming stream as done, so we no longer attempt to read from it.
-            readerById[id] = { ...readerById[id]!, end: true };
-
-            // Send one last chunk into the muxer's output to signal that this stream is done.
-            const byteChunk = serializeChunk({
-              id,
-              end: true,
-              value: '\u0004', // arbitrarily chosen; value just needs to have length > 0 to distinguish cases
-            });
-            controller.enqueue(byteChunk!);
-          }
-        })();
-      }
+      if (!downstreamIsReady(controller.desiredSize)) return;
+
+      // Pick the next available stream reader
+      const currentReader =
+        lastReaderId === null
+          ? readerById[0]!
+          : getNextReader(readerById, lastReaderId)!;
+
+      // Base case. Every stream ended. We're done muxing!
+      if (currentReader === 'all-done') {
+        return controller.close();
+      }
+
+      // Corner case. Every stream is busy. Wait for the next call to `pull()`.
+      if (currentReader === 'all-busy') {
+        return;
+      }
+
+      // Get the reader details
+      lastReaderId = currentReader.id;
+
+      // Mark this reader busy until `reader.read()` resolves
+      currentReader.busy = true;
+
+      /**
+       * Read from this stream, asynchronously.
+       *
+       * Important: We don't `await` this, because `reader.read()` may be a very slow promise.
+       * Waiting for the response would pause reading for ALL streams.
+       * Instead, we return and rely on subsequent `pull()` calls to read from streams which are available.
+       */
+      (async () => {
+        // Read a chunk from the reader
+        const result = await currentReader.reader.read();
+        currentReader.busy = false;
+
+        if (!result.done) {
+          // This stream is not done and has a value we need to mux.
+          // Prepare the chunk for the muxed output. This serializes the data into a byte array, and prepends a metadata header.
+          const byteChunk = serializeChunk({
+            id: currentReader.id,
+            end: false,
+            value: result.value,
+          });
+
+          // If the byteChunk is not empty (sometimes streams have empty chunks)
+          if (byteChunk !== null) {
+            // Write it to the muxed output
+            controller.enqueue(byteChunk);
+          }
+        } else {
+          // This incoming stream is finished
+          // Release our reader's lock to the incoming stream
+          currentReader.reader.releaseLock();
+
+          // Mark this incoming stream as done, so we no longer attempt to read from it.
+          currentReader.end = true;
+
+          // Send one last chunk into the muxer's output to signal that this stream is done.
+          const byteChunk = serializeChunk({
+            id: currentReader.id,
+            end: true,
+            value: '\u0004', // arbitrarily chosen; value just needs to have length > 0 to distinguish cases
+          });
+          controller.enqueue(byteChunk!);
+        }
+      })();
     },
 
     // Cancel incoming streams if the muxer stream is canceled.
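With the recursive `attemptNextRead()` gone, each `pull()` reads at most one chunk, and the Streams machinery re-invokes `pull()` as the consumer drains the queue. A hedged consumption sketch — the `mux-web-streams` import name and the stream setup are assumptions for illustration, not part of this commit:

```ts
// Hedged sketch, not from the commit: drive pull() purely via reads.
import { muxer } from 'mux-web-streams'; // assumed package export

async function demo(): Promise<void> {
  const a = new ReadableStream<number>({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      controller.close();
    },
  });
  const b = new ReadableStream<string>({
    start(controller) {
      controller.enqueue('x');
      controller.close();
    },
  });

  // Each reader.read() lets pull() run once more, which round-robins a and b.
  const reader = muxer([a, b]).getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log(value.byteLength); // one serialized, id-tagged chunk
  }
}
```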
23 changes: 17 additions & 6 deletions test/mux-web-streams.test.ts
@@ -35,13 +35,23 @@ const readStreamToArray = async (stream: ReadableStream): Promise<any[]> => {
 
 const inputData: SerializableData[][] = [
   [1, 2, 3, 4, 5, 6, 7, 8, 912381, 12],
-  ['a', 'b', 'c'],
-  [new Uint8Array([12, 1, 100, 255, 0])],
-  // [null, null, 2, null],
-  [{}, {}],
+  ['a', 'b', 'c', 'd'],
+  [{ a: 1 }, { b: 2 }, { c: 3 }],
+  [1, 2],
+  ['A'],
+  [[1, 2], 3, 'a', new Uint8Array([12, 1, 100, 255, 0])],
+  [true, false],
+  [[1, 2], 3, null, 'a'],
+  [[1, 2], 3, 'a'],
+  [{ a: 1 }, { b: 2 }, { c: 3 }],
+  [[1, 2], 3, false],
+  [[1, 2], 3, 0],
+  [[1, 2], 3, []],
+  [new Uint8Array([12, 1, 100, 255, 0])],
+  [null, null, null],
+  [[1, 2], 3, 'a', {}],
+  [[1, 2], 3, null, 'a'],
+  [{}, {}],
+  [],
 ];
 
 describe('mux/demux', () => {
@@ -123,9 +133,10 @@ describe('mux/demux', () => {
 describe('serialization', () => {
   test('body conversion functions `serializeData` and `deserializeData` are inverse', () => {
     for (const input of inputData) {
+      if (input.length === 0) continue;
       const { data, isRaw } = serializeData({ value: input[0]! });
       const { value } = deserializeData({ data, isRaw });
-      assert.deepStrictEqual(value, input[0]!);
+      assert.deepStrictEqual(value, input[0]);
     }
   });
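The new guard matters because the just-added empty array would make `input[0]` evaluate to `undefined`. For intuition only, a hypothetical sketch of the contract this test checks — the real `serializeData`/`deserializeData` live in the package and may differ:

```ts
// Hypothetical stand-ins, not the package's implementation: they illustrate
// why `isRaw` must round-trip alongside `data`.
const encoder = new TextEncoder();
const decoder = new TextDecoder();

function serializeDataSketch(value: unknown): { data: Uint8Array; isRaw: boolean } {
  // Raw bytes pass through untouched; everything else is JSON-encoded.
  if (value instanceof Uint8Array) return { data: value, isRaw: true };
  return { data: encoder.encode(JSON.stringify(value)), isRaw: false };
}

function deserializeDataSketch(data: Uint8Array, isRaw: boolean): unknown {
  return isRaw ? data : JSON.parse(decoder.decode(data));
}
```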

2 changes: 1 addition & 1 deletion test/tsconfig.json
@@ -4,5 +4,5 @@
     "lib": ["dom", "dom.iterable", "es2023"],
     "types": ["node"]
   },
-  "include": ["**/*"]
+  "include": ["**/*.test.ts"]
 }
