fix: connector sometimes handled duplicates incorrectly; ensure requests still succeed (
giangndm authored Dec 22, 2024
1 parent 5e38365 commit 41cf961
Showing 4 changed files with 124 additions and 99 deletions.
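In short, this commit moves duplicate detection out of the handler service and into the connector storage, keyed by (node, timestamp, req_id), so a retried request is answered with a success response again instead of being dropped before it reaches storage. Below is a minimal, self-contained sketch of that dedup pattern using the `lru` crate (which the patched code also uses); `NodeId`, `Request`, `Response`, and `Storage` here are simplified stand-ins rather than the actual types from this repository.

```rust
// Sketch of LRU-based request dedup: a repeated (node, ts, req_id) triple is
// acknowledged again rather than silently ignored. Assumes the `lru` crate.
use std::num::NonZeroUsize;

use lru::LruCache;

type NodeId = u32; // stand-in for the real node identifier type

#[derive(Debug, Clone)]
struct Request {
    payload: String,
}

#[derive(Debug, Clone, PartialEq)]
struct Response;

struct Storage {
    // Remembers recently processed requests so retries stay idempotent.
    seen: LruCache<(NodeId, u64, u64), ()>,
}

impl Storage {
    fn new() -> Self {
        Self {
            seen: LruCache::new(NonZeroUsize::new(10_000).expect("capacity must be non-zero")),
        }
    }

    fn on_event(&mut self, from: NodeId, ts: u64, req_id: u64, req: Request) -> Option<Response> {
        let key = (from, ts, req_id);
        if self.seen.contains(&key) {
            // Duplicate delivery: reply with success again so the sender stops retrying.
            return Some(Response);
        }
        // Persist the request here; mark it as seen only after it was handled.
        println!("storing event: {}", req.payload);
        self.seen.put(key, ());
        Some(Response)
    }
}

fn main() {
    let mut storage = Storage::new();
    let req = Request { payload: "peer joined".into() };
    assert_eq!(storage.on_event(1, 1000, 42, req.clone()), Some(Response));
    // The retried request carries the same (node, ts, req_id) and is still acknowledged.
    assert_eq!(storage.on_event(1, 1000, 42, req), Some(Response));
}
```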
2 changes: 1 addition & 1 deletion bin/src/server/connector.rs
@@ -177,7 +177,7 @@ pub async fn run_media_connector(workers: usize, http_port: Option<u16>, node: N
connector_storage.on_tick().await;
}
select2::OrOutput::Right(Some((from, ts, req_id, event))) => {
match connector_storage.on_event(from, ts, event).await {
match connector_storage.on_event(from, ts, req_id, event).await {
Some(res) => {
if let Err(e) = connector_handler_control_tx.send(handler_service::Control::Res(from, req_id, res)).await {
log::error!("[Connector] send control to service error {:?}", e);
12 changes: 1 addition & 11 deletions packages/media_connector/src/handler_service.rs
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, fmt::Debug, num::NonZeroUsize};
use std::{collections::VecDeque, fmt::Debug};

use atm0s_sdn::{
base::{
@@ -8,7 +8,6 @@ use atm0s_sdn::{
features::{data, FeaturesControl, FeaturesEvent},
NodeId, RouteRule,
};
use lru::LruCache;
use media_server_protocol::protobuf::cluster_connector::{connector_request, connector_response, ConnectorRequest, ConnectorResponse};
use prost::Message;

@@ -25,10 +24,7 @@ pub enum Event {
Req(NodeId, u64, u64, connector_request::Request),
}

type ReqUuid = (NodeId, u64, u64);

pub struct ConnectorHandlerService<UserData, SC, SE, TC, TW> {
lru: LruCache<ReqUuid, ()>,
subscriber: Option<ServiceControlActor<UserData>>,
queue: VecDeque<ServiceOutput<UserData, FeaturesControl, SE, TW>>,
shutdown: bool,
@@ -45,7 +41,6 @@ impl<UserData, SC, SE, TC, TW> ConnectorHandlerService<UserData, SC, SE, TC, TW>
pub fn new() -> Self {
Self {
subscriber: None,
lru: LruCache::new(NonZeroUsize::new(10000).expect("should be non-zero")),
queue: VecDeque::from([ServiceOutput::FeatureControl(data::Control::DataListen(DATA_PORT).into())]),
shutdown: false,
_tmp: std::marker::PhantomData,
@@ -96,11 +91,6 @@
data::Event::Recv(_port, meta, buf) => match ConnectorRequest::decode(buf.as_slice()) {
Ok(msg) => {
if let Some(source) = meta.source {
if self.lru.put((source, msg.ts, msg.req_id), ()).is_some() {
log::warn!("[ConnectorHandler] duplicate msg {:?}", msg);
return;
}

log::info!("[ConnectorHandler] on event {:?}", msg);
if let Some(event) = msg.request {
if let Some(actor) = self.subscriber {
6 changes: 3 additions & 3 deletions packages/media_connector/src/lib.rs
@@ -99,7 +99,7 @@ pub trait Storage {
type Q: Querier;
fn querier(&mut self) -> Self::Q;
fn on_tick(&mut self, now_ms: u64) -> impl std::future::Future<Output = ()> + Send;
fn on_event(&mut self, now_ms: u64, from: NodeId, req_ts: u64, req: connector_request::Request) -> impl std::future::Future<Output = Option<connector_response::Response>> + Send;
fn on_event(&mut self, now_ms: u64, from: NodeId, req_ts: u64, req_id: u64, req: connector_request::Request) -> impl std::future::Future<Output = Option<connector_response::Response>> + Send;
fn pop_hook_event(&mut self) -> Option<(AppId, HookEvent)>;
}

@@ -135,8 +135,8 @@ impl ConnectorStorage {
}
}

pub async fn on_event(&mut self, from: NodeId, ts: u64, req: connector_request::Request) -> Option<connector_response::Response> {
let res = self.sql_storage.on_event(now_ms(), from, ts, req).await?;
pub async fn on_event(&mut self, from: NodeId, ts: u64, req_id: u64, req: connector_request::Request) -> Option<connector_response::Response> {
let res = self.sql_storage.on_event(now_ms(), from, ts, req_id, req).await?;

while let Some((app, event)) = self.sql_storage.pop_hook_event() {
self.hook.on_event(app, event);
203 changes: 119 additions & 84 deletions packages/media_connector/src/sql_storage.rs
@@ -1,16 +1,18 @@
use std::{
collections::{HashMap, VecDeque},
num::NonZeroUsize,
time::Duration,
};

use atm0s_sdn::NodeId;
use lru::LruCache;
use media_server_protocol::{
multi_tenancy::AppId,
protobuf::cluster_connector::{
connector_request, connector_response, hook_event, peer_event,
record_event::{self, RecordPeerJoined, RecordStarted},
room_event::{self, RoomAllPeersLeaved, RoomPeerJoined, RoomPeerLeaved, RoomStarted, RoomStopped},
HookEvent, PeerRes, RecordEvent, RecordRes, RoomEvent,
HookEvent, PeerRes, RecordEvent, RecordReq, RecordRes, RoomEvent,
},
};
use media_server_utils::CustomUri;
@@ -40,6 +42,7 @@ pub struct ConnectorSqlStorage {
s3: Presigner,
s3_sub_folder: String,
room_destroy_after_ms: u64,
lru: LruCache<(NodeId, u64, u64), ()>,
hook_events: VecDeque<(AppId, HookEvent)>,
}

@@ -81,6 +84,7 @@ impl ConnectorSqlStorage {
s3,
s3_sub_folder,
room_destroy_after_ms: cfg.room_destroy_after_ms,
lru: LruCache::new(NonZeroUsize::new(10000).expect("should be non-zero")),
hook_events: Default::default(),
}
}
@@ -545,6 +549,93 @@ impl ConnectorSqlStorage {
.count(&self.db)
.await
}

async fn on_record_req(&mut self, from: NodeId, event_ts: u64, req: RecordReq) -> Result<RecordRes, DbErr> {
let room = entity::room::Entity::find()
.filter(entity::room::Column::App.eq(&req.app))
.filter(entity::room::Column::Room.eq(&req.room))
.filter(entity::room::Column::DestroyedAt.is_null())
.one(&self.db)
.await?
.ok_or(DbErr::RecordNotFound("Room not found".to_string()))?;
let room_id = room.id;
let room_path = if let Some(path) = room.record {
path
} else {
let room_path = std::path::Path::new(&self.s3_sub_folder)
.join(&req.app)
.join(&req.room)
.join(room_id.to_string())
.to_str()
.ok_or(DbErr::Custom("Should convert path to string".to_string()))?
.to_string();
log::info!("[ConnectorSqlStorage] room {} record started in path: {room_path}", req.room);
self.hook_events.push_back((
req.app.clone().into(),
HookEvent {
node: from,
ts: event_ts,
event: Some(hook_event::Event::Record(RecordEvent {
app: req.app.clone(),
room: req.room.clone(),
event: Some(record_event::Event::Started(RecordStarted { path: room_path.clone() })),
})),
},
));

let mut model: entity::room::ActiveModel = room.into();
model.record = Set(Some(room_path.clone()));
model.save(&self.db).await?;
room_path
};

let peer_session = entity::peer_session::Entity::find()
.filter(entity::peer_session::Column::Session.eq(req.session as i64))
.filter(entity::peer_session::Column::Room.eq(room_id))
.order_by_desc(entity::peer_session::Column::CreatedAt)
.one(&self.db)
.await?
.ok_or(DbErr::RecordNotFound("Peer not found".to_string()))?;
let peer_path = if let Some(path) = peer_session.record {
path
} else {
let peer_path = std::path::Path::new(&req.peer)
.join(peer_session.id.to_string())
.to_str()
.ok_or(DbErr::Custom("Should convert path to string".to_string()))?
.to_string();
log::info!("[ConnectorSqlStorage] room {} peer {} record started in path: {peer_path}", req.room, req.peer);
self.hook_events.push_back((
req.app.clone().into(),
HookEvent {
node: from,
ts: event_ts,
event: Some(hook_event::Event::Record(RecordEvent {
app: req.app.clone(),
room: req.room.clone(),
event: Some(record_event::Event::PeerJoined(RecordPeerJoined {
peer: req.peer.clone(),
path: peer_path.clone(),
})),
})),
},
));

let mut model: entity::peer_session::ActiveModel = peer_session.into();
model.record = Set(Some(peer_path.clone()));
model.save(&self.db).await?;
peer_path
};

let path = std::path::Path::new(&room_path)
.join(peer_path)
.join(format!("{}-{}-{}.rec", req.index, req.from_ts, req.to_ts))
.to_str()
.ok_or(DbErr::Custom("Should convert path to string".to_string()))?
.to_string();
let s3_uri = self.s3.put(&path, 86400).expect("Should create s3_uri");
Ok(RecordRes { s3_uri })
}
}

impl Storage for ConnectorSqlStorage {
@@ -557,11 +648,16 @@ impl Storage for ConnectorSqlStorage {
log::error!("[ConnectorSqlStorage] on_tick db error {e:?}");
}
}
async fn on_event(&mut self, now_ms: u64, from: NodeId, event_ts: u64, event: connector_request::Request) -> Option<connector_response::Response> {
async fn on_event(&mut self, now_ms: u64, from: NodeId, event_ts: u64, req_id: u64, event: connector_request::Request) -> Option<connector_response::Response> {
match event {
connector_request::Request::Peer(event) => {
if self.lru.contains(&(from, event_ts, req_id)) {
log::warn!("[ConnectorSqlStorage] peer event {event:?} already processed");
return Some(connector_response::Response::Peer(PeerRes {}));
}

if let Err(e) = self.on_peer_event(now_ms, from, event_ts, &event.app, event.session_id, event.event.clone()?).await {
log::error!("[ConnectorSqlStorage] on_peer_event db error {e:?}");
log::error!("[ConnectorSqlStorage] on_peer_event {event:?} db error {e:?}");
return None;
}
self.hook_events.push_back((
Expand All @@ -572,83 +668,16 @@ impl Storage for ConnectorSqlStorage {
event: Some(hook_event::Event::Peer(event)),
},
));
self.lru.put((from, event_ts, req_id), ());
Some(connector_response::Response::Peer(PeerRes {}))
}
connector_request::Request::Record(req) => {
let room = entity::room::Entity::find()
.filter(entity::room::Column::App.eq(&req.app))
.filter(entity::room::Column::Room.eq(&req.room))
.filter(entity::room::Column::DestroyedAt.is_null())
.one(&self.db)
.await
.ok()??;
let room_id = room.id;
let room_path = if let Some(path) = room.record {
path
} else {
let room_path = std::path::Path::new(&self.s3_sub_folder).join(&req.app).join(&req.room).join(room_id.to_string()).to_str()?.to_string();
log::info!("[ConnectorSqlStorage] room {} record started in path: {room_path}", req.room);
self.hook_events.push_back((
req.app.clone().into(),
HookEvent {
node: from,
ts: event_ts,
event: Some(hook_event::Event::Record(RecordEvent {
app: req.app.clone(),
room: req.room.clone(),
event: Some(record_event::Event::Started(RecordStarted { path: room_path.clone() })),
})),
},
));

let mut model: entity::room::ActiveModel = room.into();
model.record = Set(Some(room_path.clone()));
model.save(&self.db).await.ok()?;
room_path
};

let peer_session = entity::peer_session::Entity::find()
.filter(entity::peer_session::Column::Session.eq(req.session as i64))
.filter(entity::peer_session::Column::Room.eq(room_id))
.order_by_desc(entity::peer_session::Column::CreatedAt)
.one(&self.db)
.await
.ok()??;
let peer_path = if let Some(path) = peer_session.record {
path
} else {
let peer_path = std::path::Path::new(&req.peer).join(peer_session.id.to_string()).to_str()?.to_string();
log::info!("[ConnectorSqlStorage] room {} peer {} record started in path: {peer_path}", req.room, req.peer);
self.hook_events.push_back((
req.app.clone().into(),
HookEvent {
node: from,
ts: event_ts,
event: Some(hook_event::Event::Record(RecordEvent {
app: req.app.clone(),
room: req.room.clone(),
event: Some(record_event::Event::PeerJoined(RecordPeerJoined {
peer: req.peer.clone(),
path: peer_path.clone(),
})),
})),
},
));

let mut model: entity::peer_session::ActiveModel = peer_session.into();
model.record = Set(Some(peer_path.clone()));
model.save(&self.db).await.ok()?;
peer_path
};

let path = std::path::Path::new(&room_path)
.join(peer_path)
.join(format!("{}-{}-{}.rec", req.index, req.from_ts, req.to_ts))
.to_str()?
.to_string();
let s3_uri = self.s3.put(&path, 86400).expect("Should create s3_uri");
Some(connector_response::Response::Record(RecordRes { s3_uri }))
}
connector_request::Request::Record(req) => match self.on_record_req(from, event_ts, req.clone()).await {
Ok(res) => Some(connector_response::Response::Record(res)),
Err(err) => {
log::error!("[ConnectorSqlStorage] on_record_req {req:?} db error {err}");
None
}
},
}
}

@@ -914,7 +943,7 @@ mod tests {
session_id,
event: Some(Event::RouteBegin(RouteBegin { remote_ip: remote_ip.clone() })),
};
storage.on_event(0, node, ts, connector_request::Request::Peer(event.clone())).await.expect("Should process event");
storage.on_event(0, node, ts, 0, connector_request::Request::Peer(event.clone())).await.expect("Should process event");

assert_eq!(
storage.pop_hook_event(),
@@ -967,7 +996,7 @@ mod tests {
event: Some(Event::Connecting(Connecting { remote_ip: remote_ip.clone() })),
};
storage
.on_event(0, node, ts, connector_request::Request::Peer(connecting_event.clone()))
.on_event(0, node, ts, 0, connector_request::Request::Peer(connecting_event.clone()))
.await
.expect("Should process event");

@@ -1008,7 +1037,7 @@ mod tests {
})),
};
storage
.on_event(0, node, ts, connector_request::Request::Peer(connected_event.clone()))
.on_event(0, node, ts, 1, connector_request::Request::Peer(connected_event.clone()))
.await
.expect("Should process event");
assert_eq!(
@@ -1037,7 +1066,10 @@ mod tests {
peer: "peer".to_string(),
})),
};
storage.on_event(0, node, ts, connector_request::Request::Peer(join_event.clone())).await.expect("Should process event");
storage
.on_event(0, node, ts, 2, connector_request::Request::Peer(join_event.clone()))
.await
.expect("Should process event");

assert_eq!(
storage.pop_hook_event(),
@@ -1102,7 +1134,7 @@ mod tests {
})),
};
storage
.on_event(1000, node, ts, connector_request::Request::Peer(leave_event.clone()))
.on_event(1000, node, ts, 3, connector_request::Request::Peer(leave_event.clone()))
.await
.expect("Should process event");

@@ -1179,7 +1211,10 @@ mod tests {
peer: "peer".to_string(),
})),
};
storage.on_event(0, node, ts, connector_request::Request::Peer(join_event.clone())).await.expect("Should process event");
storage
.on_event(0, node, ts, 4, connector_request::Request::Peer(join_event.clone()))
.await
.expect("Should process event");
assert_eq!(
storage.pop_hook_event(),
Some((
