Skip to content

Commit

Permalink
Rate limit requirements.yml endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Fabricio Aguiar <[email protected]>
  • Loading branch information
fao89 committed Sep 17, 2023
1 parent 1f59461 commit 694198e
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 77 deletions.
114 changes: 38 additions & 76 deletions src/sync/collections.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{download_tar, get_json, get_with_retry};
use super::{build_service, get_json, request};
use crate::models::{self, CollectionNew, CollectionVersionNew};
use crate::schema::collection_versions;
use actix_web::web;
Expand All @@ -14,13 +14,10 @@ use log::info;
use reqwest::{Client, Request};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tower::buffer::Buffer;
use tower::limit::{ConcurrencyLimit, RateLimit};
use tower::{Service, ServiceExt};
use url::Url;

#[derive(Debug, Clone)]
pub struct CollectionData {
Expand All @@ -31,30 +28,16 @@ pub struct CollectionData {
pub version: String,
pub metadata: Value,
}

pub async fn get_version(
url: String,
client: Client,
mut service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
) -> Result<Value> {
let http_request = client.get(url).build().unwrap();
let mut is_ready = service.ready().await.is_ok();
while !is_ready {
is_ready = service.ready().await.is_ok();
}

let resp = service.call(http_request).await.unwrap();
let (service, resp) = request(url, &client, service).await;
let status = resp.status().as_str().to_string();
let json_response = resp.json::<Value>().await.unwrap();
if status != "404" {
let http_request = client
.get(json_response["download_url"].as_str().unwrap())
.build()
.unwrap();
let mut is_ready = service.ready().await.is_ok();
while !is_ready {
is_ready = service.ready().await.is_ok();
}

let version_path = format!(
"content/collections/{}/{}/versions/{}/",
json_response["namespace"]["name"].as_str().unwrap(),
Expand All @@ -67,7 +50,12 @@ pub async fn get_version(

let filename = json_response["artifact"]["filename"].as_str().unwrap();
info!("Downloading {}", filename);
let resp = service.call(http_request).await.unwrap();
let (_, resp) = request(
json_response["download_url"].as_str().unwrap().to_string(),
&client,
service,
)
.await;
let mut file = match File::create(format!("{version_path}{filename}").as_str()).await {
Err(why) => panic!("couldn't create {}", why),
Ok(file) => file,
Expand All @@ -91,11 +79,7 @@ pub async fn sync_collections(
.as_u64()
.unwrap();
let client = reqwest::Client::new();
let service = tower::ServiceBuilder::new()
.buffer(5)
.concurrency_limit(5)
.rate_limit(5, Duration::from_secs(1))
.service(client.clone());
let service = build_service(client.clone());
let galaxy_url = dotenv::var("GALAXY_URL").unwrap_or("https://galaxy.ansible.com/".to_string());
let mut fut: Vec<_> = Vec::with_capacity(100);
for n in 1..total + 1 {
Expand Down Expand Up @@ -189,19 +173,42 @@ pub async fn fetch_collection(data: &Value) -> Result<Vec<CollectionData>> {
async fn fetch_versions(url: &Value) -> Result<Vec<CollectionData>> {
let mut versions: Vec<CollectionData> = Vec::new();
let mut versions_url = format!("{}?page_size=100", url.as_str().unwrap());
let client = reqwest::Client::new();
let mut service = build_service(client.clone());
loop {
let json_response = get_json(versions_url.as_str()).await?;
let (svc, resp) = request(versions_url, &client, service).await;
service = svc;
let json_response = resp.json::<Value>().await.unwrap();
let results = json_response.as_object().unwrap()["results"]
.as_array()
.unwrap();

// Downloading
let collection_version_futures: Vec<_> =
results.iter().map(fetch_collection_version).collect();
let collection_version_futures: Vec<_> = results
.iter()
.map(|v| {
get_version(
v["href"].as_str().unwrap().to_string(),
client.clone(),
service.clone(),
)
})
.collect();
let cversions = try_join_all(collection_version_futures)
.await
.context("Failed to join collection versions futures")?;
versions.extend_from_slice(&cversions);
let cdata: Vec<CollectionData> = cversions
.iter()
.map(|v| CollectionData {
namespace: v["namespace"]["name"].as_str().unwrap().to_string(),
name: v["collection"]["name"].as_str().unwrap().to_string(),
download_url: v["download_url"].as_str().unwrap().to_string(),
artifact: v["artifact"].clone(),
version: v["version"].as_str().unwrap().to_string(),
metadata: v["metadata"].clone(),
})
.collect();
versions.extend_from_slice(&cdata);

if json_response.as_object().unwrap()["next"]
.as_str()
Expand All @@ -217,47 +224,6 @@ async fn fetch_versions(url: &Value) -> Result<Vec<CollectionData>> {
Ok(versions)
}

async fn fetch_collection_version(data: &Value) -> Result<CollectionData> {
let json_response = get_json(data["href"].as_str().unwrap()).await?;
let data = CollectionData {
namespace: json_response["namespace"]["name"]
.as_str()
.unwrap()
.to_string(),
name: json_response["collection"]["name"]
.as_str()
.unwrap()
.to_string(),
download_url: json_response["download_url"].as_str().unwrap().to_string(),
artifact: json_response["artifact"].clone(),
version: json_response["version"].as_str().unwrap().to_string(),
metadata: json_response["metadata"].clone(),
};

Ok(data)
}

pub async fn download_version(data: &CollectionData) -> Result<()> {
let version_path = format!(
"content/collections/{}/{}/versions/{}/",
data.namespace.as_str(),
data.name.as_str(),
data.version.as_str(),
);
tokio::fs::create_dir_all(&version_path)
.await
.with_context(|| format!("Failed to create dir {version_path}"))?;
let download_url = Url::parse(data.download_url.as_str())
.with_context(|| format!("Failed to parse URL {}", data.download_url))?;
let response = get_with_retry(download_url.as_str()).await?;
let filename = download_url.path_segments().unwrap().last().unwrap();
info!("Downloading {filename}");
download_tar(format!("{version_path}{filename}").as_str(), response)
.await
.with_context(|| format!("Failed to download {download_url}"))?;
Ok(())
}

pub async fn process_collection_data(
pool: web::Data<Pool<ConnectionManager<PgConnection>>>,
data: Vec<Vec<CollectionData>>,
Expand Down Expand Up @@ -291,10 +257,6 @@ pub async fn process_collection_data(
mmap.insert(format!("{}.{}", v.1.as_str(), v.2.as_str()), v.0);
}

let data_futures: Vec<_> = versions.iter().map(download_version).collect();
try_join_all(data_futures)
.await
.context("Failed to join collection data futures")?;
let mut to_save: Vec<models::CollectionVersionNew> = Vec::new();
for vs in versions.iter() {
to_save.push(models::CollectionVersionNew {
Expand Down
2 changes: 1 addition & 1 deletion src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ pub use collections::{fetch_collection, process_collection_data, sync_collection
pub use common::{import_task, mirror_content, process_requirements};
pub use decode::a2b_base64;
pub use roles::sync_roles;
pub use utils::{download_tar, get_json, get_with_retry};
pub use utils::{build_service, download_tar, get_json, get_with_retry, request};
43 changes: 43 additions & 0 deletions src/sync/utils.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use anyhow::{Context, Result};
use log::warn;
use reqwest::{Client, Request, Response};
use serde_json::Value;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::time;
use tower::buffer::Buffer;
use tower::limit::{ConcurrencyLimit, RateLimit};
use tower::{Service, ServiceExt};

pub async fn download_tar(filename: &str, response: reqwest::Response) -> Result<()> {
let mut file = match File::create(filename).await {
Expand Down Expand Up @@ -57,3 +61,42 @@ pub async fn get_json(url: &str) -> Result<Value> {
.context(format!("Failed to parse JSON from {url}"))?;
Ok(values)
}

pub fn build_service(client: Client) -> Buffer<ConcurrencyLimit<RateLimit<Client>>, Request> {
let buffer = dotenv::var("GROOT_BUFFER")
.unwrap_or("5".to_string())
.as_str()
.parse::<usize>()
.unwrap();
let limit = dotenv::var("GROOT_CONCURRENCY_LIMIT")
.unwrap_or("5".to_string())
.as_str()
.parse::<usize>()
.unwrap();
let total_req = dotenv::var("GROOT_TOTAL_REQUESTS_PER_SECOND")
.unwrap_or("5".to_string())
.parse::<u64>()
.unwrap();
tower::ServiceBuilder::new()
.buffer(buffer)
.concurrency_limit(limit)
.rate_limit(total_req, Duration::from_secs(1))
.service(client.clone())
}

pub async fn request(
url: String,
client: &Client,
mut service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
) -> (
Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
Response,
) {
let http_request = client.get(url).build().unwrap();
let mut is_ready = service.ready().await.is_ok();
while !is_ready {
is_ready = service.ready().await.is_ok();
}
let response = service.call(http_request).await.unwrap();
(service, response)
}

0 comments on commit 694198e

Please sign in to comment.