Skip to content

Commit

Permalink
Add activity leaf fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Elais Player committed Feb 9, 2023
1 parent b2a5d5d commit 53eaa66
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 19 deletions.
77 changes: 60 additions & 17 deletions components/nmdc_runtime/workflow_execution_activity/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ class ActiveActivities(TypedDict):
workflow: Workflow


class ActivityWithWorkflow(TypedDict):
activity: WorkflowExecutionActivity
workflow: Workflow


flatten = lambda *n: (
e for a in n for e in (flatten(*a) if isinstance(a, (tuple, list)) else (a,))
)
Expand Down Expand Up @@ -45,7 +50,7 @@ def add_relevant_info(
return workflow


def construct_job_config(activity: WorkflowExecutionActivity, name: str) -> Any:
def construct_job_config(activity: WorkflowExecutionActivity, name: str) -> Workflow:
workflows = get_all_workflows()
next_workflows = list(filter(lambda wf: wf.predecessor == name, workflows))
relevant_info = [add_relevant_info(wf, activity) for wf in next_workflows]
Expand All @@ -59,9 +64,7 @@ def container_job(
return jobs


def parse_data_objects(
activity: WorkflowExecutionActivity, data_objects: list[DataObject]
) -> Workflow:
def parse_data_objects(activity: Workflow, data_objects: list[DataObject]) -> dict[str, Any]:
activity_dict = activity.dict()
for key in activity_dict["inputs"]:
for do in data_objects:
Expand All @@ -71,12 +74,47 @@ def parse_data_objects(
return activity_dict


def associate_activity_with_workflow(
aa: ActiveActivities,
) -> list[ActivityWithWorkflow]:
return [
{"activity": activity, "workflow": aa["workflow"]}
for activity in aa["activities"]
]


def get_input_set(activities: list[ActivityWithWorkflow]) -> set[str]:
activity_input_set = set()
for entry in activities:
activity_input_set.update(entry["activity"].has_input)
return activity_input_set


def outputs_in_inputs(activity: ActivityWithWorkflow, inputs: set[str]) -> bool:
for output in activity["activity"].has_outputs:
if output in inputs:
return True

return False


def filter_activities(
activities: list[ActivityWithWorkflow], inputs: set[str]
) -> list[ActivityWithWorkflow]:
leaves: list[ActivityWithWorkflow] = []
for activity in activities:
if not outputs_in_inputs(activity, inputs):
leaves.append(activity)

return leaves


class ActivityService:
def create_jobs(
self,
activities: list[ActiveActivities],
data_objects: list[DataObject],
) -> list[WorkflowExecutionActivity]:
) -> list[dict[str, Any]]:
"""Create jobs for automation.
Parameters
Expand All @@ -87,24 +125,29 @@ def create_jobs(
Returns
-------
list[Workflow]
list[dict[str,Any]]
"""
processed_activities: list[Workflow] = list(
flatten(
[
container_job(aa["activities"], aa["workflow"].name)
for aa in activities
]
)
flattened_activities: list[ActivityWithWorkflow] = list(
flatten([associate_activity_with_workflow(entry) for entry in activities])
)

input_set: set[str] = get_input_set(flattened_activities)
activity_leaves: list[ActivityWithWorkflow] = filter_activities(
flattened_activities, input_set
)

job_configs: list[Workflow] = [
construct_job_config(entry["activity"], entry["workflow"].name)
for entry in activity_leaves
]

return [
parse_data_objects(activity, data_objects)
for activity in processed_activities
parse_data_objects(job_config, data_objects) for job_config in job_configs
]

async def add_activity_set(
self, activities: Database, db: MongoDatabase
) -> list[WorkflowExecutionActivity]:
) -> list[ActiveActivities]:
"""
Store workflow activities.
Expand All @@ -118,7 +161,7 @@ async def add_activity_set(
Returns
-------
list[str]
list[ActiveActivities]
IDs for all activities added to the collection
"""
insert_activities(activities, db)
Expand Down
4 changes: 2 additions & 2 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
buildInputs = [
(pkgs.${python}.withPackages
(ps: with ps; [pip python-lsp-server python-lsp-black isort
pip-tools pylsp-mypy]))
pkgs.gnumake pkgs.ruff pkgs.docker pkgs.docker-compose
pip-tools pylsp-mypy pydantic mypy]))
pkgs.gnumake pkgs.ruff pkgs.docker pkgs.docker-compose pkgs.python310
];
};
});
Expand Down

0 comments on commit 53eaa66

Please sign in to comment.