Skip to content

Commit

Permalink
Merge branch 'main' into my-first-change
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 20, 2023
2 parents c235a2a + f67ffd4 commit 6e39753
Show file tree
Hide file tree
Showing 145 changed files with 4,046 additions and 645 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/asv.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ jobs:
id: build
shell: bash -el {0}
run: |
pip install -e "git+https://github.com/xorbitsai/xoscar.git@main#subdirectory=python&egg=xoscar"
pip install numpy scipy cython asv coverage
pip install numpy scipy cython asv==0.5.1 coverage
cd python && pip install -e ".[dev,extra]"
- name: Run ASV benchmarks
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ jobs:
../CI/install-hadoop.sh
echo "import coverage; coverage.process_startup()" > \
$(python -c "import site; print(site.getsitepackages()[-1])")/coverage.pth
conda install --quiet --yes -c conda-forge skein libffi conda-pack
conda install --quiet --yes -c conda-forge skein libffi conda-pack grpcio=1.42.0
fi
if [[ "$MODULE" == "vineyard" ]]; then
pip install "vineyard<0.16.1" -i https://pypi.org/simple
Expand Down Expand Up @@ -250,7 +250,7 @@ jobs:
- name: Install on GPU
if: ${{ matrix.module == 'gpu' }}
run: |
pip install -e "git+https://github.com/xorbitsai/xoscar.git@main#subdirectory=python&egg=xoscar"
pip install -U xoscar
python setup.py build_ext -i
working-directory: ./python

Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ repos:
- id: prettier
types_or: [html, javascript]
- repo: https://github.com/codespell-project/codespell
rev: v2.2.5
rev: v2.2.6
hooks:
- id: codespell
exclude: _mars/lib
Expand Down
2 changes: 1 addition & 1 deletion asv/asv.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
"numpy": [],
"Cython": ["0.29.24"],
"pandas": [],
"scipy": [],
"scipy": ["1.10.0"],
"scikit-learn": [],
"numexpr": [],
"cloudpickle": [],
Expand Down
5 changes: 1 addition & 4 deletions python/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ install_requires =
tqdm>=4.1.0
uvloop>=0.14.0; sys_platform!="win32"
pyarrow>=5.0.0
fsspec>=2022.7.1,!=2022.8.0

[options.packages.find]
exclude =
Expand Down Expand Up @@ -84,7 +85,6 @@ doc =
extra =
pillow>=7.0.0
lz4>=1.0.0
fsspec>=2022.7.1,!=2022.8.0
numexpr>=2.6.4
jax =
jax>=0.4.0; sys.platform != "win32"
Expand All @@ -96,14 +96,11 @@ ray =
vineyard =
vineyard>=0.3; sys.platform != "win32"
aws =
fsspec>=2022.7.1,!=2022.8.0
s3fs
azure =
fsspec>=2022.7.1,!=2022.8.0
adlfs
datasets =
datasets
fsspec>=2022.7.1,!=2022.8.0

[coverage:run]
branch = True
Expand Down
2 changes: 2 additions & 0 deletions python/xorbits/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def _install():
from .lightgbm import _install as _install_lightgbm
from .numpy import _install as _install_numpy
from .pandas import _install as _install_pandas
from .sklearn import _install as _install_sklearn
from .web import _install as _install_web
from .xgboost import _install as _install_xgboost

Expand All @@ -34,6 +35,7 @@ def _install():
_install_xgboost()
_install_datasets()
_install_experimental()
_install_sklearn()


_install()
Expand Down
3 changes: 0 additions & 3 deletions python/xorbits/_mars/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,6 @@ def validate(x):
default_options.register_option("serialize_method", "pickle")

# dataframe-related options
default_options.register_option(
"dataframe.mode.use_inf_as_na", False, validator=is_bool
)
default_options.register_option(
"dataframe.use_arrow_dtype", None, validator=any_validator(is_null, is_bool)
)
Expand Down
2 changes: 1 addition & 1 deletion python/xorbits/_mars/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def __copy__(self):
return self.copy()

def copy(self):
return self.copy_to(type(self)(_key=self.key))
return self.copy_to(type(self)())

def copy_to(self, target: "Base"):
target_fields = target._FIELDS
Expand Down
27 changes: 26 additions & 1 deletion python/xorbits/_mars/core/entity/tileables.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,14 @@ def __copy__(self):
def _view(self):
return super().copy()

def copy(self: TileableType) -> TileableType:
def copy(self: TileableType, **kw) -> TileableType:
from ...dataframe import Index
from ...deploy.oscar.session import SyncSession

new_name = None
if isinstance(self, Index):
new_name = kw.pop("name", None)

new_op = self.op.copy()
if new_op.create_view:
# if the operand is a view, make it a copy
Expand All @@ -378,6 +385,24 @@ def copy(self: TileableType) -> TileableType:
new_outs = new_op.new_tileables(
self.op.inputs, kws=params, output_limit=len(params)
)

sess = self._executed_sessions[-1] if self._executed_sessions else None
to_incref_keys = []
for _out in new_outs:
if sess:
_out._attach_session(sess)
to_incref_keys.append(_out.key)
if self.data in sess._tileable_to_fetch:
sess._tileable_to_fetch[_out.data] = sess._tileable_to_fetch[
self.data
]
if new_name:
_out.name = new_name

if to_incref_keys:
assert sess is not None
SyncSession.from_isolated_session(sess).incref(*to_incref_keys)

pos = -1
for i, out in enumerate(self.op.outputs):
# create a ref to copied one
Expand Down
110 changes: 55 additions & 55 deletions python/xorbits/_mars/dataframe/base/cartesian_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import numpy as np
import pandas as pd

Expand All @@ -22,75 +24,45 @@
from ...serialization.serializables import (
DictField,
FunctionField,
Int32Field,
KeyField,
StringField,
TupleField,
)
from ...utils import enter_current_session, has_unknown_shape, quiet_stdio
from ..operands import DataFrameOperand, DataFrameOperandMixin, OutputType
from ..operands import DataFrameOperand, OutputType
from ..utils import (
build_df,
build_empty_df,
build_series,
parse_index,
validate_output_types,
)
from .core import DataFrameAutoMergeMixin

logger = logging.getLogger(__name__)


class DataFrameCartesianChunk(DataFrameOperand, DataFrameOperandMixin):
class DataFrameCartesianChunk(DataFrameOperand, DataFrameAutoMergeMixin):
_op_type_ = opcodes.CARTESIAN_CHUNK

_left = KeyField("left")
_right = KeyField("right")
_func = FunctionField("func")
_args = TupleField("args")
_kwargs = DictField("kwargs")
left = KeyField("left")
right = KeyField("right")
func = FunctionField("func")
args = TupleField("args")
kwargs = DictField("kwargs")
auto_merge = StringField("auto_merge")
auto_merge_threshold = Int32Field("auto_merge_threshold")

def __init__(
self,
left=None,
right=None,
func=None,
args=None,
kwargs=None,
output_types=None,
**kw
):
super().__init__(
_left=left,
_right=right,
_func=func,
_args=args,
_kwargs=kwargs,
_output_types=output_types,
**kw
)
def __init__(self, output_types=None, **kw):
super().__init__(_output_types=output_types, **kw)
if self.memory_scale is None:
self.memory_scale = 2.0

@property
def left(self):
return self._left

@property
def right(self):
return self._right

@property
def func(self):
return self._func

@property
def args(self):
return self._args

@property
def kwargs(self):
return self._kwargs

def _set_inputs(self, inputs):
super()._set_inputs(inputs)
self._left = self._inputs[0]
self._right = self._inputs[1]
self.left = self.inputs[0]
self.right = self.inputs[1]

@staticmethod
def _build_test_obj(obj):
Expand All @@ -103,15 +75,15 @@ def _build_test_obj(obj):
def __call__(self, left, right, index=None, dtypes=None):
test_left = self._build_test_obj(left)
test_right = self._build_test_obj(right)
output_type = self._output_types[0] if self._output_types else None
output_type = self.output_types[0] if self.output_types else None

if output_type == OutputType.df_or_series:
return self.new_df_or_series([left, right])

# try run to infer meta
try:
with np.errstate(all="ignore"), quiet_stdio():
obj = self._func(test_left, test_right, *self._args, **self._kwargs)
obj = self.func(test_left, test_right, *self.args, **self.kwargs)
except: # noqa: E722 # nosec # pylint: disable=bare-except
if output_type == OutputType.series:
obj = pd.Series([], dtype=np.dtype(object))
Expand All @@ -126,11 +98,11 @@ def __call__(self, left, right, index=None, dtypes=None):
)

if getattr(obj, "ndim", 0) == 1 or output_type == OutputType.series:
shape = self._kwargs.pop("shape", (np.nan,))
shape = self.kwargs.pop("shape", (np.nan,))
if index is None:
index = obj.index
index_value = parse_index(
index, left, right, self._func, self._args, self._kwargs
index, left, right, self.func, self.args, self.kwargs
)
return self.new_series(
[left, right],
Expand All @@ -147,7 +119,7 @@ def __call__(self, left, right, index=None, dtypes=None):
if index is None:
index = obj.index
index_value = parse_index(
index, left, right, self._func, self._args, self._kwargs
index, left, right, self.func, self.args, self.kwargs
)
return self.new_dataframe(
[left, right],
Expand All @@ -164,6 +136,14 @@ def tile(cls, op: "DataFrameCartesianChunk"):
out = op.outputs[0]
out_type = op.output_types[0]

auto_merge_threshold = op.auto_merge_threshold
auto_merge_before, auto_merge_after = cls._get_auto_merge_options(op.auto_merge)

merge_before_res = yield from cls._merge_before(
op, auto_merge_before, auto_merge_threshold, left, right, logger
)
left, right = merge_before_res[0], merge_before_res[1]

if left.ndim == 2 and left.chunk_shape[1] > 1:
if has_unknown_shape(left):
yield
Expand Down Expand Up @@ -240,7 +220,12 @@ def tile(cls, op: "DataFrameCartesianChunk"):
params["nsplits"] = tuple(tuple(ns) for ns in nsplits) if nsplits else nsplits
params["chunks"] = out_chunks
new_op = op.copy()
return new_op.new_tileables(op.inputs, kws=[params])
ret = new_op.new_tileables(op.inputs, kws=[params])

ret = yield from cls._merge_after(
op, auto_merge_after, auto_merge_threshold, ret, logger
)
return ret

@classmethod
@redirect_custom_log
Expand All @@ -250,7 +235,16 @@ def execute(cls, ctx, op: "DataFrameCartesianChunk"):
ctx[op.outputs[0].key] = op.func(left, right, *op.args, **(op.kwargs or dict()))


def cartesian_chunk(left, right, func, skip_infer=False, args=(), **kwargs):
def cartesian_chunk(
left,
right,
func,
skip_infer=False,
args=(),
auto_merge: str = "both",
auto_merge_threshold: int = 8,
**kwargs,
):
output_type = kwargs.pop("output_type", None)
output_types = kwargs.pop("output_types", None)
object_type = kwargs.pop("object_type", None)
Expand All @@ -265,6 +259,10 @@ def cartesian_chunk(left, right, func, skip_infer=False, args=(), **kwargs):
index = kwargs.pop("index", None)
dtypes = kwargs.pop("dtypes", None)
memory_scale = kwargs.pop("memory_scale", None)
if auto_merge not in ["both", "none", "before", "after"]: # pragma: no cover
raise ValueError(
f"auto_merge can only be `both`, `none`, `before` or `after`, got {auto_merge}"
)

op = DataFrameCartesianChunk(
left=left,
Expand All @@ -274,5 +272,7 @@ def cartesian_chunk(left, right, func, skip_infer=False, args=(), **kwargs):
kwargs=kwargs,
output_types=output_types,
memory_scale=memory_scale,
auto_merge=auto_merge,
auto_merge_threshold=auto_merge_threshold,
)
return op(left, right, index=index, dtypes=dtypes)
Loading

0 comments on commit 6e39753

Please sign in to comment.