Skip to content

Commit

Permalink
Add artifact struct and artifacts field to task
Browse files Browse the repository at this point in the history
  • Loading branch information
yousefmoazzam committed Jan 27, 2025
1 parent 6d00ac0 commit ef580c8
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 5 deletions.
1 change: 1 addition & 0 deletions graph-proxy/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion graph-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ exclude = ["test-resources/"]
[dependencies]
anyhow = { version = "1.0.95" }
argo-workflows-openapi = { path = "./argo-workflows-openapi" }
async-graphql = { version = "7.0.13", features = ["chrono"] }
async-graphql = { version = "7.0.13", features = ["chrono", "url"] }
async-graphql-axum = { version = "7.0.13" }
axum = { version = "0.7.9" }
axum-extra = { version = "0.9.6", features = ["typed-header"] }
Expand Down
133 changes: 129 additions & 4 deletions graph-proxy/src/graphql/workflows.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{VisitInput, CLIENT};
use crate::ArgoServerUrl;
use argo_workflows_openapi::{
APIResult, IoArgoprojWorkflowV1alpha1NodeStatus, IoArgoprojWorkflowV1alpha1Workflow,
APIResult, IoArgoprojWorkflowV1alpha1Artifact, IoArgoprojWorkflowV1alpha1NodeStatus,
IoArgoprojWorkflowV1alpha1Workflow,
};
use async_graphql::{
connection::{Connection, CursorType, Edge, EmptyFields, OpaqueCursor},
Expand Down Expand Up @@ -121,7 +122,7 @@ impl WorkflowRunningStatus {
.data_unchecked::<Option<Authorization<Bearer>>>()
.to_owned();
let nodes = fetch_missing_task_info(url, token, Arc::clone(&self.0)).await?;
Ok(TaskMap(nodes).into_tasks())
Ok(TaskMap(nodes).into_tasks(Arc::clone(&self.0)))
}
}

Expand Down Expand Up @@ -185,7 +186,7 @@ impl WorkflowCompleteStatus {
.data_unchecked::<Option<Authorization<Bearer>>>()
.to_owned();
let nodes = fetch_missing_task_info(url, token, Arc::clone(&self.0)).await?;
Ok(TaskMap(nodes).into_tasks())
Ok(TaskMap(nodes).into_tasks(Arc::clone(&self.0)))
}
}

Expand Down Expand Up @@ -218,10 +219,43 @@ impl TryFrom<String> for TaskStatus {
}
}

/// A task artifact
#[allow(clippy::missing_docs_in_private_items)]
#[derive(Debug)]
struct Artifact {
manifest: IoArgoprojWorkflowV1alpha1Artifact,
namespace: String,
workflow_name: String,
node_id: String,
}

#[Object]
impl Artifact {
/// The name of the artifact
async fn name(&self) -> &str {
self.manifest.name.as_str()
}

/// The download URL for the artifact
async fn url(&self, ctx: &Context<'_>) -> Url {
let server_url = ctx.data_unchecked::<ArgoServerUrl>().deref();
let mut url = server_url.clone();
url.path_segments_mut().unwrap().extend([
"artifacts",
&self.namespace,
&self.workflow_name,
&self.node_id,
&self.manifest.name,
]);
url
}
}

/// A Task created by a workflow
#[allow(clippy::missing_docs_in_private_items)]
#[derive(Debug, Clone)]
struct Task {
workflow: Arc<IoArgoprojWorkflowV1alpha1Workflow>,
node_status: IoArgoprojWorkflowV1alpha1NodeStatus,
depends: Vec<String>,
}
Expand Down Expand Up @@ -261,6 +295,23 @@ impl Task {
async fn dependencies(&self) -> Vec<String> {
self.node_status.children.clone()
}

/// Artifacts produced by a task
async fn artifacts(&self) -> Vec<Artifact> {
match self.node_status.outputs.as_ref() {
None => vec![],
Some(outputs) => outputs
.artifacts
.iter()
.map(|manifest| Artifact {
manifest: manifest.clone(),
namespace: self.workflow.metadata.namespace.clone().unwrap(),
workflow_name: self.workflow.metadata.name.clone().unwrap(),
node_id: self.node_status.id.clone(),
})
.collect::<Vec<_>>(),
}
}
}

async fn fetch_missing_task_info(
Expand Down Expand Up @@ -315,13 +366,14 @@ impl TaskMap {
}

/// Converts [`TaskMap`] into [`Vec<Task>`]`
fn into_tasks(self) -> Vec<Task> {
fn into_tasks(self, workflow: Arc<IoArgoprojWorkflowV1alpha1Workflow>) -> Vec<Task> {
let mut relationship_map = TaskMap::generate_relationship_map(&self);
self.0
.into_iter()
.map(|(node_name, node_status)| {
let depends = relationship_map.remove(&node_name).unwrap_or_default();
Task {
workflow: Arc::clone(&workflow),
node_status,
depends,
}
Expand Down Expand Up @@ -1002,4 +1054,77 @@ mod tests {
schema.execute(query).await.into_result().unwrap();
workflows_endpoint.assert_async().await;
}

#[tokio::test]
async fn get_artifacts_of_succeeded_workflow_query() {
let workflow_name = "numpy-benchmark-wdkwj";
let visit = Visit {
proposal_code: "mg".to_string(),
proposal_number: 36964,
number: 1,
};

let mut server = mockito::Server::new_async().await;
let mut response_file_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
response_file_path.push("test-resources");
response_file_path.push("get-workflow-wdkwj.json");
let workflow_endpoint = server
.mock(
"GET",
&format!("/api/v1/workflows/{}/{}", visit, workflow_name)[..],
)
.with_status(200)
.with_header("content-type", "application/json")
.with_body_from_file(response_file_path)
.create_async()
.await;

let argo_server_url = Url::parse(&server.url()).unwrap();
let schema = root_schema_builder()
.data(ArgoServerUrl(argo_server_url))
.data(None::<Authorization<Bearer>>)
.finish();
let query = format!(
r#"
query {{
workflow(name: "{}", visit: {{proposalCode: "{}", proposalNumber: {}, number: {}}}) {{
status {{
...on WorkflowSucceededStatus {{
tasks {{
artifacts {{
name
url
}}
}}
}}
}}
}}
}}
"#,
workflow_name, visit.proposal_code, visit.proposal_number, visit.number
);
let resp = schema.execute(query).await.into_result().unwrap();

workflow_endpoint.assert_async().await;
let expected_download_url = format!(
"{}/artifacts/{}/{}/{}/main-logs",
server.url(),
visit,
workflow_name,
workflow_name
);
let expected_data = json!({
"workflow": {
"status": {
"tasks": [{
"artifacts": [{
"name": "main-logs",
"url": expected_download_url
}]
}]
}
}
});
assert_eq!(resp.data.into_json().unwrap(), expected_data);
}
}

0 comments on commit ef580c8

Please sign in to comment.