Skip to content

Commit

Permalink
feat: Implement WAL plugin test API
Browse files Browse the repository at this point in the history
This implements the WAL plugin test API. It also introduces a new API for the Python plugins to be called, get their data, and call back into the database server.

There are some things that I'll want to address in follow-on work:
* CLI tests, but will wait on #25737 to land for a refactor of the CLI here
* Would be better to hook the Python logging to call back into the plugin return state like here: https://pyo3.rs/v0.23.3/ecosystem/logging.html#the-python-to-rust-direction
* We should only load the LineBuilder interface once in a module, rather than on every execution of a WAL plugin
* More tests all around

But I want to get this in so that the actual plugin and trigger system can get updated to build around this model.
  • Loading branch information
pauldix committed Jan 5, 2025
1 parent d78756f commit 068d0d8
Show file tree
Hide file tree
Showing 18 changed files with 913 additions and 5 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ tokio_console = ["console-subscriber", "tokio/tracing", "observability_deps/rele

# Use jemalloc as the default allocator.
jemalloc_replacing_malloc = ["influxdb3_process/jemalloc_replacing_malloc"]
system-py = ["influxdb3_write/system-py"]
system-py = ["influxdb3_write/system-py", "influxdb3_server/system-py"]

[dev-dependencies]
# Core Crates
Expand Down
27 changes: 26 additions & 1 deletion influxdb3/src/commands/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,32 @@ pub struct InfluxDb3Config {
pub auth_token: Option<Secret<String>>,
}

/// A clap argument privided as a list of items separated by `SEPARATOR`, which by default is a ','
/// A clap argument provided as a key/value pair separated by `SEPARATOR`, which by default is a '='
///
/// The pair is held as a tuple in the public inner field.
#[derive(Debug, Clone)]
pub struct SeparatedKeyValue<K, V, const SEPARATOR: char = '='>(pub (K, V));

impl<K, V, const SEPARATOR: char> FromStr for SeparatedKeyValue<K, V, SEPARATOR>
where
    K: FromStr<Err: Into<anyhow::Error>>,
    V: FromStr<Err: Into<anyhow::Error>>,
{
    type Err = anyhow::Error;

    /// Parses a `key<SEPARATOR>value` string into a key/value pair.
    ///
    /// The input is split on the *first* occurrence of `SEPARATOR` only, so the value may
    /// itself contain the separator (with the default `=`, `a=b=c` yields the key `a` and
    /// the value `b=c` instead of silently dropping `=c`).
    ///
    /// # Errors
    ///
    /// Returns an error if `s` does not contain `SEPARATOR`, or if either side fails to
    /// parse into `K` or `V` respectively.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // `split_once` returns `None` when the separator is absent, which is the only way a
        // value can be missing; a key is always present (possibly as an empty string).
        let (key, value) = s
            .split_once(SEPARATOR)
            .ok_or_else(|| anyhow::anyhow!("missing value"))?;

        Ok(Self((
            key.parse().map_err(Into::into)?,
            value.parse().map_err(Into::into)?,
        )))
    }
}

/// A clap argument provided as a list of items separated by `SEPARATOR`, which by default is a ','
///
/// The items are held in the public inner `Vec<T>`.
#[derive(Debug, Clone)]
pub struct SeparatedList<T, const SEPARATOR: char = ','>(pub Vec<T>);

Expand Down
21 changes: 21 additions & 0 deletions influxdb3/src/commands/plugin_test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::error::Error;

pub mod wal;

// Top-level arguments for the plugin test command: just the subcommand to run.
// (Plain `//` comments are used here because clap turns `///` doc comments
// into help text.)
#[derive(Debug, clap::Parser)]
pub(crate) struct Config {
    // The plugin test subcommand selected on the command line.
    #[clap(subcommand)]
    command: Command,
}

// The kinds of plugin test that can be run; WAL plugin testing is the only
// variant defined here.
#[derive(Debug, clap::Parser)]
enum Command {
    /// Test a plugin triggered by WAL writes
    Wal(wal::Config),
}

pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
match config.command {
Command::Wal(config) => wal::command(config).await,
}
}
79 changes: 79 additions & 0 deletions influxdb3/src/commands/plugin_test/wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::commands::common::{InfluxDb3Config, SeparatedKeyValue, SeparatedList};
use influxdb3_client::plugin_development::WalPluginTestRequest;
use secrecy::ExposeSecret;
use std::collections::HashMap;
use std::error::Error;

// Arguments for the WAL plugin test subcommand: the common connection
// settings plus the WAL-specific test options. (Plain `//` comments are used
// here because clap turns `///` doc comments into help text.)
#[derive(Debug, clap::Parser)]
pub struct Config {
    // Common connection options; `command` reads `host_url` and `auth_token` from it.
    #[clap(flatten)]
    influxdb3_config: InfluxDb3Config,

    // Options describing the plugin test to run; converted into a `WalPluginTestRequest`.
    #[clap(flatten)]
    wal_plugin_test: WalPluginTest,
}

// Options describing a single WAL plugin test run. They are converted into a
// `WalPluginTestRequest` and sent to the server. The `///` comments on the
// fields below are user-facing: clap renders them as `--help` text.
#[derive(Debug, clap::Parser)]
pub struct WalPluginTest {
    /// The name of the plugin, which should match its file name on the server `<plugin-dir>/<name>.py`
    #[clap(short = 'n', long = "name")]
    pub name: String,
    /// If given, pass this line protocol as input
    #[clap(long = "lp")]
    pub input_lp: Option<String>,
    /// If given, pass this file of LP as input from the server at `<plugin-dir>/<name>_test/<input-file>`
    #[clap(long = "file")]
    pub input_file: Option<String>,
    /// If given, save the output to this file on the server in `<plugin-dir>/<name>_test/<save-output-to-file>`
    #[clap(long = "save-output-to-file")]
    pub save_output_to_file: Option<String>,
    /// If given, validate the output against this file on the server in `<plugin-dir>/<name>_test/<validate-output-file>`
    #[clap(long = "validate-output-file")]
    pub validate_output_file: Option<String>,
    /// If given, pass this map of string key/value pairs as input arguments, e.g. `key1=value1,key2=value2`
    #[clap(long = "input-arguments")]
    pub input_arguments: Option<SeparatedList<SeparatedKeyValue<String, String>>>,
}

impl From<WalPluginTest> for WalPluginTestRequest {
fn from(val: WalPluginTest) -> Self {
let input_arguments = val.input_arguments.map(|a| {
a.into_iter()
.map(|SeparatedKeyValue((k, v))| (k, v))
.collect::<HashMap<String, String>>()
});

Self {
name: val.name,
input_lp: val.input_lp,
input_file: val.input_file,
save_output_to_file: val.save_output_to_file,
validate_output_file: val.validate_output_file,
input_arguments,
}
}
}

pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
let InfluxDb3Config {
host_url,
auth_token,
..
} = config.influxdb3_config;

let wal_plugin_test_request: WalPluginTestRequest = config.wal_plugin_test.into();

let mut client = influxdb3_client::Client::new(host_url)?;
if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}
let response = client.wal_plugin_test(wal_plugin_test_request).await?;

let res = serde_json::to_string_pretty(&response)
.expect("serialize wal plugin test response as JSON");

// pretty print the response
println!("{}", res);

Ok(())
}
10 changes: 9 additions & 1 deletion influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{num::NonZeroUsize, sync::Arc};
use std::{path::Path, str::FromStr};
use std::{
path::{Path, PathBuf},
str::FromStr,
};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::time::Instant;
Expand Down Expand Up @@ -287,6 +290,10 @@ pub struct Config {
action
)]
pub meta_cache_eviction_interval: humantime::Duration,

/// The local directory that has python plugins and their test files.
#[clap(long = "plugin-dir", env = "INFLUXDB3_PLUGIN_DIR", action)]
pub plugin_dir: Option<PathBuf>,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -481,6 +488,7 @@ pub async fn command(config: Config) -> Result<()> {
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metrics),
plugin_dir: config.plugin_dir,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
Expand Down
10 changes: 10 additions & 0 deletions influxdb3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod commands {
pub mod last_cache;
pub mod manage;
pub mod meta_cache;
pub mod plugin_test;
pub mod processing_engine;
pub mod query;
pub mod serve;
Expand Down Expand Up @@ -105,6 +106,9 @@ enum Command {

/// Manage table (delete only for the moment)
Table(commands::manage::table::Config),

/// Test Python plugins for processing WAL writes, persistence Snapshots, requests, or scheduled tasks.
PluginTest(commands::plugin_test::Config),
}

fn main() -> Result<(), std::io::Error> {
Expand Down Expand Up @@ -187,6 +191,12 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::PluginTest(config)) => {
if let Err(e) = commands::plugin_test::command(config).await {
eprintln!("Plugin Test command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
}
});

Expand Down
1 change: 1 addition & 0 deletions influxdb3_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bytes.workspace = true
reqwest.workspace = true
secrecy.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
url.workspace = true

Expand Down
32 changes: 32 additions & 0 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
pub mod plugin_development;

use std::{
collections::HashMap, fmt::Display, num::NonZeroUsize, string::FromUtf8Error, time::Duration,
};

use crate::plugin_development::{WalPluginTestRequest, WalPluginTestResponse};
use bytes::Bytes;
use iox_query_params::StatementParam;
use reqwest::{Body, IntoUrl, Method, StatusCode};
Expand Down Expand Up @@ -697,6 +700,35 @@ impl Client {
}
}

/// Make a request to the `POST /api/v3/plugin_test/wal` API
pub async fn wal_plugin_test(
&self,
wal_plugin_test_request: WalPluginTestRequest,
) -> Result<WalPluginTestResponse> {
let api_path = "/api/v3/plugin_test/wal";

let url = self.base_url.join(api_path)?;

let mut req = self.http_client.post(url).json(&wal_plugin_test_request);

if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(Method::POST, api_path, src))?;

if resp.status().is_success() {
resp.json().await.map_err(Error::Json)
} else {
Err(Error::ApiError {
code: resp.status(),
message: resp.text().await.map_err(Error::Text)?,
})
}
}

/// Send a `/ping` request to the target `influxdb3` server to check its
/// status and gather `version` and `revision` information
pub async fn ping(&self) -> Result<PingResponse> {
Expand Down
23 changes: 23 additions & 0 deletions influxdb3_client/src/plugin_development.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//! Request and response structs for the /api/v3/plugin_test API
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Request definition for `POST /api/v3/plugin_test/wal` API
///
/// The field descriptions below follow the CLI help text for the
/// corresponding WAL plugin test options.
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestRequest {
    /// Name of the plugin, which should match its file name on the server
    /// (`<plugin-dir>/<name>.py`).
    pub name: String,
    /// Line protocol to pass to the plugin as input, if given.
    pub input_lp: Option<String>,
    /// File of line protocol on the server to use as input, if given
    /// (`<plugin-dir>/<name>_test/<input-file>`).
    pub input_file: Option<String>,
    /// File on the server to save the output to, if given
    /// (`<plugin-dir>/<name>_test/<save-output-to-file>`).
    pub save_output_to_file: Option<String>,
    /// File on the server to validate the output against, if given
    /// (`<plugin-dir>/<name>_test/<validate-output-file>`).
    pub validate_output_file: Option<String>,
    /// String key/value pairs passed to the plugin as input arguments, if given.
    pub input_arguments: Option<HashMap<String, String>>,
}

/// Response definition for `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestResponse {
    /// Log lines returned for the test run.
    /// NOTE(review): presumably the plugin's own log output — confirm against the server.
    pub log_lines: Vec<String>,
    /// Writes returned for the test run, as lists of strings grouped under a string key.
    /// NOTE(review): presumably database name -> line protocol lines — confirm against the server.
    pub database_writes: HashMap<String, Vec<String>>,
    /// Errors returned for the test run.
    pub errors: Vec<String>,
}
Loading

0 comments on commit 068d0d8

Please sign in to comment.