From 3fcff00d670c545d11477c4c21c868846e5753e2 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Wed, 10 Jul 2024 17:48:40 +0000 Subject: [PATCH] fix ci --- modin/core/dataframe/algebra/groupby.py | 8 ++++-- .../dataframe/pandas/dataframe/dataframe.py | 4 +-- .../pandas/partitioning/partition_manager.py | 28 ++++++------------- .../storage_formats/pandas/query_compiler.py | 4 +-- 4 files changed, 17 insertions(+), 27 deletions(-) diff --git a/modin/core/dataframe/algebra/groupby.py b/modin/core/dataframe/algebra/groupby.py index cc9196a422a..fec0fe3c6ac 100644 --- a/modin/core/dataframe/algebra/groupby.py +++ b/modin/core/dataframe/algebra/groupby.py @@ -655,9 +655,11 @@ def aggregate_on_dict(grp_obj, *args, **kwargs): ) native_res_part = [] if native_agg_res is None else [native_agg_res] - result = pandas.concat( - [*native_res_part, *custom_results], axis=1, copy=False - ) + parts = [*native_res_part, *custom_results] + if parts: + result = pandas.concat(parts, axis=1, copy=False) + else: + result = pandas.DataFrame(columns=result_columns) # The order is naturally preserved if there's no custom aggregations if preserve_aggregation_order and len(custom_aggs): diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 5456f28f127..5d46d4b3190 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3265,9 +3265,7 @@ def broadcast_apply( axis ), self.copy_axis_cache(axis) - new_frame = self._partition_mgr_cls.broadcast_apply( - axis, func, left_parts, right_parts - ) + new_frame = self._partition_mgr_cls.apply(axis, func, left_parts, right_parts) if isinstance(dtypes, str) and dtypes == "copy": dtypes = self.copy_dtypes_cache() diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 97764e3316d..ebed831933a 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -338,9 +338,7 @@ def groupby_reduce( f"the number of partitions along {axis=} is not equal: " + f"{partitions.shape[axis]} != {by.shape[axis]}" ) - mapped_partitions = cls.broadcast_apply( - axis, map_func, left=partitions, right=by - ) + mapped_partitions = cls.apply(axis, map_func, left=partitions, right=by) else: mapped_partitions = cls.map_partitions(partitions, map_func) @@ -439,7 +437,7 @@ def get_partitions(index): @classmethod @wait_computations_if_benchmark_mode - def base_broadcast_apply(cls, axis, apply_func, left, right): + def broadcast_apply(cls, axis, apply_func, left, right): """ Broadcast the `right` partitions to `left` and apply `apply_func` function. @@ -494,13 +492,12 @@ def map_func(df, *others): @classmethod @wait_computations_if_benchmark_mode - def broadcast_axis( + def apply_axis_partitions( cls, axis, apply_func, left, right, - keep_partitioning=False, ): """ Broadcast the `right` partitions to `left` and apply `apply_func` along full `axis`. @@ -530,21 +527,15 @@ def broadcast_axis( This method differs from `broadcast_axis_partitions` in that it does not send all right partitions for each remote task based on the left partitions. """ - num_splits = len(left) if axis == 0 else len(left.T) preprocessed_map_func = cls.preprocess_func(apply_func) left_partitions = cls.axis_partition(left, axis) right_partitions = None if right is None else cls.axis_partition(right, axis) - kw = { - "num_splits": num_splits, - "maintain_partitioning": keep_partitioning, - } result_blocks = np.array( [ left_partitions[i].apply( preprocessed_map_func, other_axis_partition=right_partitions[i], - **kw, ) for i in np.arange(len(left_partitions)) ] @@ -711,7 +702,7 @@ def base_map_partitions( @classmethod @wait_computations_if_benchmark_mode - def broadcast_apply( + def apply( cls, axis, apply_func, @@ -738,10 +729,10 @@ def broadcast_apply( np.ndarray NumPy array of result partition objects. """ - # The condition for the execution of `base_broadcast_apply` is different from + # The condition for the execution of `broadcast_apply` is different from # the same condition in the `map_partitions`, since the columnar partitioning approach - # cannot be implemented for the `broadcast_apply`. This is due to the fact that different - # partitions of the left and right dataframes are possible for the `broadcast_apply`, + # cannot be implemented for the `apply`. This is due to the fact that different + # partitions of the left and right dataframes are possible for the `apply`, # as a result of which it is necessary to merge partitions on both axes at once, # which leads to large slowdowns. if ( @@ -749,7 +740,7 @@ def broadcast_apply( or left.shape[axis] < CpuCount.get() // 5 ): # block-wise broadcast - new_partitions = cls.base_broadcast_apply( + new_partitions = cls.broadcast_apply( axis, apply_func, left, @@ -757,12 +748,11 @@ def broadcast_apply( ) else: # axis-wise broadcast - new_partitions = cls.broadcast_axis( + new_partitions = cls.apply_axis_partitions( axis=axis ^ 1, left=left, right=right, apply_func=apply_func, - keep_partitioning=True, ) return new_partitions diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 9d4467c2085..061b15fe298 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2628,7 +2628,7 @@ def fillna(df): def fillna_builder(df, right): return df.fillna(value=right, **kwargs) - new_modin_frame = self._modin_frame.broadcast_apply( + new_modin_frame = self._modin_frame.apply( 0, fillna_builder, value._modin_frame ) return self.__constructor__(new_modin_frame) @@ -3161,7 +3161,7 @@ def reduce(df: pandas.DataFrame, mask: pandas.DataFrame): return df[to_take] - result = self._modin_frame.broadcast_apply( + result = self._modin_frame.apply( # 'masks' have identical partitioning as we specified 'keep_partitioning=True' before, # this means that we can safely skip the 'co-partitioning' stage axis=1,