Skip to content

Commit

Permalink
[WIP] Avoid locks and use proper tasking instead of multiple tokio ru…
Browse files Browse the repository at this point in the history
…ntimes
  • Loading branch information
joaoantoniocardoso committed Nov 30, 2023
1 parent c2b31e9 commit 9cafdf5
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 244 deletions.
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> Result<(), std::io::Error> {

stream::webrtc::signalling_server::SignallingServer::default();

if let Err(error) = stream::manager::start_default() {
if let Err(error) = stream::manager::start_default().await {
error!("Failed to start default streams. Reason: {error:?}")
}

Expand Down
30 changes: 8 additions & 22 deletions src/mavlink/mavlink_camera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use super::utils::*;
#[derive(Debug)]
pub struct MavlinkCameraHandle {
inner: Arc<MavlinkCamera>,
_runtime: tokio::runtime::Runtime,
heartbeat_handle: tokio::task::JoinHandle<()>,
messages_handle: tokio::task::JoinHandle<()>,
}
Expand All @@ -34,33 +33,18 @@ struct MavlinkCamera {

impl MavlinkCameraHandle {
#[instrument(level = "debug")]
pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
pub async fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
let inner = Arc::new(MavlinkCamera::try_new(video_and_stream_information)?);

let sender = crate::mavlink::manager::Manager::get_sender();

let runtime = tokio::runtime::Builder::new_multi_thread()
.on_thread_start(|| debug!("Thread started"))
.on_thread_stop(|| debug!("Thread stopped"))
.thread_name_fn(|| {
static ATOMIC_ID: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
format!("MavlinkCamera-{id}")
})
.worker_threads(2)
.enable_all()
.build()
.expect("Failed building a new tokio runtime");

let heartbeat_handle =
runtime.spawn(MavlinkCamera::heartbeat_loop(inner.clone(), sender.clone()));
tokio::spawn(MavlinkCamera::heartbeat_loop(inner.clone(), sender.clone()));
let messages_handle =
runtime.spawn(MavlinkCamera::messages_loop(inner.clone(), sender.clone()));
tokio::spawn(MavlinkCamera::messages_loop(inner.clone(), sender.clone()));

Ok(Self {
inner,
_runtime: runtime,
heartbeat_handle,
messages_handle,
})
Expand Down Expand Up @@ -166,14 +150,16 @@ impl MavlinkCamera {
let mut receiver = sender.subscribe();

loop {
let (header, message) = match receiver.recv().await {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
// Note: we can't block this task awaiting for recv, otherwise it might not die. This is why we use try_recv here
let (header, message) = match receiver.try_recv() {
Ok(Message::Received(message)) => message,
Err(broadcast::error::RecvError::Closed) => {
Err(broadcast::error::TryRecvError::Closed) => {
unreachable!(
"Closed channel: This should never happen, this channel is static!"
);
}
Ok(Message::ToBeSent(_)) | Err(broadcast::error::RecvError::Lagged(_)) => continue,
Ok(Message::ToBeSent(_)) | Err(_) => continue,
};

trace!("Message received: {header:?}, {message:?}");
Expand Down
36 changes: 18 additions & 18 deletions src/server/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ pub async fn run(server_address: &str) -> Result<(), std::io::Error> {
App::new()
// Add debug call for API access
.wrap_fn(|req, srv| {
trace!("{:#?}", &req);
let fut = srv.call(req);
async { fut.await }
trace!("{req:#?}");
srv.call(req)
})
.wrap(TracingLogger::default())
.wrap(actix_web::middleware::Logger::default())
Expand Down Expand Up @@ -67,25 +66,26 @@ pub async fn run(server_address: &str) -> Result<(), std::io::Error> {
)
.route("/xml", web::get().to(pages::xml))
.route("/sdp", web::get().to(pages::sdp))
.service(
web::scope("/thumbnail")
// Add a rate limitter to prevent flood
.wrap(
RateLimiter::builder(
InMemoryBackend::builder().build(),
SimpleInputFunctionBuilder::new(std::time::Duration::from_secs(1), 4)
.real_ip_key()
.build(),
)
.add_headers()
.build(),
)
.route("", web::get().to(pages::thumbnail)),
)
// .service(
// web::scope("/thumbnail")
// // Add a rate limitter to prevent flood
// .wrap(
// RateLimiter::builder(
// InMemoryBackend::builder().build(),
// SimpleInputFunctionBuilder::new(std::time::Duration::from_secs(1), 4)
// .real_ip_key()
// .build(),
// )
// .add_headers()
// .build(),
// )
// .route("", web::get().to(pages::thumbnail)),
// )
.build()
})
.bind(server_address)
.expect("Failed starting web API")
.workers(1)
.run()
.await
}
84 changes: 43 additions & 41 deletions src/server/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub fn v4l_post(json: web::Json<V4lControl>) -> HttpResponse {
pub async fn reset_settings(query: web::Query<ResetSettings>) -> HttpResponse {
if query.all.unwrap_or_default() {
settings::manager::reset();
if let Err(error) = stream_manager::start_default() {
if let Err(error) = stream_manager::start_default().await {
return HttpResponse::InternalServerError()
.content_type("text/plain")
.body(format!("{error:#?}"));
Expand Down Expand Up @@ -225,7 +225,7 @@ pub async fn streams() -> HttpResponse {

#[api_v2_operation]
/// Create a video stream
pub fn streams_post(json: web::Json<PostStream>) -> HttpResponse {
pub async fn streams_post(json: web::Json<PostStream>) -> HttpResponse {
let json = json.into_inner();

let video_source = match video_source::get_video_source(&json.source) {
Expand All @@ -241,7 +241,9 @@ pub fn streams_post(json: web::Json<PostStream>) -> HttpResponse {
name: json.name,
stream_information: json.stream_information,
video_source,
}) {
})
.await
{
return HttpResponse::NotAcceptable()
.content_type("text/plain")
.body(format!("{error:#?}"));
Expand Down Expand Up @@ -380,41 +382,41 @@ pub fn sdp(sdp_file_request: web::Query<SdpFileRequest>) -> HttpResponse {
}
}

#[api_v2_operation]
/// Provides a thumbnail file of the given source
pub async fn thumbnail(thumbnail_file_request: web::Query<ThumbnailFileRequest>) -> HttpResponse {
// Ideally, we should be using `actix_web_validator::Query` instead of `web::Query`,
// but because paperclip (at least until 0.8) is using `actix-web-validator 3.x`,
// and `validator 0.14`, the newest api needed to use it along #[api_v2_operation]
// wasn't implemented yet, it doesn't compile.
// To workaround this, we are manually calling the validator here, using actix to
// automatically handle the validation error for us as it normally would.
// TODO: update this function to use `actix_web_validator::Query` directly and get
// rid of this workaround.
if let Err(errors) = thumbnail_file_request.validate() {
warn!("Failed validating ThumbnailFileRequest. Reason: {errors:?}");
return actix_web::ResponseError::error_response(&actix_web_validator::Error::from(errors));
}

let source = thumbnail_file_request.source.clone();
let quality = thumbnail_file_request.quality.unwrap_or(70u8);
let target_height = thumbnail_file_request.target_height.map(|v| v as u32);

match stream_manager::get_jpeg_thumbnail_from_source(source, quality, target_height).await {
Some(Ok(image)) => HttpResponse::Ok().content_type("image/jpeg").body(image),
None => HttpResponse::NotFound()
.content_type("text/plain")
.body(format!(
"Thumbnail not found for source {:?}.",
thumbnail_file_request.source
)),
Some(Err(error)) => HttpResponse::ServiceUnavailable()
.reason("Thumbnail temporarily unavailable")
.insert_header((header::RETRY_AFTER, 10))
.content_type("text/plain")
.body(format!(
"Thumbnail for source {:?} is temporarily unavailable. Try again later. Details: {error:?}",
thumbnail_file_request.source
)),
}
}
// #[api_v2_operation]
// /// Provides a thumbnail file of the given source
// pub async fn thumbnail(thumbnail_file_request: web::Query<ThumbnailFileRequest>) -> HttpResponse {
// Ideally, we should be using `actix_web_validator::Query` instead of `web::Query`,
// but because paperclip (at least until 0.8) is using `actix-web-validator 3.x`,
// and `validator 0.14`, the newest api needed to use it along #[api_v2_operation]
// wasn't implemented yet, it doesn't compile.
// To workaround this, we are manually calling the validator here, using actix to
// automatically handle the validation error for us as it normally would.
// TODO: update this function to use `actix_web_validator::Query` directly and get
// rid of this workaround.
// if let Err(errors) = thumbnail_file_request.validate() {
// warn!("Failed validating ThumbnailFileRequest. Reason: {errors:?}");
// return actix_web::ResponseError::error_response(&actix_web_validator::Error::from(errors));
// }

// let source = thumbnail_file_request.source.clone();
// let quality = thumbnail_file_request.quality.unwrap_or(70u8);
// let target_height = thumbnail_file_request.target_height.map(|v| v as u32);

// match stream_manager::get_jpeg_thumbnail_from_source(source, quality, target_height).await {
// Some(Ok(image)) => HttpResponse::Ok().content_type("image/jpeg").body(image),
// None => HttpResponse::NotFound()
// .content_type("text/plain")
// .body(format!(
// "Thumbnail not found for source {:?}.",
// thumbnail_file_request.source
// )),
// Some(Err(error)) => HttpResponse::ServiceUnavailable()
// .reason("Thumbnail temporarily unavailable")
// .insert_header((header::RETRY_AFTER, 10))
// .content_type("text/plain")
// .body(format!(
// "Thumbnail for source {:?} is temporarily unavailable. Try again later. Details: {error:?}",
// thumbnail_file_request.source
// )),
// }
// }
26 changes: 13 additions & 13 deletions src/settings/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use directories::ProjectDirs;
use serde::{Deserialize, Serialize};
use std::io::prelude::*;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};
use tracing::*;

use crate::cli;
Expand Down Expand Up @@ -34,7 +34,7 @@ struct Manager {
}

lazy_static! {
static ref MANAGER: Arc<Mutex<Manager>> = Arc::new(Mutex::new(Manager { content: None }));
static ref MANAGER: Arc<RwLock<Manager>> = Arc::new(RwLock::new(Manager { content: None }));
}

impl Default for SettingsStruct {
Expand Down Expand Up @@ -101,7 +101,7 @@ impl Manager {
// Init settings manager with the desired settings file,
// will be created if does not exist
pub fn init(file_name: Option<&str>) {
let mut manager = MANAGER.lock().unwrap();
let mut manager = MANAGER.write().unwrap();
let file_name = file_name.unwrap_or("settings.json");
manager.content = Some(Manager::with(file_name));
}
Expand Down Expand Up @@ -153,7 +153,7 @@ fn save_settings_to_file(file_name: &str, content: &SettingsStruct) -> Result<()

// Save the latest state of the settings
pub fn save() {
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
//TODO: deal com save problems here
if let Some(content) = &manager.content {
if let Err(error) = save_settings_to_file(&content.file_name, &content.config) {
Expand All @@ -171,12 +171,12 @@ pub fn save() {

#[allow(dead_code)]
pub fn header() -> HeaderSettingsFile {
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
return manager.content.as_ref().unwrap().config.header.clone();
}

pub fn mavlink_endpoint() -> Option<String> {
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
return manager
.content
.as_ref()
Expand All @@ -189,23 +189,23 @@ pub fn mavlink_endpoint() -> Option<String> {
pub fn set_mavlink_endpoint(endpoint: &str) {
//TODO: make content more easy to access
{
let mut manager = MANAGER.lock().unwrap();
let mut manager = MANAGER.write().unwrap();
let mut content = manager.content.as_mut();
content.as_mut().unwrap().config.mavlink_endpoint = Some(endpoint.into());
}
save();
}

pub fn streams() -> Vec<VideoAndStreamInformation> {
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
let content = manager.content.as_ref();
content.unwrap().config.streams.clone()
}

pub fn set_streams(streams: &[VideoAndStreamInformation]) {
// Take care of scope mutex
// Take care of scope RwLock
{
let mut manager = MANAGER.lock().unwrap();
let mut manager = MANAGER.write().unwrap();
let mut content = manager.content.as_mut();
content.as_mut().unwrap().config.streams.clear();
content
Expand All @@ -219,9 +219,9 @@ pub fn set_streams(streams: &[VideoAndStreamInformation]) {
}

pub fn reset() {
// Take care of scope mutex
// Take care of scope RwLock
{
let mut manager = MANAGER.lock().unwrap();
let mut manager = MANAGER.write().unwrap();
manager.content.as_mut().unwrap().config = SettingsStruct::default();
}
save();
Expand Down Expand Up @@ -254,7 +254,7 @@ mod tests {
#[test]
fn test_no_aboslute_path() {
init(None);
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
let file_name = &manager.content.as_ref().unwrap().file_name;
assert!(
std::path::Path::new(&file_name).exists(),
Expand Down
Loading

0 comments on commit 9cafdf5

Please sign in to comment.