Skip to content

Commit

Permalink
Add artifact struct and artifacts field to task
Browse files Browse the repository at this point in the history
Co-Authored-By: Garry O'Donnell <[email protected]>
  • Loading branch information
yousefmoazzam and Garry O'Donnell committed Jan 31, 2025
1 parent e4193ea commit e8dadeb
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 9 deletions.
1 change: 1 addition & 0 deletions graph-proxy/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion graph-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ exclude = ["test-resources/"]
[dependencies]
anyhow = { version = "1.0.95" }
argo-workflows-openapi = { path = "./argo-workflows-openapi" }
async-graphql = { version = "7.0.13", features = ["chrono"] }
async-graphql = { version = "7.0.13", features = ["chrono", "url"] }
async-graphql-axum = { version = "7.0.13" }
axum = { version = "0.7.9" }
axum-extra = { version = "0.9.6", features = ["typed-header"] }
Expand Down
156 changes: 148 additions & 8 deletions graph-proxy/src/graphql/workflows.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use super::{Visit, VisitInput, CLIENT};
use crate::ArgoServerUrl;
use argo_workflows_openapi::{
APIResult, IoArgoprojWorkflowV1alpha1NodeStatus, IoArgoprojWorkflowV1alpha1Workflow,
IoArgoprojWorkflowV1alpha1WorkflowStatus,
APIResult, IoArgoprojWorkflowV1alpha1Artifact, IoArgoprojWorkflowV1alpha1NodeStatus,
IoArgoprojWorkflowV1alpha1Workflow, IoArgoprojWorkflowV1alpha1WorkflowStatus,
};
use async_graphql::{
connection::{Connection, CursorType, Edge, EmptyFields, OpaqueCursor},
Context, Enum, Object, SimpleObject, Union,
};
use axum_extra::headers::{authorization::Bearer, Authorization};
use chrono::{DateTime, Utc};
use std::{collections::HashMap, ops::Deref};
use std::{collections::HashMap, ops::Deref, path::Path};
use tracing::{debug, instrument};
use url::Url;

Expand All @@ -30,6 +30,12 @@ pub(super) enum WorkflowParsingError {
UnrecognisedTaskDisplayName,
#[error("status was expected but was not present")]
MissingWorkflowStatus,
#[error("artifact.s3 was expected but was not present")]
UnrecognisedArtifactStore,
#[error("artifact.s3.key was expected but was not present")]
MissingArtifactKey,
#[error("artifact file name is not valid UTF-8")]
InvalidArtifactFilename,
}

/// A Workflow consisting of one or more [`Task`]s
Expand Down Expand Up @@ -174,7 +180,7 @@ impl WorkflowRunningStatus<'_> {
.data_unchecked::<Option<Authorization<Bearer>>>()
.to_owned();
let nodes = fetch_missing_task_info(url, token, self.manifest, self.metadata).await?;
Ok(TaskMap(nodes).into_tasks())
Ok(TaskMap(nodes).into_tasks(self.metadata))
}
}

Expand Down Expand Up @@ -242,7 +248,7 @@ impl WorkflowCompleteStatus<'_> {
.data_unchecked::<Option<Authorization<Bearer>>>()
.to_owned();
let nodes = fetch_missing_task_info(url, token, self.manifest, self.metadata).await?;
Ok(TaskMap(nodes).into_tasks())
Ok(TaskMap(nodes).into_tasks(self.metadata))
}
}

Expand Down Expand Up @@ -275,16 +281,60 @@ impl TryFrom<String> for TaskStatus {
}
}

/// An output produced by a [`Task`] within a [`Workflow`]
#[allow(clippy::missing_docs_in_private_items)]
#[derive(Debug)]
struct Artifact<'a> {
manifest: &'a IoArgoprojWorkflowV1alpha1Artifact,
metadata: &'a Metadata,
node_id: &'a str,
}

#[Object]
impl Artifact<'_> {
/// The file name of the artifact
async fn name(&self) -> Result<&str, WorkflowParsingError> {
let path = Path::new(
self.manifest
.s3
.as_ref()
.ok_or(WorkflowParsingError::UnrecognisedArtifactStore)?
.key
.as_ref()
.ok_or(WorkflowParsingError::MissingArtifactKey)?,
);
path.file_name()
.unwrap_or_default()
.to_str()
.ok_or(WorkflowParsingError::InvalidArtifactFilename)
}

/// The download URL for the artifact
async fn url(&self, ctx: &Context<'_>) -> Url {
let server_url = ctx.data_unchecked::<ArgoServerUrl>().deref();
let mut url = server_url.clone();
url.path_segments_mut().unwrap().extend([
"artifacts",
&self.metadata.visit.to_string(),
&self.metadata.name,
self.node_id,
&self.manifest.name,
]);
url
}
}

/// A Task created by a workflow
#[allow(clippy::missing_docs_in_private_items)]
#[derive(Debug, Clone)]
struct Task {
struct Task<'a> {
node_status: IoArgoprojWorkflowV1alpha1NodeStatus,
depends: Vec<String>,
metadata: &'a Metadata,
}

#[Object]
impl Task {
impl Task<'_> {
/// Unique name of the task
async fn id(&self) -> &String {
&self.node_status.id
Expand Down Expand Up @@ -318,6 +368,22 @@ impl Task {
async fn dependencies(&self) -> Vec<String> {
self.node_status.children.clone()
}

/// Artifacts produced by a task
async fn artifacts(&self) -> Vec<Artifact> {
match self.node_status.outputs.as_ref() {
None => Vec::new(),
Some(outputs) => outputs
.artifacts
.iter()
.map(|manifest| Artifact {
manifest,
metadata: self.metadata,
node_id: &self.node_status.id,
})
.collect::<Vec<_>>(),
}
}
}

async fn fetch_missing_task_info(
Expand Down Expand Up @@ -373,7 +439,7 @@ impl TaskMap {
}

/// Converts [`TaskMap`] into [`Vec<Task>`]`
fn into_tasks(self) -> Vec<Task> {
fn into_tasks(self, metadata: &Metadata) -> Vec<Task> {
let mut relationship_map = TaskMap::generate_relationship_map(&self);
self.0
.into_iter()
Expand All @@ -382,6 +448,7 @@ impl TaskMap {
Task {
node_status,
depends,
metadata,
}
})
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1060,4 +1127,77 @@ mod tests {
schema.execute(query).await.into_result().unwrap();
workflows_endpoint.assert_async().await;
}

#[tokio::test]
async fn get_artifacts_of_succeeded_workflow_query() {
let workflow_name = "numpy-benchmark-wdkwj";
let visit = Visit {
proposal_code: "mg".to_string(),
proposal_number: 36964,
number: 1,
};

let mut server = mockito::Server::new_async().await;
let mut response_file_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
response_file_path.push("test-assets");
response_file_path.push("get-workflow-wdkwj.json");
let workflow_endpoint = server
.mock(
"GET",
&format!("/api/v1/workflows/{}/{}", visit, workflow_name)[..],
)
.with_status(200)
.with_header("content-type", "application/json")
.with_body_from_file(response_file_path)
.create_async()
.await;

let argo_server_url = Url::parse(&server.url()).unwrap();
let schema = root_schema_builder()
.data(ArgoServerUrl(argo_server_url))
.data(None::<Authorization<Bearer>>)
.finish();
let query = format!(
r#"
query {{
workflow(name: "{}", visit: {{proposalCode: "{}", proposalNumber: {}, number: {}}}) {{
status {{
...on WorkflowSucceededStatus {{
tasks {{
artifacts {{
name
url
}}
}}
}}
}}
}}
}}
"#,
workflow_name, visit.proposal_code, visit.proposal_number, visit.number
);
let resp = schema.execute(query).await.into_result().unwrap();

workflow_endpoint.assert_async().await;
let expected_download_url = format!(
"{}/artifacts/{}/{}/{}/main-logs",
server.url(),
visit,
workflow_name,
workflow_name
);
let expected_data = json!({
"workflow": {
"status": {
"tasks": [{
"artifacts": [{
"name": "main.log",
"url": expected_download_url
}]
}]
}
}
});
assert_eq!(resp.data.into_json().unwrap(), expected_data);
}
}

0 comments on commit e8dadeb

Please sign in to comment.