Skip to content
This repository has been archived by the owner on Mar 25, 2023. It is now read-only.

fpm-controller-api #215

Merged
merged 14 commits into from
Jun 15, 2022
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ repository = "https://github.com/FifthTry/fpm"
homepage = "https://fpm.dev"
build = "build.rs"

[features]
controller = []

[dependencies]
async-recursion = "0.3.2"
camino = "1.0.5"
Expand Down
13 changes: 13 additions & 0 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,19 @@ async fn serve_static(req: actix_web::HttpRequest) -> actix_web::HttpResponse {

#[actix_web::main]
pub async fn serve(bind_address: &str, port: &str) -> std::io::Result<()> {
if cfg!(feature = "controller") {
// fpm-controller base path and ec2 instance id (hardcoded for now)
let fpm_controller: String = std::env::var("FPM_CONTROLLER")
.unwrap_or_else(|_| "https://controller.fifthtry.com".to_string());
let fpm_instance: String =
std::env::var("FPM_INSTANCE_ID").expect("FPM_INSTANCE_ID is required");

match crate::controller::resolve_dependencies(fpm_instance, fpm_controller).await {
Ok(_) => println!("Dependencies resolved"),
Err(e) => panic!("Error resolving dependencies using controller!!: {:?}", e),
}
}

println!("### Server Started ###");
println!("Go to: http://{}:{}", bind_address, port);
actix_web::HttpServer::new(|| {
Expand Down
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@ impl Package {
}
}

pub fn with_zip(mut self, zip: String) -> fpm::Package {
self.zip = Some(zip);
self
}

pub fn get_dependency_for_interface(&self, interface: &str) -> Option<&fpm::Dependency> {
self.dependencies
.iter()
Expand Down
155 changes: 155 additions & 0 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/// FPM Controller Support
/// FPM cli supports communication with fpm controller. This is an optional feature, and is only
/// available when controller feature is enabled, which is not enabled by default.
/// Controller Communication
/// When controller feature is enabled, fpm serve will first communicate with the FPM controller
/// service’s /get-package/ API.

/// FPM Controller Service Endpoint
/// The FPM Controller Service’s endpoint is computed by using environment variable FPM_CONTROLLER,
/// which will look something like this: https://controller.fifthtry.com, with the API path.
/// FPM Controller Service has more than one APIs: /get-package/ and /fpm-ready/.

/// get-package:
/// Through an environment variable FPM_INSTANCE_ID, the fpm serve will learn it’s instance id, and
/// it will pass the instance id to the get-package API.
/// The API returns the URL of the package to be downloaded, git repository URL and the package name.
/// FPM will clone the git repository in the current directory. The current directory will contain
/// FPM.ftd and other files of the package.
/// FPM will then calls fpm install on it.

/// fpm-ready:
/// Once dependencies are ready fpm calls /fpm-ready/ API on the controller. We will pass the
/// FPM_INSTANCE_ID and the git commit hash as input to the API
/// The API will return with success, and once it is done fpm will start receiving HTTP traffic
/// from the controller service.

#[derive(serde::Deserialize, Debug)]
struct ApiResponse<T> {
success: bool,
result: Option<T>,
message: Option<String>,
}

#[derive(serde::Deserialize, Debug)]
struct PackageResult {
package: String,
git: String,
}

pub async fn resolve_dependencies(fpm_instance: String, fpm_controller: String) -> fpm::Result<()> {
// First call get_package API to get package details and resolve dependencies

// response from get-package API
let package_response = get_package(fpm_instance.as_str(), fpm_controller.as_str()).await?;

// Clone the git package into the current directory
// Need to execute shell commands from rust
// git_url https format: https://github.com/<user>/<repo>.git

let package =
fpm::Package::new(package_response.package.as_str()).with_zip(package_response.git);

package.unzip_package().await?;
fpm::Config::read(None).await?;

/*let out = std::process::Command::new("git")
.arg("clone")
.arg(git_url)
.output()
.expect("unable to execute git clone command");

if out.status.success() {
// By this time the cloned repo should be available in the current directory
println!("Git cloning successful for the package {}", package_name);
// Resolve dependencies by reading the FPM.ftd using config.read()
// Assuming package_name and repo name are identical
let _config = fpm::Config::read(Some(package_name.to_string())).await?;
}*/

// Once the dependencies are resolved for the package
// then call fpm_ready API to ensure that the controller service is now ready

// response from fpm_ready API

fpm_ready(fpm_instance.as_str(), fpm_controller.as_str()).await?;

Ok(())
}

/// get-package API
/// input: fpm_instance
/// output: package_name and git repo URL
/// format: {
/// "success": true,
/// "result": {
/// "package": "<package name>"
/// "git": "<git url>"
/// }
/// }
async fn get_package(fpm_instance: &str, fpm_controller: &str) -> fpm::Result<PackageResult> {
let controller_api = format!(
"{}/v1/fpm/get-package?ec2_reservation={}",
fpm_controller, fpm_instance
);

let url = url::Url::parse(controller_api.as_str())?;

let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::USER_AGENT,
reqwest::header::HeaderValue::from_static("fpm"),
);

let resp: ApiResponse<PackageResult> = fpm::library::http::get_with_type(url, headers).await?;

if !resp.success {
return Err(fpm::Error::APIResponseError(format!(
"get_package api error: {:?}",
resp.message
)));
}

resp.result.ok_or({
fpm::Error::APIResponseError(format!("get_package api error: {:?}", &resp.message))
})
}

/// fpm-ready API
/// input: fpm_instance, *(git commit hash)
/// output: success: true/false
/// format: lang: json
/// {
/// "success": true
/// }

/// Git commit hash needs to be computed before making a call to the fpm_ready API
async fn fpm_ready(fpm_instance: &str, fpm_controller: &str) -> fpm::Result<()> {
let git_commit = "<dummy-git-commit-hash-xxx123>";

let controller_api = format!(
"{}/v1/fpm/fpm-ready?ec2_reservation={}&hash={}",
fpm_controller, fpm_instance, git_commit
);

let url = url::Url::parse(controller_api.as_str())?;

// This request should be put request for fpm_ready API to update the instance status to ready
// Using http::_get() function to make request to this API for now
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::USER_AGENT,
reqwest::header::HeaderValue::from_static("fpm"),
);

// TODO: here Map is wrong,
let resp: ApiResponse<std::collections::HashMap<String, String>> =
fpm::library::http::get_with_type(url, headers).await?;
if !resp.success {
return Err(fpm::Error::APIResponseError(format!(
"fpm_ready api error: {:?}",
resp.message
)));
}
Ok(())
}
68 changes: 68 additions & 0 deletions src/dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,74 @@ impl fpm::Package {
}
}

pub(crate) async fn unzip_package(&self) -> fpm::Result<()> {
use std::convert::TryInto;
use std::io::Write;

let download_url = if let Some(ref url) = self.zip {
url
} else {
return Ok(());
};

let path = std::env::temp_dir().join(format!("{}.zip", self.name.replace("/", "__")));

let start = std::time::Instant::now();
print!("Downloading {} ... ", self.name.as_str());
std::io::stdout().flush()?;
// Download the zip folder
{
let mut response = if download_url[1..].contains("://")
|| download_url.starts_with("//")
{
reqwest::get(download_url.as_str())?
} else if let Ok(response) = reqwest::get(format!("https://{}", download_url).as_str())
{
response
} else {
reqwest::get(format!("http://{}", download_url).as_str())?
};
let mut file = std::fs::File::create(&path)?;
// TODO: instead of reading the whole thing in memory use tokio::io::copy() somehow?
let mut buf: Vec<u8> = vec![];
response.copy_to(&mut buf)?;
file.write_all(&buf)?;
// file.write_all(response.text().await?.as_bytes())?;
}

let file = std::fs::File::open(&path)?;
// TODO: switch to async_zip crate
let mut archive = zip::ZipArchive::new(file)?;
for i in 0..archive.len() {
let mut c_file = archive.by_index(i).unwrap();
let out_path = match c_file.enclosed_name() {
Some(path) => path.to_owned(),
None => continue,
};
let out_path_without_folder = out_path.to_str().unwrap().split_once("/").unwrap().1;
let file_extract_path = {
let mut file_extract_path: camino::Utf8PathBuf =
std::env::current_dir()?.canonicalize()?.try_into()?;
file_extract_path = file_extract_path.join(out_path_without_folder);
file_extract_path
};
if (&*c_file.name()).ends_with('/') {
std::fs::create_dir_all(&file_extract_path)?;
} else {
if let Some(p) = file_extract_path.parent() {
if !p.exists() {
std::fs::create_dir_all(p)?;
}
}
// Note: we will be able to use tokio::io::copy() with async_zip
let mut outfile = std::fs::File::create(file_extract_path)?;
std::io::copy(&mut c_file, &mut outfile)?;
}
}
fpm::utils::print_end(format!("Downloaded {}", self.name.as_str()).as_str(), start);
Ok(())
}

/// This function is called by `process()` or recursively called by itself.
/// It checks the `FPM.ftd` file of dependent package and find out all the dependency packages.
/// If dependent package is not available, it calls `process()` to download it inside `.packages` directory
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) mod utils;
mod auto_import;
mod commands;
mod config;
mod controller;
mod dependency;
mod doc;
mod file;
Expand Down Expand Up @@ -390,6 +391,9 @@ pub enum Error {
#[error("HttpError: {}", _0)]
HttpError(#[from] reqwest::Error),

#[error("APIResponseError: {}", _0)]
APIResponseError(String),

#[error("IoError: {}", _0)]
IoError(#[from] std::io::Error),

Expand All @@ -416,6 +420,9 @@ pub enum Error {

#[error("SitemapParseError: {}", _0)]
SitemapParseError(#[from] fpm::sitemap::ParseError),

#[error("URLParseError: {}", _0)]
UrlParseError(#[from] url::ParseError),
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down
22 changes: 21 additions & 1 deletion src/library/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn processor<'a>(
doc.from_json(&json, section)
}

async fn get(
pub(crate) async fn get(
url: url::Url,
doc_id: &str,
line_number: usize,
Expand Down Expand Up @@ -89,3 +89,23 @@ async fn _get(url: url::Url) -> reqwest::Result<String> {
.build()?;
c.get(url.to_string().as_str()).send()?.text()
}

pub async fn get_with_type<T: serde::de::DeserializeOwned>(
url: url::Url,
headers: reqwest::header::HeaderMap,
) -> fpm::Result<T> {
let c = reqwest::Client::builder()
.default_headers(headers)
.build()?;

let mut resp = c.get(url.to_string().as_str()).send()?;
if !resp.status().eq(&reqwest::StatusCode::OK) {
return Err(fpm::Error::APIResponseError(format!(
"url: {}, response_status: {}, response: {:?}",
url,
resp.status(),
resp.text()
)));
}
Ok(resp.json()?)
}
2 changes: 1 addition & 1 deletion src/library/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod fpm_dot_ftd;
mod get_data;
mod get_version_data;
mod http;
pub(crate) mod http;
mod include;
mod sitemap;
mod sqlite;
Expand Down
26 changes: 14 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ async fn main() -> fpm::Result<()> {
return Ok(());
}

// Serve block moved up
if let Some(mark) = matches.subcommand_matches("serve") {
let port = mark
.value_of("port")
.unwrap_or_else(|| mark.value_of("positional_port").unwrap_or("8000"))
.to_string();
let bind = mark.value_of("bind").unwrap_or("127.0.0.1").to_string();
tokio::task::spawn_blocking(move || {
fpm::serve(bind.as_str(), port.as_str()).expect("http service error");
})
.await
.expect("Thread spawn error");
}

let mut config = fpm::Config::read(None).await?;

if matches.subcommand_matches("update").is_some() {
Expand Down Expand Up @@ -72,18 +86,6 @@ async fn main() -> fpm::Result<()> {
let target = mark.value_of("target");
fpm::stop_tracking(&config, source, target).await?;
}
if let Some(mark) = matches.subcommand_matches("serve") {
let port = mark
.value_of("port")
.unwrap_or_else(|| mark.value_of("positional_port").unwrap_or("8000"))
.to_string();
let bind = mark.value_of("bind").unwrap_or("127.0.0.1").to_string();
tokio::task::spawn_blocking(move || {
fpm::serve(bind.as_str(), port.as_str()).expect("http service error");
})
.await
.expect("Thread spawn error");
}
Ok(())
}

Expand Down