Skip to content

Commit

Permalink
feat: add plugin initialization over gRPC and plugin engine struct
Browse files Browse the repository at this point in the history
  • Loading branch information
j-lanson committed Aug 15, 2024
1 parent 3d758c2 commit 7c6305c
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 17 deletions.
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion hipcheck/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ fs_extra = "1.3.0"
tonic = "0.12.1"
prost = "0.13.1"
rand = "0.8.5"
tokio = { version = "1.39.2", features = ["time"] }
tokio = { version = "1.39.2", features = ["rt", "sync", "time"] }
futures = "0.3.30"
async-stream = "0.3.5"

# Exactly matching the version of rustls used by ureq
# Get rid of default features since we don't use the AWS backed crypto provider (we use ring).
Expand Down
23 changes: 12 additions & 11 deletions hipcheck/src/plugin/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::hipcheck::plugin_client::PluginClient;
use crate::plugin::{HcPluginClient, Plugin, PluginContext};
use crate::{hc_error, Result, F64};
use futures::future::join_all;
use futures::Future;
use rand::Rng;
use std::collections::HashSet;
use std::ops::Range;
Expand All @@ -13,7 +15,6 @@ pub struct PluginExecutor {
port_range: Range<u16>,
backoff_interval: Duration,
jitter_pct: u8,
est_ports: HashSet<u16>,
}
impl PluginExecutor {
pub fn new(
Expand All @@ -33,21 +34,23 @@ impl PluginExecutor {
port_range,
backoff_interval,
jitter_pct,
est_ports: HashSet::new(),
})
}
fn get_available_port(&mut self) -> Result<u16> {
fn get_available_port(&self) -> Result<u16> {
for i in self.port_range.start..self.port_range.end {
if !self.est_ports.contains(&i)
&& std::net::TcpListener::bind(format!("127.0.0.1:{i}")).is_ok()
{
if std::net::TcpListener::bind(format!("127.0.0.1:{i}")).is_ok() {
return Ok(i);
}
}
Err(hc_error!("Failed to find available port"))
}
pub async fn start_plugin(&mut self, plugin: &Plugin) -> Result<PluginContext> {
let mut rng = rand::thread_rng();
pub async fn start_plugins(&self, plugins: Vec<Plugin>) -> Result<Vec<PluginContext>> {
join_all(plugins.into_iter().map(|p| self.start_plugin(p)))
.await
.into_iter()
.collect()
}
pub async fn start_plugin(&self, plugin: Plugin) -> Result<PluginContext> {
// Plugin startup design has inherent TOCTOU flaws since we tell the plugin
// which port we expect it to bind to. We can try to ensure the port we pass
// on the cmdline is not already in use, but it is still possible for that
Expand All @@ -73,7 +76,7 @@ impl PluginExecutor {
let mut opt_grpc: Option<HcPluginClient> = None;
while conn_attempts < self.max_conn_attempts {
// Jitter could be positive or negative, so mult by 2 to cover both sides
let jitter: i32 = rng.gen_range(0..(2 * self.jitter_pct)) as i32;
let jitter: i32 = rand::thread_rng().gen_range(0..(2 * self.jitter_pct)) as i32;
// Then subtract by self.jitter_pct to center around 0, and add to 100%
let jitter_pct = 1.0 + ((jitter - (self.jitter_pct as i32)) as f64 / 100.0);
// Once we are confident this math works, we can remove this
Expand Down Expand Up @@ -104,14 +107,12 @@ impl PluginExecutor {
spawn_attempts += 1;
continue;
};
self.est_ports.insert(port);
// We now have an open gRPC connection to our plugin process
return Ok(PluginContext {
plugin: plugin.clone(),
port,
grpc,
proc,
channel: None,
});
}
Err(hc_error!(
Expand Down
51 changes: 51 additions & 0 deletions hipcheck/src/plugin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
mod manager;
mod types;

use crate::hipcheck::Query;
use crate::plugin::manager::*;
pub use crate::plugin::types::*;
use crate::Result;
use futures::future::join_all;
use serde_json::Value;
use std::collections::HashMap;
use tokio::sync::mpsc;

pub fn dummy() {
let plugin = Plugin {
Expand All @@ -17,3 +23,48 @@ pub fn dummy() {
/* jitter_pct */ 10,
);
}

pub async fn initialize_plugins(plugins: Vec<(PluginContext, Value)>) -> Result<Vec<PluginStream>> {
join_all(plugins.into_iter().map(|(p, c)| p.initialize(c)))
.await
.into_iter()
.collect()
}

struct HcPluginCore {
executor: PluginExecutor,
plugins: HashMap<String, PluginStream>,
}
impl HcPluginCore {
// When this object is returned, the plugins are all connected but the
// initialization protocol over the gRPC still needs to be completed
pub async fn new(executor: PluginExecutor, plugins: Vec<(Plugin, Value)>) -> Result<Self> {
// Separate plugins and configs so we can start plugins async
let mut conf_map = HashMap::<String, Value>::new();
let plugins = plugins
.into_iter()
.map(|(p, c)| {
conf_map.insert(p.name.clone(), c);
p
})
.collect();
let ctxs = executor.start_plugins(plugins).await?;
// Rejoin plugin ctx with its config
let mapped_ctxs: Vec<(PluginContext, Value)> = ctxs
.into_iter()
.map(|c| {
let conf = conf_map.remove(&c.plugin.name).unwrap();
(c, conf)
})
.collect();
// Use configs to initialize corresponding plugin
let plugins = HashMap::<String, PluginStream>::from_iter(
initialize_plugins(mapped_ctxs)
.await?
.into_iter()
.map(|p| (p.name().to_owned(), p)),
);
// Now we have a set of started and initialized plugins to interact with
Ok(HcPluginCore { executor, plugins })
}
}
80 changes: 75 additions & 5 deletions hipcheck/src/plugin/types.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::hipcheck::plugin_client::PluginClient;
use crate::hipcheck::{
Configuration, ConfigurationResult as PluginConfigResult, ConfigurationStatus, Empty,
Configuration, ConfigurationResult as PluginConfigResult, ConfigurationStatus, Empty, Query,
Schema as PluginSchema,
};
use crate::{hc_error, Result};
use crate::{hc_error, Result, StdResult};
use serde_json::Value;
use std::collections::HashMap;
use std::ops::Not;
use std::process::Child;
use tonic::codec::Streaming;
use tonic::transport::Channel;

pub type HcPluginClient = PluginClient<Channel>;
Expand Down Expand Up @@ -56,11 +57,14 @@ impl TryFrom<PluginConfigResult> for ConfigurationResult {
// is stuffed into a custom error type enum that equals the protoc-generated one
// minus the success variant.
impl ConfigurationResult {
pub fn as_result(&self) -> std::result::Result<(), ConfigError> {
pub fn as_result(&self) -> Result<()> {
let Ok(error) = self.status.try_into() else {
return Ok(());
};
Err(ConfigError::new(error, self.message.clone()))
Err(hc_error!(
"{}",
ConfigError::new(error, self.message.clone()).to_string()
))
}
}
pub enum ConfigErrorType {
Expand Down Expand Up @@ -94,14 +98,29 @@ impl ConfigError {
ConfigError { error, message }
}
}
impl std::fmt::Display for ConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> StdResult<(), std::fmt::Error> {
use ConfigErrorType::*;
let msg = match &self.message {
Some(s) => format!(": {s}"),
None => "".to_owned(),
};
let err = match self.error {
Unknown => "unknown configuration error occurred",
MissingRequiredConfig => "configuration is missing requried fields",
UnrecognizedConfig => "configuration contains unrecognized fields",
InvalidConfigValue => "configuration contains invalid values",
};
write!(f, "{}{}", msg, err)
}
}

// State for managing an actively running plugin process
pub struct PluginContext {
pub plugin: Plugin,
pub port: u16,
pub grpc: HcPluginClient,
pub proc: Child,
pub channel: Option<String>,
}
// Redefinition of `grpc` field's functions with more useful types, additional
// error & sanity checking
Expand Down Expand Up @@ -139,6 +158,42 @@ impl PluginContext {
let mut res = self.grpc.get_default_policy_expression(Empty {}).await?;
Ok(res.get_ref().policy_expression.to_owned())
}
pub async fn initiate_query_protocol(
&mut self,
mut rx: tokio::sync::mpsc::Receiver<Query>,
) -> Result<Streaming<Query>> {
let stream = async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
match self.grpc.initiate_query_protocol(stream).await {
Ok(resp) => Ok(resp.into_inner()),
Err(e) => Err(hc_error!(
"query protocol initiation failed with tonic status code {}",
e
)),
}
}
pub async fn initialize(mut self, config: Value) -> Result<PluginStream> {
let schemas = HashMap::<String, Schema>::from_iter(
self.get_query_schemas()
.await?
.into_iter()
.map(|s| (s.query_name.clone(), s)),
);
self.set_configuration(&config).await?.as_result()?;
let default_policy_expr = self.get_default_policy_expression().await?;
let (tx, mut out_rx) = tokio::sync::mpsc::channel::<Query>(10);
let rx = self.initiate_query_protocol(out_rx).await?;
Ok(PluginStream {
schemas,
default_policy_expr,
ctx: self,
tx,
rx,
})
}
}
impl Drop for PluginContext {
fn drop(&mut self) {
Expand All @@ -147,3 +202,18 @@ impl Drop for PluginContext {
}
}
}

// Encapsulate an "initialized" state of a Plugin with interfaces that abstract
// query chunking to produce whole messages for the Hipcheck engine
pub struct PluginStream {
pub schemas: HashMap<String, Schema>,
pub default_policy_expr: String, // TODO - update with policy_expr type
ctx: PluginContext,
tx: tokio::sync::mpsc::Sender<Query>,
rx: Streaming<Query>,
}
impl PluginStream {
pub fn name(&self) -> &str {
&self.ctx.plugin.name
}
}

0 comments on commit 7c6305c

Please sign in to comment.