Merge pull request #297 from capitalone/develop
Release v0.12.0
fdosani authored May 1, 2024
2 parents fd2447b + f088d63 commit 794d55b
Showing 20 changed files with 5,671 additions and 3,044 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/test-package.yml
@@ -19,11 +19,10 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.8, 3.9, '3.10', '3.11']
spark-version: [3.1.3, 3.2.4, 3.3.4, 3.4.2, 3.5.0]
python-version: [3.9, '3.10', '3.11']
spark-version: [3.2.4, 3.3.4, 3.4.2, 3.5.1]
pandas-version: [2.2.2, 1.5.3]
exclude:
- python-version: '3.11'
spark-version: 3.1.3
- python-version: '3.11'
spark-version: 3.2.4
- python-version: '3.11'
@@ -51,6 +50,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install pytest pytest-spark pypandoc
python -m pip install pyspark==${{ matrix.spark-version }}
python -m pip install pandas==${{ matrix.pandas-version }}
python -m pip install .[dev]
- name: Test with pytest
run: |
@@ -62,7 +62,8 @@
strategy:
fail-fast: false
matrix:
python-version: [3.8, 3.9, '3.10', '3.11']
python-version: [3.9, '3.10', '3.11']

env:
PYTHON_VERSION: ${{ matrix.python-version }}

@@ -88,7 +89,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.8, 3.9, '3.10', '3.11']
python-version: [3.9, '3.10', '3.11']
env:
PYTHON_VERSION: ${{ matrix.python-version }}

3 changes: 2 additions & 1 deletion CONTRIBUTORS
@@ -3,4 +3,5 @@
- Usman Azhar
- Mark Zhou
- Ian Whitestone
- Faisal Dosani
- Faisal Dosani
- Lorenzo Mercado
48 changes: 38 additions & 10 deletions README.md
@@ -38,16 +38,44 @@ pip install datacompy[ray]

```

### In-scope Spark versions
Different versions of Spark play nicely with only certain versions of Python below is a matrix of what we test with
### Legacy Spark Deprecation

#### Starting with version 0.12.0

The original ``SparkCompare`` implementation differs from all the other native implementations. To better align the API and keep behaviour consistent, we are deprecating ``SparkCompare`` and moving it into a new module as ``LegacySparkCompare``.

If you wish to keep using the old ``SparkCompare`` moving forward, you can import it from the legacy module:

```python
from datacompy.legacy import LegacySparkCompare
```
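
A minimal usage sketch of the legacy class, assuming it lives in ``datacompy.legacy`` and keeps the prior ``SparkCompare`` constructor (a ``SparkSession`` plus base and compare Spark DataFrames) and its ``report()`` method:

```python
from pyspark.sql import SparkSession

from datacompy.legacy import LegacySparkCompare

spark = SparkSession.builder.getOrCreate()
base_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
compare_df = spark.createDataFrame([(1, "a"), (2, "c")], ["id", "value"])

# Compare on the "id" join column and print a human-readable report to stdout.
comparison = LegacySparkCompare(spark, base_df, compare_df, join_columns=["id"])
comparison.report()
```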

#### Supported versions and dependencies

Different versions of Spark, Pandas, and Python interact differently. Below is a matrix of what we test with.
With the move to the Pandas on Spark API and compatibility issues with Pandas 2+, we will, for the time being, not support Pandas 2
with the Pandas on Spark implementation. Spark plans to support Pandas 2 in [Spark 4](https://issues.apache.org/jira/browse/SPARK-44101).

With version ``0.12.0``:
- Pandas ``2.0.0`` is not supported for the native Spark implementation (see the pinning example below)
- Spark ``3.1`` support is dropped
- Python ``3.8`` support is dropped
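
If you rely on the Pandas on Spark implementation, one way to stay on a supported stack is to pin Pandas below 2.0 when installing; a purely illustrative pin (adjust the versions to your environment):

```
pip install "datacompy==0.12.0" "pandas<2.0.0"
```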


| | Spark 3.2.4 | Spark 3.3.4 | Spark 3.4.2 | Spark 3.5.1 |
|-------------|-------------|-------------|-------------|-------------|
| Python 3.9  | ✅ | ✅ | ✅ | ✅ |
| Python 3.10 | ✅ | ✅ | ✅ | ✅ |
| Python 3.11 | ❌ | ❌ | ✅ | ✅ |
| Python 3.12 | ❌ | ❌ | ❌ | ❌ |


|               | Pandas <= 1.5.3 | Pandas >=2.0.0 |
|---------------|-----------------|----------------|
| Native Pandas | ✅ | ✅ |
| Native Spark  | ✅ | ❌ |
| Fugue         | ✅ | ✅ |

| | Spark 3.1.3 | Spark 3.2.3 | Spark 3.3.4 | Spark 3.4.2 | Spark 3.5.0 |
|-------------|--------------|-------------|-------------|-------------|-------------|
| Python 3.8  | ✅ | ✅ | ✅ | ✅ | ✅ |
| Python 3.9  | ✅ | ✅ | ✅ | ✅ | ✅ |
| Python 3.10 | ✅ | ✅ | ✅ | ✅ | ✅ |
| Python 3.11 | ❌ | ❌ | ❌ | ✅ | ✅ |
| Python 3.12 | ❌ | ❌ | ❌ | ❌ | ❌ |


> [!NOTE]
@@ -56,7 +84,7 @@ Different versions of Spark play nicely with only certain versions of Python bel
## Supported backends

- Pandas: ([See documentation](https://capitalone.github.io/datacompy/pandas_usage.html))
- Spark: ([See documentation](https://capitalone.github.io/datacompy/spark_usage.html))
- Spark (Pandas on Spark API): ([See documentation](https://capitalone.github.io/datacompy/spark_usage.html))
- Polars (Experimental): ([See documentation](https://capitalone.github.io/datacompy/polars_usage.html))
- Fugue is a Python library that provides a unified interface for data processing on Pandas, DuckDB, Polars, Arrow,
Spark, Dask, Ray, and many other backends. DataComPy integrates with Fugue to provide a simple way to compare data
5 changes: 3 additions & 2 deletions datacompy/__init__.py
@@ -13,16 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.11.3"
__version__ = "0.12.0"

from datacompy.core import *
from datacompy.fugue import (
all_columns_match,
all_rows_overlap,
count_matching_rows,
intersect_columns,
is_match,
report,
unq_columns,
)
from datacompy.polars import PolarsCompare
from datacompy.spark import NUMERIC_SPARK_TYPES, SparkCompare
from datacompy.spark import SparkCompare
44 changes: 28 additions & 16 deletions datacompy/core.py
@@ -20,6 +20,7 @@
PROC COMPARE in SAS - i.e. human-readable reporting on the difference between
two dataframes.
"""

import logging
import os
from typing import Any, Dict, List, Optional, Union, cast
@@ -283,7 +284,11 @@ def _dataframe_merge(self, ignore_spaces: bool) -> None:
self.df2[column] = self.df2[column].str.strip()

outer_join = self.df1.merge(
self.df2, how="outer", suffixes=("_df1", "_df2"), indicator=True, **params
self.df2,
how="outer",
suffixes=("_" + self.df1_name, "_" + self.df2_name),
indicator=True,
**params,
)
# Clean up temp columns for duplicate row matching
if self._any_dupes:
@@ -295,8 +300,8 @@ def _dataframe_merge(self, ignore_spaces: bool) -> None:
self.df1.drop(order_column, axis=1, inplace=True)
self.df2.drop(order_column, axis=1, inplace=True)

df1_cols = get_merged_columns(self.df1, outer_join, "_df1")
df2_cols = get_merged_columns(self.df2, outer_join, "_df2")
df1_cols = get_merged_columns(self.df1, outer_join, self.df1_name)
df2_cols = get_merged_columns(self.df2, outer_join, self.df2_name)

LOG.debug("Selecting df1 unique rows")
self.df1_unq_rows = outer_join[outer_join["_merge"] == "left_only"][
@@ -334,8 +339,8 @@ def _intersect_compare(self, ignore_spaces: bool, ignore_case: bool) -> None:
max_diff = 0.0
null_diff = 0
else:
col_1 = column + "_df1"
col_2 = column + "_df2"
col_1 = column + "_" + self.df1_name
col_2 = column + "_" + self.df2_name
col_match = column + "_match"
self.intersect_rows[col_match] = columns_equal(
self.intersect_rows[col_1],
@@ -484,7 +489,10 @@ def sample_mismatch(
match_cnt = col_match.sum()
sample_count = min(sample_count, row_cnt - match_cnt)
sample = self.intersect_rows[~col_match].sample(sample_count)
return_cols = self.join_columns + [column + "_df1", column + "_df2"]
return_cols = self.join_columns + [
column + "_" + self.df1_name,
column + "_" + self.df2_name,
]
to_return = sample[return_cols]
if for_display:
to_return.columns = pd.Index(
@@ -517,8 +525,8 @@ def all_mismatch(self, ignore_matching_cols: bool = False) -> pd.DataFrame:
orig_col_name = col[:-6]

col_comparison = columns_equal(
self.intersect_rows[orig_col_name + "_df1"],
self.intersect_rows[orig_col_name + "_df2"],
self.intersect_rows[orig_col_name + "_" + self.df1_name],
self.intersect_rows[orig_col_name + "_" + self.df2_name],
self.rel_tol,
self.abs_tol,
self.ignore_spaces,
@@ -530,7 +538,12 @@ def all_mismatch(self, ignore_matching_cols: bool = False) -> pd.DataFrame:
):
LOG.debug(f"Adding column {orig_col_name} to the result.")
match_list.append(col)
return_list.extend([orig_col_name + "_df1", orig_col_name + "_df2"])
return_list.extend(
[
orig_col_name + "_" + self.df1_name,
orig_col_name + "_" + self.df2_name,
]
)
elif ignore_matching_cols:
LOG.debug(
f"Column {orig_col_name} is equal in df1 and df2. It will not be added to the result."
@@ -613,7 +626,6 @@ def df_to_str(pdf: pd.DataFrame) -> str:
)

# Column Matching
cnt_intersect = self.intersect_rows.shape[0]
report += render(
"column_comparison.txt",
len([col for col in self.column_stats if col["unequal_cnt"] > 0]),
@@ -804,7 +816,7 @@ def columns_equal(
compare = pd.Series(
(col_1 == col_2) | (col_1.isnull() & col_2.isnull())
)
except:
except Exception:
# Blanket exception should just return all False
compare = pd.Series(False, index=col_1.index)
compare.index = col_1.index
@@ -842,13 +854,13 @@ def compare_string_and_date_columns(
(pd.to_datetime(obj_column) == date_column)
| (obj_column.isnull() & date_column.isnull())
)
except:
except Exception:
try:
return pd.Series(
(pd.to_datetime(obj_column, format="mixed") == date_column)
| (obj_column.isnull() & date_column.isnull())
)
except:
except Exception:
return pd.Series(False, index=col_1.index)


@@ -871,8 +883,8 @@ def get_merged_columns(
for col in original_df.columns:
if col in merged_df.columns:
columns.append(col)
elif col + suffix in merged_df.columns:
columns.append(col + suffix)
elif col + "_" + suffix in merged_df.columns:
columns.append(col + "_" + suffix)
else:
raise ValueError("Column not found: %s", col)
return columns
@@ -920,7 +932,7 @@ def calculate_max_diff(col_1: "pd.Series[Any]", col_2: "pd.Series[Any]") -> floa
"""
try:
return cast(float, (col_1.astype(float) - col_2.astype(float)).abs().max())
except:
except Exception:
return 0.0


96 changes: 95 additions & 1 deletion datacompy/fugue.py
@@ -291,6 +291,101 @@ def all_rows_overlap(
return all(overlap)


def count_matching_rows(
df1: AnyDataFrame,
df2: AnyDataFrame,
join_columns: Union[str, List[str]],
abs_tol: float = 0,
rel_tol: float = 0,
df1_name: str = "df1",
df2_name: str = "df2",
ignore_spaces: bool = False,
ignore_case: bool = False,
cast_column_names_lower: bool = True,
parallelism: Optional[int] = None,
strict_schema: bool = False,
) -> int:
"""Count the number of rows match (on overlapping fields)
Parameters
----------
df1 : ``AnyDataFrame``
First dataframe to check
df2 : ``AnyDataFrame``
Second dataframe to check
join_columns : list or str, optional
Column(s) to join dataframes on. If a string is passed in, that one
column will be used.
abs_tol : float, optional
Absolute tolerance between two values.
rel_tol : float, optional
Relative tolerance between two values.
df1_name : str, optional
A string name for the first dataframe. This allows the reporting to
print out an actual name instead of "df1", and allows human users to
more easily track the dataframes.
df2_name : str, optional
A string name for the second dataframe
ignore_spaces : bool, optional
Flag to strip whitespace (including newlines) from string columns (including any join
columns)
ignore_case : bool, optional
Flag to ignore the case of string columns
cast_column_names_lower: bool, optional
Boolean indicator that controls whether column names will be cast into lower case
parallelism: int, optional
An integer representing the amount of parallelism. Entering a value for this
will force the use of Fugue over just vanilla Pandas
strict_schema: bool, optional
The schema must match exactly if set to ``True``. This includes the names and types. Allows for a fast fail.

Returns
-------
int
Number of matching rows
"""
if (
isinstance(df1, pd.DataFrame)
and isinstance(df2, pd.DataFrame)
and parallelism is None # user did not specify parallelism
and fa.get_current_parallelism() == 1 # currently on a local execution engine
):
comp = Compare(
df1=df1,
df2=df2,
join_columns=join_columns,
abs_tol=abs_tol,
rel_tol=rel_tol,
df1_name=df1_name,
df2_name=df2_name,
ignore_spaces=ignore_spaces,
ignore_case=ignore_case,
cast_column_names_lower=cast_column_names_lower,
)
return comp.count_matching_rows()

try:
count_matching_rows = _distributed_compare(
df1=df1,
df2=df2,
join_columns=join_columns,
return_obj_func=lambda comp: comp.count_matching_rows(),
abs_tol=abs_tol,
rel_tol=rel_tol,
df1_name=df1_name,
df2_name=df2_name,
ignore_spaces=ignore_spaces,
ignore_case=ignore_case,
cast_column_names_lower=cast_column_names_lower,
parallelism=parallelism,
strict_schema=strict_schema,
)
except _StrictSchemaError:
return 0

return sum(count_matching_rows)
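
# Illustrative usage sketch for count_matching_rows (the dataframes and the
# "acct_id" join column below are hypothetical):
#
#   import pandas as pd
#   import datacompy
#
#   df_a = pd.DataFrame({"acct_id": [1, 2, 3], "balance": [10.0, 20.0, 30.0]})
#   df_b = pd.DataFrame({"acct_id": [1, 2, 4], "balance": [10.0, 25.0, 40.0]})
#
#   # Only acct_id == 1 agrees on every overlapping column, so this returns 1.
#   datacompy.count_matching_rows(df_a, df_b, join_columns="acct_id")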


def report(
df1: AnyDataFrame,
df2: AnyDataFrame,
@@ -460,7 +555,6 @@ def _any(col: str) -> int:
any_mismatch = len(match_sample) > 0

# Column Matching
cnt_intersect = shape0("intersect_rows_shape")
rpt += render(
"column_comparison.txt",
len([col for col in column_stats if col["unequal_cnt"] > 0]),