diff --git a/bin/src/server/connector.rs b/bin/src/server/connector.rs index aea72857..0c953906 100644 --- a/bin/src/server/connector.rs +++ b/bin/src/server/connector.rs @@ -177,7 +177,7 @@ pub async fn run_media_connector(workers: usize, http_port: Option, node: N connector_storage.on_tick().await; } select2::OrOutput::Right(Some((from, ts, req_id, event))) => { - match connector_storage.on_event(from, ts, event).await { + match connector_storage.on_event(from, ts, req_id, event).await { Some(res) => { if let Err(e) = connector_handler_control_tx.send(handler_service::Control::Res(from, req_id, res)).await { log::error!("[Connector] send control to service error {:?}", e); diff --git a/packages/media_connector/src/handler_service.rs b/packages/media_connector/src/handler_service.rs index b22fed9d..26a6f956 100644 --- a/packages/media_connector/src/handler_service.rs +++ b/packages/media_connector/src/handler_service.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, fmt::Debug, num::NonZeroUsize}; +use std::{collections::VecDeque, fmt::Debug}; use atm0s_sdn::{ base::{ @@ -8,7 +8,6 @@ use atm0s_sdn::{ features::{data, FeaturesControl, FeaturesEvent}, NodeId, RouteRule, }; -use lru::LruCache; use media_server_protocol::protobuf::cluster_connector::{connector_request, connector_response, ConnectorRequest, ConnectorResponse}; use prost::Message; @@ -25,10 +24,7 @@ pub enum Event { Req(NodeId, u64, u64, connector_request::Request), } -type ReqUuid = (NodeId, u64, u64); - pub struct ConnectorHandlerService { - lru: LruCache, subscriber: Option>, queue: VecDeque>, shutdown: bool, @@ -45,7 +41,6 @@ impl ConnectorHandlerService pub fn new() -> Self { Self { subscriber: None, - lru: LruCache::new(NonZeroUsize::new(10000).expect("should be non-zero")), queue: VecDeque::from([ServiceOutput::FeatureControl(data::Control::DataListen(DATA_PORT).into())]), shutdown: false, _tmp: std::marker::PhantomData, @@ -96,11 +91,6 @@ where data::Event::Recv(_port, meta, buf) => match ConnectorRequest::decode(buf.as_slice()) { Ok(msg) => { if let Some(source) = meta.source { - if self.lru.put((source, msg.ts, msg.req_id), ()).is_some() { - log::warn!("[ConnectorHandler] duplicate msg {:?}", msg); - return; - } - log::info!("[ConnectorHandler] on event {:?}", msg); if let Some(event) = msg.request { if let Some(actor) = self.subscriber { diff --git a/packages/media_connector/src/lib.rs b/packages/media_connector/src/lib.rs index f784aa4f..42c2dac5 100644 --- a/packages/media_connector/src/lib.rs +++ b/packages/media_connector/src/lib.rs @@ -99,7 +99,7 @@ pub trait Storage { type Q: Querier; fn querier(&mut self) -> Self::Q; fn on_tick(&mut self, now_ms: u64) -> impl std::future::Future + Send; - fn on_event(&mut self, now_ms: u64, from: NodeId, req_ts: u64, req: connector_request::Request) -> impl std::future::Future> + Send; + fn on_event(&mut self, now_ms: u64, from: NodeId, req_ts: u64, req_id: u64, req: connector_request::Request) -> impl std::future::Future> + Send; fn pop_hook_event(&mut self) -> Option<(AppId, HookEvent)>; } @@ -135,8 +135,8 @@ impl ConnectorStorage { } } - pub async fn on_event(&mut self, from: NodeId, ts: u64, req: connector_request::Request) -> Option { - let res = self.sql_storage.on_event(now_ms(), from, ts, req).await?; + pub async fn on_event(&mut self, from: NodeId, ts: u64, req_id: u64, req: connector_request::Request) -> Option { + let res = self.sql_storage.on_event(now_ms(), from, ts, req_id, req).await?; while let Some((app, event)) = self.sql_storage.pop_hook_event() { self.hook.on_event(app, event); diff --git a/packages/media_connector/src/sql_storage.rs b/packages/media_connector/src/sql_storage.rs index 4c0088c8..30d8756e 100644 --- a/packages/media_connector/src/sql_storage.rs +++ b/packages/media_connector/src/sql_storage.rs @@ -1,16 +1,18 @@ use std::{ collections::{HashMap, VecDeque}, + num::NonZeroUsize, time::Duration, }; use atm0s_sdn::NodeId; +use lru::LruCache; use media_server_protocol::{ multi_tenancy::AppId, protobuf::cluster_connector::{ connector_request, connector_response, hook_event, peer_event, record_event::{self, RecordPeerJoined, RecordStarted}, room_event::{self, RoomAllPeersLeaved, RoomPeerJoined, RoomPeerLeaved, RoomStarted, RoomStopped}, - HookEvent, PeerRes, RecordEvent, RecordRes, RoomEvent, + HookEvent, PeerRes, RecordEvent, RecordReq, RecordRes, RoomEvent, }, }; use media_server_utils::CustomUri; @@ -40,6 +42,7 @@ pub struct ConnectorSqlStorage { s3: Presigner, s3_sub_folder: String, room_destroy_after_ms: u64, + lru: LruCache<(NodeId, u64, u64), ()>, hook_events: VecDeque<(AppId, HookEvent)>, } @@ -81,6 +84,7 @@ impl ConnectorSqlStorage { s3, s3_sub_folder, room_destroy_after_ms: cfg.room_destroy_after_ms, + lru: LruCache::new(NonZeroUsize::new(10000).expect("should be non-zero")), hook_events: Default::default(), } } @@ -545,6 +549,93 @@ impl ConnectorSqlStorage { .count(&self.db) .await } + + async fn on_record_req(&mut self, from: NodeId, event_ts: u64, req: RecordReq) -> Result { + let room = entity::room::Entity::find() + .filter(entity::room::Column::App.eq(&req.app)) + .filter(entity::room::Column::Room.eq(&req.room)) + .filter(entity::room::Column::DestroyedAt.is_null()) + .one(&self.db) + .await? + .ok_or(DbErr::RecordNotFound("Room not found".to_string()))?; + let room_id = room.id; + let room_path = if let Some(path) = room.record { + path + } else { + let room_path = std::path::Path::new(&self.s3_sub_folder) + .join(&req.app) + .join(&req.room) + .join(room_id.to_string()) + .to_str() + .ok_or(DbErr::Custom("Should convert path to string".to_string()))? + .to_string(); + log::info!("[ConnectorSqlStorage] room {} record started in path: {room_path}", req.room); + self.hook_events.push_back(( + req.app.clone().into(), + HookEvent { + node: from, + ts: event_ts, + event: Some(hook_event::Event::Record(RecordEvent { + app: req.app.clone(), + room: req.room.clone(), + event: Some(record_event::Event::Started(RecordStarted { path: room_path.clone() })), + })), + }, + )); + + let mut model: entity::room::ActiveModel = room.into(); + model.record = Set(Some(room_path.clone())); + model.save(&self.db).await?; + room_path + }; + + let peer_session = entity::peer_session::Entity::find() + .filter(entity::peer_session::Column::Session.eq(req.session as i64)) + .filter(entity::peer_session::Column::Room.eq(room_id)) + .order_by_desc(entity::peer_session::Column::CreatedAt) + .one(&self.db) + .await? + .ok_or(DbErr::RecordNotFound("Peer not found".to_string()))?; + let peer_path = if let Some(path) = peer_session.record { + path + } else { + let peer_path = std::path::Path::new(&req.peer) + .join(peer_session.id.to_string()) + .to_str() + .ok_or(DbErr::Custom("Should convert path to string".to_string()))? + .to_string(); + log::info!("[ConnectorSqlStorage] room {} peer {} record started in path: {peer_path}", req.room, req.peer); + self.hook_events.push_back(( + req.app.clone().into(), + HookEvent { + node: from, + ts: event_ts, + event: Some(hook_event::Event::Record(RecordEvent { + app: req.app.clone(), + room: req.room.clone(), + event: Some(record_event::Event::PeerJoined(RecordPeerJoined { + peer: req.peer.clone(), + path: peer_path.clone(), + })), + })), + }, + )); + + let mut model: entity::peer_session::ActiveModel = peer_session.into(); + model.record = Set(Some(peer_path.clone())); + model.save(&self.db).await?; + peer_path + }; + + let path = std::path::Path::new(&room_path) + .join(peer_path) + .join(format!("{}-{}-{}.rec", req.index, req.from_ts, req.to_ts)) + .to_str() + .ok_or(DbErr::Custom("Should convert path to string".to_string()))? + .to_string(); + let s3_uri = self.s3.put(&path, 86400).expect("Should create s3_uri"); + Ok(RecordRes { s3_uri }) + } } impl Storage for ConnectorSqlStorage { @@ -557,11 +648,16 @@ impl Storage for ConnectorSqlStorage { log::error!("[ConnectorSqlStorage] on_tick db error {e:?}"); } } - async fn on_event(&mut self, now_ms: u64, from: NodeId, event_ts: u64, event: connector_request::Request) -> Option { + async fn on_event(&mut self, now_ms: u64, from: NodeId, event_ts: u64, req_id: u64, event: connector_request::Request) -> Option { match event { connector_request::Request::Peer(event) => { + if self.lru.contains(&(from, event_ts, req_id)) { + log::warn!("[ConnectorSqlStorage] peer event {event:?} already processed"); + return Some(connector_response::Response::Peer(PeerRes {})); + } + if let Err(e) = self.on_peer_event(now_ms, from, event_ts, &event.app, event.session_id, event.event.clone()?).await { - log::error!("[ConnectorSqlStorage] on_peer_event db error {e:?}"); + log::error!("[ConnectorSqlStorage] on_peer_event {event:?} db error {e:?}"); return None; } self.hook_events.push_back(( @@ -572,83 +668,16 @@ impl Storage for ConnectorSqlStorage { event: Some(hook_event::Event::Peer(event)), }, )); + self.lru.put((from, event_ts, req_id), ()); Some(connector_response::Response::Peer(PeerRes {})) } - connector_request::Request::Record(req) => { - let room = entity::room::Entity::find() - .filter(entity::room::Column::App.eq(&req.app)) - .filter(entity::room::Column::Room.eq(&req.room)) - .filter(entity::room::Column::DestroyedAt.is_null()) - .one(&self.db) - .await - .ok()??; - let room_id = room.id; - let room_path = if let Some(path) = room.record { - path - } else { - let room_path = std::path::Path::new(&self.s3_sub_folder).join(&req.app).join(&req.room).join(room_id.to_string()).to_str()?.to_string(); - log::info!("[ConnectorSqlStorage] room {} record started in path: {room_path}", req.room); - self.hook_events.push_back(( - req.app.clone().into(), - HookEvent { - node: from, - ts: event_ts, - event: Some(hook_event::Event::Record(RecordEvent { - app: req.app.clone(), - room: req.room.clone(), - event: Some(record_event::Event::Started(RecordStarted { path: room_path.clone() })), - })), - }, - )); - - let mut model: entity::room::ActiveModel = room.into(); - model.record = Set(Some(room_path.clone())); - model.save(&self.db).await.ok()?; - room_path - }; - - let peer_session = entity::peer_session::Entity::find() - .filter(entity::peer_session::Column::Session.eq(req.session as i64)) - .filter(entity::peer_session::Column::Room.eq(room_id)) - .order_by_desc(entity::peer_session::Column::CreatedAt) - .one(&self.db) - .await - .ok()??; - let peer_path = if let Some(path) = peer_session.record { - path - } else { - let peer_path = std::path::Path::new(&req.peer).join(peer_session.id.to_string()).to_str()?.to_string(); - log::info!("[ConnectorSqlStorage] room {} peer {} record started in path: {peer_path}", req.room, req.peer); - self.hook_events.push_back(( - req.app.clone().into(), - HookEvent { - node: from, - ts: event_ts, - event: Some(hook_event::Event::Record(RecordEvent { - app: req.app.clone(), - room: req.room.clone(), - event: Some(record_event::Event::PeerJoined(RecordPeerJoined { - peer: req.peer.clone(), - path: peer_path.clone(), - })), - })), - }, - )); - - let mut model: entity::peer_session::ActiveModel = peer_session.into(); - model.record = Set(Some(peer_path.clone())); - model.save(&self.db).await.ok()?; - peer_path - }; - - let path = std::path::Path::new(&room_path) - .join(peer_path) - .join(format!("{}-{}-{}.rec", req.index, req.from_ts, req.to_ts)) - .to_str()? - .to_string(); - let s3_uri = self.s3.put(&path, 86400).expect("Should create s3_uri"); - Some(connector_response::Response::Record(RecordRes { s3_uri })) - } + connector_request::Request::Record(req) => match self.on_record_req(from, event_ts, req.clone()).await { + Ok(res) => Some(connector_response::Response::Record(res)), + Err(err) => { + log::error!("[ConnectorSqlStorage] on_record_req {req:?} db error {err}"); + None + } + }, } } @@ -914,7 +943,7 @@ mod tests { session_id, event: Some(Event::RouteBegin(RouteBegin { remote_ip: remote_ip.clone() })), }; - storage.on_event(0, node, ts, connector_request::Request::Peer(event.clone())).await.expect("Should process event"); + storage.on_event(0, node, ts, 0, connector_request::Request::Peer(event.clone())).await.expect("Should process event"); assert_eq!( storage.pop_hook_event(), @@ -967,7 +996,7 @@ mod tests { event: Some(Event::Connecting(Connecting { remote_ip: remote_ip.clone() })), }; storage - .on_event(0, node, ts, connector_request::Request::Peer(connecting_event.clone())) + .on_event(0, node, ts, 0, connector_request::Request::Peer(connecting_event.clone())) .await .expect("Should process event"); @@ -1008,7 +1037,7 @@ mod tests { })), }; storage - .on_event(0, node, ts, connector_request::Request::Peer(connected_event.clone())) + .on_event(0, node, ts, 1, connector_request::Request::Peer(connected_event.clone())) .await .expect("Should process event"); assert_eq!( @@ -1037,7 +1066,10 @@ mod tests { peer: "peer".to_string(), })), }; - storage.on_event(0, node, ts, connector_request::Request::Peer(join_event.clone())).await.expect("Should process event"); + storage + .on_event(0, node, ts, 2, connector_request::Request::Peer(join_event.clone())) + .await + .expect("Should process event"); assert_eq!( storage.pop_hook_event(), @@ -1102,7 +1134,7 @@ mod tests { })), }; storage - .on_event(1000, node, ts, connector_request::Request::Peer(leave_event.clone())) + .on_event(1000, node, ts, 3, connector_request::Request::Peer(leave_event.clone())) .await .expect("Should process event"); @@ -1179,7 +1211,10 @@ mod tests { peer: "peer".to_string(), })), }; - storage.on_event(0, node, ts, connector_request::Request::Peer(join_event.clone())).await.expect("Should process event"); + storage + .on_event(0, node, ts, 4, connector_request::Request::Peer(join_event.clone())) + .await + .expect("Should process event"); assert_eq!( storage.pop_hook_event(), Some((