Skip to content

Commit

Permalink
[SPARK-49063][SQL] Fix Between with ScalarSubqueries
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Fix for between with ScalarSubqueries.

### Why are the changes needed?
A previous PR, apache#44299, introduced a regression. It needs to be addressed because the BETWEEN operator was completely broken with resolved ScalarSubqueries.

### Does this PR introduce _any_ user-facing change?
No, the bug is not released yet.

### How was this patch tested?
Tests added to golden file.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47581 from mihailom-db/fixbetween.

Authored-by: Mihailo Milosevic <mihailo.milosevic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
mihailom-db authored and cloud-fan committed Aug 5, 2024
1 parent f99291a commit 6bf6088
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,10 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
if (Utils.isTesting) {
assert(mayHaveCountBug.isDefined)
}
if (resultWithZeroTups.isEmpty) {
if (!SQLConf.get.legacyDuplicateBetweenInput && currentChild.output.contains(origOutput)) {
// If we had multiple of the same scalar subqueries they will resolve to the same aliases.
currentChild
} else if (resultWithZeroTups.isEmpty) {
// CASE 1: Subquery guaranteed not to have the COUNT bug because it evaluates to NULL
// with zero tuples.
planWithoutCountBug
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2187,8 +2187,14 @@ class AstBuilder extends DataTypeAstBuilder
// Create the predicate.
ctx.kind.getType match {
case SqlBaseParser.BETWEEN =>
invertIfNotDefined(UnresolvedFunction(
"between", Seq(e, expression(ctx.lower), expression(ctx.upper)), isDistinct = false))
if (!SQLConf.get.legacyDuplicateBetweenInput) {
invertIfNotDefined(UnresolvedFunction(
"between", Seq(e, expression(ctx.lower), expression(ctx.upper)), isDistinct = false))
} else {
invertIfNotDefined(And(
GreaterThanOrEqual(e, expression(ctx.lower)),
LessThanOrEqual(e, expression(ctx.upper))))
}
case SqlBaseParser.IN if ctx.query != null =>
invertIfNotDefined(InSubquery(getValueExpressions(e), ListQuery(plan(ctx.query))))
case SqlBaseParser.IN =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4614,6 +4614,15 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// Internal boolean flag, off by default, that selects the legacy BETWEEN implementation.
// It exists as an escape hatch for the problem tracked in SPARK-49063 (see the doc text below).
val LEGACY_DUPLICATE_BETWEEN_INPUT =
buildConf("spark.sql.legacy.duplicateBetweenInput")
.internal()
.doc("When true, we use legacy between implementation. This is a flag that fixes a " +
"problem introduced by a between optimization, see ticket SPARK-49063.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val LEGACY_COMPLEX_TYPES_TO_STRING =
buildConf("spark.sql.legacy.castComplexTypesToString.enabled")
.internal()
Expand Down Expand Up @@ -5997,6 +6006,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
/** Current value of [[SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN]]. */
def optimizeNullAwareAntiJoin: Boolean = getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN)

/** Current value of [[SQLConf.LEGACY_DUPLICATE_BETWEEN_INPUT]] (legacy BETWEEN implementation). */
def legacyDuplicateBetweenInput: Boolean = getConf(SQLConf.LEGACY_DUPLICATE_BETWEEN_INPUT)

/** Current value of [[SQLConf.LEGACY_PATH_OPTION_BEHAVIOR]]. */
def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

/** Current value of [[SQLConf.SUPPORT_SECOND_OFFSET_FORMAT]]. */
def supportSecondOffsetFormat: Boolean = getConf(SQLConf.SUPPORT_SECOND_OFFSET_FORMAT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1710,3 +1710,41 @@ Project [t0a#x, t0b#x]
+- View (`t0`, [t0a#x, t0b#x])
+- Project [cast(col1#x as int) AS t0a#x, cast(col2#x as int) AS t0b#x]
+- LocalRelation [col1#x, col2#x]


-- !query
select *
from range(1, 3) t1
where (select t2.id c
from range (1, 2) t2 where t1.id = t2.id
) between 1 and 2
-- !query analysis
Project [id#xL]
+- Filter between(scalar-subquery#x [id#xL], 1, 2)
: +- Project [id#xL AS c#xL]
: +- Filter (outer(id#xL) = id#xL)
: +- SubqueryAlias t2
: +- Range (1, 2, step=1)
+- SubqueryAlias t1
+- Range (1, 3, step=1)


-- !query
SELECT *
FROM t1
WHERE (SELECT max(t2c)
FROM t2 WHERE t1b = t2b
) between 1 and 2
-- !query analysis
Project [t1a#x, t1b#x, t1c#x]
+- Filter between(scalar-subquery#x [t1b#x], 1, 2)
: +- Aggregate [max(t2c#x) AS max(t2c)#x]
: +- Filter (outer(t1b#x) = t2b#x)
: +- SubqueryAlias t2
: +- View (`t2`, [t2a#x, t2b#x, t2c#x])
: +- Project [cast(col1#x as int) AS t2a#x, cast(col2#x as int) AS t2b#x, cast(col3#x as int) AS t2c#x]
: +- LocalRelation [col1#x, col2#x, col3#x]
+- SubqueryAlias t1
+- View (`t1`, [t1a#x, t1b#x, t1c#x])
+- Project [cast(col1#x as int) AS t1a#x, cast(col2#x as int) AS t1b#x, cast(col3#x as int) AS t1c#x]
+- LocalRelation [col1#x, col2#x, col3#x]
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,14 @@ SELECT * FROM t0 WHERE t0a <
FROM t1 LEFT JOIN t2 ON (t1a = t0a AND t2b = t0b))
);

select *
from range(1, 3) t1
where (select t2.id c
from range (1, 2) t2 where t1.id = t2.id
) between 1 and 2;

SELECT *
FROM t1
WHERE (SELECT max(t2c)
FROM t2 WHERE t1b = t2b
) between 1 and 2;
Original file line number Diff line number Diff line change
Expand Up @@ -882,3 +882,27 @@ struct<t0a:int,t0b:int>
-- !query output
1 1
2 0


-- !query
select *
from range(1, 3) t1
where (select t2.id c
from range (1, 2) t2 where t1.id = t2.id
) between 1 and 2
-- !query schema
struct<id:bigint>
-- !query output
1


-- !query
SELECT *
FROM t1
WHERE (SELECT max(t2c)
FROM t2 WHERE t1b = t2b
) between 1 and 2
-- !query schema
struct<t1a:int,t1b:int,t1c:int>
-- !query output

0 comments on commit 6bf6088

Please sign in to comment.