
Commit

fix ci
Retribution98 committed Jul 10, 2024
1 parent c6d3659 commit 3fcff00
Showing 4 changed files with 17 additions and 27 deletions.
8 changes: 5 additions & 3 deletions modin/core/dataframe/algebra/groupby.py
@@ -655,9 +655,11 @@ def aggregate_on_dict(grp_obj, *args, **kwargs):
             )
 
             native_res_part = [] if native_agg_res is None else [native_agg_res]
-            result = pandas.concat(
-                [*native_res_part, *custom_results], axis=1, copy=False
-            )
+            parts = [*native_res_part, *custom_results]
+            if parts:
+                result = pandas.concat(parts, axis=1, copy=False)
+            else:
+                result = pandas.DataFrame(columns=result_columns)
 
             # The order is naturally preserved if there's no custom aggregations
             if preserve_aggregation_order and len(custom_aggs):
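For context on why this guard matters, here is a minimal standalone sketch (plain pandas, hypothetical names, not Modin's code): pandas.concat raises on an empty list, so the new branch falls back to an empty frame with the expected columns.

import pandas

parts = []  # e.g. neither a native aggregation result nor any custom results
result_columns = ["min_a", "max_b"]  # hypothetical expected output columns

if parts:
    result = pandas.concat(parts, axis=1, copy=False)
else:
    # pandas.concat(parts, ...) would raise "ValueError: No objects to concatenate"
    result = pandas.DataFrame(columns=result_columns)

print(result.columns.tolist())  # ['min_a', 'max_b']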
4 changes: 1 addition & 3 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -3265,9 +3265,7 @@ def broadcast_apply(
                 axis
             ), self.copy_axis_cache(axis)
 
-        new_frame = self._partition_mgr_cls.broadcast_apply(
-            axis, func, left_parts, right_parts
-        )
+        new_frame = self._partition_mgr_cls.apply(axis, func, left_parts, right_parts)
         if isinstance(dtypes, str) and dtypes == "copy":
             dtypes = self.copy_dtypes_cache()
 
28 changes: 9 additions & 19 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -338,9 +338,7 @@ def groupby_reduce(
                 f"the number of partitions along {axis=} is not equal: "
                 + f"{partitions.shape[axis]} != {by.shape[axis]}"
             )
-            mapped_partitions = cls.broadcast_apply(
-                axis, map_func, left=partitions, right=by
-            )
+            mapped_partitions = cls.apply(axis, map_func, left=partitions, right=by)
         else:
             mapped_partitions = cls.map_partitions(partitions, map_func)

@@ -439,7 +437,7 @@ def get_partitions(index):
 
     @classmethod
     @wait_computations_if_benchmark_mode
-    def base_broadcast_apply(cls, axis, apply_func, left, right):
+    def broadcast_apply(cls, axis, apply_func, left, right):
         """
         Broadcast the `right` partitions to `left` and apply `apply_func` function.
@@ -494,13 +492,12 @@ def map_func(df, *others):
 
     @classmethod
     @wait_computations_if_benchmark_mode
-    def broadcast_axis(
+    def apply_axis_partitions(
         cls,
         axis,
         apply_func,
         left,
         right,
-        keep_partitioning=False,
     ):
         """
         Broadcast the `right` partitions to `left` and apply `apply_func` along full `axis`.
@@ -530,21 +527,15 @@ def broadcast_axis(
         This method differs from `broadcast_axis_partitions` in that it does not send
         all right partitions for each remote task based on the left partitions.
         """
-        num_splits = len(left) if axis == 0 else len(left.T)
         preprocessed_map_func = cls.preprocess_func(apply_func)
         left_partitions = cls.axis_partition(left, axis)
         right_partitions = None if right is None else cls.axis_partition(right, axis)
-        kw = {
-            "num_splits": num_splits,
-            "maintain_partitioning": keep_partitioning,
-        }
 
         result_blocks = np.array(
             [
                 left_partitions[i].apply(
                     preprocessed_map_func,
                     other_axis_partition=right_partitions[i],
-                    **kw,
                 )
                 for i in np.arange(len(left_partitions))
             ]
@@ -711,7 +702,7 @@ def base_map_partitions(
 
     @classmethod
     @wait_computations_if_benchmark_mode
-    def broadcast_apply(
+    def apply(
         cls,
         axis,
         apply_func,
@@ -738,31 +729,30 @@ def broadcast_apply(
         np.ndarray
             NumPy array of result partition objects.
         """
-        # The condition for the execution of `base_broadcast_apply` is different from
+        # The condition for the execution of `broadcast_apply` is different from
         # the same condition in the `map_partitions`, since the columnar partitioning approach
-        # cannot be implemented for the `broadcast_apply`. This is due to the fact that different
-        # partitions of the left and right dataframes are possible for the `broadcast_apply`,
+        # cannot be implemented for the `apply`. This is due to the fact that different
+        # partitions of the left and right dataframes are possible for the `apply`,
         # as a result of which it is necessary to merge partitions on both axes at once,
         # which leads to large slowdowns.
         if (
             np.prod(left.shape) <= 1.5 * CpuCount.get()
             or left.shape[axis] < CpuCount.get() // 5
         ):
             # block-wise broadcast
-            new_partitions = cls.base_broadcast_apply(
+            new_partitions = cls.broadcast_apply(
                 axis,
                 apply_func,
                 left,
                 right,
             )
         else:
             # axis-wise broadcast
-            new_partitions = cls.broadcast_axis(
+            new_partitions = cls.apply_axis_partitions(
                 axis=axis ^ 1,
                 left=left,
                 right=right,
                 apply_func=apply_func,
-                keep_partitioning=True,
             )
         return new_partitions

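As a side note, here is a hedged standalone sketch (not Modin's code; `cpu_count` stands in for `modin.config.CpuCount.get()`) of the dispatch heuristic that the renamed `apply` method keeps: small partition grids are broadcast block-wise, larger ones go through full-axis partitions.

import os

import numpy as np


def cpu_count():
    # Stand-in for modin.config.CpuCount.get(); an assumption for this sketch.
    return os.cpu_count() or 1


def choose_broadcast_strategy(left_shape, axis):
    # Same condition as in the diff above: prefer block-wise broadcast for
    # small partition grids, axis-wise broadcast otherwise.
    if (
        np.prod(left_shape) <= 1.5 * cpu_count()
        or left_shape[axis] < cpu_count() // 5
    ):
        return "block-wise"  # cls.broadcast_apply under the new naming
    return "axis-wise"  # cls.apply_axis_partitions under the new naming


print(choose_broadcast_strategy((2, 2), axis=0))    # few partitions: block-wise
print(choose_broadcast_strategy((64, 64), axis=0))  # many partitions: usually axis-wise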
4 changes: 2 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -2628,7 +2628,7 @@ def fillna(df):
             def fillna_builder(df, right):
                 return df.fillna(value=right, **kwargs)
 
-            new_modin_frame = self._modin_frame.broadcast_apply(
+            new_modin_frame = self._modin_frame.apply(
                 0, fillna_builder, value._modin_frame
             )
             return self.__constructor__(new_modin_frame)
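For context on the `fillna_builder` call above, a small pandas-only sketch (hypothetical data, not Modin code) of what `df.fillna(value=right)` does when `right` is another DataFrame: NaN cells are filled from the label-aligned cells of `right`, which is why the two frames are broadcast partition by partition.

import numpy as np
import pandas

left = pandas.DataFrame({"a": [1.0, np.nan], "b": [np.nan, 4.0]})
right = pandas.DataFrame({"a": [10.0, 20.0], "b": [30.0, 40.0]})

# fillna with a DataFrame `value` fills each NaN from the cell with the same
# row/column label in `right`; unmatched labels stay NaN.
print(left.fillna(value=right))
#       a     b
# 0   1.0  30.0
# 1  20.0   4.0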
Expand Down Expand Up @@ -3161,7 +3161,7 @@ def reduce(df: pandas.DataFrame, mask: pandas.DataFrame):

return df[to_take]

result = self._modin_frame.broadcast_apply(
result = self._modin_frame.apply(
# 'masks' have identical partitioning as we specified 'keep_partitioning=True' before,
# this means that we can safely skip the 'co-partitioning' stage
axis=1,
