Skip to content

Commit

Permalink
Merge pull request #496 from f3rno/master
Browse files Browse the repository at this point in the history
v2.0.9
  • Loading branch information
prdn authored Sep 9, 2019
2 parents 148c2b6 + 7400560 commit dcfbace
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 13 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
2.0.9

- WS2Manager: add managedUnsubscribe()
- WS2Manager: add close()
- WS2Manager: add getAuthenticatedSocket()
- WSv2: add suppport for liquidations feed (status methods)
- WSv2: add reconnect throttler in case of connection reset w/ many open sockets

2.0.8

- Bump dependency versions

2.0.7

- WSv2: increase data chan limit to 30 (732499b)
Expand Down
18 changes: 18 additions & 0 deletions examples/ws2/liquidations.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict'

process.env.DEBUG = '*'

const debug = require('debug')('bfx:examples:liquidations')
const bfx = require('../bfx')
const ws = bfx.ws(2, { transform: true })

ws.on('open', () => {
debug('open')
ws.subscribeStatus('liq:global')
})

ws.onStatus({ key: 'liq:global' }, (data) => {
console.log(data)
})

ws.open()
66 changes: 61 additions & 5 deletions lib/transports/ws2.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class WSv2 extends EventEmitter {
* @param {boolean} opts.seqAudit - enable sequence numbers & verification
* @param {boolean} opts.autoReconnect - if true, we will reconnect on close
* @param {number} opts.reconnectDelay - optional, defaults to 1000 (ms)
* @param {PromiseThrottle} opts.reconnectThrottler - optional pt to limit reconnect freq
* @param {number} opts.packetWDDelay - watch-dog forced reconnection delay
*/
constructor (opts = { apiKey: '', apiSecret: '', url: WS_URL }) {
Expand All @@ -88,6 +89,7 @@ class WSv2 extends EventEmitter {
this._seqAudit = opts.seqAudit === true
this._autoReconnect = opts.autoReconnect === true
this._reconnectDelay = opts.reconnectDelay || 1000
this._reconnectThrottler = opts.reconnectThrottler
this._manageOrderBooks = opts.manageOrderBooks === true
this._manageCandles = opts.manageCandles === true
this._packetWDDelay = opts.packetWDDelay
Expand Down Expand Up @@ -148,6 +150,11 @@ class WSv2 extends EventEmitter {
return _includes(Object.keys(this._channelMap), chanId)
}

hasSubscriptionRef (channel, identifier) {
const key = `${channel}:${identifier}`
return !!Object.keys(this._subscriptionRefs).find(ref => ref === key)
}

getDataChannelId (type, filter) {
return Object
.keys(this._channelMap)
Expand Down Expand Up @@ -468,10 +475,19 @@ class WSv2 extends EventEmitter {
// _autoReconnect = true - if the user likes to reconnect automatically
if (this._isReconnecting || (this._autoReconnect && !this._isClosing)) {
this._prevChannelMap = this._channelMap

setTimeout(() => {
this.reconnectAfterClose().catch((err) => {
debug('error reconnectAfterClose: %s', err.stack)
})
if (this._reconnectThrottler) {
this._reconnectThrottler
.add(this.reconnectAfterClose.bind(this))
.catch((err) => {
debug('error reconnectAfterClose: %s', err.stack)
})
} else {
this.reconnectAfterClose().catch((err) => {
debug('error reconnectAfterClose: %s', err.stack)
})
}
}, this._reconnectDelay)
}

Expand All @@ -485,7 +501,7 @@ class WSv2 extends EventEmitter {
_onWSError (err) {
this.emit('error', err)

debug('error: %j', err)
debug('error: %s', err)
}

/**
Expand Down Expand Up @@ -595,6 +611,8 @@ class WSv2 extends EventEmitter {
return this._handleTickerMessage(msg, channelData)
} else if (channelData.channel === 'candles') {
return this._handleCandleMessage(msg, channelData)
} else if (channelData.channel === 'status') {
return this._handleStatusMessage(msg, channelData)
} else if (channelData.channel === 'auth') {
return this._handleAuthMessage(msg, channelData)
} else {
Expand Down Expand Up @@ -812,6 +830,24 @@ class WSv2 extends EventEmitter {
this.emit('candle', data, key)
}

/**
* Called for messages from a 'status' channel.
*
* @param {Array|Array[]} msg
* @param {Object} chanData - entry from _channelMap
* @private
*/
_handleStatusMessage (msg, chanData) {
const { key } = chanData
const data = getMessagePayload(msg)

const internalMessage = [chanData.chanId, 'status', data]
internalMessage.filterOverride = [chanData.key]

this._propagateMessageToListeners(internalMessage, chanData, false)
this.emit('status', data, key)
}

/**
* @param {string} symbol
* @param {number[]|number[][]} data
Expand Down Expand Up @@ -942,7 +978,7 @@ class WSv2 extends EventEmitter {
listeners.forEach(({ cb, modelClass }) => {
const ModelClass = modelClass

if (!transform || data.length === 0) {
if (!ModelClass || !transform || data.length === 0) {
cb(data, chanData)
} else if (Array.isArray(data[0])) {
cb(data.map((entry) => {
Expand Down Expand Up @@ -1375,6 +1411,14 @@ class WSv2 extends EventEmitter {
return this.managedSubscribe('candles', key, { key })
}

/**
* @param {string} key - 'liq:global'
* @return {boolean} subscribed
*/
subscribeStatus (key) {
return this.managedSubscribe('status', key, { key })
}

/**
* @param {number} chanId
*/
Expand Down Expand Up @@ -1420,6 +1464,14 @@ class WSv2 extends EventEmitter {
return this.managedUnsubscribe('candles', `trade:${frame}:${symbol}`)
}

/**
* @param {string} key
* @return {boolean} unsubscribed
*/
unsubscribeStatus (key) {
return this.managedUnsubscribe('status', key)
}

/**
* @param {string} cbGID
*/
Expand Down Expand Up @@ -1836,6 +1888,10 @@ class WSv2 extends EventEmitter {
this._registerListener('ticker', { 0: symbol }, m, cbGID, cb)
}

onStatus ({ key = '', cbGID } = {}, cb) {
this._registerListener('status', { 0: key }, null, cbGID, cb)
}

/**
* @param {Object} opts
* @param {string} opts.symbol
Expand Down
54 changes: 48 additions & 6 deletions lib/ws2_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ const debug = require('debug')('bfx:ws2:manager')
const _isEqual = require('lodash/isEqual')
const _includes = require('lodash/includes')
const _pick = require('lodash/pick')
const PromiseThrottle = require('promise-throttle')
const WSv2 = require('./transports/ws2')

const DATA_CHANNEL_LIMIT = 30
const reconnectThrottler = new PromiseThrottle({
requestsPerSecond: 10 / 60.0,
promiseImplementation: Promise
})

/**
* Provides a wrapper around the WSv2 class, opening new sockets when a
Expand All @@ -27,9 +32,20 @@ module.exports = class WS2Manager extends EventEmitter {
super()

this.setMaxListeners(1000)
this._socketArgs = socketArgs || {}

this._authArgs = authArgs
this._sockets = []
this._socketArgs = {
...(socketArgs || {}),
reconnectThrottler
}
}

/**
* Closes all open sockets
*/
close () {
this._sockets.forEach(socket => socket.ws.close())

This comment has been minimized.

Copy link
@vigan-abd

vigan-abd Sep 17, 2019

Contributor

@f3rno @prdn
In some cases ws may not be open and it will throw an error there, faced this issue locally.
I think that the line should check if the socket is open first:

this._sockets.forEach(socket => socket.ws._isOpen ? socket.ws.close() : null)
}

/**
Expand Down Expand Up @@ -144,7 +160,7 @@ module.exports = class WS2Manager extends EventEmitter {

const { chanId } = msg
const i = wsState.pendingUnsubscriptions.findIndex(cid => (
cid === chanId
cid === `${chanId}`
))

if (i === -1) {
Expand Down Expand Up @@ -177,6 +193,10 @@ module.exports = class WS2Manager extends EventEmitter {
return wsState
}

getAuthenticatedSocket () {
return this._sockets.find(s => s.ws.isAuthenticated())
}

/**
* Returns the first socket that has less active/pending channels than the
* DATA_CHANNEL_LIMIT
Expand Down Expand Up @@ -233,6 +253,15 @@ module.exports = class WS2Manager extends EventEmitter {
})
}

/**
* @param {string} channel
* @param {string} identifier
* @return {Object} wsState - undefined if not found
*/
getSocketWithSubRef (channel, identifier) {
return this._sockets.find(s => s.ws.hasSubscriptionRef(channel, identifier))
}

/**
* Calls the provided cb with all internal socket instances
*
Expand All @@ -254,14 +283,14 @@ module.exports = class WS2Manager extends EventEmitter {
*/
subscribe (type, ident, filter) {
let s = this.getFreeDataSocket()
const doSub = () => {
s.ws.managedSubscribe(type, ident, filter)
}

if (!s) {
s = this.openSocket()
}

const doSub = () => {
s.ws.managedSubscribe(type, ident, filter)
}

if (!s.ws.isOpen()) {
s.ws.once('open', doSub)
} else {
Expand All @@ -271,6 +300,19 @@ module.exports = class WS2Manager extends EventEmitter {
s.pendingSubscriptions.push([type, filter])
}

managedUnsubscribe (channel, identifier) {
const s = this.getSocketWithSubRef(channel, identifier)

if (!s) {
debug('cannot unsub from unknown channel %s: %s', channel, identifier)
return
}

const chanId = s.ws._chanIdByIdentifier(channel, identifier)
s.ws.managedUnsubscribe(channel, identifier)
s.pendingUnsubscriptions.push(chanId)
}

/**
* Unsubscribes the first socket w/ the specified channel. Does nothing if no
* such socket is found.
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bitfinex-api-node",
"version": "2.0.8",
"version": "2.0.9",
"description": "Node reference library for Bitfinex API",
"engines": {
"node": ">=7"
Expand Down Expand Up @@ -79,6 +79,7 @@
"lodash": "^4.17.4",
"lodash.throttle": "^4.1.1",
"p-iteration": "^1.1.8",
"promise-throttle": "^1.0.1",
"request": "^2.67.0",
"request-promise": "^4.2.0",
"ws": "^3.0.0"
Expand Down
2 changes: 1 addition & 1 deletion test/lib/ws2_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ describe('WS2Manager', () => {
const m = new WS2Manager()
const s = m.openSocket()

s.pendingUnsubscriptions.push(42)
s.pendingUnsubscriptions.push(`${42}`)
s.ws.emit('unsubscribed', { chanId: 42 })

assert.strictEqual(s.pendingUnsubscriptions.length, 0)
Expand Down

0 comments on commit dcfbace

Please sign in to comment.