From e91915ae4bd283f0fd4d4e48e24aaa58f00bd38d Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Wed, 12 Apr 2023 11:08:18 +0200 Subject: [PATCH] FIX-#5977: use `wrapper.materialize` instead of `wait_partitions`; use AWS env vars in `pytest_sessionstart` function (#5981) Co-authored-by: Karthik Velayutham Signed-off-by: Anatoly Myachev --- modin/conftest.py | 7 ++++++- .../dask/implementations/pandas_on_dask/io/io.py | 4 +++- .../ray/implementations/pandas_on_ray/io/io.py | 12 +++++++++--- .../implementations/pandas_on_unidist/io/io.py | 12 +++++++++--- .../dask/implementations/pandas_on_dask/io/io.py | 4 +++- .../ray/implementations/pandas_on_ray/io/io.py | 4 +++- .../implementations/pandas_on_unidist/io/io.py | 4 +++- 7 files changed, 36 insertions(+), 11 deletions(-) diff --git a/modin/conftest.py b/modin/conftest.py index 942c3271eec..b81dbf033f7 100644 --- a/modin/conftest.py +++ b/modin/conftest.py @@ -587,7 +587,12 @@ def pytest_sessionstart(session): addr = "localhost:50051" global ray_client_server ray_client_server = ray_server.serve(addr) - ray.util.connect(addr) + env_vars = { + "AWS_ACCESS_KEY_ID": CIAWSAccessKeyID.get(), + "AWS_SECRET_ACCESS_KEY": CIAWSSecretAccessKey.get(), + } + extra_init_kw = {"runtime_env": {"env_vars": env_vars}} + ray.util.connect(addr, ray_init_kwargs=extra_init_kw) def pytest_sessionfinish(session, exitstatus): diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py index 69e4327fa34..8f6eedf1eaa 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -191,4 +191,6 @@ def func(df): # pragma: no cover # Ensure that the metadata is synchronized qc._modin_frame._propagate_index_objs(axis=None) result = qc._modin_frame.apply_full_axis(1, func, new_index=[], new_columns=[]) - result._partition_mgr_cls.wait_partitions(result._partitions.flatten()) + DaskWrapper.materialize( + [part.list_of_blocks[0] for row in result._partitions for part in row] + ) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index a1ca8f9c312..c292e7c37f3 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -111,7 +111,9 @@ def func(df): # pragma: no cover # Ensure that the metadata is synchronized qc._modin_frame._propagate_index_objs(axis=None) result = qc._modin_frame.apply_full_axis(1, func, new_index=[], new_columns=[]) - result._partition_mgr_cls.wait_partitions(result._partitions.flatten()) + RayWrapper.materialize( + [part.list_of_blocks[0] for row in result._partitions for part in row] + ) @staticmethod def _to_csv_check_support(kwargs): @@ -235,7 +237,9 @@ def func(df, **kw): max_retries=0, ) # pending completion - qc._modin_frame._partition_mgr_cls.wait_partitions(result.flatten()) + RayWrapper.materialize( + [part.list_of_blocks[0] for row in result for part in row] + ) @staticmethod def _to_parquet_check_support(kwargs): @@ -313,4 +317,6 @@ def func(df, **kw): enumerate_partitions=True, ) # pending completion - qc._modin_frame._partition_mgr_cls.wait_partitions(result.flatten()) + RayWrapper.materialize( + [part.list_of_blocks[0] for row in result for part in row] + ) diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index bf0c23674f0..a8153fdfb71 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -110,7 +110,9 @@ def func(df): # pragma: no cover # Ensure that the metadata is synchronized qc._modin_frame._propagate_index_objs(axis=None) result = qc._modin_frame.apply_full_axis(1, func, new_index=[], new_columns=[]) - result._partition_mgr_cls.wait_partitions(result._partitions.flatten()) + UnidistWrapper.materialize( + [part.list_of_blocks[0] for row in result._partitions for part in row] + ) @staticmethod def _to_csv_check_support(kwargs): @@ -234,7 +236,9 @@ def func(df, **kw): max_retries=0, ) # pending completion - qc._modin_frame._partition_mgr_cls.wait_partitions(result.flatten()) + UnidistWrapper.materialize( + [part.list_of_blocks[0] for row in result for part in row] + ) @staticmethod def _to_parquet_check_support(kwargs): @@ -312,4 +316,6 @@ def func(df, **kw): enumerate_partitions=True, ) # pending completion - qc._modin_frame._partition_mgr_cls.wait_partitions(result.flatten()) + UnidistWrapper.materialize( + [part.list_of_blocks[0] for row in result for part in row] + ) diff --git a/modin/experimental/core/execution/dask/implementations/pandas_on_dask/io/io.py b/modin/experimental/core/execution/dask/implementations/pandas_on_dask/io/io.py index 35d0f812edc..a27df53a1ca 100644 --- a/modin/experimental/core/execution/dask/implementations/pandas_on_dask/io/io.py +++ b/modin/experimental/core/execution/dask/implementations/pandas_on_dask/io/io.py @@ -121,4 +121,6 @@ def func(df, **kw): # pragma: no cover result = qc._modin_frame.apply_full_axis( 1, func, new_index=[], new_columns=[], enumerate_partitions=True ) - result._partition_mgr_cls.wait_partitions(result._partitions.flatten()) + DaskWrapper.materialize( + [part.list_of_blocks[0] for row in result._partitions for part in row] + ) diff --git a/modin/experimental/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/experimental/core/execution/ray/implementations/pandas_on_ray/io/io.py index b218a424412..66f336588e1 100644 --- a/modin/experimental/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/experimental/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -112,4 +112,6 @@ def func(df, **kw): # pragma: no cover result = qc._modin_frame.apply_full_axis( 1, func, new_index=[], new_columns=[], enumerate_partitions=True ) - result._partition_mgr_cls.wait_partitions(result._partitions.flatten()) + RayWrapper.materialize( + [part.list_of_blocks[0] for row in result._partitions for part in row] + ) diff --git a/modin/experimental/core/execution/unidist/implementations/pandas_on_unidist/io/io.py b/modin/experimental/core/execution/unidist/implementations/pandas_on_unidist/io/io.py index f49f09f66b5..cfa14ccce31 100644 --- a/modin/experimental/core/execution/unidist/implementations/pandas_on_unidist/io/io.py +++ b/modin/experimental/core/execution/unidist/implementations/pandas_on_unidist/io/io.py @@ -114,4 +114,6 @@ def func(df, **kw): # pragma: no cover result = qc._modin_frame.apply_full_axis( 1, func, new_index=[], new_columns=[], enumerate_partitions=True ) - result._partition_mgr_cls.wait_partitions(result._partitions.flatten()) + UnidistWrapper.materialize( + [part.list_of_blocks[0] for row in result._partitions for part in row] + )