From f659f8d1b019385ad95673205386b6cbe8f89a49 Mon Sep 17 00:00:00 2001
From: Peter Toth
Date: Fri, 8 Mar 2024 13:15:35 +0800
Subject: [PATCH] [SPARK-47319][SQL] Improve missingInput calculation

### What changes were proposed in this pull request?

This PR improves `QueryPlan.missingInput()` calculation.

### Why are the changes needed?

This seems to be the root cause of `DeduplicateRelations` slowness in some cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45424 from peter-toth/fix-missinginput.

Authored-by: Peter Toth
Signed-off-by: Kent Yao
---
 .../catalyst/expressions/AttributeSet.scala   | 20 ++++++++++++-------
 .../spark/sql/catalyst/plans/QueryPlan.scala  |  8 +++++++-
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index 2628afd8923c2..236380b2c030b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -104,13 +104,19 @@ class AttributeSet private (private val baseSet: mutable.LinkedHashSet[Attribute
    * in `other`.
    */
   def --(other: Iterable[NamedExpression]): AttributeSet = {
-    other match {
-      // SPARK-32755: `--` method behave differently under scala 2.12 and 2.13,
-      // use a Scala 2.12 based code to maintains the insertion order in Scala 2.13
-      case otherSet: AttributeSet =>
-        new AttributeSet(baseSet.clone() --= otherSet.baseSet)
-      case _ =>
-        new AttributeSet(baseSet.clone() --= other.map(a => new AttributeEquals(a.toAttribute)))
+    if (isEmpty) {
+      AttributeSet.empty
+    } else if (other.isEmpty) {
+      this
+    } else {
+      other match {
+        // SPARK-32755: `--` method behave differently under scala 2.12 and 2.13,
+        // use a Scala 2.12 based code to maintains the insertion order in Scala 2.13
+        case otherSet: AttributeSet =>
+          new AttributeSet(baseSet.clone() --= otherSet.baseSet)
+        case _ =>
+          new AttributeSet(baseSet.clone() --= other.map(a => new AttributeEquals(a.toAttribute)))
+      }
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 2a62ea1feb031..0f049103542ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -102,7 +102,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
   /**
    * Attributes that are referenced by expressions but not provided by this node's children.
    */
-  final def missingInput: AttributeSet = references -- inputSet
+  final def missingInput: AttributeSet = {
+    if (references.isEmpty) {
+      AttributeSet.empty
+    } else {
+      references -- inputSet
+    }
+  }
 
   /**
    * Runs [[transformExpressionsDown]] with `rule` on all expressions present