Skip to content

Commit

Permalink
fix handling of external categorical columns
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev committed Jan 31, 2024
1 parent 3926adb commit 4d3e418
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4023,7 +4023,7 @@ def apply_func(df): # pragma: no cover
else:
original_dtypes = None

def compute_aligned_columns(*dfs, initial_columns=None):
def compute_aligned_columns(*dfs, initial_columns=None, by=None):
"""Take row partitions, filter empty ones, and return joined columns for them."""
if align_result_columns:
valid_dfs = [
Expand All @@ -4046,18 +4046,6 @@ def compute_aligned_columns(*dfs, initial_columns=None):

masks = None
if add_missing_cats:
external_by_cols = [
None if col.startswith(MODIN_UNNAMED_SERIES_LABEL) else col
for obj in external_by
for col in obj.columns
]
by = []
# restoring original order of 'by' columns
for idx in by_positions:
if idx >= 0:
by.append(external_by_cols[idx])
else:
by.append(internal_by[-idx - 1])
masks, combined_cols = add_missing_categories_to_groupby(
dfs,
by,
Expand All @@ -4074,6 +4062,19 @@ def compute_aligned_columns(*dfs, initial_columns=None):
else (None, masks)
)

external_by_cols = [
None if col.startswith(MODIN_UNNAMED_SERIES_LABEL) else col
for obj in external_by
for col in obj.columns
]
by = []
# restoring original order of 'by' columns
for idx in by_positions:
if idx >= 0:
by.append(external_by_cols[idx])
else:
by.append(internal_by[-idx - 1])

# Passing all partitions to the 'compute_aligned_columns' kernel to get
# aligned columns
parts = result._partitions.flatten()
Expand All @@ -4082,7 +4083,8 @@ def compute_aligned_columns(*dfs, initial_columns=None):
# otherwise, the execution fails. Look into the issue later.
self._partition_mgr_cls.preprocess_func(compute_aligned_columns),
*[part._data for part in parts[1:]],
initial_columns=self.columns,
initial_columns=pandas.Index(external_by_cols).append(self.columns),
by=by,
)

def apply_aligned(df, args, partition_idx):
Expand Down

0 comments on commit 4d3e418

Please sign in to comment.