diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py index 8dcc87b5699..a8d10bf60f7 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py @@ -93,7 +93,7 @@ def apply(self, func, *args, **kwargs): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - futures = DaskWrapper.deploy( + futures = self.execution_wrapper.deploy( func=apply_list_of_funcs, f_args=(call_queue, self._data), num_returns=2, @@ -103,7 +103,7 @@ def apply(self, func, *args, **kwargs): # We handle `len(call_queue) == 1` in a different way because # this improves performance a bit. func, f_args, f_kwargs = call_queue[0] - futures = DaskWrapper.deploy( + futures = self.execution_wrapper.deploy( func=apply_func, f_args=(self._data, func, *f_args), f_kwargs=f_kwargs, @@ -127,7 +127,7 @@ def drain_call_queue(self): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - futures = DaskWrapper.deploy( + futures = self.execution_wrapper.deploy( func=apply_list_of_funcs, f_args=(call_queue, self._data), num_returns=2, @@ -138,7 +138,7 @@ def drain_call_queue(self): # this improves performance a bit. func, f_args, f_kwargs = call_queue[0] self._is_debug(log) and log.debug(f"SUBMIT::_apply_func::{self._identity}") - futures = DaskWrapper.deploy( + futures = self.execution_wrapper.deploy( func=apply_func, f_args=(self._data, func, *f_args), f_kwargs=f_kwargs, @@ -155,7 +155,7 @@ def drain_call_queue(self): def wait(self): """Wait completing computations on the object wrapped by the partition.""" self.drain_call_queue() - DaskWrapper.wait(self._data) + self.execution_wrapper.wait(self._data) def mask(self, row_labels, col_labels): """ @@ -181,7 +181,7 @@ def mask(self, row_labels, col_labels): # fast path - full axis take new_obj._length_cache = self._length_cache else: - new_obj._length_cache = DaskWrapper.deploy( + new_obj._length_cache = self.execution_wrapper.deploy( func=compute_sliced_len, f_args=(row_labels, self._length_cache) ) if isinstance(col_labels, slice) and isinstance(self._width_cache, Future): @@ -189,7 +189,7 @@ def mask(self, row_labels, col_labels): # fast path - full axis take new_obj._width_cache = self._width_cache else: - new_obj._width_cache = DaskWrapper.deploy( + new_obj._width_cache = self.execution_wrapper.deploy( func=compute_sliced_len, f_args=(col_labels, self._width_cache) ) self._is_debug(log) and log.debug(f"EXIT::Partition.mask::{self._identity}") @@ -227,7 +227,11 @@ def put(cls, obj): PandasOnDaskDataframePartition A new ``PandasOnDaskDataframePartition`` object. """ - return cls(DaskWrapper.put(obj, hash=False), len(obj.index), len(obj.columns)) + return cls( + cls.execution_wrapper.put(obj, hash=False), + len(obj.index), + len(obj.columns), + ) @classmethod def preprocess_func(cls, func): @@ -244,7 +248,7 @@ def preprocess_func(cls, func): callable An object that can be accepted by ``apply``. """ - return DaskWrapper.put(func, hash=False, broadcast=True) + return cls.execution_wrapper.put(func, hash=False, broadcast=True) def length(self, materialize=True): """ @@ -265,7 +269,7 @@ def length(self, materialize=True): if self._length_cache is None: self._length_cache = self.apply(len)._data if isinstance(self._length_cache, Future) and materialize: - self._length_cache = DaskWrapper.materialize(self._length_cache) + self._length_cache = self.execution_wrapper.materialize(self._length_cache) return self._length_cache def width(self, materialize=True): @@ -287,7 +291,7 @@ def width(self, materialize=True): if self._width_cache is None: self._width_cache = self.apply(lambda df: len(df.columns))._data if isinstance(self._width_cache, Future) and materialize: - self._width_cache = DaskWrapper.materialize(self._width_cache) + self._width_cache = self.execution_wrapper.materialize(self._width_cache) return self._width_cache def ip(self, materialize=True): @@ -309,7 +313,7 @@ def ip(self, materialize=True): if self._ip_cache is None: self._ip_cache = self.apply(lambda df: pandas.DataFrame([]))._ip_cache if materialize and isinstance(self._ip_cache, Future): - self._ip_cache = DaskWrapper.materialize(self._ip_cache) + self._ip_cache = self.execution_wrapper.materialize(self._ip_cache) return self._ip_cache diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py index f045c6ef392..a1a46dc0f12 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py @@ -46,6 +46,6 @@ def wait_partitions(cls, partitions): partitions : np.ndarray NumPy array with ``PandasDataframePartition``-s. """ - DaskWrapper.wait( + cls._execution_wrapper.wait( [block for partition in partitions for block in partition.list_of_blocks] )