Commit

Merge branch '996-rendre-optionnel-le-type-de-mode-multiprocessing' into 'master'

Resolve "rendre optionnel le type de mode multiprocessing"

Closes #996

See merge request 3d/cars-park/cars!811
dyoussef committed Feb 25, 2025
2 parents 497c82b + 962ab8d commit 504f073
Showing 3 changed files with 48 additions and 32 deletions.
17 changes: 13 additions & 4 deletions cars/orchestrator/cluster/mp_cluster/multiprocessing_cluster.py
@@ -102,6 +102,7 @@ def __init__(self, conf_cluster, out_dir, launch_worker=True):

# retrieve parameters
self.nb_workers = self.checked_conf_cluster["nb_workers"]
self.mp_mode = self.checked_conf_cluster["mp_mode"]
self.task_timeout = self.checked_conf_cluster["task_timeout"]
self.max_tasks_per_worker = self.checked_conf_cluster[
"max_tasks_per_worker"
@@ -111,8 +112,14 @@ def __init__(self, conf_cluster, out_dir, launch_worker=True):
self.profiling = self.checked_conf_cluster["profiling"]
self.factorize_tasks = self.checked_conf_cluster["factorize_tasks"]
# Set multiprocessing mode
# forkserver is used, to allow OMP to be used in numba
mp_mode = "spawn" if IS_WIN else "forkserver"
self.mp_mode = self.checked_conf_cluster["mp_mode"]

if IS_WIN and self.mp_mode != "spawn":
    # Warn with the originally requested mode before overriding it
    logging.warning(
        "{} is not functional on Windows, "
        "spawn will be used instead".format(self.mp_mode)
    )
    self.mp_mode = "spawn"

self.launch_worker = launch_worker

@@ -139,7 +146,7 @@ def __init__(self, conf_cluster, out_dir, launch_worker=True):
self.wrapper = mp_wrapper.WrapperNone(None)

# Create pool
ctx_in_main = mp.get_context(mp_mode)
ctx_in_main = mp.get_context(self.mp_mode)
# import cars for env variables first
# import cars pipelines for numba compilation
ctx_in_main.set_forkserver_preload(["cars", "cars.pipelines"])
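
As a side note, here is a minimal standalone sketch of the standard-library mechanism the context creation above relies on. The helper name and the preload list are illustrative, not taken from CARS:

import multiprocessing as mp

def make_context(mp_mode="forkserver"):
    # "fork", "forkserver" and "spawn" are the standard start methods;
    # get_context() raises ValueError for anything else.
    ctx = mp.get_context(mp_mode)
    # Preloading is a hint consumed by the forkserver process;
    # it is simply ignored by the other start methods.
    ctx.set_forkserver_preload(["os", "json"])
    return ctx

if __name__ == "__main__":
    # The guard is required for the "spawn" and "forkserver" modes.
    with make_context().Pool(processes=2) as pool:
        print(pool.map(abs, [-1, -2, 3]))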
@@ -206,6 +213,7 @@ def check_conf(self, conf):

# Overload conf
overloaded_conf["mode"] = conf.get("mode", "mp")
overloaded_conf["mp_mode"] = conf.get("mp_mode", "forkserver")
nb_workers = conf.get("nb_workers", 2)
overloaded_conf["nb_workers"] = min(available_cpu, nb_workers)
overloaded_conf["task_timeout"] = conf.get("task_timeout", 600)
@@ -223,6 +231,7 @@ def check_conf(self, conf):
cluster_schema = {
"mode": str,
"dump_to_disk": bool,
"mp_mode": str,
"nb_workers": And(int, lambda x: x > 0),
"task_timeout": And(int, lambda x: x > 0),
"max_ram_per_worker": And(Or(float, int), lambda x: x > 0),
@@ -265,7 +274,7 @@ def cleanup(self):
if self.tmp_dir is not None:
shutil.rmtree(self.tmp_dir)

def scatter(self, data, broadcast=True):
def scatter(self, data):
"""
Distribute data through workers
61 changes: 33 additions & 28 deletions docs/source/how_to_use_CARS/basic_configuration.rst
@@ -290,7 +290,7 @@ The structure follows this organization:
**ROI**

A terrain ROI can be provided by the user. It can be either a vector file (Shapefile for instance) path,
or a GeoJson dictionary. These structures must contain a single Polygon or MultiPolygon. Multi-features are
not supported. Instead of cropping the input images, the whole images will be used to compute grid correction
and terrain + epipolar a priori. Then the rest of the pipeline will use the given ROI.
This allows better correction of epipolar rectification grids.
@@ -327,11 +327,11 @@ The structure follows this organization:
}
}
If the *debug_with_roi* advanced parameter (see dedicated tab) is enabled, the tiling of the entire image is kept but only the tiles intersecting
the ROI are computed.

MultiPolygon feature is only useful if the parameter *debug_with_roi* is activated, otherwise the total footprint of the
MultiPolygon will be used as ROI.

By default, EPSG 4326 is used. If the user has defined a polygon in a different reference system, the "crs" field must be specified.
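
As an illustration, here is a minimal GeoJson-style ROI written as a Python dictionary. Only the single-Polygon geometry and the optional "crs" field follow the description above; the coordinates and the exact placement of this dictionary inside the full CARS configuration are assumptions made for the sketch:

.. code-block:: python

    roi = {
        "type": "FeatureCollection",
        "features": [
            {
                "type": "Feature",
                "properties": {},
                "geometry": {
                    "type": "Polygon",
                    "coordinates": [
                        [
                            [5.194, 44.205],
                            [5.194, 44.208],
                            [5.199, 44.208],
                            [5.199, 44.205],
                            [5.194, 44.205],
                        ]
                    ],
                },
            }
        ],
        # "crs" is only required for a reference system other than EPSG 4326
        "crs": "EPSG:4326",
    }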

@@ -410,7 +410,7 @@ The structure follows this organization:
**Initial elevation**

The attribute contains all information about the initial elevation: DEM path, geoid path and default altitudes.

+-----------------------+----------------------------------------------------------------------------+--------+----------------------+----------------------+----------+
| Name | Description | Type | Available value | Default value | Required |
+=======================+============================================================================+========+======================+======================+==========+
@@ -424,11 +424,11 @@ The structure follows this organization:
+-----------------------+----------------------------------------------------------------------------+--------+----------------------+----------------------+----------+

See section :ref:`download_srtm_tiles` to download the 90-m SRTM DEM. If no DEM path is provided, an internal DEM is generated from sparse matches. Moreover, when there is no DEM data available, a default height above ellipsoid of 0 is used (no coverage for some points or pixels with no_data in the DEM tiles).

If no geoid is provided, the default CARS geoid (EGM96) is used.

If no altitude delta is provided, the `dem_min` and `dem_max` generated with sparse matches will be used.

The altitude deltas are used following this formula:

.. code-block:: python
@@ -496,7 +496,7 @@ The structure follows this organization:

If CARS is launched on an HPC cluster, this mode is not recommended because parameters would be set according to the full node resources.
In this case, use multiprocessing mode and fill the parameters *nb_workers* and *max_ram_per_worker* according to the resources you requested.


Depending on the orchestrator mode used, the following parameters can be added to the configuration:

@@ -544,33 +544,38 @@ The structure follows this organization:

**Mode multiprocessing:**

+-----------------------+-----------------------------------------------------------+------------------------------------------+---------------+----------+
| Name | Description | Type | Default value | Required |
+=======================+===========================================================+==========================================+===============+==========+
| *nb_workers* | Number of workers | int, should be > 0 | 2 | No |
+-----------------------+-----------------------------------------------------------+------------------------------------------+---------------+----------+
| *max_ram_per_worker* | Maximum ram per worker | int or float, should be > 0 | 2000 | No |
+-----------------------+-----------------------------------------------------------+------------------------------------------+---------------+----------+
| *max_tasks_per_worker*| Number of tasks a worker can complete before refresh | int, should be > 0 | 10 | No |
+-----------------------+-----------------------------------------------------------+------------------------------------------+---------------+----------+
| *dump_to_disk* | Dump temporary files to disk | bool | True | No |
+-----------------------+-----------------------------------------------------------+------------------------------------------+---------------+----------+
| *per_job_timeout* | Timeout used for a job | int or float | 600 | No |
+-----------------------+-----------------------------------------------------------+------------------------------------------+---------------+----------+
| *factorize_tasks* | Tasks sequentially dependent are run in one task | bool | True | No |
+-----------------------+-----------------------------------------------------------+------------------------------------------+---------------+----------+

+-----------------------+-----------------------------------------------------------------+------------------------------------------+---------------+----------+
| Name | Description | Type | Default value | Required |
+=======================+=================================================================+==========================================+===============+==========+
| *mp_mode* | The type of multiprocessing mode "forkserver", "fork", "spawn" | str | "forkserver" | No |
+-----------------------+-----------------------------------------------------------------+------------------------------------------+---------------+----------+
| *nb_workers* | Number of workers | int, should be > 0 | 2 | No |
+-----------------------+-----------------------------------------------------------------+------------------------------------------+---------------+----------+
| *max_ram_per_worker* | Maximum ram per worker | int or float, should be > 0 | 2000 | No |
+-----------------------+-----------------------------------------------------------------+------------------------------------------+---------------+----------+
| *max_tasks_per_worker*| Number of tasks a worker can complete before refresh | int, should be > 0 | 10 | No |
+-----------------------+-----------------------------------------------------------------+------------------------------------------+---------------+----------+
| *dump_to_disk* | Dump temporary files to disk | bool | True | No |
+-----------------------+-----------------------------------------------------------------+------------------------------------------+---------------+----------+
| *per_job_timeout* | Timeout used for a job | int or float | 600 | No |
+-----------------------+-----------------------------------------------------------------+------------------------------------------+---------------+----------+
| *factorize_tasks* | Tasks sequentially dependent are run in one task | bool | True | No |
+-----------------------+-----------------------------------------------------------------+------------------------------------------+---------------+----------+

.. note::

**Factorisation**

Two or more tasks are sequentially dependent if they can be run one after the other, independently from any other task.
If this is the case, those tasks can be factorized, which means they can be run in a single task.
Running several tasks in one task avoids useless dumps on disk between sequential tasks. It does not lose any time,
because factorized tasks could not have been run in parallel anyway, and it saves the time otherwise spent on
task creation and data transfer.

.. note::

If you are working on Windows, the "spawn" multiprocessing mode has to be used. If "fork" or "forkserver" is set, it will be forced to "spawn".
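
To make the table above concrete, here is a minimal orchestrator configuration using the new optional *mp_mode* parameter. The key names and values shown are those appearing in this merge request (code defaults and end-to-end tests); any parameter left out simply falls back to its default:

.. code-block:: python

    orchestrator_conf = {
        "orchestrator": {
            "mode": "multiprocessing",
            # Optional since this change; defaults to "forkserver".
            # "fork" and "spawn" are also accepted, and Windows always
            # falls back to "spawn".
            "mp_mode": "forkserver",
            "nb_workers": 4,
            "max_ram_per_worker": 2000,
        }
    }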

**Profiling configuration:**

2 changes: 2 additions & 0 deletions tests/test_end2end.py
@@ -955,6 +955,7 @@ def test_end2end_ventoux_unique():
gt_used_conf_orchestrator = {
"orchestrator": {
"mode": "multiprocessing",
"mp_mode": "forkserver",
"nb_workers": NB_WORKERS,
"profiling": {
"mode": "cars_profiling",
@@ -2668,6 +2669,7 @@ def test_end2end_use_epipolar_a_priori():
gt_used_conf_orchestrator = {
"orchestrator": {
"mode": "multiprocessing",
"mp_mode": "forkserver",
"nb_workers": NB_WORKERS,
"profiling": {
"mode": "cars_profiling",
