From 85b81f97968fd8f98ed314e60348598d1c598624 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patrick=20Jos=C3=A9=20Pereira?= Date: Mon, 2 Sep 2024 18:36:13 -0300 Subject: [PATCH] onvif: Add first version MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Patrick José Pereira --- src/lib/controls/mod.rs | 3 +- src/lib/controls/onvif/client.rs | 150 +++++++++++++++++++++++++++ src/lib/controls/onvif/manager.rs | 165 ++++++++++++++++++++++++++++++ src/lib/controls/onvif/mod.rs | 2 + src/main.rs | 4 +- 5 files changed, 322 insertions(+), 2 deletions(-) create mode 100644 src/lib/controls/onvif/client.rs create mode 100644 src/lib/controls/onvif/manager.rs create mode 100644 src/lib/controls/onvif/mod.rs diff --git a/src/lib/controls/mod.rs b/src/lib/controls/mod.rs index dd198c6d..cd3ad155 100644 --- a/src/lib/controls/mod.rs +++ b/src/lib/controls/mod.rs @@ -1 +1,2 @@ -pub mod types; \ No newline at end of file +pub mod onvif; +pub mod types; diff --git a/src/lib/controls/onvif/client.rs b/src/lib/controls/onvif/client.rs new file mode 100644 index 00000000..cd2889a2 --- /dev/null +++ b/src/lib/controls/onvif/client.rs @@ -0,0 +1,150 @@ +use onvif::soap; +use onvif_schema::{devicemgmt::GetDeviceInformationResponse, transport}; + +use anyhow::{anyhow, Result}; +use tracing::*; + +pub struct Clients { + devicemgmt: soap::client::Client, + event: Option, + deviceio: Option, + media: Option, + media2: Option, + imaging: Option, + ptz: Option, + analytics: Option, +} + +pub struct Auth { + pub credentials: Option, + pub url: Box, +} + +impl Clients { + pub async fn try_new(auth: &Auth) -> Result { + let creds = &auth.credentials; + dbg!(&auth.url); + let devicemgmt_uri = url::Url::parse(&auth.url)?; + let base_uri = &devicemgmt_uri.origin().ascii_serialization(); + + let mut this = Self { + devicemgmt: soap::client::ClientBuilder::new(&devicemgmt_uri) + .credentials(creds.clone()) + .build(), + imaging: None, + ptz: None, + event: None, + deviceio: None, + media: None, + media2: None, + analytics: None, + }; + + let services = + onvif_schema::devicemgmt::get_services(&this.devicemgmt, &Default::default()).await?; + + dbg!(&services); + for service in &services.service { + dbg!(&service.x_addr); + let service_url = url::Url::parse(&service.x_addr).map_err(anyhow::Error::msg)?; + if !service_url.as_str().starts_with(base_uri) { + return Err(anyhow!( + "Service URI {service_url:?} is not within base URI {base_uri:?}" + )); + } + let svc = Some( + soap::client::ClientBuilder::new(&service_url) + .credentials(creds.clone()) + .build(), + ); + match service.namespace.as_str() { + "http://www.onvif.org/ver10/device/wsdl" => { + if service_url != devicemgmt_uri { + warn!( + "advertised device mgmt uri {service_url} not expected {devicemgmt_uri}" + ); + } + } + "http://www.onvif.org/ver10/events/wsdl" => this.event = svc, + "http://www.onvif.org/ver10/deviceIO/wsdl" => this.deviceio = svc, + "http://www.onvif.org/ver10/media/wsdl" => this.media = svc, + "http://www.onvif.org/ver20/media/wsdl" => this.media2 = svc, + "http://www.onvif.org/ver20/imaging/wsdl" => this.imaging = svc, + "http://www.onvif.org/ver20/ptz/wsdl" => this.ptz = svc, + "http://www.onvif.org/ver20/analytics/wsdl" => this.analytics = svc, + _ => debug!("unknown service: {:?}", service), + } + } + + Ok(this) + } + + pub async fn get_device_information( + &self, + ) -> Result { + onvif_schema::devicemgmt::get_device_information(&self.devicemgmt, &Default::default()) + .await + } + + pub async fn get_stream_uris(&self) -> Result, transport::Error> { + let mut urls: Vec = vec![]; + let media_client = self + .media + .as_ref() + .ok_or_else(|| transport::Error::Other("Client media is not available".into()))?; + let profiles = onvif_schema::media::get_profiles(media_client, &Default::default()).await?; + debug!("get_profiles response: {:#?}", &profiles); + let requests: Vec<_> = profiles + .profiles + .iter() + .map( + |p: &onvif_schema::onvif::Profile| onvif_schema::media::GetStreamUri { + profile_token: onvif_schema::onvif::ReferenceToken(p.token.0.clone()), + stream_setup: onvif_schema::onvif::StreamSetup { + stream: onvif_schema::onvif::StreamType::RtpUnicast, + transport: onvif_schema::onvif::Transport { + protocol: onvif_schema::onvif::TransportProtocol::Rtsp, + tunnel: vec![], + }, + }, + }, + ) + .collect(); + + let responses = futures::future::try_join_all( + requests + .iter() + .map(|r| onvif_schema::media::get_stream_uri(media_client, r)), + ) + .await?; + for (p, resp) in profiles.profiles.iter().zip(responses.iter()) { + println!("token={} name={}", &p.token.0, &p.name.0); + println!(" {}", &resp.media_uri.uri); + match url::Url::parse(&resp.media_uri.uri) { + Ok(address) => urls.push(address), + Err(error) => { + error!( + "Failed to parse stream url: {}, reason: {error:?}", + &resp.media_uri.uri + ) + } + } + if let Some(ref v) = p.video_encoder_configuration { + println!( + " {:?}, {}x{}", + v.encoding, v.resolution.width, v.resolution.height + ); + if let Some(ref r) = v.rate_control { + println!(" {} fps, {} kbps", r.frame_rate_limit, r.bitrate_limit); + } + } + if let Some(ref a) = p.audio_encoder_configuration { + println!( + " audio: {:?}, {} kbps, {} kHz", + a.encoding, a.bitrate, a.sample_rate + ); + } + } + Ok(urls) + } +} diff --git a/src/lib/controls/onvif/manager.rs b/src/lib/controls/onvif/manager.rs new file mode 100644 index 00000000..97aa0742 --- /dev/null +++ b/src/lib/controls/onvif/manager.rs @@ -0,0 +1,165 @@ +use std::sync::{Arc, Mutex}; +use std::thread; + +use anyhow::Result; +use async_std::stream::StreamExt; +use tracing::*; + +use crate::stream::types::CaptureConfiguration; +use crate::stream::{gst as gst_stream, manager as stream_manager, types::StreamInformation}; +use crate::video::types::VideoSourceType; +use crate::video::video_source_redirect::{VideoSourceRedirect, VideoSourceRedirectType}; +use crate::video_stream::types::VideoAndStreamInformation; + +use super::client::*; + +lazy_static! { + static ref MANAGER: Arc> = Default::default(); +} + +#[derive(Debug)] +pub struct Manager { + _process: tokio::task::JoinHandle>, +} + +impl Drop for Manager { + fn drop(&mut self) { + self._process.abort(); + } +} + +impl Default for Manager { + #[instrument(level = "trace")] + fn default() -> Self { + Self { + _process: tokio::spawn(async move { Manager::discover_loop().await }), + } + } +} + +impl Manager { + // Construct our manager, should be done inside main + #[instrument(level = "debug")] + pub fn init() { + MANAGER.as_ref(); + } + + #[instrument(level = "debug", fields(endpoint))] + fn run_main_loop() { + tokio::runtime::Builder::new_multi_thread() + .on_thread_start(|| debug!("Thread started")) + .on_thread_stop(|| debug!("Thread stopped")) + .thread_name_fn(|| { + static ATOMIC_ID: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + format!("Onvif-{id}") + }) + .worker_threads(2) + .enable_all() + .build() + .expect("Failed building a new tokio runtime") + .block_on(Manager::discover_loop()) + .expect("Error starting Onvif server"); + } + + #[instrument(level = "debug")] + async fn discover_loop() -> Result<()> { + use futures::stream::StreamExt; + use std::net::{IpAddr, Ipv4Addr}; + + loop { + info!("Discovering..."); + + const MAX_CONCURRENT_JUMPERS: usize = 100; + + onvif::discovery::DiscoveryBuilder::default() + .listen_address(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) + .duration(tokio::time::Duration::from_secs(5)) + .run() + .await? + .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| async move { + info!("Device found: {device:#?}"); + + //TODO: We should add support to auth later + let credentials = None; + let clients = match Clients::try_new(&Auth { + credentials: credentials.clone(), + url: device.urls.first().unwrap().to_string().into(), + }) + .await + { + Ok(clients) => clients, + Err(error) => { + error!("Failed creating clients: {error:#?}"); + return; + } + }; + + match clients.get_stream_uris().await { + Ok(stream_uris) => { + let mut url = stream_uris[0].clone(); + + let name = if let Ok(device) = &clients.get_device_information().await { + format!("{} - {} - {}", device.model, device.serial_number, url) + } else { + if let Some(name) = device.name { + format!("{name} - {url}") + } else { + format!("{url}") + } + }; + + if let Some(credentials) = credentials { + if url.set_username(&credentials.username).is_err() { + error!("Failed setting username for {url}"); + } + if url.set_password(Some(&credentials.password)).is_err() { + error!("Failed setting password for {url}"); + } + } + let video_source_redirect = VideoSourceRedirect { + name: name.clone(), + source: VideoSourceRedirectType::Redirect( + stream_uris[0].to_string(), + ), + }; + + let video_and_stream = VideoAndStreamInformation { + name: name.clone(), + stream_information: StreamInformation { + endpoints: vec![url], + configuration: CaptureConfiguration::Redirect( + Default::default(), + ), + extended_configuration: None, + }, + video_source: VideoSourceType::Redirect(video_source_redirect), + }; + + if let Ok(streams) = stream_manager::streams().await { + for stream in streams { + if let Err(error) = + video_and_stream.conflicts_with(&stream.video_and_stream) + { + debug!("Stream {name} is already registered: {error}"); + return; + } + } + } + + if let Err(error) = + stream_manager::add_stream_and_start(video_and_stream).await + { + error!("Failed adding stream: {error:#?}"); + } + } + Err(error) => { + error!("Failed getting stream uris: {error:#?}"); + } + } + }) + .await; + } + } +} diff --git a/src/lib/controls/onvif/mod.rs b/src/lib/controls/onvif/mod.rs new file mode 100644 index 00000000..af2485e8 --- /dev/null +++ b/src/lib/controls/onvif/mod.rs @@ -0,0 +1,2 @@ +mod client; +pub mod manager; diff --git a/src/main.rs b/src/main.rs index 794c847c..d14be36f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use mavlink_camera_manager::{cli, helper, logger, mavlink, server, settings, stream}; +use mavlink_camera_manager::{cli, controls, helper, logger, mavlink, server, settings, stream}; use tracing::*; @@ -11,6 +11,8 @@ async fn main() -> Result<(), std::io::Error> { // Settings should start before everybody else to ensure that the CLI are stored settings::manager::init(Some(&cli::manager::settings_file())); + controls::onvif::manager::Manager::init(); + mavlink::manager::Manager::init(); stream::manager::init();