diff --git a/packages/connect-web/src/connect-transport.ts b/packages/connect-web/src/connect-transport.ts index 9c56da6b3..e499daef3 100644 --- a/packages/connect-web/src/connect-transport.ts +++ b/packages/connect-web/src/connect-transport.ts @@ -129,6 +129,24 @@ const fetchOptions: RequestInit = { redirect: "error", }; +const supportsRequestStreams = (() => { + let duplexAccessed = false; + + // explicit casting. RequestInit does not standardly support `duplex` yet + const hasContentType = new Request("https://example.com/", { + body: new ReadableStream(), + method: "POST", + get duplex() { + duplexAccessed = true; + return "half"; + }, + } as RequestInit).headers.has("Content-Type"); + + // eslint is unaware that `get duplex()` may be called + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + return duplexAccessed && !hasContentType; +})(); + /** * Create a Transport for the Connect protocol, which makes unary and * server-streaming methods available to web browsers. It uses the fetch @@ -305,17 +323,42 @@ export function createConnectTransport( } } + function createStreamingRequestBody( + input: AsyncIterable>, + serialize: (msg: MessageShape) => Uint8Array, + ): ReadableStream { + return new ReadableStream({ + async start(controller) { + try { + for await (const message of input) { + const encoded = encodeEnvelope(0, serialize(message)); + controller.enqueue(encoded); + } + controller.close(); + } catch (e) { + controller.error(e); + } + }, + }); + } + async function createRequestBody( input: AsyncIterable>, - ): Promise { - if (method.methodKind != "server_streaming") { - throw "The fetch API does not support streaming request bodies"; + method: DescMethodStreaming, + ): Promise { + if (method.methodKind === "server_streaming") { + const r = await input[Symbol.asyncIterator]().next(); + if (r.done == true) { + throw "missing request message"; + } + return encodeEnvelope(0, serialize(r.value)); } - const r = await input[Symbol.asyncIterator]().next(); - if (r.done == true) { - throw "missing request message"; + + if (!supportsRequestStreams) { + throw "The fetch API does not support streaming request bodies in this browser"; } - return encodeEnvelope(0, serialize(r.value)); + + return createStreamingRequestBody(input, serialize); } timeoutMs = @@ -346,12 +389,17 @@ export function createConnectTransport( }, next: async (req) => { const fetch = options.fetch ?? globalThis.fetch; + const clientDuplex = + method.methodKind !== "server_streaming" + ? ({ duplex: "half" } as RequestInit) + : undefined; const fRes = await fetch(req.url, { ...fetchOptions, + ...clientDuplex, method: req.requestMethod, headers: req.header, signal: req.signal, - body: await createRequestBody(req.message), + body: await createRequestBody(req.message, method), }); validateResponse( method.methodKind,