diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index b1d239337aa01..79e88f1095345 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -57,10 +57,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
 
     conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), "
       + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, "
-      + "dbl DOUBLE, tiny TINYINT)").executeUpdate()
+      + "dbl DOUBLE, tiny TINYINT, u_tiny TINYINT UNSIGNED)").executeUpdate()
+
     conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
       + "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, "
-      + "42.75, 1.0000000000000002, -128)").executeUpdate()
+      + "42.75, 1.0000000000000002, -128, 255)").executeUpdate()
 
     conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, "
       + "yr YEAR)").executeUpdate()
@@ -90,7 +91,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     val rows = df.collect()
     assert(rows.length == 1)
     val types = rows(0).toSeq.map(x => x.getClass.toString)
-    assert(types.length == 10)
+    assert(types.length == 11)
     assert(types(0).equals("class java.lang.Boolean"))
     assert(types(1).equals("class java.lang.Long"))
     assert(types(2).equals("class java.lang.Integer"))
@@ -101,6 +102,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(types(7).equals("class java.lang.Double"))
     assert(types(8).equals("class java.lang.Double"))
     assert(types(9).equals("class java.lang.Byte"))
+    assert(types(10).equals("class java.lang.Short"))
     assert(rows(0).getBoolean(0) == false)
     assert(rows(0).getLong(1) == 0x225)
     assert(rows(0).getInt(2) == 17)
@@ -112,6 +114,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getDouble(7) == 42.75)
     assert(rows(0).getDouble(8) == 1.0000000000000002)
     assert(rows(0).getByte(9) == 0x80.toByte)
+    assert(rows(0).getShort(10) == 0xff.toShort)
   }
 
   test("Date types") {
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
index c3ec7e1925fa3..6c1b7fdd1be5a 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
@@ -70,11 +70,13 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DoubleType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", DoubleType, true, defaultMetadata(DoubleType))
     assert(t.schema === expectedSchema)
     // Update column type from DOUBLE to STRING
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)"
@@ -97,7 +99,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('CCSID'='UNICODE')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
index fc93f5cba4c03..e451cc2b8c52a 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
@@ -93,11 +93,13 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 5e340f135c85d..2b189db2c1cbd 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -83,6 +83,12 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
 
   private var mySQLVersion = -1
 
+  override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", true)
+    .build()
+
   override def tablePreparation(connection: Connection): Unit = {
     mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
     connection.prepareStatement(
@@ -93,11 +99,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -145,7 +153,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
index 591147413486e..0aa2905f93b85 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
@@ -77,9 +77,10 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
   override val namespaceOpt: Option[String] = Some("SYSTEM")
   override val db = new OracleDatabaseOnDocker
 
-  override val defaultMetadata: Metadata = new MetadataBuilder()
+  override def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
     .putLong("scale", 0)
     .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType] || dataType.isInstanceOf[StringType])
     .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, "varchar(255)")
     .build()
 
@@ -101,11 +102,13 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, super.defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", DecimalType(10, 0), true, super.defaultMetadata(DecimalType(10, 0)))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE LONG")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, super.defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", DecimalType(19, 0), true, super.defaultMetadata(DecimalType(19, 0)))
     assert(t.schema === expectedSchema)
     // Update column type from LONG to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index 233a634cac678..1f09c2fd3fc59 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -64,11 +64,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -91,7 +93,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('TABLESPACE'='pg_default')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 8dd377f4a35fa..c80fbfc748dd1 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -49,9 +49,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   def notSupportsTableComment: Boolean = false
 
-  def defaultMetadata: Metadata = new MetadataBuilder()
+  def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder()
     .putLong("scale", 0)
     .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
     .build()
 
   def testUpdateColumnNullability(tbl: String): Unit = {
@@ -59,11 +60,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     var t = spark.table(s"$catalogName.alt_table")
     // nullable is true in the expectedSchema because Spark always sets nullable to true
     // regardless of the JDBC metadata https://github.com/apache/spark/pull/18445
-    var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL")
     t = spark.table(s"$catalogName.alt_table")
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update nullability of not existing column
     val msg = intercept[AnalysisException] {
@@ -75,8 +76,9 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
   def testRenameColumn(tbl: String): Unit = {
     sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
     val t = spark.table(s"$tbl")
-    val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata)
-      .add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, true, defaultMetadata)
+    val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata())
+      .add("ID1", StringType, true, defaultMetadata())
+      .add("ID2", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
   }
 
@@ -86,16 +88,19 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     withTable(s"$catalogName.alt_table") {
       sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
       var t = spark.table(s"$catalogName.alt_table")
-      var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+      var expectedSchema = new StructType()
+        .add("ID", StringType, true, defaultMetadata())
      assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C1", StringType, true, defaultMetadata)
-        .add("C2", StringType, true, defaultMetadata)
+      expectedSchema = expectedSchema
+        .add("C1", StringType, true, defaultMetadata())
+        .add("C2", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C3", StringType, true, defaultMetadata)
+      expectedSchema = expectedSchema
+        .add("C3", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       // Add already existing column
       checkError(
@@ -128,7 +133,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1")
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3")
       val t = spark.table(s"$catalogName.alt_table")
-      val expectedSchema = new StructType().add("C2", StringType, true, defaultMetadata)
+      val expectedSchema = new StructType()
+        .add("C2", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val msg = intercept[AnalysisException] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index bc88ab9bfcae8..84d87f0082178 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -274,6 +274,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     val fields = new Array[StructField](ncols)
     var i = 0
     while (i < ncols) {
+      val metadata = new MetadataBuilder()
       val columnName = rsmd.getColumnLabel(i + 1)
       val dataType = rsmd.getColumnType(i + 1)
       val typeName = rsmd.getColumnTypeName(i + 1)
@@ -294,8 +295,6 @@ object JdbcUtils extends Logging with SQLConfHelper {
       } else {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
-      val metadata = new MetadataBuilder()
-      metadata.putLong("scale", fieldScale)
 
       dataType match {
         case java.sql.Types.TIME =>
@@ -307,7 +306,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
           metadata.putBoolean("rowid", true)
         case _ =>
       }
+      metadata.putBoolean("isSigned", isSigned)
       metadata.putBoolean("isTimestampNTZ", isTimestampNTZ)
+      metadata.putLong("scale", fieldScale)
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
           getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned, isTimestampNTZ))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 42b1a3a2854ed..4e5f092b193c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -33,7 +33,7 @@
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType, TimestampType}
+import org.apache.spark.sql.types._
 
 private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
@@ -107,8 +107,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
         // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1.
         // Explicitly converts it into StringType here.
         Some(StringType)
-      case Types.TINYINT if "TINYINT".equalsIgnoreCase(typeName) =>
-        Some(ByteType)
+      case Types.TINYINT =>
+        if (md.build().getBoolean("isSigned")) {
+          Some(ByteType)
+        } else {
+          Some(ShortType)
+        }
       case Types.TIMESTAMP if "DATETIME".equalsIgnoreCase(typeName) =>
         // scalastyle:off line.size.limit
         // In MYSQL, DATETIME is TIMESTAMP WITHOUT TIME ZONE
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index fc313de6c8fee..f4e7921e88bc2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -37,9 +37,11 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
   val tempDir = Utils.createTempDir()
   val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
-  val defaultMetadata = new MetadataBuilder()
+
+  def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
     .putLong("scale", 0)
     .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
     .build()
 
   override def sparkConf: SparkConf = super.sparkConf
@@ -142,8 +144,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
   test("load a table") {
     val t = spark.table("h2.test.people")
     val expectedSchema = new StructType()
-      .add("NAME", VarcharType(32), true, defaultMetadata)
-      .add("ID", IntegerType, true, defaultMetadata)
+      .add("NAME", VarcharType(32), true, defaultMetadata(VarcharType(32)))
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     Seq(
       "h2.test.not_existing_table" -> "`h2`.`test`.`not_existing_table`",
@@ -185,13 +187,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)")
     var t = spark.table(tableName)
     var expectedSchema = new StructType()
-      .add("ID", IntegerType, true, defaultMetadata)
-      .add("C1", IntegerType, true, defaultMetadata)
-      .add("C2", StringType, true, defaultMetadata)
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
+      .add("C1", IntegerType, true, defaultMetadata(IntegerType))
+      .add("C2", StringType, true, defaultMetadata(StringType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)")
     t = spark.table(tableName)
-    expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata)
+    expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata(DoubleType))
     assert(t.schema === expectedSchema)
     // Add already existing column
     checkError(
@@ -229,8 +231,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C")
     val t = spark.table(tableName)
     val expectedSchema = new StructType()
-      .add("C", IntegerType, true, defaultMetadata)
-      .add("C0", IntegerType, true, defaultMetadata)
+      .add("C", IntegerType, true, defaultMetadata(IntegerType))
+      .add("C0", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     // Rename to already existing column
     checkError(
@@ -268,7 +270,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     sql(s"ALTER TABLE $tableName DROP COLUMN C1")
     sql(s"ALTER TABLE $tableName DROP COLUMN c3")
     val t = spark.table(tableName)
-    val expectedSchema = new StructType().add("C2", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("C2", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     // Drop not existing column
     val sqlText = s"ALTER TABLE $tableName DROP COLUMN bad_column"
@@ -307,8 +310,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE")
     val t = spark.table(tableName)
     val expectedSchema = new StructType()
-      .add("ID", DoubleType, true, defaultMetadata)
-      .add("deptno", DoubleType, true, defaultMetadata)
+      .add("ID", DoubleType, true, defaultMetadata(DoubleType))
+      .add("deptno", DoubleType, true, defaultMetadata(DoubleType))
     assert(t.schema === expectedSchema)
     // Update not existing column
     val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE"
@@ -356,8 +359,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL")
     val t = spark.table(tableName)
     val expectedSchema = new StructType()
-      .add("ID", IntegerType, true, defaultMetadata)
-      .add("deptno", IntegerType, true, defaultMetadata)
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
+      .add("deptno", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     // Update nullability of not existing column
     val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL"
@@ -491,8 +494,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER)")
     var t = spark.table(tableName)
     var expectedSchema = new StructType()
-      .add("c1", IntegerType, true, defaultMetadata)
-      .add("c2", IntegerType, true, defaultMetadata)
+      .add("c1", IntegerType, true, defaultMetadata(IntegerType))
+      .add("c2", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -516,8 +519,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
       expectedSchema = new StructType()
-        .add("c1", IntegerType, true, defaultMetadata)
-        .add("c3", IntegerType, true, defaultMetadata)
+        .add("c1", IntegerType, true, defaultMetadata(IntegerType))
+        .add("c3", IntegerType, true, defaultMetadata(IntegerType))
       t = spark.table(tableName)
       assert(t.schema === expectedSchema)
     }
@@ -542,7 +545,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       sql(s"ALTER TABLE $tableName DROP COLUMN C3")
-      expectedSchema = new StructType().add("c1", IntegerType, true, defaultMetadata)
+      expectedSchema = new StructType()
+        .add("c1", IntegerType, true, defaultMetadata(IntegerType))
       t = spark.table(tableName)
       assert(t.schema === expectedSchema)
     }
@@ -566,7 +570,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
-      expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata)
+      expectedSchema = new StructType()
+        .add("c1", DoubleType, true, defaultMetadata(DoubleType))
       t = spark.table(tableName)
       assert(t.schema === expectedSchema)
     }
@@ -590,7 +595,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
-      expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata)
+      expectedSchema = new StructType()
+        .add("c1", DoubleType, true, defaultMetadata(IntegerType))
       t = spark.table(tableName)
       assert(t.schema === expectedSchema)
     }
@@ -660,8 +666,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE VARCHAR(30)")
     val t = spark.table(tableName)
     val expected = new StructType()
-      .add("ID", CharType(10), true, defaultMetadata)
-      .add("deptno", VarcharType(30), true, defaultMetadata)
+      .add("ID", CharType(10), true, defaultMetadata(CharType(10)))
+      .add("deptno", VarcharType(30), true, defaultMetadata(VarcharType(30)))
     val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected)
     assert(t.schema === replaced)
   }
@@ -674,13 +680,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       .executeUpdate())
     withSQLConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key -> "true") {
       val expected = new StructType()
-        .add("ID", StringType, true, defaultMetadata)
-        .add("DEPTNO", StringType, true, defaultMetadata)
+        .add("ID", StringType, true, defaultMetadata(StringType))
+        .add("DEPTNO", StringType, true, defaultMetadata(StringType))
       assert(sql(s"SELECT * FROM h2.test.char_tbl").schema === expected)
     }
     val expected = new StructType()
-      .add("ID", CharType(5), true, defaultMetadata)
-      .add("DEPTNO", VarcharType(10), true, defaultMetadata)
+      .add("ID", CharType(5), true, defaultMetadata(CharType(5)))
+      .add("DEPTNO", VarcharType(10), true, defaultMetadata(VarcharType(10)))
     val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected)
     assert(sql(s"SELECT * FROM h2.test.char_tbl").schema === replaced)
   } finally {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9e8df6d733e07..a2dac5a9e1e94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -78,9 +78,10 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  val defaultMetadata = new MetadataBuilder()
+  def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
    .putLong("scale", 0)
     .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
     .build()
 
   override def beforeAll(): Unit = {
@@ -928,7 +929,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
 
   test("MySQLDialect catalyst type mapping") {
     val mySqlDialect = JdbcDialects.get("jdbc:mysql")
-    val metadata = new MetadataBuilder()
+    val metadata = new MetadataBuilder().putBoolean("isSigned", value = true)
     assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) ==
       Some(LongType))
     assert(metadata.build().contains("binarylong"))
@@ -937,6 +938,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       Some(BooleanType))
     assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) ==
       Some(ByteType))
+    metadata.putBoolean("isSigned", value = false)
+    assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) ===
+      Some(ShortType))
   }
 
   test("SPARK-35446: MySQLDialect type mapping of float") {
@@ -1386,8 +1390,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-16848: jdbc API throws an exception for user specified schema") {
-    val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata),
-      StructField("theid", IntegerType, false, defaultMetadata)))
+    val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata(StringType)),
+      StructField("theid", IntegerType, false, defaultMetadata(IntegerType))))
     val parts = Array[String]("THEID < 2", "THEID >= 2")
     val e1 = intercept[AnalysisException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
@@ -1407,8 +1411,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     props.put("customSchema", customSchema)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema).map(
-      f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
+    val structType = CatalystSqlParser.parseTableSchema(customSchema)
+    val expectedSchema = new StructType(structType.map(
+      f => StructField(f.name, f.dataType, f.nullable, defaultMetadata(f.dataType))).toArray)
     assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     assert(df.count() === 3)
   }
@@ -1426,7 +1431,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val df = sql("select * from people_view")
     assert(df.schema.length === 2)
     val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema)
-      .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
+      .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata(f.dataType))).toArray)
     assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
 
     assert(df.count() === 3)
@@ -1577,8 +1582,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
   }
 
   test("jdbc data source shouldn't have unnecessary metadata in its schema") {
-    var schema = StructType(Seq(StructField("NAME", VarcharType(32), true, defaultMetadata),
-      StructField("THEID", IntegerType, true, defaultMetadata)))
+    var schema = StructType(
+      Seq(StructField("NAME", VarcharType(32), true, defaultMetadata(VarcharType(32))),
+        StructField("THEID", IntegerType, true, defaultMetadata(IntegerType))))
     schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     val df = spark.read.format("jdbc")
      .option("Url", urlWithUserAndPass)
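
Not part of the patch above: a minimal, self-contained Scala sketch of the mapping rule the MySQLDialect change encodes. It assumes only that an unsigned MySQL TINYINT spans 0..255 while Spark's ByteType is a signed 8-bit value; the names TinyIntMappingSketch and mapTinyInt are illustrative, not Spark APIs.

// Illustrative sketch only, not Spark source. A signed TINYINT (-128..127)
// fits a signed byte, but an unsigned TINYINT (0..255) does not, so the
// dialect widens it to a short once the "isSigned" flag is available in the
// column metadata.
object TinyIntMappingSketch {
  sealed trait CatalystType
  case object ByteType extends CatalystType   // signed 8-bit
  case object ShortType extends CatalystType  // signed 16-bit

  // Hypothetical helper mirroring the decision the dialect now makes on the
  // "isSigned" metadata written while building the JDBC schema.
  def mapTinyInt(isSigned: Boolean): CatalystType =
    if (isSigned) ByteType else ShortType

  def main(args: Array[String]): Unit = {
    assert(mapTinyInt(isSigned = true) == ByteType)
    assert(mapTinyInt(isSigned = false) == ShortType)
    // 255 (the u_tiny value inserted in the integration test) overflows a
    // signed byte but survives a short unchanged.
    assert(255.toByte != 255)
    assert(255.toShort == 255)
  }
}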