Skip to content

Commit

Permalink
feat: inital handling of plugin startup and context management
Browse files Browse the repository at this point in the history
  • Loading branch information
j-lanson committed Aug 12, 2024
1 parent 22b7fca commit e1c7be4
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 0 deletions.
2 changes: 2 additions & 0 deletions hipcheck/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ mod git2_log_shim;
mod git2_rustls_transport;
mod log_bridge;
mod metric;
#[allow(unused)]
mod plugin;
mod report;
mod session;
mod setup;
Expand Down
74 changes: 74 additions & 0 deletions hipcheck/src/plugin/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use crate::plugin::{Plugin, PluginContext, HcPluginClient};
use crate::{hc_error, Result};
use std::process::Command;
use crate::hipcheck::plugin_client::PluginClient;

const MAX_SPAWN_ATTEMPTS: usize = 5;
const MAX_CONN_ATTEMPTS: usize = 4;

pub struct PluginExecutor {}
impl PluginExecutor {
pub fn new() -> Self {
PluginExecutor {}
}
fn get_available_port(&mut self) -> Result<u16> {
for i in 40000..u16::MAX {
if let Ok(bind) = std::net::TcpListener::bind(format!("127.0.0.1:{i}")) {
drop(bind);
return Ok(i);
}
}
Err(hc_error!("Failed to find available port"))
}
pub async fn start_plugin(&mut self, plugin: &Plugin) -> Result<PluginContext> {
// Plugin startup design has inherent TOCTOU flaws since we tell the plugin
// which port we expect it to bind to. We can try to ensure the port we pass
// on the cmdline is not already in use, but it is still possible for that
// port to become unavailable between our check and the plugin's bind attempt.
// Hence the need for subsequent attempts if we get unlucky
let mut spawn_attempts: usize = 0;
while spawn_attempts < MAX_SPAWN_ATTEMPTS {
// Find free port for process. Don't use spawn_attempts, since
let port = self.get_available_port()?;
let port_str = port.to_string();
// Spawn plugin process
let Ok(mut proc) = Command::new(&plugin.entrypoint)
.args(["--port", port_str.as_str()])
.spawn()
else {
spawn_attempts += 1;
continue;
};
// Potential timing race condition if we try to connect before the plugin
// has had a chance to bind to its port. We try up to MAX_CONN_ATTEMPTS
// times
let mut conn_attempts = 0;
let mut opt_grpc: Option<HcPluginClient> = None;
while conn_attempts < MAX_CONN_ATTEMPTS {
if let Ok(grpc) = PluginClient::connect(format!("http://127.0.0.1:{port_str}")).await {
opt_grpc = Some(grpc);
break;
} else {
conn_attempts += 1;
}
}
// If opt_grpc is None, we did not manage to connect to the plugin. Kill it
// and try again
let Some(grpc) = opt_grpc else {
if let Err(e) = proc.kill() {
println!("Failed to kill child process for plugin: {e}");
}
spawn_attempts += 1;
continue;
};
// We now have an open gRPC connection to our plugin process
return Ok(PluginContext {
plugin: plugin.clone(),
port,
grpc,
proc,
});
}
Err(hc_error!("Reached max spawn attempts for plugin {}", plugin.name))
}
}
13 changes: 13 additions & 0 deletions hipcheck/src/plugin/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
mod manager;
mod types;

use crate::plugin::manager::*;
pub use crate::plugin::types::*;

pub fn dummy() {
let plugin = Plugin {
name: "dummy".to_owned(),
entrypoint: "./dummy".to_owned(),
};
let manager = PluginExecutor::new();
}
68 changes: 68 additions & 0 deletions hipcheck/src/plugin/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::hipcheck::plugin_client::PluginClient;
use crate::hipcheck::{Schema as PluginSchema, Empty};
use std::process::Child;
use tonic::transport::Channel;
use crate::{Result, hc_error};
use serde_json::Value;

pub type HcPluginClient = PluginClient<Channel>;

#[derive(Clone, Debug)]
pub struct Plugin {
pub name: String,
pub entrypoint: String,
}

pub struct Schema {
pub query_name: String,
pub key_schema: Value,
pub output_schema: Value,
}
impl TryFrom<PluginSchema> for Schema {
type Error = crate::error::Error;
fn try_from(value: PluginSchema) -> Result<Self> {
let key_schema: Value = serde_json::from_str(value.key_schema.as_str())?;
let output_schema: Value = serde_json::from_str(value.output_schema.as_str())?;
Ok(Schema {
query_name: value.query_name,
key_schema,
output_schema,
})
}
}

pub struct PluginContext {
pub plugin: Plugin,
pub port: u16,
pub grpc: HcPluginClient,
pub proc: Child,
}
impl PluginContext {
pub async fn get_query_schemas(&mut self) -> Result<Vec<Schema>> {
let mut res = self.grpc.get_query_schemas(Empty {}).await?;
let stream = res.get_mut();
let mut schemas = vec![];
loop {
let opt_msg = match stream.message().await {
Err(e) => { return Err(hc_error!("{}", e)) },
Ok(om) => om,
};
match opt_msg {
None => {
break;
},
Some(msg) => {
schemas.push(msg.try_into()?);
}
}
}
Ok(schemas)
}
}
impl Drop for PluginContext {
fn drop(&mut self) {
if let Err(e) = self.proc.kill() {
println!("Failed to kill child: {e}");
}
}
}

0 comments on commit e1c7be4

Please sign in to comment.