Skip to content

Commit

Permalink
onvif: Add first version
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick José Pereira <[email protected]>
  • Loading branch information
patrickelectric committed Sep 3, 2024
1 parent 9a29545 commit a2cd4fc
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/lib/controls/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod onvif;
pub mod types;
148 changes: 148 additions & 0 deletions src/lib/controls/onvif/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use onvif_schema::{devicemgmt::GetDeviceInformationResponse, transport};
use onvif::soap;

use anyhow::{anyhow, Result};
use tracing::*;

pub struct Clients {
devicemgmt: soap::client::Client,
event: Option<soap::client::Client>,
deviceio: Option<soap::client::Client>,
media: Option<soap::client::Client>,
media2: Option<soap::client::Client>,
imaging: Option<soap::client::Client>,
ptz: Option<soap::client::Client>,
analytics: Option<soap::client::Client>,
}

pub struct Auth {
pub credentials: Option<soap::client::Credentials>,
pub url: Box<str>,
}

impl Clients {
pub async fn try_new(auth: &Auth) -> Result<Self> {
let creds = &auth.credentials;
dbg!(&auth.url);
let devicemgmt_uri = url::Url::parse(&auth.url)?;
let base_uri = &devicemgmt_uri.origin().ascii_serialization();

let mut this = Self {
devicemgmt: soap::client::ClientBuilder::new(&devicemgmt_uri)
.credentials(creds.clone())
.build(),
imaging: None,
ptz: None,
event: None,
deviceio: None,
media: None,
media2: None,
analytics: None,
};

let services =
onvif_schema::devicemgmt::get_services(&this.devicemgmt, &Default::default()).await?;

dbg!(&services);
for service in &services.service {
dbg!(&service.x_addr);
let service_url = url::Url::parse(&service.x_addr).map_err(anyhow::Error::msg)?;
if !service_url.as_str().starts_with(base_uri) {
return Err(anyhow!(
"Service URI {service_url:?} is not within base URI {base_uri:?}"
));
}
let svc = Some(
soap::client::ClientBuilder::new(&service_url)
.credentials(creds.clone())
.build(),
);
match service.namespace.as_str() {
"http://www.onvif.org/ver10/device/wsdl" => {
if service_url != devicemgmt_uri {
warn!(
"advertised device mgmt uri {service_url} not expected {devicemgmt_uri}"
);
}
}
"http://www.onvif.org/ver10/events/wsdl" => this.event = svc,
"http://www.onvif.org/ver10/deviceIO/wsdl" => this.deviceio = svc,
"http://www.onvif.org/ver10/media/wsdl" => this.media = svc,
"http://www.onvif.org/ver20/media/wsdl" => this.media2 = svc,
"http://www.onvif.org/ver20/imaging/wsdl" => this.imaging = svc,
"http://www.onvif.org/ver20/ptz/wsdl" => this.ptz = svc,
"http://www.onvif.org/ver20/analytics/wsdl" => this.analytics = svc,
_ => debug!("unknown service: {:?}", service),
}
}

Ok(this)
}

pub async fn get_device_information(&self) -> Result<GetDeviceInformationResponse, transport::Error> {
onvif_schema::devicemgmt::get_device_information(
&self.devicemgmt,
&Default::default()
)
.await
}

pub async fn get_stream_uris(&self) -> Result<Vec<url::Url>, transport::Error> {
let mut urls: Vec<url::Url> = vec![];
let media_client = self
.media
.as_ref()
.ok_or_else(|| transport::Error::Other("Client media is not available".into()))?;
let profiles = onvif_schema::media::get_profiles(media_client, &Default::default()).await?;
debug!("get_profiles response: {:#?}", &profiles);
let requests: Vec<_> = profiles
.profiles
.iter()
.map(
|p: &onvif_schema::onvif::Profile| onvif_schema::media::GetStreamUri {
profile_token: onvif_schema::onvif::ReferenceToken(p.token.0.clone()),
stream_setup: onvif_schema::onvif::StreamSetup {
stream: onvif_schema::onvif::StreamType::RtpUnicast,
transport: onvif_schema::onvif::Transport {
protocol: onvif_schema::onvif::TransportProtocol::Rtsp,
tunnel: vec![],
},
},
},
)
.collect();

let responses = futures::future::try_join_all(
requests
.iter()
.map(|r| onvif_schema::media::get_stream_uri(media_client, r)),
)
.await?;
for (p, resp) in profiles.profiles.iter().zip(responses.iter()) {
println!("token={} name={}", &p.token.0, &p.name.0);
println!(" {}", &resp.media_uri.uri);
match url::Url::parse(&resp.media_uri.uri) {
Ok(address) => urls.push(address),
Err(error) => {
error!("Failed to parse stream url: {}, reason: {error:?}", &resp.media_uri.uri)
}
}
if let Some(ref v) = p.video_encoder_configuration {
println!(
" {:?}, {}x{}",
v.encoding, v.resolution.width, v.resolution.height
);
if let Some(ref r) = v.rate_control {
println!(" {} fps, {} kbps", r.frame_rate_limit, r.bitrate_limit);
}
}
if let Some(ref a) = p.audio_encoder_configuration {
println!(
" audio: {:?}, {} kbps, {} kHz",
a.encoding, a.bitrate, a.sample_rate
);
}
}
Ok(urls)
}
}
159 changes: 159 additions & 0 deletions src/lib/controls/onvif/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
use std::sync::{Arc, Mutex};
use std::thread;

use anyhow::Result;
use async_std::stream::StreamExt;
use tracing::*;

use crate::stream::types::CaptureConfiguration;
use crate::video::types::VideoSourceType;
use crate::video_stream::types::VideoAndStreamInformation;
use crate::stream::{gst as gst_stream, manager as stream_manager, types::StreamInformation};
use crate::video::video_source_redirect::{VideoSourceRedirect, VideoSourceRedirectType};

use super::client::*;

lazy_static! {
static ref MANAGER: Arc<Mutex<Manager>> = Default::default();
}

#[derive(Debug)]
pub struct Manager {
_process: tokio::task::JoinHandle<Result<(), anyhow::Error>>,
}

impl Drop for Manager {
fn drop(&mut self) {
self._process.abort();
}
}

impl Default for Manager {
#[instrument(level = "trace")]
fn default() -> Self {
Self {
_process: tokio::spawn(async move {
Manager::discover_loop().await
})
}
}
}

impl Manager {
// Construct our manager, should be done inside main
#[instrument(level = "debug")]
pub fn init() {
MANAGER.as_ref();
}

#[instrument(level = "debug", fields(endpoint))]
fn run_main_loop() {
tokio::runtime::Builder::new_multi_thread()
.on_thread_start(|| debug!("Thread started"))
.on_thread_stop(|| debug!("Thread stopped"))
.thread_name_fn(|| {
static ATOMIC_ID: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
format!("Onvif-{id}")
})
.worker_threads(2)
.enable_all()
.build()
.expect("Failed building a new tokio runtime")
.block_on(Manager::discover_loop())
.expect("Error starting Onvif server");
}

#[instrument(level = "debug")]
async fn discover_loop() -> Result<()> {
use futures::stream::StreamExt;
use std::net::{IpAddr, Ipv4Addr};

loop {
info!("Discovering...");

const MAX_CONCURRENT_JUMPERS: usize = 100;

onvif::discovery::DiscoveryBuilder::default()
.listen_address(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.duration(tokio::time::Duration::from_secs(5))
.run()
.await?
.for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| async move {
info!("Device found: {device:#?}");

//TODO: We should add support to auth later
let credentials = None;
let clients = match Clients::try_new(&Auth {
credentials: credentials.clone(),
url: device.urls.first().unwrap().to_string().into(),
})
.await
{
Ok(clients) => clients,
Err(error) => {
error!("Failed creating clients: {error:#?}");
return;
}
};

match clients.get_stream_uris().await {
Ok(stream_uris) => {
let mut url = stream_uris[0].clone();

let name = if let Ok(device) = &clients.get_device_information().await {
format!("{} - {} - {}", device.model, device.serial_number, url)
} else {
if let Some(name) = device.name {
format!("{name} - {url}")
} else {
format!("{url}")
}
};

if let Some(credentials) = credentials {
if url.set_username(&credentials.username).is_err() {
error!("Failed setting username for {url}");
}
if url.set_password(Some(&credentials.password)).is_err() {
error!("Failed setting password for {url}");
}
}
let video_source_redirect = VideoSourceRedirect {
name: name.clone(),
source: VideoSourceRedirectType::Redirect(stream_uris[0].to_string()),
};

let video_and_stream = VideoAndStreamInformation {
name: name.clone(),
stream_information: StreamInformation {
endpoints: vec![url],
configuration: CaptureConfiguration::Redirect(Default::default()),
extended_configuration: None,
},
video_source: VideoSourceType::Redirect(video_source_redirect),
};

if let Ok(streams) = stream_manager::streams().await {
for stream in streams {
if let Err(error) = video_and_stream.conflicts_with(&stream.video_and_stream) {
debug!("Stream {name} is already registered: {error}");
return;
}
}
}

if let Err(error) = stream_manager::add_stream_and_start(video_and_stream).await {
error!("Failed adding stream: {error:#?}");
}
}
Err(error) => {
error!("Failed getting stream uris: {error:#?}");
}
}
})
.await;
}
}
}
2 changes: 2 additions & 0 deletions src/lib/controls/onvif/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod client;
pub mod manager;
4 changes: 3 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use mavlink_camera_manager::{cli, helper, logger, mavlink, server, settings, stream};
use mavlink_camera_manager::{cli, controls, helper, logger, mavlink, server, settings, stream};

use tracing::*;

Expand All @@ -11,6 +11,8 @@ async fn main() -> Result<(), std::io::Error> {
// Settings should start before everybody else to ensure that the CLI are stored
settings::manager::init(Some(&cli::manager::settings_file()));

controls::onvif::manager::Manager::init();

mavlink::manager::Manager::init();

stream::manager::init();
Expand Down

0 comments on commit a2cd4fc

Please sign in to comment.