Skip to content

Commit

Permalink
Revert 'Change DP conditions'
Browse files Browse the repository at this point in the history
  • Loading branch information
Retribution98 committed Aug 13, 2024
1 parent 462b7c1 commit e183124
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 43 deletions.
45 changes: 3 additions & 42 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import (
CpuCount,
Engine,
IsRayCluster,
MinColumnPartitionSize,
Expand Down Expand Up @@ -212,22 +211,6 @@ def num_parts(self) -> int:
"""
return np.prod(self._partitions.shape)

@property
def size(self) -> Optional[int]:
"""
Get an int representing the number of elements in this frame, if known.
Returns
-------
int or None
"""
if self.has_index_cache and self.has_columns_cache:
return len(self.index) * len(self.columns)
elif self._row_lengths_cache and self._column_widths_cache:
return sum(self._row_lengths_cache) * sum(self._column_widths_cache)
else:
return None

@property
def row_lengths(self):
"""
Expand Down Expand Up @@ -3282,31 +3265,9 @@ def broadcast_apply(
axis
), self.copy_axis_cache(axis)

# check the conditions for use of dynamic partitioning
use_dynamic_partitioning = False
if self.num_parts <= 1.5 * CpuCount.get():
use_dynamic_partitioning = True

# When the frame is large, dynamic partitioning
# performs worse than the based approach
frame_size = self.size
if frame_size and (frame_size >= 4 * 10**9 or len(self) >= 10**7):
use_dynamic_partitioning = False

if use_dynamic_partitioning:
new_frame = self._partition_mgr_cls.broadcast_axis_partitions(
axis=axis ^ 1,
left=left_parts,
right=right_parts,
apply_func=func,
broadcast_all=False,
keep_partitioning=True,
)
else:
new_frame = self._partition_mgr_cls.broadcast_apply(
axis, func, left_parts, right_parts
)

new_frame = self._partition_mgr_cls.broadcast_apply(
axis, func, left_parts, right_parts
)
if isinstance(dtypes, str) and dtypes == "copy":
dtypes = self.copy_dtypes_cache()

Expand Down
53 changes: 52 additions & 1 deletion modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def get_partitions(index):

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(cls, axis, apply_func, left, right):
def base_broadcast_apply(cls, axis, apply_func, left, right):
"""
Broadcast the `right` partitions to `left` and apply `apply_func` function.
Expand Down Expand Up @@ -652,6 +652,57 @@ def base_map_partitions(
]
)

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(
cls,
axis,
apply_func,
left,
right,
):
"""
Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance.
Parameters
----------
axis : {0, 1}
Axis to apply and broadcast over.
apply_func : callable
Function to apply.
left : np.ndarray
NumPy array of left partitions.
right : np.ndarray
NumPy array of right partitions.
Returns
-------
np.ndarray
NumPy array of result partition objects.
"""
# The `broadcast_apply` runtime condition differs from
# the same condition in `map_partitions` because the columnar
# approach for `broadcast_apply` results in a slowdown.
if np.prod(left.shape) <= 1.5 * CpuCount.get():
# block-wise broadcast
new_partitions = cls.base_broadcast_apply(
axis,
apply_func,
left,
right,
)
else:
# axis-wise broadcast
new_partitions = cls.broadcast_axis_partitions(
axis=axis ^ 1,
left=left,
right=right,
apply_func=apply_func,
broadcast_all=False,
keep_partitioning=True,
)
return new_partitions

@classmethod
@wait_computations_if_benchmark_mode
def map_partitions(
Expand Down

0 comments on commit e183124

Please sign in to comment.