diff --git a/libraries/BatchedUpdateAggregator.js b/libraries/BatchedUpdateAggregator.js index 46c08a12d..2844783d5 100644 --- a/libraries/BatchedUpdateAggregator.js +++ b/libraries/BatchedUpdateAggregator.js @@ -3,6 +3,8 @@ * are all moving around at once. We establish a short window interval (e.g. 100ms), and all /batchedUpdate * messages that are received within that same window are grouped together. Pending messages are sent to * all clients a single time per interval (rather than individually as soon as they are received). + * The interval adjusts dynamically based on the number of active clients, as well as the average round-trip + * time that each client has self-reported; this helps to maintain speed when the network isn't congested. */ class BatchedUpdateAggregator { /** @@ -10,7 +12,7 @@ class BatchedUpdateAggregator { * @param {Object} [options] - Optional configuration object. * @param {number} [options.minAggregationIntervalMs=33] - The minimum interval (in milliseconds) between aggregations. Defaults to 33ms. * @param {number} [options.maxAggregationIntervalMs=1000] - The maximum interval (in milliseconds) between aggregations. Defaults to 1000ms. - * @param {number} [options.rollingWindowSize=30] - The size of the rolling window used to calculate the average or peak number of active clients. Defaults to 30. + * @param {number} [options.rollingWindowSize=30] - The size of the rolling window used to calculate the peak number of active clients. Defaults to 30. 
*/ constructor(broadcastCallback, options = { minAggregationIntervalMs: 33, maxAggregationIntervalMs: 1000, rollingWindowSize: 30 }) { if (typeof options.minAggregationIntervalMs !== 'number' || options.minAggregationIntervalMs <= 0) { @@ -32,14 +34,49 @@ class BatchedUpdateAggregator { this.broadcastCallback = broadcastCallback; this.updateBuffer = []; this.aggregationTimer = null; - this.activeClients = new Set(); - this.activeClientsHistory = []; + this.activeClients = new Set(); // this stores the list of clientIds active in the current time interval + this.activeClientsHistory = []; // this stores the *number* of active clients per time window, going into the past this.rollingWindowSize = options.rollingWindowSize; + this.clientRTTs = new Map(); // Store round-trip times for each client + this.areClientsReportingRTTs = false; // backwards compatible if clients aren't using the new /status RTT tracker this.ENABLE_LOGGING = false; + + // Start periodic pruning of inactive clients + this.RTT_EXPIRY_TIME = 10000; // 10 seconds – this controls how long in the past the average RTT is based on + this._startPruningInactiveClients(); + } + + /** + * Call this with the round-trip-time of a client to make the batchedUpdate include that in its congestion calculations + * @public + * + * @param {string} clientId + * @param {number} rtt + */ + trackClientRTT(clientId, rtt) { + const currentTime = Date.now(); + this.areClientsReportingRTTs = true; + + if (!this.clientRTTs.has(clientId)) { + this.clientRTTs.set(clientId, []); + } + + const rttHistory = this.clientRTTs.get(clientId); + + // Store RTT along with the current timestamp + rttHistory.push({ rtt, timestamp: currentTime }); + + // Remove RTT entries older than a certain threshold + this.clientRTTs.set(clientId, rttHistory.filter(entry => currentTime - entry.timestamp < this.RTT_EXPIRY_TIME)); } - // Method to handle incoming updates + /** + * Method to handle incoming updates + * @public + * + * @param update + */ 
addUpdate(update) { if (!update || !update.batchedUpdates || !Array.isArray(update.batchedUpdates)) { console.warn('Invalid update format received'); @@ -52,23 +89,26 @@ class BatchedUpdateAggregator { } // this.updateBuffer.push(update); - this.pushUpdateToBuffer(update); + this._pushUpdateToBuffer(update); // Dynamically adjust the aggregation interval based on the rolling average of active clients - this.adjustAggregationInterval(); + this._adjustAggregationInterval(); // Start the timer if it's not already running if (!this.aggregationTimer) { this.aggregationTimer = setTimeout(() => { - this.aggregateAndSendUpdates(); + this._aggregateAndSendUpdates(); this.aggregationTimer = null; // Reset the timer }, this.aggregationIntervalMs); } } - // there's no reason to send multiple messages updating the same property, so remove old identical queued messages - // before adding the newest one to the updateBuffer. These will be sent out in `aggregateAndSendUpdates` - pushUpdateToBuffer(update) { + /** + * There's no reason to send multiple messages updating the same property, so remove old identical queued messages + * before adding the newest one to the updateBuffer. 
These will be sent out in `_aggregateAndSendUpdates` + * @param {Object} update + */ + _pushUpdateToBuffer(update) { const { batchedUpdates } = update; // Loop over each update in the batchedUpdates array @@ -93,13 +133,66 @@ class BatchedUpdateAggregator { this.updateBuffer.push(update); } + /** + * Calculates the average round-trip-time across all clients over the past RTT_EXPIRY_TIME milliseconds + * @return {number} + */ + _getAverageRTT() { + let totalRTT = 0; + let count = 0; + const currentTime = Date.now(); + + this.clientRTTs.forEach(rttHistory => { + // Filter out expired RTT entries before calculating + const recentRTTs = rttHistory.filter(entry => currentTime - entry.timestamp < this.RTT_EXPIRY_TIME); + + if (recentRTTs.length > 0) { + totalRTT += recentRTTs.reduce((sum, entry) => sum + entry.rtt, 0); + count += recentRTTs.length; + } + }); + + return count > 0 ? totalRTT / count : 0; + } + + /** + * Removes any reported round-trip-times that are older than RTT_EXPIRY_TIME milliseconds, + * so that old values and disconnected clients no longer influence the current traffic interval + */ + _startPruningInactiveClients() { + // Prune inactive clients once every RTT_EXPIRY_TIME milliseconds + setInterval(() => { + const currentTime = Date.now(); + this.clientRTTs.forEach((rttHistory, clientId) => { + // If all RTT entries are expired, remove the client + const recentRTTs = rttHistory.filter(entry => currentTime - entry.timestamp < this.RTT_EXPIRY_TIME); + if (recentRTTs.length === 0) { + this.clientRTTs.delete(clientId); + } else { + // Otherwise, update the RTT list for that client + this.clientRTTs.set(clientId, recentRTTs); + } + }); + + if (this.ENABLE_LOGGING) { + console.log('Pruned inactive clients from RTT tracking.'); + } + }, this.RTT_EXPIRY_TIME); // Pruning interval + } - // Track active clients and maintain a rolling average + /** + * Adds a clientId to the active clients, so that the number of active clients can be computed for this time interval. 
+ * @param {string} clientId + */ _trackActiveClient(clientId) { this.activeClients.add(clientId); } - // Update the rolling average of active clients + /** + * Updates a rolling list of the number of clients sending messages per time interval, + * for the past `rollingWindowSize` number of time intervals. + * @param {number} clientCount + */ _updateClientHistory(clientCount) { if (this.activeClientsHistory.length >= this.rollingWindowSize) { this.activeClientsHistory.shift(); // Remove the oldest entry @@ -107,37 +200,57 @@ class BatchedUpdateAggregator { this.activeClientsHistory.push(clientCount); } - // rolling average doesn't work as well ask peak usage over the window. defaults to 1 user. - getPeakClientCount() { + /** + * Gives peak number of users sending messages over the rolling window. Defaults to 1 user. + * Note: I've found this to work better than rolling average for responding to congestion. + * @return {number} + */ + _getPeakClientCount() { return Math.max(...this.activeClientsHistory, 1); } - adjustAggregationInterval() { - const peakClientCount = this.getPeakClientCount(); + /** + * Dynamically adjusts the tick-rate of the system to attempt to balance speed and congestion, + * based on the number of active clients and the self-reported RTTs of each client. + */ + _adjustAggregationInterval() { + const peakClientCount = this._getPeakClientCount(); + const averageRTT = this._getAverageRTT(); // try getMedianRTT as well? + + // Dynamically increase the time interval to reduce traffic; it increases based on two factors: + + // 1. Begin by calculating the number of user-to-user connections (assumes each message broadcasts to every client). + // Cap this factor at a threshold to prevent it from getting too slow purely due to number of clients + const MAX_MS_DELAY_DUE_TO_CLIENT_NUMBER = this.areClientsReportingRTTs ? 
150 : 1000; + let interval = Math.min(peakClientCount * (peakClientCount - 1), MAX_MS_DELAY_DUE_TO_CLIENT_NUMBER); - // Set your desired BANDWIDTH_CAP (number of messages per time window) and TIME_WINDOW (e.g., 5 seconds) - const BANDWIDTH_CAP = 1000; // Empirically/arbitrarily chosen – adjust based on your capacity - const TIME_WINDOW = 5000; // 5 seconds in milliseconds + // 2. Introduce a scaling factor that adds to the base interval based on higher round-trip-time values + const RTT_SCALING_FACTOR = 1.5; // Adjusts sensitivity to RTT; can be tuned to more aggressively dampen traffic + interval += (averageRTT * RTT_SCALING_FACTOR); - // Calculate the interval needed to keep traffic under the BANDWIDTH_CAP - // This is based on the following formula, if every message broadcasts to every client: - // `bandwidth_used = (time_window / interval) * N * (N - 1)` - // this increases quadratically: 30ms when 3 clients, 100ms when 5, 450ms when 10, caps at 1000 when 15+ - let interval = (TIME_WINDOW * peakClientCount * (peakClientCount - 1)) / BANDWIDTH_CAP; + // a valid RTT will never be 0, so if we see 0 that means everyone is lagging so much that all of the RTTs expired + if (averageRTT === 0 && this.areClientsReportingRTTs) { + // in this case, max out the interval to try to decongest the server + interval = this.maxAggregationIntervalMs; + } - // Clamp the interval to be within the min and max bounds + // Clamp the interval to be within the min and max bounds, e.g. 
between 1fps and 30fps this.aggregationIntervalMs = Math.max( this.minAggregationIntervalMs, Math.min(this.maxAggregationIntervalMs, interval) ); if (this.ENABLE_LOGGING && this.aggregationIntervalMs !== this.minAggregationIntervalMs) { - console.log(`Adjusted aggregation interval to ${this.aggregationIntervalMs}ms based on ${peakClientCount} peak active clients.`); + console.log(`Adjusted aggregation interval to ${this.aggregationIntervalMs.toFixed(2)}ms based on ${peakClientCount} peak active clients and ${averageRTT.toFixed(2)}ms average RTT.`); } } - // Method to aggregate multiple batchedUpdates into a single batchedUpdate and broadcast them - aggregateAndSendUpdates() { + /** + * Aggregate all of the batchedUpdates received in the current interval (stored in `updateBuffer`) + * into a single batchedUpdate message, broadcast the message to all clients, + * and prepare state for the next time interval + */ + _aggregateAndSendUpdates() { if (this.updateBuffer.length === 0) { return; } diff --git a/server.js b/server.js index d4137014c..a5cc6ec1d 100644 --- a/server.js +++ b/server.js @@ -633,7 +633,11 @@ var sockets = { // For realtime updates, rather than sending N^2 messages when many clients are updating at once, // aggregate them at send at most one aggregate message per small interval. 
const BatchedUpdateAggregator = require('./libraries/BatchedUpdateAggregator'); -const updateAggregator = new BatchedUpdateAggregator(broadcastAggregatedUpdates); +const updateAggregator = new BatchedUpdateAggregator(broadcastAggregatedUpdates, { + minAggregationIntervalMs: 33, + maxAggregationIntervalMs: 1000, + rollingWindowSize: 10 +}); // Define the callback function to broadcast updates function broadcastAggregatedUpdates(aggregatedUpdates) { for (const entry of realityEditorUpdateSocketSubscriptions) { @@ -1985,8 +1989,22 @@ function objectWebServer() { webServer.use('/spatial', spatialRouter.router); webServer.use('/history', historyRouter.router); + /** + * Checks whether the server is online. Can be used by clients to also calculate the round-trip-time to the server. + * Clients can optionally include prevRTT and clientId in the query params, and the server will track the RTTs. + * This helps to deal with server congestion. + */ webServer.get('/status', function(req, res) { - res.sendStatus(200); // OK + // Check if the RTT parameter exists in the query string + const clientRTT = parseFloat(req.query.prevRTT); + const clientId = req.query.clientId; + + if (typeof clientRTT === 'number' && !isNaN(clientRTT) && clientId) { + // Track the client's RTT in the BatchedUpdateAggregator + updateAggregator.trackClientRTT(clientId, clientRTT); + } + + res.sendStatus(200); // Respond OK }); // receivePost blocks can be triggered with a post request. *1 is the object *2 is the logic *3 is the link id