Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve result workflow structure #555

Merged
merged 10 commits into from
Nov 30, 2023
217 changes: 121 additions & 96 deletions src/ansys/dpf/post/fluid_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,114 @@
ref = set(self.face_zones.keys())
return [i for i in zone_ids if i in ref]

def _get_result_workflow(
self,
base_name: str,
location: str,
category: ResultCategory,
components: Union[str, List[str], int, List[int], None] = None,
norm: bool = False,
selection: Union[Selection, None] = None,
set_ids: Union[int, List[int], None] = None,
zone_ids: Union[List[int], None] = None,
phases: Union[List[Union[int, str]], None] = None,
species: Union[List[int], None] = None,
qualifiers: Union[dict, None] = None,
) -> (dpf.Workflow, Union[str, list[str], None], str):
"""Generate (without evaluating) the Workflow to extract results."""
comp, to_extract, columns = self._create_components(
base_name, category, components
)

# Initialize a workflow
wf, result_op = self._build_result_workflow(
name=base_name,
location=location,
force_elemental_nodal=False,
)
query_regions_meshes = False
lists = []
lists_labels = []
if qualifiers:
labels = list(qualifiers.keys())
lists_labels.extend(labels)
lists.extend([qualifiers[key] for key in labels])
if "zone" in labels:
query_regions_meshes = qualifiers["zone"]
else:
if set_ids:
lists.append(set_ids)
lists_labels.append("time")

Check warning on line 245 in src/ansys/dpf/post/fluid_simulation.py

View check run for this annotation

Codecov / codecov/patch

src/ansys/dpf/post/fluid_simulation.py#L244-L245

Added lines #L244 - L245 were not covered by tests
if zone_ids:
lists.append(zone_ids)
lists_labels.append("zone")
query_regions_meshes = zone_ids
if phases:
phase_ids = []
available_phases = self.phases
for phase in phases:
phase_ids.append(available_phases[phase].id)
lists.append(phase_ids)
lists_labels.append("phase")
if species:
lists.append(species)
lists_labels.append("species")

Check warning on line 259 in src/ansys/dpf/post/fluid_simulation.py

View check run for this annotation

Codecov / codecov/patch

src/ansys/dpf/post/fluid_simulation.py#L258-L259

Added lines #L258 - L259 were not covered by tests

if lists:
import itertools

for i, c in enumerate(itertools.product(*lists)):
label_space = {}
for j, label in enumerate(lists_labels):
label_space[label] = c[j]
result_op.connect(1000 + i, label_space)
# Its output is selected as future workflow output for now
# print(result_op)

if query_regions_meshes:
# Results have been queried on regions,
# A MeshesProvider is required to give meshes as input of the source operator
meshes_provider_op = self._model.operator("meshes_provider")
meshes_provider_op.connect(25, query_regions_meshes)
result_op.connect(7, meshes_provider_op.outputs.meshes)
wf.add_operator(meshes_provider_op)
else:
# Results have been queried on the whole mesh,
# A MeshProvider is required to give the mesh as input of the source operator
mesh_provider_op = self._model.operator("mesh_provider")
result_op.connect(7, mesh_provider_op.outputs.mesh)
wf.add_operator(mesh_provider_op)

out = result_op.outputs.fields_container
# Its inputs are selected as workflow inputs for merging with selection workflows
wf.set_input_name("time_scoping", result_op.inputs.time_scoping)
wf.set_input_name("mesh_scoping", result_op.inputs.mesh_scoping)

wf.connect_with(
selection.time_freq_selection._selection,
output_input_names=("scoping", "time_scoping"),
)
wf.connect_with(
selection.spatial_selection._selection,
output_input_names=("scoping", "mesh_scoping"),
)

# Connect data_sources and streams_container inputs of selection if necessary
if "streams" in wf.input_names:
wf.connect("streams", self._model.metadata.streams_provider)

Check warning on line 302 in src/ansys/dpf/post/fluid_simulation.py

View check run for this annotation

Codecov / codecov/patch

src/ansys/dpf/post/fluid_simulation.py#L302

Added line #L302 was not covered by tests
if "data_sources" in wf.input_names:
wf.connect("data_sources", self._model.metadata.data_sources)

Check warning on line 304 in src/ansys/dpf/post/fluid_simulation.py

View check run for this annotation

Codecov / codecov/patch

src/ansys/dpf/post/fluid_simulation.py#L304

Added line #L304 was not covered by tests

# Add an optional norm operation if requested
if norm:
wf, out, comp, base_name = self._append_norm(wf, out, base_name)

Check warning on line 308 in src/ansys/dpf/post/fluid_simulation.py

View check run for this annotation

Codecov / codecov/patch

src/ansys/dpf/post/fluid_simulation.py#L308

Added line #L308 was not covered by tests

# Set the workflow output
wf.set_output_name("out", out)
wf.progress_bar = False

return wf, comp, base_name

def _get_result(
self,
base_name: str,
Expand Down Expand Up @@ -384,112 +492,29 @@
location=location,
)

comp, to_extract, columns = self._create_components(
base_name, category, components
)

# Initialize a workflow
wf, result_op = self._build_result_workflow(
name=base_name,
wf, comp, base_name = self._get_result_workflow(
base_name=base_name,
location=location,
force_elemental_nodal=False,
)
query_regions_meshes = False
lists = []
lists_labels = []
if qualifiers:
labels = list(qualifiers.keys())
lists_labels.extend(labels)
lists.extend([qualifiers[key] for key in labels])
if "zone" in labels:
query_regions_meshes = qualifiers["zone"]
else:
if set_ids:
lists.append(set_ids)
lists_labels.append("time")
if zone_ids:
lists.append(zone_ids)
lists_labels.append("zone")
query_regions_meshes = zone_ids
if phases:
phase_ids = []
available_phases = self.phases
for phase in phases:
phase_ids.append(available_phases[phase].id)
lists.append(phase_ids)
lists_labels.append("phase")
if species:
lists.append(species)
lists_labels.append("species")

if lists:
import itertools

for i, c in enumerate(itertools.product(*lists)):
label_space = {}
for j, label in enumerate(lists_labels):
label_space[label] = c[j]
result_op.connect(1000 + i, label_space)
# Its output is selected as future workflow output for now
# print(result_op)

if query_regions_meshes:
# Results have been queried on regions,
# A MeshesProvider is required to give meshes as input of the source operator
meshes_provider_op = self._model.operator("meshes_provider")
meshes_provider_op.connect(25, query_regions_meshes)
result_op.connect(7, meshes_provider_op.outputs.meshes)
wf.add_operator(meshes_provider_op)
else:
# Results have been queried on the whole mesh,
# A MeshProvider is required to give the mesh as input of the source operator
mesh_provider_op = self._model.operator("mesh_provider")
result_op.connect(7, mesh_provider_op.outputs.mesh)
wf.add_operator(mesh_provider_op)

out = result_op.outputs.fields_container
# Its inputs are selected as workflow inputs for merging with selection workflows
wf.set_input_name("time_scoping", result_op.inputs.time_scoping)
wf.set_input_name("mesh_scoping", result_op.inputs.mesh_scoping)

wf.connect_with(
selection.time_freq_selection._selection,
output_input_names=("scoping", "time_scoping"),
)
wf.connect_with(
selection.spatial_selection._selection,
output_input_names=("scoping", "mesh_scoping"),
category=category,
components=components,
norm=norm,
selection=selection,
set_ids=set_ids,
zone_ids=zone_ids,
phases=phases,
species=species,
qualifiers=qualifiers,
)

# Connect data_sources and streams_container inputs of selection if necessary
if "streams" in wf.input_names:
wf.connect("streams", self._model.metadata.streams_provider)
if "data_sources" in wf.input_names:
wf.connect("data_sources", self._model.metadata.data_sources)

# Add an optional norm operation if requested
if norm:
norm_op = self._model.operator(name="norm_fc")
norm_op.connect(0, out)
wf.add_operator(operator=norm_op)
out = norm_op.outputs.fields_container

# if averaging_op_name:
# average_op = self._model.operator(name=averaging_op_name)
# average_op.connect(0, out)
# wf.add_operator(operator=average_op)
# out = average_op.outputs.fields_container

# Set the workflow output
wf.set_output_name("out", out)
wf.progress_bar = False
# Evaluate the workflow
fc = wf.get_output("out", dpf.types.fields_container)
# print(fc)
if location is None and len(fc) > 0:
location = fc[0].location
if location == locations.elemental:
location = "cells"

_, _, columns = self._create_components(base_name, category, components)
return self._create_dataframe(
fc, location, columns, comp, base_name.split("::")[-1], None
)
Expand Down
16 changes: 6 additions & 10 deletions src/ansys/dpf/post/harmonic_mechanical_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
"""Generate (without evaluating) the Workflow to extract results."""
comp, to_extract, _ = self._create_components(base_name, category, components)

# Initialize a workflow
wf = dpf.Workflow(server=self._model._server)

force_elemental_nodal = self._requires_manual_averaging(
base_name=base_name,
location=location,
Expand Down Expand Up @@ -71,6 +68,7 @@
if selection.requires_mesh:
# wf.set_input_name(_WfNames.mesh, result_op.inputs.mesh)
mesh_wf = dpf.Workflow(server=self._model._server)
mesh_wf.add_operator(self._model.metadata.mesh_provider)
mesh_wf.set_output_name(
_WfNames.initial_mesh, self._model.metadata.mesh_provider
)
Expand Down Expand Up @@ -109,6 +107,7 @@
if average_op is not None:
average_op[0].connect(0, out)
principal_op.connect(0, average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
average_op = None
else:
Expand All @@ -133,12 +132,13 @@
):
equivalent_op.connect(0, out)
average_op[0].connect(0, equivalent_op)
wf.add_operator(operator=average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
out = average_op[1].outputs.fields_container
elif average_op is not None:
average_op[0].connect(0, out)
equivalent_op.connect(0, average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
out = equivalent_op.outputs.fields_container
else:
Expand All @@ -149,6 +149,7 @@

if average_op is not None:
average_op[0].connect(0, out)
wf.add_operators(list(average_op))
out = average_op[1].outputs.fields_container

# Add an optional component selection step if result is vector, or matrix
Expand Down Expand Up @@ -196,12 +197,7 @@
# Add an optional norm operation if requested
# (must be after sweeping_phase for U)
if norm:
norm_op = self._model.operator(name="norm_fc")
norm_op.connect(0, out)
wf.add_operator(operator=norm_op)
out = norm_op.outputs.fields_container
comp = None
base_name += "_N"
wf, out, comp, base_name = self._append_norm(wf, out, base_name)

Check warning on line 200 in src/ansys/dpf/post/harmonic_mechanical_simulation.py

View check run for this annotation

Codecov / codecov/patch

src/ansys/dpf/post/harmonic_mechanical_simulation.py#L200

Added line #L200 was not covered by tests

# Set the workflow output
wf.set_output_name("out", out)
Expand Down
13 changes: 6 additions & 7 deletions src/ansys/dpf/post/modal_mechanical_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
)
if selection.requires_mesh:
mesh_wf = dpf.Workflow(server=self._model._server)
mesh_wf.add_operator(self._model.metadata.mesh_provider)
mesh_wf.set_output_name(
_WfNames.initial_mesh, self._model.metadata.mesh_provider
)
Expand Down Expand Up @@ -94,6 +95,7 @@
if average_op is not None:
average_op[0].connect(0, out)
principal_op.connect(0, average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
average_op = None
else:
Expand All @@ -118,12 +120,13 @@
):
equivalent_op.connect(0, out)
average_op[0].connect(0, equivalent_op)
wf.add_operator(operator=average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
out = average_op[1].outputs.fields_container
elif average_op is not None:
average_op[0].connect(0, out)
equivalent_op.connect(0, average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
out = equivalent_op.outputs.fields_container
else:
Expand All @@ -134,6 +137,7 @@

if average_op is not None:
average_op[0].connect(0, out)
wf.add_operators(list(average_op))
out = average_op[1].outputs.fields_container

# Add an optional component selection step if result is vector, matrix, or principal
Expand All @@ -155,12 +159,7 @@

# Add an optional norm operation if requested
if norm:
norm_op = self._model.operator(name="norm_fc")
norm_op.connect(0, out)
wf.add_operator(operator=norm_op)
out = norm_op.outputs.fields_container
comp = None
base_name += "_N"
wf, out, comp, base_name = self._append_norm(wf, out, base_name)

Check warning on line 162 in src/ansys/dpf/post/modal_mechanical_simulation.py

View check run for this annotation

Codecov / codecov/patch

src/ansys/dpf/post/modal_mechanical_simulation.py#L162

Added line #L162 was not covered by tests

# Set the workflow output
wf.set_output_name("out", out)
Expand Down
Loading