Skip to content

Commit

Permalink
fix: unsuccessful bind addr cause crash media node (8xFF#369)
Browse files Browse the repository at this point in the history
* fix: unsuccessful bind addr cause crash media node

* added warns log with unsuccessful bind addr

* refactor port config

* fixed crash when have more than 1 worker
  • Loading branch information
giangndm authored Jul 23, 2024
1 parent 6c0e943 commit fcf360b
Show file tree
Hide file tree
Showing 19 changed files with 140 additions and 115 deletions.
30 changes: 0 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,36 +101,6 @@ cargo build --release --package atm0s-media-server
./target/release/atm0s-media-server --help
```

### Run

Run first WebRTC node:

```bash
atm0s-media-server --http-port 3001 --zone-index=101 webrtc
```

After node1 started it will print out the node address like `101@/ip4/192.168.1.10/udp/10101/ip4/192.168.1.10/tcp/10101`, you can use it as a seed node for other nodes.

Run second WebRTC node:

```bash
atm0s-media-server --http-port 3002 --zone-index=102 --seeds FIRST_NODE_ADDR webrtc
```

Now two nodes will form a cluster and can be used for media streaming.

First media-server: http://localhost:3001/samples/whip/whip.html

Second media-server: http://localhost:3002/samples/whep/whep.html

You can use [Pregenerated-Token](./docs/getting-started/quick-start/whip-whep.md) to publish and play stream.

![Demo Screen](./docs/imgs/demo-screen.jpg)

Each node also has embedded monitoring page at `http://localhost:3001/dashboard/` and `http://localhost:3002/dashboard/`

![Monitoring](./docs/imgs/demo-monitor.png)

## Resources

- [Summary](./docs/SUMMARY.md)
Expand Down
2 changes: 2 additions & 0 deletions bin/console.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ cargo run -- \
--node-id 0 \
--sdn-port 10000 \
--sdn-zone 0 \
--enable-private-ip \
--workers 2 \
console
2 changes: 2 additions & 0 deletions bin/gate_z0_n1.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ RUST_BACKTRACE=1 \
cargo run -- \
--http-port 3000 \
--node-id 1 \
--enable-private-ip \
--sdn-port 10001 \
--sdn-zone 0 \
--seeds 0@/ip4/127.0.0.1/udp/10000 \
--workers 2 \
gateway \
--lat 10 \
--lon 20 \
Expand Down
2 changes: 2 additions & 0 deletions bin/gate_z256_n1.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ RUST_BACKTRACE=1 \
cargo run -- \
--http-port 4000 \
--node-id 256 \
--enable-private-ip \
--sdn-zone 256 \
--sdn-port 11000 \
--seeds 0@/ip4/127.0.0.1/udp/10000 \
--workers 2 \
gateway \
--lat 20 \
--lon 30 \
Expand Down
3 changes: 2 additions & 1 deletion bin/media_z0_n2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ RUST_BACKTRACE=1 \
cargo run -- \
--http-port 3002 \
--node-id 2 \
--enable-private-ip \
--sdn-port 10002 \
--sdn-zone 0 \
--seeds 1@/ip4/127.0.0.1/udp/10001 \
--workers 2 \
media \
--allow-private-ip \
--enable-token-api
3 changes: 2 additions & 1 deletion bin/media_z0_n3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ RUST_BACKTRACE=1 \
cargo run -- \
--http-port 3003 \
--node-id 3 \
--enable-private-ip \
--sdn-port 10003 \
--sdn-zone 0 \
--seeds 1@/ip4/127.0.0.1/udp/10001 \
--workers 2 \
media \
--allow-private-ip \
--enable-token-api
3 changes: 2 additions & 1 deletion bin/media_z256_n1.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ RUST_BACKTRACE=1 \
cargo run -- \
--http-port 4001 \
--node-id 257 \
--enable-private-ip \
--sdn-port 11001 \
--sdn-zone 256 \
--seeds 256@/ip4/127.0.0.1/udp/11000 \
--workers 2 \
media \
--allow-private-ip \
--enable-token-api
3 changes: 2 additions & 1 deletion bin/media_z256_n2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ RUST_BACKTRACE=1 \
cargo run -- \
--http-port 4002 \
--node-id 258 \
--enable-private-ip \
--sdn-port 11002 \
--sdn-zone 256 \
--seeds 256@/ip4/127.0.0.1/udp/11000 \
--workers 2 \
media \
--allow-private-ip \
--enable-token-api
2 changes: 1 addition & 1 deletion bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ pub struct NodeConfig {
pub seeds: Vec<NodeAddr>,
pub bind_addrs: Vec<SocketAddr>,
pub zone: u32,
pub custom_addrs: Vec<SocketAddr>,
pub bind_addrs_alt: Vec<SocketAddr>,
}
58 changes: 36 additions & 22 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};

use atm0s_media_server::{server, NodeConfig};
use atm0s_sdn::{NodeAddr, NodeId};
Expand All @@ -21,14 +21,6 @@ struct Args {
#[arg(env, long, default_value_t = 0)]
sdn_port: u16,

/// Sdn enable ipv6
#[arg(env, long)]
sdn_enable_ipv6: bool,

/// Custom Sdn addr
#[arg(env, long)]
sdn_custom_addrs: Vec<SocketAddr>,

/// Sdn Zone, which is 32bit number with last 8bit is 0
#[arg(env, long, default_value_t = 0)]
sdn_zone: u32,
Expand All @@ -37,6 +29,22 @@ struct Args {
#[arg(env, long, default_value_t = 1)]
node_id: NodeId,

/// Setting the single node IP option will disable the autodetect IP addresses logic
#[arg(env, long)]
node_ip: Option<IpAddr>,

/// Some alternative node IPs, which are useful with some cloud providers behind NAT, like AWS or GCP ...
#[arg(env, long)]
node_ip_alt: Vec<IpAddr>,

/// Enable private ip
#[arg(env, long)]
enable_private_ip: bool,

/// Enable ipv6
#[arg(env, long)]
enable_ipv6: bool,

/// Cluster Secret Key
#[arg(env, long, default_value = "insecure")]
secret: String,
Expand Down Expand Up @@ -66,27 +74,33 @@ async fn main() {

let http_port = args.http_port;
let workers = args.workers;
let bind_addrs = local_ip_address::list_afinet_netifas()
.expect("Should have list interfaces")
.into_iter()
.filter(|(_, ip)| {
if ip.is_unspecified() || ip.is_multicast() || (ip.is_ipv6() && !args.sdn_enable_ipv6) {
false
} else {
std::net::UdpSocket::bind(SocketAddr::new(*ip, 0)).is_ok()
}
})
.map(|(_name, ip)| SocketAddr::new(ip, args.sdn_port))
.collect::<Vec<_>>();
let bind_addrs = if let Some(ip) = args.node_ip {
vec![SocketAddr::new(ip, args.sdn_port)]
} else {
local_ip_address::list_afinet_netifas()
.expect("Should have list interfaces")
.into_iter()
.filter(|(_, ip)| {
let allow = match ip {
IpAddr::V4(ipv4) => !ipv4.is_private() || args.enable_private_ip,
IpAddr::V6(ipv6) => !ipv6.is_unspecified() && !ipv6.is_multicast() && (!ipv6.is_loopback() || args.enable_private_ip) && args.enable_ipv6,
};
allow && std::net::UdpSocket::bind(SocketAddr::new(*ip, 0)).is_ok()
})
.map(|(_name, ip)| SocketAddr::new(ip, args.sdn_port))
.collect::<Vec<_>>()
};
let node = NodeConfig {
node_id: args.node_id,
secret: args.secret,
seeds: args.seeds,
bind_addrs,
zone: args.sdn_zone,
custom_addrs: args.sdn_custom_addrs,
bind_addrs_alt: args.node_ip_alt.into_iter().map(|ip| SocketAddr::new(ip, args.sdn_port)).collect::<Vec<_>>(),
};

log::info!("Bind addrs {:?}, bind addrs alt {:?}", node.bind_addrs, node.bind_addrs_alt);

let local = tokio::task::LocalSet::new();
local
.run_until(async move {
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {

let node_id = node.node_id;

let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.custom_addrs);
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.bind_addrs_alt);
let node_addr = builder.node_addr();
let node_info = ClusterNodeInfo::Connector(ClusterNodeGenericInfo {
addr: node_addr.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub async fn run_console_server(workers: usize, http_port: Option<u16>, node: No
let storage = StorageShared::default();

let node_id = node.node_id;
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.custom_addrs);
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.bind_addrs_alt);
let node_addr = builder.node_addr();

builder.set_authorization(StaticKeyAuthorization::new(&node.secret));
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod

let node_id = node.node_id;

let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.custom_addrs);
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.bind_addrs_alt);
let node_addr = builder.node_addr();
let node_info = ClusterNodeInfo::Gateway(
ClusterNodeGenericInfo {
Expand Down
55 changes: 24 additions & 31 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr, SocketAddrV4},
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -47,17 +47,10 @@ pub struct Args {
#[arg(env, long)]
ice_lite: bool,

/// Binding port
#[arg(env, long, default_value_t = 0)]
media_port: u16,

/// Allow private ip
#[arg(env, long, default_value_t = false)]
allow_private_ip: bool,

/// Custom binding address for WebRTC UDP
#[arg(env, long)]
custom_ips: Vec<IpAddr>,
/// Seed port for binding media UDP socket. It will increase by one for each worker.
/// Default: worker0: 20000, worker1: 20001, worker2: 20002, ...
#[arg(env, long, default_value_t = 20000)]
webrtc_port_seed: u16,

/// Max ccu per core
#[arg(env, long, default_value_t = 200)]
Expand Down Expand Up @@ -100,25 +93,21 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
let node_id = node.node_id;
let node_session = random();

let mut webrtc_addrs = args.custom_ips.into_iter().map(|ip| SocketAddr::new(ip, args.media_port)).collect::<Vec<_>>();
local_ip_address::local_ip().into_iter().for_each(|ip| {
if let IpAddr::V4(ip) = ip {
if !ip.is_private() || args.allow_private_ip {
println!("Detect local ip: {ip}");
webrtc_addrs.push(SocketAddr::V4(SocketAddrV4::new(ip, 0)));
}
}
});

println!("Running media server with addrs: {:?}, ice-lite: {}", webrtc_addrs, args.ice_lite);
let mut controller = Controller::<_, _, _, _, _, 128>::default();
for i in 0..workers {
let webrtc_port = args.webrtc_port_seed + i as u16;
let webrtc_addrs = node.bind_addrs.iter().map(|addr| SocketAddr::new(addr.ip(), webrtc_port)).collect::<Vec<_>>();
let webrtc_addrs_alt = node.bind_addrs_alt.iter().map(|addr| SocketAddr::new(addr.ip(), webrtc_port)).collect::<Vec<_>>();

println!("Running media server worker {i} with addrs: {:?}, ice-lite: {}", webrtc_addrs, args.ice_lite);

let cfg = runtime_worker::ICfg {
controller: i == 0,
node: node.clone(),
session: node_session,
media: MediaConfig {
webrtc_addrs: webrtc_addrs.clone(),
webrtc_addrs,
webrtc_addrs_alt,
ice_lite: args.ice_lite,
secure: secure.clone(),
max_live: HashMap::from([(ServiceKind::Webrtc, workers as u32 * args.ccu_per_core)]),
Expand All @@ -128,7 +117,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
}

for seed in node.seeds {
controller.send_to(0, ExtIn::Sdn(SdnExtIn::ConnectTo(seed)));
controller.send_to(0, ExtIn::Sdn(SdnExtIn::ConnectTo(seed), true));
}

let mut req_id_seed = 0;
Expand Down Expand Up @@ -185,17 +174,21 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
//TODO
}
media_server_record::Output::UploadRequest(upload_id, req) => {
controller.send_to_best(ExtIn::Sdn(SdnExtIn::ServicesControl(
media_server_connector::AGENT_SERVICE_ID.into(),
UserData::Record(upload_id),
media_server_connector::agent_service::Control::Request(now_ms(), connector_request::Request::Record(req)).into(),
)));
controller.send_to_best(ExtIn::Sdn(
SdnExtIn::ServicesControl(
media_server_connector::AGENT_SERVICE_ID.into(),
UserData::Record(upload_id),
media_server_connector::agent_service::Control::Request(now_ms(), connector_request::Request::Record(req)).into(),
),
false,
));
}
}
}

while let Ok(control) = vnet_rx.try_recv() {
controller.send_to_best(ExtIn::Sdn(SdnExtIn::FeaturesControl(media_server_runner::UserData::Cluster, control.into())));
// TODO: fix bug with send_to_best cause cannot connect, avoid send to worker 0
controller.send_to(0, ExtIn::Sdn(SdnExtIn::FeaturesControl(media_server_runner::UserData::Cluster, control.into()), true));
}
while let Ok(req) = req_rx.try_recv() {
let req_id = req_id_seed;
Expand Down
7 changes: 4 additions & 3 deletions bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use crate::NodeConfig;

#[derive(Debug, Clone)]
pub enum ExtIn {
Sdn(SdnExtIn<UserData, SC>),
/// ext, send controller or worker, true is controller
Sdn(SdnExtIn<UserData, SC>, bool),
Rpc(u64, RpcReq<usize>),
NodeStats(NodeMetrics),
}
Expand Down Expand Up @@ -65,7 +66,7 @@ impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, E
&cfg.node.secret,
cfg.controller,
cfg.node.bind_addrs,
cfg.node.custom_addrs,
cfg.node.bind_addrs_alt,
cfg.node.zone,
cfg.media,
);
Expand Down Expand Up @@ -130,7 +131,7 @@ impl<ES: MediaEdgeSecure> MediaRuntimeWorker<ES> {
},
Input::Ext(ext) => match ext {
ExtIn::Rpc(req_id, ext) => WorkerInput::ExtRpc(req_id, ext),
ExtIn::Sdn(ext) => WorkerInput::ExtSdn(ext),
ExtIn::Sdn(ext, is_controller) => WorkerInput::ExtSdn(ext, is_controller),
ExtIn::NodeStats(metrics) => WorkerInput::NodeStats(metrics),
},
Input::Net(owner, event) => WorkerInput::Net(owner, event),
Expand Down
Loading

0 comments on commit fcf360b

Please sign in to comment.