Skip to content

Commit

Permalink
Session based stitching with in memory cache (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
matvp91 authored Aug 22, 2024
1 parent 24a3af2 commit c503f80
Show file tree
Hide file tree
Showing 73 changed files with 6,033 additions and 367 deletions.
2 changes: 2 additions & 0 deletions notes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
Run redis with `redis-stack-server`, `@redis/json` is required.

```json
{
"inputs": [
Expand Down
1 change: 0 additions & 1 deletion packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"@fastify/swagger": "^8.15.0",
"@fastify/swagger-ui": "^4.0.1",
"@mixwave/artisan": "workspace:*",
"@mixwave/stitcher": "workspace:*",
"@ts-rest/core": "^3.49.3",
"@ts-rest/fastify": "^3.49.3",
"@ts-rest/open-api": "^3.49.3",
Expand Down
2 changes: 0 additions & 2 deletions packages/api/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
export { streamSchema, inputSchema } from "@mixwave/artisan/schemas";

export { playlistParamsSchema } from "@mixwave/stitcher/schemas";

export * from "./contract";

export type { JobDto, JobNodeDto } from "./types";
12 changes: 1 addition & 11 deletions packages/api/src/contract.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { initContract } from "@ts-rest/core";
import { streamSchema, inputSchema } from "@mixwave/artisan/schemas";
import { playlistParamsSchema } from "@mixwave/stitcher/schemas";
import * as z from "zod";
import type { JobDto, JobNodeDto } from "./types.js";

Expand All @@ -11,6 +10,7 @@ export const postTranscodeBodySchema = z.object({
streams: z.array(streamSchema),
segmentSize: z.number(),
assetId: z.string().optional(),
package: z.boolean().optional(),
});

export const postPackageBodySchema = z.object({
Expand Down Expand Up @@ -56,16 +56,6 @@ export const contract = c.router({
200: c.type<string[]>(),
},
},
postPlaylist: {
method: "POST",
path: "/playlist/:assetId",
body: playlistParamsSchema,
responses: {
200: c.type<{
url: string;
}>(),
},
},
getSpec: {
method: "GET",
path: "/spec.json",
Expand Down
3 changes: 1 addition & 2 deletions packages/api/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ export const env = parse(
PORT: z.coerce.number(),
REDIS_HOST: z.string(),
REDIS_PORT: z.coerce.number(),
STITCHER_URL: z.string(),
})
}),
);
16 changes: 6 additions & 10 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { bullBoardPlugin } from "./plugins/bull-board.js";
import { initServer } from "@ts-rest/fastify";
import { addTranscodeJob, addPackageJob } from "@mixwave/artisan/producer";
import { getJobs, getJob, getRootTreeForJobById, getJobLogs } from "./jobs.js";
import { getPlaylistUrl } from "./playlist.js";
import { generateOpenApi } from "@ts-rest/open-api";
import { randomUUID } from "crypto";

async function buildServer() {
const app = Fastify();
Expand All @@ -18,7 +18,11 @@ async function buildServer() {

const router = s.router(contract, {
postTranscode: async ({ body }) => {
const { job } = await addTranscodeJob(body);
const job = await addTranscodeJob({
assetId: randomUUID(),
package: false,
...body,
});
return {
status: 201,
body: { jobId: job.id },
Expand Down Expand Up @@ -52,14 +56,6 @@ async function buildServer() {
body: await getJobLogs(params.id),
};
},
postPlaylist: async ({ params, body }) => {
return {
status: 200,
body: {
url: await getPlaylistUrl(params.assetId, body),
},
};
},
getSpec: async () => {
return {
status: 200,
Expand Down
18 changes: 11 additions & 7 deletions packages/api/src/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ export async function getJobs(): Promise<JobDto[]> {
}

async function formatJobDto(job: Job): Promise<JobDto> {
if (!job.id) {
throw new Error("Missing jobId");
}

let progress = 0;
if (typeof job.progress === "number") {
progress = job.progress;
}

return {
id: `${job.queueName}_${job.id}`,
id: job.id,
name: job.name,
state: await job.getState(),
progress,
Expand All @@ -46,17 +50,17 @@ async function formatJobDto(job: Job): Promise<JobDto> {
};
}

export async function getJobLogs(prefixedId: string) {
const [queueName, id] = prefixedId.split("_");
export async function getJobLogs(id: string) {
const queueName = id.split("_", 1)[0];
const queue = findQueueByName(queueName);

const { logs } = await queue.getJobLogs(id);

return logs;
}

export async function getJob(prefixedId: string) {
const [queueName, id] = prefixedId.split("_");
export async function getJob(id: string) {
const queueName = id.split("_", 1)[0];
const queue = findQueueByName(queueName);

const job = await Job.fromId(queue, id);
Expand Down Expand Up @@ -104,8 +108,8 @@ export async function getRootTreeForJob(job: Job) {
return await formatJobNodeDto(node);
}

export async function getRootTreeForJobById(prefixedId: string) {
const [queueName, id] = prefixedId.split("_");
export async function getRootTreeForJobById(id: string) {
const queueName = id.split("_", 1)[0];
const queue = findQueueByName(queueName);

const job = await Job.fromId(queue, id);
Expand Down
8 changes: 0 additions & 8 deletions packages/api/src/playlist.ts

This file was deleted.

31 changes: 30 additions & 1 deletion packages/artisan/src/consumer/s3.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { GetObjectCommand, PutObjectCommand, S3 } from "@aws-sdk/client-s3";
import {
GetObjectCommand,
PutObjectCommand,
S3,
CopyObjectCommand,
DeleteObjectCommand,
} from "@aws-sdk/client-s3";
import { S3SyncClient } from "s3-sync-client";
import { env } from "./env.js";
import { basename } from "path";
Expand All @@ -7,6 +13,7 @@ import { existsSync } from "fs";
import { createReadStream } from "fs";
import type { Readable } from "stream";
import type { SyncOptions } from "s3-sync-client/dist/commands/SyncCommand";
import type { ObjectCannedACL } from "@aws-sdk/client-s3";

const client = new S3({
endpoint: env.S3_ENDPOINT,
Expand Down Expand Up @@ -72,3 +79,25 @@ export async function uploadJsonFile(key: string, content: string) {
}),
);
}

export async function copyFile(
name: string,
key: string,
acl?: ObjectCannedACL,
) {
await client.send(
new CopyObjectCommand({
Bucket: env.S3_BUCKET,
Key: key,
CopySource: `/${env.S3_BUCKET}/${name}`,
ACL: acl,
}),
);

await client.send(
new DeleteObjectCommand({
Bucket: env.S3_BUCKET,
Key: name,
}),
);
}
13 changes: 11 additions & 2 deletions packages/artisan/src/consumer/workers/package.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { lookup } from "mime-types";
import { fork } from "child_process";
import { createRequire } from "node:module";
import { by639_2T } from "iso-language-codes";
import { downloadFolder, uploadFolder } from "../s3.js";
import { copyFile, downloadFolder, uploadFolder } from "../s3.js";
import parseFilePath from "parse-filepath";
import * as z from "zod";
import { streamSchema } from "../../schemas.js";
Expand Down Expand Up @@ -85,7 +85,7 @@ export default async function (job: Job<PackageData, PackageResult>) {
"--fragment_duration",
"4",
"--hls_master_playlist_output",
"master.m3u8",
"master_tmp.m3u8",
);

const fakeRequire = createRequire(import.meta.url);
Expand All @@ -109,6 +109,15 @@ export default async function (job: Job<PackageData, PackageResult>) {
}),
});

// When we uploaded all files, including the "master_tmp" file, let's rename it so it
// becomes available on CDN.
// This way we ensure we have all the segments on S3 before we make the manifest available.
await copyFile(
`package/${job.data.assetId}/hls/master_tmp.m3u8`,
`package/${job.data.assetId}/hls/master.m3u8`,
"public-read",
);

job.updateProgress(100);

return {
Expand Down
18 changes: 12 additions & 6 deletions packages/artisan/src/consumer/workers/transcode.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { getFakeJob } from "../../lib/job-helpers.js";
import { uploadJsonFile } from "../s3.js";
import type { Input, Stream } from "../../schemas.js";
import { addPackageJob } from "../../producer.js";
import type { Stream } from "../../schemas.js";
import type { FfmpegResult } from "./ffmpeg.js";
import type { Job } from "bullmq";

export type TranscodeData = {
assetId: string;
inputs: Input[];
streams: Stream[];
segmentSize: number;
package: boolean;
};

export type TranscodeResult = {
Expand All @@ -28,16 +27,23 @@ export default async function (job: Job<TranscodeData, TranscodeResult>) {
}
return acc;
},
{}
{},
);

await job.log(`Writing meta.json (${JSON.stringify(meta)})`);

await uploadJsonFile(
`transcode/${job.data.assetId}/meta.json`,
JSON.stringify(meta, null, 2)
JSON.stringify(meta, null, 2),
);

if (job.data.package) {
await job.log("Will queue package job");
await addPackageJob({
assetId: job.data.assetId,
});
}

job.updateProgress(100);

return {
Expand Down
Loading

0 comments on commit c503f80

Please sign in to comment.