diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 52f77b82..05bcf903 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,12 +1,12 @@
 repos:
   - repo: https://github.com/psf/black
-    rev: 19.10b0
+    rev: 22.3.0
     hooks:
       - id: black
        language_version: python3
        exclude: "^docs/.*$"
   - repo: https://github.com/pycqa/isort
-    rev: 5.7.0
+    rev: 5.12.0
     hooks:
       - id: isort
        args:
diff --git a/docs/text/tsfresh_on_a_cluster.rst b/docs/text/tsfresh_on_a_cluster.rst
index 75dfdcc1..6576a1a3 100644
--- a/docs/text/tsfresh_on_a_cluster.rst
+++ b/docs/text/tsfresh_on_a_cluster.rst
@@ -200,6 +200,56 @@ of 3 workers:
                          column_sort='time',
                          distributor=Distributor)
 
+Using Ray to distribute the calculations
+'''''''''''''''''''''''''''''''''''''''''
+We provide a Distributor for the `Ray framework <https://www.ray.io/>`_, where
+*"Ray is an open-source unified compute framework that makes it easy to scale AI and Python workloads."*
+
+.. NOTE::
+    Ray is an optional dependency. Users who want to distribute the calculations
+    with Ray need to install it first via ``pip install ray``.
+
+Ray is an easy-to-use framework for developing distributed computing workloads.
+It can run on a single node or scale out to a cluster. To connect to an existing
+cluster, all you need is its address (e.g., ``"ray://123.45.67.89:10001"``):
+
+.. code:: python
+
+    from tsfresh.examples.robot_execution_failures import \
+        download_robot_execution_failures, \
+        load_robot_execution_failures
+    from tsfresh.feature_extraction import extract_features
+    from tsfresh.utilities.distribution import RayDistributor
+
+    download_robot_execution_failures()
+    df, y = load_robot_execution_failures()
+
+    Distributor = RayDistributor(address="ray://123.45.67.89:10001")
+
+    X = extract_features(timeseries_container=df,
+                         column_id='id',
+                         column_sort='time',
+                         distributor=Distributor)
+
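+Additional keyword arguments for ``ray.init`` can be forwarded through the
+``rayinit_config`` parameter. As a sketch, one could use a ``runtime_env``
+(a standard ``ray.init`` option, not something specific to tsfresh) to make
+sure the remote workers have tsfresh installed:
+
+.. code:: python
+
+    Distributor = RayDistributor(
+        address="ray://123.45.67.89:10001",
+        rayinit_config={"runtime_env": {"pip": ["tsfresh"]}},
+    )
+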
+If you would like to start a local Ray cluster with multiple workers instead,
+simply leave ``address`` unset:
+
+.. code:: python
+
+    from tsfresh.examples.robot_execution_failures import \
+        download_robot_execution_failures, \
+        load_robot_execution_failures
+    from tsfresh.feature_extraction import extract_features
+    from tsfresh.utilities.distribution import RayDistributor
+
+    download_robot_execution_failures()
+    df, y = load_robot_execution_failures()
+
+    Distributor = RayDistributor(n_workers=3)
+
+    X = extract_features(timeseries_container=df,
+                         column_id='id',
+                         column_sort='time',
+                         distributor=Distributor)
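+
+By default, ``RayDistributor`` splits the input into roughly five chunks per
+worker (see its ``calculate_best_chunk_size`` method). If you need explicit
+control over the chunking, you can pass tsfresh's regular ``chunksize``
+parameter to ``extract_features`` as usual, for example:
+
+.. code:: python
+
+    X = extract_features(timeseries_container=df,
+                         column_id='id',
+                         column_sort='time',
+                         distributor=Distributor,
+                         chunksize=10)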
+
 Writing your own distributor
 ''''''''''''''''''''''''''''
 
diff --git a/test-requirements.txt b/test-requirements.txt
index 790b542e..7f59345a 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -8,4 +8,6 @@ seaborn>=0.7.1
 ipython>=5.3.0
 notebook>=4.4.1
 pandas-datareader>=0.5.0
+ray>=2.5.0
+protobuf<=3.20.3
 pre-commit
diff --git a/tests/units/utilities/test_distribution.py b/tests/units/utilities/test_distribution.py
index c4cddd21..b503ca4b 100644
--- a/tests/units/utilities/test_distribution.py
+++ b/tests/units/utilities/test_distribution.py
@@ -15,6 +15,7 @@
     ClusterDaskDistributor,
     LocalDaskDistributor,
     MultiprocessingDistributor,
+    RayDistributor,
 )
@@ -189,3 +190,70 @@ def test_dask_cluster_extraction_two_workers(self):
         self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
         self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
         cluster.close()
+
+
+# RayDistributor
+class LocalRayDistributorTestCase(DataTestCase):
+    def test_ray_cluster_extraction_one_worker(self):
+
+        Distributor = RayDistributor(n_workers=1)
+
+        df = self.create_test_data_sample()
+        extracted_features = extract_features(
+            df,
+            column_id="id",
+            column_sort="sort",
+            column_kind="kind",
+            column_value="val",
+            distributor=Distributor,
+        )
+
+        self.assertIsInstance(extracted_features, pd.DataFrame)
+        self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
+        self.assertTrue(
+            np.all(extracted_features.a__sum_values == np.array([691, 1017]))
+        )
+        self.assertTrue(
+            np.all(extracted_features.a__abs_energy == np.array([32211, 63167]))
+        )
+        self.assertTrue(
+            np.all(extracted_features.b__sum_values == np.array([757, 695]))
+        )
+        self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
+        self.assertTrue(
+            np.all(extracted_features.b__abs_energy == np.array([36619, 35483]))
+        )
+        self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
+        self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
+
+    def test_ray_cluster_extraction_two_workers(self):
+
+        Distributor = RayDistributor(n_workers=2)
+
+        df = self.create_test_data_sample()
+        extracted_features = extract_features(
+            df,
+            column_id="id",
+            column_sort="sort",
+            column_kind="kind",
+            column_value="val",
+            distributor=Distributor,
+        )
+
+        self.assertIsInstance(extracted_features, pd.DataFrame)
+        self.assertTrue(np.all(extracted_features.a__maximum == np.array([71, 77])))
+        self.assertTrue(
+            np.all(extracted_features.a__sum_values == np.array([691, 1017]))
+        )
+        self.assertTrue(
+            np.all(extracted_features.a__abs_energy == np.array([32211, 63167]))
+        )
+        self.assertTrue(
+            np.all(extracted_features.b__sum_values == np.array([757, 695]))
+        )
+        self.assertTrue(np.all(extracted_features.b__minimum == np.array([3, 1])))
+        self.assertTrue(
+            np.all(extracted_features.b__abs_energy == np.array([36619, 35483]))
+        )
+        self.assertTrue(np.all(extracted_features.b__mean == np.array([37.85, 34.75])))
+        self.assertTrue(np.all(extracted_features.b__median == np.array([39.5, 28.0])))
diff --git a/tsfresh/utilities/distribution.py b/tsfresh/utilities/distribution.py
index 66ac692b..05218c4d 100644
--- a/tsfresh/utilities/distribution.py
+++ b/tsfresh/utilities/distribution.py
@@ -494,6 +494,84 @@ def close(self):
         self.pool.join()
 
 
+class RayDistributor(IterableDistributorBaseClass):
+    def __init__(
+        self,
+        address=None,
+        rayinit_config=None,
+        n_workers=1,
+        disable_progressbar=False,
+        progressbar_title="Feature Extraction",
+    ):
+        """
+        Creates a new RayDistributor instance.
+
+        :param address: the address (including port) of the Ray cluster to connect to,
+            e.g. "ray://123.45.67.89:10001". If None, a local Ray instance is started.
+        :type address: str
+        :param rayinit_config: additional keyword arguments passed on to ``ray.init``
+        :type rayinit_config: dict
+        :param n_workers: how many Ray workers the calculation should be split over
+        :type n_workers: int
+        :param disable_progressbar: whether to show a progressbar or not.
+        :type disable_progressbar: bool
+        :param progressbar_title: the title of the progressbar
+        :type progressbar_title: str
+        """
+        import ray
+
+        # rayinit_config defaults to None (instead of a mutable {} default)
+        ray.init(address=address, **(rayinit_config or {}))
+        self.n_workers = n_workers
+        self.cpu_per_worker = max(
+            1, int(ray.available_resources()["CPU"]) // self.n_workers
+        )
+        self.disable_progressbar = disable_progressbar
+        self.progressbar_title = progressbar_title
+
+    def calculate_best_chunk_size(self, data_length):
+        """
+        Uses the number of Ray workers in the cluster (during execution time, meaning
+        when you start the extraction) to find the optimal chunk_size.
+
+        :param data_length: A length which defines how many calculations there need to be.
+        :type data_length: int
+        """
+        chunk_size, extra = divmod(data_length, self.n_workers * 5)
+        if extra:
+            chunk_size += 1
+        return chunk_size
+
+    def distribute(self, func, partitioned_chunks, kwargs):
+        """
+        Creates a Ray remote function from ``func`` and calculates the features in a
+        parallel fashion by distributing the data chunks to the remote function.
+
+        :param func: the function to send to each worker.
+        :type func: callable
+        :param partitioned_chunks: The list of data chunks - each element is again
+            a list of chunks - and should be processed by one worker.
+        :type partitioned_chunks: iterable
+        :param kwargs: parameters for the map function
+        :type kwargs: dict of string to parameter
+
+        :return: The result of the calculation as a generator -
+            each item should be the result of the application of func to a single element.
+        """
+        import ray
+
+        remote_func = ray.remote(func).options(num_cpus=self.cpu_per_worker)
+        results = [remote_func.remote(chunk, **kwargs) for chunk in partitioned_chunks]
+        for result in results:
+            yield ray.get(result)
+
+    def close(self):
+        """
+        Disconnect from the Ray cluster and terminate the worker processes.
+        """
+        import ray
+
+        ray.shutdown()
+
+
 class ApplyDistributor(DistributorBaseClass):
     def __init__(self, meta):
         self.meta = meta