Skip to content

Commit

Permalink
FIX-#5977: use wrapper.materialize instead of wait_partitions; us…
Browse files Browse the repository at this point in the history
…e AWS env vars in `pytest_sessionstart` function (#5981)

Co-authored-by: Karthik Velayutham <vkarthik@ponder.io>
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev and Karthik Velayutham authored Apr 12, 2023
1 parent e1d639f commit e91915a
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 11 deletions.
7 changes: 6 additions & 1 deletion modin/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,12 @@ def pytest_sessionstart(session):
addr = "localhost:50051"
global ray_client_server
ray_client_server = ray_server.serve(addr)
ray.util.connect(addr)
env_vars = {
"AWS_ACCESS_KEY_ID": CIAWSAccessKeyID.get(),
"AWS_SECRET_ACCESS_KEY": CIAWSSecretAccessKey.get(),
}
extra_init_kw = {"runtime_env": {"env_vars": env_vars}}
ray.util.connect(addr, ray_init_kwargs=extra_init_kw)


def pytest_sessionfinish(session, exitstatus):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,6 @@ def func(df): # pragma: no cover
# Ensure that the metadata is synchronized
qc._modin_frame._propagate_index_objs(axis=None)
result = qc._modin_frame.apply_full_axis(1, func, new_index=[], new_columns=[])
result._partition_mgr_cls.wait_partitions(result._partitions.flatten())
DaskWrapper.materialize(
[part.list_of_blocks[0] for row in result._partitions for part in row]
)
12 changes: 9 additions & 3 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ def func(df): # pragma: no cover
# Ensure that the metadata is synchronized
qc._modin_frame._propagate_index_objs(axis=None)
result = qc._modin_frame.apply_full_axis(1, func, new_index=[], new_columns=[])
result._partition_mgr_cls.wait_partitions(result._partitions.flatten())
RayWrapper.materialize(
[part.list_of_blocks[0] for row in result._partitions for part in row]
)

@staticmethod
def _to_csv_check_support(kwargs):
Expand Down Expand Up @@ -235,7 +237,9 @@ def func(df, **kw):
max_retries=0,
)
# pending completion
qc._modin_frame._partition_mgr_cls.wait_partitions(result.flatten())
RayWrapper.materialize(
[part.list_of_blocks[0] for row in result for part in row]
)

@staticmethod
def _to_parquet_check_support(kwargs):
Expand Down Expand Up @@ -313,4 +317,6 @@ def func(df, **kw):
enumerate_partitions=True,
)
# pending completion
qc._modin_frame._partition_mgr_cls.wait_partitions(result.flatten())
RayWrapper.materialize(
[part.list_of_blocks[0] for row in result for part in row]
)
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ def func(df): # pragma: no cover
# Ensure that the metadata is synchronized
qc._modin_frame._propagate_index_objs(axis=None)
result = qc._modin_frame.apply_full_axis(1, func, new_index=[], new_columns=[])
result._partition_mgr_cls.wait_partitions(result._partitions.flatten())
UnidistWrapper.materialize(
[part.list_of_blocks[0] for row in result._partitions for part in row]
)

@staticmethod
def _to_csv_check_support(kwargs):
Expand Down Expand Up @@ -234,7 +236,9 @@ def func(df, **kw):
max_retries=0,
)
# pending completion
qc._modin_frame._partition_mgr_cls.wait_partitions(result.flatten())
UnidistWrapper.materialize(
[part.list_of_blocks[0] for row in result for part in row]
)

@staticmethod
def _to_parquet_check_support(kwargs):
Expand Down Expand Up @@ -312,4 +316,6 @@ def func(df, **kw):
enumerate_partitions=True,
)
# pending completion
qc._modin_frame._partition_mgr_cls.wait_partitions(result.flatten())
UnidistWrapper.materialize(
[part.list_of_blocks[0] for row in result for part in row]
)
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,6 @@ def func(df, **kw): # pragma: no cover
result = qc._modin_frame.apply_full_axis(
1, func, new_index=[], new_columns=[], enumerate_partitions=True
)
result._partition_mgr_cls.wait_partitions(result._partitions.flatten())
DaskWrapper.materialize(
[part.list_of_blocks[0] for row in result._partitions for part in row]
)
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,6 @@ def func(df, **kw): # pragma: no cover
result = qc._modin_frame.apply_full_axis(
1, func, new_index=[], new_columns=[], enumerate_partitions=True
)
result._partition_mgr_cls.wait_partitions(result._partitions.flatten())
RayWrapper.materialize(
[part.list_of_blocks[0] for row in result._partitions for part in row]
)
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,6 @@ def func(df, **kw): # pragma: no cover
result = qc._modin_frame.apply_full_axis(
1, func, new_index=[], new_columns=[], enumerate_partitions=True
)
result._partition_mgr_cls.wait_partitions(result._partitions.flatten())
UnidistWrapper.materialize(
[part.list_of_blocks[0] for row in result._partitions for part in row]
)

0 comments on commit e91915a

Please sign in to comment.