[SPARK-47435][SQL] Fix overflow issue of MySQL UNSIGNED TINYINT caused by SPARK-45561

### What changes were proposed in this pull request?

SPARK-45561 mapped java.sql.Types.TINYINT to ByteType in the MySQL dialect, which caused unsigned TINYINT values to overflow: MySQL reports java.sql.Types.TINYINT for TINYINT columns regardless of whether they are signed or unsigned.

In this PR, we record the signedness in the column metadata so that TINYINT can be mapped to ShortType or ByteType accordingly.
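
The heart of the change, condensed from the `MySQLDialect` diff below (surrounding cases elided; `md` is the column's `MetadataBuilder`):

```scala
// The MySQL driver reports java.sql.Types.TINYINT for both TINYINT and
// TINYINT UNSIGNED, so getCatalystType branches on the "isSigned" flag
// that JdbcUtils now records in the column metadata.
case Types.TINYINT =>
  if (md.build().getBoolean("isSigned")) {
    Some(ByteType)   // signed TINYINT: [-128, 127] fits in a byte
  } else {
    Some(ShortType)  // unsigned TINYINT: [0, 255] needs a short
  }
```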

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

Yes. After this PR, users can read MySQL UNSIGNED TINYINT values again, as they could before 3.5.1, where this behavior broke.
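
For illustration, a minimal read that exercises the restored path. This is a sketch only: the JDBC URL, credentials, and database name are hypothetical (the `numbers` table with a `u_tiny TINYINT UNSIGNED` column mirrors the integration test below):

```scala
// Hypothetical setup: `numbers` has a column `u_tiny TINYINT UNSIGNED`
// holding 255. In 3.5.1 this value overflowed ByteType; with this fix
// the column is read as ShortType and arrives intact.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/testdb") // hypothetical URL
  .option("dbtable", "numbers")
  .option("user", "root")                              // hypothetical credentials
  .option("password", "secret")
  .load()
df.printSchema()             // u_tiny: short (nullable = true)
df.select("u_tiny").show()   // 255
```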

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#45556 from yaooqinn/SPARK-47435.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
yaooqinn authored and dongjoon-hyun committed Mar 18, 2024
1 parent 1aafe60 commit 8bd42cb
Showing 11 changed files with 114 additions and 68 deletions.

@@ -57,10 +57,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {

conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), "
+ "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, "
+ "dbl DOUBLE, tiny TINYINT)").executeUpdate()
+ "dbl DOUBLE, tiny TINYINT, u_tiny TINYINT UNSIGNED)").executeUpdate()

conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
+ "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, "
+ "42.75, 1.0000000000000002, -128)").executeUpdate()
+ "42.75, 1.0000000000000002, -128, 255)").executeUpdate()

conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, "
+ "yr YEAR)").executeUpdate()
@@ -90,7 +91,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
-    assert(types.length == 10)
+    assert(types.length == 11)
assert(types(0).equals("class java.lang.Boolean"))
assert(types(1).equals("class java.lang.Long"))
assert(types(2).equals("class java.lang.Integer"))
@@ -101,6 +102,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types(7).equals("class java.lang.Double"))
assert(types(8).equals("class java.lang.Double"))
assert(types(9).equals("class java.lang.Byte"))
+    assert(types(10).equals("class java.lang.Short"))
assert(rows(0).getBoolean(0) == false)
assert(rows(0).getLong(1) == 0x225)
assert(rows(0).getInt(2) == 17)
@@ -112,6 +114,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(rows(0).getDouble(7) == 42.75)
assert(rows(0).getDouble(8) == 1.0000000000000002)
assert(rows(0).getByte(9) == 0x80.toByte)
+    assert(rows(0).getShort(10) == 0xff.toShort)
}

test("Date types") {

@@ -70,11 +70,13 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
var expectedSchema = new StructType()
.add("ID", IntegerType, true, defaultMetadata(IntegerType))
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE")
t = spark.table(tbl)
expectedSchema = new StructType().add("ID", DoubleType, true, defaultMetadata)
expectedSchema = new StructType()
.add("ID", DoubleType, true, defaultMetadata(DoubleType))
assert(t.schema === expectedSchema)
// Update column type from DOUBLE to STRING
val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)"
@@ -97,7 +99,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
sql(s"CREATE TABLE $tbl (ID INT)" +
s" TBLPROPERTIES('CCSID'='UNICODE')")
val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
assert(t.schema === expectedSchema)
}


@@ -93,11 +93,13 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
t = spark.table(tbl)
expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
expectedSchema = new StructType()
.add("ID", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
// Update column type from STRING to INTEGER
val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"

@@ -83,6 +83,12 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest

private var mySQLVersion = -1

+  override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", true)
+    .build()

override def tablePreparation(connection: Connection): Unit = {
mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
connection.prepareStatement(
@@ -93,11 +99,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
// Update column type from STRING to INTEGER
val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -145,7 +153,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
sql(s"CREATE TABLE $tbl (ID INT)" +
s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')")
val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
assert(t.schema === expectedSchema)
}


@@ -77,9 +77,10 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
override val namespaceOpt: Option[String] = Some("SYSTEM")
override val db = new OracleDatabaseOnDocker

-  override val defaultMetadata: Metadata = new MetadataBuilder()
+  override def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
.putLong("scale", 0)
.putBoolean("isTimestampNTZ", false)
.putBoolean("isSigned", dataType.isInstanceOf[NumericType] || dataType.isInstanceOf[StringType])
.putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, "varchar(255)")
.build()

@@ -101,11 +102,13 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, super.defaultMetadata)
var expectedSchema = new StructType()
.add("ID", DecimalType(10, 0), true, super.defaultMetadata(DecimalType(10, 0)))
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE LONG")
t = spark.table(tbl)
expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, super.defaultMetadata)
expectedSchema = new StructType()
.add("ID", DecimalType(19, 0), true, super.defaultMetadata(DecimalType(19, 0)))
assert(t.schema === expectedSchema)
// Update column type from LONG to INTEGER
val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"

@@ -64,11 +64,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
// Update column type from STRING to INTEGER
val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -91,7 +93,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
sql(s"CREATE TABLE $tbl (ID INT)" +
s" TBLPROPERTIES('TABLESPACE'='pg_default')")
val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
assert(t.schema === expectedSchema)
}


@@ -49,21 +49,22 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite

def notSupportsTableComment: Boolean = false

-  def defaultMetadata: Metadata = new MetadataBuilder()
+  def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder()
    .putLong("scale", 0)
    .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
.build()

def testUpdateColumnNullability(tbl: String): Unit = {
sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL)")
var t = spark.table(s"$catalogName.alt_table")
// nullable is true in the expectedSchema because Spark always sets nullable to true
// regardless of the JDBC metadata https://github.com/apache/spark/pull/18445
var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL")
t = spark.table(s"$catalogName.alt_table")
expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
// Update nullability of not existing column
val msg = intercept[AnalysisException] {
@@ -75,8 +76,9 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite
def testRenameColumn(tbl: String): Unit = {
sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
val t = spark.table(s"$tbl")
val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata)
.add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, true, defaultMetadata)
val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata())
.add("ID1", StringType, true, defaultMetadata())
.add("ID2", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
}

@@ -86,16 +88,19 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite
withTable(s"$catalogName.alt_table") {
sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
var t = spark.table(s"$catalogName.alt_table")
var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
var expectedSchema = new StructType()
.add("ID", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 STRING)")
t = spark.table(s"$catalogName.alt_table")
expectedSchema = expectedSchema.add("C1", StringType, true, defaultMetadata)
.add("C2", StringType, true, defaultMetadata)
expectedSchema = expectedSchema
.add("C1", StringType, true, defaultMetadata())
.add("C2", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)")
t = spark.table(s"$catalogName.alt_table")
expectedSchema = expectedSchema.add("C3", StringType, true, defaultMetadata)
expectedSchema = expectedSchema
.add("C3", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
// Add already existing column
checkError(
@@ -128,7 +133,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite
sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1")
sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3")
val t = spark.table(s"$catalogName.alt_table")
val expectedSchema = new StructType().add("C2", StringType, true, defaultMetadata)
val expectedSchema = new StructType()
.add("C2", StringType, true, defaultMetadata())
assert(t.schema === expectedSchema)
// Drop not existing column
val msg = intercept[AnalysisException] {

@@ -274,6 +274,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
val fields = new Array[StructField](ncols)
var i = 0
while (i < ncols) {
+      val metadata = new MetadataBuilder()
val columnName = rsmd.getColumnLabel(i + 1)
val dataType = rsmd.getColumnType(i + 1)
val typeName = rsmd.getColumnTypeName(i + 1)
@@ -294,8 +295,6 @@
} else {
rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
}
-      val metadata = new MetadataBuilder()
-      metadata.putLong("scale", fieldScale)

dataType match {
case java.sql.Types.TIME =>
@@ -307,7 +306,9 @@
metadata.putBoolean("rowid", true)
case _ =>
}
metadata.putBoolean("isSigned", isSigned)
metadata.putBoolean("isTimestampNTZ", isTimestampNTZ)
metadata.putLong("scale", fieldScale)
val columnType =
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned, isTimestampNTZ))

@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType, TimestampType}
+import org.apache.spark.sql.types._

private case object MySQLDialect extends JdbcDialect with SQLConfHelper {

@@ -107,8 +107,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
// Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1.
// Explicitly converts it into StringType here.
Some(StringType)
-      case Types.TINYINT if "TINYINT".equalsIgnoreCase(typeName) =>
-        Some(ByteType)
+      case Types.TINYINT =>
+        if (md.build().getBoolean("isSigned")) {
+          Some(ByteType)
+        } else {
+          Some(ShortType)
+        }
case Types.TIMESTAMP if "DATETIME".equalsIgnoreCase(typeName) =>
// scalastyle:off line.size.limit
// In MYSQL, DATETIME is TIMESTAMP WITHOUT TIME ZONE