Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: "carefully" allow for dask Expr that modify index #743

Open
wants to merge 15 commits into
base: main
Choose a base branch
from

Conversation

FBruzzesi
Copy link
Member

What type of PR is this? (check all applicable)

  • πŸ’Ύ Refactor
  • ✨ Feature
  • πŸ› Bug Fix
  • πŸ”§ Optimization
  • πŸ“ Documentation
  • βœ… Test
  • 🐳 Other

Checklist

  • Code follows style guide (ruff)
  • Tests added
  • Documented the changes

If you have comments or can explain your changes, please do so below.

Pretty dangerous stuff to workaround the dask index.

To assess that the implementation is working as expected, I implemented both sort (different index but same length) and drop_nulls (different index due to different length)

@github-actions github-actions bot added the enhancement New feature or request label Aug 8, 2024
@FBruzzesi FBruzzesi changed the title RFC feat: dask index workaround RFC feat: dask index "hacking" Aug 8, 2024
Comment on lines +41 to +42
result = df.select(nw.col("a").drop_nulls(), nw.col("d").drop_nulls())
expected = {"a": [1.0, 2.0], "d": [6, 6]}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly this broadcast is not working as drop_nulls does not return a scalar. I would consider this an edge case and focus on the broader support

@anopsy
Copy link
Member

anopsy commented Aug 8, 2024

image

@MarcoGorelli
Copy link
Member

thanks for trying this - i'll test it out and see if there's a perf impact

@FBruzzesi FBruzzesi mentioned this pull request Aug 9, 2024
10 tasks

col_order = list(new_series.keys())

left_most_series = next( # pragma: no cover
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is guaranteed to not end up in StopIteration error as if everything was a scalar the previous block would have been entered and returned

@MarcoGorelli
Copy link
Member

we've got the notebooks in tpch/notebooks, the first two support Dask - fancy running them with this branch and seeing if there's any perf impact?

@FBruzzesi
Copy link
Member Author

FBruzzesi commented Aug 21, 2024

Hey @MarcoGorelli, I am giving another thought on this feature (which I would still love to see), here is a simple idea to have partial support without loss of performance:

  • sort is the only method that changes the index and result in an output with the same length. Instead of changing the index to each series, we can do that specifically in sort, namely by assigning the original index.
    def sort(self: Self, *, descending: bool = False, nulls_last: bool = False) -> Self:
         na_position = "last" if nulls_last else "first"
    
         def func(_input: Any, ascending: bool, na_position: bool) -> Any:  # noqa: FBT001
             name = _input.name
    
             result =_input.to_frame(name=name).sort_values(
                 by=name, ascending=ascending, na_position=na_position
             )[name]
             return de._expr.AssignIndex(result, _input.index)
    
         return self._from_call(
             func,
             "sort",
             not descending,
             na_position,
             returns_scalar=False,
         )
  • All the other methods that change the index, do so by reducing the length of the series. In my working experience and in TPCH queries they are mostly used before a reduction or in isolation, therefore we should not worry of changing their index. Example:
    df.select(
        head_sum=pl.col("a").head().sum(),
        tail_mean=pl.col("a").tail().mean(),
    )
  • What is left and unsupported you may ask? Multiple reductions operations ending up with the same length, different from the original, won't be possible. Example:
    df.select(
        head=pl.col("a").head(),
        tail=pl.col("a").tail(),
    )

What do you think?

@FBruzzesi FBruzzesi marked this pull request as ready for review August 27, 2024 07:56
@FBruzzesi
Copy link
Member Author

FBruzzesi commented Aug 27, 2024

@MarcoGorelli I am tagging this as ready for review as I re-worked it a bit more.

The TL;DR is:

  • sort is kind of special, as it modifies the index but returns a Series of the same length of the original one, therefore in such specific case I am manually re-assigning the index
  • for all other methods, I added a boolean flag to DaskExpr called modifies_index and:
    • that is not allowed in with_columns
    • in select it should be allowed only if there are no other exprs or there is a reduction following (I need to address both these cases actually).

Yet before developing further, I would like some feedback on how likable this approach is and if we want to move forward with it πŸ™πŸΌ

Comment on lines 713 to 724
def head(self: Self, n: int) -> Self:
return self._from_call(
lambda _input, _n: _input.head(_n, compute=False),
"head",
n,
returns_scalar=False,
modifies_index=True,
)

def tail(self: Self, n: int) -> Self:
return self._from_call(
lambda _input, _n: _input.tail(_n, compute=False),
Copy link
Member Author

@FBruzzesi FBruzzesi Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... head has a npartitions param which can be set to -1 and scan all partitions, while tail does not. This means that if we have multiple partitions, then this implementation of tail may not be what we expect

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't seen cases where users actually want more than 1 partition if they call head or tail tbh, Yes this is technically an issue, but not something I've encountered in the wild

@MarcoGorelli
Copy link
Member

thanks @FBruzzesi !

to be honest I don't know about using such private methods, it makes me feel slightly uneasy - @phofl do you have time/interest in taking a look? specifically the de._collection.Series(de._expr.AssignIndex(result, _input.index)) part in narwhals/_dask/expr.py

I think that for sql engines (like duckdb, which hopefully we can get to eventually) operations like df.select(nw.col('a').sort(), nw.col('b')) would be problematic anyway, so I don't think it'd be an issue to leave them out of the Narwhals area of support

narwhals/_dask/dataframe.py Outdated Show resolved Hide resolved
result = _input.to_frame(name=name).sort_values(
by=name, ascending=ascending, na_position=na_position
)[name]
return de._collection.Series(de._expr.AssignIndex(result, _input.index))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can second @MarcoGorelli here, please don't do this. We are sorting a Series (i.e. a single column from the df), correct?

I would:

tmp = _input.reset_index().sort_values(...)
result = tmp[_input.name]
result.index = tmp["use the index name of _input"]

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you want to keep the Index of the input?

If yes, this is fundamentally a bad idea in Dask, it will shoot you in the foot all over the place. You have zero guarantees that the partitions keep their lengths when sorting (it is a lot more likely that they do not), so this is bound to fail in all kinds of places

Copy link
Member Author

@FBruzzesi FBruzzesi Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @phofl thanks for taking the time.

We are sorting a Series (i.e. a single column from the df), correct?

Yes indeed, but with the final goal to potentially add it as a new column to the original dataframe, and that's where the index misalignment gets into the game.

I will try the approach you are suggesting, which is not too far off from what is already implemented, and see if everything else falls into place πŸ™πŸΌ

Edit: it just ends up raising:

AssertionError: value needs to be aligned with the index

Traceback
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
Cell In[60], line 3
      1 tmp = df_dd["a"].to_frame(name="a").sort_values("a")
      2 result = tmp["a"]
----> 3 result.index = df_dd["a"].index

    655 @index.setter
    656 def index(self, value):
--> 657     assert expr.are_co_aligned(
    658         self.expr, value.expr
    659     ), "value needs to be aligned with the index"
    660     _expr = expr.AssignIndex(self, value)
    661     self._expr = _expr

AssertionError: value needs to be aligned with the index

@FBruzzesi
Copy link
Member Author

Hi @phofl, apologies to call you in the mix once more.

I have a few questions in order to make this work and guarantee that we don't end up with a

fundamentally a bad idea in Dask, it will shoot you in the foot all over the place.

  • How can we test for when Dask will shoot us in the foot if we do something bad?
  • The latest approach TL;DR is that if a method changes the index, then it either has to be followed by a reduction or be a single selection. Examples:
    • Reductions:
      df.select(
          head_sum=pl.col("a").head().sum(),
          tail_mean=pl.col("a").tail().mean(),
      )
      which would translate to something like dd.concat([df["a"].head().sum(), df["a"].tail().mean()])
    • Single selection:
       df.select(
          head=pl.col("a").head(),
       )
    In sight of the first question, what do you think about this approach? Is it a fundamentally bad idea?

@FBruzzesi FBruzzesi changed the title RFC feat: dask index "hacking" feat: "carefully" allow for dask Expr that modify index Sep 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants