Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(write): buffered write for speed #4

Merged
merged 2 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/fuse-client/syscalls/flush.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export const flush: (backend: SQLiteBackend) => MountOptions["flush"] = (
) => {
return async (path, fd, cb) => {
console.log("flush(%s, %d)", path, fd);
await backend.flush(path);
cb(0);
};
};
7 changes: 6 additions & 1 deletion packages/fuse-client/syscalls/fsync.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { SQLiteBackend } from "@zoid-fs/sqlite-backend";
import { MountOptions } from "@zoid-fs/node-fuse-bindings";
import { flush } from "./flush";

export const fsync: (backend: SQLiteBackend) => MountOptions["fsync"] = (
backend
) => {
return async (path, fd, datasync, cb) => {
console.log("fsync(%s, %d, %d)", path, fd, datasync);
cb(0);
// @ts-expect-error TODO: implement fsync properly
// We do buffered writes and flush flushes the buffer!
// A program may not call flush but fsync without relenquishing fd (like SQLite)
// In our case currently, the implementation of fsync and flush is same!
flush(backend)(path, fd, cb);
};
};
2 changes: 1 addition & 1 deletion packages/fuse-client/syscalls/getxattr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export const getxattr: (backend: SQLiteBackend) => MountOptions["getxattr"] = (
) => {
return async (path, name, buffer, length, offset, cb) => {
console.log(
"getxattr(%s, %s, %s, %d, %d)",
"getxattr(%s, %s, %o, %d, %d)",
path,
name,
buffer,
Expand Down
16 changes: 4 additions & 12 deletions packages/fuse-client/syscalls/write.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
import { SQLiteBackend } from "@zoid-fs/sqlite-backend";
import fuse, { MountOptions } from "@zoid-fs/node-fuse-bindings";
import { match } from "ts-pattern";
import { MountOptions } from "@zoid-fs/node-fuse-bindings";

export const write: (backend: SQLiteBackend) => MountOptions["write"] = (
backend
) => {
return async (path, fd, buf, len, pos, cb) => {
console.log("write(%s, %d, %d, %d)", path, fd, len, pos);
const chunk = Buffer.from(buf, pos, len);

const rChunk = await backend.writeFileChunk(path, chunk, pos, len);
match(rChunk)
.with({ status: "ok" }, () => {
cb(chunk.length);
})
.with({ status: "not_found" }, () => {
cb(fuse.ENOENT);
})
.exhaustive();
// TODO: This may throw (because of flush!, what should happen then?)
await backend.write(path, { content: chunk, offset: pos, size: len });
cb(chunk.length);
};
};
82 changes: 61 additions & 21 deletions packages/sqlite-backend/SQLiteBackend.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
import { PrismaClient } from "@prisma/client";
import { Content, PrismaClient } from "@prisma/client";
import { Backend } from "@zoid-fs/common";
import { match } from "ts-pattern";
import path from "path";
import { constants } from "fs";
import { rawCreateMany } from "./prismaRawUtil";
import { WriteBuffer } from "./WriteBuffer";

export type ContentChunk = {
content: Buffer;
offset: number;
size: number;
};

const WRITE_BUFFER_SIZE = 10;

export class SQLiteBackend implements Backend {
private prisma: PrismaClient;
private readonly writeBuffers: Map<string, WriteBuffer<ContentChunk>> =
new Map();
private readonly prisma: PrismaClient;
constructor(prismaOrDbUrl?: PrismaClient | string) {
if (prismaOrDbUrl instanceof PrismaClient) {
this.prisma = prismaOrDbUrl;
Expand All @@ -24,6 +36,27 @@ export class SQLiteBackend implements Backend {
}
}

async write(filepath: string, chunk: ContentChunk) {
const writeBuffer = match(this.writeBuffers.has(filepath))
.with(true, () => this.writeBuffers.get(filepath))
.with(false, () => {
this.writeBuffers.set(
filepath,
new WriteBuffer(WRITE_BUFFER_SIZE, async (bufferSlice) => {
await this.writeFileChunks(filepath, bufferSlice);
})
);
return this.writeBuffers.get(filepath);
})
.exhaustive();
await writeBuffer!.write(chunk);
}

async flush(filepath: string) {
const writeBuffer = this.writeBuffers.get(filepath);
await writeBuffer?.flush();
}

async getFiles(dir: string) {
const files = await this.prisma.file.findMany({
where: {
Expand Down Expand Up @@ -189,29 +222,36 @@ export class SQLiteBackend implements Backend {
}
}

async writeFileChunk(
filepath: string,
content: Buffer,
offset: number,
size: number
) {
private async writeFileChunks(filepath: string, chunks: ContentChunk[]) {
if (chunks.length === 0) {
return {
status: "ok" as const,
chunks,
};
}

try {
const contentChunk = await this.prisma.content.create({
data: {
offset,
size,
content,
file: {
connect: {
path: filepath,
},
},
},
});
const rFile = await this.getFile(filepath);
const file = rFile.file;

await rawCreateMany<Omit<Content, "id">>(
this.prisma,
"Content",
["content", "offset", "size", "fileId"],
chunks.map((chunk) => {
const { content, offset, size } = chunk;
return {
content,
offset,
size,
fileId: file?.id,
};
})
);

return {
status: "ok" as const,
chunk: contentChunk,
chunks,
};
} catch (e) {
return {
Expand Down
22 changes: 22 additions & 0 deletions packages/sqlite-backend/WriteBuffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export class WriteBuffer<T> {
private buffer: Array<T> = [];

constructor(
private readonly size: number,
private readonly writer: (bufferSlice: T[]) => Promise<void>
) {}
async write(item: T): Promise<void> {
this.buffer.push(item);
if (this.buffer.length >= this.size) {
await this.flush();
}
// TODO: implement a time based flush, like, if there are no writes for 100ms
// call flush
}

async flush(): Promise<void> {
const bufferSlice = this.buffer.slice(0);
this.buffer = [];
await this.writer(bufferSlice);
}
}
1 change: 1 addition & 0 deletions packages/sqlite-backend/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { SQLiteBackend } from "./SQLiteBackend";
export { PrismaClient } from "@prisma/client";
export { rawCreateMany } from "./prismaRawUtil";
3 changes: 2 additions & 1 deletion packages/sqlite-backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"dependencies": {
"@prisma/client": "5.4.2",
"@zoid-fs/common": "*",
"ts-pattern": "^5.0.5"
"sql-template-tag": "5.1.0",
"ts-pattern": "5.0.5"
}
}
50 changes: 50 additions & 0 deletions packages/sqlite-backend/prismaRawUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { Prisma, PrismaClient } from "@prisma/client";
import { Value as SqlTagTemplateValue } from "sql-template-tag";

type Value = SqlTagTemplateValue | Buffer | Prisma.Sql;

type ValuesOrNestedSql<T> = {
[K in keyof T]: Value;
};

const formatSingleValue = (value: Value): SqlTagTemplateValue | Prisma.Sql => {
if (Buffer.isBuffer(value)) {
return Prisma.raw(`x'${value.toString("hex")}'`);
}
return value;
};

const formatRow = <T>(
columns: (keyof T)[],
row: ValuesOrNestedSql<T>
): Prisma.Sql =>
Prisma.sql`(${Prisma.join(
columns.map((column) => formatSingleValue(row[column])),
","
)})`;

const formatValuesList = <T>(
columns: (keyof T)[],
rows: ValuesOrNestedSql<T>[]
): Prisma.Sql => {
return Prisma.join(
rows.map((row) => formatRow(columns, row)),
",\n"
);
};

export const rawCreateMany = async <T>(
db: PrismaClient,
tableName: string,
columns: (keyof T)[],
values: ValuesOrNestedSql<T>[]
) => {
const query = Prisma.sql`
INSERT INTO
${Prisma.raw(tableName)}
(${Prisma.join((columns as string[]).map(Prisma.raw), ", ")})
VALUES
${formatValuesList(columns, values)};
`;
return db.$queryRaw(query);
};
Binary file added packages/zoid-fs-client/meta/fuji-road.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion packages/zoid-fs-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"license": "MIT",
"type": "module",
"scripts": {
"start": "vite-node --watch index.ts /home/divyendusingh/zoid/vfs/1",
"start": "vite-node --watch index.ts /home/divyenduz/Documents/zoid/vfs/1",
"test:prepare": "vite-node --watch index.ts /home/div/code/vfs/test-fs --tenant test",
"ci:setup-fuse": "vite-node --watch index.ts",
"test": "vitest",
Expand Down
Loading