diff --git a/CHANGELOG.md b/CHANGELOG.md index 5470358..0e9d224 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Refactor middlewares and authorization check - Change `cds.ws` to point to CDS websocket server (not the native implementation, use `cds.wss` or `cds.io` for that) +- Fix maxListeners issue for `ws` implementation ## Version 0.4.0 - 2024-01-26 diff --git a/src/socket/ws.js b/src/socket/ws.js index faa9cd3..d2ccd55 100644 --- a/src/socket/ws.js +++ b/src/socket/ws.js @@ -11,6 +11,7 @@ class SocketWSServer extends SocketServer { constructor(server, path) { super(server, path); this.wss = new WebSocket.Server({ server }); + this.services = {}; cds.ws = this; cds.wss = this.wss; } @@ -18,15 +19,18 @@ class SocketWSServer extends SocketServer { async setup() { await this.applyAdapter(); this._middlewares = this.middlewares(); + this.wss.on("connection", async (ws, request) => { + const service = this.services[request?.url]; + if (service) { + service(ws, request); + } + }); } service(service, connected) { this.adapter?.on(service); - this.wss.on("connection", async (ws, request) => { + this.services[`${this.path}${service}`] = (ws, request) => { ws.request = request; - if (ws.request?.url !== `${this.path}${service}`) { - return; - } DEBUG?.("Connected"); ws.on("close", () => { DEBUG?.("Disconnected"); @@ -34,6 +38,22 @@ class SocketWSServer extends SocketServer { ws.on("error", (error) => { LOG?.error(error); }); + const events = {}; + ws.on("message", async (message) => { + let payload = {}; + try { + payload = JSON.parse(message); + } catch (_) { + // ignore + } + try { + for (const callback of events[payload?.event] || []) { + await callback(payload.data); + } + } catch (err) { + LOG?.error(err); + } + }); this.applyMiddleware(ws, async () => { try { const facade = { @@ -52,21 +72,8 @@ class SocketWSServer extends SocketServer { }; }, on: (event, callback) => { - ws.on("message", async (message) => { - let payload = {}; - try { - payload = JSON.parse(message); - } catch (_) { - // ignore - } - try { - if (payload?.event === event) { - await callback(payload.data); - } - } catch (err) { - LOG?.error(err); - } - }); + events[event] ||= []; + events[event].push(callback); }, emit: (event, data) => { ws.send( @@ -91,7 +98,7 @@ class SocketWSServer extends SocketServer { LOG?.error(err); } }); - }); + } } async broadcast(service, event, data, socket, remote) {