Skip to content

Commit

Permalink
Internal packages (testcontainers, redis-worker and zod-worker) (#1392)
Browse files Browse the repository at this point in the history
* Some notes on the new run engine

* lockfile with setup for the run engine

* Documenting where TaskRun is currently mutated, to try figure out the shape of the new system

* Added notes about how triggering currently works

* Details about when triggering happens

* Lots of notes about waitpoints

* Started scaffolding the RunEngine

* Sketch of Prisma waitpoint schema while it’s fresh in my mind

* Got Prisma working with testcontainers

* Use beforeEach/afterEach

* Simple Prisma and Redis test

* Return Redis options instead of a client

* Simplified things

* A very simple FIFO pull-based queue to check the tests working properly

* Use vitest extend

* Separate redis, postgres and combined tests for faster testing

* Some fixes and test improvements

* Pass a logger into the queue

* A queue processor that processes items from the given queue as fast as it can

* Test for retrying an item that wasn’t processed

* First draft of waitpoints in the Prisma schema

* Remove the custom logger from the test

* Added a completedAt to Waitpoint

* Notes on the flow for an execution starting

* Added redlock, moved some files around

* Starting point for the TaskRunExecutionSnapshot table

* Added relationships to TaskRunExecutionSnapshot

* Change some tsconfig

* Moved some things around

* Added some packages

* WIP on the RunQueue

* Fix for some imports

* Key producer with some tests

* Removed the nv type from the keys… it’s not useful to do global queries

* Passing unit tests for all the public key producer functions

* Some basic tests passing for the RunQueue

* Simple enqueue test working

* Enqueue and dequeue for dev is working

* Don’t log everything during the tests

* Enqueuing/dequeuing from the shared queue is working

* Tests for getting a shared queue

* The key producer sharedQueue can now be named, to allow multiple separate queues

* The key producer uses the name of the queue as the input

* Extra info in the Prisma schema

* Dequeuing a message gets the payload and sets the task concurrency all in one Lua script

* Adding more keys so we can read the concurrency from the queue

* Setting the concurrency with dequeue and enquque is working

* Improved the tests and fixed some bugs

* Acking is resetting the concurrencies

* Check the key has been removed after acking

* Nacking is working

* Changed the package to CommonJS + Node10 so it works with Redlock

* Moved the database, otel and emails packages to be in internal-packages

* Moved some Prisma code to the database package

* Started using the RunEngine for triggering

* Progress on run engine triggering, first waitpoint code

* Create a delay waitpoint

* Moved ZodWorker to an internal package so it can be used in the run engine as well as the webapp

* Web app now uses the zod worker package

* Added parseNaturalLanguageDuration to core/apps

* internal-packages/zod-worker in the lockfile

* Pass in the master queue, remove old rebalance workers code

* Add masterQueue to TaskRun

* Fixed the tests

* Moved waitpoint code into the run engine, also the zod worker

* Completing waitpoints

* An experiment to create a new test container with environment

* More changes to triggering

* Started testing triggering

* Test for a run getting triggered and being enqueued

* Removed dequeueMessageInEnv

* Update dev queue tests to use the shared queue function

* Schema changes for TaskRunExecutionSnapshot

* First execution snapshot when the run is created. Dequeue run function added to the engine

* Separate internal package for testcontainers so they can be used elsewhere

* Remove the simple queue and testcontainers from the run-engine. They’re going to be separate

* Fix for the wrong path to the Prisma schem,a

* Added the testcontainers package to the run-engine

* redis-worker package, just a copy of the simple queue for now

* The queue now uses Lua to enqueue dequeue

* The queue now has a catalog and an invisible period after dequeuing

* Added a visibility timeout and acking, with tests

* Added more Redis connection logging, deleted todos

* Visibility timeouts are now defined on the catalog and can be overridden when enqueuing

* Dequeue multiple items at once

* Test for dequeuing multiple items

* Export some types to be used elsewhere

* Partial refactor of the processor

* First stab at a worker with concurrency and NodeWorkers

* Don’t have a default visibility timeout in the queue

* Worker setup and processing items in a simple test

* Process jobs in parallel with retrying

* Get the attempt when dequeuing

* Workers do exponential backoff

* Moved todos

* DLQ functionality

* DLQ tests

* Same cluster for all keys in the same queue

* Added DLQ tests

* Whitespace

* Redis pubsub to redrive from the worker

* Fixed database paths

* Fix for path to zod-worker

* Fixes for typecheck errors, mostly with TS versions and module resolution

* Redlock required a patch

* Moved the new DB migrations to the new database package folder

* Remove the run-engine package

* Remove the RunEngine prisma schema changes

* Delete triggerTaskV2

* Remove zodworker test script (no tests)

* Update test-containers readme

* Generate the client first

* Use a specific version of the prisma package

* Generate the prisma client before running the unit tests
  • Loading branch information
matt-aitken authored Oct 8, 2024
1 parent fc60947 commit f2babbf
Show file tree
Hide file tree
Showing 648 changed files with 2,925 additions and 768 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@ jobs:
- name: 📥 Download deps
run: pnpm install --frozen-lockfile

- name: 📀 Generate Prisma Client
run: pnpm run generate

- name: 🧪 Run Unit Tests
run: pnpm run test
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "packages/otlp-importer/protos"]
path = packages/otlp-importer/protos
[submodule "internal-packages/otlp-importer/protos"]
path = internal-packages/otlp-importer/protos
url = https://github.com/open-telemetry/opentelemetry-proto.git
68 changes: 21 additions & 47 deletions apps/webapp/app/db.server.ts
Original file line number Diff line number Diff line change
@@ -1,77 +1,51 @@
import { Prisma, PrismaClient } from "@trigger.dev/database";
import {
Prisma,
PrismaClient,
PrismaClientOrTransaction,
PrismaReplicaClient,
PrismaTransactionClient,
PrismaTransactionOptions,
} from "@trigger.dev/database";
import invariant from "tiny-invariant";
import { z } from "zod";
import { env } from "./env.server";
import { logger } from "./services/logger.server";
import { isValidDatabaseUrl } from "./utils/db";
import { singleton } from "./utils/singleton";
import { $transaction as transac } from "@trigger.dev/database";

export type PrismaTransactionClient = Omit<
PrismaClient,
"$connect" | "$disconnect" | "$on" | "$transaction" | "$use" | "$extends"
>;

export type PrismaClientOrTransaction = PrismaClient | PrismaTransactionClient;

function isTransactionClient(prisma: PrismaClientOrTransaction): prisma is PrismaTransactionClient {
return !("$transaction" in prisma);
}

function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError {
return (
typeof error === "object" && error !== null && "code" in error && typeof error.code === "string"
);
}

export type PrismaTransactionOptions = {
/** The maximum amount of time (in ms) Prisma Client will wait to acquire a transaction from the database. The default value is 2000ms. */
maxWait?: number;

/** The maximum amount of time (in ms) the interactive transaction can run before being canceled and rolled back. The default value is 5000ms. */
timeout?: number;

/** Sets the transaction isolation level. By default this is set to the value currently configured in your database. */
isolationLevel?: Prisma.TransactionIsolationLevel;

swallowPrismaErrors?: boolean;
export type {
PrismaTransactionClient,
PrismaClientOrTransaction,
PrismaTransactionOptions,
PrismaReplicaClient,
};

export async function $transaction<R>(
prisma: PrismaClientOrTransaction,
fn: (prisma: PrismaTransactionClient) => Promise<R>,
options?: PrismaTransactionOptions
): Promise<R | undefined> {
if (isTransactionClient(prisma)) {
return fn(prisma);
}

try {
return await (prisma as PrismaClient).$transaction(fn, options);
} catch (error) {
if (isPrismaKnownError(error)) {
return transac(
prisma,
fn,
(error) => {
logger.error("prisma.$transaction error", {
code: error.code,
meta: error.meta,
stack: error.stack,
message: error.message,
name: error.name,
});

if (options?.swallowPrismaErrors) {
return;
}
}

throw error;
}
},
options
);
}

export { Prisma };

export const prisma = singleton("prisma", getClient);

export type PrismaReplicaClient = Omit<PrismaClient, "$transaction">;

export const $replica: PrismaReplicaClient = singleton(
"replica",
() => getReplicaClient() ?? prisma
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { Paragraph } from "~/components/primitives/Paragraph";
import { Sheet, SheetBody, SheetContent, SheetHeader } from "~/components/primitives/Sheet";
import { ClientEndpoint } from "~/presenters/EnvironmentsPresenter.server";
import { endpointStreamingPath } from "~/utils/pathBuilder";
import { EndpointIndexStatus, RuntimeEnvironmentType } from "../../../../../packages/database/src";
import { EndpointIndexStatus, RuntimeEnvironmentType } from "@trigger.dev/database";
import { bodySchema } from "../resources.environments.$environmentParam.endpoint";

type ConfigureEndpointSheetProps = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
projectEnvironmentsStreamingPath,
} from "~/utils/pathBuilder";
import { requestUrl } from "~/utils/requestUrl.server";
import { RuntimeEnvironmentType } from "../../../../../packages/database/src";
import { RuntimeEnvironmentType } from "@trigger.dev/database";
import { ConfigureEndpointSheet } from "./ConfigureEndpointSheet";
import { FirstEndpointSheet } from "./FirstEndpointSheet";
import { BookOpenIcon } from "@heroicons/react/20/solid";
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/services/runExecutionRateLimiter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
import { JobHelpers, Task } from "graphile-worker";
import { singleton } from "~/utils/singleton";
import { logger } from "./logger.server";
import { ZodWorkerRateLimiter } from "~/platform/zodWorker.server";
import { ZodWorkerRateLimiter } from "@internal/zod-worker";
import {
ConcurrencyLimitGroup,
JobRun,
Expand Down Expand Up @@ -117,7 +117,7 @@ if currentSize < maxSize then
return true
else
redis.call('SADD', forbiddenFlagsKey, forbiddenFlag)
return false
end
`,
Expand Down
13 changes: 10 additions & 3 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { DeliverEmailSchema } from "@/../../packages/emails/src";
import { DeliverEmailSchema } from "emails";
import { ScheduledPayloadSchema, addMissingVersionField } from "@trigger.dev/core";
import { ZodWorker } from "@internal/zod-worker";
import { z } from "zod";
import { prisma } from "~/db.server";
import { $replica, prisma } from "~/db.server";
import { env } from "~/env.server";
import { ZodWorker } from "~/platform/zodWorker.server";
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
Expand Down Expand Up @@ -54,6 +54,7 @@ import {
CancelDevSessionRunsService,
CancelDevSessionRunsServiceOptions,
} from "~/v3/services/cancelDevSessionRuns.server";
import { logger } from "./logger.server";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -279,6 +280,7 @@ function getWorkerQueue() {
return new ZodWorker({
name: "workerQueue",
prisma,
replica: $replica,
runnerOptions: {
connectionString: env.DATABASE_URL,
concurrency: env.WORKER_CONCURRENCY,
Expand All @@ -287,6 +289,7 @@ function getWorkerQueue() {
schema: env.WORKER_SCHEMA,
maxPoolSize: env.WORKER_CONCURRENCY + 1,
},
logger: logger,
shutdownTimeoutInMs: env.GRACEFUL_SHUTDOWN_TIMEOUT,
schema: workerCatalog,
recurringTasks: {
Expand Down Expand Up @@ -732,6 +735,8 @@ function getExecutionWorkerQueue() {
return new ZodWorker({
name: "executionWorker",
prisma,
replica: $replica,
logger: logger,
runnerOptions: {
connectionString: env.DATABASE_URL,
concurrency: env.EXECUTION_WORKER_CONCURRENCY,
Expand Down Expand Up @@ -786,6 +791,8 @@ function getTaskOperationWorkerQueue() {
return new ZodWorker({
name: "taskOperationWorker",
prisma,
replica: $replica,
logger: logger,
runnerOptions: {
connectionString: env.DATABASE_URL,
concurrency: env.TASK_OPERATION_WORKER_CONCURRENCY,
Expand Down
8 changes: 1 addition & 7 deletions apps/webapp/app/v3/friendlyIdentifiers.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1 @@
import { customAlphabet } from "nanoid";

const idGenerator = customAlphabet("123456789abcdefghijkmnopqrstuvwxyz", 21);

export function generateFriendlyId(prefix: string, size?: number) {
return `${prefix}_${idGenerator(size)}`;
}
export { generateFriendlyId } from "@trigger.dev/core/v3/apps";
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/enqueueDelayedRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/apps";
import { $transaction } from "~/db.server";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { parseNaturalLanguageDuration } from "./triggerTask.server";

export class EnqueueDelayedRunService extends BaseService {
public async call(runId: string) {
Expand Down
53 changes: 1 addition & 52 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
import { handleMetadataPacket } from "~/utils/packets";
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/apps";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
import { clampMaxDuration } from "../utils/maxDuration";
Expand Down Expand Up @@ -646,58 +647,6 @@ export async function parseDelay(value?: string | Date): Promise<Date | undefine
}
}

export function parseNaturalLanguageDuration(duration: string): Date | undefined {
const regexPattern = /^(\d+w)?(\d+d)?(\d+h)?(\d+m)?(\d+s)?$/;

const result: Date = new Date();
let hasMatch = false;

const elements = duration.match(regexPattern);
if (elements) {
if (elements[1]) {
const weeks = Number(elements[1].slice(0, -1));
if (weeks >= 0) {
result.setDate(result.getDate() + 7 * weeks);
hasMatch = true;
}
}
if (elements[2]) {
const days = Number(elements[2].slice(0, -1));
if (days >= 0) {
result.setDate(result.getDate() + days);
hasMatch = true;
}
}
if (elements[3]) {
const hours = Number(elements[3].slice(0, -1));
if (hours >= 0) {
result.setHours(result.getHours() + hours);
hasMatch = true;
}
}
if (elements[4]) {
const minutes = Number(elements[4].slice(0, -1));
if (minutes >= 0) {
result.setMinutes(result.getMinutes() + minutes);
hasMatch = true;
}
}
if (elements[5]) {
const seconds = Number(elements[5].slice(0, -1));
if (seconds >= 0) {
result.setSeconds(result.getSeconds() + seconds);
hasMatch = true;
}
}
}

if (hasMatch) {
return result;
}

return undefined;
}

function stringifyDuration(seconds: number): string | undefined {
if (seconds <= 0) {
return;
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
"@electric-sql/react": "^0.3.5",
"@headlessui/react": "^1.7.8",
"@heroicons/react": "^2.0.12",
"@internal/run-engine": "workspace:*",
"@internal/zod-worker": "workspace:*",
"@internationalized/date": "^3.5.1",
"@lezer/highlight": "^1.1.6",
"@opentelemetry/api": "1.9.0",
Expand Down
16 changes: 10 additions & 6 deletions apps/webapp/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
"@trigger.dev/sdk/*": ["../../packages/trigger-sdk/src/*"],
"@trigger.dev/core": ["../../packages/core/src/index"],
"@trigger.dev/core/*": ["../../packages/core/src/*"],
"@trigger.dev/database": ["../../packages/database/src/index"],
"@trigger.dev/database/*": ["../../packages/database/src/*"],
"@trigger.dev/database": ["../../internal-packages/database/src/index"],
"@trigger.dev/database/*": ["../../internal-packages/database/src/*"],
"@trigger.dev/yalt": ["../../packages/yalt/src/index"],
"@trigger.dev/yalt/*": ["../../packages/yalt/src/*"],
"@trigger.dev/otlp-importer": ["../../packages/otlp-importer/src/index"],
"@trigger.dev/otlp-importer/*": ["../../packages/otlp-importer/src/*"],
"emails": ["../../packages/emails/src/index"],
"emails/*": ["../../packages/emails/src/*"]
"@trigger.dev/otlp-importer": ["../../internal-packages/otlp-importer/src/index"],
"@trigger.dev/otlp-importer/*": ["../../internal-packages/otlp-importer/src/*"],
"emails": ["../../internal-packages/emails/src/index"],
"emails/*": ["../../internal-packages/emails/src/*"],
"@internal/run-engine": ["../../internal-packages/run-engine/src/index"],
"@internal/run-engine/*": ["../../internal-packages/run-engine/src/*"],
"@internal/zod-worker": ["../../internal-packages/zod-worker/src/index"],
"@internal/zod-worker/*": ["../../internal-packages/zod-worker/src/*"]
},
"noEmit": true
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit f2babbf

Please sign in to comment.