Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add foreach Functionality for DataFrame Operations #20

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

Shindora
Copy link

This pull request introduces the foreach method to the DataFrame API, enabling users to apply a closure to each row of the DataFrame asynchronously. The foreach method iterates over the rows of the DataFrame and applies the provided closure, allowing users to perform custom operations or side effects on each row.

Details:
Implemented the foreach method in the DataFrame struct, which asynchronously applies a closure to each row.
Added support for asynchronous iteration over the rows of the DataFrame using the foreach method.
Ensured error handling and propagation of errors encountered during the iteration process.

Testing:
Added unit tests to validate the functionality of the foreach method.
Tested the foreach method with various scenarios, including empty DataFrames, DataFrames with different numbers of rows, and DataFrames with different column types.
Conducted integration testing to ensure compatibility with other DataFrame operations and functionalities.

Additional Notes:

The foreach method provides a convenient way to perform asynchronous operations on DataFrame rows without collecting the DataFrame into memory.
Users should be aware of the potential concurrency issues and thread safety considerations when using the foreach method in multi-threaded environments.

Screenshot 2024-04-21 at 03 17 33

@@ -528,6 +528,31 @@ mod tests {
assert_eq!(&expected, &rows_func_asc);
Ok(())
}
#[tokio::test]
async fn test_func_foreach() -> Result<(), SparkError> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this test into the dataframe.rs file?

@sjrusso8
Copy link
Owner

Thanks for making this pull request! Have not had a good chance to look at implementing things like foreach. Technically the foreach method is not available on the released versions of Spark Connect. The roadmap seems to indicate that this will be added for Spark v4.

Correct me if I am wrong, but I think this implementation might be a little more involved. The planned implementation for pyspark connect client is to register the provided function as a UDF and then call collect as a way to force the action. So this would serialize the function, distribute the work to all the workers, and then collect the result. This implementation would first collect all the values onto the client and then apply the func over the RecordBatch. This might not have the expected result of not collecting the DataFrame into memory on the client.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants