From 439a74c659de3997cb73e6e5aec8663c05f32c9a Mon Sep 17 00:00:00 2001 From: Mario Ostieri Date: Fri, 6 Dec 2024 09:35:40 +0000 Subject: [PATCH] first working export --- src/ansys/pyensight/core/dvs.py | 104 ++++++++++++++++++++++---------- 1 file changed, 72 insertions(+), 32 deletions(-) diff --git a/src/ansys/pyensight/core/dvs.py b/src/ansys/pyensight/core/dvs.py index 26ec059aae2..40787ec4917 100644 --- a/src/ansys/pyensight/core/dvs.py +++ b/src/ansys/pyensight/core/dvs.py @@ -8,10 +8,12 @@ import platform import re import sys +import time import traceback import tempfile from math import floor from ansys.pyensight.core.common import find_unused_ports +import threading if TYPE_CHECKING: from ansys.pyensight.core import Session @@ -113,7 +115,10 @@ def start_dvs_servers(self, num_servers: int, transport: int= 0, ranks_per_serve try: for n in range(0, num_servers): # Assume ranks equally distributed - local_uri = uri + f":{ports[n]}" + if grpc: + local_uri = uri + f":{ports[n]}" + else: + local_uri = uri server_id = self.server_create(uri=local_uri) self.server_start(server_id, server_num=n, local_ranks=ranks_per_server, options=options) self._server_ids.append(server_id) @@ -132,6 +137,15 @@ def start_dvs_servers(self, num_servers: int, transport: int= 0, ranks_per_serve } ) self._total_ranks = ranks_per_server * len(self._server_ids) + started = False + start = time.time() + while not started and time.time() - start < 60: + if not all([self.server_started(s) for s in self._server_ids]): + time.sleep(0.5) + else: + started = True + if not started: + raise RuntimeError("The DVS servers have not started in 60 seconds.") except Exception as e: traceback.print_exc() raise RuntimeError(f"Couldn't start the servers, error: {e}") @@ -141,10 +155,11 @@ def _start_dvs_client(self, server_id: int, rank: int, secret_key="", block_for_ raise RuntimeError(f"Server ID {server_id} not started in this process.") flags = self.FLAGS_NONE if block_for_servers: - flags &= self.FLAGS_BLOCK_FOR_SERVER + flags = self.FLAGS_BLOCK_FOR_SERVER if dedup: - flags &= self.FLAGS_DEDUP + flags |= self.FLAGS_DEDUP try: + client_id = self.connect(server_id=server_id, secret=secret_key, flags=flags) except Exception as e: traceback.print_exc() @@ -152,7 +167,8 @@ def _start_dvs_client(self, server_id: int, rank: int, secret_key="", block_for_ self._clients[self._client_count] = { "client_id": client_id, "server_id": server_id, - "rank": rank + "rank": rank, + "update_started": False } self._client_count += 1 @@ -176,11 +192,12 @@ def start_servers_in_ensight(self, secret_key= ""): "in_ensight": True } - def start_sending_dataset(self, dataset_name: str, secret_key: str = "", block_for_servers=False, dedup=False): + def start_sending_dataset(self, dataset_name: str, secret_key: str = "", dedup=False): self._dataset_name = dataset_name rank_per_server = list(self._servers.values())[0].get("ranks") local_ranks = 0 n = 0 + block_for_servers = True if self._total_ranks > 1 else False for rank in range(0, self._total_ranks): server = self._servers[n] local_ranks += 1 @@ -189,12 +206,20 @@ def start_sending_dataset(self, dataset_name: str, secret_key: str = "", block_f n += 1 self._start_dvs_client(server["server_id"], rank, secret_key=secret_key, block_for_servers=block_for_servers, dedup=dedup) + def _begin_update(self, client_dict, update_num, time, rank, chunk): + try: + self.begin_update(client_dict["client_id"], update_num, time, rank, chunk) + client_dict["update_started"] = True + except Exception as e: + traceback.print_exc() + raise RuntimeError(f"Couldn't begin update. Error: {e}") + def begin_updates(self, time): update_num = self._update_num self._current_update = update_num for _, client_vals in self._clients.items(): - self.begin_update(client_vals["client_id"], update_num, time, client_vals["rank"], 0) - self._update_num += 1 + thread = threading.Thread(target=self._begin_update, args=(client_vals, update_num, time, client_vals["rank"], 0)) + thread.start() def create_part(self, part_id: int, part_name: int, metadata: Optional[Dict[str, str]]=None): if not metadata: @@ -211,28 +236,49 @@ def create_part(self, part_id: int, part_name: int, metadata: Optional[Dict[str, } for c in range(self._client_count): client = self._clients[c] - self.begin_init(self._clients[self._client_count-1]["client_id"], dataset_name=f"Simba_{self._dataset_name}_{part_name}", rank=client["rank"], total_ranks=self._total_ranks, num_chunks=1) + val = self.begin_init(client["client_id"], dataset_name=f"Simba_{self._dataset_name}_{part_name}", rank=client["rank"], total_ranks=self._total_ranks, num_chunks=1) self.add_part_info(client["client_id"], [part]) for c in range(self._client_count): client = self._clients[c] - self.end_init(client["client_id"]) + val = self.end_init(client["client_id"]) + self.begin_updates(0.0) self._parts[part_id] = part + def _check_updates_started(self): + started = False + start = time.time() + while not started and time.time() - start < 60: + started = all([vals["update_started"] for c,vals in self._clients.items()]) + time.sleep(0.5) + if not started: + raise RuntimeError("Not all clients have begun the updates.") def send_vertices(self, part_id, vertices): if not self._parts.get(part_id): raise RuntimeError("Please create the part first via create_part() or the lower level add_part_info.") - if not self._parts[part_id].get("vert_indices"): - raise RuntimeError(f"Please send first the faces for part {part_id}") if not isinstance(vertices, numpy.ndarray): vertices = numpy.ndarray(vertices) reshaped_vertices = vertices.reshape(-1, 3) x_coords = reshaped_vertices[:, 0] y_coords = reshaped_vertices[:, 1] z_coords = reshaped_vertices[:, 2] - vert_indices = self._parts[part_id]["vert_indices"] - self.update_nodes(client["client_id"], part_id=part_id,x=x_coords[vert_indices], y=y_coords[vert_indices], z=z_coords[vert_indices]) - del self._parts[part_id]["vert_indices"] + self._check_updates_started() + for c in range(self._client_count): + client = self._clients[c] + self.update_nodes(client["client_id"], part_id=part_id,x=x_coords, y=y_coords, z=z_coords) + + @staticmethod + def _split_list(lst, num_parts): + n = len(lst) + part_size = n // num_parts + remainder = n % num_parts + parts = [] + start = 0 + for i in range(num_parts): + end = start + part_size + (1 if i < remainder else 0) + parts.append(lst[start:end]) + start = end + return parts def send_faces(self, part_id, faces: Union[List, numpy.ndarray], ghost=False): if not self._clients: @@ -244,15 +290,16 @@ def send_faces(self, part_id, faces: Union[List, numpy.ndarray], ghost=False): i = 0 vertices_per_face = [] connectivity_1d = [] + indices = [] while i < len(faces): - num_vertices = faces[i] - vertices_per_face.append(num_vertices) - connectivity_1d.extend(faces[i+1:i+1+num_vertices]) - i += num_vertices + 1 + indices.append(i) + i += faces[i] + 1 + mask = numpy.zeros(faces.shape, dtype=bool) + mask[indices] = True + vertices_per_face = numpy.array(faces[mask]) + connectivity_1d = numpy.array(faces[~mask]) connectivity_split = numpy.split(connectivity_1d, numpy.cumsum(vertices_per_face[:-1])) all_same = numpy.all(numpy.array(vertices_per_face) == vertices_per_face[0]) - num_split = floor(len(connectivity_split) / self._total_ranks) - additional = len(connectivity_split) % self._total_ranks elem_type = self.ELEMTYPE_N_SIDED_POLYGON if all_same: num_vertices = vertices_per_face[0] @@ -261,18 +308,17 @@ def send_faces(self, part_id, faces: Union[List, numpy.ndarray], ghost=False): elem_type = _elem_type if ghost: elem_type += 1 + self._check_updates_started() + split_arrays = self._split_list(connectivity_split, self._total_ranks) + split_num_faces = self._split_list(vertices_per_face, self._total_ranks) for c in range(self._client_count): client = self._clients[c] - arrays = connectivity_split[c:c+num_split-1] - if additional and c==self._client_count-1: - arrays.extend(connectivity_split[-1*additional:]) + arrays = split_arrays[c] indices = numpy.concatenate(arrays) - vert_indices = numpy.unique(indices) - self._parts[part_id]["vert_indices"] = vert_indices if elem_type not in [self.ELEMTYPE_N_SIDED_POLYGON, self.ELEMTYPE_N_SIDED_POLYGON_GHOST]: self.update_elements(client["client_id"], part_id=part_id, elem_type=elem_type, indices=indices) else: - connectivity_num_faces = vertices_per_face[c:c+num_split-1] + connectivity_num_faces = split_num_faces[c] self.update_elements_polygon(client["client_id"], part_id=part_id, elem_type=elem_type, nodes_per_polygon=numpy.array(connectivity_num_faces), indices=indices) def load_dataset_in_ensight(self): @@ -294,10 +340,4 @@ def send_done(self): - - - - - - \ No newline at end of file