Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add socket connection between nodes #99

Open
wants to merge 15 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions api/ws/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
const { Server } = require('socket.io');
const Peer = require('../../logic/peer');

function WebSocketServer(server, appConfig) {
this.io = new Server(server, {
allowEIO3: true,
cors: appConfig.cors,
});

this.enabled = appConfig.wsNode.enabled;
this.max = appConfig.wsNode.maxConnections;
}

WebSocketServer.prototype.linkPeers = function (logic) {
if (!this.enabled) {
return;
}

this.io.on('connection', (socket) => {
const peerIp = socket.handshake.address || socket.request.socket.remoteAddress;
const { nonce } = socket.handshake.auth;

if (!nonce) {
socket.disconnect(true);
return;
}

const existingPeer = logic.peers.getByNonce(nonce);

// Handle IPv6-mapped IPv4 addresses
const normalizeIp = (ip) => ip.replace(/^::ffff:/, '');

if (
!existingPeer
|| normalizeIp(peerIp) !== normalizeIp(existingPeer.ip)
|| existingPeer.state === Peer.STATE.BANNED
) {
socket.disconnect(true);
return;
}

if (logic.peers.getSocketCount() >= this.max) {
socket.disconnect(true);
return;
}

existingPeer.isBroadcastingViaSocket = true;

socket.on('disconnect', () => {
const disconnectedPeer = logic.peers.getByNonce(nonce);

if (disconnectedPeer) {
disconnectedPeer.isBroadcastingViaSocket = false;
}
});
});
};

WebSocketServer.prototype.emit = function (eventName, data) {
if (this.enabled) {
this.io.sockets.emit(eventName, data);
}
}

module.exports = WebSocketServer;
264 changes: 264 additions & 0 deletions api/ws/transport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
'use strict';

const { io } = require('socket.io-client');
const Peer = require('../../logic/peer.js');

const maxReconnectDelay = 60000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving it to config

const defaultReconnectionDelay = 5000;

function TransportWsApi(modules, library, options) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a JSDoc to describe the purpose of TransportWsApi?

Copy link
Member

@adamant-al adamant-al Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also JSDoc for prototype's methods below.

this.modules = modules;
this.library = library;
this.peers = modules.peers;
this.system = modules.system;
this.transportModule = modules.transport;
this.logger = library.logger;

this.maxConnections = options.maxWsConnections;
this.reconnectionDelay = defaultReconnectionDelay;

this.connections = new Map();

this.peers.events.on('peers:update', () => this.updatePeers());

this.startRotation();
}

TransportWsApi.prototype.initialize = function() {
const self = this;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preferred to log this event / debug


// Clear existing connections
self.connections.forEach((socket) => {
socket.removeAllListeners();
socket.disconnect();
});
self.connections.clear();

// Connect to multiple peers
self.getRandomPeers(self.maxConnections, (err, peers) => {
if (err || !peers.length) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preferred to log this event / debug

return self.scheduleReconnect();
}

this.reconnectionDelay = defaultReconnectionDelay;

peers.forEach((peer) => self.connectToPeer(peer));
});
};

TransportWsApi.prototype.connectToPeer = function(peer) {
const self = this;
const peerUrl = `ws://${peer.ip}:${peer.port}`;

if (this.connections.has(peerUrl)) {
return;
}

self.logger.debug(`Connecting to WebSocket peer: ${peerUrl}`);

const socket = io(peerUrl, {
reconnection: false,
transports: ['websocket'],
auth: {
nonce: this.system.getNonce(),
},
});

socket.on('connect', () => self.handleConnect(socket, peer));
socket.on('connect_error', (err) => self.handleConnectError(peer, err));
socket.on('disconnect', (reason) => self.handleDisconnect(peer, reason));

self.connections.set(peerUrl, { socket, peer });
};

TransportWsApi.prototype.handleConnect = function(socket, peer) {
this.logger.debug(`WebSocket: Connected to peer WebSocket at ${peer.ip}:${peer.port}`);

this.peers.switchToWs(peer);
this.peers.recordRequest(peer.ip, peer.port, null);

this.setupEventHandlers(socket, peer);
};

TransportWsApi.prototype.handleConnectError = function(peer, err) {
this.logger.debug(`WebSocket: Connection error with ${peer.ip}:${peer.port}`, err.message);

this.peers.switchToHttp(peer);
this.peers.recordRequest(peer.ip, peer.port, err);

this.replacePeer(peer);
};

TransportWsApi.prototype.handleDisconnect = function(peer, reason) {
this.logger.debug(`WebSocket: Disconnected from ${peer.ip}:${peer.port}`, reason);
this.peers.switchToHttp(peer);
this.replacePeer(peer);
};

TransportWsApi.prototype.replacePeer = function(peer) {
const self = this;

// Remove the disconnected peer
self.connections.delete(`ws://${peer.ip}:${peer.port}`);

// Find a new peer to replace it
self.getRandomPeer((err, newPeer) => {
if (err || !newPeer) {
self.logger.debug('WebSocket: Failed to find replacement peer');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'WebSocket: Failed to find replacement' ⟶ Add 'for peer ws://${peer.ip}:${peer.port} + err || no peers'?

return;
}

self.connectToPeer(newPeer);
});
};

TransportWsApi.prototype.scheduleReconnect = function() {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}

this.reconnectTimeout = setTimeout(() => {
this.initialize();
}, this.reconnectionDelay);

this.reconnectionDelay = Math.min(
this.reconnectionDelay * 2,
maxReconnectDelay
);
};

TransportWsApi.prototype.getRandomPeers = function(limit, callback) {
this.peers.list({
limit,
allowedStates: [Peer.STATE.CONNECTED],
syncProtocol: 'http',
broadhash: this.modules.system.getBroadhash()
}, callback);
};

TransportWsApi.prototype.getRandomPeer = function(callback) {
this.getRandomPeers(1, (err, peers) => {
if (err || !peers.length) {
return callback(err || new Error('No peers available'));
}
callback(null, peers[0]);
});
};

TransportWsApi.prototype.setupEventHandlers = function(socket, peer) {
const self = this;

socket.on('transactions/change', (data) => {
self.transportModule.internal.postTransactions({
transaction: data
}, peer, 'websocket /transactions', (err) => {
if (err) {
self.peers.recordRequest(peer.ip, peer.port, err);
}
});
});

socket.on('blocks/change', (data) => {
self.transportModule.internal.postBlock(data, peer, 'websocket /blocks', (err) => {
if (err) {
self.peers.recordRequest(peer.ip, peer.port, err);
}
});
});

socket.on('signature/change', (data) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No other events possible besides
'transactions/change'
'blocks/change'
'signature/change'
?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be 'rounds/change'?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rounds/change is not used.

self.transportModule.internal.postSignatures({
signature: data
}, (err) => {
if (err) {
self.peers.recordRequest(peer.ip, peer.port, err);
}
});
});
};

TransportWsApi.prototype.updatePeers = function() {
const self = this;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preferred to log this event / debug


this.connections.forEach(({ peer }) => {
if (self.peers.isBanned(peer)) {
self.cleanupConnection(peer);
}
});

const availableSlots = this.maxConnections - this.connections.size;

if (availableSlots <= 0) {
return;
}

this.getRandomPeers(availableSlots, (err, candidates) => {
if (err || !candidates.length) {
return;
}

candidates.forEach(peer => {
self.connectToPeer(peer);
});
});
};

TransportWsApi.prototype.cleanupConnection = function(peer) {
const peerUrl = `ws://${peer.ip}:${peer.port}`;
const connection = this.connections.get(peerUrl);

if (connection) {
connection.socket.removeAllListeners();
connection.socket.disconnect();
this.connections.delete(peerUrl);
}
};

TransportWsApi.prototype.rotatePeers = function () {
const self = this;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preferred to log this event / debug


const totalConnections = self.connections.size;

if (totalConnections === 0) {
return;
}

const countToRotate = Math.ceil(totalConnections * 0.2); // rotate 20%
const connectionsArray = Array.from(self.connections.values());

const shuffled = connectionsArray.sort(() => Math.random() - 0.5);

self.getRandomPeers(countToRotate, (err, newPeers) => {
if (err || !newPeers.length) {
return;
}

self.logger.debug(`Rotating ${newPeers.length} out of ${totalConnections} peers.`);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider to clarify logging everywhere that it's about ws.

E.g., [wsNode] Rotating ${newPeers.length} out of ${totalConnections} peers.


const peersToRotate = shuffled.slice(0, newPeers.length).map((connection) => connection.peer);

peersToRotate.forEach(peer => {
self.logger.debug(`Rotating peer ${peer.ip}:${peer.port}`);
self.cleanupConnection(peer);
});

newPeers.forEach(newPeer => {
self.connectToPeer(newPeer);
});
});
};

TransportWsApi.prototype.startRotation = function() {
this.rotationInterval = setInterval(() => {
this.rotatePeers();
}, 1000 * 60 * 30); // 30 minutes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make const

};

TransportWsApi.prototype.stopRotation = function() {
if (this.rotationInterval) {
clearInterval(this.rotationInterval);
this.rotationInterval = null;
}
};

module.exports = TransportWsApi;
25 changes: 19 additions & 6 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var httpApi = require('./helpers/httpApi.js');
var Sequence = require('./helpers/sequence.js');
var util = require('util');
var z_schema = require('./helpers/z_schema.js');
const TransportWsApi = require('./api/ws/transport.js');
const WebSocketServer = require('./api/ws/server.js');
process.stdin.resume();

var versionBuild = fs.readFileSync(path.join(__dirname, 'build'), 'utf8');
Expand Down Expand Up @@ -326,11 +328,7 @@ d.run(function () {

var server = require('http').createServer(app);

const { Server } = require('socket.io');
const io = new Server(server, {
allowEIO3: true,
cors: appConfig.cors
});
const wsServer = new WebSocketServer(server, appConfig);

var privateKey, certificate, https, https_io;

Expand All @@ -354,7 +352,7 @@ d.run(function () {
express: express,
app: app,
server: server,
io: io,
wsServer,
https: https,
https_io: https_io
});
Expand Down Expand Up @@ -606,6 +604,21 @@ d.run(function () {
});
}],

/**
* Listens for new transacttions using websocket and links peers to the websocket server
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: transacttions -> transactions

*/
transportWs: ['network', 'config', 'modules', 'logic', function (scope, cb) {
const { options } = appConfig.peers;
if (options.maxWsConnections > 0) {
const transportWs = new TransportWsApi(scope.modules, scope.logic, appConfig.peers.options);
transportWs.initialize();
}

scope.network.wsServer.linkPeers(scope.logic)

cb();
}],

/**
* Loads api from `api` folder using `config.api`, once modules, logger and
* network are completed.
Expand Down
Loading