From 698c9b69e9daa5f7c952d7e4cc5a9fb98c9de0a4 Mon Sep 17 00:00:00 2001 From: Wenlei Xie Date: Tue, 7 Dec 2021 19:28:38 -0800 Subject: [PATCH] Back out "Delegate `IColumn.fill_null/drop_null` to Arrow" Summary: Original commit changeset: 14ad956d406f Original Phabricator Diff: D32770009 The commit fails GitHub CI: https://github.com/facebookresearch/torcharrow/runs/4448410206?check_suite_focus=true In PyArrow 6.0, it is legitimate for an array to have a non-null NullBuffer while null_count is zero: ``` >>> import pyarrow as pa >>> pa.__version__ '6.0.0' >>> a = pa.array([1, 2, None, 3]) >>> a = a.fill_null(12) >>> a.buffers() [<pyarrow.lib.Buffer object at 0x...>, <pyarrow.lib.Buffer object at 0x...>] ``` So this triggers https://github.com/facebookincubator/velox/blob/674562b94780b8a895fd291c310778e4de73e7e9/velox/vector/arrow/Bridge.cpp#L499-L502 In contrast, PyArrow 2.0 sets the NullBuffer to null: ``` >>> pa.__version__ '2.0.0' >>> a = pa.array([1, 2, None, 3]) >>> a = a.fill_null(12) >>> a.buffers() [None, <pyarrow.lib.Buffer object at 0x...>] ``` Differential Revision: D32940474 fbshipit-source-id: 2558dbf181b6adf75a0e3f5a9cfbadac1b3d03a4 --- torcharrow/icolumn.py | 17 +++++++++++------ torcharrow/test/test_dataframe.py | 8 ++++---- torcharrow/test/test_numerical_column.py | 8 ++++---- torcharrow/velox_rt/numerical_column_cpu.py | 21 +++++++++++++++++++++ 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/torcharrow/icolumn.py b/torcharrow/icolumn.py index b2befeff9..7ef91d1a7 100644 --- a/torcharrow/icolumn.py +++ b/torcharrow/icolumn.py @@ -19,7 +19,6 @@ from .dispatcher import Device from .expression import expression -from .interop import from_arrow from .scope import Scope from .trace import trace, traceproperty @@ -1130,12 +1129,18 @@ def fill_null(self, fill_value: ty.Union[dt.ScalarTypes, ty.Dict]): dtype: int64, length: 4, null_count: 0 """ - if isinstance(fill_value, IColumn._scalar_types): - import pyarrow.compute as pc + self._prototype_support_warning("fill_null") - arr = pc.fill_null(self.to_arrow(), fill_value) - arr_dtype = 
self.dtype.with_null(nullable=False) - return from_arrow(arr, dtype=arr_dtype, device=self.device) + if not isinstance(fill_value, IColumn._scalar_types): + raise TypeError(f"fill_null with {type(fill_value)} is not supported") + if isinstance(fill_value, IColumn._scalar_types): + res = Scope._EmptyColumn(self.dtype.constructor(nullable=False)) + for m, i in self._items(): + if not m: + res._append_value(i) + else: + res._append_value(fill_value) + return res._finalize() else: raise TypeError(f"fill_null with {type(fill_value)} is not supported") diff --git a/torcharrow/test/test_dataframe.py b/torcharrow/test/test_dataframe.py index 05986e493..72e63f435 100644 --- a/torcharrow/test/test_dataframe.py +++ b/torcharrow/test/test_dataframe.py @@ -537,13 +537,13 @@ def base_test_python_comparison_ops(self): assert c == c.append([None]) def base_test_na_handling(self): - c = ta.DataFrame({"a": [None, 2, 17]}, device=self.device) + c = ta.DataFrame({"a": [None, 2, 17.0]}, device=self.device) - self.assertEqual(list(c.fill_null(99)), [(i,) for i in [99, 2, 17]]) - self.assertEqual(list(c.drop_null()), [(i,) for i in [2, 17]]) + self.assertEqual(list(c.fill_null(99.0)), [(i,) for i in [99.0, 2, 17.0]]) + self.assertEqual(list(c.drop_null()), [(i,) for i in [2, 17.0]]) c = c.append([(2,)]) - self.assertEqual(list(c.drop_duplicates()), [(i,) for i in [None, 2, 17]]) + self.assertEqual(list(c.drop_duplicates()), [(i,) for i in [None, 2, 17.0]]) # duplicates with subset d = ta.DataFrame( diff --git a/torcharrow/test/test_numerical_column.py b/torcharrow/test/test_numerical_column.py index 78c6776f7..5d7b768bb 100644 --- a/torcharrow/test/test_numerical_column.py +++ b/torcharrow/test/test_numerical_column.py @@ -381,13 +381,13 @@ def base_test_operators(self): # TODO Test type promotion rules def base_test_na_handling(self): - c = ta.Column([None, 2, 17], device=self.device) + c = ta.Column([None, 2, 17.0], device=self.device) - self.assertEqual(list(c.fill_null(99)), [99, 2, 
17]) - self.assertEqual(list(c.drop_null()), [2, 17]) + self.assertEqual(list(c.fill_null(99.0)), [99.0, 2, 17.0]) + self.assertEqual(list(c.drop_null()), [2.0, 17.0]) c = c.append([2]) - self.assertEqual(set(c.drop_duplicates()), {None, 2, 17}) + self.assertEqual(set(c.drop_duplicates()), {None, 2, 17.0}) def base_test_agg_handling(self): import functools diff --git a/torcharrow/velox_rt/numerical_column_cpu.py b/torcharrow/velox_rt/numerical_column_cpu.py index ebb303c66..d51c5cc12 100644 --- a/torcharrow/velox_rt/numerical_column_cpu.py +++ b/torcharrow/velox_rt/numerical_column_cpu.py @@ -631,6 +631,27 @@ def round(self, decimals=0): # data cleaning ----------------------------------------------------------- + @trace + @expression + def fill_null(self, fill_value: Union[dt.ScalarTypes, Dict]): + self._prototype_support_warning("fill_null") + + if not isinstance(fill_value, IColumn._scalar_types): + raise TypeError(f"fill_null with {type(fill_value)} is not supported") + if not self.is_nullable: + return self + else: + col = velox.Column(get_velox_type(self.dtype)) + for i in range(len(self)): + if self._getmask(i): + if isinstance(fill_value, Dict): + raise NotImplementedError() + else: + col.append(fill_value) + else: + col.append(self._getdata(i)) + return ColumnFromVelox._from_velox(self.device, self.dtype, col, True) + @trace @expression def drop_null(self, how="any"):