
FEAT-#7337: Using dynamic partitioning in broadcast_apply #7338

Merged · 11 commits · Aug 26, 2024

Conversation

Retribution98 (Collaborator):

What do these changes do?

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves #7337: Using dynamic partitioning for broadcast_apply
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

Performance results were obtained using a DataFrame with shape (20000, 5000).

| Operation | Modin main | This PR |
|---|---|---|
| `fillna(df_value)` | 32.460861 | 1.79279 |
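For context, a timing like the one above can be reproduced with a small harness along these lines (a sketch only; the exact benchmark script is not shown in the PR, and `fill_df` is a hypothetical second frame standing in for the `df_value` fill argument):

```python
import time

import numpy as np
import modin.pandas as pd

# Hypothetical reproduction of the (20000, 5000) measurement above.
rows, cols = 20_000, 5_000
data = np.random.rand(rows, cols)
data[data < 0.5] = np.nan  # introduce missing values so fillna has work to do
df = pd.DataFrame(data)
fill_df = pd.DataFrame(np.zeros((rows, cols)))  # assumed DataFrame fill value

start = time.perf_counter()
result = df.fillna(fill_df)
repr(result)  # force materialization so asynchronous execution does not skew timing
print(f"fillna(df_value): {time.perf_counter() - start:.4f} s")
```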

Comment on lines 3110 to 3165
```python
# FIXME: this is a naive workaround for this problem: https://github.com/modin-project/modin/issues/5394
# If there are too many partitions, then all non-full-axis implementations start acting very badly.
# The threshold here is fairly arbitrary, though it works fine in simple scenarios.
processable_amount_of_partitions = (
    self._modin_frame.num_parts < CpuCount.get() * 32
)
```
Collaborator:
Why is this removed?

Retribution98 (Author):
As you can see from this comment, the condition was a naive workaround for a problem that the dynamic partitioning in this PR solves.

Retribution98 (Author):

It was moved back with a different message.

Comment on lines 683 to 686
```python
# The `broadcast_apply` runtime condition differs from
# the same condition in `map_partitions` because the columnar
# approach for `broadcast_apply` results in a slowdown.
if np.prod(left.shape) <= 1.5 * CpuCount.get():
```
Collaborator:

Now we have the same condition, don't we?

Retribution98 (Author):

Not really. In `map_partitions` we use three approaches: `base_map_partitions`, `map_partitions_joined_by_column`, and `map_axis_partitions`. In `broadcast_apply` we use only two, `base_broadcast_apply` and `broadcast_axis_partitions`, because `broadcast_joined_by_axis` does not work as well as the joined variant does for map.
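A rough sketch of the resulting dispatch (the function names come from the comment above; the stub bodies and boolean conditions are illustrative, not the actual Modin code):

```python
# Illustrative stubs for the strategies named above (not the real implementations).
def base_map_partitions(df, func): ...
def map_partitions_joined_by_column(df, func): ...
def map_axis_partitions(df, func): ...
def base_broadcast_apply(left, right, func): ...
def broadcast_axis_partitions(left, right, func): ...

def map_partitions_dispatch(df, func, few_parts, many_row_parts):
    # Three candidate strategies; the booleans gating them are hypothetical.
    if few_parts:
        return base_map_partitions(df, func)
    if many_row_parts:
        return map_partitions_joined_by_column(df, func)
    return map_axis_partitions(df, func)

def broadcast_apply_dispatch(left, right, func, few_parts):
    # Only two strategies here: per the discussion, a joined-by-axis variant
    # does not pay off for broadcast the way it does for map_partitions.
    if few_parts:
        return base_broadcast_apply(left, right, func)
    return broadcast_axis_partitions(left, right, func)
```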

Retribution98 (Author):

This condition was changed to use a new environment variable, `DynamicPartitioning`.
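For illustration, such a switch typically looks like the sketch below. The flag value would come from the new `DynamicPartitioning` variable; how the merged code reads and combines it may differ, and `left` is assumed to expose a `.shape` like the snippet above:

```python
import numpy as np

from modin.config import CpuCount

def use_base_broadcast_apply(left, dynamic_partitioning: bool) -> bool:
    # Hypothetical gating on the new flag: with dynamic partitioning enabled,
    # fall back to the runtime shape heuristic from the snippet above.
    # The real condition in the PR may be inverted or combined differently.
    if not dynamic_partitioning:
        return False
    return bool(np.prod(left.shape) <= 1.5 * CpuCount.get())
```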

YarShev (Collaborator) commented Jul 17, 2024:

Can you extend the table of measurements in the PR description with dropna and groupby?

Retribution98 (Author) commented Jul 19, 2024:

Actual results from this PR

fillna

| rows \ cols | 32 | 64 | 128 | 256 | 512 | 1024 | 2048 | 3584 |
|---|---|---|---|---|---|---|---|---|
| 10**4 | 0.0497 | 0.1502 | 0.1629 | 0.1835 | 0.2736 | 0.4397 | 0.7819 | 1.4975 |
| 10**5 | 0.0454 | 0.1533 | 0.1745 | 0.2250 | 0.3691 | 0.5706 | 1.0826 | 2.0390 |
| 10**6 | 0.1062 | 0.2522 | 0.4855 | 1.0331 | 2.0359 | 5.6754 | 10.8838 | 24.7736 |

dropna

| rows \ cols | 32 | 64 | 128 | 256 | 512 | 1024 | 2048 | 3584 |
|---|---|---|---|---|---|---|---|---|
| 10**4 | 0.2251 | 0.1471 | 0.1720 | 0.2370 | 0.3101 | 0.5641 | 1.2769 | 1.9646 |
| 10**5 | 0.2431 | 0.1778 | 0.1855 | 0.2301 | 0.3822 | 0.5796 | 1.1278 | 1.8591 |
| 10**6 | 0.2303 | 0.5102 | 0.5533 | 0.7130 | 1.0950 | 1.9358 | 4.0412 | 6.7829 |
| 10**7 | 0.2371 | $${\color{red}4.8146}$$ | $${\color{red}5.4220}$$ | $${\color{red}7.2868}$$ | $${\color{red}18.8597}$$ | $${\color{red}69.6273}$$ | $${\color{red}192.3792}$$ | $${\color{red}463.7308}$$ |

(values in red are slower than the corresponding results on main)

groupby.sum

| rows \ cols | 32 | 64 | 128 | 256 | 512 | 1024 | 2048 | 3584 |
|---|---|---|---|---|---|---|---|---|
| 10**4 | 0.3525 | 0.7186 | 1.1743 | 2.3160 | 4.8310 | 10.8509 | 33.6107 | 100.7768 |

Previous results from main

fillna

| rows \ cols | 32 | 64 | 128 | 256 | 512 | 1024 | 2048 | 3584 |
|---|---|---|---|---|---|---|---|---|
| 10**4 | 0.0489 | 0.0879 | 0.1756 | 0.4972 | 1.3735 | 3.7084 | 9.9866 | 26.9021 |
| 10**5 | 0.0772 | 0.1471 | 0.2382 | 0.5195 | 1.4396 | 3.6447 | 14.2589 | 37.6544 |
| 10**6 | 0.0902 | 0.1910 | 0.5250 | 1.3777 | 5.2725 | 20.2653 | 84.3823 | 268.2694 |

dropna

| rows \ cols | 32 | 64 | 128 | 256 | 512 | 1024 | 2048 | 3584 |
|---|---|---|---|---|---|---|---|---|
| 10**4 | 0.2343 | 0.4562 | 0.7633 | 1.4865 | 3.7967 | 0.2784 | 0.5159 | 0.9843 |
| 10**5 | 0.3956 | 0.5020 | 0.9178 | 1.6317 | 3.2689 | 0.3431 | 0.5513 | 1.0118 |
| 10**6 | 0.3732 | 0.5757 | 1.2170 | 2.0218 | 4.5112 | 1.0296 | 1.8023 | 3.5953 |
| 10**7 | 0.3134 | 1.0215 | 1.9741 | 5.0844 | 10.6587 | 21.2115 | 85.1395 | 238.7371 |

groupby.sum

| rows \ cols | 32 | 64 | 128 | 256 | 512 | 1024 | 2048 | 3584 |
|---|---|---|---|---|---|---|---|---|
| 10**4 | 0.3703 | 0.6839 | 1.3411 | 2.8394 | 6.7643 | 18.2716 | 64.4190 | 204.0969 |

Explanatory note

As you can see, the fillna and groupby results look as expected and this PR shows a good speedup, but the dropna results look strange. This is because PR #6472 contained improvements for dropna, but issue #5394 affected it, and the new approach was only applied to dataframes with fewer than 32 column partitions (1024 columns with the default MinPartitionSize). This PR fixes that problem, but it doesn't help here because the old way performs better for dataframes with more columns. This can be seen in the results: performance improves for dataframes with fewer than 1024 columns, while wider dataframes still perform better on main.
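The 1024-column figure follows directly from the partitioning arithmetic, assuming Modin's default MinPartitionSize of 32:

```python
MIN_PARTITION_SIZE = 32   # Modin's default MinPartitionSize (columns per column partition)
PARTITION_CUTOFF = 32     # the old workaround only applied below 32 column partitions
print(MIN_PARTITION_SIZE * PARTITION_CUTOFF)  # -> 1024 columns
```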

anmyachev (Collaborator) commented Jul 19, 2024:

> This PR fixes that problem, but it doesn't help here because the old way performs better for dataframes with more columns.

@Retribution98 do you plan to preserve that path?

> but issue #5347 affected it, and the new approach was only applied to dataframes with fewer than 32 column partitions (1024 columns with the default MinPartitionSize). This PR fixes that problem

Is the issue number correct? If you plan to keep the fix, it would be better to put it in a separate pull request.

```python
if parts:
    result = pandas.concat(parts, axis=1, copy=False)
else:
    result = pandas.DataFrame(columns=result_columns)
```
Collaborator:

Is it important to keep the column names in the case of empty partitions? What led you to this?

Retribution98 (Author):

I do this to get a predictable result, since the result columns are already known. Would it cause any problems or slowdowns?

Collaborator:

I just want to make sure that Modin's behavior matches that of pandas. Do we have a test for this new code?
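A minimal test for this fallback could look like the sketch below (hypothetical, mirroring the snippet above rather than an existing Modin test):

```python
import pandas

def test_empty_parts_keep_result_columns():
    # When no partitions survive, the fallback should still produce an
    # empty frame that carries the expected column labels, matching pandas.
    parts = []
    result_columns = ["a", "b", "c"]
    if parts:
        result = pandas.concat(parts, axis=1, copy=False)
    else:
        result = pandas.DataFrame(columns=result_columns)
    assert list(result.columns) == result_columns
    assert result.empty
```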

modin/core/storage_formats/pandas/query_compiler.py (outdated review thread, resolved)
```python
# FIXME: this is a naive workaround for this problem: https://github.com/modin-project/modin/issues/5394
# If there are too many partitions, then all non-full-axis implementations start acting very badly.
# The threshold here is fairly arbitrary, though it works fine in simple scenarios.
# The map reduce approach works well for frames with few columnar partitions
```
Collaborator:
Suggested change:

```diff
-# The map reduce approach works well for frames with few columnar partitions
+# The map reduce approach works well for frames with few row or column partitions
```

Retribution98 (Author):

That is not correct. I got the following results:

  • If the DataFrame has few rows (fewer than 10^7), the old way shows better performance regardless of the number of columns.
  • If the DataFrame has many rows (more than 10^7) and few columns, the map-reduce approach is better.
  • If the DataFrame has many rows (more than 10^7) and many columns, the old approach is better again.

But I didn't add any new conditions here, because that is beyond the scope of the current task.
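Purely for reference, the three regimes above could be encoded as a condition like this (illustrative only; as stated, no such condition was added in this PR, and the exact column cutoff is not pinned down in the discussion):

```python
def map_reduce_is_faster(num_rows: int, num_cols: int, col_cutoff: int) -> bool:
    # Illustrative encoding of the measured regimes; `col_cutoff` is left as a
    # parameter because the discussion does not name an exact column threshold.
    if num_rows < 10**7:
        return False               # few rows: the old way wins regardless of columns
    return num_cols <= col_cutoff  # many rows: map-reduce wins only with few columns
```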

modin/core/dataframe/pandas/dataframe/dataframe.py (three outdated review threads, resolved)
anmyachev (Collaborator) previously approved these changes on Aug 19, 2024:

LGTM!

anmyachev merged commit 8249915 into modin-project:main on Aug 26, 2024.
38 checks passed
Development: Successfully merging this pull request may close issue #7337, "Using dynamic partitioning for broadcast_apply". 4 participants.