Skip to content

Commit

Permalink
chore: refactor Datadog Status/Flare component in preparation for sha…
Browse files Browse the repository at this point in the history
…red telemetry via reflector (#483)
  • Loading branch information
tobz authored Feb 10, 2025
1 parent 3213257 commit 258633d
Show file tree
Hide file tree
Showing 14 changed files with 460 additions and 283 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions bin/agent-data-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ fips = ["saluki-app/tls-fips"]
[dependencies]
async-trait = { workspace = true }
bytesize = { workspace = true }
chrono = { workspace = true }
datadog-protos = { workspace = true }
memory-accounting = { workspace = true }
rand = { workspace = true }
saluki-app = { workspace = true, features = ["full"] }
saluki-components = { workspace = true }
saluki-config = { workspace = true }
Expand All @@ -34,7 +37,9 @@ tokio = { workspace = true, features = [
"signal",
] }
tokio-rustls = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true, features = ["std", "v7"] }

[target.'cfg(target_os = "linux")'.dependencies]
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
Expand Down
81 changes: 81 additions & 0 deletions bin/agent-data-plane/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::future::pending;

use saluki_app::api::APIBuilder;
use saluki_config::GenericConfiguration;
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use saluki_io::net::ListenAddress;
use tracing::{error, info};

mod remote_agent;
use self::remote_agent::RemoteAgentHelperConfiguration;

const PRIMARY_UNPRIVILEGED_API_PORT: u16 = 5100;
const PRIMARY_PRIVILEGED_API_PORT: u16 = 5101;

pub async fn configure_and_spawn_api_endpoints(
config: &GenericConfiguration, unprivileged_api: APIBuilder, mut privileged_api: APIBuilder,
) -> Result<(), GenericError> {
let api_listen_address = config
.try_get_typed("api_listen_address")
.error_context("Failed to get API listen address.")?
.unwrap_or_else(|| ListenAddress::any_tcp(PRIMARY_UNPRIVILEGED_API_PORT));

let secure_api_listen_address = config
.try_get_typed("secure_api_listen_address")
.error_context("Failed to get secure API listen address.")?
.unwrap_or_else(|| ListenAddress::any_tcp(PRIMARY_PRIVILEGED_API_PORT));

// When not in standalone mode, install the necessary components for registering ourselves with the Datadog Agent as
// a "remote agent", which wires up ADP to allow the Datadog Agent to query it for status and flare information.
let in_standalone_mode = config.get_typed_or_default::<bool>("adp.standalone_mode");
if !in_standalone_mode {
let local_secure_api_listen_addr = secure_api_listen_address
.as_local_connect_addr()
.ok_or_else(|| generic_error!("Failed to get local secure API listen address to advertise."))?;

// Build and spawn our helper task for registering ourselves with the Datadog Agent as a remote agent.
let remote_agent_config =
RemoteAgentHelperConfiguration::from_configuration(config, local_secure_api_listen_addr).await?;
let remote_agent_service = remote_agent_config.spawn().await;

// Register our Remote Agent gRPC service with the privileged API.
privileged_api = privileged_api.with_grpc_service(remote_agent_service);
}

spawn_unprivileged_api(unprivileged_api, api_listen_address).await?;
spawn_privileged_api(privileged_api, secure_api_listen_address).await?;

Ok(())
}

async fn spawn_unprivileged_api(
api_builder: APIBuilder, api_listen_address: ListenAddress,
) -> Result<(), GenericError> {
// TODO: Use something better than `pending()`... perhaps something like a more generalized
// `ComponentShutdownCoordinator` that allows for triggering and waiting for all attached tasks to signal that
// they've shutdown.
tokio::spawn(async move {
info!("Serving unprivileged API on {}.", api_listen_address);

if let Err(e) = api_builder.serve(api_listen_address, pending()).await {
error!("Failed to serve unprivileged API: {}", e);
}
});

Ok(())
}

async fn spawn_privileged_api(api_builder: APIBuilder, api_listen_address: ListenAddress) -> Result<(), GenericError> {
// TODO: Use something better than `pending()`... perhaps something like a more generalized
// `ComponentShutdownCoordinator` that allows for triggering and waiting for all attached tasks to signal that
// they've shutdown.
tokio::spawn(async move {
info!("Serving privileged API on {}.", api_listen_address);

if let Err(e) = api_builder.serve(api_listen_address, pending()).await {
error!("Failed to serve privileged API: {}", e);
}
});

Ok(())
}
127 changes: 127 additions & 0 deletions bin/agent-data-plane/src/api/remote_agent/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::time::Duration;
use std::{collections::HashMap, net::SocketAddr};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datadog_protos::agent::{
GetFlareFilesRequest, GetFlareFilesResponse, GetStatusDetailsRequest, GetStatusDetailsResponse, RemoteAgent,
RemoteAgentServer, StatusSection,
};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use saluki_config::GenericConfiguration;
use saluki_env::helpers::remote_agent::RemoteAgentClient;
use saluki_error::GenericError;
use tokio::time::{interval, MissedTickBehavior};
use tracing::debug;
use uuid::Uuid;

/// Remote Agent helper configuration.
pub struct RemoteAgentHelperConfiguration {
id: String,
display_name: String,
local_api_listen_addr: SocketAddr,
client: RemoteAgentClient,
}

impl RemoteAgentHelperConfiguration {
/// Creates a new `RemoteAgentHelperConfiguration` from the given configuration.
pub async fn from_configuration(
config: &GenericConfiguration, local_api_listen_addr: SocketAddr,
) -> Result<Self, GenericError> {
let app_details = saluki_metadata::get_app_details();
let formatted_full_name = app_details
.full_name()
.replace(" ", "-")
.replace("_", "-")
.to_lowercase();
let client = RemoteAgentClient::from_configuration(config).await?;

Ok(Self {
id: format!("{}-{}", formatted_full_name, Uuid::now_v7()),
display_name: formatted_full_name,
local_api_listen_addr,
client,
})
}

/// Spawns the remote agent helper task.
///
/// The spawned task ensures that this process is registered as a Remote Agent with the configured Datadog Agent
/// instance. Additionally, an implementation of the `RemoteAgent` gRPC service is returned that must be installed
/// on the API server that is listening at `local_api_listen_addr`.
pub async fn spawn(self) -> RemoteAgentServer<RemoteAgentImpl> {
let service_impl = RemoteAgentImpl { started: Utc::now() };
let service = RemoteAgentServer::new(service_impl);

tokio::spawn(run_remote_agent_helper(
self.id,
self.display_name,
self.local_api_listen_addr,
self.client,
));

service
}
}

async fn run_remote_agent_helper(
id: String, display_name: String, local_api_listen_addr: SocketAddr, mut client: RemoteAgentClient,
) {
let local_api_listen_addr = local_api_listen_addr.to_string();
let auth_token: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(64)
.map(char::from)
.collect();

let mut register_agent = interval(Duration::from_secs(10));
register_agent.set_missed_tick_behavior(MissedTickBehavior::Delay);

debug!("Remote Agent helper started.");

loop {
register_agent.tick().await;
match client
.register_remote_agent_request(&id, &display_name, &local_api_listen_addr, &auth_token)
.await
{
Ok(resp) => {
let new_refresh_interval = resp.into_inner().recommended_refresh_interval_secs;
register_agent.reset_after(Duration::from_secs(new_refresh_interval as u64));
debug!("Refreshed registration with Datadog Agent");
}
Err(e) => {
debug!("Failed to refresh registration with Datadog Agent: {}", e);
}
}
}
}

#[derive(Default)]
pub struct RemoteAgentImpl {
started: DateTime<Utc>,
}

#[async_trait]
impl RemoteAgent for RemoteAgentImpl {
async fn get_status_details(
&self, _request: tonic::Request<GetStatusDetailsRequest>,
) -> Result<tonic::Response<GetStatusDetailsResponse>, tonic::Status> {
let mut status_fields = HashMap::new();
status_fields.insert("Started".to_string(), self.started.to_rfc3339());
let response = GetStatusDetailsResponse {
main_section: Some(StatusSection { fields: status_fields }),
named_sections: HashMap::new(),
};
Ok(tonic::Response::new(response))
}

async fn get_flare_files(
&self, _request: tonic::Request<GetFlareFilesRequest>,
) -> Result<tonic::Response<GetFlareFilesResponse>, tonic::Status> {
let response = GetFlareFilesResponse {
files: HashMap::default(),
};
Ok(tonic::Response::new(response))
}
}
Loading

0 comments on commit 258633d

Please sign in to comment.