diff --git a/app/__tests__/kv-namespace_spec.js b/app/__tests__/kv-namespace_spec.js index 79a3180..83f25f8 100644 --- a/app/__tests__/kv-namespace_spec.js +++ b/app/__tests__/kv-namespace_spec.js @@ -1,9 +1,10 @@ -const path = require("path"); +const path = require('path'); const { promisify } = require('util'); -const rimraf = promisify(require("rimraf")); +const rimraf = promisify(require('rimraf')); const { KVNamespace } = require('../kv-namespace'); const { InMemoryKVStore } = require('../in-memory-kv-store'); const { FileKVStore } = require('../file-kv-store'); +const { ReadableStream } = require('web-streams-polyfill'); const TEST_NAMESPACE = 'TEST_NAMESPACE'; const TEST_NAMESPACE_PATH = path.join(__dirname, TEST_NAMESPACE); @@ -20,7 +21,7 @@ async function createMemoryNamespace(initialData) { async function createFileNamespace(initialData) { await rimraf(TEST_NAMESPACE_PATH); const store = new FileKVStore(__dirname); - for(const [key, data] of Object.entries(initialData || {})) { + for (const [key, data] of Object.entries(initialData || {})) { await FileKVStore.putter(path.join(TEST_NAMESPACE_PATH, key), data); } return { @@ -95,7 +96,7 @@ describe('kv-namespace', () => { expect(new Uint8Array(await ns.get('key', 'arrayBuffer'))).toStrictEqual(new Uint8Array([1, 2, 3])); }); - test('it fails to get streams', async () => { + test('it gets streams', async () => { const { ns } = await createNamespace({ key: { value: '\x01\x02\x03', @@ -103,8 +104,14 @@ describe('kv-namespace', () => { metadata: null, }, }); - expect.assertions(1); - await expect(ns.get('key', 'stream')).rejects.toStrictEqual(new Error('Type "stream" is not supported!')); + const readableStream = await ns.get('key', 'stream'); + const reader = readableStream.getReader(); + let read = await reader.read(); + expect(read.done).toBe(false); + expect(read.value).toStrictEqual(new Uint8Array([1, 2, 3])); + read = await reader.read(); + expect(read.done).toBe(true); + expect(read.value).toBeUndefined(); }); test('it returns null for non-existent keys', async () => { @@ -187,7 +194,7 @@ describe('kv-namespace', () => { }); }); - test('it fails to get streams with metadata', async () => { + test('it gets streams with metadata', async () => { const { ns } = await createNamespace({ key: { value: '\x01\x02\x03', @@ -195,10 +202,17 @@ describe('kv-namespace', () => { metadata: { testing: true }, }, }); - expect.assertions(1); - await expect(ns.getWithMetadata('key', 'stream')).rejects.toStrictEqual( - new Error('Type "stream" is not supported!') - ); + const { value: readableStream, metadata } = await ns.getWithMetadata('key', 'stream'); + // Check stream contents + const reader = readableStream.getReader(); + let read = await reader.read(); + expect(read.done).toBe(false); + expect(read.value).toStrictEqual(new Uint8Array([1, 2, 3])); + read = await reader.read(); + expect(read.done).toBe(true); + expect(read.value).toBeUndefined(); + // Check metadata + expect(metadata).toStrictEqual({ testing: true }); }); test('it returns null for non-existent keys with metadata', async () => { @@ -237,6 +251,24 @@ describe('kv-namespace', () => { }); }); + test('it puts streams', async () => { + const { ns, storedFor } = await createNamespace(); + await ns.put( + 'key', + new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + controller.close(); + }, + }) + ); + await expect(await storedFor('key')).toStrictEqual({ + value: '\x01\x02\x03', + expiration: -1, + metadata: null, + }); + }); + test('it puts array buffers', async () => { const { ns, storedFor } = await createNamespace(); await ns.put('key', new Uint8Array([1, 2, 3]).buffer); diff --git a/app/__tests__/server_spec.js b/app/__tests__/server_spec.js index 062b932..992c453 100644 --- a/app/__tests__/server_spec.js +++ b/app/__tests__/server_spec.js @@ -107,6 +107,7 @@ describe("server", () => { .send(body) .expect(200, 'POST'); }); + it("can init a minio client", async () => { const app = createApp( 'addEventListener("fetch", (e) => e.respondWith(new Response("success")))', @@ -115,5 +116,19 @@ describe("server", () => { kvStores: [] // leave this empty so the client doesn't attempt to make requests } ); - }) + }); + + it("returns headers with multiple values", async () => { + const app = createApp(`addEventListener("fetch", (e) => { + const headers = new Headers(); + headers.append("Some-Header", "value1"); + headers.append("Some-Header", "value2"); + e.respondWith(new Response("hello", {status: 201, headers: headers})); + })`); + + await supertest(app) + .get("/some-route") + .expect(201, "hello") + .expect("Some-Header", "value1, value2"); + }); }); diff --git a/app/__tests__/worker_spec.js b/app/__tests__/worker_spec.js index 1229efa..9ca8479 100644 --- a/app/__tests__/worker_spec.js +++ b/app/__tests__/worker_spec.js @@ -1,7 +1,7 @@ const express = require("express"); const { Worker } = require("../worker"); const { InMemoryKVStore } = require("../in-memory-kv-store"); -const { Headers } = require("node-fetch"); +const { Headers } = require("@titelmedia/node-fetch"); describe("Workers", () => { test("It Can Create and Execute a Listener", () => { @@ -96,11 +96,38 @@ describe("Workers", () => { }); }); - test("It can stub out responses", async () => { - const worker = new Worker("foo.com", 'addEventListener("fetch", (e) => e.respondWith(new Response("hello")))'); - const response = await worker.executeFetchEvent("http://foo.com"); - expect(response.status).toBe(200); - expect(await response.text()).toBe("hello"); + describe("Responses", () => { + test("It can stub out responses", async () => { + const worker = new Worker("foo.com", 'addEventListener("fetch", (e) => e.respondWith(new Response("hello")))'); + const response = await worker.executeFetchEvent("http://foo.com"); + expect(response.status).toBe(200); + expect(await response.text()).toBe("hello"); + }); + + test("It can return a redirect response", async () => { + const worker = new Worker("foo.com", 'addEventListener("fetch", (e) => e.respondWith(Response.redirect("http://bar.com", 302)))'); + const response = await worker.executeFetchEvent("http://foo.com"); + expect(response.status).toBe(302); + expect(response.headers.get("Location")).toBe("http://bar.com"); + }); + + test("It can return a stream response", async () => { + const worker = new Worker( + "foo.com", + `addEventListener("fetch", (e) => { + e.respondWith(new Response(new ReadableStream({ + start(controller) { + controller.enqueue("hello"); + controller.enqueue(" world"); + controller.close(); + } + }))); + })` + ); + const response = await worker.executeFetchEvent("http://foo.com"); + expect(response.status).toBe(200); + expect(await response.text()).toBe("hello world"); + }); }); describe("Cloudflare Headers", () => { diff --git a/app/kv-namespace.js b/app/kv-namespace.js index 0c22f36..e069d93 100644 --- a/app/kv-namespace.js +++ b/app/kv-namespace.js @@ -1,4 +1,5 @@ const { TextDecoder, TextEncoder } = require('util'); +const { ReadableStream } = require('web-streams-polyfill'); /** * @typedef {Object} KVValue @@ -39,6 +40,37 @@ class KVNamespace { } } + /** + * @param {ReadableStream} stream + * @returns {Promise} buffer containing concatenation of all chunks written to the stream + * @private + */ + static _consumeReadableStream(stream) { + return new Promise((resolve, reject) => { + const reader = stream.getReader(); + const chunks = []; + let totalLength = 0; + + // Keep pushing until we're done reading the stream + function push() { + reader + .read() + .then(({ done, value }) => { + if (done) { + resolve(Buffer.concat(chunks, totalLength)); + } else { + const chunk = Buffer.from(value); + totalLength += chunk.length; + chunks.push(chunk); + push(); + } + }) + .catch(reject); + } + push(); + }); + } + /** * @param {KVNamespaceOptions} options */ @@ -59,7 +91,6 @@ class KVNamespace { return (await this.getWithMetadata(key, type)).value; } - // TODO: support "stream" type /** * @param {string} key * @param {("text" | "json" | "arrayBuffer" | "stream")} [type] @@ -95,16 +126,20 @@ class KVNamespace { }); typedValue = buffer; } else if (type === 'stream') { - throw new Error('Type "stream" is not supported!'); + typedValue = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(value)); + controller.close(); + }, + }); } return { value: typedValue, metadata }; } - // TODO: support FormData and ReadableStream's as values /** * @param {string} key - * @param {(string | ArrayBuffer)} value + * @param {(string | ReadableStream | ArrayBuffer)} value * @param {{expiration: (string | number | undefined), expirationTtl: (string | number | undefined), metadata: (* | undefined)}} [options] * @returns {Promise} */ @@ -114,6 +149,11 @@ class KVNamespace { // Convert value to string if it isn't already if (value instanceof ArrayBuffer) { value = new TextDecoder().decode(value); + } else if ( + value instanceof ReadableStream || + (value.constructor.name === 'ReadableStream' && typeof value.getReader == 'function') + ) { + value = (await KVNamespace._consumeReadableStream(value)).toString('utf8'); } // Normalise expiration diff --git a/app/server.js b/app/server.js index f29a36a..501f862 100644 --- a/app/server.js +++ b/app/server.js @@ -15,8 +15,11 @@ async function callWorker(worker, req, res) { const data = await response.arrayBuffer(); res.status(response.status); - for (var pair of response.headers) { - res.set(pair[0], pair[1]); + for (const keyValues of Object.entries(response.headers.raw())) { + const key = keyValues[0]; + // If there's just one value, use it, otherwise, use the the values array + const value = keyValues[1].length === 1 ? keyValues[1][0] : keyValues[1]; + res.set(key, value); } res.end(Buffer.from(data), "binary"); } diff --git a/app/worker.js b/app/worker.js index f889f61..13aa6a4 100644 --- a/app/worker.js +++ b/app/worker.js @@ -1,12 +1,28 @@ const { createContext, Script } = require("vm"); -const { Request, Response, Headers } = require("node-fetch"); +// The titelmedia node-fetch fork uses Web Streams instead of NodeJS ones +const { Request, Response, Headers } = require("@titelmedia/node-fetch"); const { URL } = require("url"); -const fetch = require("node-fetch"); +const fetch = require("@titelmedia/node-fetch"); const atob = require("atob"); const btoa = require("btoa"); const crypto = new (require("node-webcrypto-ossl"))(); const { TextDecoder, TextEncoder } = require("util"); const { caches } = require("./caches"); +const { + ByteLengthQueuingStrategy, + CountQueuingStrategy, + ReadableByteStreamController, + ReadableStream, + ReadableStreamBYOBReader, + ReadableStreamBYOBRequest, + ReadableStreamDefaultController, + ReadableStreamDefaultReader, + TransformStream, + TransformStreamDefaultController, + WritableStream, + WritableStreamDefaultController, + WritableStreamDefaultWriter, +} = require("web-streams-polyfill"); function chomp(str) { return str.substr(0, str.length - 1); @@ -106,7 +122,22 @@ class Worker { clearInterval, // Cache stubs - caches + caches, + + // Streams + ByteLengthQueuingStrategy, + CountQueuingStrategy, + ReadableByteStreamController, + ReadableStream, + ReadableStreamBYOBReader, + ReadableStreamBYOBRequest, + ReadableStreamDefaultController, + ReadableStreamDefaultReader, + TransformStream, + TransformStreamDefaultController, + WritableStream, + WritableStreamDefaultController, + WritableStreamDefaultWriter }; const script = new Script(workerContents); script.runInContext( diff --git a/package-lock.json b/package-lock.json index 4389bb2..e8d35fc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -542,6 +542,21 @@ "@types/yargs": "^12.0.9" } }, + "@titelmedia/node-fetch": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/@titelmedia/node-fetch/-/node-fetch-3.1.1.tgz", + "integrity": "sha512-4Rrag5DjxSAzpL9MswXGBG6ZA9wEE9fEzQukyv1zJZTG+ff6dsm3tbzbYmNDxZXi7HwhPFJfxUZmvXsGCJK/ng==", + "requires": { + "web-streams-polyfill": "^2.1.1" + }, + "dependencies": { + "web-streams-polyfill": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-2.1.1.tgz", + "integrity": "sha512-dlNpL2aab3g8CKfGz6rl8FNmGaRWLLn2g/DtSc9IjB30mEdE6XxzPfPSig5BwGSzI+oLxHyETrQGKjrVVhbLCg==" + } + } + }, "@types/babel__core": { "version": "7.1.6", "resolved": "https://registry.npmjs.org/@types/babel__core/-/babel__core-7.1.6.tgz", @@ -5971,11 +5986,6 @@ "integrity": "sha512-1nh45deeb5olNY7eX82BkPO7SSxR5SSYJiPTrTdFUVYwAl8CKMA5N9PjTYkHiRjisVcxcQ1HXdLhx2qxxJzLNQ==", "dev": true }, - "node-fetch": { - "version": "2.3.0", - "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.3.0.tgz", - "integrity": "sha512-MOd8pV3fxENbryESLgVIeaGKrdl+uaYhCSSVkjeOb/31/njTpcis5aWfdqgNlHIrKOLRbMnfPINPOML2CIFeXA==" - }, "node-int64": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.4.0.tgz", @@ -8011,6 +8021,11 @@ "makeerror": "1.0.x" } }, + "web-streams-polyfill": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.0.1.tgz", + "integrity": "sha512-M+EmTdszMWINywOZaqpZ6VIEDUmNpRaTOuizF0ZKPjSDC8paMRe/jBBwFv0Yeyn5WYnM5pMqMQa82vpaE+IJRw==" + }, "webcrypto-core": { "version": "0.1.26", "resolved": "https://registry.npmjs.org/webcrypto-core/-/webcrypto-core-0.1.26.tgz", diff --git a/package.json b/package.json index e0e0257..b5eb0a8 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "homepage": "https://github.com/gja/cloudflare-worker-local#readme", "dependencies": { "@iarna/toml": "^2.2.3", + "@titelmedia/node-fetch": "^3.1.1", "atob": "^2.1.2", "body-parser": "^1.18.3", "btoa": "^1.2.1", @@ -27,8 +28,8 @@ "lodash.get": "^4.4.2", "lodash.merge": "^4.6.2", "mkdirp": "^1.0.4", - "node-fetch": "^2.3.0", - "node-webcrypto-ossl": "^1.0.48" + "node-webcrypto-ossl": "^1.0.48", + "web-streams-polyfill": "^3.0.1" }, "optionalDependencies": { "minio": "^7.0.15"