From bc48554a08cd2503a562f1203624047afa48fe21 Mon Sep 17 00:00:00 2001
From: Jiaming Yuan
Date: Wed, 18 Dec 2024 16:15:44 +0800
Subject: [PATCH] [WIP] Initial support for polars.

pyarrow is required for polars input.

Categorical data is not yet supported; it will wait until the re-coder is
completed. This patch also helps set up the code for accepting CPU-based
arrow data.

- Add masked dataframe support.
- Initial support for polars.

A usage sketch and a note on the mask encoding follow the diff.
---
 ops/conda_env/python_lint.yml              |   1 +
 python-package/xgboost/_data_utils.py      |  17 +-
 python-package/xgboost/compat.py           |   7 +
 python-package/xgboost/core.py             |  20 +-
 python-package/xgboost/data.py             | 322 +++++++++++++++++++--
 python-package/xgboost/testing/__init__.py |   3 +
 src/data/adapter.h                         |  23 +-
 tests/python/test_with_arrow.py            |   1 -
 tests/python/test_with_polars.py           |  55 ++++
 tests/python/test_with_sklearn.py          |   1 -
 10 files changed, 409 insertions(+), 41 deletions(-)
 create mode 100644 tests/python/test_with_polars.py

diff --git a/ops/conda_env/python_lint.yml b/ops/conda_env/python_lint.yml
index 579c377ef5ac..c59a70398ad4 100644
--- a/ops/conda_env/python_lint.yml
+++ b/ops/conda_env/python_lint.yml
@@ -10,6 +10,7 @@ dependencies:
 - numpy
 - scipy
 - pandas
+- pyarrow
 - scikit-learn
 - dask
 - distributed
diff --git a/python-package/xgboost/_data_utils.py b/python-package/xgboost/_data_utils.py
index 23483af0751e..11a2608bde82 100644
--- a/python-package/xgboost/_data_utils.py
+++ b/python-package/xgboost/_data_utils.py
@@ -16,6 +16,13 @@ class _ArrayLikeArg(Protocol):
     def __array_interface__(self) -> "ArrayInf": ...
 
 
+class _TransformedDf(Protocol):
+    def array_interface(self) -> bytes: ...
+
+    @property
+    def shape(self) -> Tuple[int, int]: ...
+
+
 ArrayInf = TypedDict(
     "ArrayInf",
     {
@@ -92,7 +99,10 @@ def __cuda_array_interface__(self, interface: ArrayInf) -> None:
 
 
 def make_array_interface(
-    ptr: CNumericPtr, shape: Tuple[int, ...], dtype: Type[np.number], is_cuda: bool
+    ptr: Union[CNumericPtr, int],
+    shape: Tuple[int, ...],
+    dtype: Type[np.number],
+    is_cuda: bool,
 ) -> ArrayInf:
     """Make an __(cuda)_array_interface__ from a pointer."""
     # Use an empty array to handle typestr and descr
@@ -103,7 +113,10 @@ def make_array_interface(
         empty = np.empty(shape=(0,), dtype=dtype)
         array = empty.__array_interface__  # pylint: disable=no-member
 
-    addr = ctypes.cast(ptr, ctypes.c_void_p).value
+    if not isinstance(ptr, int):
+        addr = ctypes.cast(ptr, ctypes.c_void_p).value
+    else:
+        addr = ptr
     length = int(np.prod(shape))
     # Handle empty dataset.
     assert addr is not None or length == 0
diff --git a/python-package/xgboost/compat.py b/python-package/xgboost/compat.py
index e09ccb6f4d29..ef61d43f796a 100644
--- a/python-package/xgboost/compat.py
+++ b/python-package/xgboost/compat.py
@@ -123,6 +123,13 @@ def import_cupy() -> types.ModuleType:
     return cupy
 
 
+@functools.cache
+def import_polars() -> types.ModuleType:
+    import polars as pl
+
+    return pl
+
+
 try:
     import scipy.sparse as scipy_sparse
     from scipy.sparse import csr_matrix as scipy_csr
diff --git a/python-package/xgboost/core.py b/python-package/xgboost/core.py
index f2d4fb548c8e..215d50386da8 100644
--- a/python-package/xgboost/core.py
+++ b/python-package/xgboost/core.py
@@ -37,6 +37,7 @@
 import scipy.sparse
 
 from ._data_utils import (
+    _TransformedDf,
     array_interface,
     cuda_array_interface,
     from_array_interface,
@@ -64,7 +65,7 @@
     TransformedData,
     c_bst_ulong,
 )
-from .compat import PANDAS_INSTALLED, DataFrame, py_str
+from .compat import PANDAS_INSTALLED, DataFrame, import_polars, py_str
 from .libpath import find_lib_path
 
 
@@ -1431,7 +1432,7 @@ def _ref_data_from_array(self, data: np.ndarray) -> None:
         """Reference data from numpy array."""
         _check_call(_LIB.XGProxyDMatrixSetDataDense(self.handle, array_interface(data)))
 
-    def _ref_data_from_pandas(self, data: DataType) -> None:
-        """Reference data from a pandas DataFrame.
-
-        The input is a PandasTransformed instance.
+    def _ref_data_from_pandas(self, data: _TransformedDf) -> None:
+        """Reference data from a transformed DataFrame.
+
+        The input is a PandasTransformed or an ArrowTransformed instance.
@@ -2601,8 +2602,8 @@ def inplace_predict(
         assert proxy is None or isinstance(proxy, _ProxyDMatrix)
 
         from .data import (
+            ArrowTransformed,
             PandasTransformed,
-            _arrow_transform,
             _is_arrow,
             _is_cudf_df,
             _is_cudf_pandas,
@@ -2611,7 +2612,11 @@ def inplace_predict(
             _is_np_array_like,
             _is_pandas_df,
             _is_pandas_series,
+            _is_polars,
+            _is_polars_series,
             _is_tuple,
+            _transform_arrow_table,
             _transform_pandas_df,
+            _transform_polars_df,
         )
 
@@ -2620,7 +2625,12 @@ def inplace_predict(
             enable_categorical = True
 
         if _is_arrow(data):
-            data = _arrow_transform(data)
+            data, fns, _ = _transform_arrow_table(data, enable_categorical, None, None)
+        if _is_polars_series(data):
+            pl = import_polars()
+            data = pl.DataFrame({data.name: data})
+        if _is_polars(data):
+            data, fns, _ = _transform_polars_df(data, enable_categorical, None, None)
         if _is_pandas_series(data):
             import pandas as pd
 
@@ -2659,7 +2669,7 @@ def inplace_predict(
                 )
             )
             return _prediction_output(shape, dims, preds, False)
-        if isinstance(data, PandasTransformed):
+        if isinstance(data, (ArrowTransformed, PandasTransformed)):
             _check_call(
                 _LIB.XGBoosterPredictFromColumnar(
                     self.handle,
diff --git a/python-package/xgboost/data.py b/python-package/xgboost/data.py
index d49ff5e43899..d25a0f3be120 100644
--- a/python-package/xgboost/data.py
+++ b/python-package/xgboost/data.py
@@ -6,15 +6,31 @@
 import json
 import os
 import warnings
-from typing import Any, Callable, List, Optional, Sequence, Tuple, TypeGuard, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    TypeGuard,
+    Union,
+    cast,
+)
 
 import numpy as np
 
 from ._data_utils import (
+    ArrayInf,
+    _TransformedDf,
     array_hasobject,
     array_interface,
     array_interface_dict,
     cuda_array_interface,
+    make_array_interface,
 )
 from ._typing import (
     CupyT,
@@ -30,7 +46,7 @@
 )
 from .compat import DataFrame
 from .compat import Series as PdSeries
-from .compat import lazy_isinstance
+from .compat import import_polars, lazy_isinstance
 from .core import (
     _LIB,
     DataIter,
@@ -43,6 +59,9 @@
     make_jcargs,
 )
 
+if TYPE_CHECKING:
+    import pyarrow as pa
+
 DispatchedDataBackendReturnType = Tuple[
     ctypes.c_void_p, Optional[FeatureNames], Optional[FeatureTypes]
 ]
@@ -585,7 +604,7 @@ def oth_type(ser: PdSeries) -> np.ndarray:
     return result
 
 
-class PandasTransformed:
+class PandasTransformed(_TransformedDf):
     """A storage class for transformed pandas DataFrame."""
 
     def __init__(self, columns: List[np.ndarray]) -> None:
@@ -721,26 +740,243 @@ def _from_pandas_series(
     )
 
 
+@functools.cache
+def _arrow_npdtype() -> Dict[Any, Type[np.number]]:
+    import pyarrow as pa
+
+    mapping: Dict[Any, Type[np.number]] = {
+        pa.int8(): np.int8,
+        pa.int16(): np.int16,
+        pa.int32(): np.int32,
+        pa.int64(): np.int64,
+        pa.uint8(): np.uint8,
+        pa.uint16(): np.uint16,
+        pa.uint32(): np.uint32,
+        pa.uint64(): np.uint64,
+        pa.float16(): np.float16,
+        pa.float32(): np.float32,
+        pa.float64(): np.float64,
+    }
+
+    return mapping
+
+
+class ArrowTransformed(_TransformedDf):
+    """A storage class for transformed arrow table."""
+
+    def __init__(
+        self, columns: List[Union["pa.NumericArray", "pa.DictionaryArray"]]
+    ) -> None:
+        self.columns = columns
+
+    def array_interface(self) -> bytes:
+        """Return a byte string for JSON encoded array interface."""
+        import pyarrow as pa
+
+        def map_array_inf(
+            col: Union["pa.NumericArray", "pa.DictionaryArray"]
+        ) -> ArrayInf:
+            buffers = col.buffers()
+            if isinstance(col, pa.DictionaryArray):
+                mask, _, data = buffers
+            else:
+                mask, data = buffers
+
+            assert data.is_cpu
+            assert col.offset == 0
+
+            jdata = make_array_interface(
+                data.address,
+                shape=(len(col),),
+                dtype=_arrow_npdtype()[col.type],
+                is_cuda=not data.is_cpu,
+            )
+            if mask is not None:
+                jmask: ArrayInf = {
+                    "data": (mask.address, True),
+                    "typestr": "<t1",
+                    "version": 3,
+                    "strides": None,
+                    "shape": (len(col),),
+                    "mask": None,
+                }
+                jdata["mask"] = jmask
+            return jdata
+
+        arrays = list(map(map_array_inf, self.columns))
+        return bytes(json.dumps(arrays), "utf-8")
+
+    @property
+    def shape(self) -> Tuple[int, int]:
+        """Return shape of the transformed DataFrame."""
+        return len(self.columns[0]), len(self.columns)
+
+
 def _is_arrow(data: DataType) -> bool:
-    return lazy_isinstance(data, "pyarrow.lib", "Table") or lazy_isinstance(
-        data, "pyarrow._dataset", "Dataset"
-    )
+    return lazy_isinstance(data, "pyarrow.lib", "Table")
+
+
+def _transform_arrow_table(
+    data: "pa.Table",
+    enable_categorical: bool,
+    feature_names: Optional[FeatureNames],
+    feature_types: Optional[FeatureTypes],
+) -> Tuple[ArrowTransformed, Optional[FeatureNames], Optional[FeatureTypes]]:
+    import pyarrow as pa
+
+    t_names, t_types = _arrow_feature_info(data)
+
+    if feature_names is None:
+        feature_names = t_names
+    if feature_types is None:
+        feature_types = t_types
+
+    columns = []
+    for cname in feature_names:
+        col0: pa.ChunkedArray = data.column(cname)
+        col: Union[pa.NumericArray, pa.DictionaryArray] = col0.combine_chunks()
+        if isinstance(col, pa.BooleanArray):
+            col = col.cast(pa.int8())  # bit-compressed array, not supported.
+        columns.append(col)
+
+    df_t = ArrowTransformed(columns)
+    return df_t, feature_names, feature_types
+
+
+def _from_arrow_table(
+    data: DataType,
+    enable_categorical: bool,
+    missing: FloatCompatible,
+    n_threads: int,
+    feature_names: Optional[FeatureNames],
+    feature_types: Optional[FeatureTypes],
+    data_split_mode: DataSplitMode = DataSplitMode.ROW,
+) -> DispatchedDataBackendReturnType:
+    df_t, feature_names, feature_types = _transform_arrow_table(
+        data, enable_categorical, feature_names, feature_types
+    )
+
+    handle = ctypes.c_void_p()
+    _check_call(
+        _LIB.XGDMatrixCreateFromColumnar(
+            df_t.array_interface(),
+            make_jcargs(
+                nthread=n_threads, missing=missing, data_split_mode=data_split_mode
+            ),
+            ctypes.byref(handle),
+        )
+    )
+    return handle, feature_names, feature_types
 
 
-def _arrow_transform(data: DataType) -> Any:
-    import pandas as pd
-    import pyarrow as pa
-    from pyarrow.dataset import Dataset
-
-    if isinstance(data, Dataset):
-        raise TypeError("arrow Dataset is not supported.")
-
-    data = cast(pa.Table, data)
-
-    # For common cases, this is zero-copy, can check with:
-    # pa.total_allocated_bytes()
-    df = data.to_pandas(types_mapper=pd.ArrowDtype)
-    return df
+@functools.cache
+def _arrow_dtype() -> Dict[DataType, str]:
+    import pyarrow as pa
+
+    mapping = {
+        pa.int8(): "int",
+        pa.int16(): "int",
+        pa.int32(): "int",
+        pa.int64(): "int",
+        pa.uint8(): "int",
+        pa.uint16(): "int",
+        pa.uint32(): "int",
+        pa.uint64(): "int",
+        pa.float16(): "float",
+        pa.float32(): "float",
+        pa.float64(): "float",
+        pa.bool_(): "i",
+    }
+
+    return mapping
+
+
+def _arrow_feature_info(data: DataType) -> Tuple[List[str], List[str]]:
+    import pyarrow as pa
+
+    table: pa.Table = data
+    names = table.column_names
+
+    def map_type(name: str) -> str:
+        col = table.column(name)
+        if isinstance(col.type, pa.DictionaryType):
+            raise NotImplementedError(
+                "Categorical feature is not yet supported with the current input data "
+                "type."
+            )
+            # return CAT_T  # Re-enable once the re-coder is completed.
+        return _arrow_dtype()[col.type]
+
+    types = list(map(map_type, names))
+    return names, types
+
+
+def _meta_from_arrow_table(
+    data: DataType,
+    name: str,
+    dtype: Optional[NumpyDType],
+    handle: ctypes.c_void_p,
+) -> None:
+    table: "pa.Table" = data
+    _meta_from_pandas_df(table.to_pandas(), name=name, dtype=dtype, handle=handle)
+
+
+def _is_polars_lazyframe(data: DataType) -> bool:
+    return lazy_isinstance(data, "polars.lazyframe.frame", "LazyFrame")
+
+
+def _is_polars_series(data: DataType) -> bool:
+    return lazy_isinstance(data, "polars.series.series", "Series")
+
+
+def _is_polars(data: DataType) -> bool:
+    lf = _is_polars_lazyframe(data)
+    df = lazy_isinstance(data, "polars.dataframe.frame", "DataFrame")
+    return lf or df
+
+
+def _transform_polars_df(
+    data: DataType,
+    enable_categorical: bool,
+    feature_names: Optional[FeatureNames],
+    feature_types: Optional[FeatureTypes],
+) -> Tuple[ArrowTransformed, Optional[FeatureNames], Optional[FeatureTypes]]:
+    if _is_polars_lazyframe(data):
+        df = data.collect()
+    else:
+        df = data
+
+    table = df.to_arrow()
+    return _transform_arrow_table(
+        table, enable_categorical, feature_names, feature_types
+    )
+
+
+def _from_polars_df(
+    data: DataType,
+    enable_categorical: bool,
+    missing: FloatCompatible,
+    n_threads: int,
+    feature_names: Optional[FeatureNames],
+    feature_types: Optional[FeatureTypes],
+    data_split_mode: DataSplitMode = DataSplitMode.ROW,
+) -> DispatchedDataBackendReturnType:
+    df_t, feature_names, feature_types = _transform_polars_df(
+        data, enable_categorical, feature_names, feature_types
+    )
+    handle = ctypes.c_void_p()
+    _check_call(
+        _LIB.XGDMatrixCreateFromColumnar(
+            df_t.array_interface(),
+            make_jcargs(
+                nthread=n_threads, missing=missing, data_split_mode=data_split_mode
+            ),
+            ctypes.byref(handle),
+        )
+    )
+    return handle, feature_names, feature_types
 
 
 def _is_cudf_df(data: DataType) -> bool:
@@ -1089,7 +1325,11 @@ def dispatch_data_backend(
     data_split_mode: DataSplitMode = DataSplitMode.ROW,
 ) -> DispatchedDataBackendReturnType:
     """Dispatch data for DMatrix."""
-    if not _is_cudf_ser(data) and not _is_pandas_series(data):
+    if (
+        not _is_cudf_ser(data)
+        and not _is_pandas_series(data)
+        and not _is_polars_series(data)
+    ):
         _check_data_shape(data)
     if is_scipy_csr(data):
         return _from_scipy_csr(
@@ -1147,8 +1387,30 @@ def dispatch_data_backend(
             feature_types=feature_types,
             data_split_mode=data_split_mode,
         )
+    if _is_polars_series(data):
+        pl = import_polars()
+
+        data = pl.DataFrame({data.name: data})
+    if _is_polars(data):
+        return _from_polars_df(
+            data,
+            enable_categorical,
+            missing=missing,
+            n_threads=threads,
+            feature_names=feature_names,
+            feature_types=feature_types,
+            data_split_mode=data_split_mode,
+        )
     if _is_arrow(data):
-        data = _arrow_transform(data)
+        return _from_arrow_table(
+            data,
+            enable_categorical,
+            missing=missing,
+            n_threads=threads,
+            feature_names=feature_names,
+            feature_types=feature_types,
+            data_split_mode=data_split_mode,
+        )
     if _is_cudf_pandas(data):
         data = data._fsproxy_fast  # pylint: disable=protected-access
     if _is_pandas_series(data):
@@ -1303,9 +1565,15 @@ def dispatch_meta_backend(
         _meta_from_numpy(data, name, dtype, handle)
         return
     if _is_arrow(data):
-        data = _arrow_transform(data)
+        _meta_from_arrow_table(data, name, dtype, handle)
+        return
     if _is_cudf_pandas(data):
         data = data._fsproxy_fast  # pylint: disable=protected-access
+    if _is_polars(data):
+        if _is_polars_lazyframe(data):
+            data = data.collect()
+        _meta_from_arrow_table(data.to_arrow(), name, dtype, handle)
+        return
     if _is_pandas_df(data):
         _meta_from_pandas_df(data, name, dtype=dtype, handle=handle)
         return
@@ -1399,12 +1667,20 @@ def _proxy_transform(
     if is_scipy_coo(data):
         data = transform_scipy_sparse(data.tocsr(), True)
         return data, None, feature_names, feature_types
+    if _is_polars(data):
+        df_pl, feature_names, feature_types = _transform_polars_df(
+            data, enable_categorical, feature_names, feature_types
+        )
+        return df_pl, None, feature_names, feature_types
     if _is_pandas_series(data):
         import pandas as pd
 
         data = pd.DataFrame(data)
     if _is_arrow(data):
-        data = _arrow_transform(data)
+        df_pa, feature_names, feature_types = _transform_arrow_table(
+            data, enable_categorical, feature_names, feature_types
+        )
+        return df_pa, None, feature_names, feature_types
     if _is_pandas_df(data):
         df, feature_names, feature_types = _transform_pandas_df(
             data, enable_categorical, feature_names, feature_types
@@ -1424,7 +1700,11 @@ def dispatch_proxy_set_data(
     cat_codes: Optional[list],
 ) -> None:
     """Dispatch for QuantileDMatrix."""
-    if not _is_cudf_ser(data) and not _is_pandas_series(data):
+    if (
+        not _is_cudf_ser(data)
+        and not _is_pandas_series(data)
+        and not _is_polars_series(data)
+    ):
         _check_data_shape(data)
 
     if _is_cudf_df(data):
@@ -1443,7 +1723,7 @@ def dispatch_proxy_set_data(
         proxy._ref_data_from_cuda_interface(data)  # pylint: disable=W0212
         return
     # Host
-    if isinstance(data, PandasTransformed):
+    if isinstance(data, (ArrowTransformed, PandasTransformed)):
         proxy._ref_data_from_pandas(data)  # pylint: disable=W0212
         return
     if _is_np_array_like(data):
diff --git a/python-package/xgboost/testing/__init__.py b/python-package/xgboost/testing/__init__.py
index 53779403917b..7859f6b36032 100644
--- a/python-package/xgboost/testing/__init__.py
+++ b/python-package/xgboost/testing/__init__.py
@@ -141,6 +141,9 @@ def no_arrow() -> PytestSkip:
     return no_mod("pyarrow")
 
 
+def no_polars() -> PytestSkip:
+    return no_mod("polars")
+
+
 def no_modin() -> PytestSkip:
     try:
         import modin.pandas as md
diff --git a/src/data/adapter.h b/src/data/adapter.h
index 741fd69cebb2..9259e54b6a00 100644
--- a/src/data/adapter.h
+++ b/src/data/adapter.h
@@ -10,10 +10,8 @@
 #include <cmath>    // for isfinite
 #include <cstddef>  // for size_t
 #include <cstdint>  // for uint8_t
-#include <iterator>  // for back_inserter
 #include <limits>   // for numeric_limits
 #include <memory>   // for unique_ptr, make_unique
-#include <string>  // for string
 #include <utility>  // for move
 #include <vector>   // for vector
 
@@ -537,26 +535,29 @@ class CSCArrayAdapter : public detail::SingleBatchDataIter<CSCArrayAdapterBatch>
 };
 
 class ColumnarAdapterBatch : public detail::NoMetaInfo {
-  common::Span<ArrayInterface<1, false>> columns_;
+  common::Span<ArrayInterface<1>> columns_;
   class Line {
-    common::Span<ArrayInterface<1, false>> const& columns_;
-    std::size_t ridx_;
+    common::Span<ArrayInterface<1>> const& columns_;
+    std::size_t const ridx_;
 
    public:
-    explicit Line(common::Span<ArrayInterface<1, false>> const& columns, std::size_t ridx)
+    explicit Line(common::Span<ArrayInterface<1>> const& columns, std::size_t ridx)
         : columns_{columns}, ridx_{ridx} {}
 
     [[nodiscard]] std::size_t Size() const { return columns_.empty() ? 0 : columns_.size(); }
 
     [[nodiscard]] COOTuple GetElement(std::size_t idx) const {
-      return {ridx_, idx, columns_[idx](ridx_)};
+      auto const& column = columns_[idx];
+      float value = column.valid.Data() == nullptr || column.valid.Check(ridx_)
+                        ? column(ridx_)
+                        : std::numeric_limits<float>::quiet_NaN();
+      return {ridx_, idx, value};
     }
   };
 
 public:
   ColumnarAdapterBatch() = default;
-  explicit ColumnarAdapterBatch(common::Span<ArrayInterface<1, false>> columns)
-      : columns_{columns} {}
+  explicit ColumnarAdapterBatch(common::Span<ArrayInterface<1>> columns) : columns_{columns} {}
 
   [[nodiscard]] Line GetLine(std::size_t ridx) const { return Line{columns_, ridx}; }
   [[nodiscard]] std::size_t Size() const {
     return columns_.empty() ? 0 : columns_.front().Shape<0>();
@@ -568,7 +569,7 @@ class ColumnarAdapterBatch : public detail::NoMetaInfo {
 };
 
 class ColumnarAdapter : public detail::SingleBatchDataIter<ColumnarAdapterBatch> {
-  std::vector<ArrayInterface<1, false>> columns_;
+  std::vector<ArrayInterface<1>> columns_;
   ColumnarAdapterBatch batch_;
 
 public:
@@ -581,7 +582,7 @@ class ColumnarAdapter : public detail::SingleBatchDataIter<ColumnarAdapterBatch>
     }
     bool consistent = columns_.empty() ||
-        std::all_of(columns_.cbegin(), columns_.cend(), [&](ArrayInterface<1, false> const& array) {
+        std::all_of(columns_.cbegin(), columns_.cend(), [&](ArrayInterface<1> const& array) {
           return array.Shape<0>() == columns_[0].Shape<0>();
         });
     CHECK(consistent) << "Size of columns should be the same.";
diff --git a/tests/python/test_with_arrow.py b/tests/python/test_with_arrow.py
index 145cc0f2b3d9..827aa1709929 100644
--- a/tests/python/test_with_arrow.py
+++ b/tests/python/test_with_arrow.py
@@ -1,5 +1,4 @@
 import os
-import sys
 
 import numpy as np
 import pytest
diff --git a/tests/python/test_with_polars.py b/tests/python/test_with_polars.py
new file mode 100644
index 000000000000..11eac717398f
--- /dev/null
+++ b/tests/python/test_with_polars.py
@@ -0,0 +1,55 @@
+from typing import Type, Union
+
+import numpy as np
+import pytest
+
+import xgboost as xgb
+from xgboost import testing as tm
+
+pytestmark = [pytest.mark.skipif(**tm.no_polars()), tm.timeout(30)]
+
+pl = pytest.importorskip("polars")
+
+
+@pytest.mark.parametrize("DMatrixT", [xgb.DMatrix, xgb.QuantileDMatrix])
+def test_polars_basic(
+    DMatrixT: Union[Type[xgb.DMatrix], Type[xgb.QuantileDMatrix]]
+) -> None:
+    df = pl.DataFrame({"a": [1, 2, 3], "b": [3, 4, 5]})
+    Xy = DMatrixT(df)
+    assert Xy.num_row() == df.shape[0]
+    assert Xy.num_col() == df.shape[1]
+    assert Xy.num_nonmissing() == np.prod(df.shape)
+
+    res = Xy.get_data().toarray()
+    res1 = df.to_numpy()
+
+    if isinstance(Xy, xgb.QuantileDMatrix):
+        np.testing.assert_allclose(res[1:, :], res1[1:, :])
+    else:
+        np.testing.assert_allclose(res, res1)
+
+    # boolean
+    df = pl.DataFrame({"a": [True, False, False], "b": [False, False, True]})
+    Xy = DMatrixT(df)
+    np.testing.assert_allclose(
+        Xy.get_data().data, np.array([1, 0, 0, 0, 0, 1]), atol=1e-5
+    )
+
+
+def test_polars_missing() -> None:
+    df = pl.DataFrame({"a": [1, None, 3], "b": [3, 4, None]})
+    Xy = xgb.DMatrix(df)
+    assert Xy.num_row() == df.shape[0]
+    assert Xy.num_col() == df.shape[1]
+    assert Xy.num_nonmissing() == 4
+
+    np.testing.assert_allclose(Xy.get_data().data, np.array([1, 3, 4, 3]))
+    np.testing.assert_allclose(Xy.get_data().indptr, np.array([0, 2, 3, 4]))
+    np.testing.assert_allclose(Xy.get_data().indices, np.array([0, 1, 1, 0]))
+
+    ser = pl.Series("y", np.arange(0, df.shape[0]))
+    Xy.set_info(label=ser)
+    booster = xgb.train({}, Xy, num_boost_round=1)
+    predt0 = booster.inplace_predict(df)
+    predt1 = booster.predict(Xy)
+    np.testing.assert_allclose(predt0, predt1)
diff --git a/tests/python/test_with_sklearn.py b/tests/python/test_with_sklearn.py
index 3f2b13038c34..42973e843f31 100644
--- a/tests/python/test_with_sklearn.py
+++ b/tests/python/test_with_sklearn.py
@@ -1,7 +1,6 @@
 import json
 import os
 import pickle
-import random
 import re
 import tempfile
 import warnings
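
Usage sketch (not part of the patch): a minimal end-to-end example of the new
polars path, assuming polars and pyarrow are installed. It mirrors
tests/python/test_with_polars.py; the frame and its contents are illustrative.

    import numpy as np
    import polars as pl

    import xgboost as xgb

    # Nulls are forwarded through the arrow validity bitmask; no sentinel
    # value is substituted on the Python side.
    df = pl.DataFrame({"a": [1, None, 3], "b": [3, 4, None]})
    Xy = xgb.DMatrix(df)  # dispatches through _from_polars_df
    assert Xy.num_nonmissing() == 4

    Xy.set_info(label=pl.Series("y", np.arange(df.shape[0])))
    booster = xgb.train({}, Xy, num_boost_round=1)

    # inplace_predict goes through _transform_polars_df and the columnar
    # prediction path (XGBoosterPredictFromColumnar).
    predt0 = booster.inplace_predict(df)
    predt1 = booster.predict(Xy)
    np.testing.assert_allclose(predt0, predt1)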
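
Note on the mask encoding (a sketch, not part of the patch): for a nullable
float64 column of length 3, ArrowTransformed.array_interface() encodes one
entry of the JSON list handed to XGDMatrixCreateFromColumnar roughly as shown
below. The buffer addresses are fabricated for illustration; the "<t1"
bit-field typestr lets ColumnarAdapterBatch honor the arrow validity bitmap
through column.valid.Check().

    # One column entry; "data" holds (pointer, read_only).
    column_inf = {
        "data": (140234001234432, True),      # values buffer (fake address)
        "typestr": "<f8",                     # little-endian float64
        "version": 3,
        "strides": None,
        "shape": (3,),
        "mask": {
            "data": (140234001239040, True),  # validity buffer (fake address)
            "typestr": "<t1",                 # 1-bit bit-field per row
            "version": 3,
            "strides": None,
            "shape": (3,),
            "mask": None,
        },
    }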