diff --git a/api/ws/server.js b/api/ws/server.js new file mode 100644 index 00000000..fc0d1810 --- /dev/null +++ b/api/ws/server.js @@ -0,0 +1,65 @@ +const { Server } = require('socket.io'); +const Peer = require('../../logic/peer'); + +function WebSocketServer(server, appConfig) { + this.io = new Server(server, { + allowEIO3: true, + cors: appConfig.cors, + }); + + this.enabled = appConfig.wsNode.enabled; + this.max = appConfig.wsNode.maxConnections; +} + +WebSocketServer.prototype.linkPeers = function (logic) { + if (!this.enabled) { + return; + } + + this.io.on('connection', (socket) => { + const peerIp = socket.handshake.address || socket.request.socket.remoteAddress; + const { nonce } = socket.handshake.auth; + + if (!nonce) { + socket.disconnect(true); + return; + } + + const existingPeer = logic.peers.getByNonce(nonce); + + // Handle IPv6-mapped IPv4 addresses + const normalizeIp = (ip) => ip.replace(/^::ffff:/, ''); + + if ( + !existingPeer + || normalizeIp(peerIp) !== normalizeIp(existingPeer.ip) + || existingPeer.state === Peer.STATE.BANNED + ) { + socket.disconnect(true); + return; + } + + if (logic.peers.getSocketCount() >= this.max) { + socket.disconnect(true); + return; + } + + existingPeer.isBroadcastingViaSocket = true; + + socket.on('disconnect', () => { + const disconnectedPeer = logic.peers.getByNonce(nonce); + + if (disconnectedPeer) { + disconnectedPeer.isBroadcastingViaSocket = false; + } + }); + }); +}; + +WebSocketServer.prototype.emit = function (eventName, data) { + if (this.enabled) { + this.io.sockets.emit(eventName, data); + } +} + +module.exports = WebSocketServer; diff --git a/api/ws/transport.js b/api/ws/transport.js new file mode 100644 index 00000000..93fc6971 --- /dev/null +++ b/api/ws/transport.js @@ -0,0 +1,264 @@ +'use strict'; + +const { io } = require('socket.io-client'); +const Peer = require('../../logic/peer.js'); + +const maxReconnectDelay = 60000; +const defaultReconnectionDelay = 5000; + +function TransportWsApi(modules, library, options) { + this.modules = modules; + this.library = library; + this.peers = modules.peers; + this.system = modules.system; + this.transportModule = modules.transport; + this.logger = library.logger; + + this.maxConnections = options.maxWsConnections; + this.reconnectionDelay = defaultReconnectionDelay; + + this.connections = new Map(); + + this.peers.events.on('peers:update', () => this.updatePeers()); + + this.startRotation(); +} + +TransportWsApi.prototype.initialize = function() { + const self = this; + + // Clear existing connections + self.connections.forEach((socket) => { + socket.removeAllListeners(); + socket.disconnect(); + }); + self.connections.clear(); + + // Connect to multiple peers + self.getRandomPeers(self.maxConnections, (err, peers) => { + if (err || !peers.length) { + return self.scheduleReconnect(); + } + + this.reconnectionDelay = defaultReconnectionDelay; + + peers.forEach((peer) => self.connectToPeer(peer)); + }); +}; + +TransportWsApi.prototype.connectToPeer = function(peer) { + const self = this; + const peerUrl = `ws://${peer.ip}:${peer.port}`; + + if (this.connections.has(peerUrl)) { + return; + } + + self.logger.debug(`Connecting to WebSocket peer: ${peerUrl}`); + + const socket = io(peerUrl, { + reconnection: false, + transports: ['websocket'], + auth: { + nonce: this.system.getNonce(), + }, + }); + + socket.on('connect', () => self.handleConnect(socket, peer)); + socket.on('connect_error', (err) => self.handleConnectError(peer, err)); + socket.on('disconnect', (reason) => self.handleDisconnect(peer, reason)); + + self.connections.set(peerUrl, { socket, peer }); +}; + +TransportWsApi.prototype.handleConnect = function(socket, peer) { + this.logger.debug(`WebSocket: Connected to peer WebSocket at ${peer.ip}:${peer.port}`); + + this.peers.switchToWs(peer); + this.peers.recordRequest(peer.ip, peer.port, null); + + this.setupEventHandlers(socket, peer); +}; + +TransportWsApi.prototype.handleConnectError = function(peer, err) { + this.logger.debug(`WebSocket: Connection error with ${peer.ip}:${peer.port}`, err.message); + + this.peers.switchToHttp(peer); + this.peers.recordRequest(peer.ip, peer.port, err); + + this.replacePeer(peer); +}; + +TransportWsApi.prototype.handleDisconnect = function(peer, reason) { + this.logger.debug(`WebSocket: Disconnected from ${peer.ip}:${peer.port}`, reason); + this.peers.switchToHttp(peer); + this.replacePeer(peer); +}; + +TransportWsApi.prototype.replacePeer = function(peer) { + const self = this; + + // Remove the disconnected peer + self.connections.delete(`ws://${peer.ip}:${peer.port}`); + + // Find a new peer to replace it + self.getRandomPeer((err, newPeer) => { + if (err || !newPeer) { + self.logger.debug('WebSocket: Failed to find replacement peer'); + return; + } + + self.connectToPeer(newPeer); + }); +}; + +TransportWsApi.prototype.scheduleReconnect = function() { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + } + + this.reconnectTimeout = setTimeout(() => { + this.initialize(); + }, this.reconnectionDelay); + + this.reconnectionDelay = Math.min( + this.reconnectionDelay * 2, + maxReconnectDelay + ); +}; + +TransportWsApi.prototype.getRandomPeers = function(limit, callback) { + this.peers.list({ + limit, + allowedStates: [Peer.STATE.CONNECTED], + syncProtocol: 'http', + broadhash: this.modules.system.getBroadhash() + }, callback); +}; + +TransportWsApi.prototype.getRandomPeer = function(callback) { + this.getRandomPeers(1, (err, peers) => { + if (err || !peers.length) { + return callback(err || new Error('No peers available')); + } + callback(null, peers[0]); + }); +}; + +TransportWsApi.prototype.setupEventHandlers = function(socket, peer) { + const self = this; + + socket.on('transactions/change', (data) => { + self.transportModule.internal.postTransactions({ + transaction: data + }, peer, 'websocket /transactions', (err) => { + if (err) { + self.peers.recordRequest(peer.ip, peer.port, err); + } + }); + }); + + socket.on('blocks/change', (data) => { + self.transportModule.internal.postBlock(data, peer, 'websocket /blocks', (err) => { + if (err) { + self.peers.recordRequest(peer.ip, peer.port, err); + } + }); + }); + + socket.on('signature/change', (data) => { + self.transportModule.internal.postSignatures({ + signature: data + }, (err) => { + if (err) { + self.peers.recordRequest(peer.ip, peer.port, err); + } + }); + }); +}; + +TransportWsApi.prototype.updatePeers = function() { + const self = this; + + this.connections.forEach(({ peer }) => { + if (self.peers.isBanned(peer)) { + self.cleanupConnection(peer); + } + }); + + const availableSlots = this.maxConnections - this.connections.size; + + if (availableSlots <= 0) { + return; + } + + this.getRandomPeers(availableSlots, (err, candidates) => { + if (err || !candidates.length) { + return; + } + + candidates.forEach(peer => { + self.connectToPeer(peer); + }); + }); +}; + +TransportWsApi.prototype.cleanupConnection = function(peer) { + const peerUrl = `ws://${peer.ip}:${peer.port}`; + const connection = this.connections.get(peerUrl); + + if (connection) { + connection.socket.removeAllListeners(); + connection.socket.disconnect(); + this.connections.delete(peerUrl); + } +}; + +TransportWsApi.prototype.rotatePeers = function () { + const self = this; + + const totalConnections = self.connections.size; + + if (totalConnections === 0) { + return; + } + + const countToRotate = Math.ceil(totalConnections * 0.2); // rotate 20% + const connectionsArray = Array.from(self.connections.values()); + + const shuffled = connectionsArray.sort(() => Math.random() - 0.5); + + self.getRandomPeers(countToRotate, (err, newPeers) => { + if (err || !newPeers.length) { + return; + } + + self.logger.debug(`Rotating ${newPeers.length} out of ${totalConnections} peers.`); + + const peersToRotate = shuffled.slice(0, newPeers.length).map((connection) => connection.peer); + + peersToRotate.forEach(peer => { + self.logger.debug(`Rotating peer ${peer.ip}:${peer.port}`); + self.cleanupConnection(peer); + }); + + newPeers.forEach(newPeer => { + self.connectToPeer(newPeer); + }); + }); +}; + +TransportWsApi.prototype.startRotation = function() { + this.rotationInterval = setInterval(() => { + this.rotatePeers(); + }, 1000 * 60 * 30); // 30 minutes +}; + +TransportWsApi.prototype.stopRotation = function() { + if (this.rotationInterval) { + clearInterval(this.rotationInterval); + this.rotationInterval = null; + } +}; + +module.exports = TransportWsApi; diff --git a/app.js b/app.js index f96dfe87..91c815cc 100644 --- a/app.js +++ b/app.js @@ -38,6 +38,8 @@ var httpApi = require('./helpers/httpApi.js'); var Sequence = require('./helpers/sequence.js'); var util = require('util'); var z_schema = require('./helpers/z_schema.js'); +const TransportWsApi = require('./api/ws/transport.js'); +const WebSocketServer = require('./api/ws/server.js'); process.stdin.resume(); var versionBuild = fs.readFileSync(path.join(__dirname, 'build'), 'utf8'); @@ -326,11 +328,7 @@ d.run(function () { var server = require('http').createServer(app); - const { Server } = require('socket.io'); - const io = new Server(server, { - allowEIO3: true, - cors: appConfig.cors - }); + const wsServer = new WebSocketServer(server, appConfig); var privateKey, certificate, https, https_io; @@ -354,7 +352,7 @@ d.run(function () { express: express, app: app, server: server, - io: io, + wsServer, https: https, https_io: https_io }); @@ -606,6 +604,21 @@ d.run(function () { }); }], + /** + * Listens for new transacttions using websocket and links peers to the websocket server + */ + transportWs: ['network', 'config', 'modules', 'logic', function (scope, cb) { + const { options } = appConfig.peers; + if (options.maxWsConnections > 0) { + const transportWs = new TransportWsApi(scope.modules, scope.logic, appConfig.peers.options); + transportWs.initialize(); + } + + scope.network.wsServer.linkPeers(scope.logic) + + cb(); + }], + /** * Loads api from `api` folder using `config.api`, once modules, logger and * network are completed. diff --git a/config.default.json b/config.default.json index aa43fd64..4b28b694 100644 --- a/config.default.json +++ b/config.default.json @@ -77,6 +77,7 @@ "blackList": [] }, "options": { + "maxWsConnections": 15, "limits": { "max": 0, "delayMs": 0, @@ -127,5 +128,9 @@ "portWS": 36668, "enabled": true }, + "wsNode": { + "enabled": true, + "maxConnections": 25 + }, "nethash": "bd330166898377fb28743ceef5e43a5d9d0a3efd9b3451fb7bc53530bb0a6d64" } diff --git a/logic/broadcaster.js b/logic/broadcaster.js index fea686c0..57e7f952 100644 --- a/logic/broadcaster.js +++ b/logic/broadcaster.js @@ -92,7 +92,7 @@ Broadcaster.prototype.bind = function (peers, transport, transactions) { }; /** - * Calls peers.list function to get peers. + * Calls peers.list function to get peers and removes peers that are connected using WebSocket. * @implements {modules.peers.list} * @param {Object} params * @param {function} cb @@ -114,7 +114,9 @@ Broadcaster.prototype.getPeers = function (params, cb) { self.consensus = consensus; } - return setImmediate(cb, null, peers); + const httpApiPeers = peers.filter((peer) => !peer.isSocket); + + return setImmediate(cb, null, httpApiPeers); }); }; diff --git a/logic/dapp.js b/logic/dapp.js index a699d6e1..9187bd40 100644 --- a/logic/dapp.js +++ b/logic/dapp.js @@ -454,14 +454,14 @@ DApp.prototype.dbSave = function (trs) { /** * Emits 'dapps/change' signal. - * @implements {library.network.io.sockets} + * @implements {library.network.wsServer} * @param {transaction} trs * @param {function} cb * @return {setImmediateCallback} cb */ DApp.prototype.afterSave = function (trs, cb) { if (library) { - library.network.io.sockets.emit('dapps/change', {}); + library.network.wsServer.emit('dapps/change', {}); } return setImmediate(cb); }; diff --git a/logic/multisignature.js b/logic/multisignature.js index fe83a717..acc89e29 100644 --- a/logic/multisignature.js +++ b/logic/multisignature.js @@ -447,7 +447,7 @@ Multisignature.prototype.dbSave = function (trs) { * @return {setImmediateCallback} cb */ Multisignature.prototype.afterSave = function (trs, cb) { - library.network.io.sockets.emit('multisignatures/change', trs); + library.network.wsServer.emit('multisignatures/change', trs); return setImmediate(cb); }; diff --git a/logic/peer.js b/logic/peer.js index c43a2f6b..a49bcb2e 100644 --- a/logic/peer.js +++ b/logic/peer.js @@ -15,7 +15,7 @@ const SUCCESS_RATE_POOL_SIZE = 25; * @return calls accept method */ // Constructor -function Peer (peer) { +function Peer(peer) { return this.accept(peer || {}); } @@ -46,13 +46,17 @@ Peer.prototype.properties = [ 'height', 'clock', 'updated', - 'nonce' + 'nonce', + 'isBroadcastingViaSocket', + 'syncProtocol' ]; Peer.prototype.immutable = [ 'ip', 'port', - 'string' + 'string', + 'isBroadcastingViaSocket', + 'syncProtocol' ]; Peer.prototype.headers = [ @@ -74,6 +78,11 @@ Peer.prototype.nullable = [ 'updated' ]; +Peer.prototype.defaultValues = { + isBroadcastingViaSocket: false, + syncProtocol: 'http' +}; + /** * Amount of success requests for the last 25 tries */ @@ -218,7 +227,8 @@ Peer.prototype.object = function () { var copy = {}; _.each(this.properties, function (key) { - copy[key] = this[key]; + const defaultValue = this.defaultValues[key]; + copy[key] = this[key] ?? defaultValue; }.bind(this)); _.each(this.nullable, function (key) { diff --git a/logic/peers.js b/logic/peers.js index efc1d572..774b2107 100644 --- a/logic/peers.js +++ b/logic/peers.js @@ -29,6 +29,18 @@ function Peers (logger, cb) { return setImmediate(cb, null, this); } +/** + * Finds the peer by nonce + * @param {string} nonce + */ +Peers.prototype.getByNonce = function (nonce) { + for (const [peerString, peer] of Object.entries(__private.peers)) { + if (peer.nonce === nonce) { + return __private.peers[peerString]; + } + } +} + /** * Returns a peer instance. * @param {peer} peer @@ -87,9 +99,12 @@ Peers.prototype.upsert = function (peer, insertOnly) { var update = function (peer) { peer.updated = Date.now(); + const existingPeer = __private.peers[peer.string]; + var diff = {}; _.each(peer, function (value, key) { - if (key !== 'updated' && __private.peers[peer.string][key] !== value) { + const isImmutableProperty = existingPeer.immutable.includes(key); + if (key !== 'updated' && !isImmutableProperty && existingPeer[key] !== value) { diff[key] = value; } }); @@ -200,6 +215,14 @@ Peers.prototype.list = function (normalize) { } }; +/** + * Returns amount of peers that are connected via socket + * @returns {number} + */ +Peers.prototype.getSocketCount = function () { + return Object.values(__private.peers).reduce((acc, peer) => acc + (peer.isBroadcastingViaSocket ? 1 : 0), 0); +} + // Public methods /** * Modules are not required in this file. diff --git a/modules/dapps.js b/modules/dapps.js index 497f3ea4..99d9617c 100644 --- a/modules/dapps.js +++ b/modules/dapps.js @@ -1298,14 +1298,14 @@ DApps.prototype.internal = { }); }); } else { - library.network.io.sockets.emit('dapps/change', dapp); + library.network.wsServer.emit('dapps/change', dapp); __private.loading[params.id] = false; return setImmediate(cb, null, { success: true, path: dappPath }); } }); } else { - library.network.io.sockets.emit('dapps/change', dapp); + library.network.wsServer.emit('dapps/change', dapp); __private.loading[params.id] = false; return setImmediate(cb, null, { success: true, path: dappPath }); @@ -1346,7 +1346,7 @@ DApps.prototype.internal = { if (err) { return setImmediate(cb, null, { success: false, error: err }); } else { - library.network.io.sockets.emit('dapps/change', dapp); + library.network.wsServer.emit('dapps/change', dapp); return setImmediate(cb, null, { success: true }); } @@ -1360,7 +1360,7 @@ DApps.prototype.internal = { if (err) { return setImmediate(cb, null, { success: false, error: err }); } else { - library.network.io.sockets.emit('dapps/change', dapp); + library.network.wsServer.emit('dapps/change', dapp); return setImmediate(cb, null, { success: true }); } @@ -1378,7 +1378,7 @@ DApps.prototype.internal = { if (err) { return setImmediate(cb, null, { 'success': false, 'error': err }); } else { - library.network.io.sockets.emit('dapps/change', {}); + library.network.wsServer.emit('dapps/change', {}); return setImmediate(cb, null, { 'success': true }); } }); @@ -1440,7 +1440,7 @@ DApps.prototype.internal = { library.logger.error(err); return setImmediate(cb, 'Failed to stop application'); } else { - library.network.io.sockets.emit('dapps/change', dapp); + library.network.wsServer.emit('dapps/change', dapp); __private.launched[params.id] = false; return setImmediate(cb, null, { success: true }); } diff --git a/modules/delegates.js b/modules/delegates.js index 7a6ceb0a..dcc04037 100644 --- a/modules/delegates.js +++ b/modules/delegates.js @@ -531,7 +531,7 @@ Delegates.prototype.fork = function (block, cause) { }; library.db.none(sql.insertFork, fork).then(function () { - library.network.io.sockets.emit('delegates/fork', fork); + library.network.wsServer.emit('delegates/fork', fork); }); }; diff --git a/modules/loader.js b/modules/loader.js index 6cdf9918..76e8b1b4 100644 --- a/modules/loader.js +++ b/modules/loader.js @@ -81,7 +81,7 @@ __private.initialize = function () { * or Sync trigger by sending a socket signal with 'loader/sync' and setting * next sync with 1000 milliseconds. * @private - * @implements {library.network.io.sockets.emit} + * @implements {library.network.wsServer.emit} * @implements {modules.blocks.lastBlock.get} * @param {boolean} turnOn * @emits loader/sync @@ -96,7 +96,7 @@ __private.syncTrigger = function (turnOn) { library.logger.trace('Setting sync interval'); setImmediate(function nextSyncTrigger () { library.logger.trace('Sync trigger'); - library.network.io.sockets.emit('loader/sync', { + library.network.wsServer.emit('loader/sync', { blocks: __private.blocksToSync, height: modules.blocks.lastBlock.get().height }); diff --git a/modules/multisignatures.js b/modules/multisignatures.js index 4eb23a64..ae0d8cf9 100644 --- a/modules/multisignatures.js +++ b/modules/multisignatures.js @@ -160,7 +160,7 @@ Multisignatures.prototype.processSignature = function (tx, cb) { return setImmediate(cb, 'Failed to verify signature'); } - library.network.io.sockets.emit('multisignatures/signature/change', transaction); + library.network.wsServer.emit('multisignatures/signature/change', transaction); return done(cb); }); } @@ -478,7 +478,7 @@ Multisignatures.prototype.shared = { transaction.ready = Multisignature.prototype.ready(transaction, scope.sender); library.bus.message('signature', { transaction: transaction.id, signature: scope.signature }, true); - library.network.io.sockets.emit('multisignatures/signature/change', transaction); + library.network.wsServer.emit('multisignatures/signature/change', transaction); return setImmediate(cb, null, { transactionId: transaction.id }); }); @@ -554,7 +554,7 @@ Multisignatures.prototype.shared = { if (err) { return setImmediate(cb, err); } else { - library.network.io.sockets.emit('multisignatures/change', scope.transaction); + library.network.wsServer.emit('multisignatures/change', scope.transaction); return setImmediate(cb, null, { transactionId: scope.transaction.id }); } }); diff --git a/modules/peers.js b/modules/peers.js index 781c6d99..6470b0cf 100644 --- a/modules/peers.js +++ b/modules/peers.js @@ -1,5 +1,7 @@ 'use strict'; +const EventEmitter = require('node:events'); + var _ = require('lodash'); var async = require('async'); var constants = require('../helpers/constants.js'); @@ -47,6 +49,8 @@ function Peers (cb, scope) { }; self = this; + self.events = new EventEmitter(); + setImmediate(cb, null, self); } @@ -299,6 +303,28 @@ Peers.prototype.update = function (peer) { return library.logic.peers.upsert(peer); }; +/** + * Changes the connection type to ws + * @param {Peer} peer + */ +Peers.prototype.switchToWs = function (peer) { + const existingPeer = library.logic.peers.get(peer); + if (existingPeer) { + existingPeer.syncProtocol = 'ws'; + } +}; + +/** + * Changes the connection type to http + * @param {Peer} peer + */ +Peers.prototype.switchToHttp = function (peer) { + const existingPeer = library.logic.peers.get(peer); + if (existingPeer) { + existingPeer.syncProtocol = 'http'; + } +}; + /** * Returns whether the peer is in config peers list * @param {string} ip @@ -452,6 +478,14 @@ Peers.prototype.acceptable = function (peers) { }).value(); }; +/** + * Returns true if the peer's state is banned + * @param {Peer} peer + */ +Peers.prototype.isBanned = function (peer) { + return library.logic.peers.get(peer).state === Peer.STATE.BANNED; +}; + /** * Gets peers list and calculated consensus. * @param {Object} options - Contains limit, broadhash. @@ -474,6 +508,10 @@ Peers.prototype.list = function (options, cb) { found = peersList.length; // Apply filters peersList = peersList.filter(function (peer) { + if (options.syncProtocol && peer.syncProtocol !== options.syncProtocol) { + return false; + } + if (options.broadhash) { // Skip banned and disconnected peers (state 0 and 1) return options.allowedStates.indexOf(peer.state) !== -1 && ( @@ -598,6 +636,9 @@ Peers.prototype.onPeersReady = function () { } }, function () { library.logger.trace('Peers updated', { updated: updated, total: peers.length }); + if (updated) { + self.events.emit('peers:update'); + } return setImmediate(seriesCb); }); } diff --git a/modules/rounds.js b/modules/rounds.js index ce702fa0..19b36f58 100644 --- a/modules/rounds.js +++ b/modules/rounds.js @@ -330,12 +330,12 @@ Rounds.prototype.onBlockchainReady = function () { /** * Emits a 'rounds/change' socket message. - * @implements {library.network.io.sockets.emit} + * @implements {library.network.wsServer.emit} * @param {number} round * @emits rounds/change */ Rounds.prototype.onFinishRound = function (round) { - library.network.io.sockets.emit('rounds/change', { number: round }); + library.network.wsServer.emit('rounds/change', { number: round }); }; /** diff --git a/modules/transport.js b/modules/transport.js index bd3d75dd..e5c61d7e 100644 --- a/modules/transport.js +++ b/modules/transport.js @@ -474,7 +474,7 @@ Transport.prototype.onBlockchainReady = function () { * Calls enqueue signatures and emits a 'signature/change' socket message. * @implements {Broadcaster.maxRelays} * @implements {Broadcaster.enqueue} - * @implements {library.network.io.sockets.emit} + * @implements {library.network.wsServer.emit} * @param {signature} signature * @param {Object} broadcast * @emits signature/change @@ -482,7 +482,7 @@ Transport.prototype.onBlockchainReady = function () { Transport.prototype.onSignature = function (signature, broadcast) { if (broadcast && !__private.broadcaster.maxRelays(signature)) { __private.broadcaster.enqueue({}, { api: '/signatures', data: { signature: signature }, method: 'POST' }); - library.network.io.sockets.emit('signature/change', signature); + library.network.wsServer.emit('signature/change', signature); } }; @@ -490,7 +490,7 @@ Transport.prototype.onSignature = function (signature, broadcast) { * Calls enqueue transactions and emits a 'transactions/change' socket message. * @implements {Broadcaster.maxRelays} * @implements {Broadcaster.enqueue} - * @implements {library.network.io.sockets.emit} + * @implements {library.network.wsServer.emit} * @param {transaction} transaction * @param {Object} broadcast * @emits transactions/change @@ -498,7 +498,7 @@ Transport.prototype.onSignature = function (signature, broadcast) { Transport.prototype.onUnconfirmedTransaction = function (transaction, broadcast) { if (broadcast && !__private.broadcaster.maxRelays(transaction)) { __private.broadcaster.enqueue({}, { api: '/transactions', data: { transaction: transaction }, method: 'POST' }); - library.network.io.sockets.emit('transactions/change', transaction); + library.network.wsServer.emit('transactions/change', transaction); } }; @@ -507,7 +507,7 @@ Transport.prototype.onUnconfirmedTransaction = function (transaction, broadcast) * @implements {modules.system.getBroadhash} * @implements {Broadcaster.maxRelays} * @implements {Broadcaster.broadcast} - * @implements {library.network.io.sockets.emit} + * @implements {library.network.wsServer.emit} * @param {block} block * @param {Object} broadcast * @emits blocks/change @@ -520,7 +520,7 @@ Transport.prototype.onNewBlock = function (block, broadcast) { if (!__private.broadcaster.maxRelays(block)) { __private.broadcaster.broadcast({ limit: constants.maxPeers, broadhash: broadhash }, { api: '/blocks', data: { block: block }, method: 'POST', immediate: true }); } - library.network.io.sockets.emit('blocks/change', block); + library.network.wsServer.emit('blocks/change', block); }); } }; diff --git a/package-lock.json b/package-lock.json index 15f07f95..dcbf1f63 100644 --- a/package-lock.json +++ b/package-lock.json @@ -43,6 +43,7 @@ "rimraf": "=6.0.1", "semver": "=7.6.3", "socket.io": "^4.8.1", + "socket.io-client": "^4.8.1", "sodium": "^3.0.2", "sodium-browserify-tweetnacl": "*", "strftime": "=0.10.3", @@ -2900,6 +2901,39 @@ "node": ">=10.2.0" } }, + "node_modules/engine.io-client": { + "version": "6.6.3", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.6.3.tgz", + "integrity": "sha512-T0iLjnyNWahNyv/lcjS2y4oE358tVS/SYQNxYXGAJ9/GLgH4VCvOQ/mhTjqU88mLZCQgiG8RIegFHYCdVC+j5w==", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1", + "xmlhttprequest-ssl": "~2.1.1" + } + }, + "node_modules/engine.io-client/node_modules/debug": { + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/engine.io-client/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" + }, "node_modules/engine.io-parser": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz", @@ -10787,6 +10821,41 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" }, + "node_modules/socket.io-client": { + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.8.1.tgz", + "integrity": "sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.6.1", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-client/node_modules/debug": { + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/socket.io-client/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==" + }, "node_modules/socket.io-parser": { "version": "4.2.4", "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", @@ -11750,6 +11819,14 @@ "integrity": "sha512-nquOebG4sngPmGPICTS5EnxqhKbCmz5Ox5hsszI2T6U5qdrJizBc+0ilYSEjTSzU0yZcmvppztXe/5Al5fUwdg==", "dev": true }, + "node_modules/xmlhttprequest-ssl": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.1.2.tgz", + "integrity": "sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==", + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/package.json b/package.json index 12fc25c9..f138ed91 100644 --- a/package.json +++ b/package.json @@ -74,6 +74,7 @@ "rimraf": "=6.0.1", "semver": "=7.6.3", "socket.io": "^4.8.1", + "socket.io-client": "^4.8.1", "sodium": "^3.0.2", "sodium-browserify-tweetnacl": "*", "strftime": "=0.10.3", diff --git a/schema/config.js b/schema/config.js index d97417d4..41092d60 100644 --- a/schema/config.js +++ b/schema/config.js @@ -155,6 +155,9 @@ module.exports = { }, options: { properties: { + maxWsConnections: { + type: 'integer', + }, limits: { type: 'object', properties: { @@ -177,7 +180,7 @@ module.exports = { type: 'integer' } }, - required: ['limits', 'timeout'] + required: ['maxWsConnections', 'limits', 'timeout'] } }, required: ['enabled', 'list', 'access', 'options'] @@ -316,11 +319,24 @@ module.exports = { }, required: ['portWS', 'enabled'] }, + wsNode: { + type: 'object', + properties: { + maxConnections: { + type: 'integer', + minimum: 1, + }, + enabled: { + type: 'boolean' + } + }, + required: ['maxConnections', 'enabled'] + }, nethash: { type: 'string', format: 'hex' } }, - required: ['port', 'address', 'fileLogLevel', 'logFileName', 'consoleLogLevel', 'trustProxy', 'topAccounts', 'cacheEnabled', 'db', 'redis', 'api', 'peers', 'broadcasts', 'transactions', 'forging', 'loading', 'ssl', 'dapp', 'wsClient', 'nethash'] + required: ['port', 'address', 'fileLogLevel', 'logFileName', 'consoleLogLevel', 'trustProxy', 'topAccounts', 'cacheEnabled', 'db', 'redis', 'api', 'peers', 'broadcasts', 'transactions', 'forging', 'loading', 'ssl', 'dapp', 'wsClient', 'wsNode', 'nethash'] } }; diff --git a/test/config.default.json b/test/config.default.json index 076fb35f..f084ae90 100644 --- a/test/config.default.json +++ b/test/config.default.json @@ -63,6 +63,7 @@ "blackList": [] }, "options": { + "maxWsConnections": 15, "limits": { "max": 0, "delayMs": 0, @@ -215,5 +216,9 @@ "portWS": 36665, "enabled": true }, + "wsNode": { + "enabled": true, + "maxConnections": 25 + }, "nethash": "38f153a81332dea86751451fd992df26a9249f0834f72f58f84ac31cceb70f43" }