forked from mavlink/mavlink-camera-manager
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Patrick José Pereira <[email protected]>
- Loading branch information
1 parent
f93c4f1
commit 85b81f9
Showing
5 changed files
with
322 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,2 @@ | ||
pub mod types; | ||
pub mod onvif; | ||
pub mod types; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
use onvif::soap; | ||
use onvif_schema::{devicemgmt::GetDeviceInformationResponse, transport}; | ||
|
||
use anyhow::{anyhow, Result}; | ||
use tracing::*; | ||
|
||
pub struct Clients { | ||
devicemgmt: soap::client::Client, | ||
event: Option<soap::client::Client>, | ||
deviceio: Option<soap::client::Client>, | ||
media: Option<soap::client::Client>, | ||
media2: Option<soap::client::Client>, | ||
imaging: Option<soap::client::Client>, | ||
ptz: Option<soap::client::Client>, | ||
analytics: Option<soap::client::Client>, | ||
} | ||
|
||
pub struct Auth { | ||
pub credentials: Option<soap::client::Credentials>, | ||
pub url: Box<str>, | ||
} | ||
|
||
impl Clients { | ||
pub async fn try_new(auth: &Auth) -> Result<Self> { | ||
let creds = &auth.credentials; | ||
dbg!(&auth.url); | ||
let devicemgmt_uri = url::Url::parse(&auth.url)?; | ||
let base_uri = &devicemgmt_uri.origin().ascii_serialization(); | ||
|
||
let mut this = Self { | ||
devicemgmt: soap::client::ClientBuilder::new(&devicemgmt_uri) | ||
.credentials(creds.clone()) | ||
.build(), | ||
imaging: None, | ||
ptz: None, | ||
event: None, | ||
deviceio: None, | ||
media: None, | ||
media2: None, | ||
analytics: None, | ||
}; | ||
|
||
let services = | ||
onvif_schema::devicemgmt::get_services(&this.devicemgmt, &Default::default()).await?; | ||
|
||
dbg!(&services); | ||
for service in &services.service { | ||
dbg!(&service.x_addr); | ||
let service_url = url::Url::parse(&service.x_addr).map_err(anyhow::Error::msg)?; | ||
if !service_url.as_str().starts_with(base_uri) { | ||
return Err(anyhow!( | ||
"Service URI {service_url:?} is not within base URI {base_uri:?}" | ||
)); | ||
} | ||
let svc = Some( | ||
soap::client::ClientBuilder::new(&service_url) | ||
.credentials(creds.clone()) | ||
.build(), | ||
); | ||
match service.namespace.as_str() { | ||
"http://www.onvif.org/ver10/device/wsdl" => { | ||
if service_url != devicemgmt_uri { | ||
warn!( | ||
"advertised device mgmt uri {service_url} not expected {devicemgmt_uri}" | ||
); | ||
} | ||
} | ||
"http://www.onvif.org/ver10/events/wsdl" => this.event = svc, | ||
"http://www.onvif.org/ver10/deviceIO/wsdl" => this.deviceio = svc, | ||
"http://www.onvif.org/ver10/media/wsdl" => this.media = svc, | ||
"http://www.onvif.org/ver20/media/wsdl" => this.media2 = svc, | ||
"http://www.onvif.org/ver20/imaging/wsdl" => this.imaging = svc, | ||
"http://www.onvif.org/ver20/ptz/wsdl" => this.ptz = svc, | ||
"http://www.onvif.org/ver20/analytics/wsdl" => this.analytics = svc, | ||
_ => debug!("unknown service: {:?}", service), | ||
} | ||
} | ||
|
||
Ok(this) | ||
} | ||
|
||
pub async fn get_device_information( | ||
&self, | ||
) -> Result<GetDeviceInformationResponse, transport::Error> { | ||
onvif_schema::devicemgmt::get_device_information(&self.devicemgmt, &Default::default()) | ||
.await | ||
} | ||
|
||
pub async fn get_stream_uris(&self) -> Result<Vec<url::Url>, transport::Error> { | ||
let mut urls: Vec<url::Url> = vec![]; | ||
let media_client = self | ||
.media | ||
.as_ref() | ||
.ok_or_else(|| transport::Error::Other("Client media is not available".into()))?; | ||
let profiles = onvif_schema::media::get_profiles(media_client, &Default::default()).await?; | ||
debug!("get_profiles response: {:#?}", &profiles); | ||
let requests: Vec<_> = profiles | ||
.profiles | ||
.iter() | ||
.map( | ||
|p: &onvif_schema::onvif::Profile| onvif_schema::media::GetStreamUri { | ||
profile_token: onvif_schema::onvif::ReferenceToken(p.token.0.clone()), | ||
stream_setup: onvif_schema::onvif::StreamSetup { | ||
stream: onvif_schema::onvif::StreamType::RtpUnicast, | ||
transport: onvif_schema::onvif::Transport { | ||
protocol: onvif_schema::onvif::TransportProtocol::Rtsp, | ||
tunnel: vec![], | ||
}, | ||
}, | ||
}, | ||
) | ||
.collect(); | ||
|
||
let responses = futures::future::try_join_all( | ||
requests | ||
.iter() | ||
.map(|r| onvif_schema::media::get_stream_uri(media_client, r)), | ||
) | ||
.await?; | ||
for (p, resp) in profiles.profiles.iter().zip(responses.iter()) { | ||
println!("token={} name={}", &p.token.0, &p.name.0); | ||
println!(" {}", &resp.media_uri.uri); | ||
match url::Url::parse(&resp.media_uri.uri) { | ||
Ok(address) => urls.push(address), | ||
Err(error) => { | ||
error!( | ||
"Failed to parse stream url: {}, reason: {error:?}", | ||
&resp.media_uri.uri | ||
) | ||
} | ||
} | ||
if let Some(ref v) = p.video_encoder_configuration { | ||
println!( | ||
" {:?}, {}x{}", | ||
v.encoding, v.resolution.width, v.resolution.height | ||
); | ||
if let Some(ref r) = v.rate_control { | ||
println!(" {} fps, {} kbps", r.frame_rate_limit, r.bitrate_limit); | ||
} | ||
} | ||
if let Some(ref a) = p.audio_encoder_configuration { | ||
println!( | ||
" audio: {:?}, {} kbps, {} kHz", | ||
a.encoding, a.bitrate, a.sample_rate | ||
); | ||
} | ||
} | ||
Ok(urls) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
use std::sync::{Arc, Mutex}; | ||
use std::thread; | ||
|
||
use anyhow::Result; | ||
use async_std::stream::StreamExt; | ||
use tracing::*; | ||
|
||
use crate::stream::types::CaptureConfiguration; | ||
use crate::stream::{gst as gst_stream, manager as stream_manager, types::StreamInformation}; | ||
use crate::video::types::VideoSourceType; | ||
use crate::video::video_source_redirect::{VideoSourceRedirect, VideoSourceRedirectType}; | ||
use crate::video_stream::types::VideoAndStreamInformation; | ||
|
||
use super::client::*; | ||
|
||
lazy_static! { | ||
static ref MANAGER: Arc<Mutex<Manager>> = Default::default(); | ||
} | ||
|
||
#[derive(Debug)] | ||
pub struct Manager { | ||
_process: tokio::task::JoinHandle<Result<(), anyhow::Error>>, | ||
} | ||
|
||
impl Drop for Manager { | ||
fn drop(&mut self) { | ||
self._process.abort(); | ||
} | ||
} | ||
|
||
impl Default for Manager { | ||
#[instrument(level = "trace")] | ||
fn default() -> Self { | ||
Self { | ||
_process: tokio::spawn(async move { Manager::discover_loop().await }), | ||
} | ||
} | ||
} | ||
|
||
impl Manager { | ||
// Construct our manager, should be done inside main | ||
#[instrument(level = "debug")] | ||
pub fn init() { | ||
MANAGER.as_ref(); | ||
} | ||
|
||
#[instrument(level = "debug", fields(endpoint))] | ||
fn run_main_loop() { | ||
tokio::runtime::Builder::new_multi_thread() | ||
.on_thread_start(|| debug!("Thread started")) | ||
.on_thread_stop(|| debug!("Thread stopped")) | ||
.thread_name_fn(|| { | ||
static ATOMIC_ID: std::sync::atomic::AtomicUsize = | ||
std::sync::atomic::AtomicUsize::new(0); | ||
let id = ATOMIC_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst); | ||
format!("Onvif-{id}") | ||
}) | ||
.worker_threads(2) | ||
.enable_all() | ||
.build() | ||
.expect("Failed building a new tokio runtime") | ||
.block_on(Manager::discover_loop()) | ||
.expect("Error starting Onvif server"); | ||
} | ||
|
||
#[instrument(level = "debug")] | ||
async fn discover_loop() -> Result<()> { | ||
use futures::stream::StreamExt; | ||
use std::net::{IpAddr, Ipv4Addr}; | ||
|
||
loop { | ||
info!("Discovering..."); | ||
|
||
const MAX_CONCURRENT_JUMPERS: usize = 100; | ||
|
||
onvif::discovery::DiscoveryBuilder::default() | ||
.listen_address(IpAddr::V4(Ipv4Addr::UNSPECIFIED)) | ||
.duration(tokio::time::Duration::from_secs(5)) | ||
.run() | ||
.await? | ||
.for_each_concurrent(MAX_CONCURRENT_JUMPERS, |device| async move { | ||
info!("Device found: {device:#?}"); | ||
|
||
//TODO: We should add support to auth later | ||
let credentials = None; | ||
let clients = match Clients::try_new(&Auth { | ||
credentials: credentials.clone(), | ||
url: device.urls.first().unwrap().to_string().into(), | ||
}) | ||
.await | ||
{ | ||
Ok(clients) => clients, | ||
Err(error) => { | ||
error!("Failed creating clients: {error:#?}"); | ||
return; | ||
} | ||
}; | ||
|
||
match clients.get_stream_uris().await { | ||
Ok(stream_uris) => { | ||
let mut url = stream_uris[0].clone(); | ||
|
||
let name = if let Ok(device) = &clients.get_device_information().await { | ||
format!("{} - {} - {}", device.model, device.serial_number, url) | ||
} else { | ||
if let Some(name) = device.name { | ||
format!("{name} - {url}") | ||
} else { | ||
format!("{url}") | ||
} | ||
}; | ||
|
||
if let Some(credentials) = credentials { | ||
if url.set_username(&credentials.username).is_err() { | ||
error!("Failed setting username for {url}"); | ||
} | ||
if url.set_password(Some(&credentials.password)).is_err() { | ||
error!("Failed setting password for {url}"); | ||
} | ||
} | ||
let video_source_redirect = VideoSourceRedirect { | ||
name: name.clone(), | ||
source: VideoSourceRedirectType::Redirect( | ||
stream_uris[0].to_string(), | ||
), | ||
}; | ||
|
||
let video_and_stream = VideoAndStreamInformation { | ||
name: name.clone(), | ||
stream_information: StreamInformation { | ||
endpoints: vec![url], | ||
configuration: CaptureConfiguration::Redirect( | ||
Default::default(), | ||
), | ||
extended_configuration: None, | ||
}, | ||
video_source: VideoSourceType::Redirect(video_source_redirect), | ||
}; | ||
|
||
if let Ok(streams) = stream_manager::streams().await { | ||
for stream in streams { | ||
if let Err(error) = | ||
video_and_stream.conflicts_with(&stream.video_and_stream) | ||
{ | ||
debug!("Stream {name} is already registered: {error}"); | ||
return; | ||
} | ||
} | ||
} | ||
|
||
if let Err(error) = | ||
stream_manager::add_stream_and_start(video_and_stream).await | ||
{ | ||
error!("Failed adding stream: {error:#?}"); | ||
} | ||
} | ||
Err(error) => { | ||
error!("Failed getting stream uris: {error:#?}"); | ||
} | ||
} | ||
}) | ||
.await; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
mod client; | ||
pub mod manager; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters