Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Windows Ulimits #7

Merged
merged 17 commits into from
Mar 18, 2024
Merged
10 changes: 7 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@filebase/sdk",
"version": "1.0.3",
"version": "1.0.4",
"description": "SDK for Interacting with Filebase Services [S3(Buckets, Objects), IPFS(Gateways, Pins) IPNS(Names)]",
"repository": {
"type": "git",
Expand Down Expand Up @@ -51,10 +51,14 @@
"@aws-sdk/client-s3": "3.478.0",
"@aws-sdk/lib-storage": "3.478.0",
"@helia/car": "1.0.4",
"@helia/mfs": "3.0.1",
"@helia/unixfs": "1.4.3",
"@ipld/car": "5.2.4",
"axios": "1.6.2",
"blockstore-fs": "1.1.8",
"uuid": "9.0.1"
"blockstore-fs": "1.1.10",
"datastore-core": "9.2.9",
"p-queue": "8.0.1",
"uuid": "9.0.1",
"winston": "3.12.0"
}
}
10 changes: 10 additions & 0 deletions src/logger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import winston from "winston";
const { combine, timestamp, json } = winston.format;

const logger = winston.createLogger({
level: process.env.LOG_LEVEL || "info",
format: combine(timestamp(), json()),
transports: [new winston.transports.Console()],
});

export default logger;
156 changes: 144 additions & 12 deletions src/objectManager.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Environment Imports
import logger from "./logger.js";
// S3 Imports
import {
CopyObjectCommand,
Expand All @@ -11,16 +13,20 @@ import { Upload } from "@aws-sdk/lib-storage";
// Helia Imports
import { CarWriter } from "@ipld/car";
import { car } from "@helia/car";
import { mfs } from "@helia/mfs";
import { unixfs } from "@helia/unixfs";
import { FsBlockstore } from "blockstore-fs";
import { MemoryDatastore } from "datastore-core";
// Utility Imports
import { once } from "node:events";
import { createReadStream, createWriteStream } from "node:fs";
import { mkdir, rm } from "node:fs/promises";
import { mkdir, rm, open } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { Readable } from "node:stream";
import { v4 as uuidv4 } from "uuid";
import { downloadFromGateway } from "./helpers.js";
import PQueue from "p-queue";

/** Interacts with an S3 client to perform various operations on objects in a bucket. */
class ObjectManager {
Expand Down Expand Up @@ -149,6 +155,7 @@ class ObjectManager {
async upload(key, source, metadata, options) {
// Generate Upload UUID
const uploadUUID = uuidv4();
const uploadLogger = logger.child({ uploadUUID });

// Setup Upload Options
const bucket = options?.bucket || this.#defaultBucket,
Expand All @@ -172,6 +179,9 @@ class ObjectManager {
...uploadOptions.params.Metadata,
import: "car",
};
source.sort((a, b) => {
return countSlashes(b.path) - countSlashes(a.path);
});

let temporaryCarFilePath, temporaryBlockstoreDir;
try {
Expand All @@ -184,39 +194,156 @@ class ObjectManager {
);
temporaryCarFilePath = `${temporaryBlockstoreDir}/main.car`;
await mkdir(temporaryBlockstoreDir, { recursive: true });
const temporaryBlockstore = new FsBlockstore(temporaryBlockstoreDir);
const temporaryBlockstore = new FsBlockstore(temporaryBlockstoreDir),
temporaryDatastore = new MemoryDatastore();

let createdFiles = new Map();
const heliaFs = unixfs({
blockstore: temporaryBlockstore,
});
uploadLogger.verbose("UNIXFS_ADD", {
count: source.length,
});
let createFilePromises = [];
const queue = new PQueue({ concurrency: 50 });
for (const entry of source) {
if (entry.content === null) {
continue;
}
const task = (async () => {
await queue.add(async () => {
uploadLogger.silly("SOURCE_IMPORT_STARTED", {
path: entry.path,
size: queue.size,
});
let createdFile;
if (
(entry.type === "import" && entry.content !== null) ||
entry.content instanceof Readable
) {
let filehandle;
try {
if (entry.type === "import") {
filehandle = await open(path.resolve(entry.content), "r");
entry.content = filehandle.createReadStream();
}
createdFile = await heliaFs.addByteStream(entry.content);
} catch (err) {
if (typeof filehandle !== "undefined") {
await filehandle.close();
}
throw err;
}
if (typeof filehandle !== "undefined") {
await filehandle.close();
}
} else if (entry.content !== null) {
createdFile = await heliaFs.addBytes(entry.content);
} else {
return;
}
createdFiles.set(entry.path, createdFile);
uploadLogger.verbose("SOURCE_IMPORT_COMPLETED", {
path: entry.path,
size: queue.size,
});
});
})();
if (queue.size > 150) {
while (queue.size > 100) {
await once(queue, "next");
}
}
createFilePromises.push(task);
uploadLogger.verbose("SOURCE_IMPORT_QUEUED", {
path: entry.path,
size: queue.size,
});
}
await Promise.all(createFilePromises);
uploadLogger.verbose("UNIXFS_ADDED", {
count: createdFiles.size,
});

for (let sourceEntry of source) {
sourceEntry.path =
sourceEntry.path[0] === "/"
? `/${uploadUUID}${sourceEntry.path}`
: `/${uploadUUID}/${sourceEntry.path}`;
const heliaMfs = mfs({
blockstore: temporaryBlockstore,
datastore: temporaryDatastore,
});
uploadLogger.verbose("MFS_ADDING", {
count: source.length,
output: temporaryCarFilePath,
});
for (const entry of source) {
if (entry.content === null) {
uploadLogger.silly("MFS_DIR_CREATING", {
path: entry.path,
});
await heliaMfs.mkdir(entry.path);
uploadLogger.verbose("MFS_DIR_CREATED", {
path: entry.path,
});
} else {
const entryFile = createdFiles.get(entry.path);
uploadLogger.silly("MFS_FILE_COPY", {
cid: entryFile,
path: entry.path,
});
await heliaMfs.cp(entryFile, entry.path, {
force: true,
});
uploadLogger.verbose("MFS_FILE_COPIED", {
cid: entryFile,
path: entry.path,
});
}
}
for await (const entry of heliaFs.addAll(source)) {
parsedEntries[entry.path] = entry;
for (const entry of source) {
parsedEntries[entry.path] = await heliaMfs.stat(entry.path);
uploadLogger.silly("MFS_PATH_STAT", parsedEntries[entry.path]);
}
const rootEntry = parsedEntries[uploadUUID];
parsedEntries["/"] = await heliaMfs.stat("/");
const rootEntry = parsedEntries["/"];
uploadLogger.verbose("MFS_ADDED", {
root: rootEntry,
count: Object.keys(parsedEntries).length,
});

// Get carFile stream here
uploadLogger.verbose("CAR_EXPORTING", {
root: rootEntry,
});
const carExporter = car({ blockstore: temporaryBlockstore }),
{ writer, out } = CarWriter.create([rootEntry.cid]);

// Put carFile stream to disk
const output = createWriteStream(temporaryCarFilePath);
Readable.from(out).pipe(output);
await carExporter.export(rootEntry.cid, writer);
uploadLogger.verbose("CAR_EXPORTED", {
root: rootEntry,
});

// Set Uploader to Read from carFile on disk
uploadOptions.params.Body = createReadStream(temporaryCarFilePath);

// Upload carFile via S3
uploadLogger.verbose("CAR_UPLOADING", {
entry: rootEntry,
source: temporaryCarFilePath,
});
const parallelUploads3 = new Upload(uploadOptions);
parallelUploads3.on("httpUploadProgress", (progress) => {
uploadLogger.debug("CAR_UPLOAD_PROGRESS", progress);
});
await parallelUploads3.done();
uploadLogger.verbose("CAR_UPLOADED", {
entry: rootEntry,
source: temporaryCarFilePath,
});
await temporaryBlockstore.close();
} catch (err) {
console.error(err.message);
throw err;
} finally {
if (typeof temporaryBlockstoreDir !== "undefined") {
// Delete Temporary Blockstore
Expand Down Expand Up @@ -292,7 +419,7 @@ class ObjectManager {
/**
* @summary Downloads an object from the specified bucket using the provided key.
* @param {string} key - The key of the object to be downloaded.
* @param {objectOptions} [options] - The options for downloading the object..
* @param {objectOptions} [options] - The options for downloading the object.
* @returns {Promise<Object>} - A promise that resolves with the contents of the downloaded object as a Stream.
* @example
* // Download object with name of `download-object-example`
Expand Down Expand Up @@ -410,7 +537,7 @@ class ObjectManager {
* @returns {Promise<Boolean>} - A Promise that resolves with the result of the copy operation.
* @example
* // Copy object `copy-object-test` from `copy-object-test-pass-src` to `copy-object-test-pass-dest`
* // TIP: Set bucket on constructor and it will be used as the default source for copying objects.
* // TIP: Set bucket on constructor, it will be used as the default source for copying objects.
* await objectManager.copy(`copy-object-test`, `copy-object-dest`, {
* sourceBucket: `copy-object-src`
* });
Expand All @@ -437,4 +564,9 @@ class ObjectManager {
}
}

// Function to count slashes in a path
function countSlashes(path) {
return (path.match(/\//g) || []).length;
}

export default ObjectManager;
18 changes: 13 additions & 5 deletions tsup.config.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import { defineConfig } from 'tsup'
import { defineConfig } from "tsup";

export default defineConfig({
entry: ['src/index.js'],
entry: ["src/index.js"],
splitting: false,
sourcemap: false,
noExternal: ['@ipld/car', '@helia/car', '@helia/unixfs', 'blockstore-fs'],
noExternal: [
"@ipld/car",
"@helia/car",
"@helia/unixfs",
"@helia/mfs",
"blockstore-fs",
"datastore-core",
"p-queue",
],
dts: true,
format: ['cjs'],
format: ["cjs"],
clean: true,
})
});
Loading
Loading