Skip to content

Commit

Permalink
Webhonc should proxy json and text respones with proper status code a…
Browse files Browse the repository at this point in the history
…nd body (#356)

* First pass (seems to work kind of)

* Refactor webhonc message handlers

* Modify response from webhonc (either text or json)

* Remove linter ignore
  • Loading branch information
brettimus authored Nov 7, 2024
1 parent 94ec506 commit f7b3fae
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 128 deletions.
159 changes: 159 additions & 0 deletions api/src/lib/webhonc/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import path from "node:path";
import type { WsMessageSchema } from "@fiberplane/fpx-types";
import type { z } from "zod";
import * as schema from "../../db/schema.js";
import logger from "../../logger.js";
import { resolveServiceArg } from "../../probe-routes.js";
import { generateOtelTraceId } from "../otel/index.js";
import {
executeProxyRequest,
handleFailedRequest,
handleSuccessfulRequest,
} from "../proxy-request/index.js";
import { resolveUrlQueryParams } from "../utils.js";
import { getWebHoncConnectionId, setWebHoncConnectionId } from "./store.js";
import type { WebhoncManagerConfig, WebhoncOutgoingResponse } from "./types.js";

type WsMessage = z.infer<typeof WsMessageSchema>;
type WithCorrelationId<T> = T & { correlationId?: string };

export async function handleTraceCreated() {
logger.debug("trace_created message received, no action required");
return null;
}

export async function handleLoginSuccess() {
logger.debug("login_success message received, this should never happen");
return null;
}

export async function handleConnectionOpen(
message: Extract<WsMessage, { event: "connection_open" }>,
config: WebhoncManagerConfig,
): Promise<null> {
const { connectionId } = message.payload;
logger.debug(
"connection_open message received, setting webhonc connection id:",
connectionId,
);
await setWebHoncConnectionId(config.db, connectionId);
for (const ws of config.wsConnections) {
ws.send(
JSON.stringify({
event: "connection_open",
payload: { connectionId },
}),
);
}
return null;
}

type RequestIncomingMessage = WithCorrelationId<
Extract<WsMessage, { event: "request_incoming" }>
>;

export async function handleRequestIncoming(
message: RequestIncomingMessage,
config: WebhoncManagerConfig,
correlationId?: string,
): Promise<WebhoncOutgoingResponse> {
// No trace id is coming from the websocket, so we generate one
const db = config.db;
const traceId = generateOtelTraceId();

const serviceTarget = resolveServiceArg(process.env.FPX_SERVICE_TARGET);
const resolvedPath = path.join(serviceTarget, ...message.payload.path);
const requestUrl = resolveUrlQueryParams(resolvedPath, message.payload.query);

const startTime = Date.now();

const newRequest: schema.NewAppRequest = {
requestMethod: message.payload
.method as schema.NewAppRequest["requestMethod"],
requestUrl,
requestHeaders: message.payload.headers,
requestPathParams: {},
requestQueryParams: message.payload.query,
requestBody: message.payload.body,
requestRoute: message.payload.path.join("/"),
};

const webhoncId = await getWebHoncConnectionId(db);

const supplementedHeaders = {
"x-fpx-trace-id": traceId,
"x-fpx-webhonc-id": webhoncId ?? "",
};

if (newRequest?.requestHeaders) {
newRequest.requestHeaders = {
...newRequest?.requestHeaders,
...supplementedHeaders,
};
} else {
newRequest.requestHeaders = supplementedHeaders;
}

const [{ id: requestId }] = await db
.insert(schema.appRequests)
.values(newRequest)
.returning({ id: schema.appRequests.id });

try {
const response = await executeProxyRequest({
requestHeaders: newRequest.requestHeaders,
requestMethod: newRequest.requestMethod,
requestBody: newRequest.requestBody,
requestUrl,
});

const duration = Date.now() - startTime;

const { responseBody, responseHeaders } = await handleSuccessfulRequest(
db,
requestId,
duration,
response,
traceId,
);

return {
status: response.status,
body: responseBody ?? "",
headers: responseHeaders,
correlationId: correlationId ?? "NA",
};
} catch (error) {
logger.error("Error making request", error);
const duration = Date.now() - startTime;
await handleFailedRequest(db, requestId, traceId, duration, error);
return {
status: 500,
body: "Internal server error",
headers: {},
correlationId: correlationId ?? "NA",
};
}
}

export const handleMessage = (
message: WsMessage,
config: WebhoncManagerConfig,
correlationId?: string,
) => {
switch (message.event) {
case "login_success":
return handleLoginSuccess();
case "trace_created":
return handleTraceCreated();
case "connection_open":
return handleConnectionOpen(message, config);
case "request_incoming":
return handleRequestIncoming(message, config, correlationId);
default: {
// @ts-expect-error - We're handling all possible events in the switch statement
logger.warn(`Unknown message event: ${message?.event}`);
return null;
}
}
};
171 changes: 47 additions & 124 deletions api/src/lib/webhonc/index.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
import path from "node:path";
import { WsMessageSchema } from "@fiberplane/fpx-types";
import type { LibSQLDatabase } from "drizzle-orm/libsql";
import { type WsMessage, WsMessageSchema } from "@fiberplane/fpx-types";
import WebSocket from "ws";
import type { z } from "zod";
import * as schema from "../../db/schema.js";
import logger from "../../logger.js";
import { resolveServiceArg } from "../../probe-routes.js";
import { generateOtelTraceId } from "../otel/index.js";
import { handleMessage } from "./handlers.js";
import { getWebHoncConnectionId } from "./store.js";
import {
executeProxyRequest,
handleFailedRequest,
handleSuccessfulRequest,
} from "../proxy-request/index.js";
import { resolveUrlQueryParams } from "../utils.js";
import { getWebHoncConnectionId, setWebHoncConnectionId } from "./store.js";

type WebhoncManagerConfig = {
host: string;
db: LibSQLDatabase<typeof schema>;
wsConnections: Set<WebSocket>;
};
type WebhoncManagerConfig,
isWebhoncOutgoingResponse,
} from "./types.js";

let socket: WebSocket | undefined = undefined;
let reconnectTimeout: NodeJS.Timeout | undefined = undefined;
Expand Down Expand Up @@ -96,7 +83,19 @@ function setupSocketListeners() {
socket.onmessage = async (event) => {
logger.debug("Received message from the webhonc service:", event.data);
try {
await handleMessage(event);
const response = await dispatchMessage(event);
const shouldNotifyResponse =
response && isWebhoncOutgoingResponse(response);
// NOTE - We want to notify webhonc about the response so that it can "proxy" it
if (socket && shouldNotifyResponse) {
socket.send(
JSON.stringify({
event: "response_outgoing",
payload: response,
correlationId: response.correlationId,
}),
);
}
} catch (error) {
logger.error("Error handling message from webhonc:", error);
}
Expand Down Expand Up @@ -136,7 +135,10 @@ function scheduleReconnect() {
}, reconnectDelay);
}

async function handleMessage(event: WebSocket.MessageEvent) {
/**
* Dispatches a websocket message to the appropriate handler on the api
*/
async function dispatchMessage(event: WebSocket.MessageEvent) {
if (!config) {
return;
}
Expand All @@ -147,110 +149,31 @@ async function handleMessage(event: WebSocket.MessageEvent) {
JSON.parse(event.data.toString()),
);

const handler = messageHandlers[parsedMessage.event] as (
message: typeof parsedMessage,
config: WebhoncManagerConfig,
) => Promise<void>;
const correlationId = extractCorrelationId(event, parsedMessage);

if (handler) {
await handler(parsedMessage, config);
} else {
logger.error(`Unhandled event type: ${parsedMessage.event}`);
}
return handleMessage(parsedMessage, config, correlationId);
}

const messageHandlers: {
[K in z.infer<typeof WsMessageSchema>["event"]]: (
message: Extract<z.infer<typeof WsMessageSchema>, { event: K }>,
config: WebhoncManagerConfig,
) => Promise<void>;
} = {
trace_created: async (_message, _config) => {
logger.debug("trace_created message received, no action required");
},
login_success: async () => {
logger.debug("login_success message received, this should never happen");
},
connection_open: async (message, config) => {
const { connectionId } = message.payload;
logger.debug(
"connection_open message received, setting webhonc connection id:",
connectionId,
);
// Await this call so that the webhonc id is set before the query on the studio side is invalidated
await setWebHoncConnectionId(config.db, connectionId);
for (const ws of config.wsConnections) {
ws.send(
JSON.stringify({
event: "connection_open",
payload: { connectionId },
}),
);
}
},
request_incoming: async (message, config) => {
// no trace id is coming from the websocket, so we generate one
const db = config.db;
const traceId = generateOtelTraceId();

const serviceTarget = resolveServiceArg(process.env.FPX_SERVICE_TARGET);
const resolvedPath = path.join(serviceTarget, ...message.payload.path);
const requestUrl = resolveUrlQueryParams(
resolvedPath,
message.payload.query,
);

const startTime = Date.now();

const newRequest: schema.NewAppRequest = {
requestMethod: message.payload
.method as schema.NewAppRequest["requestMethod"],
requestUrl,
requestHeaders: message.payload.headers,
requestPathParams: {},
requestQueryParams: message.payload.query,
requestBody: message.payload.body,
requestRoute: message.payload.path.join("/"),
};

const webhoncId = await getWebHoncConnectionId(db);

const supplementedHeaders = {
"x-fpx-trace-id": traceId,
"x-fpx-webhonc-id": webhoncId ?? "",
};

if (newRequest?.requestHeaders) {
newRequest.requestHeaders = {
...newRequest?.requestHeaders,
...supplementedHeaders,
};
} else {
newRequest.requestHeaders = supplementedHeaders;
/**
* Extracts the correlation id from the message
*
* This is a HACK to avoid having to specify the correlation id on the strict fiberplane package types
*/
function extractCorrelationId(
event: WebSocket.MessageEvent,
parsedMessage: WsMessage,
): string | undefined {
// HACK - We need to extract the correlation id from the message
let correlationId = "NA";
try {
if (parsedMessage.event === "request_incoming") {
const reparsedMessage = JSON.parse(event.data.toString());
if (typeof reparsedMessage?.correlationId === "string") {
correlationId = reparsedMessage?.correlationId;
}
}

const [{ id: requestId }] = await db
.insert(schema.appRequests)
.values(newRequest)
.returning({ id: schema.appRequests.id });

try {
const response = await executeProxyRequest({
requestHeaders: newRequest.requestHeaders,
requestMethod: newRequest.requestMethod,
requestBody: newRequest.requestBody,
requestUrl,
});

const duration = Date.now() - startTime;

await handleSuccessfulRequest(db, requestId, duration, response, traceId);

// Store the request in the database
} catch (error) {
logger.error("Error making request", error);
const duration = Date.now() - startTime;
await handleFailedRequest(db, requestId, traceId, duration, error);
}
},
};
} catch (_err) {
console.warn("[webhonc] Failed to parse correlation id");
}
return correlationId;
}
35 changes: 35 additions & 0 deletions api/src/lib/webhonc/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import type { LibSQLDatabase } from "drizzle-orm/libsql";
import type WebSocket from "ws";
import { z } from "zod";
import type * as schema from "../../db/schema.js";
import logger from "../../logger.js";

export type WebhoncManagerConfig = {
host: string;
db: LibSQLDatabase<typeof schema>;
wsConnections: Set<WebSocket>;
};

const WebhoncOutgoingResponseSchema = z.object({
status: z.number(),
body: z.string(),
headers: z.record(z.string()),
correlationId: z.string(),
});

export type WebhoncOutgoingResponse = z.infer<
typeof WebhoncOutgoingResponseSchema
>;

export function isWebhoncOutgoingResponse(
value: unknown,
): value is WebhoncOutgoingResponse {
const parseResult = WebhoncOutgoingResponseSchema.safeParse(value);
if (!parseResult.success) {
logger.warn(
"Invalid webhonc outgoing response",
parseResult.error.format(),
);
}
return parseResult.success;
}
Loading

0 comments on commit f7b3fae

Please sign in to comment.