diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4684a73308d..f19b6f41d67 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -663,6 +663,9 @@ jobs: - run: pip install "dfsql>=0.4.2" "pyparsing<=2.4.7" && mpiexec -n 1 python -m pytest modin/experimental/sql/test/test_sql.py - run: mpiexec -n 1 python -m pytest modin/test/interchange/dataframe_protocol/test_general.py - run: mpiexec -n 1 python -m pytest modin/test/interchange/dataframe_protocol/pandas/test_protocol.py + - run: | + python -m pip install lazy_import + mpiexec -n 1 python -m pytest modin/pandas/test/integrations/ - uses: ./.github/workflows/upload-coverage test-all: @@ -823,6 +826,10 @@ jobs: if: matrix.engine == 'python' || matrix.test_task == 'group_4' - run: python -m pytest modin/test/interchange/dataframe_protocol/pandas/test_protocol.py if: matrix.engine == 'python' || matrix.test_task == 'group_4' + - run: | + python -m pip install lazy_import + python -m pytest modin/pandas/test/integrations/ + if: matrix.engine == 'python' || matrix.test_task == 'group_4' - uses: ./.github/workflows/upload-coverage test-experimental: diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 2d68463c8a8..747f45d0668 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -26,12 +26,6 @@ from modin.utils import _inherit_docstrings -# If Ray has not been initialized yet by Modin, -# it will be initialized when calling `RayWrapper.put`. -_DEPLOY_AXIS_FUNC = RayWrapper.put(PandasDataframeAxisPartition.deploy_axis_func) -_DRAIN = RayWrapper.put(PandasDataframeAxisPartition.drain) - - class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition): """ The class implements the interface in ``PandasDataframeAxisPartition``. @@ -58,6 +52,24 @@ class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition): instance_type = ray.ObjectRef axis = None + # these variables are intentionally initialized at runtime (see #6023) + _DEPLOY_AXIS_FUNC = None + _DRAIN_FUNC = None + + @classmethod + def _get_deploy_axis_func(cls): # noqa: GL08 + if cls._DEPLOY_AXIS_FUNC is None: + cls._DEPLOY_AXIS_FUNC = RayWrapper.put( + PandasDataframeAxisPartition.deploy_axis_func + ) + return cls._DEPLOY_AXIS_FUNC + + @classmethod + def _get_drain_func(cls): # noqa: GL08 + if cls._DRAIN_FUNC is None: + cls._DRAIN_FUNC = RayWrapper.put(PandasDataframeAxisPartition.drain) + return cls._DRAIN_FUNC + def __init__( self, list_of_partitions, @@ -200,7 +212,7 @@ def deploy_axis_func( num_returns=(num_splits if lengths is None else len(lengths)) * 4, **({"max_retries": max_retries} if max_retries is not None else {}), ).remote( - _DEPLOY_AXIS_FUNC, + cls._get_deploy_axis_func(), axis, func, f_args, @@ -473,7 +485,7 @@ def drain_call_queue(self, num_splits=None): _ = self.list_of_blocks return drained = super(PandasOnRayDataframeVirtualPartition, self).apply( - _DRAIN, num_splits=num_splits, call_queue=self.call_queue + self._get_drain_func(), num_splits=num_splits, call_queue=self.call_queue ) self._list_of_block_partitions = drained self.call_queue = [] diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py index 0e1b4a9ea78..96e9bfc6d73 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py @@ -25,13 +25,6 @@ from modin.utils import _inherit_docstrings -# If unidist has not been initialized yet by Modin, -# unidist itself handles initialization when calling `unidist.put`, -# which is called inside of `UnidistWrapper.put`. -_DEPLOY_AXIS_FUNC = UnidistWrapper.put(PandasDataframeAxisPartition.deploy_axis_func) -_DRAIN = UnidistWrapper.put(PandasDataframeAxisPartition.drain) - - class PandasOnUnidistDataframeVirtualPartition(PandasDataframeAxisPartition): """ The class implements the interface in ``PandasDataframeAxisPartition``. @@ -58,6 +51,24 @@ class PandasOnUnidistDataframeVirtualPartition(PandasDataframeAxisPartition): instance_type = unidist.core.base.object_ref.ObjectRef axis = None + # these variables are intentionally initialized at runtime (see #6023) + _DEPLOY_AXIS_FUNC = None + _DRAIN_FUNC = None + + @classmethod + def _get_deploy_axis_func(cls): # noqa: GL08 + if cls._DEPLOY_AXIS_FUNC is None: + cls._DEPLOY_AXIS_FUNC = UnidistWrapper.put( + PandasDataframeAxisPartition.deploy_axis_func + ) + return cls._DEPLOY_AXIS_FUNC + + @classmethod + def _get_drain_func(cls): # noqa: GL08 + if cls._DRAIN_FUNC is None: + cls._DRAIN_FUNC = UnidistWrapper.put(PandasDataframeAxisPartition.drain) + return cls._DRAIN_FUNC + def __init__( self, list_of_partitions, @@ -199,7 +210,7 @@ def deploy_axis_func( num_returns=(num_splits if lengths is None else len(lengths)) * 4, **({"max_retries": max_retries} if max_retries is not None else {}), ).remote( - _DEPLOY_AXIS_FUNC, + cls._get_deploy_axis_func(), axis, func, f_args, @@ -466,7 +477,7 @@ def drain_call_queue(self, num_splits=None): _ = self.list_of_blocks return drained = super(PandasOnUnidistDataframeVirtualPartition, self).apply( - _DRAIN, num_splits=num_splits, call_queue=self.call_queue + self._get_drain_func(), num_splits=num_splits, call_queue=self.call_queue ) self._list_of_block_partitions = drained self.call_queue = [] diff --git a/modin/pandas/test/integrations/__init__.py b/modin/pandas/test/integrations/__init__.py new file mode 100644 index 00000000000..cae6413e559 --- /dev/null +++ b/modin/pandas/test/integrations/__init__.py @@ -0,0 +1,12 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. diff --git a/modin/pandas/test/integrations/test_lazy_import.py b/modin/pandas/test/integrations/test_lazy_import.py new file mode 100644 index 00000000000..21f76f715af --- /dev/null +++ b/modin/pandas/test/integrations/test_lazy_import.py @@ -0,0 +1,22 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +import lazy_import + +pandas = lazy_import.lazy_module("pandas") +pyarrow = lazy_import.lazy_module("pyarrow") +from modin import pandas as pd # noqa: E402 + + +def test_dataframe_constructor(): + pd.DataFrame({"col1": [1, 2, 3], "col2": list("abc")})