Skip to content

Commit

Permalink
first working export
Browse files Browse the repository at this point in the history
  • Loading branch information
mariostieriansys committed Dec 6, 2024
1 parent bcb9f3d commit 439a74c
Showing 1 changed file with 72 additions and 32 deletions.
104 changes: 72 additions & 32 deletions src/ansys/pyensight/core/dvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import platform
import re
import sys
import time
import traceback
import tempfile
from math import floor
from ansys.pyensight.core.common import find_unused_ports
import threading

if TYPE_CHECKING:
from ansys.pyensight.core import Session
Expand Down Expand Up @@ -113,7 +115,10 @@ def start_dvs_servers(self, num_servers: int, transport: int= 0, ranks_per_serve
try:
for n in range(0, num_servers):
# Assume ranks equally distributed
local_uri = uri + f":{ports[n]}"
if grpc:
local_uri = uri + f":{ports[n]}"
else:
local_uri = uri
server_id = self.server_create(uri=local_uri)
self.server_start(server_id, server_num=n, local_ranks=ranks_per_server, options=options)
self._server_ids.append(server_id)
Expand All @@ -132,6 +137,15 @@ def start_dvs_servers(self, num_servers: int, transport: int= 0, ranks_per_serve
}
)
self._total_ranks = ranks_per_server * len(self._server_ids)
started = False
start = time.time()
while not started and time.time() - start < 60:
if not all([self.server_started(s) for s in self._server_ids]):
time.sleep(0.5)
else:
started = True
if not started:
raise RuntimeError("The DVS servers have not started in 60 seconds.")
except Exception as e:
traceback.print_exc()
raise RuntimeError(f"Couldn't start the servers, error: {e}")
Expand All @@ -141,18 +155,20 @@ def _start_dvs_client(self, server_id: int, rank: int, secret_key="", block_for_
raise RuntimeError(f"Server ID {server_id} not started in this process.")
flags = self.FLAGS_NONE
if block_for_servers:
flags &= self.FLAGS_BLOCK_FOR_SERVER
flags = self.FLAGS_BLOCK_FOR_SERVER
if dedup:
flags &= self.FLAGS_DEDUP
flags |= self.FLAGS_DEDUP
try:

client_id = self.connect(server_id=server_id, secret=secret_key, flags=flags)
except Exception as e:
traceback.print_exc()
raise RuntimeError(f"Couldn't start the client, error {e}")
self._clients[self._client_count] = {
"client_id": client_id,
"server_id": server_id,
"rank": rank
"rank": rank,
"update_started": False
}
self._client_count += 1

Expand All @@ -176,11 +192,12 @@ def start_servers_in_ensight(self, secret_key= ""):
"in_ensight": True
}

def start_sending_dataset(self, dataset_name: str, secret_key: str = "", block_for_servers=False, dedup=False):
def start_sending_dataset(self, dataset_name: str, secret_key: str = "", dedup=False):
self._dataset_name = dataset_name
rank_per_server = list(self._servers.values())[0].get("ranks")
local_ranks = 0
n = 0
block_for_servers = True if self._total_ranks > 1 else False
for rank in range(0, self._total_ranks):
server = self._servers[n]
local_ranks += 1
Expand All @@ -189,12 +206,20 @@ def start_sending_dataset(self, dataset_name: str, secret_key: str = "", block_f
n += 1
self._start_dvs_client(server["server_id"], rank, secret_key=secret_key, block_for_servers=block_for_servers, dedup=dedup)

def _begin_update(self, client_dict, update_num, time, rank, chunk):
try:
self.begin_update(client_dict["client_id"], update_num, time, rank, chunk)
client_dict["update_started"] = True
except Exception as e:
traceback.print_exc()
raise RuntimeError(f"Couldn't begin update. Error: {e}")

def begin_updates(self, time):
update_num = self._update_num
self._current_update = update_num
for _, client_vals in self._clients.items():
self.begin_update(client_vals["client_id"], update_num, time, client_vals["rank"], 0)
self._update_num += 1
thread = threading.Thread(target=self._begin_update, args=(client_vals, update_num, time, client_vals["rank"], 0))
thread.start()

def create_part(self, part_id: int, part_name: int, metadata: Optional[Dict[str, str]]=None):
if not metadata:
Expand All @@ -211,28 +236,49 @@ def create_part(self, part_id: int, part_name: int, metadata: Optional[Dict[str,
}
for c in range(self._client_count):
client = self._clients[c]
self.begin_init(self._clients[self._client_count-1]["client_id"], dataset_name=f"Simba_{self._dataset_name}_{part_name}", rank=client["rank"], total_ranks=self._total_ranks, num_chunks=1)
val = self.begin_init(client["client_id"], dataset_name=f"Simba_{self._dataset_name}_{part_name}", rank=client["rank"], total_ranks=self._total_ranks, num_chunks=1)
self.add_part_info(client["client_id"], [part])
for c in range(self._client_count):
client = self._clients[c]
self.end_init(client["client_id"])
val = self.end_init(client["client_id"])
self.begin_updates(0.0)
self._parts[part_id] = part

def _check_updates_started(self):
started = False
start = time.time()
while not started and time.time() - start < 60:
started = all([vals["update_started"] for c,vals in self._clients.items()])
time.sleep(0.5)
if not started:
raise RuntimeError("Not all clients have begun the updates.")

def send_vertices(self, part_id, vertices):
if not self._parts.get(part_id):
raise RuntimeError("Please create the part first via create_part() or the lower level add_part_info.")
if not self._parts[part_id].get("vert_indices"):
raise RuntimeError(f"Please send first the faces for part {part_id}")
if not isinstance(vertices, numpy.ndarray):
vertices = numpy.ndarray(vertices)
reshaped_vertices = vertices.reshape(-1, 3)
x_coords = reshaped_vertices[:, 0]
y_coords = reshaped_vertices[:, 1]
z_coords = reshaped_vertices[:, 2]
vert_indices = self._parts[part_id]["vert_indices"]
self.update_nodes(client["client_id"], part_id=part_id,x=x_coords[vert_indices], y=y_coords[vert_indices], z=z_coords[vert_indices])
del self._parts[part_id]["vert_indices"]
self._check_updates_started()
for c in range(self._client_count):
client = self._clients[c]
self.update_nodes(client["client_id"], part_id=part_id,x=x_coords, y=y_coords, z=z_coords)

@staticmethod
def _split_list(lst, num_parts):
n = len(lst)
part_size = n // num_parts
remainder = n % num_parts
parts = []
start = 0
for i in range(num_parts):
end = start + part_size + (1 if i < remainder else 0)
parts.append(lst[start:end])
start = end
return parts

def send_faces(self, part_id, faces: Union[List, numpy.ndarray], ghost=False):
if not self._clients:
Expand All @@ -244,15 +290,16 @@ def send_faces(self, part_id, faces: Union[List, numpy.ndarray], ghost=False):
i = 0
vertices_per_face = []
connectivity_1d = []
indices = []
while i < len(faces):
num_vertices = faces[i]
vertices_per_face.append(num_vertices)
connectivity_1d.extend(faces[i+1:i+1+num_vertices])
i += num_vertices + 1
indices.append(i)
i += faces[i] + 1
mask = numpy.zeros(faces.shape, dtype=bool)
mask[indices] = True
vertices_per_face = numpy.array(faces[mask])
connectivity_1d = numpy.array(faces[~mask])
connectivity_split = numpy.split(connectivity_1d, numpy.cumsum(vertices_per_face[:-1]))
all_same = numpy.all(numpy.array(vertices_per_face) == vertices_per_face[0])
num_split = floor(len(connectivity_split) / self._total_ranks)
additional = len(connectivity_split) % self._total_ranks
elem_type = self.ELEMTYPE_N_SIDED_POLYGON
if all_same:
num_vertices = vertices_per_face[0]
Expand All @@ -261,18 +308,17 @@ def send_faces(self, part_id, faces: Union[List, numpy.ndarray], ghost=False):
elem_type = _elem_type
if ghost:
elem_type += 1
self._check_updates_started()
split_arrays = self._split_list(connectivity_split, self._total_ranks)
split_num_faces = self._split_list(vertices_per_face, self._total_ranks)
for c in range(self._client_count):
client = self._clients[c]
arrays = connectivity_split[c:c+num_split-1]
if additional and c==self._client_count-1:
arrays.extend(connectivity_split[-1*additional:])
arrays = split_arrays[c]
indices = numpy.concatenate(arrays)
vert_indices = numpy.unique(indices)
self._parts[part_id]["vert_indices"] = vert_indices
if elem_type not in [self.ELEMTYPE_N_SIDED_POLYGON, self.ELEMTYPE_N_SIDED_POLYGON_GHOST]:
self.update_elements(client["client_id"], part_id=part_id, elem_type=elem_type, indices=indices)
else:
connectivity_num_faces = vertices_per_face[c:c+num_split-1]
connectivity_num_faces = split_num_faces[c]
self.update_elements_polygon(client["client_id"], part_id=part_id, elem_type=elem_type, nodes_per_polygon=numpy.array(connectivity_num_faces), indices=indices)

def load_dataset_in_ensight(self):
Expand All @@ -294,10 +340,4 @@ def send_done(self):










0 comments on commit 439a74c

Please sign in to comment.