Skip to content

Commit

Permalink
chore(engine): Refactor workflow parsing to use `try_from` instead of `try_from_str` (#550)
Browse files Browse the repository at this point in the history

* chore(engine): Refactor workflow parsing to use `try_from` instead of `try_from_str`

* fix

* fix
  • Loading branch information
miseyu authored Oct 3, 2024
1 parent f669937 commit 66f1c54
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 82 deletions.
2 changes: 1 addition & 1 deletion engine/cli/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl DotCliCommand {
"Router".to_string(),
NodeKind::Processor(Box::<RouterFactory>::default()),
);
let workflow = Workflow::try_from_str(&json);
let workflow = Workflow::try_from(json.as_str()).map_err(crate::errors::Error::run)?;
let dag =
DagSchemas::from_graphs(workflow.entry_graph_id, workflow.graphs, factories, None);
println!("{}", dag.to_dot());
Expand Down
4 changes: 4 additions & 0 deletions engine/cli/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ impl Error {
/// Builds an `UnknownCommand` error from any displayable message.
pub(crate) fn unknown_command<T: ToString>(message: T) -> Self {
Self::UnknownCommand(message.to_string())
}

/// Builds a `Run` error from any displayable message.
/// Mirrors the sibling constructors (`unknown_command`, etc.) in shape.
pub(crate) fn run<T: ToString>(message: T) -> Self {
    let rendered = message.to_string();
    Self::Run(rendered)
}
}
6 changes: 4 additions & 2 deletions engine/cli/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ impl RunCliCommand {
.map_err(crate::errors::Error::init)?;
String::from_utf8(bytes.to_vec()).map_err(crate::errors::Error::init)?
};
let mut workflow = Workflow::try_from_str(&json);
workflow.merge_with(self.vars.clone());
let mut workflow = Workflow::try_from(json.as_str()).map_err(crate::errors::Error::init)?;
workflow
.merge_with(self.vars.clone())
.map_err(crate::errors::Error::init)?;
let job_id = match &self.job_id {
Some(job_id) => {
uuid::Uuid::from_str(job_id.as_str()).map_err(crate::errors::Error::init)?
Expand Down
8 changes: 6 additions & 2 deletions engine/plateau-gis-quality-checker/src-tauri/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ pub(crate) async fn run_flow(
)?;
let json = String::from_utf8(bytes.data.iter().cloned().collect())
.map_err(crate::errors::Error::io)?;
let mut workflow = Workflow::try_from_str(&json);
workflow.merge_with(params);
let mut workflow = Workflow::try_from(json.as_str()).map_err(|e| {
crate::errors::Error::ExecuteFailed(format!("failed to parse workflow with {:?}", e))
})?;
workflow.merge_with(params).map_err(|e| {
crate::errors::Error::ExecuteFailed(format!("failed to merge params with {:?}", e))
})?;
let storage_resolver = Arc::new(resolve::StorageResolver::new());
let job_id = uuid::Uuid::new_v4();
let action_log_uri = setup_job_directory("plateau-gis-quality-checker", "action-log", job_id)
Expand Down
2 changes: 1 addition & 1 deletion engine/runtime/examples/plateau/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub fn create_workflow(workflow: &str) -> Workflow {
let path = absolute_path.unwrap();
let yaml = Transformer::new(path, false).unwrap();
let yaml = yaml.to_string();
Workflow::try_from_str(yaml.as_str())
Workflow::try_from(yaml.as_str()).expect("Failed to parse workflow.")
}

pub fn setup_logging_and_tracing() {
Expand Down
2 changes: 1 addition & 1 deletion engine/runtime/tests/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub(crate) fn execute(test_id: &str, fixture_files: Vec<&str>) -> Result<(), Err
),
Uri::for_test("ram:///log/").path(),
));
let workflow = Workflow::try_from_str(workflow);
let workflow = Workflow::try_from(workflow).expect("failed to parse workflow");
Runner::run(
workflow,
BUILTIN_ACTION_FACTORIES.clone(),
Expand Down
74 changes: 48 additions & 26 deletions engine/runtime/types/src/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ pub type Parameter = Map<String, Value>;

static ENVIRONMENT_PREFIX: &str = "FLOW_VAR_";

/// Parameters supplied to a workflow, split into workflow-wide values and
/// per-node properties. Serialized/deserialized with camelCase field names.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowParameter {
/// Optional workflow-wide key/value parameters (`Map<String, Value>`).
pub global: Option<Parameter>,
/// Optional per-node properties. NOTE(review): `NodeProperty` is declared
/// elsewhere in the crate — confirm its semantics before relying on this.
pub node: Option<NodeProperty>,
}

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Workflow {
Expand All @@ -26,21 +33,18 @@ pub struct Workflow {
pub graphs: Vec<Graph>,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowParameter {
pub global: Option<Parameter>,
pub node: Option<NodeProperty>,
}
impl TryFrom<&str> for Workflow {
type Error = crate::error::Error;

impl Workflow {
pub fn try_from_str(s: &str) -> Self {
let mut workflow: Self = from_str(s).unwrap();
workflow.load_variables_from_environment();
workflow
/// Deserializes a `Workflow` from a string via `from_str`, then overlays
/// variables read from the process environment (keys prefixed with
/// `FLOW_VAR_` — see `load_variables_from_environment`).
///
/// # Errors
/// Returns an input error if deserialization fails, or propagates any error
/// from environment-variable loading.
fn try_from(value: &str) -> Result<Self, Self::Error> {
// serde deserialization failures are surfaced as input errors.
let mut workflow: Self = from_str(value).map_err(crate::error::Error::input)?;
// Merge FLOW_VAR_-prefixed environment variables into the `with` map.
workflow.load_variables_from_environment()?;
Ok(workflow)
}
}

fn load_variables_from_environment(&mut self) {
impl Workflow {
fn load_variables_from_environment(&mut self) -> Result<(), crate::error::Error> {
let environment_vars: Vec<(String, String)> = env::vars()
.filter(|(key, _)| key.starts_with(ENVIRONMENT_PREFIX))
.map(|(key, value)| (key[ENVIRONMENT_PREFIX.len()..].to_string(), value))
Expand All @@ -52,26 +56,39 @@ impl Workflow {
})
.collect();
if environment_vars.is_empty() {
return;
return Ok(());
}
let mut with = if let Some(with) = self.with.clone() {
with
} else {
serde_json::Map::<String, Value>::new()
};
with.extend(environment_vars.into_iter().map(|(key, value)| {
let value = match determine_format(value.as_str()) {
SerdeFormat::Json | SerdeFormat::Yaml => from_str(value.as_str()).unwrap(),
SerdeFormat::Unknown => serde_json::to_value(value).unwrap(),
};
(key, value)
}));
with.extend(
environment_vars
.into_iter()
.map(|(key, value)| {
let value = match determine_format(value.as_str()) {
SerdeFormat::Json | SerdeFormat::Yaml => {
from_str(value.as_str()).map_err(crate::error::Error::input)?
}
SerdeFormat::Unknown => {
serde_json::to_value(value).map_err(crate::error::Error::input)?
}
};
Ok((key, value))
})
.collect::<Result<Vec<_>, crate::error::Error>>()?,
);
self.with = Some(with);
Ok(())
}

pub fn merge_with(&mut self, params: HashMap<String, String>) {
pub fn merge_with(
&mut self,
params: HashMap<String, String>,
) -> Result<(), crate::error::Error> {
if params.is_empty() {
return;
return Ok(());
}
let mut with = if let Some(with) = self.with.clone() {
with
Expand All @@ -88,14 +105,19 @@ impl Workflow {
})
.map(|(key, value)| {
let value = match determine_format(value.as_str()) {
SerdeFormat::Json | SerdeFormat::Yaml => from_str(value.as_str()).unwrap(),
SerdeFormat::Unknown => serde_json::to_value(value).unwrap(),
SerdeFormat::Json | SerdeFormat::Yaml => {
from_str(value.as_str()).map_err(crate::error::Error::input)?
}
SerdeFormat::Unknown => {
serde_json::to_value(value).map_err(crate::error::Error::input)?
}
};
(key, value)
Ok((key, value))
})
.collect::<serde_json::Map<String, Value>>();
.collect::<Result<HashMap<_, _>, crate::error::Error>>()?;
with.extend(params);
self.with = Some(with);
Ok(())
}
}

Expand Down
4 changes: 0 additions & 4 deletions engine/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ version.workspace = true
name = "reearth-flow-worker"
path = "src/main.rs"

[features]
default = []
feature-async = []

[dependencies]
reearth-flow-action-log.workspace = true
reearth-flow-action-plateau-processor.workspace = true
Expand Down
46 changes: 1 addition & 45 deletions engine/worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use factory::ALL_ACTION_FACTORIES;
// This is a placeholder for the actual implementation of the worker.
// I don't know whether to use sync or async, so I've implemented both. Once you've decided which to use,

#[cfg(not(feature = "feature-async"))]
fn main() {
use reearth_flow_runner::runner::Runner;
// TODO: Prepare Process
Expand All @@ -24,7 +23,7 @@ fn main() {
let yaml = "${yamlcode}"; // TODO: Read from ??
let job_id: Option<String> = None; // TODO: Read from ??
let action_log_uri: Option<String> = None; // TODO: Read from ??
let workflow = Workflow::try_from_str(yaml);
let workflow = Workflow::try_from(yaml).expect("Failed to parse workflow");

let storage_resolver = Arc::new(resolve::StorageResolver::new());
let job_id = match job_id {
Expand Down Expand Up @@ -56,46 +55,3 @@ fn main() {

// TODO: Clean up Process
}

#[cfg(feature = "feature-async")]
#[tokio::main]
async fn main() {
use reearth_flow_runner::runner::AsyncRunner;
// TODO: Prepare Process
// TODO: Please make sure to handle errors properly in the 'expect' section.
let yaml = "${yamlcode}"; // TODO: Read from ??
let job_id: Option<String> = None; // TODO: Read from ??
let action_log_uri: Option<String> = None; // TODO: Read from ??
let workflow = Workflow::try_from_str(yaml);

let storage_resolver = Arc::new(resolve::StorageResolver::new());
let job_id = match job_id {
Some(job_id) => uuid::Uuid::from_str(job_id.as_str()).expect("Invalid job id"),
None => uuid::Uuid::new_v4(),
};
let action_log_uri = match action_log_uri {
Some(uri) => Uri::from_str(&uri).expect("Invalid action log uri"),
None => setup_job_directory("worker", "action-log", job_id)
.expect("Failed to setup job directory"),
};
let state_uri = setup_job_directory("worker", "feature-store", job_id)
.expect("Failed to setup job directory");
let state =
Arc::new(State::new(&state_uri, &storage_resolver).expect("Failed to create state"));

let logger_factory = Arc::new(LoggerFactory::new(
create_root_logger(action_log_uri.path()),
action_log_uri.path(),
));
AsyncRunner::run(
workflow,
ALL_ACTION_FACTORIES.clone(),
logger_factory,
storage_resolver,
state,
)
.await
.expect("Failed to run workflow");

// TODO: Clean up Process
}

0 comments on commit 66f1c54

Please sign in to comment.