Skip to content

Commit

Permalink
deps: switched to internal deps from crate.io (8xFF#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm authored Jul 22, 2024
1 parent 10169a1 commit 6c0e943
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 51 deletions.
52 changes: 24 additions & 28 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ members = [
]

[workspace.dependencies]
sans-io-runtime = { git = "https://github.com/giangndm/sans-io-runtime.git", rev = "c781cef12b2a435b5e31a6ede69d301a23719452" , default-features = false}
atm0s-sdn = { git = "https://github.com/giangndm/8xFF-decentralized-sdn.git", rev = "1c82b687763e4abfb2f9fdbd39eb00ea7497bb1a" }
sans-io-runtime = { version = "0.2", default-features = false }
atm0s-sdn = { version = "0.2.2", default-features = false }
tracing-subscriber = { version = "0.3", features = ["env-filter", "std"] }
convert-enum = "0.1"
num_enum = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct NodeConfig {
pub node_id: NodeId,
pub secret: String,
pub seeds: Vec<NodeAddr>,
pub udp_port: u16,
pub bind_addrs: Vec<SocketAddr>,
pub zone: u32,
pub custom_addrs: Vec<SocketAddr>,
}
18 changes: 17 additions & 1 deletion bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ struct Args {
#[arg(env, long, default_value_t = 0)]
sdn_port: u16,

/// Sdn enable ipv6
#[arg(env, long)]
sdn_enable_ipv6: bool,

/// Custom Sdn addr
#[arg(env, long)]
sdn_custom_addrs: Vec<SocketAddr>,
Expand Down Expand Up @@ -62,11 +66,23 @@ async fn main() {

let http_port = args.http_port;
let workers = args.workers;
let bind_addrs = local_ip_address::list_afinet_netifas()
.expect("Should have list interfaces")
.into_iter()
.filter(|(_, ip)| {
if ip.is_unspecified() || ip.is_multicast() || (ip.is_ipv6() && !args.sdn_enable_ipv6) {
false
} else {
std::net::UdpSocket::bind(SocketAddr::new(*ip, 0)).is_ok()
}
})
.map(|(_name, ip)| SocketAddr::new(ip, args.sdn_port))
.collect::<Vec<_>>();
let node = NodeConfig {
node_id: args.node_id,
secret: args.secret,
seeds: args.seeds,
udp_port: args.sdn_port,
bind_addrs,
zone: args.sdn_zone,
custom_addrs: args.sdn_custom_addrs,
};
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn run_media_connector(workers: usize, node: NodeConfig, args: Args) {

let node_id = node.node_id;

let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, node.udp_port, node.custom_addrs);
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.custom_addrs);
let node_addr = builder.node_addr();
let node_info = ClusterNodeInfo::Connector(ClusterNodeGenericInfo {
addr: node_addr.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub async fn run_console_server(workers: usize, http_port: Option<u16>, node: No
let storage = StorageShared::default();

let node_id = node.node_id;
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, node.udp_port, node.custom_addrs);
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.custom_addrs);
let node_addr = builder.node_addr();

builder.set_authorization(StaticKeyAuthorization::new(&node.secret));
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod

let node_id = node.node_id;

let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, node.udp_port, node.custom_addrs);
let mut builder = SdnBuilder::<(), SC, SE, TC, TW, ClusterNodeInfo>::new(node_id, &node.bind_addrs, node.custom_addrs);
let node_addr = builder.node_addr();
let node_info = ClusterNodeInfo::Gateway(
ClusterNodeGenericInfo {
Expand Down
2 changes: 1 addition & 1 deletion bin/src/server/media/runtime_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<ES: 'static + MediaEdgeSecure> WorkerInner<Owner, ExtIn, ExtOut, Channel, E
cfg.session,
&cfg.node.secret,
cfg.controller,
cfg.node.udp_port,
cfg.node.bind_addrs,
cfg.node.custom_addrs,
cfg.node.zone,
cfg.media,
Expand Down
1 change: 1 addition & 0 deletions packages/media_runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ media-server-core = { path = "../media_core" }

sans-io-runtime = { workspace = true, default-features = false }
atm0s-sdn = { workspace = true }
atm0s-sdn-network = "0.5"
transport-webrtc = { path = "../transport_webrtc", optional = true }

[features]
Expand Down
56 changes: 42 additions & 14 deletions packages/media_runner/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use atm0s_sdn::{
services::{manual_discovery, visualization},
ControllerPlaneCfg, DataPlaneCfg, DataWorkerHistory, NetInput, NetOutput, NodeAddr, SdnExtIn, SdnExtOut, SdnWorker, SdnWorkerBusEvent, SdnWorkerCfg, SdnWorkerInput, SdnWorkerOutput, TimePivot,
};
use atm0s_sdn_network::data_plane::NetPair;
use media_server_connector::agent_service::ConnectorAgentServiceBuilder;
use media_server_core::cluster::{self, MediaCluster};
use media_server_gateway::{agent_service::GatewayAgentServiceBuilder, NodeMetrics, ServiceKind, AGENT_SERVICE_ID};
Expand Down Expand Up @@ -107,8 +108,9 @@ enum MediaClusterEndpoint {
pub struct MediaServerWorker<ES: 'static + MediaEdgeSecure> {
worker: u16,
sdn_addr: NodeAddr,
sdn_slot: usize,
sdn_worker: TaskSwitcherBranch<SdnWorker<UserData, SC, SE, TC, TW>, SdnWorkerOutput<UserData, SC, SE, TC, TW>>,
sdn_backend_addrs: HashMap<SocketAddr, usize>,
sdn_backend_slots: HashMap<usize, SocketAddr>,
media_cluster: TaskSwitcherBranch<MediaCluster<MediaClusterEndpoint>, cluster::Output<MediaClusterEndpoint>>,
media_webrtc: TaskSwitcherBranch<MediaWorkerWebrtc<ES>, transport_webrtc::GroupOutput>,
media_max_live: u32,
Expand All @@ -121,15 +123,23 @@ pub struct MediaServerWorker<ES: 'static + MediaEdgeSecure> {

impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
#[allow(clippy::too_many_arguments)]
pub fn new(worker: u16, node_id: u32, session: u64, secret: &str, controller: bool, sdn_udp: u16, sdn_custom_addrs: Vec<SocketAddr>, sdn_zone: u32, media: MediaConfig<ES>) -> Self {
pub fn new(
worker: u16,
node_id: u32,
session: u64,
secret: &str,
controller: bool,
sdn_bind_addrs: Vec<SocketAddr>,
sdn_custom_addrs: Vec<SocketAddr>,
sdn_zone: u32,
media: MediaConfig<ES>,
) -> Self {
let secure = media.secure.clone(); //TODO why need this?
let sdn_udp_addr = SocketAddr::from(([0, 0, 0, 0], sdn_udp));

let mut media_max_live = 0;
for (_, max) in media.max_live.iter() {
media_max_live += *max;
}
let node_addr = generate_node_addr(node_id, sdn_udp, sdn_custom_addrs);
let node_addr = generate_node_addr(node_id, &sdn_bind_addrs, sdn_custom_addrs);
let node_info = ClusterNodeInfo::Media(
ClusterNodeGenericInfo {
addr: node_addr.to_string(),
Expand All @@ -155,6 +165,7 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
controller: if controller {
Some(ControllerPlaneCfg {
session,
bind_addrs: sdn_bind_addrs.clone(),
authorization: Arc::new(StaticKeyAuthorization::new(secret)),
handshake_builder: Arc::new(HandshakeBuilderXDA),
random: Box::new(OsRng),
Expand All @@ -172,19 +183,25 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
},
};

let mut queue = DynamicDeque::default();
for addr in sdn_bind_addrs {
queue.push_back(Output::Net(Owner::Sdn, BackendOutgoing::UdpListen { addr, reuse: true }));
}

Self {
worker,
sdn_addr: node_addr,
sdn_slot: 1, //TODO dont use this hack, must to wait to bind success to network
sdn_worker: TaskSwitcherBranch::new(SdnWorker::new(sdn_config), TaskType::Sdn),
media_cluster: TaskSwitcherBranch::default(TaskType::MediaCluster),
media_webrtc: TaskSwitcherBranch::new(MediaWorkerWebrtc::new(media.webrtc_addrs, media.ice_lite, media.secure), TaskType::MediaWebrtc),
media_max_live,
switcher: TaskSwitcher::new(3),
queue: DynamicDeque::from([Output::Net(Owner::Sdn, BackendOutgoing::UdpListen { addr: sdn_udp_addr, reuse: true })]),
queue,
timer: TimePivot::build(),
last_feedback_gateway_agent: 0,
secure,
sdn_backend_addrs: Default::default(),
sdn_backend_slots: Default::default(),
}
}

Expand Down Expand Up @@ -257,13 +274,18 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
Owner::Sdn => {
let now_ms = self.timer.timestamp_ms(now);
match event {
BackendIncoming::UdpPacket { slot: _, from, data } => {
self.sdn_worker.input(&mut self.switcher).on_event(now_ms, SdnWorkerInput::Net(NetInput::UdpPacket(from, data)));
BackendIncoming::UdpPacket { slot, from, data } => {
let local = self.sdn_backend_slots.get(&slot).expect("Should have local addr");
self.sdn_worker
.input(&mut self.switcher)
.on_event(now_ms, SdnWorkerInput::Net(NetInput::UdpPacket(NetPair::new(*local, from), data)));
}
BackendIncoming::UdpListenResult { bind: _, result } => {
let (addr, slot) = result.expect("Should listen ok");
log::info!("[MediaServerWorker] sdn listen success on {addr}, slot {slot}");
self.sdn_slot = slot;
if let Ok((addr, slot)) = result {
log::info!("[MediaServerWorker] sdn listen success on {addr}, slot {slot}");
self.sdn_backend_addrs.insert(addr, slot);
self.sdn_backend_slots.insert(slot, addr);
}
}
}
}
Expand Down Expand Up @@ -327,8 +349,14 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
SdnExtOut::ServicesEvent(..) => Output::Continue,
},
SdnWorkerOutput::Net(out) => match out {
NetOutput::UdpPacket(to, data) => Output::Net(Owner::Sdn, BackendOutgoing::UdpPacket { slot: self.sdn_slot, to, data }),
NetOutput::UdpPackets(to, data) => Output::Net(Owner::Sdn, BackendOutgoing::UdpPackets { slot: self.sdn_slot, to, data }),
NetOutput::UdpPacket(pair, data) => {
let slot = self.sdn_backend_addrs.get(&pair.local).expect("Should have slot");
Output::Net(Owner::Sdn, BackendOutgoing::UdpPacket { slot: *slot, to: pair.remote, data })
}
NetOutput::UdpPackets(pairs, data) => {
let to = pairs.into_iter().filter_map(|p| self.sdn_backend_addrs.get(&p.local).map(|s| (*s, p.remote))).collect::<Vec<_>>();
Output::Net(Owner::Sdn, BackendOutgoing::UdpPackets2 { to, data })
}
},
SdnWorkerOutput::Bus(event) => Output::Bus(event),
SdnWorkerOutput::ShutdownResponse => Output::Continue,
Expand Down
2 changes: 1 addition & 1 deletion packages/transport_webrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ media-server-utils = { path = "../media_utils" }
media-server-protocol = { path = "../protocol" }
media-server-secure = { path = "../media_secure" }
media-server-core = { path = "../media_core" }
str0m = { git = "https://github.com/algesten/str0m.git", rev = "d826257b06ac4a5562301eaa1fbac4ab3a6c04a2" }
str0m = "0.6"

0 comments on commit 6c0e943

Please sign in to comment.