Skip to content

Commit

Permalink
Event Tenant Isolation
Browse files Browse the repository at this point in the history
Event Context Concept
Refactor Broadcast
Readme
  • Loading branch information
oklemenz2 committed Jan 29, 2024
1 parent 40628fb commit a5f90da
Show file tree
Hide file tree
Showing 35 changed files with 786 additions and 182 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## Version 0.5.0 - 2024-02-xx
## Version 0.5.0 - 2024-01-29

### Added

- Provide user context in examples and tests, to verify authorization flow
- Introduce optional `context` concept to broadcast to a client subset via annotation `@websocket.context` or `@websocket.ws`

### Fixed

- Respect tenant isolation for event broadcasting
- Fix maxListeners issue for `ws` implementation
- Refactor middlewares and authorization check
- Change `cds.ws` to point to CDS websocket server (not the native implementation, use `cds.wss` or `cds.io` for that)
- Fix maxListeners issue for `ws` implementation

## Version 0.4.0 - 2024-01-26

Expand Down
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,76 @@ Events can be directly emitted via the native `socket`, bypassing CDS runtime, i
For each server websocket connection the standard CDS middlewares are applied. That means, that especially the correct CDS
context is set up and the configured authorization strategy is applied.

### Tenant Isolation

WebSockets are processed tenant aware. Especially for broadcasting events tenant isolation is ensured, that only
websocket clients connected for the same tenant are notified in tenant context. Tenant isolation is also ensured
over remote distribution via Redis.

### Authentication & Authorization

Authentication only works via AppRouter (e.g. using a UAA configuration), as the auth token is forwarded
via authorization header bearer token by AppRouter to backend instance. CDS middlewares process the auth token and
set the auth info accordingly. Authorization scopes are checked as defined in the CDS services `@requires` annotations and
authorization restrictions are checked as defined in the CDS services `@restrict` annotations.

### Event Contexts

It is possible to broadcast events to a subset of clients. By entering or existing contexts, the server can be instructed to
determined based on the event data, to which subset of clients the event shall be emitted. To specify which data parts of the
event are leveraged for setting up the context, the annotation `@websocket.context` or `@ws.context` is available on
event data element level (alternatives include `@websocket.broadcast.context` or `@ws.broadcast.context`):

```cds
event received {
@websocket.context
ID: UUID;
text: String;
}
```

This sets up the event context based on the unique ID of the event data. The annotation can be used on multiple event
data elements setting up different event contexts in parallel, if event shall be broadcast/emitted into multiple contexts at the same time.

To manage event contexts the following options exist:

- **Server side**: Call websocket service facade
- CDS context object `req` exposes the websocket facade via `req.context.ws.service` providing the following context functions
- **Enter Context**: `enter(context)` - Enter the current server socket into the passed context
- **Exit Context**: `exit(context)` - Exit the current server socket from the passed context
- **Client side**: Emit `wsContext` event from client socket
- **Enter Context**:
- WS Standard:
```js
socket.send(JSON.stringify({ event: "wsContext", data: { context: "..." } }));
```
- Socket.IO:
```js
socket.emit("wsContext", { context: "..." });
```
- **Exit**:
- WS Standard:
```js
socket.send(JSON.stringify({ event: "wsContext", data: { context: "...", exit: true } }));
```
- Socket.IO:
```js
socket.emit("wsContext", { context: "...", exit: true });
```

Multiple contexts can be entered for the same server socket at the same time. Furthermore, a service operation named
`wsContext` is invoked, if existing on the websocket enabled CDS service. Event context isolation is also ensured
over remote distribution via Redis.

For Socket.IO (`kind: socket.io`) contexts are implemented leveraging [Socket.IO rooms](https://socket.io/docs/v4/rooms/).

### Connect & Disconnect

Every time a server socket is connected via websocket client, the CDS service is notified by calling the corresponding service operation:

- `Connect`: Invoke service operation `wsConnect`, if available
- `Disconnect`: Invoke service operation `wsDisconnect`, if available

#### Approuter

Authorization in provided in production by approuter component (e.g. via XSUAA auth).
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RedisAdapter {
await this.client.subscribe(channel, async (message, messageChannel) => {
try {
if (messageChannel === channel) {
await this.server.broadcast(service, message);
await this.server.broadcast({ service, event: message });
}
} catch (err) {
LOG?.error(err);
Expand Down
84 changes: 66 additions & 18 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const LOG = cds.log("websocket");
const WebSocketAction = {
Connect: "wsConnect",
Disconnect: "wsDisconnect",
Context: "wsContext",
};

let socketServer;
Expand Down Expand Up @@ -135,7 +136,16 @@ function bindServiceEvents(socketServer, servicePath, service) {
service.on(event, async (req) => {
const localEventName = serviceLocalName(service, event.name);
try {
await socketServer.broadcast(servicePath, localEventName, req.data, null, true);
const contexts = deriveContexts(event, req.data);
await socketServer.broadcast({
service: servicePath,
event: localEventName,
data: req.data,
tenant: req.tenant,
contexts,
socket: null,
remote: true,
});
} catch (err) {
LOG?.error(err);
}
Expand All @@ -145,15 +155,30 @@ function bindServiceEvents(socketServer, servicePath, service) {

function bindServiceDefaults(socket, service) {
if (service.operations(WebSocketAction.Disconnect).length) {
socket.on("disconnect", async (reason) => {
socket.onDisconnect(async (reason) => {
await processEvent(socket, service, undefined, WebSocketAction.Disconnect, { reason });
});
}
socket.on(WebSocketAction.Context, async (data, callback) => {
if (!data?.exit) {
await socket.enter(data?.context);
} else {
await socket.exit(data?.context);
}
if (service.operations(WebSocketAction.Context).length) {
await processEvent(socket, service, undefined, WebSocketAction.Context, data, callback);
} else {
callback && callback();
}
});
}

function bindServiceOperations(socket, service) {
for (const operation of service.operations()) {
const event = serviceLocalName(service, operation.name);
if (Object.values(WebSocketAction).includes(event)) {
continue;
}
socket.on(event, async (data, callback) => {
await processEvent(socket, service, undefined, event, data, callback);
});
Expand All @@ -164,9 +189,9 @@ function bindServiceEntities(socket, service) {
for (const entity of service.entities()) {
const localEntityName = serviceLocalName(service, entity.name);
socket.on(`${localEntityName}:create`, async (data, callback) => {
await processEvent(socket, service, entity, "create", data, (response) => {
await processEvent(socket, service, entity, "create", data, async (response) => {
callback && callback(response);
broadcast(socket, `${localEntityName}:created`, entity, response);
await broadcast(socket, `${localEntityName}:created`, entity, response);
});
});
socket.on(`${localEntityName}:read`, async (data, callback) => {
Expand All @@ -176,15 +201,15 @@ function bindServiceEntities(socket, service) {
await processEvent(socket, service, entity, "readDeep", data, callback);
});
socket.on(`${localEntityName}:update`, async (data, callback) => {
await processEvent(socket, service, entity, "update", data, (response) => {
await processEvent(socket, service, entity, "update", data, async (response) => {
callback && callback(response);
broadcast(socket, `${localEntityName}:updated`, entity, response);
await broadcast(socket, `${localEntityName}:updated`, entity, response);
});
});
socket.on(`${localEntityName}:delete`, async (data, callback) => {
await processEvent(socket, service, entity, "delete", data, (response) => {
await processEvent(socket, service, entity, "delete", data, async (response) => {
callback && callback(response);
broadcast(socket, `${localEntityName}:deleted`, entity, { ...response, ...data });
await broadcast(socket, `${localEntityName}:deleted`, entity, { ...response, ...data });
});
});
socket.on(`${localEntityName}:list`, async (data, callback) => {
Expand All @@ -208,17 +233,17 @@ async function emitConnect(socket, service) {
async function processEvent(socket, service, entity, event, data, callback) {
try {
const response = await call(socket, service, entity, event, data);
callback && callback(response);
callback && (await callback(response));
} catch (err) {
LOG?.error(err);
try {
callback &&
callback({
(await callback({
error: {
code: err.code || err.status || err.statusCode,
message: err.message,
},
});
}));
} catch (err) {
LOG?.error(err);
}
Expand Down Expand Up @@ -258,20 +283,21 @@ async function call(socket, service, entity, event, data) {
});
}

function broadcast(socket, event, entity, data) {
async function broadcast(socket, event, entity, data) {
const contexts = deriveContexts(event, data);
if (entity["@websocket.broadcast.all"] || entity["@ws.broadcast.all"]) {
socket.broadcastAll(event, broadcastData(entity, data));
await socket.broadcastAll(event, broadcastData(entity, data), contexts);
} else {
socket.broadcast(event, broadcastData(entity, data));
await socket.broadcast(event, broadcastData(entity, data), contexts);
}
}

function broadcastData(entity, data) {
if (
(entity["@websocket.broadcast"] ||
entity["@websocket.broadcast.content"] ||
entity["@ws.broadcast"] ||
entity["@ws.broadcast.content"]) === "data"
(entity["@websocket.broadcast.content"] ||
entity["@ws.broadcast.content"] ||
entity["@websocket.broadcast"] ||
entity["@ws.broadcast"]) === "data"
) {
return data;
}
Expand All @@ -285,6 +311,28 @@ function deriveKey(entity, data) {
}, {});
}

function deriveContexts(event, data) {
const contexts = [];
let isContextEvent = false;
if (event.elements) {
for (const name in event.elements) {
const element = event.elements[name];
const context =
element["@websocket.context"] ||
element["@ws.context"] ||
element["@websocket.broadcast.context"] ||
element["@ws.broadcast.context"];
if (context) {
isContextEvent = true;
if (data[name] !== undefined && data[name] !== null) {
contexts.push(String(data[name]));
}
}
}
}
return isContextEvent ? contexts : undefined;
}

function getDeepEntityColumns(entity) {
const columns = [];
for (const element of Object.values(entity.elements)) {
Expand Down
4 changes: 2 additions & 2 deletions src/redis/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const createPrimaryClientAndConnect = () => {
const errorHandlerCreateClient = (err) => {
LOG?.error("Error from redis client for pub/sub failed", err);
primaryClientPromise = null;
setTimeout(createPrimaryClientAndConnect, TIMEOUT);
setTimeout(createPrimaryClientAndConnect, TIMEOUT).unref();
};
primaryClientPromise = _createClientAndConnect(errorHandlerCreateClient);
return primaryClientPromise;
Expand All @@ -37,7 +37,7 @@ const createSecondaryClientAndConnect = () => {
const errorHandlerCreateClient = (err) => {
LOG?.error("Error from redis client for pub/sub failed", err);
secondaryClientPromise = null;
setTimeout(createSecondaryClientAndConnect, TIMEOUT);
setTimeout(createSecondaryClientAndConnect, TIMEOUT).unref();
};
secondaryClientPromise = _createClientAndConnect(errorHandlerCreateClient);
return secondaryClientPromise;
Expand Down
31 changes: 23 additions & 8 deletions src/socket/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SocketServer {
/**
* Connect a service to websocket
* @param {string} service service path, e.g. "/chat"
* @param {function} connected Callback function to be called on every websocket connection passing socket functions (i.e. ws.on("connection", connected))
* @param {function<object>} connected Callback function to be called on every websocket connection passing socket functions (i.e. ws.on("connection", connected)) passing the facade
*/
service(service, connected) {
const facade = {
Expand All @@ -48,28 +48,43 @@ class SocketServer {
};
},
on: (event, callback) => {},
emit: (event, data) => {},
broadcast: (event, data) => {},
broadcastAll: (event, data) => {},
emit: async (event, data, contexts) => {
return Promise.resolve();
},
broadcast: async (event, data, contexts) => {
return Promise.resolve();
},
broadcastAll: async (event, data, contexts) => {
return Promise.resolve();
},
enter: async (context) => {
return Promise.resolve();
},
exit: async (context) => {
return Promise.resolve();
},
disconnect() {},
onDisconnect: (callback) => {},
};
connected && connected(facade);
}

/**
* Broadcast to all websocket clients
* @param {string} service service path, e.g. "/chat"
* @param {string} event Event name
* @param {string} event Event name or message content (if data is not provided)
* @param {Object} data Data object
* @param {string} tenant Tenant
* @param {[string]} contexts Array of contexts
* @param {Object} socket Broadcast client to be excluded
* @param {boolean} remote Broadcast also remote (e.g. via redis)
* @returns {Promise<void>} Promise when broadcasting completed
*/
async broadcast(service, event, data, socket, remote) {}
async broadcast({ service, event, data, tenant, contexts, socket, remote }) {}

/**
* Handle HTTP request response
* @param {object} Server socket
* @param {object} socket Server socket
* @param {Number} statusCode Response status code
* @param {string} body Response body
*/
Expand All @@ -82,7 +97,7 @@ class SocketServer {
/**
* Close socket and disconnect client. If no socket is passed the server is closed
* @param {object} socket Socket to be disconnected
* @param {integer} code Reason code for close
* @param {Number} code Reason code for close
* @param {string} reason Reason text for close
*/
close(socket, code, reason) {}
Expand Down
Loading

0 comments on commit a5f90da

Please sign in to comment.