Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Web Streams Support #61

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions app/__tests__/kv-namespace_spec.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const path = require("path");
const path = require('path');
const { promisify } = require('util');
const rimraf = promisify(require("rimraf"));
const rimraf = promisify(require('rimraf'));
const { KVNamespace } = require('../kv-namespace');
const { InMemoryKVStore } = require('../in-memory-kv-store');
const { FileKVStore } = require('../file-kv-store');
const { ReadableStream } = require('web-streams-polyfill');

const TEST_NAMESPACE = 'TEST_NAMESPACE';
const TEST_NAMESPACE_PATH = path.join(__dirname, TEST_NAMESPACE);
Expand All @@ -20,7 +21,7 @@ async function createMemoryNamespace(initialData) {
async function createFileNamespace(initialData) {
await rimraf(TEST_NAMESPACE_PATH);
const store = new FileKVStore(__dirname);
for(const [key, data] of Object.entries(initialData || {})) {
for (const [key, data] of Object.entries(initialData || {})) {
await FileKVStore.putter(path.join(TEST_NAMESPACE_PATH, key), data);
}
return {
Expand Down Expand Up @@ -95,16 +96,22 @@ describe('kv-namespace', () => {
expect(new Uint8Array(await ns.get('key', 'arrayBuffer'))).toStrictEqual(new Uint8Array([1, 2, 3]));
});

test('it fails to get streams', async () => {
test('it gets streams', async () => {
const { ns } = await createNamespace({
key: {
value: '\x01\x02\x03',
expiration: -1,
metadata: null,
},
});
expect.assertions(1);
await expect(ns.get('key', 'stream')).rejects.toStrictEqual(new Error('Type "stream" is not supported!'));
const readableStream = await ns.get('key', 'stream');
const reader = readableStream.getReader();
let read = await reader.read();
expect(read.done).toBe(false);
expect(read.value).toStrictEqual(new Uint8Array([1, 2, 3]));
read = await reader.read();
expect(read.done).toBe(true);
expect(read.value).toBeUndefined();
});

test('it returns null for non-existent keys', async () => {
Expand Down Expand Up @@ -187,18 +194,25 @@ describe('kv-namespace', () => {
});
});

test('it fails to get streams with metadata', async () => {
test('it gets streams with metadata', async () => {
const { ns } = await createNamespace({
key: {
value: '\x01\x02\x03',
expiration: -1,
metadata: { testing: true },
},
});
expect.assertions(1);
await expect(ns.getWithMetadata('key', 'stream')).rejects.toStrictEqual(
new Error('Type "stream" is not supported!')
);
const { value: readableStream, metadata } = await ns.getWithMetadata('key', 'stream');
// Check stream contents
const reader = readableStream.getReader();
let read = await reader.read();
expect(read.done).toBe(false);
expect(read.value).toStrictEqual(new Uint8Array([1, 2, 3]));
read = await reader.read();
expect(read.done).toBe(true);
expect(read.value).toBeUndefined();
// Check metadata
expect(metadata).toStrictEqual({ testing: true });
});

test('it returns null for non-existent keys with metadata', async () => {
Expand Down Expand Up @@ -237,6 +251,24 @@ describe('kv-namespace', () => {
});
});

test('it puts streams', async () => {
const { ns, storedFor } = await createNamespace();
await ns.put(
'key',
new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
})
);
await expect(await storedFor('key')).toStrictEqual({
value: '\x01\x02\x03',
expiration: -1,
metadata: null,
});
});

test('it puts array buffers', async () => {
const { ns, storedFor } = await createNamespace();
await ns.put('key', new Uint8Array([1, 2, 3]).buffer);
Expand Down
17 changes: 16 additions & 1 deletion app/__tests__/server_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ describe("server", () => {
.send(body)
.expect(200, 'POST');
});

it("can init a minio client", async () => {
const app = createApp(
'addEventListener("fetch", (e) => e.respondWith(new Response("success")))',
Expand All @@ -115,5 +116,19 @@ describe("server", () => {
kvStores: [] // leave this empty so the client doesn't attempt to make requests
}
);
})
});

it("returns headers with multiple values", async () => {
const app = createApp(`addEventListener("fetch", (e) => {
const headers = new Headers();
headers.append("Some-Header", "value1");
headers.append("Some-Header", "value2");
e.respondWith(new Response("hello", {status: 201, headers: headers}));
})`);

await supertest(app)
.get("/some-route")
.expect(201, "hello")
.expect("Some-Header", "value1, value2");
});
});
39 changes: 33 additions & 6 deletions app/__tests__/worker_spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const express = require("express");
const { Worker } = require("../worker");
const { InMemoryKVStore } = require("../in-memory-kv-store");
const { Headers } = require("node-fetch");
const { Headers } = require("@titelmedia/node-fetch");

describe("Workers", () => {
test("It Can Create and Execute a Listener", () => {
Expand Down Expand Up @@ -96,11 +96,38 @@ describe("Workers", () => {
});
});

test("It can stub out responses", async () => {
const worker = new Worker("foo.com", 'addEventListener("fetch", (e) => e.respondWith(new Response("hello")))');
const response = await worker.executeFetchEvent("http://foo.com");
expect(response.status).toBe(200);
expect(await response.text()).toBe("hello");
describe("Responses", () => {
test("It can stub out responses", async () => {
const worker = new Worker("foo.com", 'addEventListener("fetch", (e) => e.respondWith(new Response("hello")))');
const response = await worker.executeFetchEvent("http://foo.com");
expect(response.status).toBe(200);
expect(await response.text()).toBe("hello");
});

test("It can return a redirect response", async () => {
const worker = new Worker("foo.com", 'addEventListener("fetch", (e) => e.respondWith(Response.redirect("http://bar.com", 302)))');
const response = await worker.executeFetchEvent("http://foo.com");
expect(response.status).toBe(302);
expect(response.headers.get("Location")).toBe("http://bar.com");
});

test("It can return a stream response", async () => {
const worker = new Worker(
"foo.com",
`addEventListener("fetch", (e) => {
e.respondWith(new Response(new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue(" world");
controller.close();
}
})));
})`
);
const response = await worker.executeFetchEvent("http://foo.com");
expect(response.status).toBe(200);
expect(await response.text()).toBe("hello world");
});
});

describe("Cloudflare Headers", () => {
Expand Down
48 changes: 44 additions & 4 deletions app/kv-namespace.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const { TextDecoder, TextEncoder } = require('util');
const { ReadableStream } = require('web-streams-polyfill');

/**
* @typedef {Object} KVValue
Expand Down Expand Up @@ -39,6 +40,37 @@ class KVNamespace {
}
}

/**
* @param {ReadableStream} stream
* @returns {Promise<Buffer>} buffer containing concatenation of all chunks written to the stream
* @private
*/
static _consumeReadableStream(stream) {
return new Promise((resolve, reject) => {
const reader = stream.getReader();
const chunks = [];
let totalLength = 0;

// Keep pushing until we're done reading the stream
function push() {
reader
.read()
.then(({ done, value }) => {
if (done) {
resolve(Buffer.concat(chunks, totalLength));
} else {
const chunk = Buffer.from(value);
totalLength += chunk.length;
chunks.push(chunk);
push();
}
})
.catch(reject);
}
push();
});
}

/**
* @param {KVNamespaceOptions} options
*/
Expand All @@ -59,7 +91,6 @@ class KVNamespace {
return (await this.getWithMetadata(key, type)).value;
}

// TODO: support "stream" type
/**
* @param {string} key
* @param {("text" | "json" | "arrayBuffer" | "stream")} [type]
Expand Down Expand Up @@ -95,16 +126,20 @@ class KVNamespace {
});
typedValue = buffer;
} else if (type === 'stream') {
throw new Error('Type "stream" is not supported!');
typedValue = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode(value));
controller.close();
},
});
}

return { value: typedValue, metadata };
}

// TODO: support FormData and ReadableStream's as values
/**
* @param {string} key
* @param {(string | ArrayBuffer)} value
* @param {(string | ReadableStream | ArrayBuffer)} value
* @param {{expiration: (string | number | undefined), expirationTtl: (string | number | undefined), metadata: (* | undefined)}} [options]
* @returns {Promise<void>}
*/
Expand All @@ -114,6 +149,11 @@ class KVNamespace {
// Convert value to string if it isn't already
if (value instanceof ArrayBuffer) {
value = new TextDecoder().decode(value);
} else if (
value instanceof ReadableStream ||
(value.constructor.name === 'ReadableStream' && typeof value.getReader == 'function')
) {
value = (await KVNamespace._consumeReadableStream(value)).toString('utf8');
}

// Normalise expiration
Expand Down
7 changes: 5 additions & 2 deletions app/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ async function callWorker(worker, req, res) {
const data = await response.arrayBuffer();

res.status(response.status);
for (var pair of response.headers) {
res.set(pair[0], pair[1]);
for (const keyValues of Object.entries(response.headers.raw())) {
const key = keyValues[0];
// If there's just one value, use it, otherwise, use the the values array
const value = keyValues[1].length === 1 ? keyValues[1][0] : keyValues[1];
res.set(key, value);
}
res.end(Buffer.from(data), "binary");
}
Expand Down
37 changes: 34 additions & 3 deletions app/worker.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
const { createContext, Script } = require("vm");
const { Request, Response, Headers } = require("node-fetch");
// The titelmedia node-fetch fork uses Web Streams instead of NodeJS ones
const { Request, Response, Headers } = require("@titelmedia/node-fetch");
const { URL } = require("url");
const fetch = require("node-fetch");
const fetch = require("@titelmedia/node-fetch");
const atob = require("atob");
const btoa = require("btoa");
const crypto = new (require("node-webcrypto-ossl"))();
const { TextDecoder, TextEncoder } = require("util");
const { caches } = require("./caches");
const {
ByteLengthQueuingStrategy,
CountQueuingStrategy,
ReadableByteStreamController,
ReadableStream,
ReadableStreamBYOBReader,
ReadableStreamBYOBRequest,
ReadableStreamDefaultController,
ReadableStreamDefaultReader,
TransformStream,
TransformStreamDefaultController,
WritableStream,
WritableStreamDefaultController,
WritableStreamDefaultWriter,
} = require("web-streams-polyfill");

function chomp(str) {
return str.substr(0, str.length - 1);
Expand Down Expand Up @@ -106,7 +122,22 @@ class Worker {
clearInterval,

// Cache stubs
caches
caches,

// Streams
ByteLengthQueuingStrategy,
CountQueuingStrategy,
ReadableByteStreamController,
ReadableStream,
ReadableStreamBYOBReader,
ReadableStreamBYOBRequest,
ReadableStreamDefaultController,
ReadableStreamDefaultReader,
TransformStream,
TransformStreamDefaultController,
WritableStream,
WritableStreamDefaultController,
WritableStreamDefaultWriter
};
const script = new Script(workerContents);
script.runInContext(
Expand Down
25 changes: 20 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading