From f7b3fae276cd6ce93f6545cf6784554d006c5246 Mon Sep 17 00:00:00 2001 From: Brett Beutell Date: Thu, 7 Nov 2024 21:59:59 +0100 Subject: [PATCH] Webhonc should proxy json and text respones with proper status code and body (#356) * First pass (seems to work kind of) * Refactor webhonc message handlers * Modify response from webhonc (either text or json) * Remove linter ignore --- api/src/lib/webhonc/handlers.ts | 159 +++++++++++++++++++++++++++++ api/src/lib/webhonc/index.ts | 171 +++++++++----------------------- api/src/lib/webhonc/types.ts | 35 +++++++ webhonc/src/index.ts | 47 ++++++++- webhonc/src/webhonc.ts | 88 +++++++++++++++- 5 files changed, 372 insertions(+), 128 deletions(-) create mode 100644 api/src/lib/webhonc/handlers.ts create mode 100644 api/src/lib/webhonc/types.ts diff --git a/api/src/lib/webhonc/handlers.ts b/api/src/lib/webhonc/handlers.ts new file mode 100644 index 000000000..66d83fad1 --- /dev/null +++ b/api/src/lib/webhonc/handlers.ts @@ -0,0 +1,159 @@ +import path from "node:path"; +import type { WsMessageSchema } from "@fiberplane/fpx-types"; +import type { z } from "zod"; +import * as schema from "../../db/schema.js"; +import logger from "../../logger.js"; +import { resolveServiceArg } from "../../probe-routes.js"; +import { generateOtelTraceId } from "../otel/index.js"; +import { + executeProxyRequest, + handleFailedRequest, + handleSuccessfulRequest, +} from "../proxy-request/index.js"; +import { resolveUrlQueryParams } from "../utils.js"; +import { getWebHoncConnectionId, setWebHoncConnectionId } from "./store.js"; +import type { WebhoncManagerConfig, WebhoncOutgoingResponse } from "./types.js"; + +type WsMessage = z.infer; +type WithCorrelationId = T & { correlationId?: string }; + +export async function handleTraceCreated() { + logger.debug("trace_created message received, no action required"); + return null; +} + +export async function handleLoginSuccess() { + logger.debug("login_success message received, this should never happen"); + return null; +} + +export async function handleConnectionOpen( + message: Extract, + config: WebhoncManagerConfig, +): Promise { + const { connectionId } = message.payload; + logger.debug( + "connection_open message received, setting webhonc connection id:", + connectionId, + ); + await setWebHoncConnectionId(config.db, connectionId); + for (const ws of config.wsConnections) { + ws.send( + JSON.stringify({ + event: "connection_open", + payload: { connectionId }, + }), + ); + } + return null; +} + +type RequestIncomingMessage = WithCorrelationId< + Extract +>; + +export async function handleRequestIncoming( + message: RequestIncomingMessage, + config: WebhoncManagerConfig, + correlationId?: string, +): Promise { + // No trace id is coming from the websocket, so we generate one + const db = config.db; + const traceId = generateOtelTraceId(); + + const serviceTarget = resolveServiceArg(process.env.FPX_SERVICE_TARGET); + const resolvedPath = path.join(serviceTarget, ...message.payload.path); + const requestUrl = resolveUrlQueryParams(resolvedPath, message.payload.query); + + const startTime = Date.now(); + + const newRequest: schema.NewAppRequest = { + requestMethod: message.payload + .method as schema.NewAppRequest["requestMethod"], + requestUrl, + requestHeaders: message.payload.headers, + requestPathParams: {}, + requestQueryParams: message.payload.query, + requestBody: message.payload.body, + requestRoute: message.payload.path.join("/"), + }; + + const webhoncId = await getWebHoncConnectionId(db); + + const supplementedHeaders = { + "x-fpx-trace-id": traceId, + "x-fpx-webhonc-id": webhoncId ?? "", + }; + + if (newRequest?.requestHeaders) { + newRequest.requestHeaders = { + ...newRequest?.requestHeaders, + ...supplementedHeaders, + }; + } else { + newRequest.requestHeaders = supplementedHeaders; + } + + const [{ id: requestId }] = await db + .insert(schema.appRequests) + .values(newRequest) + .returning({ id: schema.appRequests.id }); + + try { + const response = await executeProxyRequest({ + requestHeaders: newRequest.requestHeaders, + requestMethod: newRequest.requestMethod, + requestBody: newRequest.requestBody, + requestUrl, + }); + + const duration = Date.now() - startTime; + + const { responseBody, responseHeaders } = await handleSuccessfulRequest( + db, + requestId, + duration, + response, + traceId, + ); + + return { + status: response.status, + body: responseBody ?? "", + headers: responseHeaders, + correlationId: correlationId ?? "NA", + }; + } catch (error) { + logger.error("Error making request", error); + const duration = Date.now() - startTime; + await handleFailedRequest(db, requestId, traceId, duration, error); + return { + status: 500, + body: "Internal server error", + headers: {}, + correlationId: correlationId ?? "NA", + }; + } +} + +export const handleMessage = ( + message: WsMessage, + config: WebhoncManagerConfig, + correlationId?: string, +) => { + switch (message.event) { + case "login_success": + return handleLoginSuccess(); + case "trace_created": + return handleTraceCreated(); + case "connection_open": + return handleConnectionOpen(message, config); + case "request_incoming": + return handleRequestIncoming(message, config, correlationId); + default: { + // @ts-expect-error - We're handling all possible events in the switch statement + logger.warn(`Unknown message event: ${message?.event}`); + return null; + } + } +}; diff --git a/api/src/lib/webhonc/index.ts b/api/src/lib/webhonc/index.ts index aae9b4c44..4f21f3717 100644 --- a/api/src/lib/webhonc/index.ts +++ b/api/src/lib/webhonc/index.ts @@ -1,25 +1,12 @@ -import path from "node:path"; -import { WsMessageSchema } from "@fiberplane/fpx-types"; -import type { LibSQLDatabase } from "drizzle-orm/libsql"; +import { type WsMessage, WsMessageSchema } from "@fiberplane/fpx-types"; import WebSocket from "ws"; -import type { z } from "zod"; -import * as schema from "../../db/schema.js"; import logger from "../../logger.js"; -import { resolveServiceArg } from "../../probe-routes.js"; -import { generateOtelTraceId } from "../otel/index.js"; +import { handleMessage } from "./handlers.js"; +import { getWebHoncConnectionId } from "./store.js"; import { - executeProxyRequest, - handleFailedRequest, - handleSuccessfulRequest, -} from "../proxy-request/index.js"; -import { resolveUrlQueryParams } from "../utils.js"; -import { getWebHoncConnectionId, setWebHoncConnectionId } from "./store.js"; - -type WebhoncManagerConfig = { - host: string; - db: LibSQLDatabase; - wsConnections: Set; -}; + type WebhoncManagerConfig, + isWebhoncOutgoingResponse, +} from "./types.js"; let socket: WebSocket | undefined = undefined; let reconnectTimeout: NodeJS.Timeout | undefined = undefined; @@ -96,7 +83,19 @@ function setupSocketListeners() { socket.onmessage = async (event) => { logger.debug("Received message from the webhonc service:", event.data); try { - await handleMessage(event); + const response = await dispatchMessage(event); + const shouldNotifyResponse = + response && isWebhoncOutgoingResponse(response); + // NOTE - We want to notify webhonc about the response so that it can "proxy" it + if (socket && shouldNotifyResponse) { + socket.send( + JSON.stringify({ + event: "response_outgoing", + payload: response, + correlationId: response.correlationId, + }), + ); + } } catch (error) { logger.error("Error handling message from webhonc:", error); } @@ -136,7 +135,10 @@ function scheduleReconnect() { }, reconnectDelay); } -async function handleMessage(event: WebSocket.MessageEvent) { +/** + * Dispatches a websocket message to the appropriate handler on the api + */ +async function dispatchMessage(event: WebSocket.MessageEvent) { if (!config) { return; } @@ -147,110 +149,31 @@ async function handleMessage(event: WebSocket.MessageEvent) { JSON.parse(event.data.toString()), ); - const handler = messageHandlers[parsedMessage.event] as ( - message: typeof parsedMessage, - config: WebhoncManagerConfig, - ) => Promise; + const correlationId = extractCorrelationId(event, parsedMessage); - if (handler) { - await handler(parsedMessage, config); - } else { - logger.error(`Unhandled event type: ${parsedMessage.event}`); - } + return handleMessage(parsedMessage, config, correlationId); } -const messageHandlers: { - [K in z.infer["event"]]: ( - message: Extract, { event: K }>, - config: WebhoncManagerConfig, - ) => Promise; -} = { - trace_created: async (_message, _config) => { - logger.debug("trace_created message received, no action required"); - }, - login_success: async () => { - logger.debug("login_success message received, this should never happen"); - }, - connection_open: async (message, config) => { - const { connectionId } = message.payload; - logger.debug( - "connection_open message received, setting webhonc connection id:", - connectionId, - ); - // Await this call so that the webhonc id is set before the query on the studio side is invalidated - await setWebHoncConnectionId(config.db, connectionId); - for (const ws of config.wsConnections) { - ws.send( - JSON.stringify({ - event: "connection_open", - payload: { connectionId }, - }), - ); - } - }, - request_incoming: async (message, config) => { - // no trace id is coming from the websocket, so we generate one - const db = config.db; - const traceId = generateOtelTraceId(); - - const serviceTarget = resolveServiceArg(process.env.FPX_SERVICE_TARGET); - const resolvedPath = path.join(serviceTarget, ...message.payload.path); - const requestUrl = resolveUrlQueryParams( - resolvedPath, - message.payload.query, - ); - - const startTime = Date.now(); - - const newRequest: schema.NewAppRequest = { - requestMethod: message.payload - .method as schema.NewAppRequest["requestMethod"], - requestUrl, - requestHeaders: message.payload.headers, - requestPathParams: {}, - requestQueryParams: message.payload.query, - requestBody: message.payload.body, - requestRoute: message.payload.path.join("/"), - }; - - const webhoncId = await getWebHoncConnectionId(db); - - const supplementedHeaders = { - "x-fpx-trace-id": traceId, - "x-fpx-webhonc-id": webhoncId ?? "", - }; - - if (newRequest?.requestHeaders) { - newRequest.requestHeaders = { - ...newRequest?.requestHeaders, - ...supplementedHeaders, - }; - } else { - newRequest.requestHeaders = supplementedHeaders; +/** + * Extracts the correlation id from the message + * + * This is a HACK to avoid having to specify the correlation id on the strict fiberplane package types + */ +function extractCorrelationId( + event: WebSocket.MessageEvent, + parsedMessage: WsMessage, +): string | undefined { + // HACK - We need to extract the correlation id from the message + let correlationId = "NA"; + try { + if (parsedMessage.event === "request_incoming") { + const reparsedMessage = JSON.parse(event.data.toString()); + if (typeof reparsedMessage?.correlationId === "string") { + correlationId = reparsedMessage?.correlationId; + } } - - const [{ id: requestId }] = await db - .insert(schema.appRequests) - .values(newRequest) - .returning({ id: schema.appRequests.id }); - - try { - const response = await executeProxyRequest({ - requestHeaders: newRequest.requestHeaders, - requestMethod: newRequest.requestMethod, - requestBody: newRequest.requestBody, - requestUrl, - }); - - const duration = Date.now() - startTime; - - await handleSuccessfulRequest(db, requestId, duration, response, traceId); - - // Store the request in the database - } catch (error) { - logger.error("Error making request", error); - const duration = Date.now() - startTime; - await handleFailedRequest(db, requestId, traceId, duration, error); - } - }, -}; + } catch (_err) { + console.warn("[webhonc] Failed to parse correlation id"); + } + return correlationId; +} diff --git a/api/src/lib/webhonc/types.ts b/api/src/lib/webhonc/types.ts new file mode 100644 index 000000000..c13c3480b --- /dev/null +++ b/api/src/lib/webhonc/types.ts @@ -0,0 +1,35 @@ +import type { LibSQLDatabase } from "drizzle-orm/libsql"; +import type WebSocket from "ws"; +import { z } from "zod"; +import type * as schema from "../../db/schema.js"; +import logger from "../../logger.js"; + +export type WebhoncManagerConfig = { + host: string; + db: LibSQLDatabase; + wsConnections: Set; +}; + +const WebhoncOutgoingResponseSchema = z.object({ + status: z.number(), + body: z.string(), + headers: z.record(z.string()), + correlationId: z.string(), +}); + +export type WebhoncOutgoingResponse = z.infer< + typeof WebhoncOutgoingResponseSchema +>; + +export function isWebhoncOutgoingResponse( + value: unknown, +): value is WebhoncOutgoingResponse { + const parseResult = WebhoncOutgoingResponseSchema.safeParse(value); + if (!parseResult.success) { + logger.warn( + "Invalid webhonc outgoing response", + parseResult.error.format(), + ); + } + return parseResult.success; +} diff --git a/webhonc/src/index.ts b/webhonc/src/index.ts index 38e0ab25e..de4440cbc 100644 --- a/webhonc/src/index.ts +++ b/webhonc/src/index.ts @@ -81,7 +81,7 @@ app.all( headersJson[key] = value; } - await webhonc.pushWebhookData(id, { + const stringifiedResponse = await webhonc.pushWebhookData(id, { event: "request_incoming", payload: { headers: headersJson, @@ -92,10 +92,53 @@ app.all( }, }); - return c.text("OK"); + // TODO - Validate + const parsedResponse = JSON.parse(stringifiedResponse); + + const shouldRespondWithJson = isContentTypeJson( + parsedResponse?.headers ?? {}, + ); + + const proxiedHeaders = new Headers(); + for (const [key, value] of Object.entries(parsedResponse?.headers ?? {})) { + // TODO - handle octet stream content type? + + // NOTE - Skipping content-length as it's calculated dynamically by the response + if (key.toLowerCase() === "content-length") { + continue; + } + proxiedHeaders.set(key, value as string); + } + + // HACK - Type coercions + const proxiedBody = parsedResponse?.body as string; + const proxiedStatus = parsedResponse?.status as number; + + if (shouldRespondWithJson) { + return c.json(JSON.parse(proxiedBody), { + headers: proxiedHeaders, + status: proxiedStatus, + }); + } + + return c.text(proxiedBody, { + headers: proxiedHeaders, + status: proxiedStatus, + }); }, ); export { WebHonc }; export default app; + +function isContentTypeJson(headers: Record) { + for (const [key, value] of Object.entries(headers)) { + if (key.toLowerCase() === "content-length") { + if ((value as string)?.toLowerCase()?.startsWith("application/json")) { + return true; + } + } + } + return false; +} diff --git a/webhonc/src/webhonc.ts b/webhonc/src/webhonc.ts index 62a526939..717f6b36c 100644 --- a/webhonc/src/webhonc.ts +++ b/webhonc/src/webhonc.ts @@ -4,17 +4,26 @@ import type { Bindings } from "./types"; export class WebHonc extends DurableObject { sessions: Map; + // IMPROVE - Use Cloudflare KV instead, with an expiration time on the keys + pendingResponses: Map; constructor(ctx: DurableObjectState, env: Bindings) { super(ctx, env); this.ctx = ctx; this.env = env; this.sessions = new Map(); + this.pendingResponses = new Map(); for (const ws of this.ctx.getWebSockets()) { const { connectionId } = ws.deserializeAttachment(); this.sessions.set(connectionId, ws); } + + // Load the pending responses from storage + ctx.blockConcurrencyWhile(async () => { + this.pendingResponses = + (await ctx.storage.get("pendingResponses")) || this.pendingResponses; + }); } async fetch(_req: Request) { @@ -40,6 +49,31 @@ export class WebHonc extends DurableObject { webSocketMessage(_ws: WebSocket, message: string | ArrayBuffer) { console.debug("Received message from WS connection:", message); + try { + const messageString = + message instanceof ArrayBuffer + ? new TextDecoder().decode(message) + : message; + + const parsedMessage = JSON.parse(messageString); + const event = parsedMessage?.event; + const payload = parsedMessage?.payload; + const correlationId = parsedMessage?.correlationId; + + // The Fiberplane Studio API sends responses back to us with a correlationId + // If we're receiving a response, we need to store the response in our pending responses map + if (event === "response_outgoing" && correlationId) { + console.debug( + "Setting pending response value for correlationId:", + payload.correlationId, + ); + // Re-serialize the payload to ensure it's a valid JSON string + const payloadString = JSON.stringify(payload); + this.pendingResponses.set(correlationId, payloadString); + } + } catch (error) { + console.error("Error parsing message from WS connection:", error); + } } async webSocketClose( @@ -57,17 +91,67 @@ export class WebHonc extends DurableObject { ); try { ws.close(code); + for (const [correlationId, response] of this.pendingResponses.entries()) { + if (response === null) { + this.pendingResponses.delete(correlationId); + } + } } catch (error) { console.error("Error closing WebSocket:", error); } } + private async waitForWebhookResponse( + correlationId: string, + timeoutMs = 5000, + ): Promise { + const pollIntervalMs = 150; + const maxAttempts = Math.ceil(timeoutMs / pollIntervalMs); + + for (let attempt = 0; attempt < maxAttempts; attempt++) { + await new Promise((resolve) => setTimeout(resolve, pollIntervalMs)); + const response = this.pendingResponses.get(correlationId); + + if (response) { + this.pendingResponses.delete(correlationId); + return response; + } + } + + return JSON.stringify({ + status: 500, + body: "Webhook response timeout exceeded", + }); + } + public async pushWebhookData(connectionId: string, data: WsMessage) { - console.debug("Serializing and sending data to connection:", connectionId); + const correlationId = crypto.randomUUID(); + + console.debug( + `Serializing and sending data to (ConnectionId: ${connectionId}, CorrelationId: ${correlationId})`, + ); const ws = this.sessions.get(connectionId); - const payload = JSON.stringify(data); + + const payload = JSON.stringify({ + ...data, + correlationId, + }); + + console.debug( + "Sending payload to Studio with correlationId:", + correlationId, + ); + + this.pendingResponses.set(correlationId, null); if (ws) { ws.send(payload); } + + console.debug( + "Awaiting pending response for correlationId:", + correlationId, + ); + + return this.waitForWebhookResponse(correlationId); } }