Fix Windows Ulimits (#7)
Adds proper queue handling of new files to help the SDK work on Windows.
jtsmedley authored Mar 18, 2024
1 parent 973eb7e commit 1f47935
Showing 5 changed files with 615 additions and 43 deletions.
10 changes: 7 additions & 3 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "@filebase/sdk",
"version": "1.0.3",
"version": "1.0.4",
"description": "SDK for Interacting with Filebase Services [S3(Buckets, Objects), IPFS(Gateways, Pins) IPNS(Names)]",
"repository": {
"type": "git",
@@ -51,10 +51,14 @@
"@aws-sdk/client-s3": "3.478.0",
"@aws-sdk/lib-storage": "3.478.0",
"@helia/car": "1.0.4",
"@helia/mfs": "3.0.1",
"@helia/unixfs": "1.4.3",
"@ipld/car": "5.2.4",
"axios": "1.6.2",
"blockstore-fs": "1.1.8",
"uuid": "9.0.1"
"blockstore-fs": "1.1.10",
"datastore-core": "9.2.9",
"p-queue": "8.0.1",
"uuid": "9.0.1",
"winston": "3.12.0"
}
}
10 changes: 10 additions & 0 deletions src/logger.js
@@ -0,0 +1,10 @@
import winston from "winston";
const { combine, timestamp, json } = winston.format;

const logger = winston.createLogger({
level: process.env.LOG_LEVEL || "info",
format: combine(timestamp(), json()),
transports: [new winston.transports.Console()],
});

export default logger;
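
For context, a minimal usage sketch (not part of the commit) of the logger added above: objectManager.js derives a per-upload child logger that stamps every entry with the upload UUID, and verbosity is controlled through the LOG_LEVEL environment variable (for example LOG_LEVEL=silly to surface the per-file import events).

```js
// Usage sketch (not part of the commit); the UUID value is illustrative.
import logger from "./logger.js";

const uploadLogger = logger.child({ uploadUUID: "example-upload-uuid" });
uploadLogger.verbose("UNIXFS_ADD", { count: 3 });
uploadLogger.silly("SOURCE_IMPORT_STARTED", { path: "/example.txt", size: 0 });
```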
156 changes: 144 additions & 12 deletions src/objectManager.js
@@ -1,3 +1,5 @@
// Environment Imports
import logger from "./logger.js";
// S3 Imports
import {
CopyObjectCommand,
@@ -11,16 +13,20 @@ import { Upload } from "@aws-sdk/lib-storage";
// Helia Imports
import { CarWriter } from "@ipld/car";
import { car } from "@helia/car";
import { mfs } from "@helia/mfs";
import { unixfs } from "@helia/unixfs";
import { FsBlockstore } from "blockstore-fs";
import { MemoryDatastore } from "datastore-core";
// Utility Imports
import { once } from "node:events";
import { createReadStream, createWriteStream } from "node:fs";
import { mkdir, rm } from "node:fs/promises";
import { mkdir, rm, open } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { Readable } from "node:stream";
import { v4 as uuidv4 } from "uuid";
import { downloadFromGateway } from "./helpers.js";
import PQueue from "p-queue";

/** Interacts with an S3 client to perform various operations on objects in a bucket. */
class ObjectManager {
@@ -149,6 +155,7 @@ class ObjectManager {
async upload(key, source, metadata, options) {
// Generate Upload UUID
const uploadUUID = uuidv4();
const uploadLogger = logger.child({ uploadUUID });

// Setup Upload Options
const bucket = options?.bucket || this.#defaultBucket,
@@ -172,6 +179,9 @@
...uploadOptions.params.Metadata,
import: "car",
};
source.sort((a, b) => {
return countSlashes(b.path) - countSlashes(a.path);
});

let temporaryCarFilePath, temporaryBlockstoreDir;
try {
@@ -184,39 +194,156 @@
);
temporaryCarFilePath = `${temporaryBlockstoreDir}/main.car`;
await mkdir(temporaryBlockstoreDir, { recursive: true });
const temporaryBlockstore = new FsBlockstore(temporaryBlockstoreDir);
const temporaryBlockstore = new FsBlockstore(temporaryBlockstoreDir),
temporaryDatastore = new MemoryDatastore();

let createdFiles = new Map();
const heliaFs = unixfs({
blockstore: temporaryBlockstore,
});
uploadLogger.verbose("UNIXFS_ADD", {
count: source.length,
});
let createFilePromises = [];
const queue = new PQueue({ concurrency: 50 });
for (const entry of source) {
if (entry.content === null) {
continue;
}
const task = (async () => {
await queue.add(async () => {
uploadLogger.silly("SOURCE_IMPORT_STARTED", {
path: entry.path,
size: queue.size,
});
let createdFile;
if (
(entry.type === "import" && entry.content !== null) ||
entry.content instanceof Readable
) {
let filehandle;
try {
if (entry.type === "import") {
filehandle = await open(path.resolve(entry.content), "r");
entry.content = filehandle.createReadStream();
}
createdFile = await heliaFs.addByteStream(entry.content);
} catch (err) {
if (typeof filehandle !== "undefined") {
await filehandle.close();
}
throw err;
}
if (typeof filehandle !== "undefined") {
await filehandle.close();
}
} else if (entry.content !== null) {
createdFile = await heliaFs.addBytes(entry.content);
} else {
return;
}
createdFiles.set(entry.path, createdFile);
uploadLogger.verbose("SOURCE_IMPORT_COMPLETED", {
path: entry.path,
size: queue.size,
});
});
})();
if (queue.size > 150) {
while (queue.size > 100) {
await once(queue, "next");
}
}
createFilePromises.push(task);
uploadLogger.verbose("SOURCE_IMPORT_QUEUED", {
path: entry.path,
size: queue.size,
});
}
await Promise.all(createFilePromises);
uploadLogger.verbose("UNIXFS_ADDED", {
count: createdFiles.size,
});

for (let sourceEntry of source) {
sourceEntry.path =
sourceEntry.path[0] === "/"
? `/${uploadUUID}${sourceEntry.path}`
: `/${uploadUUID}/${sourceEntry.path}`;
const heliaMfs = mfs({
blockstore: temporaryBlockstore,
datastore: temporaryDatastore,
});
uploadLogger.verbose("MFS_ADDING", {
count: source.length,
output: temporaryCarFilePath,
});
for (const entry of source) {
if (entry.content === null) {
uploadLogger.silly("MFS_DIR_CREATING", {
path: entry.path,
});
await heliaMfs.mkdir(entry.path);
uploadLogger.verbose("MFS_DIR_CREATED", {
path: entry.path,
});
} else {
const entryFile = createdFiles.get(entry.path);
uploadLogger.silly("MFS_FILE_COPY", {
cid: entryFile,
path: entry.path,
});
await heliaMfs.cp(entryFile, entry.path, {
force: true,
});
uploadLogger.verbose("MFS_FILE_COPIED", {
cid: entryFile,
path: entry.path,
});
}
}
for await (const entry of heliaFs.addAll(source)) {
parsedEntries[entry.path] = entry;
for (const entry of source) {
parsedEntries[entry.path] = await heliaMfs.stat(entry.path);
uploadLogger.silly("MFS_PATH_STAT", parsedEntries[entry.path]);
}
const rootEntry = parsedEntries[uploadUUID];
parsedEntries["/"] = await heliaMfs.stat("/");
const rootEntry = parsedEntries["/"];
uploadLogger.verbose("MFS_ADDED", {
root: rootEntry,
count: Object.keys(parsedEntries).length,
});

// Get carFile stream here
uploadLogger.verbose("CAR_EXPORTING", {
root: rootEntry,
});
const carExporter = car({ blockstore: temporaryBlockstore }),
{ writer, out } = CarWriter.create([rootEntry.cid]);

// Put carFile stream to disk
const output = createWriteStream(temporaryCarFilePath);
Readable.from(out).pipe(output);
await carExporter.export(rootEntry.cid, writer);
uploadLogger.verbose("CAR_EXPORTED", {
root: rootEntry,
});

// Set Uploader to Read from carFile on disk
uploadOptions.params.Body = createReadStream(temporaryCarFilePath);

// Upload carFile via S3
uploadLogger.verbose("CAR_UPLOADING", {
entry: rootEntry,
source: temporaryCarFilePath,
});
const parallelUploads3 = new Upload(uploadOptions);
parallelUploads3.on("httpUploadProgress", (progress) => {
uploadLogger.debug("CAR_UPLOAD_PROGRESS", progress);
});
await parallelUploads3.done();
uploadLogger.verbose("CAR_UPLOADED", {
entry: rootEntry,
source: temporaryCarFilePath,
});
await temporaryBlockstore.close();
} catch (err) {
console.error(err.message);
throw err;
} finally {
if (typeof temporaryBlockstoreDir !== "undefined") {
// Delete Temporary Blockstore
@@ -292,7 +419,7 @@ class ObjectManager {
/**
* @summary Downloads an object from the specified bucket using the provided key.
* @param {string} key - The key of the object to be downloaded.
* @param {objectOptions} [options] - The options for downloading the object..
* @param {objectOptions} [options] - The options for downloading the object.
* @returns {Promise<Object>} - A promise that resolves with the contents of the downloaded object as a Stream.
* @example
* // Download object with name of `download-object-example`
@@ -410,7 +537,7 @@ class ObjectManager {
* @returns {Promise<Boolean>} - A Promise that resolves with the result of the copy operation.
* @example
* // Copy object `copy-object-test` from `copy-object-test-pass-src` to `copy-object-test-pass-dest`
* // TIP: Set bucket on constructor and it will be used as the default source for copying objects.
* // TIP: Set bucket on constructor, it will be used as the default source for copying objects.
* await objectManager.copy(`copy-object-test`, `copy-object-dest`, {
* sourceBucket: `copy-object-src`
* });
@@ -437,4 +564,9 @@
}
}

// Function to count slashes in a path
function countSlashes(path) {
return (path.match(/\//g) || []).length;
}

export default ObjectManager;
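
For reference, the fix above reduces to a bounded-concurrency import loop. The sketch below is not part of the commit; `importOne` is a hypothetical stand-in for the per-entry UnixFS import inside upload(). It shows the same p-queue pattern: at most 50 imports run at once, and the producer pauses whenever more than 150 tasks are queued, resuming once the backlog drains below 100, which keeps the number of simultaneously open file handles within Windows limits.

```js
// Sketch of the bounded-concurrency pattern used in upload() above; not the actual implementation.
import { once } from "node:events";
import PQueue from "p-queue";

async function importAll(entries, importOne) {
  const queue = new PQueue({ concurrency: 50 }); // at most 50 imports in flight
  const pending = [];
  for (const entry of entries) {
    // importOne is a hypothetical stand-in for the per-entry import logic.
    pending.push(queue.add(() => importOne(entry)));
    // Backpressure: stop queueing while the backlog is large, resume once it drains.
    if (queue.size > 150) {
      while (queue.size > 100) {
        await once(queue, "next"); // p-queue emits "next" as each task settles
      }
    }
  }
  await Promise.all(pending); // surface any import errors
}
```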
18 changes: 13 additions & 5 deletions tsup.config.js
@@ -1,11 +1,19 @@
import { defineConfig } from 'tsup'
import { defineConfig } from "tsup";

export default defineConfig({
entry: ['src/index.js'],
entry: ["src/index.js"],
splitting: false,
sourcemap: false,
noExternal: ['@ipld/car', '@helia/car', '@helia/unixfs', 'blockstore-fs'],
noExternal: [
"@ipld/car",
"@helia/car",
"@helia/unixfs",
"@helia/mfs",
"blockstore-fs",
"datastore-core",
"p-queue",
],
dts: true,
format: ['cjs'],
format: ["cjs"],
clean: true,
})
});