diff --git a/src/cli/commands/block/rm.js b/src/cli/commands/block/rm.js index 1f92ed1a06..83982e2273 100644 --- a/src/cli/commands/block/rm.js +++ b/src/cli/commands/block/rm.js @@ -25,7 +25,7 @@ module.exports = { const ipfs = await getIpfs() let errored = false - for await (const result of ipfs.block._rmAsyncIterator(hash, { + for await (const result of ipfs.block.rm(hash, { force, quiet })) { diff --git a/src/cli/commands/dag/resolve.js b/src/cli/commands/dag/resolve.js index bba7886034..7a9907f427 100644 --- a/src/cli/commands/dag/resolve.js +++ b/src/cli/commands/dag/resolve.js @@ -19,10 +19,9 @@ module.exports = { const options = {} try { - const result = await ipfs.dag.resolve(ref, options) let lastCid - for (const res of result) { + for await (const res of ipfs.dag.resolve(ref, options)) { if (CID.isCID(res.value)) { lastCid = res.value } diff --git a/src/cli/commands/pin/ls.js b/src/cli/commands/pin/ls.js index 5f75b6e410..7fbc6a2f7b 100644 --- a/src/cli/commands/pin/ls.js +++ b/src/cli/commands/pin/ls.js @@ -34,14 +34,13 @@ module.exports = { resolve((async () => { const paths = ipfsPath const ipfs = await getIpfs() - const results = await ipfs.pin.ls(paths, { type }) - results.forEach((res) => { + for await (const res of ipfs.pin.ls(paths, { type })) { let line = cidToString(res.hash, { base: cidBase }) if (!quiet) { line += ` ${res.type}` } print(line) - }) + } })()) } } diff --git a/src/core/components/block/get.js b/src/core/components/block/get.js new file mode 100644 index 0000000000..afc95d8b45 --- /dev/null +++ b/src/core/components/block/get.js @@ -0,0 +1,16 @@ +'use strict' + +const { cleanCid } = require('./utils') + +module.exports = ({ blockService, preload }) => { + return async function get (cid, options) { // eslint-disable-line require-await + options = options || {} + cid = cleanCid(cid) + + if (options.preload !== false) { + preload(cid) + } + + return blockService.get(cid) + } +} diff --git a/src/core/components/block/put.js b/src/core/components/block/put.js new file mode 100644 index 0000000000..526bc23e7f --- /dev/null +++ b/src/core/components/block/put.js @@ -0,0 +1,51 @@ +'use strict' + +const Block = require('ipfs-block') +const multihashing = require('multihashing-async') +const CID = require('cids') + +module.exports = ({ blockService, gcLock, preload }) => { + return async function put (block, options) { + options = options || {} + + if (Array.isArray(block)) { + throw new Error('Array is not supported') + } + + if (!Block.isBlock(block)) { + if (options.cid && CID.isCID(options.cid)) { + block = new Block(block, options.cid) + } else { + const mhtype = options.mhtype || 'sha2-256' + const format = options.format || 'dag-pb' + let cidVersion + + if (options.version == null) { + // Pick appropriate CID version + cidVersion = mhtype === 'sha2-256' && format === 'dag-pb' ? 0 : 1 + } else { + cidVersion = options.version + } + + const multihash = await multihashing(block, mhtype) + const cid = new CID(cidVersion, format, multihash) + + block = new Block(block, cid) + } + } + + const release = await gcLock.readLock() + + try { + await blockService.put(block) + + if (options.preload !== false) { + preload(block.cid) + } + + return block + } finally { + release() + } + } +} diff --git a/src/core/components/block/rm.js b/src/core/components/block/rm.js new file mode 100644 index 0000000000..8c251a9665 --- /dev/null +++ b/src/core/components/block/rm.js @@ -0,0 +1,65 @@ +'use strict' + +const CID = require('cids') +const errCode = require('err-code') +const { parallelMap, filter } = require('streaming-iterables') +const pipe = require('it-pipe') +const { PinTypes } = require('./pin/pin-manager') +const { cleanCid } = require('./utils') + +const BLOCK_RM_CONCURRENCY = 8 + +module.exports = ({ blockService, gcLock, pinManager }) => { + return async function * rm (cids, options) { + options = options || {} + + if (!Array.isArray(cids)) { + cids = [cids] + } + + // We need to take a write lock here to ensure that adding and removing + // blocks are exclusive operations + const release = await gcLock.writeLock() + + try { + yield * pipe( + cids, + parallelMap(BLOCK_RM_CONCURRENCY, async cid => { + cid = cleanCid(cid) + + const result = { hash: cid.toString() } + + try { + const pinResult = await pinManager.isPinnedWithType(cid, PinTypes.all) + + if (pinResult.pinned) { + if (CID.isCID(pinResult.reason)) { // eslint-disable-line max-depth + throw errCode(new Error(`pinned via ${pinResult.reason}`)) + } + + throw errCode(new Error(`pinned: ${pinResult.reason}`)) + } + + // remove has check when https://github.com/ipfs/js-ipfs-block-service/pull/88 is merged + const has = await blockService._repo.blocks.has(cid) + + if (!has) { + throw errCode(new Error('block not found'), 'ERR_BLOCK_NOT_FOUND') + } + + await blockService.delete(cid) + } catch (err) { + if (!options.force) { + result.error = `cannot remove ${cid}: ${err.message}` + } + } + + return result + }), + filter(() => !options.quiet) + ) + } finally { + release() + } + } +} diff --git a/src/core/components/block/stat.js b/src/core/components/block/stat.js new file mode 100644 index 0000000000..2a99e33704 --- /dev/null +++ b/src/core/components/block/stat.js @@ -0,0 +1,21 @@ +'use strict' + +const { cleanCid } = require('./utils') + +module.exports = ({ blockService, preload }) => { + return async function stat (cid, options) { + options = options || {} + cid = cleanCid(cid) + + if (options.preload !== false) { + preload(cid) + } + + const block = await blockService.get(cid) + + return { + key: cid.toString(), + size: block.data.length + } + } +} diff --git a/src/core/components/block/utils.js b/src/core/components/block/utils.js new file mode 100644 index 0000000000..76ca4fa293 --- /dev/null +++ b/src/core/components/block/utils.js @@ -0,0 +1,17 @@ +'use strict' + +const CID = require('cids') +const errCode = require('err-code') + +exports.cleanCid = cid => { + if (CID.isCID(cid)) { + return cid + } + + // CID constructor knows how to do the cleaning :) + try { + return new CID(cid) + } catch (err) { + throw errCode(err, 'ERR_INVALID_CID') + } +} diff --git a/src/core/components/dag.js b/src/core/components/dag.js deleted file mode 100644 index fd704e8139..0000000000 --- a/src/core/components/dag.js +++ /dev/null @@ -1,170 +0,0 @@ -'use strict' - -const callbackify = require('callbackify') -const CID = require('cids') -const all = require('async-iterator-all') -const errCode = require('err-code') -const multicodec = require('multicodec') - -function parseArgs (cid, path, options) { - options = options || {} - - // Allow options in path position - if (path !== undefined && typeof path !== 'string') { - options = path - path = undefined - } - - if (typeof cid === 'string') { - if (cid.startsWith('/ipfs/')) { - cid = cid.substring(6) - } - - const split = cid.split('/') - - try { - cid = new CID(split[0]) - } catch (err) { - throw errCode(err, 'ERR_INVALID_CID') - } - - split.shift() - - if (split.length > 0) { - path = split.join('/') - } else { - path = path || '/' - } - } else if (Buffer.isBuffer(cid)) { - try { - cid = new CID(cid) - } catch (err) { - throw errCode(err, 'ERR_INVALID_CID') - } - } - - return [ - cid, - path, - options - ] -} - -module.exports = function dag (self) { - return { - put: callbackify.variadic(async (dagNode, options) => { - options = options || {} - - if (options.cid && (options.format || options.hashAlg)) { - throw new Error('Can\'t put dag node. Please provide either `cid` OR `format` and `hashAlg` options.') - } else if (((options.format && !options.hashAlg) || (!options.format && options.hashAlg))) { - throw new Error('Can\'t put dag node. Please provide `format` AND `hashAlg` options.') - } - - const optionDefaults = { - format: multicodec.DAG_CBOR, - hashAlg: multicodec.SHA2_256 - } - - // The IPLD expects the format and hashAlg as constants - if (options.format && typeof options.format === 'string') { - const constantName = options.format.toUpperCase().replace(/-/g, '_') - options.format = multicodec[constantName] - } - if (options.hashAlg && typeof options.hashAlg === 'string') { - const constantName = options.hashAlg.toUpperCase().replace(/-/g, '_') - options.hashAlg = multicodec[constantName] - } - - options = options.cid ? options : Object.assign({}, optionDefaults, options) - - // js-ipld defaults to verion 1 CIDs. Hence set version 0 explicitly for - // dag-pb nodes - if (options.version === undefined) { - if (options.format === multicodec.DAG_PB && options.hashAlg === multicodec.SHA2_256) { - options.version = 0 - } else { - options.version = 1 - } - } - - let release - - if (options.pin) { - release = await self._gcLock.readLock() - } - - try { - const cid = await self._ipld.put(dagNode, options.format, { - hashAlg: options.hashAlg, - cidVersion: options.version - }) - - if (options.pin) { - await self.pin.add(cid, { - lock: false - }) - } - - if (options.preload !== false) { - self._preload(cid) - } - - return cid - } finally { - if (release) { - release() - } - } - }), - - get: callbackify.variadic(async (cid, path, options) => { - [cid, path, options] = parseArgs(cid, path, options) - - if (options.preload !== false) { - self._preload(cid) - } - - if (path == null || path === '/') { - const value = await self._ipld.get(cid) - - return { - value, - remainderPath: '' - } - } else { - let result - - for await (const entry of self._ipld.resolve(cid, path)) { - if (options.localResolve) { - return entry - } - - result = entry - } - - return result - } - }), - - tree: callbackify.variadic(async (cid, path, options) => { // eslint-disable-line require-await - [cid, path, options] = parseArgs(cid, path, options) - - if (options.preload !== false) { - self._preload(cid) - } - - return all(self._ipld.tree(cid, path, options)) - }), - - resolve: callbackify.variadic(async (cid, path, options) => { // eslint-disable-line require-await - [cid, path, options] = parseArgs(cid, path, options) - - if (options.preload !== false) { - self._preload(cid) - } - - return all(self._ipld.resolve(cid, path)) - }) - } -} diff --git a/src/core/components/dag/get.js b/src/core/components/dag/get.js new file mode 100644 index 0000000000..11c17152bc --- /dev/null +++ b/src/core/components/dag/get.js @@ -0,0 +1,34 @@ +'use strict' + +const { parseArgs } = require('./utils') + +module.exports = ({ ipld, preload }) => { + return async function get (cid, path, options) { + [cid, path, options] = parseArgs(cid, path, options) + + if (options.preload !== false) { + preload(cid) + } + + if (path == null || path === '/') { + const value = await ipld.get(cid) + + return { + value, + remainderPath: '' + } + } else { + let result + + for await (const entry of ipld.resolve(cid, path)) { + if (options.localResolve) { + return entry + } + + result = entry + } + + return result + } + } +} diff --git a/src/core/components/dag/put.js b/src/core/components/dag/put.js new file mode 100644 index 0000000000..301c87ba8c --- /dev/null +++ b/src/core/components/dag/put.js @@ -0,0 +1,70 @@ +'use strict' + +const multicodec = require('multicodec') +const nameToCodec = name => multicodec[name.toUpperCase().replace(/-/g, '_')] + +module.exports = ({ ipld, pin, gcLock, preload }) => { + return async function put (dagNode, options) { + options = options || {} + + if (options.cid && (options.format || options.hashAlg)) { + throw new Error('Can\'t put dag node. Please provide either `cid` OR `format` and `hashAlg` options.') + } else if (((options.format && !options.hashAlg) || (!options.format && options.hashAlg))) { + throw new Error('Can\'t put dag node. Please provide `format` AND `hashAlg` options.') + } + + const optionDefaults = { + format: multicodec.DAG_CBOR, + hashAlg: multicodec.SHA2_256 + } + + // The IPLD expects the format and hashAlg as constants + if (options.format && typeof options.format === 'string') { + options.format = nameToCodec(options.format) + } + if (options.hashAlg && typeof options.hashAlg === 'string') { + options.hashAlg = nameToCodec(options.hashAlg) + } + + options = options.cid ? options : Object.assign({}, optionDefaults, options) + + // js-ipld defaults to verion 1 CIDs. Hence set version 0 explicitly for + // dag-pb nodes + if (options.version === undefined) { + if (options.format === multicodec.DAG_PB && options.hashAlg === multicodec.SHA2_256) { + options.version = 0 + } else { + options.version = 1 + } + } + + let release + + if (options.pin) { + release = await gcLock.readLock() + } + + try { + const cid = await ipld.put(dagNode, options.format, { + hashAlg: options.hashAlg, + cidVersion: options.version + }) + + if (options.pin) { + await pin.add(cid, { + lock: false + }) + } + + if (options.preload !== false) { + preload(cid) + } + + return cid + } finally { + if (release) { + release() + } + } + } +} diff --git a/src/core/components/dag/resolve.js b/src/core/components/dag/resolve.js new file mode 100644 index 0000000000..e95e5b526f --- /dev/null +++ b/src/core/components/dag/resolve.js @@ -0,0 +1,15 @@ +'use strict' + +const { parseArgs } = require('./utils') + +module.exports = ({ ipld, preload }) => { + return async function * resolve (cid, path, options) { // eslint-disable-line require-await + [cid, path, options] = parseArgs(cid, path, options) + + if (options.preload !== false) { + preload(cid) + } + + yield * ipld.resolve(cid, path) + } +} diff --git a/src/core/components/dag/tree.js b/src/core/components/dag/tree.js new file mode 100644 index 0000000000..07d2d03e65 --- /dev/null +++ b/src/core/components/dag/tree.js @@ -0,0 +1,15 @@ +'use strict' + +const { parseArgs } = require('./utils') + +module.exports = ({ ipld, preload }) => { + return async function * tree (cid, path, options) { // eslint-disable-line require-await + [cid, path, options] = parseArgs(cid, path, options) + + if (options.preload !== false) { + preload(cid) + } + + yield * ipld.tree(cid, path, options) + } +} diff --git a/src/core/components/dag/utils.js b/src/core/components/dag/utils.js new file mode 100644 index 0000000000..810b0e2f9a --- /dev/null +++ b/src/core/components/dag/utils.js @@ -0,0 +1,48 @@ +'use strict' + +const CID = require('cids') +const errCode = require('err-code') + +exports.parseArgs = (cid, path, options) => { + options = options || {} + + // Allow options in path position + if (path !== undefined && typeof path !== 'string') { + options = path + path = undefined + } + + if (typeof cid === 'string') { + if (cid.startsWith('/ipfs/')) { + cid = cid.substring(6) + } + + const split = cid.split('/') + + try { + cid = new CID(split[0]) + } catch (err) { + throw errCode(err, 'ERR_INVALID_CID') + } + + split.shift() + + if (split.length > 0) { + path = split.join('/') + } else { + path = path || '/' + } + } else if (Buffer.isBuffer(cid)) { + try { + cid = new CID(cid) + } catch (err) { + throw errCode(err, 'ERR_INVALID_CID') + } + } + + return [ + cid, + path, + options + ] +} diff --git a/src/core/components/index.js b/src/core/components/index.js index 44d922712a..96598727c8 100644 --- a/src/core/components/index.js +++ b/src/core/components/index.js @@ -1,14 +1,29 @@ 'use strict' exports.add = require('./add') +exports.block = { + get: require('./block/get'), + put: require('./block/put'), + rm: require('./block/rm'), + stat: require('./block/stat') +} exports.config = require('./config') +exports.dag = { + get: require('./dag/get'), + put: require('./dag/put'), + resolve: require('./dag/resolve'), + tree: require('./dag/tree') +} exports.init = require('./init') +exports.pin = { + add: require('./pin/add'), + ls: require('./pin/ls'), + rm: require('./pin/rm') +} exports.start = require('./start') exports.stop = require('./stop') exports.legacy = { // TODO: these will be removed as the new API is completed - dag: require('./dag'), libp2p: require('./libp2p'), - object: require('./object'), - pin: require('./pin') + object: require('./object') } diff --git a/src/core/components/init.js b/src/core/components/init.js index 089d6148dc..2dda0eba04 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -9,7 +9,7 @@ const getDefaultConfig = require('../runtime/config-nodejs.js') const createRepo = require('../runtime/repo-nodejs') const Keychain = require('libp2p-keychain') const NoKeychain = require('./no-keychain') -const GCLock = require('./pin/gc-lock') +const mortice = require('mortice') const { DAGNode } = require('ipld-dag-pb') const UnixFs = require('ipfs-unixfs') const multicodec = require('multicodec') @@ -95,19 +95,28 @@ module.exports = ({ const preload = createPreloader(constructorOptions.preload) await preload.start() - const gcLock = new GCLock(constructorOptions.repoOwner, { - // Make sure GCLock is specific to repo, for tests where there are - // multiple instances of IPFS - morticeId: repo.path - }) - - const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) + // Make sure GC lock is specific to repo, for tests where there are + // multiple instances of IPFS + const gcLock = mortice(repo.path, { singleProcess: constructorOptions.repoOwner }) + const dag = { + get: Commands.dag.get({ ipld, preload }), + resolve: Commands.dag.resolve({ ipld, preload }), + tree: Commands.dag.tree({ ipld, preload }) + } const object = Commands.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) const pinManager = new PinManager(repo, dag) await pinManager.load() - const pin = Commands.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const pin = { + add: Commands.pin.add({ pinManager, gcLock, dag, object }), + ls: Commands.pin.ls({ pinManager, object }), + rm: Commands.pin.rm({ pinManager, gcLock, object }) + } + + // FIXME: resolve this circular dependency + dag.put = Commands.dag.put({ ipld, pin, gcLock, preload }) + const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) if (!isInitialized && !options.emptyRepo) { @@ -130,11 +139,13 @@ module.exports = ({ apiManager, constructorOptions, blockService, + dag, gcLock, initOptions: options, ipld, keychain, peerInfo, + pin, pinManager, preload, print, @@ -272,11 +283,13 @@ function createApi ({ apiManager, constructorOptions, blockService, + dag, gcLock, initOptions, ipld, keychain, peerInfo, + pin, pinManager, preload, print, @@ -299,8 +312,16 @@ function createApi ({ const api = { add, + block: { + get: Commands.block.get({ blockService, preload }), + put: Commands.block.put({ blockService, gcLock, preload }), + rm: Commands.block.rm({ blockService, gcLock, pinManager }), + stat: Commands.block.stat({ blockService, preload }) + }, config: Commands.config({ repo }), + dag, init: () => { throw new AlreadyInitializedError() }, + pin, start } diff --git a/src/core/components/pin.js b/src/core/components/pin.js deleted file mode 100644 index 176e5f5cc8..0000000000 --- a/src/core/components/pin.js +++ /dev/null @@ -1,248 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -'use strict' - -const callbackify = require('callbackify') -const errCode = require('err-code') -const multibase = require('multibase') -const { resolvePath } = require('../utils') -const PinManager = require('./pin/pin-manager') -const PinTypes = PinManager.PinTypes - -module.exports = (self) => { - const dag = self.dag - const pinManager = self._pinManager || new PinManager(self._repo, dag) - - const pin = { - add: callbackify.variadic(async (paths, options) => { - options = options || {} - - const recursive = options.recursive !== false - const cids = await resolvePath(self.object, paths) - const pinAdd = async () => { - const results = [] - - // verify that each hash can be pinned - for (const cid of cids) { - const key = cid.toBaseEncodedString() - - if (recursive) { - if (pinManager.recursivePins.has(key)) { - // it's already pinned recursively - results.push(key) - - continue - } - - // entire graph of nested links should be pinned, - // so make sure we have all the objects - await pinManager.fetchCompleteDag(key, { preload: options.preload }) - - // found all objects, we can add the pin - results.push(key) - } else { - if (pinManager.recursivePins.has(key)) { - // recursive supersedes direct, can't have both - throw new Error(`${key} already pinned recursively`) - } - - if (!pinManager.directPins.has(key)) { - // make sure we have the object - await dag.get(cid, { preload: options.preload }) - } - - results.push(key) - } - } - - // update the pin sets in memory - const pinset = recursive ? pinManager.recursivePins : pinManager.directPins - results.forEach(key => pinset.add(key)) - - // persist updated pin sets to datastore - await pinManager.flushPins() - - return results.map(hash => ({ hash })) - } - - // When adding a file, we take a lock that gets released after pinning - // is complete, so don't take a second lock here - const lock = Boolean(options.lock) - - if (!lock) { - return pinAdd() - } - - const release = await self._gcLock.readLock() - - try { - await pinAdd() - } finally { - release() - } - }), - - rm: callbackify.variadic(async (paths, options) => { - options = options || {} - - const recursive = options.recursive == null ? true : options.recursive - - if (options.cidBase && !multibase.names.includes(options.cidBase)) { - throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE') - } - - const cids = await resolvePath(self.object, paths) - const release = await self._gcLock.readLock() - const results = [] - - try { - // verify that each hash can be unpinned - for (const cid of cids) { - const res = await pinManager.isPinnedWithType(cid, PinTypes.all) - - const { pinned, reason } = res - const key = cid.toBaseEncodedString() - - if (!pinned) { - throw new Error(`${key} is not pinned`) - } - - switch (reason) { - case (PinTypes.recursive): - if (!recursive) { - throw new Error(`${key} is pinned recursively`) - } - - results.push(key) - - break - case (PinTypes.direct): - results.push(key) - - break - default: - throw new Error(`${key} is pinned indirectly under ${reason}`) - } - } - - // update the pin sets in memory - results.forEach(key => { - if (recursive && pinManager.recursivePins.has(key)) { - pinManager.recursivePins.delete(key) - } else { - pinManager.directPins.delete(key) - } - }) - - // persist updated pin sets to datastore - await pinManager.flushPins() - - self.log(`Removed pins: ${results}`) - - return results.map(hash => ({ hash })) - } finally { - release() - } - }), - - ls: callbackify.variadic(async (paths, options) => { - options = options || {} - - let type = PinTypes.all - - if (paths && paths.type) { - options = paths - paths = null - } - - if (options.type) { - type = options.type - if (typeof options.type === 'string') { - type = options.type.toLowerCase() - } - const err = PinManager.checkPinType(type) - if (err) { - throw err - } - } - - if (paths) { - // check the pinned state of specific hashes - const cids = await resolvePath(self.object, paths) - const results = [] - - for (const cid of cids) { - const { key, reason, pinned } = await pinManager.isPinnedWithType(cid, type) - - if (pinned) { - switch (reason) { - case PinTypes.direct: - case PinTypes.recursive: - results.push({ - hash: key, - type: reason - }) - break - default: - results.push({ - hash: key, - type: `${PinTypes.indirect} through ${reason}` - }) - } - } - } - - if (!results.length) { - throw new Error(`path '${paths}' is not pinned`) - } - - return results - } - - // show all pinned items of type - let pins = [] - - if (type === PinTypes.direct || type === PinTypes.all) { - pins = pins.concat( - Array.from(pinManager.directPins).map(hash => ({ - type: PinTypes.direct, - hash - })) - ) - } - - if (type === PinTypes.recursive || type === PinTypes.all) { - pins = pins.concat( - Array.from(pinManager.recursivePins).map(hash => ({ - type: PinTypes.recursive, - hash - })) - ) - } - - if (type === PinTypes.indirect || type === PinTypes.all) { - const indirects = await pinManager.getIndirectKeys(options) - - pins = pins - // if something is pinned both directly and indirectly, - // report the indirect entry - .filter(({ hash }) => - !indirects.includes(hash) || - (indirects.includes(hash) && !pinManager.directPins.has(hash)) - ) - .concat(indirects.map(hash => ({ - type: PinTypes.indirect, - hash - }))) - - return pins - } - - return pins - }), - - // used by tests - pinManager - } - - return pin -} diff --git a/src/core/components/pin/add.js b/src/core/components/pin/add.js new file mode 100644 index 0000000000..07d6142d72 --- /dev/null +++ b/src/core/components/pin/add.js @@ -0,0 +1,74 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const { resolvePath } = require('../utils') + +module.exports = ({ pinManager, gcLock, dag, object }) => { + return async function add (paths, options) { + options = options || {} + + const recursive = options.recursive !== false + const cids = await resolvePath(object, paths) + const pinAdd = async () => { + const results = [] + + // verify that each hash can be pinned + for (const cid of cids) { + const key = cid.toBaseEncodedString() + + if (recursive) { + if (pinManager.recursivePins.has(key)) { + // it's already pinned recursively + results.push(key) + + continue + } + + // entire graph of nested links should be pinned, + // so make sure we have all the objects + await pinManager.fetchCompleteDag(key, { preload: options.preload }) + + // found all objects, we can add the pin + results.push(key) + } else { + if (pinManager.recursivePins.has(key)) { + // recursive supersedes direct, can't have both + throw new Error(`${key} already pinned recursively`) + } + + if (!pinManager.directPins.has(key)) { + // make sure we have the object + await dag.get(cid, { preload: options.preload }) + } + + results.push(key) + } + } + + // update the pin sets in memory + const pinset = recursive ? pinManager.recursivePins : pinManager.directPins + results.forEach(key => pinset.add(key)) + + // persist updated pin sets to datastore + await pinManager.flushPins() + + return results.map(hash => ({ hash })) + } + + // When adding a file, we take a lock that gets released after pinning + // is complete, so don't take a second lock here + const lock = Boolean(options.lock) + + if (!lock) { + return pinAdd() + } + + const release = await gcLock.readLock() + + try { + await pinAdd() + } finally { + release() + } + } +} diff --git a/src/core/components/pin/gc-lock.js b/src/core/components/pin/gc-lock.js deleted file mode 100644 index faceea12cf..0000000000 --- a/src/core/components/pin/gc-lock.js +++ /dev/null @@ -1,83 +0,0 @@ -'use strict' - -const pull = require('pull-stream/pull') -const pullThrough = require('pull-stream/throughs/through') -const pullAsyncMap = require('pull-stream/throughs/async-map') -const Mutex = require('../../../utils/mutex') -const log = require('debug')('ipfs:gc:lock') - -class GCLock { - constructor (repoOwner, options) { - options = options || {} - - this.mutex = new Mutex(repoOwner, { ...options, log }) - } - - readLock () { - return this.mutex.readLock() - } - - writeLock () { - return this.mutex.writeLock() - } - - pullReadLock (lockedPullFn) { - return this.pullLock('readLock', lockedPullFn) - } - - pullWriteLock (lockedPullFn) { - return this.pullLock('writeLock', lockedPullFn) - } - - pullLock (type, lockedPullFn) { - const pullLocker = new PullLocker(this.mutex, type) - - return pull( - pullLocker.take(), - lockedPullFn(), - pullLocker.release() - ) - } -} - -class PullLocker { - constructor (mutex, type) { - this.mutex = mutex - this.type = type - - // The function to call to release the lock. It is set when the lock is taken - this.releaseLock = null - } - - take () { - return pullAsyncMap((i, cb) => { - // Check if the lock has already been acquired. - // Note: new items will only come through the pull stream once the first - // item has acquired a lock. - if (this.releaseLock) { - // The lock has been acquired so return immediately - return cb(null, i) - } - - // Request the lock - this.mutex[this.type]() - .then(release => { - // Save the release function to be called when the stream completes - this.releaseLock = release - - // The lock has been granted, so run the locked piece of code - cb(null, i) - }, cb) - }) - } - - // Releases the lock - release () { - return pullThrough(null, (err) => { - // When the stream completes, release the lock - this.releaseLock(err) - }) - } -} - -module.exports = GCLock diff --git a/src/core/components/pin/ls.js b/src/core/components/pin/ls.js new file mode 100644 index 0000000000..a316b35cd1 --- /dev/null +++ b/src/core/components/pin/ls.js @@ -0,0 +1,88 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const { parallelMap } = require('streaming-iterables') +const { resolvePath } = require('../utils') +const PinManager = require('./pin/pin-manager') +const { PinTypes } = PinManager + +const PIN_LS_CONCURRENCY = 8 + +module.exports = ({ pinManager, object }) => { + return async function * ls (paths, options) { + options = options || {} + + let type = PinTypes.all + + if (paths && paths.type) { + options = paths + paths = null + } + + if (options.type) { + type = options.type + if (typeof options.type === 'string') { + type = options.type.toLowerCase() + } + const err = PinManager.checkPinType(type) + if (err) { + throw err + } + } + + if (paths) { + // check the pinned state of specific hashes + const cids = await resolvePath(object, paths) + + yield * parallelMap(PIN_LS_CONCURRENCY, async cid => { + const { key, reason, pinned } = await pinManager.isPinnedWithType(cid, type) + + if (!pinned) { + throw new Error(`path '${paths[cids.indexOf(cid)]}' is not pinned`) + } + + if (reason === PinTypes.direct || reason === PinTypes.recursive) { + return { hash: key, type: reason } + } + + return { hash: key, type: `${PinTypes.indirect} through ${reason}` } + }, cids) + + return + } + + // show all pinned items of type + let pins = [] + + if (type === PinTypes.direct || type === PinTypes.all) { + pins = pins.concat( + Array.from(pinManager.directPins).map(hash => ({ + type: PinTypes.direct, + hash + })) + ) + } + + if (type === PinTypes.recursive || type === PinTypes.all) { + pins = pins.concat( + Array.from(pinManager.recursivePins).map(hash => ({ + type: PinTypes.recursive, + hash + })) + ) + } + + if (type === PinTypes.indirect || type === PinTypes.all) { + const indirects = await pinManager.getIndirectKeys(options) + + pins = pins + // if something is pinned both directly and indirectly, + // report the indirect entry + .filter(({ hash }) => !indirects.includes(hash) || !pinManager.directPins.has(hash)) + .concat(indirects.map(hash => ({ type: PinTypes.indirect, hash }))) + } + + // FIXME: https://github.com/ipfs/js-ipfs/issues/2244 + yield * pins + } +} diff --git a/src/core/components/pin/rm.js b/src/core/components/pin/rm.js new file mode 100644 index 0000000000..0147abd59e --- /dev/null +++ b/src/core/components/pin/rm.js @@ -0,0 +1,67 @@ +'use strict' + +const errCode = require('err-code') +const multibase = require('multibase') +const { parallelMap, collect } = require('streaming-iterables') +const pipe = require('it-pipe') +const { resolvePath } = require('../utils') +const { PinTypes } = require('./pin/pin-manager') + +const PIN_RM_CONCURRENCY = 8 + +module.exports = ({ pinManager, gcLock, object }) => { + return async function rm (paths, options) { + options = options || {} + + const recursive = options.recursive !== false + + if (options.cidBase && !multibase.names.includes(options.cidBase)) { + throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE') + } + + const cids = await resolvePath(object, paths) + const release = await gcLock.readLock() + + try { + // verify that each hash can be unpinned + const results = await pipe( + cids, + parallelMap(PIN_RM_CONCURRENCY, async cid => { + const res = await pinManager.isPinnedWithType(cid, PinTypes.all) + + const { pinned, reason } = res + const key = cid.toBaseEncodedString() + + if (!pinned) { + throw new Error(`${key} is not pinned`) + } + if (reason !== PinTypes.recursive && reason !== PinTypes.direct) { + throw new Error(`${key} is pinned indirectly under ${reason}`) + } + if (reason === PinTypes.recursive && !recursive) { + throw new Error(`${key} is pinned recursively`) + } + + return key + }), + collect + ) + + // update the pin sets in memory + results.forEach(key => { + if (recursive && pinManager.recursivePins.has(key)) { + pinManager.recursivePins.delete(key) + } else { + pinManager.directPins.delete(key) + } + }) + + // persist updated pin sets to datastore + await pinManager.flushPins() + + return results + } finally { + release() + } + } +} diff --git a/src/core/components/start.js b/src/core/components/start.js index f9f41c7458..e7bd337f13 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -107,9 +107,19 @@ function createApi ({ print, repo }) { - const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) + const dag = { + get: Commands.dag.get({ ipld, preload }), + resolve: Commands.dag.resolve({ ipld, preload }), + tree: Commands.dag.tree({ ipld, preload }) + } const object = Commands.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) - const pin = Commands.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const pin = { + add: Commands.pin.add({ pinManager, gcLock, dag, object }), + ls: Commands.pin.ls({ pinManager, object }), + rm: Commands.pin.rm({ pinManager, gcLock, object }) + } + // FIXME: resolve this circular dependency + dag.put = Commands.dag.put({ ipld, pin, gcLock, preload }) const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) const stop = Commands.stop({ @@ -131,8 +141,15 @@ function createApi ({ const api = { add, + block: { + get: Commands.block.get({ blockService, preload }), + put: Commands.block.put({ blockService, gcLock, preload }), + rm: Commands.block.rm({ blockService, gcLock, pinManager }), + stat: Commands.block.stat({ blockService, preload }) + }, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, + pin, start: () => apiManager.api, stop } diff --git a/src/core/components/stop.js b/src/core/components/stop.js index 4e2a9bb036..dcec2f00b5 100644 --- a/src/core/components/stop.js +++ b/src/core/components/stop.js @@ -76,9 +76,19 @@ function createApi ({ print, repo }) { - const dag = Commands.legacy.dag({ _ipld: ipld, _preload: preload }) + const dag = { + get: Commands.dag.get({ ipld, preload }), + resolve: Commands.dag.resolve({ ipld, preload }), + tree: Commands.dag.tree({ ipld, preload }) + } const object = Commands.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) - const pin = Commands.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const pin = { + add: Commands.pin.add({ pinManager, gcLock, dag, object }), + ls: Commands.pin.ls({ pinManager, object }), + rm: Commands.pin.rm({ pinManager, gcLock, object }) + } + // FIXME: resolve this circular dependency + dag.put = Commands.dag.put({ ipld, pin, gcLock, preload }) const add = Commands.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) const start = Commands.start({ @@ -98,8 +108,15 @@ function createApi ({ const api = { add, + block: { + get: Commands.block.get({ blockService, preload }), + put: Commands.block.put({ blockService, gcLock, preload }), + rm: Commands.block.rm({ blockService, gcLock, pinManager }), + stat: Commands.block.stat({ blockService, preload }) + }, config: Commands.config({ repo }), init: () => { throw new AlreadyInitializedError() }, + pin, start, stop: () => apiManager.api } diff --git a/src/http/api/resources/block.js b/src/http/api/resources/block.js index c88b25b15f..a33a1cd311 100644 --- a/src/http/api/resources/block.js +++ b/src/http/api/resources/block.js @@ -132,7 +132,7 @@ exports.rm = { return streamResponse(request, h, async (output) => { try { - for await (const result of request.server.app.ipfs.block._rmAsyncIterator(arg, { + for await (const result of request.server.app.ipfs.block.rm(arg, { force, quiet })) { diff --git a/src/http/api/resources/dag.js b/src/http/api/resources/dag.js index 436382bc38..f3dffc1b4f 100644 --- a/src/http/api/resources/dag.js +++ b/src/http/api/resources/dag.js @@ -248,7 +248,7 @@ exports.resolve = { let lastRemainderPath = path if (path) { - const result = ipfs._ipld.resolve(lastCid, path) + const result = ipfs.dag.resolve(lastCid, path) while (true) { const resolveResult = (await result.next()).value if (!CID.isCID(resolveResult.value)) { diff --git a/src/http/api/resources/pin.js b/src/http/api/resources/pin.js index 576d9be88d..593b10de69 100644 --- a/src/http/api/resources/pin.js +++ b/src/http/api/resources/pin.js @@ -4,6 +4,7 @@ const multibase = require('multibase') const Joi = require('@hapi/joi') const Boom = require('@hapi/boom') const isIpfs = require('is-ipfs') +const toStream = require('it-to-stream') const { cidToString } = require('../../../utils/cid') function parseArgs (request, h) { @@ -52,21 +53,7 @@ exports.ls = { async handler (request, h) { const { ipfs } = request.server.app const { path, type } = request.pre.args - - let result - try { - result = await ipfs.pin.ls(path, { type }) - } catch (err) { - throw Boom.boomify(err) - } - - return h.response({ - Keys: result.reduce((acc, v) => { - const prop = cidToString(v.hash, { base: request.query['cid-base'] }) - acc[prop] = { Type: v.type } - return acc - }, {}) - }) + return h.response(toStream.readable(ipfs.pin.ls(path, { type }))) } } diff --git a/src/utils/mutex.js b/src/utils/mutex.js deleted file mode 100644 index 8cb3df36cc..0000000000 --- a/src/utils/mutex.js +++ /dev/null @@ -1,52 +0,0 @@ -'use strict' - -const assert = require('assert') -const mortice = require('mortice') -const noop = () => {} - -// Wrap mortice to present a callback interface -class Mutex { - constructor (repoOwner, options) { - options = options || {} - - this.mutex = mortice(options.morticeId, { - singleProcess: repoOwner - }) - - this.log = options.log || noop - this.lockId = 0 - } - - readLock () { - return this._lock('readLock') - } - - writeLock () { - return this._lock('writeLock') - } - - /** - * Request a read or write lock - * - * @param {String} type The type of lock: readLock / writeLock - * @returns {Promise} - */ - async _lock (type) { - assert(typeof type === 'string', `first argument to Mutex.${type}() must be a string, got ${typeof type}`) - - const lockId = this.lockId++ - this.log(`[${lockId}] ${type} requested`) - - // Get a Promise for the lock, wrap it for logging - const release = await this.mutex[type]() - - this.log(`[${lockId}] ${type} started`) - - return () => { - this.log(`[${lockId}] ${type} released`) - release() - } - } -} - -module.exports = Mutex