Skip to content

Commit

Permalink
REFACTOR-#6739: Use execution_wrapper instead of directly addressing …
Browse files Browse the repository at this point in the history
…DaskWrapper

Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed Nov 14, 2023
1 parent 41ecc92 commit 96d864c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def apply(self, func, *args, **kwargs):
self._is_debug(log) and log.debug(
f"SUBMIT::_apply_list_of_funcs::{self._identity}"
)
futures = DaskWrapper.deploy(
futures = self.execution_wrapper.deploy(
func=apply_list_of_funcs,
f_args=(call_queue, self._data),
num_returns=2,
Expand All @@ -103,7 +103,7 @@ def apply(self, func, *args, **kwargs):
# We handle `len(call_queue) == 1` in a different way because
# this improves performance a bit.
func, f_args, f_kwargs = call_queue[0]
futures = DaskWrapper.deploy(
futures = self.execution_wrapper.deploy(
func=apply_func,
f_args=(self._data, func, *f_args),
f_kwargs=f_kwargs,
Expand All @@ -127,7 +127,7 @@ def drain_call_queue(self):
self._is_debug(log) and log.debug(
f"SUBMIT::_apply_list_of_funcs::{self._identity}"
)
futures = DaskWrapper.deploy(
futures = self.execution_wrapper.deploy(
func=apply_list_of_funcs,
f_args=(call_queue, self._data),
num_returns=2,
Expand All @@ -138,7 +138,7 @@ def drain_call_queue(self):
# this improves performance a bit.
func, f_args, f_kwargs = call_queue[0]
self._is_debug(log) and log.debug(f"SUBMIT::_apply_func::{self._identity}")
futures = DaskWrapper.deploy(
futures = self.execution_wrapper.deploy(
func=apply_func,
f_args=(self._data, func, *f_args),
f_kwargs=f_kwargs,
Expand All @@ -155,7 +155,7 @@ def drain_call_queue(self):
def wait(self):
"""Wait completing computations on the object wrapped by the partition."""
self.drain_call_queue()
DaskWrapper.wait(self._data)
self.execution_wrapper.wait(self._data)

def mask(self, row_labels, col_labels):
"""
Expand All @@ -181,15 +181,15 @@ def mask(self, row_labels, col_labels):
# fast path - full axis take
new_obj._length_cache = self._length_cache
else:
new_obj._length_cache = DaskWrapper.deploy(
new_obj._length_cache = self.execution_wrapper.deploy(
func=compute_sliced_len, f_args=(row_labels, self._length_cache)
)
if isinstance(col_labels, slice) and isinstance(self._width_cache, Future):
if col_labels == slice(None):
# fast path - full axis take
new_obj._width_cache = self._width_cache
else:
new_obj._width_cache = DaskWrapper.deploy(
new_obj._width_cache = self.execution_wrapper.deploy(
func=compute_sliced_len, f_args=(col_labels, self._width_cache)
)
self._is_debug(log) and log.debug(f"EXIT::Partition.mask::{self._identity}")
Expand Down Expand Up @@ -227,7 +227,11 @@ def put(cls, obj):
PandasOnDaskDataframePartition
A new ``PandasOnDaskDataframePartition`` object.
"""
return cls(DaskWrapper.put(obj, hash=False), len(obj.index), len(obj.columns))
return cls(
cls.execution_wrapper.put(obj, hash=False),
len(obj.index),
len(obj.columns),
)

@classmethod
def preprocess_func(cls, func):
Expand All @@ -244,7 +248,7 @@ def preprocess_func(cls, func):
callable
An object that can be accepted by ``apply``.
"""
return DaskWrapper.put(func, hash=False, broadcast=True)
return cls.execution_wrapper.put(func, hash=False, broadcast=True)

def length(self, materialize=True):
"""
Expand All @@ -265,7 +269,7 @@ def length(self, materialize=True):
if self._length_cache is None:
self._length_cache = self.apply(len)._data
if isinstance(self._length_cache, Future) and materialize:
self._length_cache = DaskWrapper.materialize(self._length_cache)
self._length_cache = self.execution_wrapper.materialize(self._length_cache)
return self._length_cache

def width(self, materialize=True):
Expand All @@ -287,7 +291,7 @@ def width(self, materialize=True):
if self._width_cache is None:
self._width_cache = self.apply(lambda df: len(df.columns))._data
if isinstance(self._width_cache, Future) and materialize:
self._width_cache = DaskWrapper.materialize(self._width_cache)
self._width_cache = self.execution_wrapper.materialize(self._width_cache)
return self._width_cache

def ip(self, materialize=True):
Expand All @@ -309,7 +313,7 @@ def ip(self, materialize=True):
if self._ip_cache is None:
self._ip_cache = self.apply(lambda df: pandas.DataFrame([]))._ip_cache
if materialize and isinstance(self._ip_cache, Future):
self._ip_cache = DaskWrapper.materialize(self._ip_cache)
self._ip_cache = self.execution_wrapper.materialize(self._ip_cache)
return self._ip_cache


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ def wait_partitions(cls, partitions):
partitions : np.ndarray
NumPy array with ``PandasDataframePartition``-s.
"""
DaskWrapper.wait(
cls._execution_wrapper.wait(
[block for partition in partitions for block in partition.list_of_blocks]
)

0 comments on commit 96d864c

Please sign in to comment.