Skip to content

Commit

Permalink
Fix maxListeners issue for ws implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
oklemenz2 committed Jan 26, 2024
1 parent 52f87a1 commit 40628fb
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

- Refactor middlewares and authorization check
- Change `cds.ws` to point to CDS websocket server (not the native implementation, use `cds.wss` or `cds.io` for that)
- Fix maxListeners issue for `ws` implementation

## Version 0.4.0 - 2024-01-26

Expand Down
47 changes: 27 additions & 20 deletions src/socket/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,49 @@ class SocketWSServer extends SocketServer {
constructor(server, path) {
super(server, path);
this.wss = new WebSocket.Server({ server });
this.services = {};
cds.ws = this;
cds.wss = this.wss;
}

async setup() {
await this.applyAdapter();
this._middlewares = this.middlewares();
this.wss.on("connection", async (ws, request) => {
const service = this.services[request?.url];
if (service) {
service(ws, request);
}
});
}

service(service, connected) {
this.adapter?.on(service);
this.wss.on("connection", async (ws, request) => {
this.services[`${this.path}${service}`] = (ws, request) => {
ws.request = request;
if (ws.request?.url !== `${this.path}${service}`) {
return;
}
DEBUG?.("Connected");
ws.on("close", () => {
DEBUG?.("Disconnected");
});
ws.on("error", (error) => {
LOG?.error(error);
});
const events = {};
ws.on("message", async (message) => {
let payload = {};
try {
payload = JSON.parse(message);
} catch (_) {
// ignore
}
try {
for (const callback of events[payload?.event] || []) {
await callback(payload.data);
}
} catch (err) {
LOG?.error(err);
}
});
this.applyMiddleware(ws, async () => {
try {
const facade = {
Expand All @@ -52,21 +72,8 @@ class SocketWSServer extends SocketServer {
};
},
on: (event, callback) => {
ws.on("message", async (message) => {
let payload = {};
try {
payload = JSON.parse(message);
} catch (_) {
// ignore
}
try {
if (payload?.event === event) {
await callback(payload.data);
}
} catch (err) {
LOG?.error(err);
}
});
events[event] ||= [];
events[event].push(callback);
},
emit: (event, data) => {
ws.send(
Expand All @@ -91,7 +98,7 @@ class SocketWSServer extends SocketServer {
LOG?.error(err);
}
});
});
}
}

async broadcast(service, event, data, socket, remote) {
Expand Down

0 comments on commit 40628fb

Please sign in to comment.