diff --git a/README.md b/README.md index 11b3f409..6876bdb7 100644 --- a/README.md +++ b/README.md @@ -60,12 +60,12 @@ import { create } from 'libp2p-kad-dht' /** * @param {Libp2p} libp2p */ -function addDHT(libp2p) { +async function addDHT(libp2p) { const customDHT = create({ libp2p, protocolPrefix: '/custom' }) - customDHT.start() + await customDHT.start() return customDHT } diff --git a/package.json b/package.json index 8f0e9a8d..1822b997 100644 --- a/package.json +++ b/package.json @@ -66,7 +66,7 @@ "it-take": "^1.0.2", "k-bucket": "^5.1.0", "libp2p-crypto": "^0.21.0", - "libp2p-interfaces": "^2.0.1", + "libp2p-interfaces": "^4.0.0", "libp2p-record": "^0.10.4", "multiaddr": "^10.0.0", "multiformats": "^9.4.5", @@ -92,7 +92,7 @@ "it-filter": "^1.0.3", "it-last": "^1.0.6", "it-pair": "^1.0.0", - "libp2p": "^0.35.4", + "libp2p": "libp2p/js-libp2p#feat/async-peerstore", "lodash.random": "^3.2.0", "lodash.range": "^3.2.0", "p-retry": "^4.2.0", diff --git a/src/content-routing/index.js b/src/content-routing/index.js index efc4e607..e27f5bec 100644 --- a/src/content-routing/index.js +++ b/src/content-routing/index.js @@ -28,7 +28,7 @@ class ContentRouting { * @param {import('../query/manager').QueryManager} params.queryManager * @param {import('../routing-table').RoutingTable} params.routingTable * @param {import('../providers').Providers} params.providers - * @param {import('../types').PeerStore} params.peerStore + * @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore * @param {boolean} params.lan */ constructor ({ peerId, network, peerRouting, queryManager, routingTable, providers, peerStore, lan }) { @@ -137,10 +137,15 @@ class ContentRouting { // yield values if we have some, also slice because maybe we got lucky and already have too many? if (provs.length) { - const providers = provs.slice(0, toFind).map(peerId => ({ - id: peerId, - multiaddrs: (this._peerStore.addressBook.get(peerId) || []).map(address => address.multiaddr) - })) + /** @type {{ id: PeerId, multiaddrs: Multiaddr[] }[]} */ + const providers = [] + + for (const peerId of provs.slice(0, toFind)) { + providers.push({ + id: peerId, + multiaddrs: ((await this._peerStore.addressBook.get(peerId)) || []).map(address => address.multiaddr) + }) + } yield peerResponseEvent({ from: this._peerId, messageType: MessageType.GET_PROVIDERS, providers }) yield providerEvent({ from: this._peerId, providers: providers }) diff --git a/src/dual-kad-dht.js b/src/dual-kad-dht.js index c5f3e9f7..a18a98bb 100644 --- a/src/dual-kad-dht.js +++ b/src/dual-kad-dht.js @@ -77,15 +77,15 @@ class DualKadDHT extends EventEmitter { /** * Whether we are in client or server mode */ - enableServerMode () { - this._wan.enableServerMode() + async enableServerMode () { + await this._wan.enableServerMode() } /** * Whether we are in client or server mode */ - enableClientMode () { - this._wan.enableClientMode() + async enableClientMode () { + await this._wan.enableClientMode() } /** @@ -314,7 +314,7 @@ class DualKadDHT extends EventEmitter { log('getPublicKey %p', peer) // local check - const peerData = this._libp2p.peerStore.get(peer) + const peerData = await this._libp2p.peerStore.get(peer) if (peerData && peerData.id.pubKey) { log('getPublicKey: found local copy') @@ -339,8 +339,8 @@ class DualKadDHT extends EventEmitter { const peerId = new PeerId(peer.id, undefined, pk) const addrs = ((peerData && peerData.addresses) || []).map((address) => address.multiaddr) - this._libp2p.peerStore.addressBook.add(peerId, addrs) - this._libp2p.peerStore.keyBook.set(peerId, pk) + await this._libp2p.peerStore.addressBook.add(peerId, addrs) + await this._libp2p.peerStore.keyBook.set(peerId, pk) return pk } diff --git a/src/kad-dht.js b/src/kad-dht.js index 49d4ae46..88decb66 100644 --- a/src/kad-dht.js +++ b/src/kad-dht.js @@ -280,9 +280,11 @@ class KadDHT extends EventEmitter { // handle peers being discovered via other peer discovery mechanisms this._topologyListener.on('peer', async (peerId) => { + const multiaddrs = await this._libp2p.peerStore.addressBook.get(peerId) + const peerData = { id: peerId, - multiaddrs: (this._libp2p.peerStore.addressBook.get(peerId) || []).map((/** @type {{ multiaddr: Multiaddr }} */ addr) => addr.multiaddr) + multiaddrs: multiaddrs.map(addr => addr.multiaddr) } this.onPeerConnect(peerData).catch(err => { @@ -332,19 +334,19 @@ class KadDHT extends EventEmitter { /** * Whether we are in client or server mode */ - enableServerMode () { + async enableServerMode () { this._log('enabling server mode') this._clientMode = false - this._libp2p.handle(this._protocol, this._rpc.onIncomingStream.bind(this._rpc)) + await this._libp2p.handle(this._protocol, this._rpc.onIncomingStream.bind(this._rpc)) } /** * Whether we are in client or server mode */ - enableClientMode () { + async enableClientMode () { this._log('enabling client mode') this._clientMode = true - this._libp2p.unhandle(this._protocol) + await this._libp2p.unhandle(this._protocol) } /** @@ -355,9 +357,9 @@ class KadDHT extends EventEmitter { // Only respond to queries when not in client mode if (this._clientMode) { - this.enableClientMode() + await this.enableClientMode() } else { - this.enableServerMode() + await this.enableServerMode() } await Promise.all([ diff --git a/src/peer-routing/index.js b/src/peer-routing/index.js index 032ec0d9..51a07482 100644 --- a/src/peer-routing/index.js +++ b/src/peer-routing/index.js @@ -24,7 +24,7 @@ class PeerRouting { * @param {object} params * @param {import('peer-id')} params.peerId * @param {import('../routing-table').RoutingTable} params.routingTable - * @param {import('../types').PeerStore} params.peerStore + * @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore * @param {import('../network').Network} params.network * @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators * @param {import('../query/manager').QueryManager} params.queryManager @@ -52,11 +52,11 @@ class PeerRouting { if (p) { this._log('findPeerLocal found %p in routing table', peer) - peerData = this._peerStore.get(p) + peerData = await this._peerStore.get(p) } if (!peerData) { - peerData = this._peerStore.get(peer) + peerData = await this._peerStore.get(peer) } if (peerData) { @@ -141,7 +141,7 @@ class PeerRouting { const match = peers.find((p) => p.equals(id)) if (match) { - const peer = this._peerStore.get(id) + const peer = await this._peerStore.get(id) if (peer) { this._log('found in peerStore') @@ -232,13 +232,15 @@ class PeerRouting { this._log('found %d peers close to %b', peers.length, key) - yield * peers.peers.map(peer => finalPeerEvent({ - from: this._peerId, - peer: { - id: peer, - multiaddrs: (this._peerStore.addressBook.get(peer) || []).map(addr => addr.multiaddr) - } - })) + for (const peer of peers.peers) { + yield finalPeerEvent({ + from: this._peerId, + peer: { + id: peer, + multiaddrs: (await (this._peerStore.addressBook.get(peer)) || []).map(addr => addr.multiaddr) + } + }) + } } /** @@ -294,16 +296,20 @@ class PeerRouting { async getCloserPeersOffline (key, closerThan) { const id = await utils.convertBuffer(key) const ids = this._routingTable.closestPeers(id) - const output = ids - .map((p) => { - const peer = this._peerStore.get(p) + const output = [] - return { - id: p, - multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : [] - } + for (const peerId of ids) { + if (peerId.equals(closerThan)) { + continue + } + + const peer = await this._peerStore.get(peerId) + + output.push({ + id: peerId, + multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : [] }) - .filter((closer) => !closer.id.equals(closerThan)) + } if (output.length) { this._log('getCloserPeersOffline found %d peer(s) closer to %b than %p', output.length, key, closerThan) diff --git a/src/rpc/handlers/add-provider.js b/src/rpc/handlers/add-provider.js index 08e0f2da..7c4ed726 100644 --- a/src/rpc/handlers/add-provider.js +++ b/src/rpc/handlers/add-provider.js @@ -19,7 +19,7 @@ class AddProviderHandler { * @param {object} params * @param {PeerId} params.peerId * @param {import('../../providers').Providers} params.providers - * @param {import('../../types').PeerStore} params.peerStore + * @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore */ constructor ({ peerId, providers, peerStore }) { this._peerId = peerId @@ -69,7 +69,7 @@ class AddProviderHandler { if (!this._peerId.equals(pi.id)) { // Add known address to peer store - this._peerStore.addressBook.add(pi.id, pi.multiaddrs) + await this._peerStore.addressBook.add(pi.id, pi.multiaddrs) await this._providers.addProvider(cid, pi.id) } }) diff --git a/src/rpc/handlers/get-providers.js b/src/rpc/handlers/get-providers.js index 6a9bb559..0943aa45 100644 --- a/src/rpc/handlers/get-providers.js +++ b/src/rpc/handlers/get-providers.js @@ -13,6 +13,7 @@ const { /** * @typedef {import('peer-id')} PeerId * @typedef {import('../types').DHTMessageHandler} DHTMessageHandler + * @typedef {import('../../types').PeerData} PeerData */ /** @@ -24,7 +25,7 @@ class GetProvidersHandler { * @param {PeerId} params.peerId * @param {import('../../peer-routing').PeerRouting} params.peerRouting * @param {import('../../providers').Providers} params.providers - * @param {import('../../types').PeerStore} params.peerStore + * @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore * @param {import('../../types').Addressable} params.addressable * @param {boolean} [params.lan] */ @@ -58,8 +59,8 @@ class GetProvidersHandler { this._peerRouting.getCloserPeersOffline(msg.key, peerId) ]) - const providerPeers = this._getPeers(peers) - const closerPeers = this._getPeers(closer.map(({ id }) => id)) + const providerPeers = await this._getPeers(peers) + const closerPeers = await this._getPeers(closer.map(({ id }) => id)) const response = new Message(msg.type, msg.key, msg.clusterLevel) if (providerPeers.length > 0) { @@ -77,22 +78,30 @@ class GetProvidersHandler { /** * @param {PeerId} peerId */ - _getAddresses (peerId) { - return this._peerId.equals(peerId) ? this._addressable.multiaddrs : (this._peerStore.addressBook.get(peerId) || []).map(address => address.multiaddr) + async _getAddresses (peerId) { + return this._peerId.equals(peerId) ? this._addressable.multiaddrs : (await (this._peerStore.addressBook.get(peerId)) || []).map(address => address.multiaddr) } /** * @param {PeerId[]} peerIds - * @returns */ - _getPeers (peerIds) { - return peerIds - .map((peerId) => ({ + async _getPeers (peerIds) { + /** @type {PeerData[]} */ + const output = [] + const addrFilter = this._lan ? removePublicAddresses : removePrivateAddresses + + for (const peerId of peerIds) { + const peer = addrFilter({ id: peerId, - multiaddrs: this._getAddresses(peerId) - })) - .map(this._lan ? removePublicAddresses : removePrivateAddresses) - .filter(({ multiaddrs }) => multiaddrs.length) + multiaddrs: await this._getAddresses(peerId) + }) + + if (peer.multiaddrs.length) { + output.push(peer) + } + } + + return output } } diff --git a/src/rpc/handlers/get-value.js b/src/rpc/handlers/get-value.js index cb00e620..b89f646c 100644 --- a/src/rpc/handlers/get-value.js +++ b/src/rpc/handlers/get-value.js @@ -13,6 +13,7 @@ const log = utils.logger('libp2p:kad-dht:rpc:handlers:get-value') /** * @typedef {import('peer-id')} PeerId * @typedef {import('../types').DHTMessageHandler} DHTMessageHandler + * @typedef {import('libp2p-interfaces/src/keys/types').PublicKey} PublicKey */ /** @@ -22,7 +23,7 @@ class GetValueHandler { /** * @param {object} params * @param {PeerId} params.peerId - * @param {import('../../types').PeerStore} params.peerStore + * @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore * @param {import('../../peer-routing').PeerRouting} params.peerRouting * @param {import('interface-datastore').Datastore} params.records */ @@ -53,18 +54,18 @@ class GetValueHandler { if (utils.isPublicKeyKey(key)) { log('is public key') const idFromKey = utils.fromPublicKeyKey(key) - let id + /** @type {PublicKey | undefined} */ + let pubKey if (this._peerId.equals(idFromKey)) { - id = this._peerId + pubKey = this._peerId.pubKey } else { - const peerData = this._peerStore.get(idFromKey) - id = peerData && peerData.id + pubKey = await this._peerStore.keyBook.get(idFromKey) } - if (id && id.pubKey) { + if (pubKey != null) { log('returning found public key') - response.record = new Record(key, id.pubKey.bytes) + response.record = new Record(key, pubKey.bytes) return response } } diff --git a/src/rpc/handlers/index.js b/src/rpc/handlers/index.js index 88a5d5c4..7eb0425b 100644 --- a/src/rpc/handlers/index.js +++ b/src/rpc/handlers/index.js @@ -16,7 +16,7 @@ const { PutValueHandler } = require('./put-value') * @param {object} params * @param {import('peer-id')} params.peerId * @param {import('../../providers').Providers} params.providers - * @param {import('../../types').PeerStore} params.peerStore + * @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore * @param {import('../../types').Addressable} params.addressable * @param {import('../../peer-routing').PeerRouting} params.peerRouting * @param {import('interface-datastore').Datastore} params.records diff --git a/src/rpc/index.js b/src/rpc/index.js index b467ef54..13b2bf6a 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -23,7 +23,7 @@ class RPC { * @param {import('../routing-table').RoutingTable} params.routingTable * @param {import('peer-id')} params.peerId * @param {import('../providers').Providers} params.providers - * @param {import('../types').PeerStore} params.peerStore + * @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore * @param {import('../types').Addressable} params.addressable * @param {import('../peer-routing').PeerRouting} params.peerRouting * @param {import('interface-datastore').Datastore} params.records diff --git a/src/topology-listener.js b/src/topology-listener.js index bc10f87c..1f2a835d 100644 --- a/src/topology-listener.js +++ b/src/topology-listener.js @@ -12,7 +12,7 @@ class TopologyListener extends EventEmitter { * Create a new network * * @param {object} params - * @param {import('./types').Registrar} params.registrar + * @param {import('libp2p/src/registrar')} params.registrar * @param {string} params.protocol * @param {boolean} params.lan */ @@ -28,7 +28,7 @@ class TopologyListener extends EventEmitter { /** * Start the network */ - start () { + async start () { if (this._running) { return } @@ -46,7 +46,7 @@ class TopologyListener extends EventEmitter { onDisconnect: () => {} } }) - this._registrarId = this._registrar.register(topology) + this._registrarId = await this._registrar.register(topology) } /** diff --git a/src/types.ts b/src/types.ts index 2b90a601..12600e80 100644 --- a/src/types.ts +++ b/src/types.ts @@ -2,7 +2,6 @@ import type PeerId from 'peer-id' import type { Multiaddr } from 'multiaddr' import type { CID } from 'multiformats/cid' import type { MuxedStream } from 'libp2p/src/upgrader' -import type Topology from 'libp2p-interfaces/src/topology' import type { PublicKey } from 'libp2p-crypto' import type { Message } from './message/dht' @@ -141,8 +140,8 @@ export interface DHT { put: (key: Uint8Array, value: Uint8Array, options?: QueryOptions) => AsyncIterable // enable/disable publishing - enableServerMode: () => void - enableClientMode: () => void + enableServerMode: () => Promise + enableClientMode: () => Promise // housekeeping refreshRoutingTable: () => Promise @@ -161,24 +160,6 @@ export interface Addressable { multiaddrs: Multiaddr[] } -// Implemented by libp2p.registrar, should be moved to libp2p-interfaces eventually -export interface Registrar { - register: (topology: Topology) => string - unregister: (id: string) => boolean -} - -// Implemented by libp2p.peerStore, should be moved to libp2p-interfaces eventually -export interface PeerStore { - addressBook: AddressBook - get: (peerId: PeerId) => { id: PeerId, addresses: Array<{ multiaddr: Multiaddr }> } | undefined -} - -// Implemented by libp2p.peerStore.addressStore, should be moved to libp2p-interfaces eventually -export interface AddressBook { - add: (peerId: PeerId, addresses: Multiaddr[]) => void - get: (peerId: PeerId) => Array<{ multiaddr: Multiaddr }> | undefined -} - export interface Metrics { updateComponentMetric: (component: string, metric: string, value: number) => void } diff --git a/test/kad-dht.spec.js b/test/kad-dht.spec.js index 7269bd87..a348161d 100644 --- a/test/kad-dht.spec.js +++ b/test/kad-dht.spec.js @@ -97,11 +97,11 @@ describe('KadDHT', () => { sinon.spy(dht._lan._network, 'start') sinon.spy(dht._lan._network, 'stop') - dht.start() + await dht.start() expect(dht._wan._network.start.calledOnce).to.equal(true) expect(dht._lan._network.start.calledOnce).to.equal(true) - dht.stop() + await dht.stop() expect(dht._wan._network.stop.calledOnce).to.equal(true) expect(dht._lan._network.stop.calledOnce).to.equal(true) }) @@ -112,15 +112,15 @@ describe('KadDHT', () => { dht._libp2p.handle = sinon.stub() - dht.start() + await dht.start() // lan dht is always in server mode expect(dht._libp2p.handle.callCount).to.equal(1) - dht.enableServerMode() + await dht.enableServerMode() // now wan dht should be in server mode too expect(dht._libp2p.handle.callCount).to.equal(2) - dht.stop() + await dht.stop() }) it('client mode', async () => { @@ -129,8 +129,8 @@ describe('KadDHT', () => { dht._libp2p.handle = sinon.stub() - dht.start() - dht.stop() + await dht.start() + await dht.stop() // lan dht is always in server mode expect(dht._libp2p.handle.callCount).to.equal(1) @@ -139,17 +139,17 @@ describe('KadDHT', () => { it('should not fail when already started', async () => { const [dht] = await tdht.spawn(1, null, false) - dht.start() - dht.start() - dht.start() + await dht.start() + await dht.start() + await dht.start() - dht.stop() + await dht.stop() }) it('should not fail to stop when was not started', async () => { const [dht] = await tdht.spawn(1, null, false) - dht.stop() + await dht.stop() }) }) @@ -493,7 +493,7 @@ describe('KadDHT', () => { const wanSpy = sinon.spy(dhts[0]._wan, 'provide') const lanSpy = sinon.spy(dhts[0]._lan, 'provide') - dhts[0].enableServerMode() + await dhts[0].enableServerMode() await drain(dhts[0].provide(values[0].cid)) @@ -736,7 +736,7 @@ describe('KadDHT', () => { const dhts = await tdht.spawn(2) const ids = dhts.map((d) => d._libp2p.peerId) - dhts[0]._libp2p.peerStore.addressBook.add(dhts[1]._libp2p.peerId, [new Multiaddr('/ip4/160.1.1.1/tcp/80')]) + await dhts[0]._libp2p.peerStore.addressBook.add(dhts[1]._libp2p.peerId, [new Multiaddr('/ip4/160.1.1.1/tcp/80')]) const key = await dhts[0].getPublicKey(ids[1]) expect(key).to.eql(dhts[1]._libp2p.peerId.pubKey) @@ -753,7 +753,7 @@ describe('KadDHT', () => { await tdht.connect(dhts[0], dhts[1]) - dhts[0]._libp2p.peerStore.addressBook.add(dhts[1]._libp2p.peerId, [new Multiaddr('/ip4/160.1.1.1/tcp/80')]) + await dhts[0]._libp2p.peerStore.addressBook.add(dhts[1]._libp2p.peerId, [new Multiaddr('/ip4/160.1.1.1/tcp/80')]) const key = await dhts[0].getPublicKey(ids[1]) expect(uint8ArrayEquals(key, dhts[1]._libp2p.peerId.pubKey)).to.eql(true) diff --git a/test/rpc/handlers/add-provider.spec.js b/test/rpc/handlers/add-provider.spec.js index 194c2754..b8a24d32 100644 --- a/test/rpc/handlers/add-provider.spec.js +++ b/test/rpc/handlers/add-provider.spec.js @@ -87,7 +87,7 @@ describe('rpc - handlers - AddProvider', () => { expect(provs).to.have.length(1) expect(provs[0].id).to.eql(peerIds[0].id) - const bookEntry = dht._libp2p.peerStore.get(peerIds[0]) + const bookEntry = await dht._libp2p.peerStore.get(peerIds[0]) expect(bookEntry.addresses.map((address) => address.multiaddr)).to.eql([ma1]) }) }) diff --git a/test/rpc/handlers/get-value.spec.js b/test/rpc/handlers/get-value.spec.js index 131c2b0f..2caaa198 100644 --- a/test/rpc/handlers/get-value.spec.js +++ b/test/rpc/handlers/get-value.spec.js @@ -93,8 +93,8 @@ describe('rpc - handlers - GetValue', () => { const msg = new Message(T, key, 0) - dht._libp2p.peerStore.addressBook.add(other, []) - dht._libp2p.peerStore.keyBook.set(other, other.pubKey) + await dht._libp2p.peerStore.addressBook.add(other, []) + await dht._libp2p.peerStore.keyBook.set(other, other.pubKey) await dht._lan._routingTable.add(other) const response = await handler.handle(peerIds[0], msg) diff --git a/test/utils/test-dht.js b/test/utils/test-dht.js index f90ff1f1..e320a264 100644 --- a/test/utils/test-dht.js +++ b/test/utils/test-dht.js @@ -28,7 +28,8 @@ class TestDHT { const [peerId] = await createPeerId(1) const regRecord = {} - const peerStore = new PeerStore({ peerId }) + const datastore = new MemoryDatastore() + const peerStore = new PeerStore({ peerId, datastore }) options = { protocolPrefix: '/ipfs', @@ -80,7 +81,7 @@ class TestDHT { new Multiaddr('/ip4/85.3.31.0/tcp/4002') ], peerStore, - datastore: new MemoryDatastore(), + datastore, dialProtocol: (peer, protocol, options) => connectToPeer(dht, peer, protocol, options), registrar, handle: (protocol, fn) => { @@ -115,13 +116,13 @@ class TestDHT { }) // simulate libp2p._onDiscoveryPeer - dht.on('peer', (peerData) => { + dht.on('peer', async (peerData) => { if (peerData.id.toB58String() === peerId.toB58String()) { return } - peerData.multiaddrs && peerStore.addressBook.add(peerData.id, peerData.multiaddrs) - peerData.protocols && peerStore.protoBook.set(peerData.id, peerData.protocols) + peerData.multiaddrs && await peerStore.addressBook.add(peerData.id, peerData.multiaddrs) + peerData.protocols && await peerStore.protoBook.set(peerData.id, peerData.protocols) }) if (autoStart) { @@ -141,8 +142,8 @@ class TestDHT { const routingTableChecks = [] // Libp2p dial adds multiaddrs to the addressBook - dhtA._libp2p.peerStore.addressBook.add(dhtB._libp2p.peerId, dhtB._libp2p.multiaddrs) - dhtB._libp2p.peerStore.addressBook.add(dhtA._libp2p.peerId, dhtA._libp2p.multiaddrs) + await dhtA._libp2p.peerStore.addressBook.add(dhtB._libp2p.peerId, dhtB._libp2p.multiaddrs) + await dhtB._libp2p.peerStore.addressBook.add(dhtA._libp2p.peerId, dhtA._libp2p.multiaddrs) // Notice peers of connection if (!dhtB._clientMode) {