Skip to content

Commit

Permalink
feat/realtime-streams (#1470)
Browse files Browse the repository at this point in the history
* WIP realtime streams

* Handle realtime with large payloads or outputs #1451

* feat: optimize Redis stream handling with batching

Add STREAM_ORIGIN to environment schema. Improve performance in
RealtimeStreams by using TextDecoderStream for simpler text
decoding and implementing batching of XADD commands for Redis
streams. Limit stream size using MAXLEN option. Update
environment variable repository with new variable type. Adjust
import statements for Redis key and value types.

* 🔧 chore: add dev dependencies for bundle analysis

* add metadata tests and a few more utilties

* Add stream tests and improve streaming

* Added AI tool tasks, descriptions to tasks

* Use the config file path to determine the workingDir, then the package.json path

* Remove stream test files

* useTaskTrigger react hook that allows triggering a task from the client

* Add streaming support for the realtime react hooks

* Add ability to stream results after useTaskTrigger

* Improve the stream throttling

* Use the runId as the ID key to bust the cache after triggering

* Upgrade to to the latest electric sql client and server

* Make realtime server backwards compat with 3.1.2 release

* Pass the runId into useRealtimeRun

* Fix scopes when specifiying reading all runs

* WIP @trigger.dev/rsc package

* Various fixes and accepted recommendations by CodeRabbit

* Regenerate pnpm lock file

* A couple tweaks to rsc and give up on rendering react in tasks for now

* Add changeset

* Remove triggerRequest from the useEffect deps

* Improve realtime & frontend authentication errors

* Fixed authorization tests

* Remove unnecessary log

* Add metadata.stream limits and improve the metadata streams structure

* Streams can now have up to 2500 entries

* Various coderabbit fixes

* additional react-hooks jsdocs
  • Loading branch information
ericallam authored Nov 19, 2024
1 parent ea09564 commit 23b43be
Show file tree
Hide file tree
Showing 112 changed files with 6,054 additions and 1,051 deletions.
10 changes: 10 additions & 0 deletions .changeset/swift-glasses-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@trigger.dev/react-hooks": patch
"@trigger.dev/sdk": patch
"trigger.dev": patch
"@trigger.dev/build": patch
"@trigger.dev/core": patch
"@trigger.dev/rsc": patch
---

Realtime streams
17 changes: 17 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@
"cwd": "${workspaceFolder}",
"sourceMaps": true
},
{
"type": "node-terminal",
"request": "launch",
"name": "Debug realtimeStreams.test.ts",
"command": "pnpm run test -t RealtimeStreams",
"envFile": "${workspaceFolder}/.env",
"cwd": "${workspaceFolder}/apps/webapp",
"sourceMaps": true
},
{
"type": "chrome",
"request": "launch",
Expand All @@ -36,6 +45,14 @@
"cwd": "${workspaceFolder}/references/v3-catalog",
"sourceMaps": true
},
{
"type": "node-terminal",
"request": "launch",
"name": "Debug Dev Next.js Realtime",
"command": "pnpm exec trigger dev",
"cwd": "${workspaceFolder}/references/nextjs-realtime",
"sourceMaps": true
},
{
"type": "node-terminal",
"request": "launch",
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const EnvironmentSchema = z.object({
LOGIN_ORIGIN: z.string().default("http://localhost:3030"),
APP_ORIGIN: z.string().default("http://localhost:3030"),
API_ORIGIN: z.string().optional(),
STREAM_ORIGIN: z.string().optional(),
ELECTRIC_ORIGIN: z.string().default("http://localhost:3060"),
APP_ENV: z.string().default(process.env.NODE_ENV),
SERVICE_NAME: z.string().default("trigger.dev webapp"),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ export class SpanPresenter extends BasePresenter {
const span = await eventRepository.getSpan(spanId, run.traceId);

const metadata = run.metadata
? await prettyPrintPacket(run.metadata, run.metadataType)
? await prettyPrintPacket(run.metadata, run.metadataType, { filteredKeys: ["$$streams"] })
: undefined;

const context = {
Expand Down
42 changes: 21 additions & 21 deletions apps/webapp/app/routes/api.v1.packets.$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { generatePresignedUrl } from "~/v3/r2.server";

const ParamsSchema = z.object({
Expand Down Expand Up @@ -39,28 +40,27 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ presignedUrl });
}

export async function loader({ request, params }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequest(request);
export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
},
async ({ params, authentication }) => {
const filename = params["*"];

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}
const presignedUrl = await generatePresignedUrl(
authentication.environment.project.externalRef,
authentication.environment.slug,
filename,
"GET"
);

const parsedParams = ParamsSchema.parse(params);
const filename = parsedParams["*"];

const presignedUrl = await generatePresignedUrl(
authenticationResult.environment.project.externalRef,
authenticationResult.environment.slug,
filename,
"GET"
);
if (!presignedUrl) {
return json({ error: "Failed to generate presigned URL" }, { status: 500 });
}

if (!presignedUrl) {
return json({ error: "Failed to generate presigned URL" }, { status: 500 });
// Caller can now use this URL to fetch that object.
return json({ presignedUrl });
}

// Caller can now use this URL to fetch that object.
return json({ presignedUrl });
}
);
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.projects.$projectRef.runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
ApiRunListPresenter,
ApiRunListSearchParams,
} from "~/presenters/v3/ApiRunListPresenter.server";
import { createLoaderPATApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
import { createLoaderPATApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
projectRef: z.string(),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
ApiRunListPresenter,
ApiRunListSearchParams,
} from "~/presenters/v3/ApiRunListPresenter.server";
import { createLoaderApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

export const loader = createLoaderApiRoute(
{
Expand Down
216 changes: 112 additions & 104 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { fromZodError } from "zod-validation-error";
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
import { generateJWT as internal_generateJWT, TriggerTaskRequestBody } from "@trigger.dev/core/v3";
import { TaskRun } from "@trigger.dev/database";
import { z } from "zod";
import { env } from "~/env.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { parseRequestJsonAsync } from "~/utils/parseRequestJson.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server";
import { startActiveSpan } from "~/v3/tracer.server";

const ParamsSchema = z.object({
taskId: z.string(),
Expand All @@ -20,115 +18,125 @@ export const HeadersSchema = z.object({
"trigger-version": z.string().nullish(),
"x-trigger-span-parent-as-link": z.coerce.number().nullish(),
"x-trigger-worker": z.string().nullish(),
"x-trigger-client": z.string().nullish(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Ensure this is a POST request
if (request.method.toUpperCase() !== "POST") {
return { status: 405, body: "Method Not Allowed" };
}

logger.debug("TriggerTask action", { headers: Object.fromEntries(request.headers) });

// Next authenticate the request
const authenticationResult = await authenticateApiRequest(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const contentLength = request.headers.get("content-length");

if (!contentLength || parseInt(contentLength) > env.TASK_PAYLOAD_MAXIMUM_SIZE) {
return json({ error: "Request body too large" }, { status: 413 });
}
const { action, loader } = createActionApiRoute(
{
headers: HeadersSchema,
params: ParamsSchema,
body: TriggerTaskRequestBody,
allowJWT: true,
maxContentLength: env.TASK_PAYLOAD_MAXIMUM_SIZE,
authorization: {
action: "write",
resource: (params) => ({ tasks: params.taskId }),
superScopes: ["write:tasks", "admin"],
},
corsStrategy: "all",
},
async ({ body, headers, params, authentication }) => {
const {
"idempotency-key": idempotencyKey,
"trigger-version": triggerVersion,
"x-trigger-span-parent-as-link": spanParentAsLink,
traceparent,
tracestate,
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
} = headers;

const service = new TriggerTaskService();

try {
const traceContext =
traceparent && isFromWorker /// If the request is from a worker, we should pass the trace context
? { traceparent, tracestate }
: undefined;

logger.debug("Triggering task", {
taskId: params.taskId,
idempotencyKey,
triggerVersion,
headers,
options: body.options,
isFromWorker,
traceContext,
});

const run = await service.call(params.taskId, authentication.environment, body, {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext,
spanParentAsLink: spanParentAsLink === 1,
});

if (!run) {
return json({ error: "Task not found" }, { status: 404 });
}

const rawHeaders = Object.fromEntries(request.headers);
const $responseHeaders = await responseHeaders(
run,
authentication.environment,
triggerClient
);

const headers = HeadersSchema.safeParse(rawHeaders);
return json(
{
id: run.friendlyId,
},
{
headers: $responseHeaders,
}
);
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof OutOfEntitlementError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 500 });
}

if (!headers.success) {
return json({ error: "Invalid headers" }, { status: 400 });
return json({ error: "Something went wrong" }, { status: 500 });
}
}

const {
"idempotency-key": idempotencyKey,
"trigger-version": triggerVersion,
"x-trigger-span-parent-as-link": spanParentAsLink,
traceparent,
tracestate,
"x-trigger-worker": isFromWorker,
} = headers.data;

const { taskId } = ParamsSchema.parse(params);

// Now parse the request body
const anyBody = await parseRequestJsonAsync(request, { taskId });

const body = await startActiveSpan("TriggerTaskRequestBody.safeParse()", async (span) => {
return TriggerTaskRequestBody.safeParse(anyBody);
);

async function responseHeaders(
run: TaskRun,
environment: AuthenticatedEnvironment,
triggerClient?: string | null
): Promise<Record<string, string>> {
const claimsHeader = JSON.stringify({
sub: environment.id,
pub: true,
});

if (!body.success) {
return json(
{ error: fromZodError(body.error, { prefix: "Invalid trigger call" }).toString() },
{ status: 400 }
);
}

const service = new TriggerTaskService();

try {
const traceContext =
traceparent && isFromWorker /// If the request is from a worker, we should pass the trace context
? { traceparent, tracestate }
: undefined;

logger.debug("Triggering task", {
taskId,
idempotencyKey,
triggerVersion,
headers: Object.fromEntries(request.headers),
options: body.data.options,
isFromWorker,
traceContext,
if (triggerClient === "browser") {
const claims = {
sub: environment.id,
pub: true,
scopes: [`read:runs:${run.friendlyId}`],
};

const jwt = await internal_generateJWT({
secretKey: environment.apiKey,
payload: claims,
expirationTime: "1h",
});

const run = await service.call(taskId, authenticationResult.environment, body.data, {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext,
spanParentAsLink: spanParentAsLink === 1,
});

if (!run) {
return json({ error: "Task not found" }, { status: 404 });
}

return json(
{
id: run.friendlyId,
},
{
headers: {
"x-trigger-jwt-claims": JSON.stringify({
sub: authenticationResult.environment.id,
pub: true,
}),
},
}
);
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof OutOfEntitlementError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 400 });
}

return json({ error: "Something went wrong" }, { status: 500 });
return {
"x-trigger-jwt-claims": claimsHeader,
"x-trigger-jwt": jwt,
};
}

return {
"x-trigger-jwt-claims": claimsHeader,
};
}

export { action, loader };
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v3.runs.$runId.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
import { createLoaderApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
runId: z.string(),
Expand Down
Loading

0 comments on commit 23b43be

Please sign in to comment.