From a8e8b931f676c38dcd20013cabaed7f78fd33b44 Mon Sep 17 00:00:00 2001 From: Matthias Van Parijs Date: Sat, 16 Nov 2024 23:06:39 +0100 Subject: [PATCH 1/6] Added thumbnails worker --- packages/api/src/repositories/jobs.ts | 2 + packages/api/src/routes/jobs.ts | 26 ++++++++ packages/api/src/routes/storage.ts | 7 ++ packages/api/src/types.ts | 2 +- packages/app/src/components/FilePreview.tsx | 3 + packages/artisan/src/lib/s3.ts | 13 ++++ packages/artisan/src/workers/index.ts | 5 ++ packages/artisan/src/workers/thumbnails.ts | 71 +++++++++++++++++++++ packages/bolt/src/queue-result.ts | 4 ++ packages/bolt/src/queue.ts | 8 +++ 10 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 packages/artisan/src/workers/thumbnails.ts diff --git a/packages/api/src/repositories/jobs.ts b/packages/api/src/repositories/jobs.ts index 5110cc3..567eb73 100644 --- a/packages/api/src/repositories/jobs.ts +++ b/packages/api/src/repositories/jobs.ts @@ -5,6 +5,7 @@ import { outcomeQueue, packageQueue, pipelineQueue, + thumbnailsQueue, transcodeQueue, } from "bolt"; import { Job as RawJob } from "bullmq"; @@ -19,6 +20,7 @@ const allQueus = [ ffmpegQueue, ffprobeQueue, outcomeQueue, + thumbnailsQueue, ]; function findQueueByName(name: string): Queue { diff --git a/packages/api/src/routes/jobs.ts b/packages/api/src/routes/jobs.ts index 15b3f91..c61abc7 100644 --- a/packages/api/src/routes/jobs.ts +++ b/packages/api/src/routes/jobs.ts @@ -5,6 +5,7 @@ import { DEFAULT_SEGMENT_SIZE, packageQueue, pipelineQueue, + thumbnailsQueue, transcodeQueue, } from "bolt"; import { AudioCodec, VideoCodec } from "bolt"; @@ -168,6 +169,31 @@ export const jobs = new Elysia() }, }, ) + .post( + "/thumbnails", + async ({ body }) => { + const jobId = await addToQueue(thumbnailsQueue, body, { + id: body.assetId, + }); + return { jobId }; + }, + { + detail: { + summary: "Create thumbnails job", + tags: ["Jobs"], + }, + body: t.Object({ + assetId: t.String({ + format: "uuid", + }), + }), + response: { + 200: t.Object({ + jobId: t.String(), + }), + }, + }, + ) .get( "/jobs", async ({ query }) => { diff --git a/packages/api/src/routes/storage.ts b/packages/api/src/routes/storage.ts index b3cad92..62ca4cd 100644 --- a/packages/api/src/routes/storage.ts +++ b/packages/api/src/routes/storage.ts @@ -46,6 +46,13 @@ export const storage = new Elysia() url: await getStorageFileUrl(query.path), type: "video", }; + case "png": + case "webp": + return { + mode: "url", + url: await getStorageFileUrl(query.path), + type: "image", + }; case "m3u8": case "json": case "vtt": diff --git a/packages/api/src/types.ts b/packages/api/src/types.ts index 1156c0c..0f9522b 100644 --- a/packages/api/src/types.ts +++ b/packages/api/src/types.ts @@ -58,7 +58,7 @@ export type StorageFolder = Static; export const StorageFileSchema = t.Union([ t.Object({ mode: t.Literal("url"), - type: t.Union([t.Literal("video")]), + type: t.Union([t.Literal("video"), t.Literal("image")]), url: t.String(), }), t.Object({ diff --git a/packages/app/src/components/FilePreview.tsx b/packages/app/src/components/FilePreview.tsx index 74757f8..37b2687 100644 --- a/packages/app/src/components/FilePreview.tsx +++ b/packages/app/src/components/FilePreview.tsx @@ -54,6 +54,9 @@ function Preview({ file }: { file: StorageFile }) { /> ); } + if (file.type === "image") { + return ; + } } return null; } diff --git a/packages/artisan/src/lib/s3.ts b/packages/artisan/src/lib/s3.ts index e946d3a..f579997 100644 --- a/packages/artisan/src/lib/s3.ts +++ b/packages/artisan/src/lib/s3.ts @@ -1,9 +1,11 @@ import { createReadStream } from "node:fs"; +import { writeFile } from "node:fs/promises"; import { GetObjectCommand, S3 } from "@aws-sdk/client-s3"; import { Upload } from "@aws-sdk/lib-storage"; import { getSignedUrl } from "@aws-sdk/s3-request-presigner"; import { ConfiguredRetryStrategy } from "@smithy/util-retry"; import { lookup } from "mime-types"; +import parseFilePath from "parse-filepath"; import { S3SyncClient } from "s3-sync-client"; import { env } from "../env"; import type { PutObjectCommandInput } from "@aws-sdk/client-s3"; @@ -116,3 +118,14 @@ export async function getS3SignedUrl( }); return url; } + +export async function getFromS3(remoteFilePath: string, localDir: string) { + const filePath = parseFilePath(remoteFilePath); + const command = new GetObjectCommand({ + Bucket: env.S3_BUCKET, + Key: remoteFilePath, + }); + const response = await client.send(command); + // @ts-expect-error Body is a Readable + await writeFile(`${localDir}/${filePath.base}`, response.Body); +} diff --git a/packages/artisan/src/workers/index.ts b/packages/artisan/src/workers/index.ts index 60321c6..ae90722 100644 --- a/packages/artisan/src/workers/index.ts +++ b/packages/artisan/src/workers/index.ts @@ -3,6 +3,7 @@ import { ffmpegCallback } from "./ffmpeg"; import { ffprobeCallback } from "./ffprobe"; import { packageCallback } from "./package"; import { pipelineCallback } from "./pipeline"; +import { thumbnailsCallback } from "./thumbnails"; import { transcodeCallback } from "./transcode"; runWorkers([ @@ -26,4 +27,8 @@ runWorkers([ name: "pipeline", callback: pipelineCallback, }, + { + name: "thumbnails", + callback: thumbnailsCallback, + }, ]); diff --git a/packages/artisan/src/workers/thumbnails.ts b/packages/artisan/src/workers/thumbnails.ts new file mode 100644 index 0000000..5c4e892 --- /dev/null +++ b/packages/artisan/src/workers/thumbnails.ts @@ -0,0 +1,71 @@ +import { ffmpeg } from "../lib/ffmpeg"; +import { getMetaStruct } from "../lib/file-helpers"; +import { getFromS3, getS3SignedUrl, syncToS3 } from "../lib/s3"; +import type { MetaStruct } from "../lib/file-helpers"; +import type { ThumbnailsData, ThumbnailsResult, WorkerCallback } from "bolt"; + +export const thumbnailsCallback: WorkerCallback< + ThumbnailsData, + ThumbnailsResult +> = async ({ job, dir, progressTracker }) => { + const tmpDir = await dir.createTempDir(); + + // TODO: Do not write meta file to local disk but stream it instead. + // Do the same for package job. + await getFromS3(`transcode/${job.data.assetId}/meta.json`, tmpDir); + + const metaStruct = await getMetaStruct(tmpDir); + const name = findStreamInputName(metaStruct); + + if (!name) { + throw new Error("Failed to find suitable stream."); + } + + const publicUrl = await getS3SignedUrl( + `transcode/${job.data.assetId}/${name}`, + 60 * 30, + ); + + const outDir = await dir.createTempDir(); + + const outputOptions = ["-ss 00:00:01.000", "-vframes 1"]; + + await ffmpeg( + publicUrl, + `${outDir}/0.png`, + outputOptions, + (command) => { + job.log(command); + }, + (value) => { + progressTracker.set("ffmpeg", value); + }, + ); + + const s3Dir = `thumbnails/${job.data.assetId}`; + job.log(`Uploading to ${s3Dir}`); + + await syncToS3(outDir, s3Dir, { + del: true, + public: true, + }); + + return { + assetId: job.data.assetId, + }; +}; + +function findStreamInputName(metaStruct: MetaStruct): string | null { + let name: string | null = null; + let lastHeight = 0; + + const entries = Object.entries(metaStruct.streams); + for (const [key, value] of entries) { + if (value.type === "video" && value.height > lastHeight) { + name = key; + lastHeight = value.height; + } + } + + return name; +} diff --git a/packages/bolt/src/queue-result.ts b/packages/bolt/src/queue-result.ts index 410bee1..5f41b8d 100644 --- a/packages/bolt/src/queue-result.ts +++ b/packages/bolt/src/queue-result.ts @@ -33,3 +33,7 @@ export interface PackageResult { export interface PipelineResult { assetId: string; } + +export interface ThumbnailsResult { + assetId: string; +} diff --git a/packages/bolt/src/queue.ts b/packages/bolt/src/queue.ts index cc9a35a..7e29531 100644 --- a/packages/bolt/src/queue.ts +++ b/packages/bolt/src/queue.ts @@ -71,3 +71,11 @@ export type OutcomeData = export const outcomeQueue = new Queue("outcome", { connection, }); + +export interface ThumbnailsData { + assetId: string; +} + +export const thumbnailsQueue = new Queue("thumbnails", { + connection, +}); From 2c70e18f9a00e2f9a97d98cc89fe82ac3e1fb83d Mon Sep 17 00:00:00 2001 From: Matthias Van Parijs Date: Sat, 16 Nov 2024 23:17:05 +0100 Subject: [PATCH 2/6] Download meta struct as text from S3 --- packages/app/src/components/Image.tsx | 7 +++++++ packages/artisan/src/lib/file-helpers.ts | 13 ++++++------- packages/artisan/src/lib/s3.ts | 13 +++++++------ packages/artisan/src/workers/package.ts | 8 ++++---- packages/artisan/src/workers/thumbnails.ts | 10 ++-------- 5 files changed, 26 insertions(+), 25 deletions(-) create mode 100644 packages/app/src/components/Image.tsx diff --git a/packages/app/src/components/Image.tsx b/packages/app/src/components/Image.tsx new file mode 100644 index 0000000..96d0d1a --- /dev/null +++ b/packages/app/src/components/Image.tsx @@ -0,0 +1,7 @@ +interface ImageProps { + src: string; +} + +export function Image({ src }: ImageProps) { + return ; +} diff --git a/packages/artisan/src/lib/file-helpers.ts b/packages/artisan/src/lib/file-helpers.ts index 6e85961..df21e04 100644 --- a/packages/artisan/src/lib/file-helpers.ts +++ b/packages/artisan/src/lib/file-helpers.ts @@ -1,5 +1,4 @@ -import * as fs from "node:fs/promises"; -import { getS3SignedUrl } from "./s3"; +import { getS3SignedUrl, getTextFromS3 } from "./s3"; import type { PartialInput, Stream } from "bolt"; export async function getBinaryPath(name: string) { @@ -41,11 +40,11 @@ export interface MetaStruct { } /** - * Will fetch meta file when meta.json is found in path. - * @param path S3 dir + * Will fetch meta file when meta.json + * @param assetId * @returns */ -export async function getMetaStruct(path: string): Promise { - const text = await fs.readFile(`${path}/meta.json`, "utf8"); - return JSON.parse(text.toString()); +export async function getMetaStruct(assetId: string): Promise { + const text = await getTextFromS3(`transcode/${assetId}/meta.json`); + return JSON.parse(text); } diff --git a/packages/artisan/src/lib/s3.ts b/packages/artisan/src/lib/s3.ts index f579997..860ffe2 100644 --- a/packages/artisan/src/lib/s3.ts +++ b/packages/artisan/src/lib/s3.ts @@ -1,12 +1,11 @@ import { createReadStream } from "node:fs"; -import { writeFile } from "node:fs/promises"; import { GetObjectCommand, S3 } from "@aws-sdk/client-s3"; import { Upload } from "@aws-sdk/lib-storage"; import { getSignedUrl } from "@aws-sdk/s3-request-presigner"; import { ConfiguredRetryStrategy } from "@smithy/util-retry"; import { lookup } from "mime-types"; -import parseFilePath from "parse-filepath"; import { S3SyncClient } from "s3-sync-client"; +import { assert } from "shared/assert"; import { env } from "../env"; import type { PutObjectCommandInput } from "@aws-sdk/client-s3"; import type { CommandInput } from "s3-sync-client"; @@ -119,13 +118,15 @@ export async function getS3SignedUrl( return url; } -export async function getFromS3(remoteFilePath: string, localDir: string) { - const filePath = parseFilePath(remoteFilePath); +export async function getTextFromS3(remoteFilePath: string) { const command = new GetObjectCommand({ Bucket: env.S3_BUCKET, Key: remoteFilePath, }); const response = await client.send(command); - // @ts-expect-error Body is a Readable - await writeFile(`${localDir}/${filePath.base}`, response.Body); + + const text = response.Body?.transformToString("utf-8"); + assert(text, `Failed to get text from S3 "${remoteFilePath}"`); + + return text; } diff --git a/packages/artisan/src/workers/package.ts b/packages/artisan/src/workers/package.ts index 49cff2b..aa49f7e 100644 --- a/packages/artisan/src/workers/package.ts +++ b/packages/artisan/src/workers/package.ts @@ -57,13 +57,13 @@ export const packageCallback: WorkerCallback< async function handleStepInitial(job: Job, dir: WorkerDir) { const inDir = await dir.createTempDir(); - await syncFromS3(`transcode/${job.data.assetId}`, inDir); + const meta = await getMetaStruct(job.data.assetId); - job.log(`Synced folder in ${inDir}`); + job.log(`Got meta: "${JSON.stringify(meta)}"`); - const meta = await getMetaStruct(inDir); + await syncFromS3(`transcode/${job.data.assetId}`, inDir); - job.log(`Got meta: "${JSON.stringify(meta)}"`); + job.log(`Synced folder in ${inDir}`); // If we do not specify the segmentSize, grab it from the meta file. const segmentSize = job.data.segmentSize ?? meta.segmentSize; diff --git a/packages/artisan/src/workers/thumbnails.ts b/packages/artisan/src/workers/thumbnails.ts index 5c4e892..3a231ec 100644 --- a/packages/artisan/src/workers/thumbnails.ts +++ b/packages/artisan/src/workers/thumbnails.ts @@ -1,6 +1,6 @@ import { ffmpeg } from "../lib/ffmpeg"; import { getMetaStruct } from "../lib/file-helpers"; -import { getFromS3, getS3SignedUrl, syncToS3 } from "../lib/s3"; +import { getS3SignedUrl, syncToS3 } from "../lib/s3"; import type { MetaStruct } from "../lib/file-helpers"; import type { ThumbnailsData, ThumbnailsResult, WorkerCallback } from "bolt"; @@ -8,13 +8,7 @@ export const thumbnailsCallback: WorkerCallback< ThumbnailsData, ThumbnailsResult > = async ({ job, dir, progressTracker }) => { - const tmpDir = await dir.createTempDir(); - - // TODO: Do not write meta file to local disk but stream it instead. - // Do the same for package job. - await getFromS3(`transcode/${job.data.assetId}/meta.json`, tmpDir); - - const metaStruct = await getMetaStruct(tmpDir); + const metaStruct = await getMetaStruct(job.data.assetId); const name = findStreamInputName(metaStruct); if (!name) { From ffb5b1e750ba99129298cc2b06f313477b16e5bb Mon Sep 17 00:00:00 2001 From: Matthias Van Parijs Date: Sun, 17 Nov 2024 14:02:38 +0100 Subject: [PATCH 3/6] Renamed to image worker --- packages/api/src/repositories/jobs.ts | 4 ++-- packages/api/src/routes/jobs.ts | 8 ++++---- .../src/workers/{thumbnails.ts => image.ts} | 17 +++++++++-------- packages/artisan/src/workers/index.ts | 6 +++--- packages/bolt/src/queue-result.ts | 2 +- packages/bolt/src/queue.ts | 4 ++-- 6 files changed, 21 insertions(+), 20 deletions(-) rename packages/artisan/src/workers/{thumbnails.ts => image.ts} (79%) diff --git a/packages/api/src/repositories/jobs.ts b/packages/api/src/repositories/jobs.ts index 567eb73..d44060c 100644 --- a/packages/api/src/repositories/jobs.ts +++ b/packages/api/src/repositories/jobs.ts @@ -2,10 +2,10 @@ import { ffmpegQueue, ffprobeQueue, flowProducer, + imageQueue, outcomeQueue, packageQueue, pipelineQueue, - thumbnailsQueue, transcodeQueue, } from "bolt"; import { Job as RawJob } from "bullmq"; @@ -20,7 +20,7 @@ const allQueus = [ ffmpegQueue, ffprobeQueue, outcomeQueue, - thumbnailsQueue, + imageQueue, ]; function findQueueByName(name: string): Queue { diff --git a/packages/api/src/routes/jobs.ts b/packages/api/src/routes/jobs.ts index c61abc7..1f717fc 100644 --- a/packages/api/src/routes/jobs.ts +++ b/packages/api/src/routes/jobs.ts @@ -3,9 +3,9 @@ import { addToQueue, DEFAULT_PACKAGE_NAME, DEFAULT_SEGMENT_SIZE, + imageQueue, packageQueue, pipelineQueue, - thumbnailsQueue, transcodeQueue, } from "bolt"; import { AudioCodec, VideoCodec } from "bolt"; @@ -170,16 +170,16 @@ export const jobs = new Elysia() }, ) .post( - "/thumbnails", + "/image", async ({ body }) => { - const jobId = await addToQueue(thumbnailsQueue, body, { + const jobId = await addToQueue(imageQueue, body, { id: body.assetId, }); return { jobId }; }, { detail: { - summary: "Create thumbnails job", + summary: "Create image job", tags: ["Jobs"], }, body: t.Object({ diff --git a/packages/artisan/src/workers/thumbnails.ts b/packages/artisan/src/workers/image.ts similarity index 79% rename from packages/artisan/src/workers/thumbnails.ts rename to packages/artisan/src/workers/image.ts index 3a231ec..38829c1 100644 --- a/packages/artisan/src/workers/thumbnails.ts +++ b/packages/artisan/src/workers/image.ts @@ -2,12 +2,13 @@ import { ffmpeg } from "../lib/ffmpeg"; import { getMetaStruct } from "../lib/file-helpers"; import { getS3SignedUrl, syncToS3 } from "../lib/s3"; import type { MetaStruct } from "../lib/file-helpers"; -import type { ThumbnailsData, ThumbnailsResult, WorkerCallback } from "bolt"; +import type { ImageData, ImageResult, WorkerCallback } from "bolt"; -export const thumbnailsCallback: WorkerCallback< - ThumbnailsData, - ThumbnailsResult -> = async ({ job, dir, progressTracker }) => { +export const imageCallback: WorkerCallback = async ({ + job, + dir, + progressTracker, +}) => { const metaStruct = await getMetaStruct(job.data.assetId); const name = findStreamInputName(metaStruct); @@ -26,17 +27,17 @@ export const thumbnailsCallback: WorkerCallback< await ffmpeg( publicUrl, - `${outDir}/0.png`, + `${outDir}/thumbnail.png`, outputOptions, (command) => { job.log(command); }, (value) => { - progressTracker.set("ffmpeg", value); + progressTracker.set("screenshot", value); }, ); - const s3Dir = `thumbnails/${job.data.assetId}`; + const s3Dir = `screenshots/${job.data.assetId}`; job.log(`Uploading to ${s3Dir}`); await syncToS3(outDir, s3Dir, { diff --git a/packages/artisan/src/workers/index.ts b/packages/artisan/src/workers/index.ts index ae90722..2f126bf 100644 --- a/packages/artisan/src/workers/index.ts +++ b/packages/artisan/src/workers/index.ts @@ -1,9 +1,9 @@ import { runWorkers } from "bolt"; import { ffmpegCallback } from "./ffmpeg"; import { ffprobeCallback } from "./ffprobe"; +import { imageCallback } from "./image"; import { packageCallback } from "./package"; import { pipelineCallback } from "./pipeline"; -import { thumbnailsCallback } from "./thumbnails"; import { transcodeCallback } from "./transcode"; runWorkers([ @@ -28,7 +28,7 @@ runWorkers([ callback: pipelineCallback, }, { - name: "thumbnails", - callback: thumbnailsCallback, + name: "image", + callback: imageCallback, }, ]); diff --git a/packages/bolt/src/queue-result.ts b/packages/bolt/src/queue-result.ts index 5f41b8d..0be3c12 100644 --- a/packages/bolt/src/queue-result.ts +++ b/packages/bolt/src/queue-result.ts @@ -34,6 +34,6 @@ export interface PipelineResult { assetId: string; } -export interface ThumbnailsResult { +export interface ImageResult { assetId: string; } diff --git a/packages/bolt/src/queue.ts b/packages/bolt/src/queue.ts index 7e29531..2d697e8 100644 --- a/packages/bolt/src/queue.ts +++ b/packages/bolt/src/queue.ts @@ -72,10 +72,10 @@ export const outcomeQueue = new Queue("outcome", { connection, }); -export interface ThumbnailsData { +export interface ImageData { assetId: string; } -export const thumbnailsQueue = new Queue("thumbnails", { +export const imageQueue = new Queue("image", { connection, }); From b6fbe7e1fba9c9d7e0368cfb9da067f68ad8b096 Mon Sep 17 00:00:00 2001 From: Matthias Van Parijs Date: Sun, 17 Nov 2024 14:03:01 +0100 Subject: [PATCH 4/6] Changed path to image --- packages/artisan/src/workers/image.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/artisan/src/workers/image.ts b/packages/artisan/src/workers/image.ts index 38829c1..64b7b88 100644 --- a/packages/artisan/src/workers/image.ts +++ b/packages/artisan/src/workers/image.ts @@ -37,7 +37,7 @@ export const imageCallback: WorkerCallback = async ({ }, ); - const s3Dir = `screenshots/${job.data.assetId}`; + const s3Dir = `image/${job.data.assetId}`; job.log(`Uploading to ${s3Dir}`); await syncToS3(outDir, s3Dir, { From bc695de2ac7ab36205cec9c368f65edfaba28d53 Mon Sep 17 00:00:00 2001 From: Matthias Van Parijs Date: Sun, 17 Nov 2024 14:07:47 +0100 Subject: [PATCH 5/6] Do not delete image folder --- packages/artisan/src/workers/image.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/artisan/src/workers/image.ts b/packages/artisan/src/workers/image.ts index 64b7b88..a2d79ae 100644 --- a/packages/artisan/src/workers/image.ts +++ b/packages/artisan/src/workers/image.ts @@ -41,7 +41,6 @@ export const imageCallback: WorkerCallback = async ({ job.log(`Uploading to ${s3Dir}`); await syncToS3(outDir, s3Dir, { - del: true, public: true, }); From a0f1e2a4df50c9bb025348ec6d9562b137cdf27a Mon Sep 17 00:00:00 2001 From: Matthias Van Parijs Date: Mon, 18 Nov 2024 19:02:06 +0100 Subject: [PATCH 6/6] Added outcome job for images --- packages/api/src/workers/outcome.ts | 4 ++ packages/artisan/src/workers/image.ts | 14 ++++++ packages/artisan/src/workers/package.ts | 56 +++--------------------- packages/artisan/src/workers/pipeline.ts | 11 +++++ packages/bolt/src/queue.ts | 4 ++ 5 files changed, 40 insertions(+), 49 deletions(-) diff --git a/packages/api/src/workers/outcome.ts b/packages/api/src/workers/outcome.ts index 1bb1cb1..2f7f2bf 100644 --- a/packages/api/src/workers/outcome.ts +++ b/packages/api/src/workers/outcome.ts @@ -26,5 +26,9 @@ export const outcomeCallback: WorkerCallback = async ({ job }) => { }); break; } + case "image": { + // TODO: Store thumbnail in database. + break; + } } }; diff --git a/packages/artisan/src/workers/image.ts b/packages/artisan/src/workers/image.ts index a2d79ae..2183898 100644 --- a/packages/artisan/src/workers/image.ts +++ b/packages/artisan/src/workers/image.ts @@ -1,3 +1,4 @@ +import { addToQueue, outcomeQueue } from "bolt"; import { ffmpeg } from "../lib/ffmpeg"; import { getMetaStruct } from "../lib/file-helpers"; import { getS3SignedUrl, syncToS3 } from "../lib/s3"; @@ -44,6 +45,19 @@ export const imageCallback: WorkerCallback = async ({ public: true, }); + await addToQueue( + outcomeQueue, + { + type: "image", + data: job.data, + }, + { + options: { + removeOnComplete: true, + }, + }, + ); + return { assetId: job.data.assetId, }; diff --git a/packages/artisan/src/workers/package.ts b/packages/artisan/src/workers/package.ts index aa49f7e..c2dfa4f 100644 --- a/packages/artisan/src/workers/package.ts +++ b/packages/artisan/src/workers/package.ts @@ -3,58 +3,14 @@ import { execa } from "execa"; import parseFilePath from "parse-filepath"; import { getBinaryPath, getMetaStruct } from "../lib/file-helpers"; import { syncFromS3, syncToS3 } from "../lib/s3"; -import type { - PackageData, - PackageResult, - Stream, - WorkerCallback, - WorkerDir, -} from "bolt"; -import type { Job } from "bullmq"; +import type { PackageData, PackageResult, Stream, WorkerCallback } from "bolt"; const packagerBin = await getBinaryPath("packager"); -enum Step { - Initial, - Outcome, - Finish, -} - export const packageCallback: WorkerCallback< - PackageData & { step?: Step }, + PackageData, PackageResult > = async ({ job, dir }) => { - let step = job.data.step ?? Step.Initial; - while (step !== Step.Finish) { - switch (step) { - case Step.Initial: { - await handleStepInitial(job, dir); - await job.updateData({ - ...job.data, - step: Step.Outcome, - }); - step = Step.Outcome; - break; - } - - case Step.Outcome: { - await handleJobOutcome(job); - await job.updateData({ - ...job.data, - step: Step.Finish, - }); - step = Step.Finish; - break; - } - } - } - - return { - assetId: job.data.assetId, - }; -}; - -async function handleStepInitial(job: Job, dir: WorkerDir) { const inDir = await dir.createTempDir(); const meta = await getMetaStruct(job.data.assetId); @@ -143,9 +99,7 @@ async function handleStepInitial(job: Job, dir: WorkerDir) { del: true, public: true, }); -} -async function handleJobOutcome(job: Job) { await addToQueue( outcomeQueue, { @@ -158,7 +112,11 @@ async function handleJobOutcome(job: Job) { }, }, ); -} + + return { + assetId: job.data.assetId, + }; +}; function getGroupId( stream: diff --git a/packages/artisan/src/workers/pipeline.ts b/packages/artisan/src/workers/pipeline.ts index db7dd56..40760d1 100644 --- a/packages/artisan/src/workers/pipeline.ts +++ b/packages/artisan/src/workers/pipeline.ts @@ -1,5 +1,6 @@ import { addToQueue, + imageQueue, packageQueue, transcodeQueue, waitForChildren, @@ -88,4 +89,14 @@ async function handleStepContinue(job: Job, token?: string) { parent: job, }, ); + + await addToQueue( + imageQueue, + { + assetId: job.data.assetId, + }, + { + parent: job, + }, + ); } diff --git a/packages/bolt/src/queue.ts b/packages/bolt/src/queue.ts index 2d697e8..6890a70 100644 --- a/packages/bolt/src/queue.ts +++ b/packages/bolt/src/queue.ts @@ -66,6 +66,10 @@ export type OutcomeData = | { type: "package"; data: PackageData; + } + | { + type: "image"; + data: ImageData; }; export const outcomeQueue = new Queue("outcome", {