Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#7329: Do not sort columns on df.update #7330

Merged
merged 3 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ def register(
cls,
func: Callable[..., pandas.DataFrame],
join_type: str = "outer",
sort: bool = None,
labels: str = "replace",
infer_dtypes: Optional[str] = None,
) -> Callable[..., PandasQueryCompiler]:
Expand All @@ -310,6 +311,8 @@ def register(
Binary function to execute. Have to be able to accept at least two arguments.
join_type : {'left', 'right', 'outer', 'inner', None}, default: 'outer'
Type of join that will be used if indices of operands are not aligned.
sort : bool, default: None
Whether to sort index and columns or not.
labels : {"keep", "replace", "drop"}, default: "replace"
Whether keep labels from left Modin DataFrame, replace them with labels
from joined DataFrame or drop altogether to make them be computed lazily later.
Expand Down Expand Up @@ -419,6 +422,7 @@ def caller(
lambda x, y: func(x, y, *args, **kwargs),
[other._modin_frame],
join_type=join_type,
sort=sort,
labels=labels,
dtypes=dtypes,
),
Expand Down
21 changes: 15 additions & 6 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3255,7 +3255,6 @@ def broadcast_apply(
axis,
other,
join_type,
sort=not self.get_axis(axis).equals(other.get_axis(axis)),
)
# unwrap list returned by `copartition`.
right_parts = right_parts[0]
Expand Down Expand Up @@ -3681,7 +3680,7 @@ def _check_if_axes_identical(self, other: PandasDataframe, axis: int = 0) -> boo
) and self._get_axis_lengths(axis) == other._get_axis_lengths(axis)

def _copartition(
self, axis, other, how, sort, force_repartition=False, fill_value=None
self, axis, other, how, sort=None, force_repartition=False, fill_value=None
):
"""
Copartition two Modin DataFrames.
Expand All @@ -3696,8 +3695,9 @@ def _copartition(
Other Modin DataFrame(s) to copartition against.
how : str
How to manage joining the index object ("left", "right", etc.).
sort : bool
sort : bool, default: None
Whether sort the joined index or not.
If ``None``, sort is defined in depend on labels equality along the axis.
force_repartition : bool, default: False
Whether force the repartitioning or not. By default,
this method will skip repartitioning if it is possible. This is because
Expand Down Expand Up @@ -3730,6 +3730,9 @@ def _copartition(
self._get_axis_lengths_cache(axis),
)

if sort is None:
sort = not all(self.get_axis(axis).equals(o.get_axis(axis)) for o in other)

self_index = self.get_axis(axis)
others_index = [o.get_axis(axis) for o in other]
joined_index, make_reindexer = self._join_index_objects(
Expand Down Expand Up @@ -3823,6 +3826,7 @@ def n_ary_op(
op,
right_frames: list[PandasDataframe],
join_type="outer",
sort=None,
copartition_along_columns=True,
labels="replace",
dtypes: Optional[pandas.Series] = None,
Expand All @@ -3838,6 +3842,8 @@ def n_ary_op(
Modin DataFrames to join with.
join_type : str, default: "outer"
Type of join to apply.
sort : bool, default: None
Whether to sort index and columns or not.
copartition_along_columns : bool, default: True
Whether to perform copartitioning along columns or not.
For some ops this isn't needed (e.g., `fillna`).
Expand All @@ -3854,7 +3860,10 @@ def n_ary_op(
New Modin DataFrame.
"""
left_parts, list_of_right_parts, joined_index, row_lengths = self._copartition(
0, right_frames, join_type, sort=True
0,
right_frames,
join_type,
sort=sort,
)
if copartition_along_columns:
new_left_frame = self.__constructor__(
Expand Down Expand Up @@ -3886,7 +3895,7 @@ def n_ary_op(
1,
new_right_frames,
join_type,
sort=True,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was sort=True here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to add more tests because you are changing the behavior of a low-level function that is used to implement many high-level functions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My assumption is that someone was thinking this should work for all cases.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What tests would you suggest to add?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have pretty extensive test suite for checking high-level functions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My assumption is that someone was thinking this should work for all cases.

Why do you think that this should always work without sorting now? My doubts, for example, are based on this line:

sort=not self.get_axis(axis).equals(other.get_axis(axis)),

What tests would you suggest to add?

I don't have any specific functions, you can add for any few you find.

We already have pretty extensive test suite for checking high-level functions.

I'm not sure we wrote many tests taking into account the order of the columns.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, handling sort here is a bit tricky. Changed the logic and added a test for some bin op.

sort=sort,
)
else:
joined_columns = self.copy_columns_cache(copy_lengths=True)
Expand Down Expand Up @@ -3978,7 +3987,7 @@ def _compute_new_widths():
joined_index,
partition_sizes_along_axis,
) = self._copartition(
axis.value ^ 1, others, how, sort, force_repartition=False
axis.value ^ 1, others, how, sort=sort, force_repartition=False
)
if axis == Axis.COL_WISE:
new_lengths = partition_sizes_along_axis
Expand Down
2 changes: 2 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,13 +460,15 @@ def to_numpy(self, **kwargs):
df_update = Binary.register(
copy_df_for_func(pandas.DataFrame.update, display_name="update"),
join_type="left",
sort=False,
)
series_update = Binary.register(
copy_df_for_func(
lambda x, y: pandas.Series.update(x.squeeze(axis=1), y.squeeze(axis=1)),
display_name="update",
),
join_type="left",
sort=False,
)

# Needed for numpy API
Expand Down
27 changes: 27 additions & 0 deletions modin/tests/pandas/dataframe/test_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,30 @@ def test_arithmetic_with_tricky_dtypes(val1, val2, op, request):
lambda dfs: getattr(dfs[0], op)(dfs[1]),
expected_exception=expected_exception,
)


@pytest.mark.parametrize(
"data, other_data",
[
({"A": [1, 2, 3], "B": [400, 500, 600]}, {"B": [4, 5, 6], "C": [7, 8, 9]}),
({"C": [1, 2, 3], "B": [400, 500, 600]}, {"B": [4, 5, 6], "A": [7, 8, 9]}),
],
)
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize("match_index", [True, False])
def test_bin_op_mismatched_columns(data, other_data, axis, match_index):
modin_df, pandas_df = create_test_dfs(data)
other_modin_df, other_pandas_df = create_test_dfs(other_data)
if axis == 0:
if not match_index:
modin_df.index = pandas_df.index = ["1", "2", "3"]
other_modin_df.index = other_pandas_df.index = ["2", "1", "3"]
eval_general(
modin_df,
pandas_df,
lambda df: (
df.add(other_modin_df, axis=axis)
if isinstance(df, pd.DataFrame)
else df.add(other_pandas_df, axis=axis)
),
)
1 change: 1 addition & 0 deletions modin/tests/pandas/dataframe/test_map_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1592,6 +1592,7 @@ def test_transpose(data):
"data, other_data",
[
({"A": [1, 2, 3], "B": [400, 500, 600]}, {"B": [4, 5, 6], "C": [7, 8, 9]}),
({"C": [1, 2, 3], "B": [400, 500, 600]}, {"B": [4, 5, 6], "A": [7, 8, 9]}),
(
{"A": ["a", "b", "c"], "B": ["x", "y", "z"]},
{"B": ["d", "e", "f", "g", "h", "i"]},
Expand Down
Loading