Skip to content

Commit

Permalink
fix: dag traversal on directory cars
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 11, 2022
1 parent 4eecb7b commit 406a735
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 74 deletions.
33 changes: 15 additions & 18 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@ipld/dag-cbor": "^7.0.1",
"@ipld/dag-pb": "^2.1.16",
"aws-sdk": "^2.1084.0",
"carbites": "^1.0.6",
"dotenv": "^10.0.0",
"multiformats": "^9.6.4",
"pino": "^7.6.2"
Expand All @@ -30,7 +31,6 @@
"@web-std/blob": "^3.0.1",
"ava": "^3.15.0",
"aws-sdk-mock": "^5.5.0",
"carbites": "^1.0.6",
"debug": "^4.3.3",
"dotenv": "^10.0.0",
"ipfs-car": "^0.6.1",
Expand Down
65 changes: 57 additions & 8 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
'use strict'

const aws = require('aws-sdk')
const { TreewalkCarJoiner } = require('carbites')
const { CarReader } = require('@ipld/car')
const pMap = require('p-map')

const { S3Client } = require ('./s3-client.js')
const { Logger } = require('./logging')
const { carStat, inspectCarBlocks } = require('./car')
const { collectBytes } = require('./utils')

// TODO: Refine MAX_SIZE_TO_ATTEMPT - currently 100MB
const MAX_SIZE_TO_ATTEMPT = 100 * 1024 * 1024
Expand Down Expand Up @@ -35,12 +38,11 @@ async function main(event) {

const { body, metadata } = await s3Client.getObject(bucket, key)
// @ts-ignore body has different type from AWS SDK
const { rootCid, structure, size } = await inspectCar(body, metadata, logger, { bucket, key })
const { rootCid, structure, size } = await inspectCar(body, logger, { bucket, key, metadata })
const completePath = `complete/${rootCid}.car`

// Written CAR is already complete
if (structure === 'Complete') {
console.log('structure is complete')
await s3Client.putObject(bucket, completePath, body)

return { rootCid, structure }
Expand All @@ -63,8 +65,7 @@ async function main(event) {
return { rootCid, structure }
}

console.log('get directory')
const { accumSize } = await s3Client.getDirectoryStat(bucket, key)
const { accumSize, carKeys } = await s3Client.getDirectoryStat(bucket, key)

if (size > accumSize) {
logger.info(
Expand All @@ -75,16 +76,63 @@ async function main(event) {
}

// Attempt to traverse the full dag
return { rootCid, structure, directoryStructure: 'Complete' }
const directoryCars = await pMap(carKeys, async directoryKey => {
let car
if (directoryKey === key) {
// Written Car does not need new get
car = body
} else {
const { body } = await s3Client.getObject(bucket, directoryKey)
car = body
}
return car
})
// @ts-ignore body has different type from AWS SDK
const { structure: directoryStructure, directoryCar } = await inspectDirectoryCars(directoryCars, logger, { key, bucket })

if (directoryStructure === 'Complete') {
await s3Client.putObject(bucket, completePath, directoryCar)
}

// TODO: MARK CAR AS TRANSFORMED WITH METADATA?

return { rootCid, structure, directoryStructure }
}

/**
* @param {Uint8Array[]} cars
* @param {Logger} logger
* @param {S3Inputs} s3Inputs
*/
async function inspectDirectoryCars (cars, logger, { key, bucket }) {
let directoryCar

try {
// Create car file with all CARs stored
const carReaders = await Promise.all(cars.map(c => CarReader.fromBytes(c)))
const joiner = new TreewalkCarJoiner(carReaders)
directoryCar = await collectBytes(joiner.car())
} catch (err) {
logger.error(
err,
{
complementMessage: `Error parsing Joined CAR with ${key} from bucket ${bucket}: `
}
)
throw err
}

// Get current structure of all CAR files from directory
const { structure } = await inspectCar(directoryCar, logger, { bucket, key })
return { structure, directoryCar }
}

/**
* @param {Uint8Array} car
* @param {Object} metadata
* @param {Logger} logger
* @param {S3Inputs} s3Inputs
*/
async function inspectCar(car, metadata, logger, { key, bucket }) {
async function inspectCar(car, logger, { key, bucket, metadata = {} }) {
let rootCid, structure, size
try {
const stat = await carStat(car)
Expand Down Expand Up @@ -125,6 +173,7 @@ async function inspectCar(car, metadata, logger, { key, bucket }) {
* @typedef S3Inputs
* @property {string} bucket
* @property {string} key
* @property {Object} [metadata]
*/

exports.handler = main
6 changes: 4 additions & 2 deletions src/s3-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class S3Client {
async getDirectoryStat(bucket, key) {
const prefix = key.replace(/[^\/]*$/, '').slice(0, -1)

let accumSize
let accumSize, carKeys = []
try {
this.logger.debug(
key,
Expand All @@ -95,6 +95,7 @@ class S3Client {
}).promise()

accumSize = s3ListObjects.Contents.reduce((acc, obj) => acc + obj.Size, 0)
carKeys = s3ListObjects.Contents.map(obj => obj.Key)
} catch (err) {
this.logger.error(
err,
Expand All @@ -106,7 +107,8 @@ class S3Client {
}

return {
accumSize
accumSize,
carKeys
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict'

/**
* @template T
* @param {AsyncIterable<T>} iterable
* @returns {Promise<Array<T>>}
*/
async function collect(iterable) {
const chunks = []
for await (const chunk of iterable) {
chunks.push(chunk)
}
return chunks
}

/**
* @param {AsyncIterable<Uint8Array>} iterable
* @returns {Promise<Uint8Array>}
*/
async function collectBytes(iterable) {
const chunks = await collect(iterable)
return new Uint8Array([].concat(...chunks.map(c => Array.from(c))))
}

module.exports = {
collectBytes
}
5 changes: 0 additions & 5 deletions test/fixtures/larger-content.js

This file was deleted.

Loading

0 comments on commit 406a735

Please sign in to comment.