diff --git a/graph-proxy/src/graphql/workflow_templates.rs b/graph-proxy/src/graphql/workflow_templates.rs index bd72a631..1ac9c496 100644 --- a/graph-proxy/src/graphql/workflow_templates.rs +++ b/graph-proxy/src/graphql/workflow_templates.rs @@ -224,7 +224,7 @@ impl WorkflowTemplatesMutation { .json::>() .await? .into_result()?; - Ok(Workflow::new(workflow, visit.into())?) + Ok(Workflow::new(workflow, visit.into())) } } diff --git a/graph-proxy/src/graphql/workflows.rs b/graph-proxy/src/graphql/workflows.rs index 7b6e851e..cf8cfd94 100644 --- a/graph-proxy/src/graphql/workflows.rs +++ b/graph-proxy/src/graphql/workflows.rs @@ -10,8 +10,9 @@ use async_graphql::{ }; use axum_extra::headers::{authorization::Bearer, Authorization}; use chrono::{DateTime, Utc}; -use std::{collections::HashMap, ops::Deref, sync::Arc}; +use std::{collections::HashMap, ops::Deref}; use tracing::{debug, instrument}; +use url::Url; /// An error encountered when parsing the Argo Server API Workflow response #[derive(Debug, thiserror::Error)] @@ -27,175 +28,221 @@ pub(super) enum WorkflowParsingError { UnrecognisedTaskPhase, #[error("value.display_name was not a recognised value")] UnrecognisedTaskDisplayName, + #[error("status was expected but was not present")] + MissingWorkflowStatus, } /// A Workflow consisting of one or more [`Task`]s -#[derive(Debug, SimpleObject)] +#[derive(Debug)] pub(super) struct Workflow { - /// Metadata containing name, proposal code, proposal number and visit of a workflow - #[graphql(flatten)] - metadata: Arc, - /// The time at which the workflow began running - status: Option, + /// Manifest associated with the workflow + pub(super) manifest: IoArgoprojWorkflowV1alpha1Workflow, + /// Metadata associated with the workflow + pub(super) metadata: Metadata, } -#[derive(Debug, SimpleObject)] -struct Metadata { - /// The name given to the workflow, unique within a given visit - name: String, - /// The visit the Workflow was run against - visit: Visit, +impl Workflow { + /// Create [`Workflow`] from [`IoArgoprojWorkflowV1alpha1Workflow`] and [`Visit`] + pub fn new(manifest: IoArgoprojWorkflowV1alpha1Workflow, visit: Visit) -> Workflow { + let name = manifest.metadata.name.clone().unwrap(); + Workflow { + manifest, + metadata: Metadata { name, visit }, + } + } } -#[allow(clippy::missing_docs_in_private_items)] +#[Object] impl Workflow { - pub(super) fn new( - value: IoArgoprojWorkflowV1alpha1Workflow, - visit: Visit, - ) -> Result { - let metadata = Arc::new(Metadata { - name: value.metadata.name.clone().unwrap(), - visit, - }); - let status = WorkflowStatus::new(value.status.clone().unwrap(), metadata.clone())?; - Ok(Self { metadata, status }) + /// The name given to the workflow, unique within a given visit + async fn name(&self) -> &str { + &self.metadata.name + } + + /// The visit the Workflow was run against + async fn visit(&self) -> &Visit { + &self.metadata.visit + } + + /// The current status of the workflow + async fn status(&self) -> Result, WorkflowParsingError> { + WorkflowStatus::new(&self.manifest, &self.metadata) } } +#[derive(Debug)] +pub(super) struct Metadata { + /// The name given to the workflow, unique within a given visit + name: String, + /// The visit the Workflow was run against + visit: Visit, +} + /// The status of a workflow #[derive(Debug, Union)] #[allow(clippy::missing_docs_in_private_items)] -enum WorkflowStatus { - Pending(WorkflowPendingStatus), - Running(WorkflowRunningStatus), - Succeeded(WorkflowSucceededStatus), - Failed(WorkflowFailedStatus), - Errored(WorkflowErroredStatus), +enum WorkflowStatus<'a> { + Pending(WorkflowPendingStatus<'a>), + Running(WorkflowRunningStatus<'a>), + Succeeded(WorkflowSucceededStatus<'a>), + Failed(WorkflowFailedStatus<'a>), + Errored(WorkflowErroredStatus<'a>), } -impl WorkflowStatus { +impl<'a> WorkflowStatus<'a> { /// Creates a new `WorkflowStatus` from `IoArgoprojWorkflowV1alpha1WorkflowStatus` and associated metadata. fn new( - value: IoArgoprojWorkflowV1alpha1WorkflowStatus, - metadata: Arc, + workflow: &'a IoArgoprojWorkflowV1alpha1Workflow, + metadata: &'a Metadata, ) -> Result, WorkflowParsingError> { - match value.phase.as_deref() { - Some("Pending") => Ok(Some(Self::Pending(WorkflowPendingStatus::from(value)))), - Some("Running") => Ok(Some(Self::Running(WorkflowRunningStatus::new( - value, metadata, - )?))), - Some("Succeeded") => Ok(Some(Self::Succeeded( - WorkflowCompleteStatus::new(value, metadata)?.into(), - ))), - Some("Failed") => Ok(Some(Self::Failed( - WorkflowCompleteStatus::new(value, metadata)?.into(), - ))), - Some("Error") => Ok(Some(Self::Errored( - WorkflowCompleteStatus::new(value, metadata)?.into(), - ))), - Some(_) => Err(WorkflowParsingError::UnrecognisedPhase), - None => Ok(None), + match workflow.status.as_ref() { + Some(status) => match status.phase.as_deref() { + Some("Pending") => Ok(Some(Self::Pending(WorkflowPendingStatus(status)))), + Some("Running") => Ok(Some(Self::Running(WorkflowRunningStatus { + manifest: status, + metadata, + }))), + Some("Succeeded") => Ok(Some(Self::Succeeded( + WorkflowCompleteStatus { + manifest: status, + metadata, + } + .into(), + ))), + Some("Failed") => Ok(Some(Self::Failed( + WorkflowCompleteStatus { + manifest: status, + metadata, + } + .into(), + ))), + Some("Error") => Ok(Some(Self::Errored( + WorkflowCompleteStatus { + manifest: status, + metadata, + } + .into(), + ))), + Some(_) => Err(WorkflowParsingError::UnrecognisedPhase), + None => Ok(None), + }, + None => Err(WorkflowParsingError::MissingWorkflowStatus), } } } /// No tasks within the workflow have been scheduled -#[derive(Debug, SimpleObject)] -struct WorkflowPendingStatus { - /// A human readable message indicating details about why the workflow is in this condition - message: Option, -} +#[derive(Debug)] +struct WorkflowPendingStatus<'a>(&'a IoArgoprojWorkflowV1alpha1WorkflowStatus); -impl From for WorkflowPendingStatus { - fn from(value: IoArgoprojWorkflowV1alpha1WorkflowStatus) -> Self { - Self { - message: value.message, - } +#[Object] +impl WorkflowPendingStatus<'_> { + /// A human readable message indicating details about why the workflow is in this condition + async fn message(&self) -> Option<&str> { + self.0.message.as_deref() } } /// At least one of the tasks has been scheduled, but they have not yet all complete -#[derive(Debug, SimpleObject)] -struct WorkflowRunningStatus { +#[allow(clippy::missing_docs_in_private_items)] +#[derive(Debug)] +struct WorkflowRunningStatus<'a> { + manifest: &'a IoArgoprojWorkflowV1alpha1WorkflowStatus, + metadata: &'a Metadata, +} + +#[Object] +impl WorkflowRunningStatus<'_> { /// Time at which this workflow started - start_time: DateTime, + async fn start_time(&self) -> Result, WorkflowParsingError> { + Ok(**self + .manifest + .started_at + .as_ref() + .ok_or(WorkflowParsingError::MissingStartTime)?) + } + /// A human readable message indicating details about why the workflow is in this condition - message: Option, - /// Tasks created by the workflow - #[graphql(flatten)] - tasks: Tasks, -} + async fn message(&self) -> Option<&str> { + self.manifest.message.as_deref() + } -impl WorkflowRunningStatus { - /// Creates a new `WorkflowRunningStatus` from `IoArgoprojWorkflowV1alpha1WorkflowStatus` and associated metadata. - fn new( - value: IoArgoprojWorkflowV1alpha1WorkflowStatus, - metadata: Arc, - ) -> Result { - Ok(Self { - start_time: *value - .started_at - .ok_or(WorkflowParsingError::MissingStartTime)?, - message: value.message, - tasks: TaskMap(value.nodes).into_tasks(metadata)?, - }) + /// Tasks created by the workflow + async fn tasks(&self, ctx: &Context<'_>) -> anyhow::Result> { + let url = ctx.data_unchecked::().deref().to_owned(); + let token = ctx + .data_unchecked::>>() + .to_owned(); + let nodes = fetch_missing_task_info(url, token, self.manifest, self.metadata).await?; + Ok(TaskMap(nodes).into_tasks()) } } /// All tasks in the workflow have succeded #[derive(Debug, SimpleObject, derive_more::From)] -struct WorkflowSucceededStatus { +struct WorkflowSucceededStatus<'a> { #[graphql(flatten)] #[allow(clippy::missing_docs_in_private_items)] - status: WorkflowCompleteStatus, + status: WorkflowCompleteStatus<'a>, } /// All tasks in the workflow have failed #[derive(Debug, SimpleObject, derive_more::From)] -struct WorkflowFailedStatus { +struct WorkflowFailedStatus<'a> { #[graphql(flatten)] #[allow(clippy::missing_docs_in_private_items)] - status: WorkflowCompleteStatus, + status: WorkflowCompleteStatus<'a>, } /// All tasks in the workflow have errored #[derive(Debug, SimpleObject, derive_more::From)] -struct WorkflowErroredStatus { +struct WorkflowErroredStatus<'a> { #[graphql(flatten)] #[allow(clippy::missing_docs_in_private_items)] - status: WorkflowCompleteStatus, + status: WorkflowCompleteStatus<'a>, } /// All tasks in the workflow have completed (succeeded, failed, or errored) -#[derive(Debug, SimpleObject)] -struct WorkflowCompleteStatus { +#[allow(clippy::missing_docs_in_private_items)] +#[derive(Debug)] +struct WorkflowCompleteStatus<'a> { + manifest: &'a IoArgoprojWorkflowV1alpha1WorkflowStatus, + metadata: &'a Metadata, +} + +#[Object] +impl WorkflowCompleteStatus<'_> { /// Time at which this workflow started - start_time: DateTime, + async fn start_time(&self) -> Result, WorkflowParsingError> { + Ok(**self + .manifest + .started_at + .as_ref() + .ok_or(WorkflowParsingError::MissingStartTime)?) + } + /// Time at which this workflow completed - end_time: DateTime, + async fn end_time(&self) -> Result, WorkflowParsingError> { + Ok(**self + .manifest + .finished_at + .as_ref() + .ok_or(WorkflowParsingError::MissingEndTime)?) + } + /// A human readable message indicating details about why the workflow is in this condition - message: Option, - /// Tasks created by the workflow - #[graphql(flatten)] - tasks: Tasks, -} + async fn message(&self) -> Option<&str> { + self.manifest.message.as_deref() + } -impl WorkflowCompleteStatus { - /// Creates a new [`WorkflowCompleteStatus`] from [`IoArgoprojWorkflowV1alpha1WorkflowStatus`] and associated metadata. - fn new( - value: IoArgoprojWorkflowV1alpha1WorkflowStatus, - metadata: Arc, - ) -> Result { - Ok(Self { - start_time: *value - .started_at - .ok_or(WorkflowParsingError::MissingStartTime)?, - end_time: *value - .finished_at - .ok_or(WorkflowParsingError::MissingEndTime)?, - message: value.message, - tasks: TaskMap(value.nodes).into_tasks(metadata)?, - }) + /// Tasks created by the workflow + async fn tasks(&self, ctx: &Context<'_>) -> anyhow::Result> { + let url = ctx.data_unchecked::().deref().to_owned(); + let token = ctx + .data_unchecked::>>() + .to_owned(); + let nodes = fetch_missing_task_info(url, token, self.manifest, self.metadata).await?; + Ok(TaskMap(nodes).into_tasks()) } } @@ -229,89 +276,81 @@ impl TryFrom for TaskStatus { } /// A Task created by a workflow -#[derive(Debug, SimpleObject, Clone)] +#[allow(clippy::missing_docs_in_private_items)] +#[derive(Debug, Clone)] struct Task { - /// Unique name of the task - id: String, - /// Display name of the task - name: String, - /// Current status of a task - status: TaskStatus, - /// Parent of a task + node_status: IoArgoprojWorkflowV1alpha1NodeStatus, depends: Vec, - /// Children of a task - dependencies: Vec, } +#[Object] impl Task { - /// Create a task from node status and its dependencies - fn new( - node_status: argo_workflows_openapi::IoArgoprojWorkflowV1alpha1NodeStatus, - depends: Vec, - ) -> Result { - Ok(Self { - id: node_status.id, - name: node_status - .display_name - .ok_or(WorkflowParsingError::UnrecognisedTaskDisplayName)? + /// Unique name of the task + async fn id(&self) -> &String { + &self.node_status.id + } + + /// Display name of the task + async fn name(&self) -> Result<&String, WorkflowParsingError> { + self.node_status + .display_name + .as_ref() + .ok_or(WorkflowParsingError::UnrecognisedTaskDisplayName) + } + + /// Current status of a task + async fn status(&self) -> Result { + TaskStatus::try_from( + self.node_status + .phase + .as_ref() + .ok_or(WorkflowParsingError::UnrecognisedTaskPhase)? .to_string(), - status: TaskStatus::try_from( - node_status - .phase - .ok_or(WorkflowParsingError::UnrecognisedTaskPhase)? - .to_string(), - )?, - depends, - dependencies: node_status.children, - }) + ) } -} -#[allow(clippy::missing_docs_in_private_items)] -#[derive(Debug)] -enum Tasks { - Fetched(Vec), - UnFetched(Arc), + /// Parent of a task + async fn depends(&self) -> Vec { + self.depends.clone() + } + + /// Children of a task + async fn dependencies(&self) -> Vec { + self.node_status.children.clone() + } } -#[Object] -impl Tasks { - #[allow(clippy::missing_docs_in_private_items)] - async fn tasks(&self, ctx: &Context<'_>) -> anyhow::Result> { - match self { - Tasks::Fetched(tasks) => Ok(tasks.clone()), - Tasks::UnFetched(metadata) => { - let server_url = ctx.data_unchecked::().deref(); - let auth_token = ctx.data_unchecked::>>(); - let mut url = server_url.clone(); - url.path_segments_mut().unwrap().extend([ - "api", - "v1", - "workflows", - &metadata.visit.to_string(), - &metadata.name, - ]); - let request = if let Some(auth_token) = auth_token { - CLIENT.get(url).bearer_auth(auth_token.token()) - } else { - CLIENT.get(url) - }; - let nodes = request - .send() - .await? - .json::>() - .await? - .into_result()? - .status - .unwrap() // Safe as the status field is always present - .nodes; - Ok(match TaskMap(nodes).into_tasks(metadata.clone())? { - Tasks::Fetched(fetched_tasks) => fetched_tasks, - Tasks::UnFetched(_) => vec![], - }) - } - } +async fn fetch_missing_task_info( + mut url: Url, + token: Option>, + manifest: &IoArgoprojWorkflowV1alpha1WorkflowStatus, + metadata: &Metadata, +) -> anyhow::Result> { + let mut nodes = manifest.nodes.clone(); + if nodes.is_empty() { + url.path_segments_mut().unwrap().extend([ + "api", + "v1", + "workflows", + &metadata.visit.to_string(), + &metadata.name, + ]); + let request = if let Some(token) = token { + CLIENT.get(url).bearer_auth(token.token()) + } else { + CLIENT.get(url) + }; + nodes = request + .send() + .await? + .json::>() + .await? + .into_result()? + .status + .unwrap() // Safe as the status field is always present + .nodes; } + Ok(nodes) } /// A wrapper for list of tasks @@ -333,21 +372,19 @@ impl TaskMap { parent_map } - /// Converts [`TaskMap`] into [`Tasks`]` - fn into_tasks(self, metadata: Arc) -> Result { + /// Converts [`TaskMap`] into [`Vec`]` + fn into_tasks(self) -> Vec { let mut relationship_map = TaskMap::generate_relationship_map(&self); - if self.0.is_empty() { - return Ok(Tasks::UnFetched(metadata)); - } - let tasks = self - .0 + self.0 .into_iter() .map(|(node_name, node_status)| { let depends = relationship_map.remove(&node_name).unwrap_or_default(); - Task::new(node_status, depends) + Task { + node_status, + depends, + } }) - .collect::, _>>()?; - Ok(Tasks::Fetched(tasks)) + .collect::>() } } @@ -387,7 +424,7 @@ impl WorkflowsQuery { .json::>() .await? .into_result()?; - Ok(Workflow::new(workflow, visit.into())?) + Ok(Workflow::new(workflow, visit.into())) } #[instrument(skip(self, ctx))] @@ -431,7 +468,7 @@ impl WorkflowsQuery { .items .into_iter() .map(|workflow| Workflow::new(workflow, visit.clone().into())) - .collect::, _>>()?; + .collect::>(); let mut connection = Connection::new( cursor_index > 0, workflows_response.metadata.continue_.is_some(),