From 406a735c5616756da9d21629db8de2612420cc11 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 11 Mar 2022 16:34:02 +0100 Subject: [PATCH] fix: dag traversal on directory cars --- package-lock.json | 33 ++++++++--------- package.json | 2 +- src/index.js | 65 +++++++++++++++++++++++++++++---- src/s3-client.js | 6 ++- src/utils.js | 27 ++++++++++++++ test/fixtures/larger-content.js | 5 --- test/index.node.js | 65 +++++++++++++++------------------ test/utils/cars.js | 20 +++++++--- test/utils/s3.js | 19 ++++++++++ 9 files changed, 168 insertions(+), 74 deletions(-) create mode 100644 src/utils.js delete mode 100644 test/fixtures/larger-content.js create mode 100644 test/utils/s3.js diff --git a/package-lock.json b/package-lock.json index 454c44f..ba52236 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ "@ipld/dag-cbor": "^7.0.1", "@ipld/dag-pb": "^2.1.16", "aws-sdk": "^2.1084.0", + "carbites": "^1.0.6", "dotenv": "^10.0.0", "multiformats": "^9.6.4", "pino": "^7.6.2" @@ -21,7 +22,6 @@ "@web-std/blob": "^3.0.1", "ava": "^3.15.0", "aws-sdk-mock": "^5.5.0", - "carbites": "^1.0.6", "debug": "^4.3.3", "dotenv": "^10.0.0", "ipfs-car": "^0.6.1", @@ -29,7 +29,8 @@ "p-map": "^4.0.0", "pino-pretty": "^7.3.0", "prettier": "^2.4.1", - "sade": "^1.8.1" + "sade": "^1.8.1", + "uint8arrays": "^3.0.0" } }, "node_modules/@assemblyscript/loader": { @@ -704,9 +705,9 @@ } }, "node_modules/aws-sdk": { - "version": "2.1086.0", - "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.1086.0.tgz", - "integrity": "sha512-iQb9UpvaJphZSJrNccN8xA3rQBG7mg29Qvgt2VG4XnUM4cwD/i4+gJ3V/rSmM4rzAWZMbTk04lal+KBn7ByNyw==", + "version": "2.1091.0", + "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.1091.0.tgz", + "integrity": "sha512-DSidFjHdZZopF2kBt6I5wlkxeV5xhXWWXtLtZVQw9g9RQxFnmm+B3iBf1mQtw9fPspfyKyQwnBUNnvMtPku3Yw==", "dependencies": { "buffer": "4.9.2", "events": "1.1.1", @@ -1037,7 +1038,6 @@ "version": "1.0.6", "resolved": "https://registry.npmjs.org/carbites/-/carbites-1.0.6.tgz", "integrity": "sha512-dS9IQvnrb5VIRvSTNz5Ff+mB9d2MFfi5mojtJi7Rlss79VeF190jr0sZdA7eW0CGHotvHkZaWuM6wgfD9PEFRg==", - "dev": true, "dependencies": { "@ipld/car": "^3.0.1", "@ipld/dag-cbor": "^6.0.3", @@ -1049,7 +1049,6 @@ "version": "6.0.15", "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-6.0.15.tgz", "integrity": "sha512-Vm3VTSTwlmGV92a3C5aeY+r2A18zbH2amehNhsX8PBa3muXICaWrN8Uri85A5hLH7D7ElhE8PdjxD6kNqUmTZA==", - "dev": true, "dependencies": { "cborg": "^1.5.4", "multiformats": "^9.5.4" @@ -4143,9 +4142,9 @@ } }, "node_modules/pino": { - "version": "7.8.0", - "resolved": "https://registry.npmjs.org/pino/-/pino-7.8.0.tgz", - "integrity": "sha512-Ynw2HRVapiyj+ZGfUcpms+SRgDKFIy0ztaFUf3X6IHh+vsysMvn+tpV/Ej3gyutPp4n9tgH6ZkkCAelSvP5zmQ==", + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/pino/-/pino-7.8.1.tgz", + "integrity": "sha512-G0AVnaJBBtbdOpZ3z0/QD3R57BWwjVo4K7e+c5mHKjNCYIY1FIKuNlWjVJfCVQ4Bq6iN/yAAh5OCeeTI7OXosA==", "dependencies": { "fast-redact": "^3.0.0", "on-exit-leak-free": "^0.2.0", @@ -6314,9 +6313,9 @@ "dev": true }, "aws-sdk": { - "version": "2.1086.0", - "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.1086.0.tgz", - "integrity": "sha512-iQb9UpvaJphZSJrNccN8xA3rQBG7mg29Qvgt2VG4XnUM4cwD/i4+gJ3V/rSmM4rzAWZMbTk04lal+KBn7ByNyw==", + "version": "2.1091.0", + "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.1091.0.tgz", + "integrity": "sha512-DSidFjHdZZopF2kBt6I5wlkxeV5xhXWWXtLtZVQw9g9RQxFnmm+B3iBf1mQtw9fPspfyKyQwnBUNnvMtPku3Yw==", "requires": { "buffer": "4.9.2", "events": "1.1.1", @@ -6557,7 +6556,6 @@ "version": "1.0.6", "resolved": "https://registry.npmjs.org/carbites/-/carbites-1.0.6.tgz", "integrity": "sha512-dS9IQvnrb5VIRvSTNz5Ff+mB9d2MFfi5mojtJi7Rlss79VeF190jr0sZdA7eW0CGHotvHkZaWuM6wgfD9PEFRg==", - "dev": true, "requires": { "@ipld/car": "^3.0.1", "@ipld/dag-cbor": "^6.0.3", @@ -6569,7 +6567,6 @@ "version": "6.0.15", "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-6.0.15.tgz", "integrity": "sha512-Vm3VTSTwlmGV92a3C5aeY+r2A18zbH2amehNhsX8PBa3muXICaWrN8Uri85A5hLH7D7ElhE8PdjxD6kNqUmTZA==", - "dev": true, "requires": { "cborg": "^1.5.4", "multiformats": "^9.5.4" @@ -8912,9 +8909,9 @@ "dev": true }, "pino": { - "version": "7.8.0", - "resolved": "https://registry.npmjs.org/pino/-/pino-7.8.0.tgz", - "integrity": "sha512-Ynw2HRVapiyj+ZGfUcpms+SRgDKFIy0ztaFUf3X6IHh+vsysMvn+tpV/Ej3gyutPp4n9tgH6ZkkCAelSvP5zmQ==", + "version": "7.8.1", + "resolved": "https://registry.npmjs.org/pino/-/pino-7.8.1.tgz", + "integrity": "sha512-G0AVnaJBBtbdOpZ3z0/QD3R57BWwjVo4K7e+c5mHKjNCYIY1FIKuNlWjVJfCVQ4Bq6iN/yAAh5OCeeTI7OXosA==", "requires": { "fast-redact": "^3.0.0", "on-exit-leak-free": "^0.2.0", diff --git a/package.json b/package.json index eecb13c..44ba754 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "@ipld/dag-cbor": "^7.0.1", "@ipld/dag-pb": "^2.1.16", "aws-sdk": "^2.1084.0", + "carbites": "^1.0.6", "dotenv": "^10.0.0", "multiformats": "^9.6.4", "pino": "^7.6.2" @@ -30,7 +31,6 @@ "@web-std/blob": "^3.0.1", "ava": "^3.15.0", "aws-sdk-mock": "^5.5.0", - "carbites": "^1.0.6", "debug": "^4.3.3", "dotenv": "^10.0.0", "ipfs-car": "^0.6.1", diff --git a/src/index.js b/src/index.js index 8da0c07..1dcec47 100644 --- a/src/index.js +++ b/src/index.js @@ -1,10 +1,13 @@ 'use strict' -const aws = require('aws-sdk') +const { TreewalkCarJoiner } = require('carbites') +const { CarReader } = require('@ipld/car') +const pMap = require('p-map') const { S3Client } = require ('./s3-client.js') const { Logger } = require('./logging') const { carStat, inspectCarBlocks } = require('./car') +const { collectBytes } = require('./utils') // TODO: Refine MAX_SIZE_TO_ATTEMPT - currently 100MB const MAX_SIZE_TO_ATTEMPT = 100 * 1024 * 1024 @@ -35,12 +38,11 @@ async function main(event) { const { body, metadata } = await s3Client.getObject(bucket, key) // @ts-ignore body has different type from AWS SDK - const { rootCid, structure, size } = await inspectCar(body, metadata, logger, { bucket, key }) + const { rootCid, structure, size } = await inspectCar(body, logger, { bucket, key, metadata }) const completePath = `complete/${rootCid}.car` // Written CAR is already complete if (structure === 'Complete') { - console.log('structure is complete') await s3Client.putObject(bucket, completePath, body) return { rootCid, structure } @@ -63,8 +65,7 @@ async function main(event) { return { rootCid, structure } } - console.log('get directory') - const { accumSize } = await s3Client.getDirectoryStat(bucket, key) + const { accumSize, carKeys } = await s3Client.getDirectoryStat(bucket, key) if (size > accumSize) { logger.info( @@ -75,16 +76,63 @@ async function main(event) { } // Attempt to traverse the full dag - return { rootCid, structure, directoryStructure: 'Complete' } + const directoryCars = await pMap(carKeys, async directoryKey => { + let car + if (directoryKey === key) { + // Written Car does not need new get + car = body + } else { + const { body } = await s3Client.getObject(bucket, directoryKey) + car = body + } + return car + }) + // @ts-ignore body has different type from AWS SDK + const { structure: directoryStructure, directoryCar } = await inspectDirectoryCars(directoryCars, logger, { key, bucket }) + + if (directoryStructure === 'Complete') { + await s3Client.putObject(bucket, completePath, directoryCar) + } + + // TODO: MARK CAR AS TRANSFORMED WITH METADATA? + + return { rootCid, structure, directoryStructure } +} + +/** + * @param {Uint8Array[]} cars + * @param {Logger} logger + * @param {S3Inputs} s3Inputs + */ +async function inspectDirectoryCars (cars, logger, { key, bucket }) { + let directoryCar + + try { + // Create car file with all CARs stored + const carReaders = await Promise.all(cars.map(c => CarReader.fromBytes(c))) + const joiner = new TreewalkCarJoiner(carReaders) + directoryCar = await collectBytes(joiner.car()) + } catch (err) { + logger.error( + err, + { + complementMessage: `Error parsing Joined CAR with ${key} from bucket ${bucket}: ` + } + ) + throw err + } + + // Get current structure of all CAR files from directory + const { structure } = await inspectCar(directoryCar, logger, { bucket, key }) + return { structure, directoryCar } } /** * @param {Uint8Array} car - * @param {Object} metadata * @param {Logger} logger * @param {S3Inputs} s3Inputs */ -async function inspectCar(car, metadata, logger, { key, bucket }) { +async function inspectCar(car, logger, { key, bucket, metadata = {} }) { let rootCid, structure, size try { const stat = await carStat(car) @@ -125,6 +173,7 @@ async function inspectCar(car, metadata, logger, { key, bucket }) { * @typedef S3Inputs * @property {string} bucket * @property {string} key + * @property {Object} [metadata] */ exports.handler = main diff --git a/src/s3-client.js b/src/s3-client.js index 810795b..d6a0a6d 100644 --- a/src/s3-client.js +++ b/src/s3-client.js @@ -83,7 +83,7 @@ class S3Client { async getDirectoryStat(bucket, key) { const prefix = key.replace(/[^\/]*$/, '').slice(0, -1) - let accumSize + let accumSize, carKeys = [] try { this.logger.debug( key, @@ -95,6 +95,7 @@ class S3Client { }).promise() accumSize = s3ListObjects.Contents.reduce((acc, obj) => acc + obj.Size, 0) + carKeys = s3ListObjects.Contents.map(obj => obj.Key) } catch (err) { this.logger.error( err, @@ -106,7 +107,8 @@ class S3Client { } return { - accumSize + accumSize, + carKeys } } } diff --git a/src/utils.js b/src/utils.js new file mode 100644 index 0000000..54bcbc1 --- /dev/null +++ b/src/utils.js @@ -0,0 +1,27 @@ +'use strict' + +/** + * @template T + * @param {AsyncIterable} iterable + * @returns {Promise>} + */ +async function collect(iterable) { + const chunks = [] + for await (const chunk of iterable) { + chunks.push(chunk) + } + return chunks +} + +/** + * @param {AsyncIterable} iterable + * @returns {Promise} + */ +async function collectBytes(iterable) { + const chunks = await collect(iterable) + return new Uint8Array([].concat(...chunks.map(c => Array.from(c)))) +} + +module.exports = { + collectBytes +} diff --git a/test/fixtures/larger-content.js b/test/fixtures/larger-content.js deleted file mode 100644 index f2d83ba..0000000 --- a/test/fixtures/larger-content.js +++ /dev/null @@ -1,5 +0,0 @@ -const largerContent = new Uint8Array([21, 31, 41, 51, 61, 71, 81, 91, 101, 111, 121, 131, 21, 31, 41, 51, 61, 71, 81, 91, 101, 111, 121, 131, 21, 31, 41, 51, 61, 71, 81, 91, 101, 111, 121, 131]) - -module.exports = { - largerContent -} diff --git a/test/index.node.js b/test/index.node.js index 177087d..84c969d 100644 --- a/test/index.node.js +++ b/test/index.node.js @@ -1,34 +1,11 @@ const { serial: test } = require('ava') const aws = require('aws-sdk-mock') -const { TreewalkCarSplitter } = require('carbites') -const { Blob } = require('@web-std/blob') -const { CarReader } = require('@ipld/car') -const { pack } = require('ipfs-car/pack') -const { sha256 } = require('multiformats/hashes/sha2') const pDefer = require('p-defer') -const pMap = require('p-map') -const { toString } = require('uint8arrays') const { handler } = require('../src/index') // lambda function const { generateCar, generateSplittedCar } = require('./utils/cars') -const { largerContent } = require('./fixtures/larger-content') - -const createS3PutEvent = ({ - bucketName = 'test-bucket', - objectKey = 'raw/bafybeif3hyrtm2skjmldufjid3myfp37sxyqbtz7xe3f2fqyd7ugi33b2a/102702852462097292/ciqg66cgf5pib7qhdczhhzlpb3z2dk5xeo4l6zlel7snh3j4v3g7l5i.car' -} = {}) => ({ - Records: [{ - s3: { - bucket: { - name: bucketName - }, - object: { - key: objectKey - } - } - }] -}) +const { createS3PutEvent } = require('./utils/s3') test.afterEach(t => { aws.restore('S3') @@ -137,7 +114,10 @@ test.skip('should not write dag cbor CAR file to complete as we wont know its si }) test('should not write complete CAR when dag pb incomplete CAR file written and not all CAR files already stored', async t => { - const { carFiles, root } = await generateSplittedCar(36, 90, { wrapWithDirectory: true }) + const { carFiles, root } = await generateSplittedCar(150, 130, { + wrapWithDirectory: true, + maxChunkSize: 100, + }) // Body with only first car aws.mock('S3', 'getObject', (params) => { @@ -146,6 +126,7 @@ test('should not write complete CAR when dag pb incomplete CAR file written and Body: carFiles[0].car, Metadata: {} }) } + return Promise.reject('invalid s3 object requestes') }) aws.mock('S3', 'listObjectsV2', { @@ -166,16 +147,18 @@ test('should not write complete CAR when dag pb incomplete CAR file written and t.deepEqual(structure, 'Partial') }) -test.only('dag pb incomplete CAR file with all CAR files already stored triggers gateway request', async t => { - const { carFiles, root } = await generateSplittedCar(36, 90, { wrapWithDirectory: true }) +test('dag pb incomplete CAR file written with all CAR files already stored triggers complete write', async t => { + const deferredWrite = pDefer() + const { carFiles, root, directoryCar } = await generateSplittedCar(150, 130, { + wrapWithDirectory: true, + maxChunkSize: 100, + }) - console.log('car files', carFiles) - // Write only second CAR aws.mock('S3', 'getObject', (params) => { - console.log('params', params) - if (params.Key === carFiles[0].key) { + const getObjectCar = carFiles.find((carFile) => carFile.key === params.Key) + if (getObjectCar) { return Promise.resolve({ - Body: carFiles[0].car, Metadata: {} + Body: getObjectCar.car, Metadata: {} }) } return Promise.reject('invalid s3 object requestes') @@ -186,15 +169,27 @@ test.only('dag pb incomplete CAR file with all CAR files already stored triggers Key: carFile.key, LastModified: new Date().toISOString(), ETag: '60faf8595327ae9c8a7f7ab099a5c9b0', - Size: 49, + Size: carFile.size, StorageClass: 'STANDARD' })) }) - const { rootCid, structure, directoryStructure } = await handler(createS3PutEvent({ objectKey: carFiles[0].key })) + aws.mock('S3', 'putObject', (params, cb) => { + t.deepEqual(params.Key, `complete/${root.toString()}.car`) + t.deepEqual(params.Body, directoryCar) + + deferredWrite.resolve() + cb(undefined, putObjectOutput) + }) + + // Write only last CAR + const { rootCid, structure, directoryStructure } = await handler(createS3PutEvent({ objectKey: carFiles[carFiles.length - 1].key })) t.deepEqual(root.toString(), rootCid.toString()) t.deepEqual(structure, 'Partial') - console.log('directoryStructure', directoryStructure) + t.deepEqual(directoryStructure, 'Complete') + + // Guarantee write happened + await deferredWrite.promise }) const putObjectOutput = { diff --git a/test/utils/cars.js b/test/utils/cars.js index a538450..7466bdb 100644 --- a/test/utils/cars.js +++ b/test/utils/cars.js @@ -3,11 +3,13 @@ const { toString } = require('uint8arrays') const { Blob } = require('@web-std/blob') const { sha256 } = require('multiformats/hashes/sha2') -const { TreewalkCarSplitter } = require('carbites') +const { TreewalkCarSplitter, TreewalkCarJoiner } = require('carbites') const { CarReader } = require('@ipld/car') const { pack } = require('ipfs-car/pack') const { packToBlob } = require('ipfs-car/pack/blob') +const { collectBytes } = require('../../src/utils') + /** * @param {number} length * @param {Object} [packOptions] @@ -16,7 +18,7 @@ async function generateCar (length, packOptions = {}) { const { root, car } = await packToBlob({ input: [{ path: 'file.txt', - content: generateUint8Array(length), // 2 + content: generateUint8Array(length), }], wrapWithDirectory: false, ...packOptions @@ -41,7 +43,7 @@ async function generateSplittedCar (length, targetSize, packOptions = {}) { const { root, out } = await pack({ input: [{ path: 'file.txt', - content: generateUint8Array(length), // 36 + content: generateUint8Array(length), }], wrapWithDirectory: true, ...packOptions @@ -65,13 +67,21 @@ async function generateSplittedCar (length, targetSize, packOptions = {}) { const multihash = await sha256.digest(car) return { car, - key: `raw/${root.toString()}/${toString(multihash.bytes, 'base32')}.car` + key: `raw/${root.toString()}/${toString(multihash.bytes, 'base32')}.car`, + size: car.byteLength } }) + // Create Join Car + const carReaders = await Promise.all(carFiles.map(cf => + CarReader.fromBytes(cf.car))) + const joiner = new TreewalkCarJoiner(carReaders) + const directoryCar = await collectBytes(joiner.car()) + return { root, - carFiles + carFiles, + directoryCar } } diff --git a/test/utils/s3.js b/test/utils/s3.js new file mode 100644 index 0000000..6ac863d --- /dev/null +++ b/test/utils/s3.js @@ -0,0 +1,19 @@ +const createS3PutEvent = ({ + bucketName = 'test-bucket', + objectKey = 'raw/bafybeif3hyrtm2skjmldufjid3myfp37sxyqbtz7xe3f2fqyd7ugi33b2a/102702852462097292/ciqg66cgf5pib7qhdczhhzlpb3z2dk5xeo4l6zlel7snh3j4v3g7l5i.car' +} = {}) => ({ + Records: [{ + s3: { + bucket: { + name: bucketName + }, + object: { + key: objectKey + } + } + }] +}) + +module.exports = { + createS3PutEvent +}