From af5ed0628a4a609f70becd29fd081a290ba981a0 Mon Sep 17 00:00:00 2001
From: Anatoly Myachev
Date: Fri, 7 Jun 2024 23:31:23 +0200
Subject: [PATCH] PERF-#7299: Avoid using `synchronize_labels` for `combine` function (#7300)

Signed-off-by: Anatoly Myachev
---
 .../dataframe/pandas/dataframe/dataframe.py  | 11 +++++++--
 .../pandas/partitioning/partition_manager.py | 16 ++++++++++---
 modin/core/dataframe/pandas/utils.py         | 23 ++++++++++++++++---
 3 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py
index 3b56c5762f6..9f781eac673 100644
--- a/modin/core/dataframe/pandas/dataframe/dataframe.py
+++ b/modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -2891,7 +2891,15 @@ def combine(self) -> PandasDataframe:
         PandasDataframe
             A single partition PandasDataframe.
         """
-        partitions = self._partition_mgr_cls.combine(self._partitions)
+        new_index = None
+        new_columns = None
+        if self._deferred_index:
+            new_index = self.index
+        if self._deferred_column:
+            new_columns = self.columns
+        partitions = self._partition_mgr_cls.combine(
+            self._partitions, new_index, new_columns
+        )
         result = self.__constructor__(
             partitions,
             index=self.copy_index_cache(),
@@ -2909,7 +2917,6 @@ def combine(self) -> PandasDataframe:
             dtypes=self.copy_dtypes_cache(),
             pandas_backend=self._pandas_backend,
         )
-        result.synchronize_labels()
         return result
 
     @lazy_metadata_decorator(apply_axis="both")
diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py
index 79b3d9ccf75..dcd24079f95 100644
--- a/modin/core/dataframe/pandas/partitioning/partition_manager.py
+++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -1261,7 +1261,7 @@ def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs):
         return [obj.apply(preprocessed_func, **kwargs) for obj in partitions]
 
     @classmethod
-    def combine(cls, partitions):
+    def combine(cls, partitions, new_index=None, new_columns=None):
         """
         Convert a NumPy 2D array of partitions to a NumPy 2D array of a single partition.
 
@@ -1269,19 +1269,29 @@ def combine(cls, partitions):
         ----------
         partitions : np.ndarray
             The partitions which have to be converted to a single partition.
+        new_index : pandas.Index, optional
+            Index for propagation into internal partitions.
+            Optimization allowing to do this in one remote kernel.
+        new_columns : pandas.Index, optional
+            Columns for propagation into internal partitions.
+            Optimization allowing to do this in one remote kernel.
 
         Returns
         -------
         np.ndarray
             A NumPy 2D array of a single partition.
         """
-        if partitions.size <= 1:
+        if partitions.size <= 1 and new_index is None and new_columns is None:
             return partitions
 
         def to_pandas_remote(df, partition_shape, *dfs):
             """Copy of ``cls.to_pandas()`` method adapted for a remote function."""
             return create_pandas_df_from_partitions(
-                (df,) + dfs, partition_shape, called_from_remote=True
+                (df,) + dfs,
+                partition_shape,
+                called_from_remote=True,
+                new_index=new_index,
+                new_columns=new_columns,
             )
 
         preprocessed_func = cls.preprocess_func(to_pandas_remote)
diff --git a/modin/core/dataframe/pandas/utils.py b/modin/core/dataframe/pandas/utils.py
index 358becd8ba2..500bfb14f5d 100644
--- a/modin/core/dataframe/pandas/utils.py
+++ b/modin/core/dataframe/pandas/utils.py
@@ -72,7 +72,11 @@ def concatenate(dfs, copy=True):
 
 
 def create_pandas_df_from_partitions(
-    partition_data, partition_shape, called_from_remote=False
+    partition_data,
+    partition_shape,
+    called_from_remote=False,
+    new_index=None,
+    new_columns=None,
 ):
     """
     Convert partition data of multiple dataframes to a single dataframe.
@@ -85,6 +89,12 @@ def create_pandas_df_from_partitions(
         Shape of the partitions NumPy array.
     called_from_remote : bool, default: False
         Flag used to check if explicit copy should be done in concat.
+    new_index : pandas.Index, optional
+        Index for propagation into internal partitions.
+        Optimization allowing to do this in one remote kernel.
+    new_columns : pandas.Index, optional
+        Columns for propagation into internal partitions.
+        Optimization allowing to do this in one remote kernel.
 
     Returns
     -------
@@ -127,6 +137,13 @@ def is_part_empty(part):
     del partition_data
 
     if len(df_rows) == 0:
-        return pandas.DataFrame()
+        res = pandas.DataFrame()
     else:
-        return concatenate(df_rows, copy=not called_from_remote)
+        res = concatenate(df_rows, copy=not called_from_remote)
+
+    if new_index is not None:
+        res.index = new_index
+    if new_columns is not None:
+        res.columns = new_columns
+
+    return res