Skip to content

Commit

Permalink
Move fetching missing task info to separate function
Browse files Browse the repository at this point in the history
  • Loading branch information
yousefmoazzam committed Jan 27, 2025
1 parent 7f257a4 commit 6d00ac0
Showing 1 changed file with 43 additions and 52 deletions.
95 changes: 43 additions & 52 deletions graph-proxy/src/graphql/workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use axum_extra::headers::{authorization::Bearer, Authorization};
use chrono::{DateTime, Utc};
use std::{collections::HashMap, ops::Deref, sync::Arc};
use tracing::{debug, instrument};
use url::Url;

/// An error encountered when parsing the Argo Server API Workflow response
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -115,32 +116,11 @@ impl WorkflowRunningStatus {

/// Tasks created by the workflow
async fn tasks(&self, ctx: &Context<'_>) -> anyhow::Result<Vec<Task>> {
let mut url = ctx.data_unchecked::<ArgoServerUrl>().deref().to_owned();
let auth_token = ctx.data_unchecked::<Option<Authorization<Bearer>>>();
let mut nodes = self.0.status.as_ref().unwrap().nodes.clone();
if nodes.is_empty() {
url.path_segments_mut().unwrap().extend([
"api",
"v1",
"workflows",
self.0.as_ref().metadata.namespace.as_ref().unwrap(),
self.0.as_ref().metadata.name.as_ref().unwrap(),
]);
let request = if let Some(auth_token) = auth_token {
CLIENT.get(url).bearer_auth(auth_token.token())
} else {
CLIENT.get(url)
};
nodes = request
.send()
.await?
.json::<APIResult<argo_workflows_openapi::IoArgoprojWorkflowV1alpha1Workflow>>()
.await?
.into_result()?
.status
.unwrap() // Safe as the status field is always present
.nodes;
}
let url = ctx.data_unchecked::<ArgoServerUrl>().deref().to_owned();
let token = ctx
.data_unchecked::<Option<Authorization<Bearer>>>()
.to_owned();
let nodes = fetch_missing_task_info(url, token, Arc::clone(&self.0)).await?;
Ok(TaskMap(nodes).into_tasks())
}
}
Expand Down Expand Up @@ -200,32 +180,11 @@ impl WorkflowCompleteStatus {

/// Tasks created by the workflow
async fn tasks(&self, ctx: &Context<'_>) -> anyhow::Result<Vec<Task>> {
let mut url = ctx.data_unchecked::<ArgoServerUrl>().deref().to_owned();
let auth_token = ctx.data_unchecked::<Option<Authorization<Bearer>>>();
let mut nodes = self.0.status.as_ref().unwrap().nodes.clone();
if nodes.is_empty() {
url.path_segments_mut().unwrap().extend([
"api",
"v1",
"workflows",
self.0.as_ref().metadata.namespace.as_ref().unwrap(),
self.0.as_ref().metadata.name.as_ref().unwrap(),
]);
let request = if let Some(auth_token) = auth_token {
CLIENT.get(url).bearer_auth(auth_token.token())
} else {
CLIENT.get(url)
};
nodes = request
.send()
.await?
.json::<APIResult<argo_workflows_openapi::IoArgoprojWorkflowV1alpha1Workflow>>()
.await?
.into_result()?
.status
.unwrap() // Safe as the status field is always present
.nodes;
}
let url = ctx.data_unchecked::<ArgoServerUrl>().deref().to_owned();
let token = ctx
.data_unchecked::<Option<Authorization<Bearer>>>()
.to_owned();
let nodes = fetch_missing_task_info(url, token, Arc::clone(&self.0)).await?;
Ok(TaskMap(nodes).into_tasks())
}
}
Expand Down Expand Up @@ -304,6 +263,38 @@ impl Task {
}
}

async fn fetch_missing_task_info(
mut url: Url,
token: Option<Authorization<Bearer>>,
manifest: Arc<IoArgoprojWorkflowV1alpha1Workflow>,
) -> anyhow::Result<HashMap<String, IoArgoprojWorkflowV1alpha1NodeStatus>> {
let mut nodes = manifest.status.as_ref().unwrap().nodes.clone();
if nodes.is_empty() {
url.path_segments_mut().unwrap().extend([
"api",
"v1",
"workflows",
manifest.as_ref().metadata.namespace.as_ref().unwrap(),
manifest.as_ref().metadata.name.as_ref().unwrap(),
]);
let request = if let Some(token) = token {
CLIENT.get(url).bearer_auth(token.token())
} else {
CLIENT.get(url)
};
nodes = request
.send()
.await?
.json::<APIResult<argo_workflows_openapi::IoArgoprojWorkflowV1alpha1Workflow>>()
.await?
.into_result()?
.status
.unwrap() // Safe as the status field is always present
.nodes;
}
Ok(nodes)
}

/// A wrapper for list of tasks
struct TaskMap(HashMap<String, IoArgoprojWorkflowV1alpha1NodeStatus>);

Expand Down

0 comments on commit 6d00ac0

Please sign in to comment.