Skip to content

Commit

Permalink
Merge pull request #1129 from benptc/optimize-update-aggregator
Browse files Browse the repository at this point in the history
Optimize update aggregator
  • Loading branch information
benptc authored Sep 4, 2024
2 parents 3515834 + d76232a commit c0e6d5b
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 30 deletions.
169 changes: 141 additions & 28 deletions libraries/BatchedUpdateAggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
* are all moving around at once. We establish a short window interval (e.g. 100ms), and all /batchedUpdate
* messages that are received within that same window are grouped together. Pending messages are sent to
* all clients a single time per interval (rather than individually as soon as they are received).
* The interval adjusts dynamically based on the number of active clients, as well as the average round-trip
* time that each client has self-reported; this helps to maintain speed when network isn't congested.
*/
class BatchedUpdateAggregator {
/**
* @param {Function} broadcastCallback - The function to call with the aggregated updates every interval.
* @param {Object} [options] - Optional configuration object.
* @param {number} [options.minAggregationIntervalMs=33] - The minimum interval (in milliseconds) between aggregations. Defaults to 33ms.
* @param {number} [options.maxAggregationIntervalMs=1000] - The maximum interval (in milliseconds) between aggregations. Defaults to 1000ms.
* @param {number} [options.rollingWindowSize=30] - The size of the rolling window used to calculate the average or peak number of active clients. Defaults to 30.
* @param {number} [options.rollingWindowSize=30] - The size of the rolling window used to calculate the peak number of active clients. Defaults to 30.
*/
constructor(broadcastCallback, options = { minAggregationIntervalMs: 33, maxAggregationIntervalMs: 1000, rollingWindowSize: 30 }) {
if (typeof options.minAggregationIntervalMs !== 'number' || options.minAggregationIntervalMs <= 0) {
Expand All @@ -32,14 +34,49 @@ class BatchedUpdateAggregator {
this.broadcastCallback = broadcastCallback;
this.updateBuffer = [];
this.aggregationTimer = null;
this.activeClients = new Set();
this.activeClientsHistory = [];
this.activeClients = new Set(); // this stores the list of clientIds active in the current time interval
this.activeClientsHistory = []; // this stores the *number* of active clients per time window, going into the past
this.rollingWindowSize = options.rollingWindowSize;
this.clientRTTs = new Map(); // Store round-trip times for each client
this.areClientsReportingRTTs = false; // backwards compatible if clients aren't using the new /status RTT tracker

this.ENABLE_LOGGING = false;

// Start periodic pruning of inactive clients
this.RTT_EXPIRY_TIME = 10000; // 10 seconds – this controls how long in the past the average RTT is based on
this._startPruningInactiveClients();
}

/**
* Call this with the round-trip-time of a client to make the batchedUpdate include that in its congestion calculations
* @public
*
* @param {string} clientId
* @param {number} rtt
*/
trackClientRTT(clientId, rtt) {
const currentTime = Date.now();
this.areClientsReportingRTTs = true;

if (!this.clientRTTs.has(clientId)) {
this.clientRTTs.set(clientId, []);
}

const rttHistory = this.clientRTTs.get(clientId);

// Store RTT along with the current timestamp
rttHistory.push({ rtt, timestamp: currentTime });

// Remove RTT entries older than a certain threshold
this.clientRTTs.set(clientId, rttHistory.filter(entry => currentTime - entry.timestamp < this.RTT_EXPIRY_TIME));
}

// Method to handle incoming updates
/**
* Method to handle incoming updates
* @public
*
* @param update
*/
addUpdate(update) {
if (!update || !update.batchedUpdates || !Array.isArray(update.batchedUpdates)) {
console.warn('Invalid update format received');
Expand All @@ -52,23 +89,26 @@ class BatchedUpdateAggregator {
}

// this.updateBuffer.push(update);
this.pushUpdateToBuffer(update);
this._pushUpdateToBuffer(update);

// Dynamically adjust the aggregation interval based on the rolling average of active clients
this.adjustAggregationInterval();
this._adjustAggregationInterval();

// Start the timer if it's not already running
if (!this.aggregationTimer) {
this.aggregationTimer = setTimeout(() => {
this.aggregateAndSendUpdates();
this._aggregateAndSendUpdates();
this.aggregationTimer = null; // Reset the timer
}, this.aggregationIntervalMs);
}
}

// there's no reason to send multiple messages updating the same property, so remove old identical queued messages
// before adding the newest one to the updateBuffer. These will be sent out in `aggregateAndSendUpdates`
pushUpdateToBuffer(update) {
/**
* There's no reason to send multiple messages updating the same property, so remove old identical queued messages
* before adding the newest one to the updateBuffer. These will be sent out in `_aggregateAndSendUpdates`
* @param {Object} update
*/
_pushUpdateToBuffer(update) {
const { batchedUpdates } = update;

// Loop over each update in the batchedUpdates array
Expand All @@ -93,51 +133,124 @@ class BatchedUpdateAggregator {
this.updateBuffer.push(update);
}

/**
* Calculates the average round-trip-time across all clients over the past RTT_EXPIRY_TIME milliseconds
* @return {number}
*/
_getAverageRTT() {
let totalRTT = 0;
let count = 0;
const currentTime = Date.now();

this.clientRTTs.forEach(rttHistory => {
// Filter out expired RTT entries before calculating
const recentRTTs = rttHistory.filter(entry => currentTime - entry.timestamp < this.RTT_EXPIRY_TIME);

if (recentRTTs.length > 0) {
totalRTT += recentRTTs.reduce((sum, entry) => sum + entry.rtt, 0);
count += recentRTTs.length;
}
});

return count > 0 ? totalRTT / count : 0;
}

/**
* Removes any reported round-trip-times that are older than RTT_EXPIRY_TIME milliseconds,
* so that old values and disconnected clients no longer influence the current traffic interval
*/
_startPruningInactiveClients() {
// Prune inactive clients every 30 seconds
setInterval(() => {
const currentTime = Date.now();
this.clientRTTs.forEach((rttHistory, clientId) => {
// If all RTT entries are expired, remove the client
const recentRTTs = rttHistory.filter(entry => currentTime - entry.timestamp < this.RTT_EXPIRY_TIME);
if (recentRTTs.length === 0) {
this.clientRTTs.delete(clientId);
} else {
// Otherwise, update the RTT list for that client
this.clientRTTs.set(clientId, recentRTTs);
}
});

if (this.ENABLE_LOGGING) {
console.log('Pruned inactive clients from RTT tracking.');
}
}, this.RTT_EXPIRY_TIME); // Pruning interval
}

// Track active clients and maintain a rolling average
/**
* Adds a clientId to the active clients, so that the number of active clients can be computed for this time interval.
* @param {string} clientId
*/
_trackActiveClient(clientId) {
this.activeClients.add(clientId);
}

// Update the rolling average of active clients
/**
* Updates a rolling list of the number of clients sending messages per time interval,
* for the past `rollingWindowSize` number of time intervals.
* @param {number} clientCount
*/
_updateClientHistory(clientCount) {
if (this.activeClientsHistory.length >= this.rollingWindowSize) {
this.activeClientsHistory.shift(); // Remove the oldest entry
}
this.activeClientsHistory.push(clientCount);
}

// rolling average doesn't work as well ask peak usage over the window. defaults to 1 user.
getPeakClientCount() {
/**
* Gives peak number of users sending messages over the rolling window. Defaults to 1 user.
* Note: I've found this to work better than rolling average for responding to congestion.
* @return {number}
*/
_getPeakClientCount() {
return Math.max(...this.activeClientsHistory, 1);
}

adjustAggregationInterval() {
const peakClientCount = this.getPeakClientCount();
/**
* Dynamically adjusts the tick-rate of the system to attempt to balance speed and congestion,
* based on the number of active clients and the self-reported RTTs of each client.
*/
_adjustAggregationInterval() {
const peakClientCount = this._getPeakClientCount();
const averageRTT = this._getAverageRTT(); // try getMedianRTT as well?

// Dynamically increase the time interval to reduce traffic; it increases based on two factors:

// 1. Begin by calculating the number of user-to-user connections (assumes each message broadcasts to every client).
// Cap this factor at a threshold to prevent it from getting too slow purely due to number of clients
const MAX_MS_DELAY_DUE_TO_CLIENT_NUMBER = this.areClientsReportingRTTs ? 150 : 1000;
let interval = Math.min(peakClientCount * (peakClientCount - 1), MAX_MS_DELAY_DUE_TO_CLIENT_NUMBER);

// Set your desired BANDWIDTH_CAP (number of messages per time window) and TIME_WINDOW (e.g., 5 seconds)
const BANDWIDTH_CAP = 1000; // Empirically/arbitrarily chosen – adjust based on your capacity
const TIME_WINDOW = 5000; // 5 seconds in milliseconds
// 2. Introduce a scaling factor that adds to the base interval based on higher round-trip-time values
const RTT_SCALING_FACTOR = 1.5; // Adjusts sensitivity to RTT; can be tuned to more aggressively dampen traffic
interval += (averageRTT * RTT_SCALING_FACTOR);

// Calculate the interval needed to keep traffic under the BANDWIDTH_CAP
// This is based on the following formula, if every message broadcasts to every client:
// `bandwidth_used = (time_window / interval) * N * (N - 1)`
// this increases quadratically: 30ms when 3 clients, 100ms when 5, 450ms when 10, caps at 1000 when 15+
let interval = (TIME_WINDOW * peakClientCount * (peakClientCount - 1)) / BANDWIDTH_CAP;
// a valid RTT will never be 0, so if we see 0 that means everyone is lagging so much that all of the RTTs expired
if (averageRTT === 0 && this.areClientsReportingRTTs) {
// in this case, max out the interval to try to decongest the server
interval = this.maxAggregationIntervalMs;
}

// Clamp the interval to be within the min and max bounds
// Clamp the interval to be within the min and max bounds, e.g. between 1fps and 30fps
this.aggregationIntervalMs = Math.max(
this.minAggregationIntervalMs,
Math.min(this.maxAggregationIntervalMs, interval)
);

if (this.ENABLE_LOGGING && this.aggregationIntervalMs !== this.minAggregationIntervalMs) {
console.log(`Adjusted aggregation interval to ${this.aggregationIntervalMs}ms based on ${peakClientCount} peak active clients.`);
console.log(`Adjusted aggregation interval to ${this.aggregationIntervalMs.toFixed(2)}ms based on ${peakClientCount} peak active clients and ${averageRTT.toFixed(2)}ms average RTT.`);
}
}

// Method to aggregate multiple batchedUpdates into a single batchedUpdate and broadcast them
aggregateAndSendUpdates() {
/**
* Aggregate all of the batchedUpdates received in the current interval (stored in `updateBuffer`)
* into a single batchedUpdate message, broadcast the message to all clients,
* and prepare state for the next time interval
*/
_aggregateAndSendUpdates() {
if (this.updateBuffer.length === 0) {
return;
}
Expand Down
22 changes: 20 additions & 2 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,11 @@ var sockets = {
// For realtime updates, rather than sending N^2 messages when many clients are updating at once,
// aggregate them at send at most one aggregate message per small interval.
const BatchedUpdateAggregator = require('./libraries/BatchedUpdateAggregator');
const updateAggregator = new BatchedUpdateAggregator(broadcastAggregatedUpdates);
const updateAggregator = new BatchedUpdateAggregator(broadcastAggregatedUpdates, {
minAggregationIntervalMs: 33,
maxAggregationIntervalMs: 1000,
rollingWindowSize: 10
});
// Define the callback function to broadcast updates
function broadcastAggregatedUpdates(aggregatedUpdates) {
for (const entry of realityEditorUpdateSocketSubscriptions) {
Expand Down Expand Up @@ -1985,8 +1989,22 @@ function objectWebServer() {
webServer.use('/spatial', spatialRouter.router);
webServer.use('/history', historyRouter.router);

/**
* Checks whether the server is online. Can be used by clients to also calculate the round-tip-time to the server.
* Clients can optionally include prevRTT and clientId in the query params, and the server will track the RTTs.
* This helps to deal with server congestion.
*/
webServer.get('/status', function(req, res) {
res.sendStatus(200); // OK
// Check if the RTT parameter exists in the query string
const clientRTT = parseFloat(req.query.prevRTT);
const clientId = req.query.clientId;

if (typeof clientRTT === 'number' && !isNaN(clientRTT) && clientId) {
// Track the client's RTT in the BatchedUpdateAggregator
updateAggregator.trackClientRTT(clientId, clientRTT);
}

res.sendStatus(200); // Respond OK
});

// receivePost blocks can be triggered with a post request. *1 is the object *2 is the logic *3 is the link id
Expand Down

0 comments on commit c0e6d5b

Please sign in to comment.