From dc5028a52eca534aecd8f5d4f263cdf8aee24a81 Mon Sep 17 00:00:00 2001 From: Mani Sarkar Date: Fri, 16 Oct 2020 12:49:11 +0100 Subject: [PATCH 1/2] Refactor: split the original generate_features.py and extract the parallelisation aspect into a module of it's own --- nlp_profiler/generate_features.py | 59 +------------------ .../parallelisation_methods/__init__.py | 53 +++++++++++++++++ 2 files changed, 56 insertions(+), 56 deletions(-) create mode 100644 nlp_profiler/parallelisation_methods/__init__.py diff --git a/nlp_profiler/generate_features.py b/nlp_profiler/generate_features.py index 24a4cb7..07d3f8a 100644 --- a/nlp_profiler/generate_features.py +++ b/nlp_profiler/generate_features.py @@ -1,27 +1,9 @@ -import sys -import tempfile - import pandas as pd import swifter # noqa -from joblib import Memory, Parallel, delayed -from tqdm.auto import tqdm from nlp_profiler.constants import DEFAULT_PARALLEL_METHOD, SWIFTER_METHOD - -memory = Memory(tempfile.gettempdir(), compress=9, verbose=0) - - -def is_running_from_ipython(): - inJupyter = sys.argv[-1].endswith('json') - return inJupyter - - -PROGRESS_BAR_WIDTH = 900 if is_running_from_ipython() else None - - -def get_progress_bar(values: list) -> tqdm: - cached_tqdm = memory.cache(tqdm) - return cached_tqdm(values, ncols=PROGRESS_BAR_WIDTH) +from nlp_profiler.parallelisation_methods \ + import get_progress_bar, using_joblib_parallel, using_swifter def generate_features(main_header: str, @@ -43,40 +25,5 @@ def generate_features(main_header: str, ) new_dataframe[new_column] = parallelisation_method_function( - source_field, transformation_function, - source_column, new_column + source_field, transformation_function, new_column ) - - -def run_task(task_function, value: str): # pragma: no cover - # pragma: no cover => multiprocessing leads to loss of test coverage info - cached_task_function = memory.cache(task_function) - return cached_task_function(value) - - -def using_joblib_parallel( - source_field, apply_function, - source_column: str, new_column: str, -) -> pd.DataFrame: - source_values_to_transform = get_progress_bar(source_field.values) - source_values_to_transform.set_description(new_column) - - result = Parallel(n_jobs=-1)( - delayed(run_task)( - apply_function, each_value - ) for _, each_value in enumerate(source_values_to_transform) - ) - source_values_to_transform.update() - return result - - -def using_swifter( - source_field, apply_function, - source_column: str = None, new_column: str = None -) -> pd.DataFrame: - return source_field \ - .swifter \ - .set_dask_scheduler(scheduler="processes") \ - .allow_dask_on_strings(enable=True) \ - .progress_bar(enable=True, desc=new_column) \ - .apply(apply_function, axis=1) diff --git a/nlp_profiler/parallelisation_methods/__init__.py b/nlp_profiler/parallelisation_methods/__init__.py new file mode 100644 index 0000000..d41ebec --- /dev/null +++ b/nlp_profiler/parallelisation_methods/__init__.py @@ -0,0 +1,53 @@ +import sys +import tempfile + +import pandas as pd +import swifter # noqa +from joblib import Memory, Parallel, delayed +from tqdm.auto import tqdm + +memory = Memory(tempfile.gettempdir(), compress=9, verbose=0) + + +def is_running_from_ipython(): + return sys.argv[-1].endswith('json') + + +PROGRESS_BAR_WIDTH = 900 if is_running_from_ipython() else None + + +def get_progress_bar(values: list) -> tqdm: + cached_tqdm = memory.cache(tqdm) + return cached_tqdm(values, ncols=PROGRESS_BAR_WIDTH) + + +def run_task(task_function, value: str): # pragma: no cover + # pragma: no cover => multiprocessing leads to loss of test coverage info + cached_task_function = memory.cache(task_function) + return cached_task_function(value) + + +def using_joblib_parallel( + source_field, apply_function, new_column: str, +) -> pd.DataFrame: + source_values_to_transform = get_progress_bar(source_field.values) + source_values_to_transform.set_description(new_column) + + result = Parallel(n_jobs=-1)( + delayed(run_task)( + apply_function, each_value + ) for _, each_value in enumerate(source_values_to_transform) + ) + source_values_to_transform.update() + return result + + +def using_swifter( + source_field, apply_function, new_column: str = None +) -> pd.DataFrame: + return source_field \ + .swifter \ + .set_dask_scheduler(scheduler="processes") \ + .allow_dask_on_strings(enable=True) \ + .progress_bar(enable=True, desc=new_column) \ + .apply(apply_function, axis=1) From 193f97a56d632e959b1d9e3216f6557eef0f43cd Mon Sep 17 00:00:00 2001 From: Mani Sarkar Date: Fri, 16 Oct 2020 13:03:58 +0100 Subject: [PATCH 2/2] Refactor: converted both generate_features and parallelisation_method into python packages --- .../{generate_features.py => generate_features/__init__.py} | 2 +- .../{ => generate_features}/parallelisation_methods/__init__.py | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename nlp_profiler/{generate_features.py => generate_features/__init__.py} (94%) rename nlp_profiler/{ => generate_features}/parallelisation_methods/__init__.py (100%) diff --git a/nlp_profiler/generate_features.py b/nlp_profiler/generate_features/__init__.py similarity index 94% rename from nlp_profiler/generate_features.py rename to nlp_profiler/generate_features/__init__.py index 07d3f8a..ae8db4f 100644 --- a/nlp_profiler/generate_features.py +++ b/nlp_profiler/generate_features/__init__.py @@ -2,7 +2,7 @@ import swifter # noqa from nlp_profiler.constants import DEFAULT_PARALLEL_METHOD, SWIFTER_METHOD -from nlp_profiler.parallelisation_methods \ +from nlp_profiler.generate_features.parallelisation_methods \ import get_progress_bar, using_joblib_parallel, using_swifter diff --git a/nlp_profiler/parallelisation_methods/__init__.py b/nlp_profiler/generate_features/parallelisation_methods/__init__.py similarity index 100% rename from nlp_profiler/parallelisation_methods/__init__.py rename to nlp_profiler/generate_features/parallelisation_methods/__init__.py