Skip to content

Commit

Permalink
Merge and fix packages
Browse files Browse the repository at this point in the history
  • Loading branch information
jtsmedley committed Mar 7, 2024
1 parent 08bb0fd commit 37f8922
Show file tree
Hide file tree
Showing 3 changed files with 319 additions and 33 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,21 @@
"clean-jsdoc-theme": "4.2.17",
"jsdoc": "4.0.2",
"prettier": "3.1.0",
"recursive-fs": "2.1.0",
"tsup": "8.0.1",
"typescript": "5.3.3"
},
"dependencies": {
"@aws-sdk/client-s3": "3.478.0",
"@aws-sdk/lib-storage": "3.478.0",
"@helia/car": "1.0.4",
"@helia/mfs": "3.0.1",
"@helia/unixfs": "1.4.3",
"@ipld/car": "5.2.4",
"axios": "1.6.2",
"blockstore-fs": "1.1.8",
"datastore-core": "9.2.9",
"p-queue": "8.0.1",
"uuid": "9.0.1"
}
}
62 changes: 40 additions & 22 deletions src/objectManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ import { Upload } from "@aws-sdk/lib-storage";
// Helia Imports
import { CarWriter } from "@ipld/car";
import { car } from "@helia/car";
import { mfs } from "@helia/mfs";
import { unixfs } from "@helia/unixfs";
import { FsBlockstore } from "blockstore-fs";
import { MemoryDatastore } from "datastore-core";
// Utility Imports
import { createReadStream, createWriteStream, ReadStream } from "node:fs";
import { mkdir, rm } from "node:fs/promises";
import { mkdir, rm, open } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { Readable } from "node:stream";
import { v4 as uuidv4 } from "uuid";
import { downloadFromGateway } from "./helpers.js";
import PQueue from "p-queue";

/** Interacts with an S3 client to perform various operations on objects in a bucket. */
class ObjectManager {
Expand Down Expand Up @@ -184,32 +187,47 @@ class ObjectManager {
);
temporaryCarFilePath = `${temporaryBlockstoreDir}/main.car`;
await mkdir(temporaryBlockstoreDir, { recursive: true });
const temporaryBlockstore = new FsBlockstore(temporaryBlockstoreDir);
const temporaryBlockstore = new FsBlockstore(temporaryBlockstoreDir),
temporaryDatastore = new MemoryDatastore();

const heliaFs = unixfs({
blockstore: temporaryBlockstore,
});

const fileHandlers = new Map();
for (let sourceEntry of source) {
sourceEntry.path =
sourceEntry.path[0] === "/"
? `/${uploadUUID}${sourceEntry.path}`
: `/${uploadUUID}/${sourceEntry.path}`;
if (sourceEntry.content instanceof ReadStream) {
fileHandlers.set(sourceEntry.path, sourceEntry.content);
}
blockstore: temporaryBlockstore,
datastore: temporaryDatastore,
}),
heliaMfs = mfs({
blockstore: temporaryBlockstore,
datastore: temporaryDatastore,
});
const queue = new PQueue({ concurrency: os.cpus().length });
let parsePromises = [];
for (const entry of source) {
parsePromises.push(
(async () => {
let fileHandle;
try {
await queue.add(async () => {
if (entry.type === "import") {
fileHandle = await open(entry.content);
entry.content = await fileHandle.createReadStream();
}
parsedEntries[entry.path] = await heliaFs.addFile({
path: entry.path,
content: entry.content,
});
});
} finally {
if (typeof fileHandle !== "undefined") {
await fileHandle.close();
}
}
})(),
);
}
for await (const entry of heliaFs.addAll(source)) {
if (fileHandlers.has(entry.path)) {
fileHandlers.get(entry.path).destroy();
fileHandlers.delete(entry.path);
}
parsedEntries[entry.path] = entry;
}
const rootEntry = parsedEntries[uploadUUID];
await Promise.all(parsePromises);
parsedEntries["/"] = await heliaMfs.stat("/");

// Get carFile stream here
const rootEntry = parsedEntries["/"];
const carExporter = car({ blockstore: temporaryBlockstore }),
{ writer, out } = CarWriter.create([rootEntry.cid]);

Expand Down
Loading

0 comments on commit 37f8922

Please sign in to comment.