Skip to content

Commit

Permalink
Improve result workflow structure (#555)
Browse files Browse the repository at this point in the history
* Fix/update selection workflow metadata for visualization

* Fix/update result workflow metadata for visualization

* Fix/update StaticMechanical workflow metadata for visualization

* Fix/update ModalMechanical workflow metadata for visualization

* Fix/update TransientMechanical workflow metadata for visualization

* Fix/update HarmonicMechanical workflow metadata for visualization

* Add Simulation._append_norm()

* Refactor fluid_simulation.py to add FluidSimulation._get_result_workflow()

* Clean FluidSimulation._get_result_workflow

* Fix wf.add_operators for average_op
  • Loading branch information
PProfizi authored Nov 30, 2023
1 parent 033bb9a commit 451728f
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 137 deletions.
217 changes: 121 additions & 96 deletions src/ansys/dpf/post/fluid_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,114 @@ def _filter_zones(self, zone_ids: List[int], keep: locations):
ref = set(self.face_zones.keys())
return [i for i in zone_ids if i in ref]

def _get_result_workflow(
self,
base_name: str,
location: str,
category: ResultCategory,
components: Union[str, List[str], int, List[int], None] = None,
norm: bool = False,
selection: Union[Selection, None] = None,
set_ids: Union[int, List[int], None] = None,
zone_ids: Union[List[int], None] = None,
phases: Union[List[Union[int, str]], None] = None,
species: Union[List[int], None] = None,
qualifiers: Union[dict, None] = None,
) -> (dpf.Workflow, Union[str, list[str], None], str):
"""Generate (without evaluating) the Workflow to extract results."""
comp, to_extract, columns = self._create_components(
base_name, category, components
)

# Initialize a workflow
wf, result_op = self._build_result_workflow(
name=base_name,
location=location,
force_elemental_nodal=False,
)
query_regions_meshes = False
lists = []
lists_labels = []
if qualifiers:
labels = list(qualifiers.keys())
lists_labels.extend(labels)
lists.extend([qualifiers[key] for key in labels])
if "zone" in labels:
query_regions_meshes = qualifiers["zone"]
else:
if set_ids:
lists.append(set_ids)
lists_labels.append("time")
if zone_ids:
lists.append(zone_ids)
lists_labels.append("zone")
query_regions_meshes = zone_ids
if phases:
phase_ids = []
available_phases = self.phases
for phase in phases:
phase_ids.append(available_phases[phase].id)
lists.append(phase_ids)
lists_labels.append("phase")
if species:
lists.append(species)
lists_labels.append("species")

if lists:
import itertools

for i, c in enumerate(itertools.product(*lists)):
label_space = {}
for j, label in enumerate(lists_labels):
label_space[label] = c[j]
result_op.connect(1000 + i, label_space)
# Its output is selected as future workflow output for now
# print(result_op)

if query_regions_meshes:
# Results have been queried on regions,
# A MeshesProvider is required to give meshes as input of the source operator
meshes_provider_op = self._model.operator("meshes_provider")
meshes_provider_op.connect(25, query_regions_meshes)
result_op.connect(7, meshes_provider_op.outputs.meshes)
wf.add_operator(meshes_provider_op)
else:
# Results have been queried on the whole mesh,
# A MeshProvider is required to give the mesh as input of the source operator
mesh_provider_op = self._model.operator("mesh_provider")
result_op.connect(7, mesh_provider_op.outputs.mesh)
wf.add_operator(mesh_provider_op)

out = result_op.outputs.fields_container
# Its inputs are selected as workflow inputs for merging with selection workflows
wf.set_input_name("time_scoping", result_op.inputs.time_scoping)
wf.set_input_name("mesh_scoping", result_op.inputs.mesh_scoping)

wf.connect_with(
selection.time_freq_selection._selection,
output_input_names=("scoping", "time_scoping"),
)
wf.connect_with(
selection.spatial_selection._selection,
output_input_names=("scoping", "mesh_scoping"),
)

# Connect data_sources and streams_container inputs of selection if necessary
if "streams" in wf.input_names:
wf.connect("streams", self._model.metadata.streams_provider)
if "data_sources" in wf.input_names:
wf.connect("data_sources", self._model.metadata.data_sources)

# Add an optional norm operation if requested
if norm:
wf, out, comp, base_name = self._append_norm(wf, out, base_name)

# Set the workflow output
wf.set_output_name("out", out)
wf.progress_bar = False

return wf, comp, base_name

def _get_result(
self,
base_name: str,
Expand Down Expand Up @@ -384,112 +492,29 @@ def _get_result(
location=location,
)

comp, to_extract, columns = self._create_components(
base_name, category, components
)

# Initialize a workflow
wf, result_op = self._build_result_workflow(
name=base_name,
wf, comp, base_name = self._get_result_workflow(
base_name=base_name,
location=location,
force_elemental_nodal=False,
)
query_regions_meshes = False
lists = []
lists_labels = []
if qualifiers:
labels = list(qualifiers.keys())
lists_labels.extend(labels)
lists.extend([qualifiers[key] for key in labels])
if "zone" in labels:
query_regions_meshes = qualifiers["zone"]
else:
if set_ids:
lists.append(set_ids)
lists_labels.append("time")
if zone_ids:
lists.append(zone_ids)
lists_labels.append("zone")
query_regions_meshes = zone_ids
if phases:
phase_ids = []
available_phases = self.phases
for phase in phases:
phase_ids.append(available_phases[phase].id)
lists.append(phase_ids)
lists_labels.append("phase")
if species:
lists.append(species)
lists_labels.append("species")

if lists:
import itertools

for i, c in enumerate(itertools.product(*lists)):
label_space = {}
for j, label in enumerate(lists_labels):
label_space[label] = c[j]
result_op.connect(1000 + i, label_space)
# Its output is selected as future workflow output for now
# print(result_op)

if query_regions_meshes:
# Results have been queried on regions,
# A MeshesProvider is required to give meshes as input of the source operator
meshes_provider_op = self._model.operator("meshes_provider")
meshes_provider_op.connect(25, query_regions_meshes)
result_op.connect(7, meshes_provider_op.outputs.meshes)
wf.add_operator(meshes_provider_op)
else:
# Results have been queried on the whole mesh,
# A MeshProvider is required to give the mesh as input of the source operator
mesh_provider_op = self._model.operator("mesh_provider")
result_op.connect(7, mesh_provider_op.outputs.mesh)
wf.add_operator(mesh_provider_op)

out = result_op.outputs.fields_container
# Its inputs are selected as workflow inputs for merging with selection workflows
wf.set_input_name("time_scoping", result_op.inputs.time_scoping)
wf.set_input_name("mesh_scoping", result_op.inputs.mesh_scoping)

wf.connect_with(
selection.time_freq_selection._selection,
output_input_names=("scoping", "time_scoping"),
)
wf.connect_with(
selection.spatial_selection._selection,
output_input_names=("scoping", "mesh_scoping"),
category=category,
components=components,
norm=norm,
selection=selection,
set_ids=set_ids,
zone_ids=zone_ids,
phases=phases,
species=species,
qualifiers=qualifiers,
)

# Connect data_sources and streams_container inputs of selection if necessary
if "streams" in wf.input_names:
wf.connect("streams", self._model.metadata.streams_provider)
if "data_sources" in wf.input_names:
wf.connect("data_sources", self._model.metadata.data_sources)

# Add an optional norm operation if requested
if norm:
norm_op = self._model.operator(name="norm_fc")
norm_op.connect(0, out)
wf.add_operator(operator=norm_op)
out = norm_op.outputs.fields_container

# if averaging_op_name:
# average_op = self._model.operator(name=averaging_op_name)
# average_op.connect(0, out)
# wf.add_operator(operator=average_op)
# out = average_op.outputs.fields_container

# Set the workflow output
wf.set_output_name("out", out)
wf.progress_bar = False
# Evaluate the workflow
fc = wf.get_output("out", dpf.types.fields_container)
# print(fc)
if location is None and len(fc) > 0:
location = fc[0].location
if location == locations.elemental:
location = "cells"

_, _, columns = self._create_components(base_name, category, components)
return self._create_dataframe(
fc, location, columns, comp, base_name.split("::")[-1], None
)
Expand Down
16 changes: 6 additions & 10 deletions src/ansys/dpf/post/harmonic_mechanical_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ def _get_result_workflow(
"""Generate (without evaluating) the Workflow to extract results."""
comp, to_extract, _ = self._create_components(base_name, category, components)

# Initialize a workflow
wf = dpf.Workflow(server=self._model._server)

force_elemental_nodal = self._requires_manual_averaging(
base_name=base_name,
location=location,
Expand Down Expand Up @@ -71,6 +68,7 @@ def _get_result_workflow(
if selection.requires_mesh:
# wf.set_input_name(_WfNames.mesh, result_op.inputs.mesh)
mesh_wf = dpf.Workflow(server=self._model._server)
mesh_wf.add_operator(self._model.metadata.mesh_provider)
mesh_wf.set_output_name(
_WfNames.initial_mesh, self._model.metadata.mesh_provider
)
Expand Down Expand Up @@ -109,6 +107,7 @@ def _get_result_workflow(
if average_op is not None:
average_op[0].connect(0, out)
principal_op.connect(0, average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
average_op = None
else:
Expand All @@ -133,12 +132,13 @@ def _get_result_workflow(
):
equivalent_op.connect(0, out)
average_op[0].connect(0, equivalent_op)
wf.add_operator(operator=average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
out = average_op[1].outputs.fields_container
elif average_op is not None:
average_op[0].connect(0, out)
equivalent_op.connect(0, average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
out = equivalent_op.outputs.fields_container
else:
Expand All @@ -149,6 +149,7 @@ def _get_result_workflow(

if average_op is not None:
average_op[0].connect(0, out)
wf.add_operators(list(average_op))
out = average_op[1].outputs.fields_container

# Add an optional component selection step if result is vector, or matrix
Expand Down Expand Up @@ -196,12 +197,7 @@ def _get_result_workflow(
# Add an optional norm operation if requested
# (must be after sweeping_phase for U)
if norm:
norm_op = self._model.operator(name="norm_fc")
norm_op.connect(0, out)
wf.add_operator(operator=norm_op)
out = norm_op.outputs.fields_container
comp = None
base_name += "_N"
wf, out, comp, base_name = self._append_norm(wf, out, base_name)

# Set the workflow output
wf.set_output_name("out", out)
Expand Down
13 changes: 6 additions & 7 deletions src/ansys/dpf/post/modal_mechanical_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def _get_result_workflow(
)
if selection.requires_mesh:
mesh_wf = dpf.Workflow(server=self._model._server)
mesh_wf.add_operator(self._model.metadata.mesh_provider)
mesh_wf.set_output_name(
_WfNames.initial_mesh, self._model.metadata.mesh_provider
)
Expand Down Expand Up @@ -94,6 +95,7 @@ def _get_result_workflow(
if average_op is not None:
average_op[0].connect(0, out)
principal_op.connect(0, average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
average_op = None
else:
Expand All @@ -118,12 +120,13 @@ def _get_result_workflow(
):
equivalent_op.connect(0, out)
average_op[0].connect(0, equivalent_op)
wf.add_operator(operator=average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
out = average_op[1].outputs.fields_container
elif average_op is not None:
average_op[0].connect(0, out)
equivalent_op.connect(0, average_op[1])
wf.add_operators(list(average_op))
# Set as future output of the workflow
out = equivalent_op.outputs.fields_container
else:
Expand All @@ -134,6 +137,7 @@ def _get_result_workflow(

if average_op is not None:
average_op[0].connect(0, out)
wf.add_operators(list(average_op))
out = average_op[1].outputs.fields_container

# Add an optional component selection step if result is vector, matrix, or principal
Expand All @@ -155,12 +159,7 @@ def _get_result_workflow(

# Add an optional norm operation if requested
if norm:
norm_op = self._model.operator(name="norm_fc")
norm_op.connect(0, out)
wf.add_operator(operator=norm_op)
out = norm_op.outputs.fields_container
comp = None
base_name += "_N"
wf, out, comp, base_name = self._append_norm(wf, out, base_name)

# Set the workflow output
wf.set_output_name("out", out)
Expand Down
Loading

0 comments on commit 451728f

Please sign in to comment.