Skip to content

Commit

Permalink
FIX-#6022: support lazy import of modin.pandas module (#6023)
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Apr 24, 2023
1 parent f2422e9 commit ed474fe
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 17 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,9 @@ jobs:
- run: pip install "dfsql>=0.4.2" "pyparsing<=2.4.7" && mpiexec -n 1 python -m pytest modin/experimental/sql/test/test_sql.py
- run: mpiexec -n 1 python -m pytest modin/test/interchange/dataframe_protocol/test_general.py
- run: mpiexec -n 1 python -m pytest modin/test/interchange/dataframe_protocol/pandas/test_protocol.py
- run: |
python -m pip install lazy_import
mpiexec -n 1 python -m pytest modin/pandas/test/integrations/
- uses: ./.github/workflows/upload-coverage

test-all:
Expand Down Expand Up @@ -823,6 +826,10 @@ jobs:
if: matrix.engine == 'python' || matrix.test_task == 'group_4'
- run: python -m pytest modin/test/interchange/dataframe_protocol/pandas/test_protocol.py
if: matrix.engine == 'python' || matrix.test_task == 'group_4'
- run: |
python -m pip install lazy_import
python -m pytest modin/pandas/test/integrations/
if: matrix.engine == 'python' || matrix.test_task == 'group_4'
- uses: ./.github/workflows/upload-coverage

test-experimental:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
from modin.utils import _inherit_docstrings


# If Ray has not been initialized yet by Modin,
# it will be initialized when calling `RayWrapper.put`.
_DEPLOY_AXIS_FUNC = RayWrapper.put(PandasDataframeAxisPartition.deploy_axis_func)
_DRAIN = RayWrapper.put(PandasDataframeAxisPartition.drain)


class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition):
"""
The class implements the interface in ``PandasDataframeAxisPartition``.
Expand All @@ -58,6 +52,24 @@ class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition):
instance_type = ray.ObjectRef
axis = None

# these variables are intentionally initialized at runtime (see #6023)
_DEPLOY_AXIS_FUNC = None
_DRAIN_FUNC = None

@classmethod
def _get_deploy_axis_func(cls): # noqa: GL08
if cls._DEPLOY_AXIS_FUNC is None:
cls._DEPLOY_AXIS_FUNC = RayWrapper.put(
PandasDataframeAxisPartition.deploy_axis_func
)
return cls._DEPLOY_AXIS_FUNC

@classmethod
def _get_drain_func(cls): # noqa: GL08
if cls._DRAIN_FUNC is None:
cls._DRAIN_FUNC = RayWrapper.put(PandasDataframeAxisPartition.drain)
return cls._DRAIN_FUNC

def __init__(
self,
list_of_partitions,
Expand Down Expand Up @@ -200,7 +212,7 @@ def deploy_axis_func(
num_returns=(num_splits if lengths is None else len(lengths)) * 4,
**({"max_retries": max_retries} if max_retries is not None else {}),
).remote(
_DEPLOY_AXIS_FUNC,
cls._get_deploy_axis_func(),
axis,
func,
f_args,
Expand Down Expand Up @@ -473,7 +485,7 @@ def drain_call_queue(self, num_splits=None):
_ = self.list_of_blocks
return
drained = super(PandasOnRayDataframeVirtualPartition, self).apply(
_DRAIN, num_splits=num_splits, call_queue=self.call_queue
self._get_drain_func(), num_splits=num_splits, call_queue=self.call_queue
)
self._list_of_block_partitions = drained
self.call_queue = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@
from modin.utils import _inherit_docstrings


# If unidist has not been initialized yet by Modin,
# unidist itself handles initialization when calling `unidist.put`,
# which is called inside of `UnidistWrapper.put`.
_DEPLOY_AXIS_FUNC = UnidistWrapper.put(PandasDataframeAxisPartition.deploy_axis_func)
_DRAIN = UnidistWrapper.put(PandasDataframeAxisPartition.drain)


class PandasOnUnidistDataframeVirtualPartition(PandasDataframeAxisPartition):
"""
The class implements the interface in ``PandasDataframeAxisPartition``.
Expand All @@ -58,6 +51,24 @@ class PandasOnUnidistDataframeVirtualPartition(PandasDataframeAxisPartition):
instance_type = unidist.core.base.object_ref.ObjectRef
axis = None

# these variables are intentionally initialized at runtime (see #6023)
_DEPLOY_AXIS_FUNC = None
_DRAIN_FUNC = None

@classmethod
def _get_deploy_axis_func(cls): # noqa: GL08
if cls._DEPLOY_AXIS_FUNC is None:
cls._DEPLOY_AXIS_FUNC = UnidistWrapper.put(
PandasDataframeAxisPartition.deploy_axis_func
)
return cls._DEPLOY_AXIS_FUNC

@classmethod
def _get_drain_func(cls): # noqa: GL08
if cls._DRAIN_FUNC is None:
cls._DRAIN_FUNC = UnidistWrapper.put(PandasDataframeAxisPartition.drain)
return cls._DRAIN_FUNC

def __init__(
self,
list_of_partitions,
Expand Down Expand Up @@ -199,7 +210,7 @@ def deploy_axis_func(
num_returns=(num_splits if lengths is None else len(lengths)) * 4,
**({"max_retries": max_retries} if max_retries is not None else {}),
).remote(
_DEPLOY_AXIS_FUNC,
cls._get_deploy_axis_func(),
axis,
func,
f_args,
Expand Down Expand Up @@ -466,7 +477,7 @@ def drain_call_queue(self, num_splits=None):
_ = self.list_of_blocks
return
drained = super(PandasOnUnidistDataframeVirtualPartition, self).apply(
_DRAIN, num_splits=num_splits, call_queue=self.call_queue
self._get_drain_func(), num_splits=num_splits, call_queue=self.call_queue
)
self._list_of_block_partitions = drained
self.call_queue = []
Expand Down
12 changes: 12 additions & 0 deletions modin/pandas/test/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
22 changes: 22 additions & 0 deletions modin/pandas/test/integrations/test_lazy_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import lazy_import

pandas = lazy_import.lazy_module("pandas")
pyarrow = lazy_import.lazy_module("pyarrow")
from modin import pandas as pd # noqa: E402


def test_dataframe_constructor():
pd.DataFrame({"col1": [1, 2, 3], "col2": list("abc")})

0 comments on commit ed474fe

Please sign in to comment.