A broadcaster library for actix-ws that includes grouping and conditional broadcasting.
This library provides a grouping and broadcasting mechanism for actix-ws, the brand new websocket library of the Actix Web ecosystem. Each actix-ws Session gets its own Connection, which is identified by the id you give it. There are also rooms, which let you group related connections under a single entity.
Add this line to the [dependencies] section of your Cargo.toml file:
actix-ws-broadcaster = "0.7.1"
use actix_wsb::Broadcaster;
The best way to initialize the broadcaster is to create it in the entry point of the app and share it via HashMap, web::Data, etc.
let broadcaster = Broadcaster::new();
It returns an Arc<RwLock<Broadcaster>>, which means you can share it between threads.
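The sharing pattern behind Arc<RwLock<...>> can be sketched with the standard library alone. `FakeBroadcaster`, `add_connection` and `room_size` below are hypothetical stand-ins invented for this illustration; they are not part of actix-ws-broadcaster and only show how one value is shared across threads.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for the real Broadcaster: room id -> connection ids.
#[derive(Default)]
struct FakeBroadcaster {
    rooms: HashMap<String, Vec<String>>,
}

// Registers `conn_id` in `room_id` through the shared handle (write access).
fn add_connection(shared: &Arc<RwLock<FakeBroadcaster>>, room_id: &str, conn_id: &str) {
    let mut guard = shared.write().unwrap();
    guard
        .rooms
        .entry(room_id.to_string())
        .or_default()
        .push(conn_id.to_string());
}

// Counts the connections in a room using only read access.
fn room_size(shared: &Arc<RwLock<FakeBroadcaster>>, room_id: &str) -> usize {
    let guard = shared.read().unwrap();
    guard.rooms.get(room_id).map_or(0, |conns| conns.len())
}

fn main() {
    let shared = Arc::new(RwLock::new(FakeBroadcaster::default()));

    // Arc::clone copies the pointer, not the data, so every thread
    // works on the same underlying state.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || add_connection(&shared, "lobby", &format!("conn-{}", i)))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    println!("connections in lobby: {}", room_size(&shared, "lobby"));
}
```

All four threads write into the same map, so the final count reflects every connection that was added.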
The API is very basic: once you have the broadcaster in your websocket controller, you handle all the grouping work with a single line of code:
// first, handle the socket with the actix_ws API:
let (response, session, mut msg_stream) = actix_ws::handle(&req, body)?;
// then get the broadcaster and register the session with it;
// room_id and connection_id have to be strings:
let broadcaster = Broadcaster::handle(&broadcaster, &room_id, &connection_id, session);
Note: All broadcasting has to go through the same broadcaster instance. Don't clone it, otherwise it could cause a data race.
In the websocket loop, when a message is received, you can broadcast it with this code:
Message::Text(msg) => {
    // we get the broadcaster for each message with write access:
    let mut writeable_broadcaster = broadcaster.write().unwrap();
    // broadcast it.
    writeable_broadcaster.room(&room_id).broadcast(msg.to_string()).await;
}
If you want to broadcast a message conditionally, you can use the .broadcast_if() and .broadcast_if_not() methods.
The .broadcast_if() method evaluates the given closure for each connection and sends the message to that connection only if the closure returns true.
// the condition is always true, so this broadcasts to all:
writeable_broadcaster.room(&room_id).broadcast_if(msg.to_string(), |connection| true).await;
There is also a reverse version of that method, which broadcasts if the given condition is false:
// the condition is always false, so this also broadcasts to all:
writeable_broadcaster.room(&room_id).broadcast_if_not(msg.to_string(), |connection| false).await;
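To illustrate how the two predicates mirror each other, here is a minimal std-only sketch. `Conn`, `recipients_if` and `recipients_if_not` are hypothetical names made up for this example. The real connection type wraps an actix-ws Session and the real methods send the message, whereas this sketch only computes which connection ids would be reached.

```rust
// Hypothetical stand-in for a connection; only the id matters here.
struct Conn {
    id: String,
}

// Ids reached by a broadcast_if-style call: the condition must be true.
fn recipients_if<F>(conns: &[Conn], condition: F) -> Vec<&str>
where
    F: Fn(&Conn) -> bool,
{
    conns
        .iter()
        .filter(|c| condition(*c))
        .map(|c| c.id.as_str())
        .collect()
}

// Ids reached by a broadcast_if_not-style call: the condition must be false.
fn recipients_if_not<F>(conns: &[Conn], condition: F) -> Vec<&str>
where
    F: Fn(&Conn) -> bool,
{
    conns
        .iter()
        .filter(|c| !condition(*c))
        .map(|c| c.id.as_str())
        .collect()
}

fn main() {
    let conns: Vec<Conn> = ["alice", "bob", "carol"]
        .iter()
        .map(|id| Conn { id: id.to_string() })
        .collect();

    // Typical use: send to everyone except the sender.
    let sender = "alice";
    println!("{:?}", recipients_if(&conns, |c| c.id != sender));
    println!("{:?}", recipients_if_not(&conns, |c| c.id == sender));
}
```

Both calls in `main` select the same connections, which shows that the choice between the two methods is mostly a matter of which condition is easier to express.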
If a client disconnects, you should remove its connection with this code:
Message::Close(reason) => {
    // the calls below are alternatives, pick the one that fits your case.
    // warning: this closes and removes all the connections, but does not remove the room:
    let _ = broadcaster.write().unwrap().room(&room_id).close(reason).await;
    // if you want to remove the room together with all of its connections, use this instead:
    let _ = broadcaster.write().unwrap().remove_room(&room_id).await;
    // if you want to remove a single connection with the given id, use this:
    let _ = broadcaster.write().unwrap().room(&room_id).close_conn(reason, &id).await;
    // warning: this is the old and deprecated way:
    let _ = broadcaster.write().unwrap().remove_connection(id).unwrap().close(reason).await;
    // stop listening for messages and break the loop once
    // the connection is removed.
    break;
},
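The difference between the three removal options can be modelled with a plain HashMap. This is only a sketch: the `Rooms` type, its field and the `join`, `members` and `close_room` helpers are hypothetical, with `close_room` standing in for room(..).close(..). The real methods are async and also close the underlying sessions, which is not modelled here.

```rust
use std::collections::HashMap;

// Hypothetical model: room id -> connection ids. Not the library's real types.
#[derive(Default)]
struct Rooms {
    rooms: HashMap<String, Vec<String>>,
}

impl Rooms {
    // Adds a connection to a room, creating the room if needed.
    fn join(&mut self, room_id: &str, conn_id: &str) {
        self.rooms
            .entry(room_id.to_string())
            .or_default()
            .push(conn_id.to_string());
    }

    // Models room(..).close(..): drops every connection, keeps the empty room.
    fn close_room(&mut self, room_id: &str) {
        if let Some(conns) = self.rooms.get_mut(room_id) {
            conns.clear();
        }
    }

    // Models remove_room(..): drops the room together with its connections.
    fn remove_room(&mut self, room_id: &str) {
        self.rooms.remove(room_id);
    }

    // Models room(..).close_conn(..): drops a single connection by id.
    fn close_conn(&mut self, room_id: &str, conn_id: &str) {
        if let Some(conns) = self.rooms.get_mut(room_id) {
            conns.retain(|id| id.as_str() != conn_id);
        }
    }

    // Returns a copy of the room's members, or None if the room does not exist.
    fn members(&self, room_id: &str) -> Option<Vec<String>> {
        self.rooms.get(room_id).cloned()
    }
}

fn main() {
    let mut rooms = Rooms::default();
    rooms.join("lobby", "alice");
    rooms.join("lobby", "bob");

    rooms.close_conn("lobby", "alice");
    println!("after close_conn:  {:?}", rooms.members("lobby"));

    rooms.close_room("lobby");
    println!("after close_room:  {:?}", rooms.members("lobby"));

    rooms.remove_room("lobby");
    println!("after remove_room: {:?}", rooms.members("lobby"));
}
```

In this model an emptied room still exists and reports no members, while a removed room is gone entirely.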
To try it yourself, run this command: cargo run --example example
Then go to http://localhost:5000 in a Firefox-based browser (such as Firefox, LibreWolf, etc.). Because Chromium-based browsers don't support sending query parameters to websockets from JavaScript, our front-end configuration doesn't work on them.
In real-world scenarios, you have to provide the room and connection ids with a different approach.
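One possible alternative, shown here purely as an assumption and not as something the library prescribes, is to carry both ids in the URL path instead of in query parameters. The path layout `/ws/{room_id}/{connection_id}` and the `parse_ws_path` helper are hypothetical; in an actual Actix Web app a path extractor would normally do this job.

```rust
// Extracts (room_id, connection_id) from a path shaped like
// "/ws/{room_id}/{connection_id}". The layout is an assumption for this sketch.
fn parse_ws_path(path: &str) -> Option<(String, String)> {
    let mut parts = path.trim_matches('/').split('/');
    match (parts.next(), parts.next(), parts.next(), parts.next()) {
        // Exactly three segments, starting with "ws", with non-empty ids.
        (Some("ws"), Some(room), Some(conn), None)
            if !room.is_empty() && !conn.is_empty() =>
        {
            Some((room.to_string(), conn.to_string()))
        }
        _ => None,
    }
}

fn main() {
    match parse_ws_path("/ws/lobby/alice") {
        Some((room_id, connection_id)) => {
            println!("room: {}, connection: {}", room_id, connection_id)
        }
        None => println!("not a websocket path"),
    }
}
```

Paths with missing, empty or extra segments are rejected, so the handler never registers a connection with a blank id.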
Issues, suggestions and pull requests are welcome.