Skip to content

Commit

Permalink
Enable test_default.py and test_join_sort.py
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Iaroslav <iaroslav.igoshev@intel.com>
  • Loading branch information
YarShev committed May 16, 2024
1 parent b6dc27c commit 8c6544e
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 181 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ jobs:
matrix:
python-version: ["3.9"]
env:
MODIN_SMALL_QUERY_COMPILER: "True"
MODIN_USE_PLAIN_PANDAS_QUERY_COMPILER: "True"
name: test-small-query-compiler python ${{matrix.python-version}})
steps:
- uses: actions/checkout@v4
Expand Down
4 changes: 2 additions & 2 deletions modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
GpuCount,
HdkFragmentSize,
HdkLaunchParameters,
InitializeWithSmallQueryCompilers,
IsDebug,
IsExperimental,
IsRayCluster,
Expand All @@ -58,6 +57,7 @@
TestReadFromPostgres,
TestReadFromSqlServer,
TrackFileLeaks,
UsePlainPandasQueryCompiler,
use_range_partitioning_groupby,
)
from modin.config.pubsub import Parameter, ValueSource, context
Expand All @@ -74,7 +74,7 @@
"CpuCount",
"GpuCount",
"Memory",
"InitializeWithSmallQueryCompilers",
"UsePlainPandasQueryCompiler",
# Ray specific
"IsRayCluster",
"RayRedisAddress",
Expand Down
6 changes: 3 additions & 3 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -1051,10 +1051,10 @@ def _check_vars() -> None:
)


class InitializeWithSmallQueryCompilers(EnvironmentVariable, type=str):
"""Set to true to use implementation of SmallQueryCompiler."""
class UsePlainPandasQueryCompiler(EnvironmentVariable, type=bool):
"""Set to true to use implementation of PlainPandasQueryCompiler."""

varname = "MODIN_SMALL_QUERY_COMPILER"
varname = "MODIN_USE_PLAIN_PANDAS_QUERY_COMPILER"
default = False


Expand Down
142 changes: 31 additions & 111 deletions modin/experimental/core/storage_formats/pandas/small_query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@
# governing permissions and limitations under the License.

"""
Module contains ``SmallQueryCompiler`` class.
Module contains ``PlainPandasQueryCompiler`` class.
``SmallQueryCompiler`` is responsible for compiling efficient DataFrame algebra
``PlainPandasQueryCompiler`` is responsible for compiling efficient DataFrame algebra
queries for small data and empty ``PandasDataFrame``.
"""

import warnings

import numpy as np
import pandas
from pandas.core.dtypes.common import is_list_like, is_scalar

from modin.config.envvars import InitializeWithSmallQueryCompilers
from modin.config.envvars import UsePlainPandasQueryCompiler
from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
from modin.error_message import ErrorMessage
from modin.utils import (
MODIN_UNNAMED_SERIES_LABEL,
_inherit_docstrings,
Expand All @@ -48,9 +45,9 @@ def _get_axis(axis):
callable(PandasQueryCompiler) -> pandas.Index
"""
if axis == 0:
return lambda self: self._pandas_frame.index
return lambda self: self._modin_frame.index
else:
return lambda self: self._pandas_frame.columns
return lambda self: self._modin_frame.columns


def _set_axis(axis):
Expand All @@ -69,12 +66,12 @@ def _set_axis(axis):
if axis == 0:

def set_axis(self, idx):
self._pandas_frame.index = idx
self._modin_frame.index = idx

else:

def set_axis(self, cols):
self._pandas_frame.columns = cols
self._modin_frame.columns = cols

return set_axis

Expand Down Expand Up @@ -572,7 +569,7 @@ def _register_default_pandas(
"""

def caller(query_compiler, *args, **kwargs):
df = query_compiler._pandas_frame
df = query_compiler._modin_frame
if df_copy:
df = df.copy()
if is_series:
Expand Down Expand Up @@ -605,121 +602,46 @@ def caller(query_compiler, *args, **kwargs):


@_inherit_docstrings(BaseQueryCompiler)
class SmallQueryCompiler(BaseQueryCompiler):
class PlainPandasQueryCompiler(BaseQueryCompiler):
"""
Query compiler for the pandas storage format.
This class translates common query compiler API to default all methods
to pandas.
This class translates common query compiler API into
plain pandas to execute operations on small data
depending on the threshold.
Parameters
----------
pandas_frame : pandas.DataFrame
Modin Frame to query with the compiled queries.
Pandas frame to query with the compiled queries.
"""

def __init__(self, pandas_frame):
assert InitializeWithSmallQueryCompilers.get()
assert UsePlainPandasQueryCompiler.get()
if hasattr(pandas_frame, "_to_pandas"):
pandas_frame = pandas_frame._to_pandas()
if is_scalar(pandas_frame):
pandas_frame = pandas.DataFrame([pandas_frame])
elif not isinstance(pandas_frame, pandas.DataFrame):
pandas_frame = pandas.DataFrame(pandas_frame)

self._pandas_frame = pandas_frame

# def default_to_pandas(self, pandas_op, *args, **kwargs):
# args = (a.to_pandas() if isinstance(a, type(self)) else a for a in args)
# kwargs = {
# k: v.to_pandas if isinstance(v, type(self)) else v
# for k, v in kwargs.items()
# }
# op_name = getattr(pandas_op, "__name__", str(pandas_op))
# ErrorMessage.default_to_pandas(op_name)

# result = pandas_op(self._pandas_frame, *args, **kwargs)
# if isinstance(result, pandas.Series):
# if result.name is None:
# result.name = MODIN_UNNAMED_SERIES_LABEL
# result = result.to_frame()

# return result

def default_to_pandas(self, pandas_op, *args, **kwargs):
"""
Do fallback to pandas for the passed function.
Parameters
----------
pandas_op : callable(pandas.DataFrame) -> object
Function to apply to the casted to pandas frame.
*args : iterable
Positional arguments to pass to `pandas_op`.
**kwargs : dict
Key-value arguments to pass to `pandas_op`.
Returns
-------
BaseQueryCompiler
The result of the `pandas_op`, converted back to ``BaseQueryCompiler``.
"""
op_name = getattr(pandas_op, "__name__", str(pandas_op))
ErrorMessage.default_to_pandas(op_name)
args = try_cast_to_pandas(args)
kwargs = try_cast_to_pandas(kwargs)

with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = pandas_op(try_cast_to_pandas(self), *args, **kwargs)
if isinstance(result, (tuple, list)):
if "Series.tolist" in pandas_op.__name__:
# fast path: no need to iterate over the result from `tolist` function
return result
return [self.__wrap_in_qc(obj) for obj in result]
# breakpoint()
return type(self)(result)

def __wrap_in_qc(self, obj):
"""
Wrap `obj` in query compiler.
Parameters
----------
obj : any
Object to wrap.
Returns
-------
BaseQueryCompiler
Query compiler wrapping the object.
"""
if isinstance(obj, pandas.Series):
if obj.name is None:
obj.name = MODIN_UNNAMED_SERIES_LABEL
obj = obj.to_frame()
if isinstance(obj, pandas.DataFrame):
return self.from_pandas(obj, type(self._pandas_frame))
else:
return obj
self._modin_frame = pandas_frame

def execute(self):
"""Wait for all computations to complete without materializing data."""
pass

def take_2d_positional(self, index=None, columns=None):
index = slice(None) if index is None else index
columns = slice(None) if columns is None else columns
self._pandas_frame.iloc[index, columns]
return self.__constructor__(self._pandas_frame.iloc[index, columns])
return self.__constructor__(self._modin_frame.iloc[index, columns])

def copy(self):
return self.__constructor__(self._pandas_frame.copy())
return self.__constructor__(self._modin_frame.copy())

def setitem_bool(self, row_loc, col_loc, item):

self._pandas_frame.loc[row_loc._pandas_frame.squeeze(axis=1), col_loc] = item
return self.__constructor__(self._pandas_frame)
self._modin_frame.loc[row_loc._modin_frame.squeeze(axis=1), col_loc] = item
return self.__constructor__(self._modin_frame)

__and__ = _register_default_pandas(pandas.DataFrame.__and__, squeeze_series=True)
__dir__ = _register_default_pandas(pandas.DataFrame.__dir__)
Expand Down Expand Up @@ -943,7 +865,7 @@ def setitem_bool(self, row_loc, col_loc, item):
mask = _register_default_pandas(pandas.DataFrame.mask)
max = _register_default_pandas(pandas.DataFrame.max)
map = _register_default_pandas(pandas.DataFrame.map)
mean = _register_default_pandas(pandas.DataFrame.mean)
mean = _register_default_pandas(pandas.DataFrame.mean, return_modin=False)
median = _register_default_pandas(pandas.DataFrame.median)
melt = _register_default_pandas(pandas.DataFrame.melt)
memory_usage = _register_default_pandas(pandas.DataFrame.memory_usage)
Expand Down Expand Up @@ -1140,9 +1062,9 @@ def dot(self, other, squeeze_self=None, squeeze_other=None):
if squeeze_other:
other = other.squeeze()
if squeeze_self:
result = self._pandas_frame.squeeze(axis=1).dot(other)
result = self._modin_frame.squeeze(axis=1).dot(other)
else:
result = self._pandas_frame.dot(other)
result = self._modin_frame.dot(other)
if isinstance(result, pandas.Series):
if result.name is None:
result.name = "__reduced__"
Expand Down Expand Up @@ -1227,7 +1149,7 @@ def expanding_corr(
)

def get_axis(self, axis):
return self._pandas_frame.index if axis == 0 else self._pandas_frame.columns
return self._modin_frame.index if axis == 0 else self._modin_frame.columns

def get_index_name(self, axis=0):
return self.get_axis(axis).name
Expand All @@ -1240,9 +1162,9 @@ def set_index_name(self, name, axis=0):

def has_multiindex(self, axis=0):
if axis == 0:
return isinstance(self._pandas_frame.index, pandas.MultiIndex)
return isinstance(self._modin_frame.index, pandas.MultiIndex)
assert axis == 1
return isinstance(self._pandas_frame.columns, pandas.MultiIndex)
return isinstance(self._modin_frame.columns, pandas.MultiIndex)

def isin(self, values, ignore_indices=False, **kwargs):
if isinstance(values, type(self)) and ignore_indices:
Expand All @@ -1258,7 +1180,7 @@ def isin(self, values, ignore_indices=False, **kwargs):
)

def to_pandas(self):
return self._pandas_frame
return self._modin_frame

@classmethod
def from_pandas(cls, df, data_cls):
Expand All @@ -1277,7 +1199,7 @@ def finalize(self):
# Dataframe exchange protocol

def to_dataframe(self, nan_as_null: bool = False, allow_copy: bool = True):
return self._pandas_frame.__dataframe__(
return self._modin_frame.__dataframe__(
nan_as_null=nan_as_null, allow_copy=allow_copy
)

Expand All @@ -1292,14 +1214,12 @@ def from_dataframe(cls, df, data_cls):

@property
def dtypes(self):
return self._pandas_frame.dtypes
return self._modin_frame.dtypes

def getitem_column_array(self, key, numeric=False, ignore_order=False):
if numeric:
return self.__constructor__(self._pandas_frame.iloc[:, key])
return self.__constructor__(self._pandas_frame.loc[:, key])
return self.__constructor__(self._modin_frame.iloc[:, key])
return self.__constructor__(self._modin_frame.loc[:, key])

def is_series_like(self):
return (
len(self._pandas_frame.columns) == 1 or len(self._pandas_frame.index) == 1
)
return len(self._modin_frame.columns) == 1 or len(self._modin_frame.index) == 1
4 changes: 2 additions & 2 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from modin import pandas as pd
from modin.error_message import ErrorMessage
from modin.experimental.core.storage_formats.pandas.small_query_compiler import (
SmallQueryCompiler,
PlainPandasQueryCompiler,
)
from modin.logging import ClassLogger, disable_logging
from modin.pandas.accessor import CachedAccessor, ModinAPI
Expand Down Expand Up @@ -286,7 +286,7 @@ def _build_repr_df(
indexer = row_indexer, _get_repr_axis_label_indexer(self.columns, num_cols)
else:
indexer = row_indexer
if isinstance(self._query_compiler, SmallQueryCompiler):
if isinstance(self._query_compiler, PlainPandasQueryCompiler):
return self._query_compiler.to_pandas().iloc[indexer]
return self.iloc[indexer]._query_compiler.to_pandas()

Expand Down
8 changes: 4 additions & 4 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@
from pandas.io.formats.info import DataFrameInfo
from pandas.util._validators import validate_bool_kwarg

from modin.config import InitializeWithSmallQueryCompilers, PersistentPickle
from modin.config import PersistentPickle, UsePlainPandasQueryCompiler
from modin.error_message import ErrorMessage
from modin.experimental.core.storage_formats.pandas.small_query_compiler import (
SmallQueryCompiler,
PlainPandasQueryCompiler,
)
from modin.logging import disable_logging
from modin.pandas import Categorical
Expand Down Expand Up @@ -261,11 +261,11 @@ def __init__(
else:
self._query_compiler = query_compiler

if query_compiler is None and InitializeWithSmallQueryCompilers.get():
if query_compiler is None and UsePlainPandasQueryCompiler.get():
small_dataframe = pandas.DataFrame(
data=data, index=index, columns=columns, dtype=dtype, copy=copy
)
self._query_compiler = SmallQueryCompiler(small_dataframe)
self._query_compiler = PlainPandasQueryCompiler(small_dataframe)

def __repr__(self) -> str:
"""
Expand Down
8 changes: 4 additions & 4 deletions modin/pandas/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@
from pandas.io.parsers import TextFileReader
from pandas.io.parsers.readers import _c_parser_defaults

from modin.config import ExperimentalNumPyAPI, InitializeWithSmallQueryCompilers
from modin.config import ExperimentalNumPyAPI, UsePlainPandasQueryCompiler
from modin.error_message import ErrorMessage
from modin.experimental.core.storage_formats.pandas.small_query_compiler import (
SmallQueryCompiler,
PlainPandasQueryCompiler,
)
from modin.logging import ClassLogger, enable_logging
from modin.utils import (
Expand Down Expand Up @@ -995,8 +995,8 @@ def from_pandas(df) -> DataFrame:
"""
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

if InitializeWithSmallQueryCompilers.get():
return ModinObjects.DataFrame(query_compiler=SmallQueryCompiler(df))
if UsePlainPandasQueryCompiler.get():
return ModinObjects.DataFrame(query_compiler=PlainPandasQueryCompiler(df))

return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_pandas(df))

Expand Down
Loading

0 comments on commit 8c6544e

Please sign in to comment.