Skip to content

Commit

Permalink
Change DP conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
Retribution98 committed Aug 1, 2024
1 parent 13bcd19 commit 462b7c1
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 56 deletions.
45 changes: 42 additions & 3 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from pandas.core.indexes.api import Index, RangeIndex

from modin.config import (
CpuCount,
Engine,
IsRayCluster,
MinColumnPartitionSize,
Expand Down Expand Up @@ -211,6 +212,22 @@ def num_parts(self) -> int:
"""
return np.prod(self._partitions.shape)

@property
def size(self) -> Optional[int]:
"""
Get an int representing the number of elements in this frame, if known.
Returns
-------
int or None
"""
if self.has_index_cache and self.has_columns_cache:
return len(self.index) * len(self.columns)
elif self._row_lengths_cache and self._column_widths_cache:
return sum(self._row_lengths_cache) * sum(self._column_widths_cache)

Check warning on line 227 in modin/core/dataframe/pandas/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/dataframe/dataframe.py#L226-L227

Added lines #L226 - L227 were not covered by tests
else:
return None

Check warning on line 229 in modin/core/dataframe/pandas/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/dataframe/dataframe.py#L229

Added line #L229 was not covered by tests

@property
def row_lengths(self):
"""
Expand Down Expand Up @@ -3265,9 +3282,31 @@ def broadcast_apply(
axis
), self.copy_axis_cache(axis)

new_frame = self._partition_mgr_cls.broadcast_apply(
axis, func, left_parts, right_parts
)
# check the conditions for use of dynamic partitioning
use_dynamic_partitioning = False
if self.num_parts <= 1.5 * CpuCount.get():
use_dynamic_partitioning = True

# When the frame is large, dynamic partitioning
# performs worse than the based approach
frame_size = self.size
if frame_size and (frame_size >= 4 * 10**9 or len(self) >= 10**7):
use_dynamic_partitioning = False

Check warning on line 3294 in modin/core/dataframe/pandas/dataframe/dataframe.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/dataframe/dataframe.py#L3294

Added line #L3294 was not covered by tests

if use_dynamic_partitioning:
new_frame = self._partition_mgr_cls.broadcast_axis_partitions(
axis=axis ^ 1,
left=left_parts,
right=right_parts,
apply_func=func,
broadcast_all=False,
keep_partitioning=True,
)
else:
new_frame = self._partition_mgr_cls.broadcast_apply(
axis, func, left_parts, right_parts
)

if isinstance(dtypes, str) and dtypes == "copy":
dtypes = self.copy_dtypes_cache()

Expand Down
53 changes: 1 addition & 52 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def get_partitions(index):

@classmethod
@wait_computations_if_benchmark_mode
def base_broadcast_apply(cls, axis, apply_func, left, right):
def broadcast_apply(cls, axis, apply_func, left, right):
"""
Broadcast the `right` partitions to `left` and apply `apply_func` function.
Expand Down Expand Up @@ -652,57 +652,6 @@ def base_map_partitions(
]
)

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(
cls,
axis,
apply_func,
left,
right,
):
"""
Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance.
Parameters
----------
axis : {0, 1}
Axis to apply and broadcast over.
apply_func : callable
Function to apply.
left : np.ndarray
NumPy array of left partitions.
right : np.ndarray
NumPy array of right partitions.
Returns
-------
np.ndarray
NumPy array of result partition objects.
"""
# The `broadcast_apply` runtime condition differs from
# the same condition in `map_partitions` because the columnar
# approach for `broadcast_apply` results in a slowdown.
if np.prod(left.shape) <= 1.5 * CpuCount.get():
# block-wise broadcast
new_partitions = cls.base_broadcast_apply(
axis,
apply_func,
left,
right,
)
else:
# axis-wise broadcast
new_partitions = cls.broadcast_axis_partitions(
axis=axis ^ 1,
left=left,
right=right,
apply_func=apply_func,
broadcast_all=False,
keep_partitioning=True,
)
return new_partitions

@classmethod
@wait_computations_if_benchmark_mode
def map_partitions(
Expand Down
7 changes: 6 additions & 1 deletion modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from pandas.errors import DataError

from modin.config import RangePartitioning
from modin.config.envvars import CpuCount
from modin.core.dataframe.algebra import (
Binary,
Fold,
Expand Down Expand Up @@ -3107,8 +3108,12 @@ def dropna(self, **kwargs):
lib.no_default,
None,
)
# The map reduce approach works well for frames with few columnar partitions
processable_amount_of_partitions = (
self._modin_frame.num_parts < CpuCount.get() * 32
)

if is_column_wise and no_thresh_passed:
if is_column_wise and no_thresh_passed and processable_amount_of_partitions:
how = kwargs.get("how", "any")
subset = kwargs.get("subset")
how = "any" if how in (lib.no_default, None) else how
Expand Down

0 comments on commit 462b7c1

Please sign in to comment.