diff --git a/README.md b/README.md
index fefe447..dc518c9 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ Then extended to carry that functionality over to Spark Dataframes.
 pip install datacompy
 ```
 
-or 
+or
 
 ```shell
 conda install datacompy
@@ -50,18 +50,17 @@ Different versions of Spark play nicely with only certain versions of Python bel
 | Python 3.12 | ❌ | ❌ | ❌ | ❌ | ❌ |
 
 
-:::{note}
-At the current time Python ``3.12`` is not supported by Spark and also Ray within Fugue.
-:::
+> [!NOTE]
+> At the current time Python `3.12` is not supported by Spark and also Ray within Fugue.
 
 ## Supported backends
 
 - Pandas: ([See documentation](https://capitalone.github.io/datacompy/pandas_usage.html))
 - Spark: ([See documentation](https://capitalone.github.io/datacompy/spark_usage.html))
 - Polars (Experimental): ([See documentation](https://capitalone.github.io/datacompy/polars_usage.html))
-- Fugue is a Python library that provides a unified interface for data processing on Pandas, DuckDB, Polars, Arrow, 
-  Spark, Dask, Ray, and many other backends. DataComPy integrates with Fugue to provide a simple way to compare data 
-  across these backends. Please note that Fugue will use the Pandas (Native) logic at its lowest level 
+- Fugue is a Python library that provides a unified interface for data processing on Pandas, DuckDB, Polars, Arrow,
+  Spark, Dask, Ray, and many other backends. DataComPy integrates with Fugue to provide a simple way to compare data
+  across these backends. Please note that Fugue will use the Pandas (Native) logic at its lowest level
   ([See documentation](https://capitalone.github.io/datacompy/fugue_usage.html))
 
 ## Contributors
diff --git a/datacompy/__init__.py b/datacompy/__init__.py
index 6bc1e6e..6b1aab2 100644
--- a/datacompy/__init__.py
+++ b/datacompy/__init__.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__version__ = "0.11.2"
+__version__ = "0.11.3"
 
 from datacompy.core import *
 from datacompy.fugue import (
diff --git a/datacompy/spark.py b/datacompy/spark.py
index cf32ff1..9fdc209 100644
--- a/datacompy/spark.py
+++ b/datacompy/spark.py
@@ -391,7 +391,10 @@ def rows_only_base(self) -> "pyspark.sql.DataFrame":
         base_rows.createOrReplaceTempView("baseRows")
         self.base_df.createOrReplaceTempView("baseTable")
         join_condition = " AND ".join(
-            ["A.`" + name + "`<=>B.`" + name + "`" for name in self._join_column_names]
+            [
+                "A.`" + name + "`<=>B.`" + name + "`"
+                for name in self._join_column_names
+            ]
         )
         sql_query = "select A.* from baseTable as A, baseRows as B where {}".format(
             join_condition
@@ -411,7 +414,10 @@ def rows_only_compare(self) -> Optional["pyspark.sql.DataFrame"]:
         compare_rows.createOrReplaceTempView("compareRows")
         self.compare_df.createOrReplaceTempView("compareTable")
         where_condition = " AND ".join(
-            ["A.`" + name + "`<=>B.`" + name + "`" for name in self._join_column_names]
+            [
+                "A.`" + name + "`<=>B.`" + name + "`"
+                for name in self._join_column_names
+            ]
         )
         sql_query = (
             "select A.* from compareTable as A, compareRows as B where {}".format(
@@ -443,15 +449,23 @@ def _generate_select_statement(self, match_data: bool = True) -> str:
                         [self._create_select_statement(name=column_name)]
                     )
             elif column_name in base_only:
-                select_statement = select_statement + ",".join(["A.`" + column_name + "`"])
+                select_statement = select_statement + ",".join(
+                    ["A.`" + column_name + "`"]
+                )
 
             elif column_name in compare_only:
                 if match_data:
-                    select_statement = select_statement + ",".join(["B.`" + column_name + "`"])
+                    select_statement = select_statement + ",".join(
+                        ["B.`" + column_name + "`"]
+                    )
                 else:
-                    select_statement = select_statement + ",".join(["A.`" + column_name + "`"])
+                    select_statement = select_statement + ",".join(
+                        ["A.`" + column_name + "`"]
+                    )
             elif column_name in self._join_column_names:
-                select_statement = select_statement + ",".join(["A.`" + column_name + "`"])
+                select_statement = select_statement + ",".join(
+                    ["A.`" + column_name + "`"]
+                )
 
             if column_name != sorted_list[-1]:
                 select_statement = select_statement + " , "
@@ -483,7 +497,10 @@ def _merge_dataframes(self) -> None:
     def _get_or_create_joined_dataframe(self) -> "pyspark.sql.DataFrame":
         if self._joined_dataframe is None:
             join_condition = " AND ".join(
-                ["A.`" + name + "`<=>B.`" + name + "`" for name in self._join_column_names]
+                [
+                    "A.`" + name + "`<=>B.`" + name + "`"
+                    for name in self._join_column_names
+                ]
             )
             select_statement = self._generate_select_statement(match_data=True)
 
@@ -514,7 +531,7 @@ def _print_num_of_rows_with_column_equality(self, myfile: TextIO) -> None:
 
         where_cond = " AND ".join(
             [
-                "A." + name + "=" + str(MatchType.MATCH.value)
+                "A.`" + name + "`=" + str(MatchType.MATCH.value)
                 for name in self.columns_compared
             ]
        )
diff --git a/tests/test_spark.py b/tests/test_spark.py
index 92e9d87..af8aa8f 100644
--- a/tests/test_spark.py
+++ b/tests/test_spark.py
@@ -2094,15 +2094,22 @@ def text_alignment_validator(
         if not at_column_section and section_start in line:
             at_column_section = True
 
+
 def test_unicode_columns(spark_session):
     df1 = spark_session.createDataFrame(
-        [{"a": 1, "例": 2}, {"a": 1, "例": 3}]
+        [
+            (1, "foo", "test"),
+            (2, "bar", "test"),
+        ],
+        ["id", "例", "予測対象日"],
     )
     df2 = spark_session.createDataFrame(
-        [{"a": 1, "例": 2}, {"a": 1, "例": 3}]
-    )
-    compare = SparkCompare(
-        spark_session, df1, df2, join_columns=["例"]
+        [
+            (1, "foo", "test"),
+            (2, "baz", "test"),
+        ],
+        ["id", "例", "予測対象日"],
     )
+    compare = SparkCompare(spark_session, df1, df2, join_columns=["例"])
     # Just render the report to make sure it renders.
     compare.report()
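For reference, a minimal sketch of what the new `test_unicode_columns` test exercises end to end: comparing two Spark DataFrames whose column names contain non-ASCII characters, which the backtick-quoted identifiers in the generated Spark SQL above now handle. The local `SparkSession` setup (master, app name) is illustrative only; the `SparkCompare` call and `report()` mirror the test in the diff, and the import path assumes `SparkCompare` lives in `datacompy.spark` as shown above.

```python
# Sketch only: mirrors tests/test_spark.py::test_unicode_columns from the diff above.
# The SparkSession configuration below is an assumption for illustration.
from pyspark.sql import SparkSession

from datacompy.spark import SparkCompare

spark_session = (
    SparkSession.builder.master("local[2]").appName("datacompy-unicode").getOrCreate()
)

df1 = spark_session.createDataFrame(
    [(1, "foo", "test"), (2, "bar", "test")],
    ["id", "例", "予測対象日"],
)
df2 = spark_session.createDataFrame(
    [(1, "foo", "test"), (2, "baz", "test")],
    ["id", "例", "予測対象日"],
)

# Join on the unicode column; the generated SQL now wraps it in backticks
# (A.`例`<=>B.`例`), so the comparison no longer breaks on non-ASCII names.
compare = SparkCompare(spark_session, df1, df2, join_columns=["例"])
compare.report()  # just render the report to make sure it renders
```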