diff --git a/ClusterOperator/DBClient.js b/ClusterOperator/DBClient.js index b11a6bf..472dc0b 100644 --- a/ClusterOperator/DBClient.js +++ b/ClusterOperator/DBClient.js @@ -1,203 +1,223 @@ -/* eslint-disable no-unused-vars */ -const mySql = require('mysql2/promise'); -const net = require('net'); -const config = require('./config'); -const Security = require('./Security'); -const log = require('../lib/log'); - -class DBClient { - constructor() { - this.connection = {}; - this.connected = false; - this.InitDB = ''; - this.stream = null; - this.socketCallBack = null; - this.socketId = null; - this.enableSocketWrite = false; - } - - /** - * [init] - */ - async createStream() { - this.stream = net.connect({ - host: config.dbHost, - port: config.dbPort, - }); - const { stream } = this; - return new Promise((resolve, reject) => { - stream.once('connect', () => { - stream.removeListener('error', reject); - resolve(stream); - }); - stream.once('error', (err) => { - stream.removeListener('connection', resolve); - stream.removeListener('data', resolve); - reject(err); - }); - }); - } - - /** - * [rawCallback] - */ - rawCallback(data) { - if (this.socketCallBack && this.enableSocketWrite) { - this.socketCallBack.write(data); - // log.info(`writing to ${this.socketId}: ${data.length} bytes`); - } - } - - /** - * [setSocket] - */ - setSocket(func, id = null) { - if (func === null) log.info('socket set to null'); - this.socketCallBack = func; - this.socketId = id; - this.enableSocketWrite = true; - } - - /** - * [disableSocketWrite] - */ - disableSocketWrite() { - // log.info(`socket write disabled for ${this.socketId}`); - this.enableSocketWrite = false; - this.socketId = null; - } - - /** - * [init] - */ - async init() { - if (config.dbType === 'mysql') { - await this.createStream(); - this.stream.on('data', (data) => { - this.rawCallback(data); - }); - this.connection = await mySql.createConnection({ - password: Security.getKey(), - user: config.dbUser, - stream: this.stream, - }); - this.connection.once('error', () => { - this.connected = false; - log.info(`Connecten to ${this.InitDB} DB was lost`); - }); - this.connected = true; - } - } - - /** - * [query] - * @param {string} query [description] - */ - async query(query, rawResult = false, fullQuery = '') { - if (config.dbType === 'mysql') { - // log.info(`running Query: ${query}`); - try { - if (!this.connected) { - log.info(`Connecten to ${this.InitDB} DB was lost, reconnecting...`); - await this.init(); - this.setDB(this.InitDB); - } - if (rawResult) { - const [rows, fields, err] = await this.connection.query(query); - if (err) log.error(err); - return [rows, fields, err]; - // eslint-disable-next-line no-else-return - } else { - const [rows, err] = await this.connection.query(query); - if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${fullQuery}`, 'red'); - return rows; - } - } catch (err) { - if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${fullQuery}`, 'red'); - return [null, null, err]; - } - } - return null; - } - - /** - * [execute] - * @param {string} query [description] - * @param {array} params [description] - */ - async execute(query, params, rawResult = false, fullQuery = '') { - if (config.dbType === 'mysql') { - try { - if (!this.connected) { - await this.init(); - } - const [rows, fields, err] = await this.connection.execute(query, params); - if (err && err.toString().includes('Error')) log.error(`Error executing query: ${err.toString()}, ${fullQuery}`, 'red'); - if (rawResult) return [rows, fields, err]; - return rows; - } catch (err) { - if (err && err.toString().includes('Error')) log.error(`Error executing query: ${err.toString()}, ${fullQuery}`, 'red'); - return [null, null, err]; - } - } - return null; - } - - /** - * [createDB] - * @param {string} dbName [description] - */ - async createDB(dbName) { - if (config.dbType === 'mysql') { - try { - await this.query(`CREATE DATABASE IF NOT EXISTS ${dbName}`); - } catch (err) { - log.info(`DB ${dbName} exists`); - } - } - return null; - } - - /** - * [setDB] - * @param {string} dbName [description] - */ - async setDB(dbName) { - if (config.dbType === 'mysql') { - this.InitDB = dbName; - // log.info(`seting db to ${dbName}`); - this.connection.changeUser({ - database: dbName, - }, (err) => { - if (err) { - // console.log('Error changing database', err); - } - }); - } - } - - /** - * [setPassword] - * @param {string} key [description] - */ - async setPassword(key) { - if (config.dbType === 'mysql') { - await this.query(`SET PASSWORD FOR 'root'@'localhost' = PASSWORD('${key}');SET PASSWORD FOR 'root'@'%' = PASSWORD('${key}');FLUSH PRIVILEGES;`); - } - } -} - -// eslint-disable-next-line func-names -exports.createClient = async function () { - try { - const cl = new DBClient(); - await cl.init(); - return cl; - } catch (err) { - log.info(JSON.stringify(err)); - if (config.dbType === 'mysql') { - if (err.code === 'ER_ACCESS_DENIED_ERROR') return 'WRONG_KEY'; - } - return null; - } -}; +/* eslint-disable no-unused-vars */ +const mySql = require('mysql2/promise'); +const net = require('net'); +const config = require('./config'); +const Security = require('./Security'); +const log = require('../lib/log'); + +class DBClient { + constructor() { + this.connection = {}; + this.connected = false; + this.InitDB = ''; + this.stream = null; + this.socketCallBack = null; + this.socketId = null; + this.enableSocketWrite = false; + } + + /** + * [init] + */ + async createStream() { + this.stream = net.connect({ + host: config.dbHost, + port: config.dbPort, + }); + const { stream } = this; + return new Promise((resolve, reject) => { + stream.once('connect', () => { + stream.removeListener('error', reject); + resolve(stream); + }); + stream.once('error', (err) => { + stream.removeListener('connection', resolve); + stream.removeListener('data', resolve); + reject(err); + }); + }); + } + + /** + * [rawCallback] + */ + rawCallback(data) { + if (this.socketCallBack && this.enableSocketWrite) { + this.socketCallBack.write(data); + // log.info(`writing to ${this.socketId}: ${data.length} bytes`); + } + } + + /** + * [setSocket] + */ + setSocket(func, id = null) { + if (func === null) log.info('socket set to null'); + this.socketCallBack = func; + this.socketId = id; + this.enableSocketWrite = true; + } + + /** + * [disableSocketWrite] + */ + disableSocketWrite() { + // log.info(`socket write disabled for ${this.socketId}`); + this.enableSocketWrite = false; + this.socketId = null; + } + + /** + * [init] + */ + async init() { + if (config.dbType === 'mysql') { + try { + await this.createStream(); + this.stream.on('data', (data) => { + this.rawCallback(data); + }); + this.connection = await mySql.createConnection({ + password: Security.getKey(), + user: config.dbUser, + stream: this.stream, + connectTimeout: 60000, // Increased timeout + }); + this.connection.on('error', (err) => { + this.connected = false; + log.info(`Connection to ${this.InitDB} DB was lost: ${err.message}`); + this.reconnect(); + }); + this.connected = true; + } catch (err) { + log.error(`Initial connection error: ${err.message}`); + this.reconnect(); + } + } + } + + async reconnect() { + if (this.connected) return; + log.info('Attempting to reconnect to the database...'); + try { + await this.init(); + log.info('Reconnected to the database.'); + } catch (err) { + log.error(`Reconnection failed: ${err.message}`); + setTimeout(() => this.reconnect(), 5000); // Retry after 5 seconds + } + } + + /** + * [query] + * @param {string} query [description] + */ + async query(query, rawResult = false, fullQuery = '') { + if (config.dbType === 'mysql') { + // log.info(`running Query: ${query}`); + try { + if (!this.connected) { + log.info(`Connecten to ${this.InitDB} DB was lost, reconnecting...`); + await this.init(); + this.setDB(this.InitDB); + } + if (rawResult) { + const [rows, fields, err] = await this.connection.query(query); + if (err) log.error(err); + return [rows, fields, err]; + // eslint-disable-next-line no-else-return + } else { + const [rows, err] = await this.connection.query(query); + if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${fullQuery}`, 'red'); + return rows; + } + } catch (err) { + if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${fullQuery}`, 'red'); + return [null, null, err]; + } + } + return null; + } + + /** + * [execute] + * @param {string} query [description] + * @param {array} params [description] + */ + async execute(query, params, rawResult = false, fullQuery = '') { + if (config.dbType === 'mysql') { + try { + if (!this.connected) { + await this.init(); + } + const [rows, fields, err] = await this.connection.execute(query, params); + if (err && err.toString().includes('Error')) log.error(`Error executing query: ${err.toString()}, ${fullQuery}`, 'red'); + if (rawResult) return [rows, fields, err]; + return rows; + } catch (err) { + if (err && err.toString().includes('Error')) log.error(`Error executing query: ${err.toString()}, ${fullQuery}`, 'red'); + return [null, null, err]; + } + } + return null; + } + + /** + * [createDB] + * @param {string} dbName [description] + */ + async createDB(dbName) { + if (config.dbType === 'mysql') { + try { + await this.query(`CREATE DATABASE IF NOT EXISTS ${dbName}`); + } catch (err) { + log.info(`DB ${dbName} exists`); + } + } + return null; + } + + /** + * [setDB] + * @param {string} dbName [description] + */ + async setDB(dbName) { + if (config.dbType === 'mysql') { + this.InitDB = dbName; + // log.info(`seting db to ${dbName}`); + this.connection.changeUser({ + database: dbName, + }).catch((err) => { + if (err) { + log.error(`Error changing database: ${err}`); + this.reconnect(); + } + }); + } + } + + /** + * [setPassword] + * @param {string} key [description] + */ + async setPassword(key) { + if (config.dbType === 'mysql') { + await this.query(`SET PASSWORD FOR 'root'@'localhost' = PASSWORD('${key}');SET PASSWORD FOR 'root'@'%' = PASSWORD('${key}');FLUSH PRIVILEGES;`); + } + } +} + +// eslint-disable-next-line func-names +exports.createClient = async function () { + try { + const cl = new DBClient(); + await cl.init(); + return cl; + } catch (err) { + log.info(JSON.stringify(err)); + if (config.dbType === 'mysql') { + if (err.code === 'ER_ACCESS_DENIED_ERROR') return 'WRONG_KEY'; + } + return null; + } +}; diff --git a/ClusterOperator/Operator.js b/ClusterOperator/Operator.js index 076bde3..5016765 100644 --- a/ClusterOperator/Operator.js +++ b/ClusterOperator/Operator.js @@ -654,7 +654,7 @@ class Operator { BackLog.executeLogs = true; let percent = Math.round(((index + response.records.length) / masterSN) * 1000); if (masterSN === 0) percent = 0; - log.info(`sync backlog from ${index} to ${index + response.records.length} - [${'='.repeat(Math.floor(percent / 50))}>${'-'.repeat(Math.floor((1000 - percent) / 50))}] %${percent / 10}`, 'cyan'); + log.info(`sync backlog from ${index + 1} to ${index + response.records.length} - [${'='.repeat(Math.floor(percent / 50))}>${'-'.repeat(Math.floor((1000 - percent) / 50))}] %${percent / 10}`, 'cyan'); } catch (err) { log.error(err); } @@ -676,6 +676,7 @@ class Operator { // wait for all nodes to spawn let ipList = await fluxAPI.getApplicationIP(config.DBAppName); const prevMaster = await BackLog.getKey('masterIP', false); + const myip = await BackLog.getKey('myIP', false); if (prevMaster) { log.info(`previous master was ${prevMaster}`); if (ipList.some((obj) => obj.ip.includes(prevMaster))) { @@ -712,7 +713,7 @@ class Operator { ipList[i].ip = ipList[i].ip.split(':')[0]; } this.OpNodes.push({ - ip: ipList[i].ip, active: null, seqNo: 0, upnp, + ip: ipList[i].ip, active: false, seqNo: 0, upnp, }); } for (let i = 0; i < appIPList.length; i += 1) { @@ -722,23 +723,26 @@ class Operator { } let activeNodes = 1; for (let i = 0; i < ipList.length; i += 1) { - // extraxt ip from upnp nodes - log.info(`asking my ip from: ${ipList[i].ip}:${config.containerApiPort}`); - const status = await fluxAPI.getStatus(ipList[i].ip, config.containerApiPort); - log.info(`response was: ${JSON.stringify(status)}`); - if (status === null || status === 'null') { - this.OpNodes[i].active = false; - } else { - activeNodes += 1; - this.OpNodes[i].seqNo = status.sequenceNumber; - this.OpNodes[i].active = true; - this.myIP = status.remoteIP; + if (myip !== ipList[i].ip) { + // extraxt ip from upnp nodes + log.info(`asking status from: ${ipList[i].ip}:${config.containerApiPort}`); + const status = await fluxAPI.getStatus(ipList[i].ip, config.containerApiPort); + log.info(`${ipList[i].ip}'s response was: ${JSON.stringify(status)}`); + if (status === null || status === 'null') { + this.OpNodes[i].active = false; + } else { + activeNodes += 1; + this.OpNodes[i].seqNo = status.sequenceNumber; + this.OpNodes[i].active = true; + this.myIP = status.remoteIP; + } } } const activeNodePer = 100 * (activeNodes / ipList.length); log.info(`${activeNodePer} percent of nodes are active`); if (this.myIP !== null && activeNodePer >= 50) { log.info(`My ip is ${this.myIP}`); + BackLog.pushKey('myIP', this.myIP, false); } else { log.info('Not enough active nodes, retriying again...'); await timer.setTimeout(15000); @@ -772,9 +776,27 @@ class Operator { for (let i = 0; i < ipList.length; i += 1) { // extraxt ip from upnp nodes nodeList.push(ipList[i].ip); - // eslint-disable-next-line prefer-destructuring - if (ipList[i].ip.includes(':')) ipList[i].ip = ipList[i].ip.split(':')[0]; - this.OpNodes.push({ ip: ipList[i].ip, active: null }); + let nodeReachable = false; + let seqNo = 0; + let upnp = false; + if (ipList[i].ip.includes(':')) { + // eslint-disable-next-line prefer-destructuring + ipList[i].ip = ipList[i].ip.split(':')[0]; + upnp = true; + } + if (this.myIP && ipList[i].ip === this.myIP) { + nodeReachable = true; + seqNo = BackLog.sequenceNumber; + } else { + const status = await fluxAPI.getStatus(ipList[i].ip, config.containerApiPort, 5000); + if (status !== null && status !== 'null') { + nodeReachable = true; + seqNo = status.sequenceNumber; + } + } + this.OpNodes.push({ + ip: ipList[i].ip, active: nodeReachable, seqNo, upnp, + }); if (this.masterNode && ipList[i].ip === this.masterNode) checkMasterIp = true; } for (let i = 0; i < appIPList.length; i += 1) { @@ -785,12 +807,11 @@ class Operator { // check if master is working if (!this.IamMaster && this.masterNode && this.status !== 'INIT' && this.status !== 'COMPRESSING') { let MasterIP = await fluxAPI.getMaster(this.masterNode, config.containerApiPort); - let tries = 0; - while ((MasterIP === null || MasterIP === 'null') && tries < 10) { - MasterIP = await fluxAPI.getMaster(this.masterNode, config.containerApiPort); - await timer.setTimeout(10000); + let tries = 1; + while ((MasterIP === null || MasterIP === 'null') && tries < 5) { tries += 1; log.info(`master not responding, tries :${tries}`); + MasterIP = await fluxAPI.getMaster(this.masterNode, config.containerApiPort); } // log.debug(`checking master node ${this.masterNode}: ${MasterIP}`); if (MasterIP === null || MasterIP === 'null' || MasterIP !== this.masterNode) { diff --git a/ClusterOperator/config.js b/ClusterOperator/config.js index c7392af..ae51118 100644 --- a/ClusterOperator/config.js +++ b/ClusterOperator/config.js @@ -16,7 +16,7 @@ module.exports = { containerApiPort: String(process.env.API_PORT || 33950).trim(), DBAppName: process.env.DB_APPNAME || '', AppName: process.env.CLIENT_APPNAME || '', - version: '1.3.1', + version: '1.5.0', whiteListedIps: process.env.WHITELIST || '127.0.0.1', debugMode: true, authMasterOnly: process.env.AUTH_MASTER_ONLY || false, diff --git a/ClusterOperator/server.js b/ClusterOperator/server.js index 368e248..424c9b0 100644 --- a/ClusterOperator/server.js +++ b/ClusterOperator/server.js @@ -220,6 +220,7 @@ function startUI() { sequenceNumber: BackLog.sequenceNumber, masterIP: Operator.getMaster(), taskStatus: BackLog.compressionTask, + clusterStatus: Operator.OpNodes, }); res.end(); }); @@ -538,14 +539,14 @@ async function initServer() { io.on('connection', async (socket) => { const ip = utill.convertIP(socket.handshake.address); - log.debug(`connection from ${ip}`, 'red'); + // log.debug(`connection from ${ip}`, 'red'); if (auth(ip)) { // log.info(`validating ${ip}: ${await auth(ip)}`); socket.on('disconnect', (reason) => { - log.info(`disconnected from ${ip}`, 'red'); + // log.info(`disconnected from ${ip}`, 'red'); }); socket.on('getStatus', async (callback) => { - // log.info(`getStatus from ${ip}`); + log.info(`getStatus from ${ip}`); callback({ status: Operator.status, sequenceNumber: BackLog.sequenceNumber, @@ -554,7 +555,7 @@ async function initServer() { }); }); socket.on('getMaster', async (callback) => { - // log.info(`getMaster from ${ip}`); + log.info(`getMaster from ${ip}`); callback({ status: 'success', message: Operator.getMaster() }); }); socket.on('getMyIp', async (callback) => { @@ -571,10 +572,10 @@ async function initServer() { // log.info(`forwarding query to slaves: ${JSON.stringify(result)}`); socket.broadcast.emit('query', query, result[1], result[2], false); socket.emit('query', query, result[1], result[2], connId); - // cache write queries for 20 seconds + // cache write queries for 5 seconds queryCache.put(result[1], { query, seq: result[1], timestamp: result[2], connId, ip, - }, 20 * 60); + }, 5 * 60); callback({ status: Operator.status, result: result[0] }); }); socket.on('askQuery', async (index, callback) => { @@ -583,16 +584,12 @@ async function initServer() { let connId = false; if (record) { if (record.ip === ip && record.connId) connId = record.connId; - log.info(`sending query: ${index}`, 'magenta'); + log.info(`sending query: ${index} from Cache`, 'magenta'); socket.emit('query', record.query, record.seq, record.timestamp, connId); } else { - log.warn(`query ${index} not in query cache`, 'red'); - // let BLRecord = BackLog.BLqueryCache.get(index); - // log.info(JSON.stringify(BLRecord), 'red'); - // if (!BLRecord) { const BLRecord = await BackLog.getLog(index); if (BLRecord.length) { - log.info(`from DB : ${JSON.stringify(BLRecord)}`, 'red'); + log.info(`sending query: ${index} from DB`, 'red'); try { socket.emit('query', BLRecord[0].query, BLRecord[0].seq, BLRecord[0].timestamp, connId); } catch (err) { diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index b1989c9..705f4c9 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -1,121 +1,125 @@ -/* eslint-disable no-else-return */ -/* eslint-disable no-unused-vars */ -const log = require('./log'); -const dbClient = require('../ClusterOperator/DBClient'); - -class ConnectionPool { - static #connections = []; - - static #freeConnections = []; - - static #dbName = ''; - - static #maxConnections; - - /** - * [init] - */ - static async init(params = { numberOfConnections: 10, maxConnections: 100, db: '' }) { - this.#dbName = params.db; - this.#maxConnections = params.maxConnections; - for (let id = 0; id < params.numberOfConnections; id += 1) { - // eslint-disable-next-line no-await-in-loop - await this.#getNewConnection(); - } - } - - /** - * [getNewConnection] - * @return {connection} [description] - */ - static async #getNewConnection(returnSocket = false, force = false) { - if (this.#connections.length > this.#maxConnections && !force) { - log.error('max pool connection limit reached.'); - throw new Error('max connection limit reached.'); - } - const dbConn = await dbClient.createClient(); - if (dbConn) { - await dbConn.setDB(this.#dbName); - const connId = this.#connections.length; - const connObj = { id: connId, conn: dbConn, socket: null }; - this.#connections.push(connObj); - if (!returnSocket) this.#freeConnections.push(connId); - return connObj; - } else { - log.error('failed to get new DB connection'); - return null; - } - } - - /** - * [keepFreeConnections] - */ - static async keepFreeConnections() { - for (let id = 0; id < this.#freeConnections.length; id += 1) { - // eslint-disable-next-line no-await-in-loop - await this.#connections[this.#freeConnections[id]].conn.setDB(this.#dbName); - } - } - - /** - * [getFreeConnection] - * @param {socket} socket [description] - * @return {int} [description] - */ - static async getFreeConnection(socket, force = false) { - if (this.#freeConnections.length) { - const connId = this.#freeConnections.shift(); - this.#connections[connId].socket = socket; - this.#connections[connId].conn.setSocket(socket, connId); - // console.log(`retuning ID: ${connId}`); - // socket.once('close', this.releaseConnection(connId)); - // log.info(`taking ${connId},freeConnections: ${this.#freeConnections.length}`, 'lb'); - // log.query('taken', 'yellow', connId); - return connId; - } - const connObj = await this.#getNewConnection(true, force); - connObj.socket = socket; - connObj.conn.setSocket(socket, connObj.id); - // log.info(`taking ${connObj.id},freeConnections: ${this.#freeConnections.length}`, 'lb'); - // console.log(`retuning ID: ${connObj.id}`); - // socket.once('close', this.releaseConnection(connObj.id)); - // log.query('taken', 'yellow', connObj.id); - return connObj.id; - } - - /** - * [getSocketById] - * @param {int} connId [description] - * @return {socket} [description] - */ - static getSocketById(connId) { - return this.#connections[connId].socket; - } - - /** - * [getConnectionById] - * @param {int} connId [description] - * @return {connection} [description] - */ - static getConnectionById(connId) { - return this.#connections[connId].conn; - } - - /** - * [releaseConnection] - * @param {int} connId [description] - */ - static releaseConnection(connId) { - if (connId !== null) { - // log.info(`releasing ${connId}`); - // log.query('released', 'yellow', connId); - if (this.#connections[connId].socket) { - this.#connections[connId].socket = null; - this.#connections[connId].conn.disableSocketWrite(); - this.#freeConnections.push(connId); - } - // log.info(`releasing ${connId},freeConnections: ${this.#freeConnections.length}`, 'lb'); - } - } -} -module.exports = ConnectionPool; +/* eslint-disable no-else-return */ +/* eslint-disable no-unused-vars */ +const log = require('./log'); +const dbClient = require('../ClusterOperator/DBClient'); + +class ConnectionPool { + static #connections = []; + + static #freeConnections = []; + + static #dbName = ''; + + static #maxConnections; + + /** + * [init] + */ + static async init(params = { numberOfConnections: 10, maxConnections: 100, db: '' }) { + this.#dbName = params.db; + this.#maxConnections = params.maxConnections; + for (let id = 0; id < params.numberOfConnections; id += 1) { + // eslint-disable-next-line no-await-in-loop + await this.#getNewConnection(); + } + } + + /** + * [getNewConnection] + * @return {connection} [description] + */ + static async #getNewConnection(returnSocket = false, force = false) { + if (this.#connections.length > this.#maxConnections && !force) { + log.error('max pool connection limit reached.'); + throw new Error('max connection limit reached.'); + } + const dbConn = await dbClient.createClient(); + if (dbConn) { + await dbConn.setDB(this.#dbName); + const connId = this.#connections.length; + const connObj = { id: connId, conn: dbConn, socket: null }; + this.#connections.push(connObj); + if (!returnSocket) this.#freeConnections.push(connId); + return connObj; + } else { + log.error('failed to get new DB connection'); + return null; + } + } + + /** + * [keepFreeConnections] + */ + static async keepFreeConnections() { + try { + for (let id = 0; id < this.#freeConnections.length; id += 1) { + // eslint-disable-next-line no-await-in-loop + await this.#connections[this.#freeConnections[id]].conn.setDB(this.#dbName); + } + } catch (error) { + log.error(error); + } + } + + /** + * [getFreeConnection] + * @param {socket} socket [description] + * @return {int} [description] + */ + static async getFreeConnection(socket, force = false) { + if (this.#freeConnections.length) { + const connId = this.#freeConnections.shift(); + this.#connections[connId].socket = socket; + this.#connections[connId].conn.setSocket(socket, connId); + // console.log(`retuning ID: ${connId}`); + // socket.once('close', this.releaseConnection(connId)); + // log.info(`taking ${connId},freeConnections: ${this.#freeConnections.length}`, 'lb'); + // log.query('taken', 'yellow', connId); + return connId; + } + const connObj = await this.#getNewConnection(true, force); + connObj.socket = socket; + connObj.conn.setSocket(socket, connObj.id); + // log.info(`taking ${connObj.id},freeConnections: ${this.#freeConnections.length}`, 'lb'); + // console.log(`retuning ID: ${connObj.id}`); + // socket.once('close', this.releaseConnection(connObj.id)); + // log.query('taken', 'yellow', connObj.id); + return connObj.id; + } + + /** + * [getSocketById] + * @param {int} connId [description] + * @return {socket} [description] + */ + static getSocketById(connId) { + return this.#connections[connId].socket; + } + + /** + * [getConnectionById] + * @param {int} connId [description] + * @return {connection} [description] + */ + static getConnectionById(connId) { + return this.#connections[connId].conn; + } + + /** + * [releaseConnection] + * @param {int} connId [description] + */ + static releaseConnection(connId) { + if (connId !== null) { + // log.info(`releasing ${connId}`); + // log.query('released', 'yellow', connId); + if (this.#connections[connId].socket) { + this.#connections[connId].socket = null; + this.#connections[connId].conn.disableSocketWrite(); + this.#freeConnections.push(connId); + } + // log.info(`releasing ${connId},freeConnections: ${this.#freeConnections.length}`, 'lb'); + } + } +} +module.exports = ConnectionPool; diff --git a/lib/fluxAPI.js b/lib/fluxAPI.js index 3d13ca6..aba418e 100644 --- a/lib/fluxAPI.js +++ b/lib/fluxAPI.js @@ -1,324 +1,324 @@ -const axios = require('axios'); -const { io } = require('socket.io-client'); -const log = require('./log'); -/** - * [getApplicationSpecs Retrieves app specifications] - * @param {string} appName [description] - * @return {Array} [description] - */ -async function getApplicationSpecs(appName) { - try { - const fluxnodeList = await axios.get(`https://api.runonflux.io/apps/appspecifications/${appName}`, { timeout: 13456 }); - if (fluxnodeList.data.status === 'success') { - return fluxnodeList.data.data || []; - } - return []; - } catch (e) { - log.error(e); - return []; - } -} - -/** - * [getApplicationSpecs Retrieves app specifications] - * @param {string} appName [description] - * @return {Array} [description] - */ -async function getApplicationOwner(appName) { - try { - const fluxnodeList = await axios.get(`https://api.runonflux.io/apps/appspecifications/${appName}`, { timeout: 13456 }); - if (fluxnodeList.data.status === 'success') { - return fluxnodeList.data.data.owner || []; - } - return []; - } catch (e) { - log.error(e); - return []; - } -} -/** - * [getApplicationIP Retrieves IP's that a given application is running on] - * @param {string} appName [description] - * @return {Array} [description] - */ -async function getApplicationIP(appName) { - try { - const fluxnodeList = await axios.get(`https://api.runonflux.io/apps/location/${appName}`, { timeout: 13456 }); - if (fluxnodeList.data.status === 'success') { - return fluxnodeList.data.data || []; - } - return []; - } catch (e) { - log.error(e); - return []; - } -} - -/** - * [validateApp] - * @param {string} ip [description] - * @param {string} appName [description] - * @return {boolean} [description] - */ -async function validateApp(appName, ip, port = 16127) { - try { - const result = await axios.get(`http://${ip}:${port}/apps/listallapps`, { timeout: 13456 }); - const appList = result.data; - // console.log(appList.data); - if (appList.status === 'success') { - let isValid = true; - for (let i = 0; i < appList.data.length; i += 1) { - if (appList.data[i].Names[0].endsWith(`_${appName}`) && appList.data[i].State !== 'running') { isValid = false; break; } - // log.info(`${appList.data[i].Names[0]} : ${appList.data[i].State}`); - } - // log.info(isValid); - return isValid; - } - return false; - } catch (e) { - log.error(e); - return false; - } -} - -/** - * [getMaster ] - * @param {string} ip [description] - * @param {string} port [description] - * @return {json} [description] - */ -async function getMaster(ip, port) { - try { - return new Promise((resolve) => { - const client = io.connect(`http://${ip}:${port}`, { transports: ['websocket', 'polling'], reconnection: false, timeout: 2000 }); - const timeout = setTimeout(() => { - client.disconnect(); - resolve(null); - }, 2000); - client.emit('getMaster', (response) => { - client.disconnect(); - clearTimeout(timeout); - resolve(response.message); - }); - }); - } catch (e) { - log.error(e); - return []; - } -} - -/** - * [getMyIp] - * @param {string} ip [description] - * @param {string} port [description] - * @return {json} [description] - */ -async function getMyIp(ip, port) { - try { - return new Promise((resolve) => { - const client = io.connect(`http://${ip}:${port}`, { transports: ['websocket', 'polling'], reconnection: false, timeout: 2000 }); - - const timeout = setTimeout(() => { - client.disconnect(); - resolve(null); - }, 2000); - client.on('connect_error', (reason) => { - log.error(reason); - clearTimeout(timeout); - resolve(null); - }); - client.emit('getMyIp', (response) => { - client.disconnect(); - clearTimeout(timeout); - resolve(response.message); - }); - }); - } catch (e) { - log.error('socket connection failed.'); - log.error(e); - return null; - } -} - -/** - * [getMyIp] - * @param {string} ip [description] - * @param {string} port [description] - * @return {json} [description] - */ -async function getStatus(ip, port) { - try { - return new Promise((resolve) => { - const client = io.connect(`http://${ip}:${port}`, { transports: ['websocket', 'polling'], reconnection: false, timeout: 2000 }); - - const timeout = setTimeout(() => { - log.info('connection timed out'); - client.disconnect(); - resolve(null); - }, 2000); - client.on('connect_error', (reason) => { - log.info('connection Error'); - log.error(reason); - clearTimeout(timeout); - resolve(null); - }); - client.emit('getStatus', (response) => { - // console.log(response); - client.disconnect(); - clearTimeout(timeout); - resolve(response); - }); - }); - } catch (e) { - log.error('socket connection failed.'); - log.error(e); - return null; - } -} - -/** - * [resetMaster] - * @param {string} ip [description] - * @param {string} port [description] - * @return {json} [description] - */ -async function resetMaster(ip, port) { - try { - return new Promise((resolve) => { - const client = io.connect(`http://${ip}:${port}`, { transports: ['websocket', 'polling'], reconnection: false, timeout: 2000 }); - - const timeout = setTimeout(() => { - log.info('connection timed out'); - client.disconnect(); - resolve(null); - }, 2000); - client.on('connect_error', (reason) => { - log.info('connection Error'); - log.error(reason); - clearTimeout(timeout); - resolve(null); - }); - client.emit('resetMaster', (response) => { - // console.log(response); - client.disconnect(); - clearTimeout(timeout); - resolve(response); - }); - }); - } catch (e) { - log.error('socket connection failed.'); - log.error(e); - return null; - } -} - -/** - * [getBackLog] - * @param {string} index [description] - * @param {socket} socket [description] - * @return {json} [description] - */ -async function getBackLog(index, socket) { - try { - return new Promise((resolve) => { - socket.emit('getBackLog', index, (response) => { - // log.info(JSON.stringify(response)); - resolve(response); - }); - }); - } catch (e) { - log.error(e); - return null; - } -} -/** - * [ask for query] - * @param {string} index [description] - * @param {socket} socket [description] - * @return {json} [description] - */ -async function askQuery(index, socket) { - try { - return new Promise((resolve) => { - socket.emit('askQuery', index, (response) => { - log.info(JSON.stringify(response), 'magenta'); - resolve(response); - }); - }); - } catch (e) { - log.error(e); - return null; - } -} - -/** - * [shareKeys] - * @param {string} pubKey [description] - * @param {socket} socket [description] - * @return {json} [description] - */ -async function shareKeys(pubKey, socket) { - try { - return new Promise((resolve) => { - socket.emit('shareKeys', pubKey, (response) => { - resolve(response); - }); - }); - } catch (e) { - log.error(e); - return null; - } -} - -/** - * [updateKey] - * @param {string} key [description] - * @param {socket} socket [description] - * @return {json} [description] - */ -async function updateKey(key, value, socket) { - try { - return new Promise((resolve) => { - socket.emit('updateKey', key, value, (response) => { - resolve(response); - }); - }); - } catch (e) { - log.error(e); - return null; - } -} - -/** - * [getKeys] - * @param {socket} socket [description] - * @return {json} [description] - */ -async function getKeys(socket) { - try { - return new Promise((resolve) => { - socket.emit('getKeys', (response) => { - resolve(response); - }); - }); - } catch (e) { - log.error(e); - return null; - } -} - -module.exports = { - getApplicationIP, - getApplicationSpecs, - validateApp, - getMaster, - getMyIp, - getStatus, - getBackLog, - shareKeys, - updateKey, - getKeys, - askQuery, - resetMaster, - getApplicationOwner, -}; +const axios = require('axios'); +const { io } = require('socket.io-client'); +const log = require('./log'); +/** + * [getApplicationSpecs Retrieves app specifications] + * @param {string} appName [description] + * @return {Array} [description] + */ +async function getApplicationSpecs(appName) { + try { + const fluxnodeList = await axios.get(`https://api.runonflux.io/apps/appspecifications/${appName}`, { timeout: 13456 }); + if (fluxnodeList.data.status === 'success') { + return fluxnodeList.data.data || []; + } + return []; + } catch (e) { + log.error(e); + return []; + } +} + +/** + * [getApplicationSpecs Retrieves app specifications] + * @param {string} appName [description] + * @return {Array} [description] + */ +async function getApplicationOwner(appName) { + try { + const fluxnodeList = await axios.get(`https://api.runonflux.io/apps/appspecifications/${appName}`, { timeout: 13456 }); + if (fluxnodeList.data.status === 'success') { + return fluxnodeList.data.data.owner || []; + } + return []; + } catch (e) { + log.error(e); + return []; + } +} +/** + * [getApplicationIP Retrieves IP's that a given application is running on] + * @param {string} appName [description] + * @return {Array} [description] + */ +async function getApplicationIP(appName) { + try { + const fluxnodeList = await axios.get(`https://api.runonflux.io/apps/location/${appName}`, { timeout: 13456 }); + if (fluxnodeList.data.status === 'success') { + return fluxnodeList.data.data || []; + } + return []; + } catch (e) { + log.error(e); + return []; + } +} + +/** + * [validateApp] + * @param {string} ip [description] + * @param {string} appName [description] + * @return {boolean} [description] + */ +async function validateApp(appName, ip, port = 16127) { + try { + const result = await axios.get(`http://${ip}:${port}/apps/listallapps`, { timeout: 13456 }); + const appList = result.data; + // console.log(appList.data); + if (appList.status === 'success') { + let isValid = true; + for (let i = 0; i < appList.data.length; i += 1) { + if (appList.data[i].Names[0].endsWith(`_${appName}`) && appList.data[i].State !== 'running') { isValid = false; break; } + // log.info(`${appList.data[i].Names[0]} : ${appList.data[i].State}`); + } + // log.info(isValid); + return isValid; + } + return false; + } catch (e) { + log.error(e); + return false; + } +} + +/** + * [getMaster ] + * @param {string} ip [description] + * @param {string} port [description] + * @return {json} [description] + */ +async function getMaster(ip, port) { + try { + return new Promise((resolve) => { + const client = io.connect(`http://${ip}:${port}`, { transports: ['websocket', 'polling'], reconnection: false, timeout: 10000 }); + const timeout = setTimeout(() => { + client.disconnect(); + resolve(null); + }, 10000); + client.emit('getMaster', (response) => { + client.disconnect(); + clearTimeout(timeout); + resolve(response.message); + }); + }); + } catch (e) { + log.error(e); + return []; + } +} + +/** + * [getMyIp] + * @param {string} ip [description] + * @param {string} port [description] + * @return {json} [description] + */ +async function getMyIp(ip, port) { + try { + return new Promise((resolve) => { + const client = io.connect(`http://${ip}:${port}`, { transports: ['websocket', 'polling'], reconnection: false, timeout: 2000 }); + + const timeout = setTimeout(() => { + client.disconnect(); + resolve(null); + }, 2000); + client.on('connect_error', (reason) => { + log.error(reason); + clearTimeout(timeout); + resolve(null); + }); + client.emit('getMyIp', (response) => { + client.disconnect(); + clearTimeout(timeout); + resolve(response.message); + }); + }); + } catch (e) { + log.error('socket connection failed.'); + log.error(e); + return null; + } +} + +/** + * [getMyIp] + * @param {string} ip [description] + * @param {string} port [description] + * @return {json} [description] + */ +async function getStatus(ip, port, timeoutTime = 1000) { + try { + return new Promise((resolve) => { + const client = io.connect(`http://${ip}:${port}`, { transports: ['websocket', 'polling'], reconnection: false, timeout: timeoutTime }); + + const timeout = setTimeout(() => { + log.info('connection timed out'); + client.disconnect(); + resolve(null); + }, timeoutTime); + client.on('connect_error', (reason) => { + log.info('connection Error'); + log.error(reason); + clearTimeout(timeout); + resolve(null); + }); + client.emit('getStatus', (response) => { + // console.log(response); + client.disconnect(); + clearTimeout(timeout); + resolve(response); + }); + }); + } catch (e) { + log.error('socket connection failed.'); + log.error(e); + return null; + } +} + +/** + * [resetMaster] + * @param {string} ip [description] + * @param {string} port [description] + * @return {json} [description] + */ +async function resetMaster(ip, port) { + try { + return new Promise((resolve) => { + const client = io.connect(`http://${ip}:${port}`, { transports: ['websocket', 'polling'], reconnection: false, timeout: 2000 }); + + const timeout = setTimeout(() => { + log.info('connection timed out'); + client.disconnect(); + resolve(null); + }, 2000); + client.on('connect_error', (reason) => { + log.info('connection Error'); + log.error(reason); + clearTimeout(timeout); + resolve(null); + }); + client.emit('resetMaster', (response) => { + // console.log(response); + client.disconnect(); + clearTimeout(timeout); + resolve(response); + }); + }); + } catch (e) { + log.error('socket connection failed.'); + log.error(e); + return null; + } +} + +/** + * [getBackLog] + * @param {string} index [description] + * @param {socket} socket [description] + * @return {json} [description] + */ +async function getBackLog(index, socket) { + try { + return new Promise((resolve) => { + socket.emit('getBackLog', index, (response) => { + // log.info(JSON.stringify(response)); + resolve(response); + }); + }); + } catch (e) { + log.error(e); + return null; + } +} +/** + * [ask for query] + * @param {string} index [description] + * @param {socket} socket [description] + * @return {json} [description] + */ +async function askQuery(index, socket) { + try { + return new Promise((resolve) => { + socket.emit('askQuery', index, (response) => { + log.info(JSON.stringify(response), 'magenta'); + resolve(response); + }); + }); + } catch (e) { + log.error(e); + return null; + } +} + +/** + * [shareKeys] + * @param {string} pubKey [description] + * @param {socket} socket [description] + * @return {json} [description] + */ +async function shareKeys(pubKey, socket) { + try { + return new Promise((resolve) => { + socket.emit('shareKeys', pubKey, (response) => { + resolve(response); + }); + }); + } catch (e) { + log.error(e); + return null; + } +} + +/** + * [updateKey] + * @param {string} key [description] + * @param {socket} socket [description] + * @return {json} [description] + */ +async function updateKey(key, value, socket) { + try { + return new Promise((resolve) => { + socket.emit('updateKey', key, value, (response) => { + resolve(response); + }); + }); + } catch (e) { + log.error(e); + return null; + } +} + +/** + * [getKeys] + * @param {socket} socket [description] + * @return {json} [description] + */ +async function getKeys(socket) { + try { + return new Promise((resolve) => { + socket.emit('getKeys', (response) => { + resolve(response); + }); + }); + } catch (e) { + log.error(e); + return null; + } +} + +module.exports = { + getApplicationIP, + getApplicationSpecs, + validateApp, + getMaster, + getMyIp, + getStatus, + getBackLog, + shareKeys, + updateKey, + getKeys, + askQuery, + resetMaster, + getApplicationOwner, +}; diff --git a/lib/mysqldump.js b/lib/mysqldump.js index 06c189f..31c3a86 100644 --- a/lib/mysqldump.js +++ b/lib/mysqldump.js @@ -8,6 +8,7 @@ var mysql = require('mysql2'); var sqlstring = require('sqlstring'); var zlib = require('zlib'); var mysql$1 = require('mysql2/promise'); +const log = require('./log'); /*! ***************************************************************************** Copyright (c) Microsoft Corporation. All rights reserved. @@ -541,163 +542,188 @@ function buildInsert(table, values, format$$1) { return sql.replace(/NOFORMAT_WRAP\("##(.+?)##"\)/g, '$1'); } function buildInsertValue(row, table) { + return `(${table.columnsOrdered.map(c => row[c]).join(',')})`; } function executeSql(connection, sql) { return new Promise((resolve, reject) => connection.query(sql, err => err ? /* istanbul ignore next */ reject(err) : resolve())); } // eslint-disable-next-line complexity -function getDataDump(connectionOptions, options, tables, dumpToFile) { - return __awaiter(this, void 0, void 0, function* () { - // ensure we have a non-zero max row option - options.maxRowsPerInsertStatement = Math.max(options.maxRowsPerInsertStatement, 0); - // clone the array - tables = [...tables]; - // build the format function if requested - const format$$1 = options.format - ? (sql) => sqlformatter.format(sql) - : (sql) => sql; - // we open a new connection with a special typecast function for dumping data - const connection = mysql.createConnection(deepmerge.all([ - connectionOptions, - { - multipleStatements: true, - typeCast: typeCast(tables), - }, - ])); - const retTables = []; - let currentTableLines = null; - // open the write stream (if configured to) - const outFileStream = dumpToFile - ? fs.createWriteStream(dumpToFile, { - flags: 'a', - encoding: 'utf8', - }) - : null; - function saveChunk(str, inArray = true) { - if (!Array.isArray(str)) { - str = [str]; +function getDataDump(connectionOptions, options, tables, dumpToFile, newConnection) { + return __awaiter(this, void 0, void 0, function* () { + // Ensure we have a non-zero max row option + options.maxRowsPerInsertStatement = Math.max(options.maxRowsPerInsertStatement, 0); + // Clone the array + tables = [...tables]; + // Build the format function if requested + const format$$1 = options.format + ? (sql) => sqlformatter.format(sql) + : (sql) => sql; + // Open a new connection with a special typecast function for dumping data + const connection = mysql.createConnection(deepmerge.all([ + connectionOptions, + { + multipleStatements: true, + typeCast: typeCast(tables), + }, + ])); + const retTables = []; + let currentTableLines = null; + // Open the write stream (if configured to) + const outFileStream = dumpToFile + ? fs.createWriteStream(dumpToFile, { + flags: 'a', + encoding: 'utf8', + }) + : null; + + function saveChunk(str, inArray = true) { + if (!Array.isArray(str)) { + str = [str]; + } + // Write to file if configured + if (outFileStream) { + str.forEach(async s => { + outFileStream.write(`${s}\n`); + }); + if (saveChunk.counter === undefined) { + saveChunk.counter = 0; } - // Function to introduce delay - function delay(ms) { - return new Promise(resolve => setTimeout(resolve, ms)); + saveChunk.counter++; + if (saveChunk.counter % 5000 === 0) { + const stats = fs.statSync(outFileStream.path); + const fileSizeInBytes = stats.size; + const fileSizeInMegabytes = fileSizeInBytes / (1024 * 1024); + log.info(`Current dump file size: ${fileSizeInMegabytes.toFixed(2)} MB ...`); } - // write to file if configured - if (outFileStream) { - str.forEach(async s => { - await delay(20); // 20ms delay - outFileStream.write(`${s}\n`); - }); + } + // Write to memory if configured + if (inArray && currentTableLines) { + str.forEach(async s => { + currentTableLines.push(s); + }); + } + } + + try { + if (options.lockTables) { + // See: https://dev.mysql.com/doc/refman/5.7/en/replication-solutions-backups-read-only.html + yield executeSql(connection, 'FLUSH TABLES WITH READ LOCK'); + yield executeSql(connection, 'SET GLOBAL read_only = ON'); + } + // To avoid having to load an entire DB's worth of data at once, we select from each table individually + // Note that we use async/await within this loop to only process one table at a time (to reduce memory footprint) + while (tables.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const table = tables.shift(); + if (table.isView && !options.includeViewData) { + // Don't dump data for views + retTables.push(deepmerge.all([ + table, + { + data: null, + }, + ])); + // eslint-disable-next-line no-continue + continue; + } + currentTableLines = options.returnFromFunction ? [] : null; + if (retTables.length > 0) { + // Add a newline before the next header to pad the dumps + saveChunk(''); } - // write to memory if configured - if (inArray && currentTableLines) { - str.forEach(async s => { - await delay(20); // 20ms delay - currentTableLines.push(s); + if (options.verbose) { + // Write the table header to the file + const header = [ + '# ------------------------------------------------------------', + `# DATA DUMP FOR TABLE: ${table.name}${options.lockTables ? ' (locked)' : ''}`, + '# ------------------------------------------------------------', + '', + ]; + saveChunk(header); + } + + // Get the total number of rows in the table + const countQuery = `SELECT COUNT(*) AS total FROM \`${table.name}\`${options.where[table.name] ? ` WHERE ${options.where[table.name]}` : ''}`; + const countResult = yield newConnection.query(countQuery); + const totalRows = countResult[0].total; + const chunkSize = 500; // Number of rows to fetch at a time + let offset = 0; + + // Fetch and process data in chunks + while (offset < totalRows) { + const where = options.where[table.name] + ? ` WHERE ${options.where[table.name]}` + : ''; + const query = `SELECT * FROM \`${table.name}\`${where} LIMIT ${chunkSize} OFFSET ${offset}`; + // log.info(`Running ${query}`); + + // eslint-disable-next-line no-await-in-loop + yield new Promise((resolve, reject) => { + const rowQueue = []; + const chunkQuery = connection.query(query); + // Stream the data to the file + chunkQuery.on('result', (row) => { + // Build the values list + rowQueue.push(buildInsertValue(row, table)); + // If we've got a full queue + if (rowQueue.length === options.maxRowsPerInsertStatement) { + // Create and write a fresh statement + const insert = buildInsert(table, rowQueue, format$$1); + saveChunk(insert); + rowQueue.length = 0; // Clear the queue + } + }); + chunkQuery.on('end', () => { + // Write the remaining rows to disk + if (rowQueue.length > 0) { + const insert = buildInsert(table, rowQueue, format$$1); + saveChunk(insert); + rowQueue.length = 0; // Clear the queue + } + resolve(); + }); + chunkQuery.on('error', + /* istanbul ignore next */ err => reject(err)); }); + + offset += chunkSize; // Move to the next chunk + + // Add a 100ms delay before the next query + yield new Promise((resolve) => setTimeout(resolve, 75)); } + + // Update the table definition + retTables.push(deepmerge.all([ + table, + { + data: currentTableLines + ? currentTableLines.join('\n') + : null, + }, + ])); } - try { - if (options.lockTables) { - // see: https://dev.mysql.com/doc/refman/5.7/en/replication-solutions-backups-read-only.html - yield executeSql(connection, 'FLUSH TABLES WITH READ LOCK'); - yield executeSql(connection, 'SET GLOBAL read_only = ON'); - } - // to avoid having to load an entire DB's worth of data at once, we select from each table individually - // note that we use async/await within this loop to only process one table at a time (to reduce memory footprint) - while (tables.length > 0) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const table = tables.shift(); - if (table.isView && !options.includeViewData) { - // don't dump data for views - retTables.push(deepmerge.all([ - table, - { - data: null, - }, - ])); - // eslint-disable-next-line no-continue - continue; - } - currentTableLines = options.returnFromFunction ? [] : null; - if (retTables.length > 0) { - // add a newline before the next header to pad the dumps - saveChunk(''); - } - if (options.verbose) { - // write the table header to the file - const header = [ - '# ------------------------------------------------------------', - `# DATA DUMP FOR TABLE: ${table.name}${options.lockTables ? ' (locked)' : ''}`, - '# ------------------------------------------------------------', - '', - ]; - saveChunk(header); - } - // eslint-disable-next-line no-await-in-loop - yield new Promise((resolve, reject) => { - // send the query - const where = options.where[table.name] - ? ` WHERE ${options.where[table.name]}` - : ''; - const query = connection.query(`SELECT * FROM \`${table.name}\`${where}`); - let rowQueue = []; - // stream the data to the file - query.on('result', (row) => { - // build the values list - rowQueue.push(buildInsertValue(row, table)); - // if we've got a full queue - if (rowQueue.length === options.maxRowsPerInsertStatement) { - // create and write a fresh statement - const insert = buildInsert(table, rowQueue, format$$1); - saveChunk(insert); - rowQueue = []; - } - }); - query.on('end', () => { - // write the remaining rows to disk - if (rowQueue.length > 0) { - const insert = buildInsert(table, rowQueue, format$$1); - saveChunk(insert); - rowQueue = []; - } - resolve(); - }); - query.on('error', - /* istanbul ignore next */ err => reject(err)); - }); - // update the table definition - retTables.push(deepmerge.all([ - table, - { - data: currentTableLines - ? currentTableLines.join('\n') - : null, - }, - ])); - } - saveChunk(''); - } - finally { - if (options.lockTables) { - // see: https://dev.mysql.com/doc/refman/5.7/en/replication-solutions-backups-read-only.html - yield executeSql(connection, 'SET GLOBAL read_only = OFF'); - yield executeSql(connection, 'UNLOCK TABLES'); - } - } - // clean up our connections - yield connection.end(); - if (outFileStream) { - // tidy up the file stream, making sure writes are 100% flushed before continuing - yield new Promise(resolve => { - outFileStream.once('finish', () => { - resolve(); - }); - outFileStream.end(); - }); - } - return retTables; - }); + saveChunk(''); + } finally { + if (options.lockTables) { + // See: https://dev.mysql.com/doc/refman/5.7/en/replication-solutions-backups-read-only.html + yield executeSql(connection, 'SET GLOBAL read_only = OFF'); + yield executeSql(connection, 'UNLOCK TABLES'); + } + } + // Clean up our connections + yield connection.end(); + if (outFileStream) { + // Tidy up the file stream, making sure writes are 100% flushed before continuing + yield new Promise(resolve => { + outFileStream.once('finish', () => { + resolve(); + }); + outFileStream.end(); + }); + } + return retTables; + }); } function compressFile(filename) { @@ -930,8 +956,9 @@ function main(inputOptions) { // dump data if requested if (options.dump.data !== false) { // don't even try to run the data dump + connection = yield DB.connect(deepmerge.all([options.connection, { multipleStatements: true }])); const tables = res.tables; - res.tables = yield getDataDump(options.connection, options.dump.data, tables, options.dumpToFile); + res.tables = yield getDataDump(options.connection, options.dump.data, tables, options.dumpToFile, connection); res.dump.data = res.tables .map(t => t.data) .filter(t => t) diff --git a/ui/index.html b/ui/index.html index 1d70e3e..ac16dc3 100644 --- a/ui/index.html +++ b/ui/index.html @@ -276,7 +276,7 @@