Skip to content

Commit

Permalink
feat: Add documentation for Server event messaging (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
SandPod authored Jan 17, 2025
1 parent b19084c commit 59df747
Show file tree
Hide file tree
Showing 34 changed files with 476 additions and 176 deletions.
51 changes: 7 additions & 44 deletions docs/06-concepts/15-streams.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Streams and messaging
# Streams

For some applications, it's not enough to be able to call server-side methods. You may also want to push data from the server to the client or send data two-way. Examples include real-time games or chat applications. Luckily, Serverpod supports a framework for streaming data. It's possible to stream any serialized objects to or from any endpoint.

Serverpod supports two ways to stream data. The first approach, [streaming methods](#streaming-methods), imitates how `Streams` work in Dart and offers a simple interface that automatically handles the connection with the server. In contrast, the second approach, [streaming endpoint](#streaming-endpoints), requires developers to manage the web socket connection. The second approach was Serverpod's initial solution for streaming data but will be removed in future updates.

:::tip

For a real-world example, check out [Pixorama](https://pixorama.live). It's a multi-user drawing experience showcasing Serverpod's real-time capabilities and comes with complete source code.

:::

## Streaming Methods

When an endpoint method is defined with `Stream` instead of `Future` as the return type or includes `Stream` as a method parameter, it is recognized as a streaming method. Streaming methods transmit data over a shared, self-managed web socket connection that automatically connects and disconnects from the server.
Expand Down Expand Up @@ -138,49 +144,6 @@ Future<void> streamOpened(StreamingSession session) async {

You can access the user object at any time by calling the `getUserObject` method. The user object is automatically discarded when a session ends.

#### Internal server messaging

A typical scenario when working with streams is to pass on messages from one user to another. For instance, if one client sends a chat message to the server, the server should send it to the correct user. Serverpod comes with a built-in messaging system that makes this easy. You can pass messages locally on a single server, but messages are passed through Redis by default. Passing the messages through Redis makes it possible to send the messages between multiple servers in a cluster.

In most cases, it's easiest to subscribe to a message channel in the `streamOpened` method. The subscription will automatically be disposed of when the stream is closed. The following example will forward any message sent to a user identified by its user id.

```dart
@override
Future<void> streamOpened(StreamingSession session) async {
final authenticationInfo = await session.authenticated;
final userId = authenticationInfo?.userId;
session.messages.addListener(
'user_$userId',
(message) {
sendStreamMessage(session, message);
},
);
}
```

In your `handleStreamMessage` method, you can pass on messages to the correct channel.

```dart
@override
Future<void> handleStreamMessage(
StreamingSession session,
SerializableModel message,
) async {
if (message is MyChatMessage) {
session.messages.postMessage(
'user_${message.recipientId}',
message,
);
}
}
```

:::tip

For a real-world example, check out [Pixorama](https://pixorama.live). It's a multi-user drawing experience showcasing Serverpod's real-time capabilities and comes with complete source code.

:::

### Handling streams in your app

Before you can access streams in your client, you need to connect to the server's web socket. You do this by calling connectWebSocket on your client.
Expand Down
110 changes: 110 additions & 0 deletions docs/06-concepts/16-server-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Server events

Serverpod framework comes with a built-in event messaging system. This enables efficient message exchange within and across servers, making it ideal for scenarios where shared state is needed, such as coordinating streams or managing data across a server cluster.

The event message system is accessed on the `Session` object through the field `messages`.

## Quick Reference

Here is a quick reference to the key messaging methods:

| Method | Description |
|--------|---------|
| `postMessage` | Send a message to a channel. |
| `addListener` | Add a listener to a channel. |
| `removeListener` | Remove a listener from a channel. |
| `createStream` | Create a stream that listens to a channel. |
| `revokeAuthentication` | Revoke authentication tokens. |

## Sending messages

To send a message, you can use the `postMessage` method. The message is published to the specified channel and needs to be a Serverpod model.

```dart
var message = UserUpdate(); // Model that represents changes to user data.
session.messages.postMessage('user_updates', message);
```

In the example above, the message published on the `user_updates` channel. Any subscriber to this channel in the server will receive the message.

### Global messages

Serverpod uses Redis to pass messages between servers. To send a message to another server, enable Redis and then set the `global` parameter to `true` when posting a message.

```dart
var message = UserUpdate(); // Model that represents changes to user data.
session.messages.postMessage('user_updates', message, global: true);
```

In the example above, the message is published to the `user_updates` channel and will be received by all servers connected to the same Redis instance.

:::warning

If Redis is not enabled, sending global messages will throw an exception.

:::

## Receiving messages

Receiving messages is just as simple as sending them! Serverpod provides two ways to handle incoming messages: by creating a stream that subscribes to a channel or by adding a listener to a channel.

### Creating a stream

To create a stream that subscribes to a channel, use the `createStream` method. The stream will emit a value whenever a message is posted to the channel.

```dart
var stream = session.messages.createStream('user_updates');
stream.listen((message) {
print('Received message: $message');
})
```

In the above example, a stream is created that listens to the `user_updates` channel and processes incoming requests.

#### Stream lifecycle

The stream is automatically closed when the session is closed. If you want to close the stream manually, you simply cancel the stream subscription.

```dart
var stream = session.messages.createStream('user_updates');
var subscription = stream.listen((message) {
print('Received message: $message');
});
subscription.cancel();
```

In the above example, the stream is first created and then canceled.

### Adding a listener

To add a listener to a channel, use the `addListener` method. The listener will be called whenever a message is posted to the channel.

```dart
session.messages.addListener('user_updates', (message) {
print('Received message: $message');
});
```

In the above example, the listener will be called whenever a message is posted to the `user_updates` channel. The listener will be called regardless if a message is published globally by another server or internally by the same server.

#### Listener lifecycle

The listener is automatically removed when the session is closed. To manually remove a listener, use the `removeListener` method.

```dart
var myListenerCallback = (message) {
print('Received message: $message');
};
// Register the listener
session.messages.addListener('user_updates', myListenerCallback);
// Remove the listener
session.messages.removeListener('user_updates', myListenerCallback);
```

In the above example, the listener is first added and then removed from the `user_updates` channel.

## Revoke authentication

The messaging interface also exposes a method for revoking authentication. For more details on handling revoked authentication, refer to the section on [handling revoked authentication](authentication/custom-overrides#Handling-revoked-authentication).
File renamed without changes.
File renamed without changes.
File renamed without changes.
59 changes: 15 additions & 44 deletions versioned_docs/version-2.1.0/06-concepts/15-streams.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Streams and messaging
# Streams

For some applications, it's not enough to be able to call server-side methods. You may also want to push data from the server to the client or send data two-way. Examples include real-time games or chat applications. Luckily, Serverpod supports a framework for streaming data. It's possible to stream any serialized objects to or from any endpoint.

Serverpod supports two ways to stream data. The first approach, [streaming methods](#streaming-methods), imitates how `Streams` work in Dart and offers a simple interface that automatically handles the connection with the server. In contrast, the second approach, [streaming endpoint](#streaming-endpoints), requires developers to manage the web socket connection. The second approach was Serverpod's initial solution for streaming data but will be removed in future updates.

:::tip

For a real-world example, check out [Pixorama](https://pixorama.live). It's a multi-user drawing experience showcasing Serverpod's real-time capabilities and comes with complete source code.

:::

## Streaming Methods

When an endpoint method is defined with `Stream` instead of `Future` as the return type or includes `Stream` as a method parameter, it is recognized as a streaming method. Streaming methods transmit data over a shared, self-managed web socket connection that automatically connects and disconnects from the server.
Expand Down Expand Up @@ -65,6 +71,14 @@ Streams in parameters are closed when the stream is closed. This can be done by

All streams in parameters are closed when the method call is over.

### Authentication

Authentication is seamlessly integrated into streaming method calls. When a client initiates a streaming method, the server automatically authenticates the session.

Authentication is validated when the stream is first established, utilizing the authentication data stored in the `Session` object. If a user's authentication is subsequently revoked—requiring denial of access to the stream—the stream will be promptly closed, and an exception will be thrown.

For more details on handling revoked authentication, refer to the section on [handling revoked authentication](authentication/custom-overrides#Handling-revoked-authentication).

### Error handling

Error handling works just like in regular endpoint methods in Serverpod. If an exception is thrown on a stream, the stream is closed with an exception. If the exception thrown is a serializable exception, the exception is first serialized and passed over the stream before it is closed.
Expand Down Expand Up @@ -130,49 +144,6 @@ Future<void> streamOpened(StreamingSession session) async {

You can access the user object at any time by calling the `getUserObject` method. The user object is automatically discarded when a session ends.

#### Internal server messaging

A typical scenario when working with streams is to pass on messages from one user to another. For instance, if one client sends a chat message to the server, the server should send it to the correct user. Serverpod comes with a built-in messaging system that makes this easy. You can pass messages locally on a single server, but messages are passed through Redis by default. Passing the messages through Redis makes it possible to send the messages between multiple servers in a cluster.

In most cases, it's easiest to subscribe to a message channel in the `streamOpened` method. The subscription will automatically be disposed of when the stream is closed. The following example will forward any message sent to a user identified by its user id.

```dart
@override
Future<void> streamOpened(StreamingSession session) async {
final authenticationInfo = await session.authenticated;
final userId = authenticationInfo?.userId;
session.messages.addListener(
'user_$userId',
(message) {
sendStreamMessage(session, message);
},
);
}
```

In your `handleStreamMessage` method, you can pass on messages to the correct channel.

```dart
@override
Future<void> handleStreamMessage(
StreamingSession session,
SerializableModel message,
) async {
if (message is MyChatMessage) {
session.messages.postMessage(
'user_${message.recipientId}',
message,
);
}
}
```

:::tip

For a real-world example, check out [Pixorama](https://pixorama.live). It's a multi-user drawing experience showcasing Serverpod's real-time capabilities and comes with complete source code.

:::

### Handling streams in your app

Before you can access streams in your client, you need to connect to the server's web socket. You do this by calling connectWebSocket on your client.
Expand Down
110 changes: 110 additions & 0 deletions versioned_docs/version-2.1.0/06-concepts/16-server-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Server events

Serverpod framework comes with a built-in event messaging system. This enables efficient message exchange within and across servers, making it ideal for scenarios where shared state is needed, such as coordinating streams or managing data across a server cluster.

The event message system is accessed on the `Session` object through the field `messages`.

## Quick Reference

Here is a quick reference to the key messaging methods:

| Method | Description |
|--------|---------|
| `postMessage` | Send a message to a channel. |
| `addListener` | Add a listener to a channel. |
| `removeListener` | Remove a listener from a channel. |
| `createStream` | Create a stream that listens to a channel. |
| `revokeAuthentication` | Revoke authentication tokens. |

## Sending messages

To send a message, you can use the `postMessage` method. The message is published to the specified channel and needs to be a Serverpod model.

```dart
var message = UserUpdate(); // Model that represents changes to user data.
session.messages.postMessage('user_updates', message);
```

In the example above, the message published on the `user_updates` channel. Any subscriber to this channel in the server will receive the message.

### Global messages

Serverpod uses Redis to pass messages between servers. To send a message to another server, enable Redis and then set the `global` parameter to `true` when posting a message.

```dart
var message = UserUpdate(); // Model that represents changes to user data.
session.messages.postMessage('user_updates', message, global: true);
```

In the example above, the message is published to the `user_updates` channel and will be received by all servers connected to the same Redis instance.

:::warning

If Redis is not enabled, sending global messages will throw an exception.

:::

## Receiving messages

Receiving messages is just as simple as sending them! Serverpod provides two ways to handle incoming messages: by creating a stream that subscribes to a channel or by adding a listener to a channel.

### Creating a stream

To create a stream that subscribes to a channel, use the `createStream` method. The stream will emit a value whenever a message is posted to the channel.

```dart
var stream = session.messages.createStream('user_updates');
stream.listen((message) {
print('Received message: $message');
})
```

In the above example, a stream is created that listens to the `user_updates` channel and processes incoming requests.

#### Stream lifecycle

The stream is automatically closed when the session is closed. If you want to close the stream manually, you simply cancel the stream subscription.

```dart
var stream = session.messages.createStream('user_updates');
var subscription = stream.listen((message) {
print('Received message: $message');
});
subscription.cancel();
```

In the above example, the stream is first created and then canceled.

### Adding a listener

To add a listener to a channel, use the `addListener` method. The listener will be called whenever a message is posted to the channel.

```dart
session.messages.addListener('user_updates', (message) {
print('Received message: $message');
});
```

In the above example, the listener will be called whenever a message is posted to the `user_updates` channel. The listener will be called regardless if a message is published globally by another server or internally by the same server.

#### Listener lifecycle

The listener is automatically removed when the session is closed. To manually remove a listener, use the `removeListener` method.

```dart
var myListenerCallback = (message) {
print('Received message: $message');
};
// Register the listener
session.messages.addListener('user_updates', myListenerCallback);
// Remove the listener
session.messages.removeListener('user_updates', myListenerCallback);
```

In the above example, the listener is first added and then removed from the `user_updates` channel.

## Revoke authentication

The messaging interface also exposes a method for revoking authentication. For more details on handling revoked authentication, refer to the section on [handling revoked authentication](authentication/custom-overrides#Handling-revoked-authentication).
Loading

0 comments on commit 59df747

Please sign in to comment.