diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 5456f28f127..a4d9c1a515e 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -33,6 +33,7 @@ from pandas.core.indexes.api import Index, RangeIndex from modin.config import ( + CpuCount, Engine, IsRayCluster, MinColumnPartitionSize, @@ -211,6 +212,22 @@ def num_parts(self) -> int: """ return np.prod(self._partitions.shape) + @property + def size(self) -> Optional[int]: + """ + Get an int representing the number of elements in this frame, if known. + + Returns + ------- + int or None + """ + if self.has_index_cache and self.has_columns_cache: + return len(self.index) * len(self.columns) + elif self._row_lengths_cache and self._column_widths_cache: + return sum(self._row_lengths_cache) * sum(self._column_widths_cache) + else: + return None + @property def row_lengths(self): """ @@ -3265,9 +3282,31 @@ def broadcast_apply( axis ), self.copy_axis_cache(axis) - new_frame = self._partition_mgr_cls.broadcast_apply( - axis, func, left_parts, right_parts - ) + # check the conditions for use of dynamic partitioning + use_dynamic_partitioning = False + if self.num_parts <= 1.5 * CpuCount.get(): + use_dynamic_partitioning = True + + # When the frame is large, dynamic partitioning + # performs worse than the base approach + frame_size = self.size + if frame_size and (frame_size >= 4 * 10**9 or len(self) >= 10**7): + use_dynamic_partitioning = False + + if use_dynamic_partitioning: + new_frame = self._partition_mgr_cls.broadcast_axis_partitions( + axis=axis ^ 1, + left=left_parts, + right=right_parts, + apply_func=func, + broadcast_all=False, + keep_partitioning=True, + ) + else: + new_frame = self._partition_mgr_cls.broadcast_apply( + axis, func, left_parts, right_parts + ) + if isinstance(dtypes, str) and dtypes == "copy": dtypes = self.copy_dtypes_cache() diff --git 
a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index bd3a1d14760..22b9f4c0060 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -439,7 +439,7 @@ def get_partitions(index): @classmethod @wait_computations_if_benchmark_mode - def base_broadcast_apply(cls, axis, apply_func, left, right): + def broadcast_apply(cls, axis, apply_func, left, right): """ Broadcast the `right` partitions to `left` and apply `apply_func` function. @@ -652,57 +652,6 @@ def base_map_partitions( ] ) - @classmethod - @wait_computations_if_benchmark_mode - def broadcast_apply( - cls, - axis, - apply_func, - left, - right, - ): - """ - Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance. - - Parameters - ---------- - axis : {0, 1} - Axis to apply and broadcast over. - apply_func : callable - Function to apply. - left : np.ndarray - NumPy array of left partitions. - right : np.ndarray - NumPy array of right partitions. - - Returns - ------- - np.ndarray - NumPy array of result partition objects. - """ - # The `broadcast_apply` runtime condition differs from - # the same condition in `map_partitions` because the columnar - # approach for `broadcast_apply` results in a slowdown. 
- if np.prod(left.shape) <= 1.5 * CpuCount.get(): - # block-wise broadcast - new_partitions = cls.base_broadcast_apply( - axis, - apply_func, - left, - right, - ) - else: - # axis-wise broadcast - new_partitions = cls.broadcast_axis_partitions( - axis=axis ^ 1, - left=left, - right=right, - apply_func=apply_func, - broadcast_all=False, - keep_partitioning=True, - ) - return new_partitions - @classmethod @wait_computations_if_benchmark_mode def map_partitions( diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 7370673dd3e..d3219de05d6 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -46,6 +46,7 @@ from pandas.errors import DataError from modin.config import RangePartitioning +from modin.config.envvars import CpuCount from modin.core.dataframe.algebra import ( Binary, Fold, @@ -3107,8 +3108,12 @@ def dropna(self, **kwargs): lib.no_default, None, ) + # The map-reduce approach works well for frames with few columnar partitions + processable_amount_of_partitions = ( + self._modin_frame.num_parts < CpuCount.get() * 32 + ) - if is_column_wise and no_thresh_passed: + if is_column_wise and no_thresh_passed and processable_amount_of_partitions: how = kwargs.get("how", "any") subset = kwargs.get("subset") how = "any" if how in (lib.no_default, None) else how