Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
chore: refactor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 14, 2019
1 parent 06f9b02 commit 767110f
Show file tree
Hide file tree
Showing 36 changed files with 1,136 additions and 1,538 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"lodash": "^4.17.11",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"p-defer": "^3.0.0",
"p-each-series": "^2.1.0",
"p-map-series": "^2.1.0",
"p-retry": "^4.2.0",
Expand Down
9 changes: 4 additions & 5 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const crypto = require('libp2p-crypto')

const promisify = require('promisify-es6')
const pFilter = require('p-filter')
const pTimeout = require('p-timeout')

Expand Down Expand Up @@ -241,7 +240,7 @@ class KadDHT extends EventEmitter {
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<void>}
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
get (key, options = {}) {
options.timeout = options.timeout || c.minute
Expand Down Expand Up @@ -425,7 +424,7 @@ class KadDHT extends EventEmitter {
// try dht directly
const pkKey = utils.keyForPublicKey(peer)
const value = await this.get(pkKey)
pk = crypto.unmarshalPublicKey(value)
pk = crypto.keys.unmarshalPublicKey(value)
}

info.id = new PeerId(peer.id, null, pk)
Expand Down Expand Up @@ -468,15 +467,15 @@ class KadDHT extends EventEmitter {
// Add peer as provider
await this.providers.addProvider(key, this.peerInfo.id)

// Notice closest peers
// Notify closest peers
const peers = await this.getClosestPeers(key.buffer)
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
msg.providerPeers = [this.peerInfo]

await Promise.all(peers.map(async (peer) => {
this._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
try {
await promisify(cb => this.network.sendMessage(peer, msg, cb))()
await this.network.sendMessage(peer, msg)
} catch (err) {
errors.push(err)
}
Expand Down
119 changes: 56 additions & 63 deletions src/network.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
'use strict'

const pull = require('pull-stream')
const timeout = require('async/timeout')
const pTimeout = require('p-timeout')
const lp = require('pull-length-prefixed')
const setImmediate = require('async/setImmediate')
const promisify = require('promisify-es6')

const errcode = require('err-code')
Expand Down Expand Up @@ -33,7 +32,7 @@ class Network {

/**
* Start the network.
* @async
* @returns {void}
*/
start () {
if (this._running) {
Expand All @@ -56,7 +55,6 @@ class Network {

/**
* Stop all network activity.
*
* @returns {void}
*/
stop () {
Expand Down Expand Up @@ -111,50 +109,40 @@ class Network {

/**
* Send a request and record RTT for latency measurements.
*
* @async
* @param {PeerId} to - The peer that should receive a message
* @param {Message} msg - The message to send.
* @param {function(Error, Message)} callback
* @returns {void}
* @returns {Promise<Message>}
*/
sendRequest (to, msg, callback) {
async sendRequest (to, msg) {
// TODO: record latency
if (!this.isConnected) {
return callback(errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE'))
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')
}

this._log('sending to: %s', to.toB58String())
this.dht.switch.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}

this._writeReadMessage(conn, msg.serialize(), callback)
})
const conn = await promisify(cb => this.dht.switch.dial(to, c.PROTOCOL_DHT, cb))()
return this._writeReadMessage(conn, msg.serialize())
}

/**
* Sends a message without expecting an answer.
*
* @param {PeerId} to
* @param {Message} msg
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
sendMessage (to, msg, callback) {
async sendMessage (to, msg) {
if (!this.isConnected) {
return setImmediate(() => callback(errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')))
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')
}

this._log('sending to: %s', to.toB58String())

this.dht.switch.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}

this._writeMessage(conn, msg.serialize(), callback)
})
const conn = await promisify(cb => this.dht.switch.dial(to, c.PROTOCOL_DHT, cb))()
return this._writeMessage(conn, msg.serialize())
}

/**
Expand All @@ -164,61 +152,66 @@ class Network {
*
* @param {Connection} conn - the connection to use
* @param {Buffer} msg - the message to send
* @param {function(Error, Message)} callback
* @returns {void}
* @returns {Message}
* @private
*/
_writeReadMessage (conn, msg, callback) {
timeout(
writeReadMessage,
_writeReadMessage (conn, msg) {
return pTimeout(
writeReadMessage(conn, msg),
this.readMessageTimeout
)(conn, msg, callback)
)
}

/**
* Write a message to the given connection.
*
* @param {Connection} conn - the connection to use
* @param {Buffer} msg - the message to send
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
* @private
*/
_writeMessage (conn, msg, callback) {
_writeMessage (conn, msg) {
return new Promise((resolve, reject) => {
pull(
pull.values([msg]),
lp.encode(),
conn,
pull.onEnd((err) => {
if (err) return reject(err)
resolve()
})
)
})
}
}

function writeReadMessage (conn, msg) {
return new Promise((resolve, reject) => {
pull(
pull.values([msg]),
lp.encode(),
conn,
pull.onEnd(callback)
pull.filter((msg) => msg.length < c.maxMessageSize),
lp.decode(),
pull.collect((err, res) => {
if (err) {
return reject(err)
}
if (res.length === 0) {
return reject(errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED'))
}

let response
try {
response = Message.deserialize(res[0])
} catch (err) {
return reject(errcode(err, 'ERR_FAILED_DESERIALIZE_RESPONSE'))
}

resolve(response)
})
)
}
}

function writeReadMessage (conn, msg, callback) {
pull(
pull.values([msg]),
lp.encode(),
conn,
pull.filter((msg) => msg.length < c.maxMessageSize),
lp.decode(),
pull.collect((err, res) => {
if (err) {
return callback(err)
}
if (res.length === 0) {
return callback(errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED'))
}

let response
try {
response = Message.deserialize(res[0])
} catch (err) {
return callback(errcode(err, 'ERR_FAILED_DESERIALIZE_RESPONSE'))
}

callback(null, response)
})
)
})
}

module.exports = Network
9 changes: 4 additions & 5 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const errcode = require('err-code')
const pTimeout = require('p-timeout')

const promisify = require('promisify-es6')
const PeerId = require('peer-id')
const libp2pRecord = require('libp2p-record')
const PeerInfo = require('peer-info')
Expand Down Expand Up @@ -179,7 +178,7 @@ module.exports = (dht) => ({
dht._log('_findPeerSingle %s', peer.toB58String())
const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0)

return promisify(cb => dht.network.sendRequest(peer, msg, cb))()
return dht.network.sendRequest(peer, msg)
},

/**
Expand All @@ -197,7 +196,7 @@ module.exports = (dht) => ({
const msg = new Message(Message.TYPES.PUT_VALUE, key, 0)
msg.record = rec

const resp = await promisify(cb => dht.network.sendRequest(target, msg, cb))()
const resp = await dht.network.sendRequest(target, msg)

if (!resp.record.value.equals(Record.deserialize(rec).value)) {
throw errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID')
Expand Down Expand Up @@ -362,7 +361,7 @@ module.exports = (dht) => ({

async _getValueSingle (peer, key) { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_VALUE, key, 0)
return promisify(cb => dht.network.sendRequest(peer, msg, cb))()
return dht.network.sendRequest(peer, msg)
},

/**
Expand Down Expand Up @@ -500,6 +499,6 @@ module.exports = (dht) => ({
*/
async _findProvidersSingle (peer, key) { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0)
return promisify(cb => dht.network.sendRequest(peer, msg, cb))()
return dht.network.sendRequest(peer, msg)
}
})
9 changes: 4 additions & 5 deletions src/providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Providers {
/**
* Release any resources.
*
* @returns {undefined}
* @returns {void}
*/
stop () {
if (this._cleaner) {
Expand All @@ -73,8 +73,7 @@ class Providers {
/**
* Check all providers if they are still valid, and if not delete them.
*
* @returns {Promise}
*
* @returns {Promise<void>}
* @private
*/
_cleanup () {
Expand Down Expand Up @@ -178,7 +177,7 @@ class Providers {
*
* @param {CID} cid
* @param {PeerId} provider
* @returns {Promise}
* @returns {Promise<void>}
*/
async addProvider (cid, provider) { // eslint-disable-line require-await
return this.syncQueue.add(async () => {
Expand Down Expand Up @@ -232,7 +231,7 @@ function makeProviderKey (cid) {
* @param {CID} cid
* @param {PeerId} peer
* @param {number} time
* @returns {Promise}
* @returns {Promise<void>}
*
* @private
*/
Expand Down
2 changes: 1 addition & 1 deletion src/routing.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class RoutingTable {
* Remove a given peer from the table.
*
* @param {PeerId} peer
* @returns {Promose<void>}
* @returns {Promise<void>}
*/
async remove (peer) {
const id = await utils.convertPeerId(peer)
Expand Down
19 changes: 6 additions & 13 deletions src/rpc/handlers/add-provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const CID = require('cids')
const errcode = require('err-code')
const promiseToCallback = require('promise-to-callback')

const utils = require('../../utils')

Expand All @@ -13,26 +12,23 @@ module.exports = (dht) => {
*
* @param {PeerInfo} peer
* @param {Message} msg
* @param {function(Error)} callback
* @returns {undefined}
* @returns {Promise<void>}
*/
return function addProvider (peer, msg, callback) {
return async function addProvider (peer, msg) { // eslint-disable-line require-await
log('start')

if (!msg.key || msg.key.length === 0) {
return callback(errcode(new Error('Missing key'), 'ERR_MISSING_KEY'))
throw errcode(new Error('Missing key'), 'ERR_MISSING_KEY')
}

let cid
try {
cid = new CID(msg.key)
} catch (err) {
const errMsg = `Invalid CID: ${err.message}`

return callback(errcode(new Error(errMsg), 'ERR_INVALID_CID'))
throw errcode(new Error(errMsg), 'ERR_INVALID_CID')
}

let foundProvider = false
msg.providerPeers.forEach((pi) => {
// Ignore providers not from the originator
if (!pi.id.isEqual(peer.id)) {
Expand All @@ -48,9 +44,8 @@ module.exports = (dht) => {
log('received provider %s for %s (addrs %s)', peer.id.toB58String(), cid.toBaseEncodedString(), pi.multiaddrs.toArray().map((m) => m.toString()))

if (!dht._isSelf(pi.id)) {
foundProvider = true
dht.peerBook.put(pi)
promiseToCallback(dht.providers.addProvider(cid, pi.id))(err => callback(err))
return dht.providers.addProvider(cid, pi.id)
}
})

Expand All @@ -60,8 +55,6 @@ module.exports = (dht) => {
// we can't find any valid providers in the payload.
// https://github.com/libp2p/js-libp2p-kad-dht/pull/127
// https://github.com/libp2p/js-libp2p-kad-dht/issues/128
if (!foundProvider) {
promiseToCallback(dht.providers.addProvider(cid, peer.id))(err => callback(err))
}
return dht.providers.addProvider(cid, peer.id)
}
}
Loading

0 comments on commit 767110f

Please sign in to comment.