Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Iaroslav Igoshev <Poolliver868@mail.ru>
Signed-off-by: arunjose696 <arunjose696@gmail.com>
  • Loading branch information
arunjose696 and YarShev committed Jun 14, 2024
1 parent da02e5f commit 20c2929
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 104 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ jobs:
with:
filters: |
test-small-query-compiler:
- 'modin/experimental/core/storage_formats/pandas/small_query_compiler.py'
- 'modin/experimental/core/storage_formats/pandas/native_query_compiler.py'
- 'modin/core/storage_formats/pandas/query_compiler.py'
- 'modin/core/storage_formats/base/query_compiler.py'
shared: &shared
Expand Down Expand Up @@ -647,7 +647,7 @@ jobs:
matrix:
python-version: ["3.9"]
env:
MODIN_NATIVE_DATAFRAME_MODE: "Native_pandas"
MODIN_NATIVE_DATAFRAME_MODE: "Pandas"
name: test-small-query-compiler python ${{matrix.python-version}})
steps:
- uses: actions/checkout@v4
Expand Down
21 changes: 10 additions & 11 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,20 +850,19 @@ def _check_vars() -> None:

class NativeDataframeMode(EnvironmentVariable, type=str):
"""
The mode of execution used for handling dataframes in Modin.
When the env variable is set to None the PandasQueryCompiler would be used
which would lead to Modin executing dataframes in distributed fashion.
When set to Native_pandas NativeQueryCompiler is used which handles the
dataframes without distributing, falling back to native pandas functions.
In future more execution modes can be added for single node execution so
keeping the parameter as string.
When this config is set to ``Default``, ``PandasQueryCompiler`` is used,
which leads to Modin executing dataframes in distributed fashion.
When set to a string (e.g., ``Pandas``), ``NativeQueryCompiler`` is used,
which handles the dataframes without distributing,
falling back to native library functions (e.g., ``Pandas``).
This could be beneficial for handling relatively small dataframes
without involving additional overhead of communication between processes.
"""

varname = "MODIN_NATIVE_DATAFRAME_MODE"
choices = ("Native_pandas",)
default = None
choices = ("Pandas",)
default = "Default"


_check_vars()
6 changes: 2 additions & 4 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@

from modin.config import NativeDataframeMode
from modin.core.io import BaseIO
from modin.experimental.core.storage_formats.pandas.native_query_compiler import (
NativeQueryCompiler,
)
from modin.core.storage_formats.pandas.native_query_compiler import NativeQueryCompiler

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
modin.core.storage_formats.pandas.native_query_compiler
begins an import cycle.
from modin.utils import get_current_execution

_doc_abstract_factory_class = """
Expand Down Expand Up @@ -172,7 +170,7 @@ def prepare(cls):
method="io.from_pandas",
)
def _from_pandas(cls, df):
if NativeDataframeMode.get():
if NativeDataframeMode.get() == "Pandas":
df_copy = df.copy()
return NativeQueryCompiler(df_copy)

Check warning on line 175 in modin/core/execution/dispatching/factories/factories.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dispatching/factories/factories.py#L174-L175

Added lines #L174 - L175 were not covered by tests
return cls.io_cls.from_pandas(df)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,6 @@ def _fillna(df, value, **kwargs): # noqa: GL08
df = df.squeeze(axis=1)
if squeeze_value and isinstance(value, pandas.DataFrame):
value = value.squeeze(axis=1)
# if len(df.columns) == 1 and df.columns[0] == "__reduced__":
# df = df["__reduced__"]
return df.fillna(value, **kwargs)

Check warning on line 409 in modin/core/storage_formats/pandas/native_query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/native_query_compiler.py#L403-L409

Added lines #L403 - L409 were not covered by tests


Expand Down Expand Up @@ -495,12 +493,10 @@ def _get_dummies(df, columns, **kwargs): # noqa: GL08
def _register_default_pandas(
func,
is_series=False,
squeeze_series=False,
squeeze_args=False,
squeeze_kwargs=False,
return_modin=True,
return_raw=False,
in_place=False,
df_copy=False,
filter_kwargs=[],
):
"""
Expand All @@ -512,18 +508,14 @@ def _register_default_pandas(
Function to apply.
is_series : bool, default: False
If True, the passed frame will always be squeezed to a series.
squeeze_series : bool, default: False
If True, the passed frame will always be squeezed to a series if there is a single column named "__reduced__".
squeeze_args : bool, default: False
If True, all passed arguments will be squeezed.
squeeze_kwargs : bool, default: False
If True, all passed key word arguments will be squeezed.
return_modin : bool, default: True
If True, the result will always try to convert to DataFrame or Series.
return_raw : bool, default: False
If True, and the result not DataFrame or Series it is returned as is without wrapping in query compiler.
in_place : bool, default: False
If True, the specified function will be applied on the passed frame in place.
df_copy : bool, default: False
If True, the specified function will be applied to a copy of the passed frame.
filter_kwargs : list, default: []
List of key word argument names to remove.
Expand All @@ -535,30 +527,21 @@ def _register_default_pandas(

def caller(query_compiler, *args, **kwargs):
df = query_compiler._modin_frame
if df_copy:
df = df.copy()
if is_series:
df = df.squeeze(axis=1)
exclude_names = [
# "broadcast",
"fold_axis",
# "squeeze_self",
# "squeeze_value",
"ignore_indices",
] + filter_kwargs
exclude_names = ["fold_axis"] + filter_kwargs
kwargs = kwargs.copy()
for name in exclude_names:
kwargs.pop(name, None)
args = try_cast_to_pandas(args, squeeze=squeeze_args)
kwargs = try_cast_to_pandas(kwargs, squeeze=squeeze_kwargs)
result = func(df, *args, **kwargs)
inplace_method = kwargs.get("inplace", False)

if in_place:
inplace_method = in_place
if inplace_method:
result = df
if not (return_modin or isinstance(result, (pandas.Series, pandas.DataFrame))):
if return_raw and not isinstance(result, (pandas.Series, pandas.DataFrame)):
return result
if isinstance(result, pandas.Series):
if result.name is None:
Expand All @@ -576,17 +559,20 @@ class NativeQueryCompiler(BaseQueryCompiler):
Query compiler for the pandas storage format.
This class translates common query compiler API into
plain pandas to execute operations on small data
depending on the threshold.
native library functions (e.g., pandas) to execute operations
on small data depending on the threshold.
Parameters
----------
pandas_frame : pandas.DataFrame
Pandas frame to query with the compiled queries.
"""

def __init__(self, pandas_frame):
assert NativeDataframeMode.get() == "Native_Pandas"
_modin_frame: pandas.DataFrame
_shape_hint: Optional[str]

def __init__(self, pandas_frame, shape_hint: Optional[str] = None):
assert NativeDataframeMode.get() == "Pandas"
if hasattr(pandas_frame, "_to_pandas"):
pandas_frame = pandas_frame._to_pandas()
if is_scalar(pandas_frame):
Expand All @@ -595,6 +581,7 @@ def __init__(self, pandas_frame):
pandas_frame = pandas.DataFrame(pandas_frame)

Check warning on line 581 in modin/core/storage_formats/pandas/native_query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/native_query_compiler.py#L576-L581

Added lines #L576 - L581 were not covered by tests

self._modin_frame = pandas_frame
self._shape_hint = shape_hint

Check warning on line 584 in modin/core/storage_formats/pandas/native_query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/native_query_compiler.py#L583-L584

Added lines #L583 - L584 were not covered by tests

def execute(self):
pass
Expand All @@ -617,6 +604,10 @@ def set_frame_dtypes_cache(self, dtypes):
Parameters
----------
dtypes : pandas.Series, ModinDtypes, callable or None
Notes
-----
This function is for consistency with other QCs, dtypes should be assigned directly on the frame.
"""
pass

Expand All @@ -627,6 +618,10 @@ def set_frame_index_cache(self, index):
Parameters
----------
index : sequence, callable or None
Notes
-----
This function is for consistency with other QCs, dtypes should be assigned directly on the frame.
"""
pass

Expand Down Expand Up @@ -665,27 +660,25 @@ def setitem_bool(self, row_loc, col_loc, item):
self._modin_frame.loc[row_loc._modin_frame.squeeze(axis=1), col_loc] = item
return self.__constructor__(self._modin_frame)

Check warning on line 661 in modin/core/storage_formats/pandas/native_query_compiler.py

View check run for this annotation

Codecov / codecov/patch

modin/core/storage_formats/pandas/native_query_compiler.py#L660-L661

Added lines #L660 - L661 were not covered by tests

__and__ = _register_default_pandas(pandas.DataFrame.__and__, squeeze_series=True)
__and__ = _register_default_pandas(pandas.DataFrame.__and__)
__dir__ = _register_default_pandas(pandas.DataFrame.__dir__)
__eq__ = _register_default_pandas(pandas.DataFrame.__eq__, squeeze_series=True)
__eq__ = _register_default_pandas(pandas.DataFrame.__eq__)
__format__ = _register_default_pandas(pandas.DataFrame.__format__)
__ge__ = _register_default_pandas(pandas.DataFrame.__ge__, squeeze_series=True)
__gt__ = _register_default_pandas(pandas.DataFrame.__gt__, squeeze_series=True)
__le__ = _register_default_pandas(pandas.DataFrame.__le__, squeeze_series=True)
__lt__ = _register_default_pandas(pandas.DataFrame.__lt__, squeeze_series=True)
__ne__ = _register_default_pandas(pandas.DataFrame.__ne__, squeeze_series=True)
__or__ = _register_default_pandas(pandas.DataFrame.__or__, squeeze_series=True)
__rand__ = _register_default_pandas(pandas.DataFrame.__rand__, squeeze_series=True)
__reduce__ = _register_default_pandas(
pandas.DataFrame.__reduce__, return_modin=False
)
__ge__ = _register_default_pandas(pandas.DataFrame.__ge__)
__gt__ = _register_default_pandas(pandas.DataFrame.__gt__)
__le__ = _register_default_pandas(pandas.DataFrame.__le__)
__lt__ = _register_default_pandas(pandas.DataFrame.__lt__)
__ne__ = _register_default_pandas(pandas.DataFrame.__ne__)
__or__ = _register_default_pandas(pandas.DataFrame.__or__)
__rand__ = _register_default_pandas(pandas.DataFrame.__rand__)
__reduce__ = _register_default_pandas(pandas.DataFrame.__reduce__, return_raw=True)
__reduce_ex__ = _register_default_pandas(
pandas.DataFrame.__reduce_ex__, return_modin=False
pandas.DataFrame.__reduce_ex__, return_raw=True
)
__ror__ = _register_default_pandas(pandas.DataFrame.__ror__, squeeze_series=True)
__rxor__ = _register_default_pandas(pandas.DataFrame.__rxor__, squeeze_series=True)
__ror__ = _register_default_pandas(pandas.DataFrame.__ror__)
__rxor__ = _register_default_pandas(pandas.DataFrame.__rxor__)
__sizeof__ = _register_default_pandas(pandas.DataFrame.__sizeof__)
__xor__ = _register_default_pandas(pandas.DataFrame.__xor__, squeeze_series=True)
__xor__ = _register_default_pandas(pandas.DataFrame.__xor__)
abs = _register_default_pandas(pandas.DataFrame.abs)
add = _register_default_pandas(_register_binary("add"))
all = _register_default_pandas(pandas.DataFrame.all)
Expand All @@ -696,10 +689,8 @@ def setitem_bool(self, row_loc, col_loc, item):
astype = _register_default_pandas(pandas.DataFrame.astype)
case_when = _register_default_pandas(pandas.Series.case_when)
cat_codes = _register_default_pandas(lambda ser: ser.cat.codes, is_series=True)
combine = _register_default_pandas(_combine, squeeze_series=True)
combine_first = _register_default_pandas(
lambda df, other: df.combine_first(other), squeeze_series=True
)
combine = _register_default_pandas(_combine)
combine_first = _register_default_pandas(lambda df, other: df.combine_first(other))
compare = _register_default_pandas(pandas.DataFrame.compare)
concat = _register_default_pandas(_concat)
conj = _register_default_pandas(
Expand All @@ -714,9 +705,7 @@ def setitem_bool(self, row_loc, col_loc, item):
cumprod = _register_default_pandas(pandas.DataFrame.cumprod)
cumsum = _register_default_pandas(pandas.DataFrame.cumsum)
delitem = _register_default_pandas(_delitem)
df_update = _register_default_pandas(
pandas.DataFrame.update, in_place=True, df_copy=True
)
df_update = _register_default_pandas(pandas.DataFrame.update, in_place=True)
diff = _register_default_pandas(pandas.DataFrame.diff)
dot = _register_default_pandas(_register_binary("dot"))
drop = _register_default_pandas(_drop)
Expand Down Expand Up @@ -825,7 +814,7 @@ def setitem_bool(self, row_loc, col_loc, item):

fillna = _register_default_pandas(_fillna)
first_valid_index = _register_default_pandas(
pandas.DataFrame.first_valid_index, return_modin=False
pandas.DataFrame.first_valid_index, return_raw=True
)
floordiv = _register_default_pandas(_register_binary("floordiv"))
ge = _register_default_pandas(_register_binary("ge"), filter_kwargs=["dtypes"])
Expand Down Expand Up @@ -859,7 +848,7 @@ def setitem_bool(self, row_loc, col_loc, item):
idxmax = _register_default_pandas(pandas.DataFrame.idxmax)
idxmin = _register_default_pandas(pandas.DataFrame.idxmin)
infer_objects = _register_default_pandas(
pandas.DataFrame.infer_objects, return_modin=False
pandas.DataFrame.infer_objects, return_raw=True
)
insert = _register_default_pandas(
pandas.DataFrame.insert, in_place=True, squeeze_args=True
Expand All @@ -876,18 +865,18 @@ def setitem_bool(self, row_loc, col_loc, item):
)
isna = _register_default_pandas(pandas.DataFrame.isna)
join = _register_default_pandas(pandas.DataFrame.join)
kurt = _register_default_pandas(pandas.DataFrame.kurt, return_modin=False)
kurt = _register_default_pandas(pandas.DataFrame.kurt, return_raw=True)
last_valid_index = _register_default_pandas(
pandas.DataFrame.last_valid_index, return_modin=False
pandas.DataFrame.last_valid_index, return_raw=True
)
le = _register_default_pandas(_register_binary("le"), filter_kwargs=["dtypes"])
lt = _register_default_pandas(_register_binary("lt"), filter_kwargs=["dtypes"])
# mad = _register_default_pandas(pandas.DataFrame.mad)
mask = _register_default_pandas(pandas.DataFrame.mask)
max = _register_default_pandas(pandas.DataFrame.max)
map = _register_default_pandas(pandas.DataFrame.map)
mean = _register_default_pandas(pandas.DataFrame.mean, return_modin=False)
median = _register_default_pandas(pandas.DataFrame.median, return_modin=False)
mean = _register_default_pandas(pandas.DataFrame.mean, return_raw=True)
median = _register_default_pandas(pandas.DataFrame.median, return_raw=True)
melt = _register_default_pandas(pandas.DataFrame.melt)
memory_usage = _register_default_pandas(pandas.DataFrame.memory_usage)
merge = _register_default_pandas(pandas.DataFrame.merge)
Expand All @@ -899,9 +888,7 @@ def setitem_bool(self, row_loc, col_loc, item):
negative = _register_default_pandas(pandas.DataFrame.__neg__)
nlargest = _register_default_pandas(pandas.DataFrame.nlargest)
notna = _register_default_pandas(pandas.DataFrame.notna)
nsmallest = _register_default_pandas(
lambda df, **kwargs: df.nsmallest(**kwargs), squeeze_series=True
)
nsmallest = _register_default_pandas(lambda df, **kwargs: df.nsmallest(**kwargs))
nunique = _register_default_pandas(pandas.DataFrame.nunique)
pivot = _register_default_pandas(pandas.DataFrame.pivot)
pivot_table = _register_default_pandas(pandas.DataFrame.pivot_table)
Expand Down Expand Up @@ -982,7 +969,7 @@ def setitem_bool(self, row_loc, col_loc, item):
series_view = _register_default_pandas(pandas.Series.view, is_series=True)
set_index_from_columns = _register_default_pandas(pandas.DataFrame.set_index)
setitem = _register_default_pandas(_setitem)
skew = _register_default_pandas(pandas.DataFrame.skew, return_modin=False)
skew = _register_default_pandas(pandas.DataFrame.skew, return_raw=True)
sort_index = _register_default_pandas(_sort_index)
sort_columns_by_row_values = _register_default_pandas(
lambda df, columns, **kwargs: df.sort_values(by=columns, axis=1, **kwargs)
Expand Down Expand Up @@ -1044,13 +1031,13 @@ def setitem_bool(self, row_loc, col_loc, item):
sum_min_count = _register_default_pandas(pandas.DataFrame.sum)
to_datetime = _register_default_pandas(_to_datetime)
to_numeric = _register_default_pandas(_to_numeric)
to_numpy = _register_default_pandas(pandas.DataFrame.to_numpy, return_modin=False)
to_numpy = _register_default_pandas(pandas.DataFrame.to_numpy, return_raw=True)
to_timedelta = _register_default_pandas(
lambda ser, *args, **kwargs: pandas.to_timedelta(ser, *args, **kwargs),
is_series=True,
)
transpose = _register_default_pandas(pandas.DataFrame.transpose)
truediv = _register_default_pandas(_register_binary("truediv"), squeeze_series=True)
truediv = _register_default_pandas(_register_binary("truediv"))
unstack = _register_default_pandas(pandas.DataFrame.unstack)
var = _register_default_pandas(pandas.DataFrame.var)
where = _register_default_pandas(pandas.DataFrame.where)
Expand Down
2 changes: 1 addition & 1 deletion modin/tests/pandas/dataframe/test_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def operation(df):
reason="Modin on this engine doesn't create virtual partitions.",
)
@pytest.mark.skipif(
NativeDataframeMode.get() is not None,
NativeDataframeMode.get() == "Pandas",
reason="NativeQueryCompiler does not contain partitions.",
)
@pytest.mark.parametrize(
Expand Down
Loading

0 comments on commit 20c2929

Please sign in to comment.