Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.5.0 #81

Merged
merged 28 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
426 changes: 223 additions & 203 deletions ClusterOperator/DBClient.js

Large diffs are not rendered by default.

61 changes: 41 additions & 20 deletions ClusterOperator/Operator.js
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ class Operator {
BackLog.executeLogs = true;
let percent = Math.round(((index + response.records.length) / masterSN) * 1000);
if (masterSN === 0) percent = 0;
log.info(`sync backlog from ${index} to ${index + response.records.length} - [${'='.repeat(Math.floor(percent / 50))}>${'-'.repeat(Math.floor((1000 - percent) / 50))}] %${percent / 10}`, 'cyan');
log.info(`sync backlog from ${index + 1} to ${index + response.records.length} - [${'='.repeat(Math.floor(percent / 50))}>${'-'.repeat(Math.floor((1000 - percent) / 50))}] %${percent / 10}`, 'cyan');
} catch (err) {
log.error(err);
}
Expand All @@ -676,6 +676,7 @@ class Operator {
// wait for all nodes to spawn
let ipList = await fluxAPI.getApplicationIP(config.DBAppName);
const prevMaster = await BackLog.getKey('masterIP', false);
const myip = await BackLog.getKey('myIP', false);
if (prevMaster) {
log.info(`previous master was ${prevMaster}`);
if (ipList.some((obj) => obj.ip.includes(prevMaster))) {
Expand Down Expand Up @@ -712,7 +713,7 @@ class Operator {
ipList[i].ip = ipList[i].ip.split(':')[0];
}
this.OpNodes.push({
ip: ipList[i].ip, active: null, seqNo: 0, upnp,
ip: ipList[i].ip, active: false, seqNo: 0, upnp,
});
}
for (let i = 0; i < appIPList.length; i += 1) {
Expand All @@ -722,23 +723,26 @@ class Operator {
}
let activeNodes = 1;
for (let i = 0; i < ipList.length; i += 1) {
// extraxt ip from upnp nodes
log.info(`asking my ip from: ${ipList[i].ip}:${config.containerApiPort}`);
const status = await fluxAPI.getStatus(ipList[i].ip, config.containerApiPort);
log.info(`response was: ${JSON.stringify(status)}`);
if (status === null || status === 'null') {
this.OpNodes[i].active = false;
} else {
activeNodes += 1;
this.OpNodes[i].seqNo = status.sequenceNumber;
this.OpNodes[i].active = true;
this.myIP = status.remoteIP;
if (myip !== ipList[i].ip) {
// extraxt ip from upnp nodes
log.info(`asking status from: ${ipList[i].ip}:${config.containerApiPort}`);
const status = await fluxAPI.getStatus(ipList[i].ip, config.containerApiPort);
log.info(`${ipList[i].ip}'s response was: ${JSON.stringify(status)}`);
if (status === null || status === 'null') {
this.OpNodes[i].active = false;
} else {
activeNodes += 1;
this.OpNodes[i].seqNo = status.sequenceNumber;
this.OpNodes[i].active = true;
this.myIP = status.remoteIP;
}
}
}
const activeNodePer = 100 * (activeNodes / ipList.length);
log.info(`${activeNodePer} percent of nodes are active`);
if (this.myIP !== null && activeNodePer >= 50) {
log.info(`My ip is ${this.myIP}`);
BackLog.pushKey('myIP', this.myIP, false);
} else {
log.info('Not enough active nodes, retriying again...');
await timer.setTimeout(15000);
Expand Down Expand Up @@ -772,9 +776,27 @@ class Operator {
for (let i = 0; i < ipList.length; i += 1) {
// extraxt ip from upnp nodes
nodeList.push(ipList[i].ip);
// eslint-disable-next-line prefer-destructuring
if (ipList[i].ip.includes(':')) ipList[i].ip = ipList[i].ip.split(':')[0];
this.OpNodes.push({ ip: ipList[i].ip, active: null });
let nodeReachable = false;
let seqNo = 0;
let upnp = false;
if (ipList[i].ip.includes(':')) {
// eslint-disable-next-line prefer-destructuring
ipList[i].ip = ipList[i].ip.split(':')[0];
upnp = true;
}
if (this.myIP && ipList[i].ip === this.myIP) {
nodeReachable = true;
seqNo = BackLog.sequenceNumber;
} else {
const status = await fluxAPI.getStatus(ipList[i].ip, config.containerApiPort, 5000);
if (status !== null && status !== 'null') {
nodeReachable = true;
seqNo = status.sequenceNumber;
}
}
this.OpNodes.push({
ip: ipList[i].ip, active: nodeReachable, seqNo, upnp,
});
if (this.masterNode && ipList[i].ip === this.masterNode) checkMasterIp = true;
}
for (let i = 0; i < appIPList.length; i += 1) {
Expand All @@ -785,12 +807,11 @@ class Operator {
// check if master is working
if (!this.IamMaster && this.masterNode && this.status !== 'INIT' && this.status !== 'COMPRESSING') {
let MasterIP = await fluxAPI.getMaster(this.masterNode, config.containerApiPort);
let tries = 0;
while ((MasterIP === null || MasterIP === 'null') && tries < 10) {
MasterIP = await fluxAPI.getMaster(this.masterNode, config.containerApiPort);
await timer.setTimeout(10000);
let tries = 1;
while ((MasterIP === null || MasterIP === 'null') && tries < 5) {
tries += 1;
log.info(`master not responding, tries :${tries}`);
MasterIP = await fluxAPI.getMaster(this.masterNode, config.containerApiPort);
}
// log.debug(`checking master node ${this.masterNode}: ${MasterIP}`);
if (MasterIP === null || MasterIP === 'null' || MasterIP !== this.masterNode) {
Expand Down
2 changes: 1 addition & 1 deletion ClusterOperator/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module.exports = {
containerApiPort: String(process.env.API_PORT || 33950).trim(),
DBAppName: process.env.DB_APPNAME || '',
AppName: process.env.CLIENT_APPNAME || '',
version: '1.3.1',
version: '1.5.0',
whiteListedIps: process.env.WHITELIST || '127.0.0.1',
debugMode: true,
authMasterOnly: process.env.AUTH_MASTER_ONLY || false,
Expand Down
21 changes: 9 additions & 12 deletions ClusterOperator/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
sequenceNumber: BackLog.sequenceNumber,
masterIP: Operator.getMaster(),
taskStatus: BackLog.compressionTask,
clusterStatus: Operator.OpNodes,
});
res.end();
});
Expand All @@ -236,7 +237,7 @@
app.get('/getLogsByTime', async (req, res) => {
if (authUser(req)) {
const { starttime } = req.query;
const { length } = req.query;

Check warning

Code scanning / CodeQL

Information exposure through a stack trace Medium

This information exposed to the user depends on
stack trace information
.
res.send(await BackLog.getLogsByTime(starttime, length));
res.end();
} else {
Expand Down Expand Up @@ -538,14 +539,14 @@

io.on('connection', async (socket) => {
const ip = utill.convertIP(socket.handshake.address);
log.debug(`connection from ${ip}`, 'red');
// log.debug(`connection from ${ip}`, 'red');
if (auth(ip)) {
// log.info(`validating ${ip}: ${await auth(ip)}`);
socket.on('disconnect', (reason) => {
log.info(`disconnected from ${ip}`, 'red');
// log.info(`disconnected from ${ip}`, 'red');
});
socket.on('getStatus', async (callback) => {
// log.info(`getStatus from ${ip}`);
log.info(`getStatus from ${ip}`);
callback({
status: Operator.status,
sequenceNumber: BackLog.sequenceNumber,
Expand All @@ -554,7 +555,7 @@
});
});
socket.on('getMaster', async (callback) => {
// log.info(`getMaster from ${ip}`);
log.info(`getMaster from ${ip}`);
callback({ status: 'success', message: Operator.getMaster() });
});
socket.on('getMyIp', async (callback) => {
Expand All @@ -571,10 +572,10 @@
// log.info(`forwarding query to slaves: ${JSON.stringify(result)}`);
socket.broadcast.emit('query', query, result[1], result[2], false);
socket.emit('query', query, result[1], result[2], connId);
// cache write queries for 20 seconds
// cache write queries for 5 seconds
queryCache.put(result[1], {
query, seq: result[1], timestamp: result[2], connId, ip,
}, 20 * 60);
}, 5 * 60);
callback({ status: Operator.status, result: result[0] });
});
socket.on('askQuery', async (index, callback) => {
Expand All @@ -583,16 +584,12 @@
let connId = false;
if (record) {
if (record.ip === ip && record.connId) connId = record.connId;
log.info(`sending query: ${index}`, 'magenta');
log.info(`sending query: ${index} from Cache`, 'magenta');
socket.emit('query', record.query, record.seq, record.timestamp, connId);
} else {
log.warn(`query ${index} not in query cache`, 'red');
// let BLRecord = BackLog.BLqueryCache.get(index);
// log.info(JSON.stringify(BLRecord), 'red');
// if (!BLRecord) {
const BLRecord = await BackLog.getLog(index);
if (BLRecord.length) {
log.info(`from DB : ${JSON.stringify(BLRecord)}`, 'red');
log.info(`sending query: ${index} from DB`, 'red');
try {
socket.emit('query', BLRecord[0].query, BLRecord[0].seq, BLRecord[0].timestamp, connId);
} catch (err) {
Expand Down
Loading
Loading