Merge pull request #286 from capitalone/develop
Release v0.11.3
fdosani authored Mar 25, 2024
2 parents bfb6ddf + 605152c commit fd2447b
Showing 4 changed files with 44 additions and 21 deletions.
13 changes: 6 additions & 7 deletions README.md
@@ -19,7 +19,7 @@ Then extended to carry that functionality over to Spark Dataframes.
pip install datacompy
```

-or
+or

```shell
conda install datacompy
@@ -50,18 +50,17 @@ Different versions of Spark play nicely with only certain versions of Python below
| Python 3.12 | ❌ | ❌ | ❌ | ❌ | ❌ |


-:::{note}
-At the current time Python ``3.12`` is not supported by Spark and also Ray within Fugue.
-:::
+> [!NOTE]
+> At the current time Python `3.12` is not supported by Spark and also Ray within Fugue.
## Supported backends

- Pandas: ([See documentation](https://capitalone.github.io/datacompy/pandas_usage.html))
- Spark: ([See documentation](https://capitalone.github.io/datacompy/spark_usage.html))
- Polars (Experimental): ([See documentation](https://capitalone.github.io/datacompy/polars_usage.html))
-- Fugue is a Python library that provides a unified interface for data processing on Pandas, DuckDB, Polars, Arrow,
-  Spark, Dask, Ray, and many other backends. DataComPy integrates with Fugue to provide a simple way to compare data
-  across these backends. Please note that Fugue will use the Pandas (Native) logic at its lowest level
+- Fugue is a Python library that provides a unified interface for data processing on Pandas, DuckDB, Polars, Arrow,
+  Spark, Dask, Ray, and many other backends. DataComPy integrates with Fugue to provide a simple way to compare data
+  across these backends. Please note that Fugue will use the Pandas (Native) logic at its lowest level
([See documentation](https://capitalone.github.io/datacompy/fugue_usage.html))
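
For context, this is the comparison flow the backends list describes, sketched against the pandas backend. The frames, the "id" join key, and the mismatched value are illustrative assumptions, not taken from this commit:

```python
import pandas as pd
import datacompy

# Illustrative data; one "value" cell differs so the report has a mismatch.
df1 = pd.DataFrame({"id": [1, 2, 3], "value": [10.0, 20.0, 30.0]})
df2 = pd.DataFrame({"id": [1, 2, 3], "value": [10.0, 20.5, 30.0]})

compare = datacompy.Compare(df1, df2, join_columns="id")
print(compare.report())  # human-readable summary of matches and mismatches
```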

## Contributors
2 changes: 1 addition & 1 deletion datacompy/__init__.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.11.2"
__version__ = "0.11.3"

from datacompy.core import *
from datacompy.fugue import (
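A quick sanity check after upgrading; this snippet is hypothetical and not part of the commit:

```python
import datacompy

# Expected to print 0.11.3 once this release is installed.
print(datacompy.__version__)
```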
33 changes: 25 additions & 8 deletions datacompy/spark.py
@@ -391,7 +391,10 @@ def rows_only_base(self) -> "pyspark.sql.DataFrame":
        base_rows.createOrReplaceTempView("baseRows")
        self.base_df.createOrReplaceTempView("baseTable")
        join_condition = " AND ".join(
-            ["A.`" + name + "`<=>B.`" + name + "`" for name in self._join_column_names]
+            [
+                "A.`" + name + "`<=>B.`" + name + "`"
+                for name in self._join_column_names
+            ]
        )
        sql_query = "select A.* from baseTable as A, baseRows as B where {}".format(
            join_condition
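The comprehension above, and the identical ones in the hunks below, builds a Spark SQL join predicate with `<=>`, the null-safe equality operator, so rows still match when a join key is NULL on both sides. A standalone sketch of the string it produces, with assumed column names:

```python
# Assumed join columns; the backticks guard names that are not plain
# SQL identifiers, and `<=>` is Spark SQL's null-safe equality operator.
join_column_names = ["id", "例"]
join_condition = " AND ".join(
    ["A.`" + name + "`<=>B.`" + name + "`" for name in join_column_names]
)
print(join_condition)  # A.`id`<=>B.`id` AND A.`例`<=>B.`例`
```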
@@ -411,7 +414,10 @@ def rows_only_compare(self) -> Optional["pyspark.sql.DataFrame"]:
        compare_rows.createOrReplaceTempView("compareRows")
        self.compare_df.createOrReplaceTempView("compareTable")
        where_condition = " AND ".join(
-            ["A.`" + name + "`<=>B.`" + name + "`" for name in self._join_column_names]
+            [
+                "A.`" + name + "`<=>B.`" + name + "`"
+                for name in self._join_column_names
+            ]
        )
        sql_query = (
            "select A.* from compareTable as A, compareRows as B where {}".format(
@@ -443,15 +449,23 @@ def _generate_select_statement(self, match_data: bool = True) -> str:
                    [self._create_select_statement(name=column_name)]
                )
            elif column_name in base_only:
-                select_statement = select_statement + ",".join(["A.`" + column_name + "`"])
+                select_statement = select_statement + ",".join(
+                    ["A.`" + column_name + "`"]
+                )

            elif column_name in compare_only:
                if match_data:
-                    select_statement = select_statement + ",".join(["B.`" + column_name + "`"])
+                    select_statement = select_statement + ",".join(
+                        ["B.`" + column_name + "`"]
+                    )
                else:
-                    select_statement = select_statement + ",".join(["A.`" + column_name + "`"])
+                    select_statement = select_statement + ",".join(
+                        ["A.`" + column_name + "`"]
+                    )
            elif column_name in self._join_column_names:
-                select_statement = select_statement + ",".join(["A.`" + column_name + "`"])
+                select_statement = select_statement + ",".join(
+                    ["A.`" + column_name + "`"]
+                )

            if column_name != sorted_list[-1]:
                select_statement = select_statement + " , "
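These branches only gained line breaks (Black-style wrapping); behavior is unchanged. For a base-only column, the appended fragment is just the backtick-quoted column from alias A. A minimal sketch with an assumed column name:

```python
# Illustrative only: "extra_col" is an assumed base-only column name.
column_name = "extra_col"
fragment = ",".join(["A.`" + column_name + "`"])
print(fragment)  # A.`extra_col`
```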
@@ -483,7 +497,10 @@ def _merge_dataframes(self) -> None:
    def _get_or_create_joined_dataframe(self) -> "pyspark.sql.DataFrame":
        if self._joined_dataframe is None:
            join_condition = " AND ".join(
-                ["A.`" + name + "`<=>B.`" + name + "`" for name in self._join_column_names]
+                [
+                    "A.`" + name + "`<=>B.`" + name + "`"
+                    for name in self._join_column_names
+                ]
            )
            select_statement = self._generate_select_statement(match_data=True)

@@ -514,7 +531,7 @@ def _print_num_of_rows_with_column_equality(self, myfile: TextIO) -> None:

        where_cond = " AND ".join(
            [
-                "A." + name + "=" + str(MatchType.MATCH.value)
+                "A.`" + name + "`=" + str(MatchType.MATCH.value)
                for name in self.columns_compared
            ]
        )
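This is the one behavioral change in the file: the per-column match predicate now backtick-quotes the column name, so names that are not plain SQL identifiers (such as the Unicode columns exercised in the test below) no longer break the generated query. A sketch of the resulting predicate, with assumed inputs and a local stand-in enum:

```python
from enum import Enum

class MatchType(Enum):  # local stand-in for datacompy's MatchType
    MISMATCH = 0
    MATCH = 1

columns_compared = ["例", "予測対象日"]  # assumed example columns
where_cond = " AND ".join(
    ["A.`" + name + "`=" + str(MatchType.MATCH.value) for name in columns_compared]
)
print(where_cond)  # A.`例`=1 AND A.`予測対象日`=1
```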
17 changes: 12 additions & 5 deletions tests/test_spark.py
@@ -2094,15 +2094,22 @@ def text_alignment_validator(
        if not at_column_section and section_start in line:
            at_column_section = True

+
def test_unicode_columns(spark_session):
    df1 = spark_session.createDataFrame(
-        [{"a": 1, "例": 2}, {"a": 1, "例": 3}]
+        [
+            (1, "foo", "test"),
+            (2, "bar", "test"),
+        ],
+        ["id", "例", "予測対象日"],
    )
    df2 = spark_session.createDataFrame(
-        [{"a": 1, "例": 2}, {"a": 1, "例": 3}]
+        [
+            (1, "foo", "test"),
+            (2, "baz", "test"),
+        ],
+        ["id", "例", "予測対象日"],
    )
-    compare = SparkCompare(
-        spark_session, df1, df2, join_columns=["例"]
-    )
+    compare = SparkCompare(spark_session, df1, df2, join_columns=["例"])
    # Just render the report to make sure it renders.
    compare.report()
