Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thumbnails generation #116

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/api/src/repositories/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
ffmpegQueue,
ffprobeQueue,
flowProducer,
imageQueue,
outcomeQueue,
packageQueue,
pipelineQueue,
Expand All @@ -19,6 +20,7 @@ const allQueus = [
ffmpegQueue,
ffprobeQueue,
outcomeQueue,
imageQueue,
];

function findQueueByName(name: string): Queue {
Expand Down
26 changes: 26 additions & 0 deletions packages/api/src/routes/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
addToQueue,
DEFAULT_PACKAGE_NAME,
DEFAULT_SEGMENT_SIZE,
imageQueue,
packageQueue,
pipelineQueue,
transcodeQueue,
Expand Down Expand Up @@ -168,6 +169,31 @@ export const jobs = new Elysia()
},
},
)
.post(
"/image",
async ({ body }) => {
const jobId = await addToQueue(imageQueue, body, {
id: body.assetId,
});
return { jobId };
},
{
detail: {
summary: "Create image job",
tags: ["Jobs"],
},
body: t.Object({
assetId: t.String({
format: "uuid",
}),
}),
response: {
200: t.Object({
jobId: t.String(),
}),
},
},
)
.get(
"/jobs",
async ({ query }) => {
Expand Down
7 changes: 7 additions & 0 deletions packages/api/src/routes/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export const storage = new Elysia()
url: await getStorageFileUrl(query.path),
type: "video",
};
case "png":
case "webp":
return {
mode: "url",
url: await getStorageFileUrl(query.path),
type: "image",
};
case "m4a":
case "mp3":
return {
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export type StorageFolder = Static<typeof StorageFolderSchema>;
export const StorageFileSchema = t.Union([
t.Object({
mode: t.Literal("url"),
type: t.Union([t.Literal("video"), t.Literal("audio")]),
type: t.Union([t.Literal("video"), t.Literal("image"), t.Literal("audio")]),
url: t.String(),
}),
t.Object({
Expand Down
4 changes: 4 additions & 0 deletions packages/api/src/workers/outcome.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ export const outcomeCallback: WorkerCallback<OutcomeData> = async ({ job }) => {
});
break;
}
case "image": {
// TODO: Store thumbnail in database.
break;
}
}
};
3 changes: 3 additions & 0 deletions packages/app/src/components/FilePreview.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ function Preview({ file }: { file: StorageFile }) {
<audio src={file.url} controls className="w-full max-w-lg mx-auto" />
);
}
if (file.type === "image") {
return <img src={file.url} className="w-full" />;
}
}
return null;
}
Expand Down
7 changes: 7 additions & 0 deletions packages/app/src/components/Image.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
interface ImageProps {
src: string;
}

export function Image({ src }: ImageProps) {
return <img src={src} />;
}
13 changes: 6 additions & 7 deletions packages/artisan/src/lib/file-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as fs from "node:fs/promises";
import { getS3SignedUrl } from "./s3";
import { getS3SignedUrl, getTextFromS3 } from "./s3";
import type { PartialInput, Stream } from "bolt";

export async function getBinaryPath(name: string) {
Expand Down Expand Up @@ -41,11 +40,11 @@ export interface MetaStruct {
}

/**
* Will fetch meta file when meta.json is found in path.
* @param path S3 dir
* Will fetch meta file when meta.json
* @param assetId
* @returns
*/
export async function getMetaStruct(path: string): Promise<MetaStruct> {
const text = await fs.readFile(`${path}/meta.json`, "utf8");
return JSON.parse(text.toString());
export async function getMetaStruct(assetId: string): Promise<MetaStruct> {
const text = await getTextFromS3(`transcode/${assetId}/meta.json`);
return JSON.parse(text);
}
14 changes: 14 additions & 0 deletions packages/artisan/src/lib/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import { ConfiguredRetryStrategy } from "@smithy/util-retry";
import { lookup } from "mime-types";
import { S3SyncClient } from "s3-sync-client";
import { assert } from "shared/assert";
import { env } from "../env";
import type { PutObjectCommandInput } from "@aws-sdk/client-s3";
import type { CommandInput } from "s3-sync-client";
Expand Down Expand Up @@ -116,3 +117,16 @@ export async function getS3SignedUrl(
});
return url;
}

export async function getTextFromS3(remoteFilePath: string) {
const command = new GetObjectCommand({
Bucket: env.S3_BUCKET,
Key: remoteFilePath,
});
const response = await client.send(command);

const text = response.Body?.transformToString("utf-8");
assert(text, `Failed to get text from S3 "${remoteFilePath}"`);

return text;
}
79 changes: 79 additions & 0 deletions packages/artisan/src/workers/image.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { addToQueue, outcomeQueue } from "bolt";
import { ffmpeg } from "../lib/ffmpeg";
import { getMetaStruct } from "../lib/file-helpers";
import { getS3SignedUrl, syncToS3 } from "../lib/s3";
import type { MetaStruct } from "../lib/file-helpers";
import type { ImageData, ImageResult, WorkerCallback } from "bolt";

export const imageCallback: WorkerCallback<ImageData, ImageResult> = async ({
job,
dir,
progressTracker,
}) => {
const metaStruct = await getMetaStruct(job.data.assetId);
const name = findStreamInputName(metaStruct);

if (!name) {
throw new Error("Failed to find suitable stream.");
}

const publicUrl = await getS3SignedUrl(
`transcode/${job.data.assetId}/${name}`,
60 * 30,
);

const outDir = await dir.createTempDir();

const outputOptions = ["-ss 00:00:01.000", "-vframes 1"];

await ffmpeg(
publicUrl,
`${outDir}/thumbnail.png`,
outputOptions,
(command) => {
job.log(command);
},
(value) => {
progressTracker.set("screenshot", value);
},
);

const s3Dir = `image/${job.data.assetId}`;
job.log(`Uploading to ${s3Dir}`);

await syncToS3(outDir, s3Dir, {
public: true,
});

await addToQueue(
outcomeQueue,
{
type: "image",
data: job.data,
},
{
options: {
removeOnComplete: true,
},
},
);

return {
assetId: job.data.assetId,
};
};

function findStreamInputName(metaStruct: MetaStruct): string | null {
let name: string | null = null;
let lastHeight = 0;

const entries = Object.entries(metaStruct.streams);
for (const [key, value] of entries) {
if (value.type === "video" && value.height > lastHeight) {
name = key;
lastHeight = value.height;
}
}

return name;
}
5 changes: 5 additions & 0 deletions packages/artisan/src/workers/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { runWorkers } from "bolt";
import { ffmpegCallback } from "./ffmpeg";
import { ffprobeCallback } from "./ffprobe";
import { imageCallback } from "./image";
import { packageCallback } from "./package";
import { pipelineCallback } from "./pipeline";
import { transcodeCallback } from "./transcode";
Expand All @@ -26,4 +27,8 @@ runWorkers([
name: "pipeline",
callback: pipelineCallback,
},
{
name: "image",
callback: imageCallback,
},
]);
62 changes: 10 additions & 52 deletions packages/artisan/src/workers/package.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,68 +3,24 @@ import { execa } from "execa";
import parseFilePath from "parse-filepath";
import { getBinaryPath, getMetaStruct } from "../lib/file-helpers";
import { syncFromS3, syncToS3 } from "../lib/s3";
import type {
PackageData,
PackageResult,
Stream,
WorkerCallback,
WorkerDir,
} from "bolt";
import type { Job } from "bullmq";
import type { PackageData, PackageResult, Stream, WorkerCallback } from "bolt";

const packagerBin = await getBinaryPath("packager");

enum Step {
Initial,
Outcome,
Finish,
}

export const packageCallback: WorkerCallback<
PackageData & { step?: Step },
PackageData,
PackageResult
> = async ({ job, dir }) => {
let step = job.data.step ?? Step.Initial;
while (step !== Step.Finish) {
switch (step) {
case Step.Initial: {
await handleStepInitial(job, dir);
await job.updateData({
...job.data,
step: Step.Outcome,
});
step = Step.Outcome;
break;
}

case Step.Outcome: {
await handleJobOutcome(job);
await job.updateData({
...job.data,
step: Step.Finish,
});
step = Step.Finish;
break;
}
}
}
const inDir = await dir.createTempDir();

return {
assetId: job.data.assetId,
};
};
const meta = await getMetaStruct(job.data.assetId);

async function handleStepInitial(job: Job<PackageData>, dir: WorkerDir) {
const inDir = await dir.createTempDir();
job.log(`Got meta: "${JSON.stringify(meta)}"`);

await syncFromS3(`transcode/${job.data.assetId}`, inDir);

job.log(`Synced folder in ${inDir}`);

const meta = await getMetaStruct(inDir);

job.log(`Got meta: "${JSON.stringify(meta)}"`);

// If we do not specify the segmentSize, grab it from the meta file.
const segmentSize = job.data.segmentSize ?? meta.segmentSize;

Expand Down Expand Up @@ -143,9 +99,7 @@ async function handleStepInitial(job: Job<PackageData>, dir: WorkerDir) {
del: true,
public: true,
});
}

async function handleJobOutcome(job: Job<PackageData>) {
await addToQueue(
outcomeQueue,
{
Expand All @@ -158,7 +112,11 @@ async function handleJobOutcome(job: Job<PackageData>) {
},
},
);
}

return {
assetId: job.data.assetId,
};
};

function getGroupId(
stream:
Expand Down
11 changes: 11 additions & 0 deletions packages/artisan/src/workers/pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
addToQueue,
imageQueue,
packageQueue,
transcodeQueue,
waitForChildren,
Expand Down Expand Up @@ -88,4 +89,14 @@ async function handleStepContinue(job: Job<PipelineData>, token?: string) {
parent: job,
},
);

await addToQueue(
imageQueue,
{
assetId: job.data.assetId,
},
{
parent: job,
},
);
}
4 changes: 4 additions & 0 deletions packages/bolt/src/queue-result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ export interface PackageResult {
export interface PipelineResult {
assetId: string;
}

export interface ImageResult {
assetId: string;
}
Loading