Skip to content

Commit

Permalink
[OPPRO-266] Use the smaller table to build hashmap in shuffled hash j…
Browse files Browse the repository at this point in the history
…oin (oap-project#408)

* support both build left and right

* for test

* prefer agg and filter as build side

* change back build side for vanilla spark

* fix ut

* fix gluten-it, for test

* annotations

* Revert "fix gluten-it, for test"

This reverts commit 15d648941ef88fbacf6784a9e94f815e1a66e7ee.

* Revert "for test"

This reverts commit f01290511041db1c697ccdab506e45ba42fc1b65.

Co-authored-by: rui-mo <rui-mo@intel.com>
  • Loading branch information
rui-mo and rui-mo authored Oct 20, 2022
1 parent b7aead2 commit 654dcd8
Show file tree
Hide file tree
Showing 16 changed files with 1,838 additions and 103 deletions.
15 changes: 10 additions & 5 deletions backends-velox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
<profile>
Expand All @@ -63,16 +68,16 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import io.substrait.proto.JoinRel
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
import org.apache.spark.sql.types.{BooleanType, DataType}
import java.util

import org.apache.spark.sql.execution.aggregate.BaseAggregateExec

import scala.collection.JavaConverters._

trait VeloxHashJoinLikeExecTransformer extends HashJoinLikeExecTransformer {
Expand All @@ -46,7 +48,7 @@ trait VeloxHashJoinLikeExecTransformer extends HashJoinLikeExecTransformer {
// join type is reverted.
JoinRel.JoinType.JOIN_TYPE_LEFT
case LeftSemi =>
JoinRel.JoinType.JOIN_TYPE_SEMI
JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
case LeftAnti =>
if (!antiJoinWorkaroundNeeded) {
JoinRel.JoinType.JOIN_TYPE_ANTI
Expand Down Expand Up @@ -184,20 +186,19 @@ trait VeloxHashJoinLikeExecTransformer extends HashJoinLikeExecTransformer {
}

// Result projection will drop the appended keys, and exchange columns order if BuildLeft.
val resultProjection = joinBuildSide match {
case BuildLeft =>
val (leftOutput, rightOutput) =
getResultProjectionOutput(inputBuildOutput, inputStreamedOutput)
// Exchange the order of build and streamed.
leftOutput.indices.map(idx =>
ExpressionBuilder.makeSelection(idx + streamedOutput.size)) ++
rightOutput.indices
.map(ExpressionBuilder.makeSelection(_))
case BuildRight =>
val (leftOutput, rightOutput) =
getResultProjectionOutput(inputStreamedOutput, inputBuildOutput)
leftOutput.indices.map(ExpressionBuilder.makeSelection(_)) ++
rightOutput.indices.map(idx => ExpressionBuilder.makeSelection(idx + streamedOutput.size))
val resultProjection = if (exchangeTable) {
val (leftOutput, rightOutput) =
getResultProjectionOutput(inputBuildOutput, inputStreamedOutput)
// Exchange the order of build and streamed.
leftOutput.indices.map(idx =>
ExpressionBuilder.makeSelection(idx + streamedOutput.size)) ++
rightOutput.indices
.map(ExpressionBuilder.makeSelection(_))
} else {
val (leftOutput, rightOutput) =
getResultProjectionOutput(inputStreamedOutput, inputBuildOutput)
leftOutput.indices.map(ExpressionBuilder.makeSelection(_)) ++
rightOutput.indices.map(idx => ExpressionBuilder.makeSelection(idx + streamedOutput.size))
}

RelBuilder.makeProjectRel(
Expand Down Expand Up @@ -225,6 +226,127 @@ case class VeloxShuffledHashJoinExecTransformer(leftKeys: Seq[Expression],
left,
right) with VeloxHashJoinLikeExecTransformer {

// Used to specify the preferred build side in backend's real execution.
object PreferredBuildSide extends Serializable {
val LEFT = "left table"
val RIGHT = "right table"
val NON = "none"
}

/**
* Returns whether the plan matches the condition to be preferred as build side.
* Currently, filter and aggregation are preferred.
* @param plan the left or right plan of join
* @return whether the plan matches the condition
*/
private def matchCondition(plan: SparkPlan): Boolean =
plan.isInstanceOf[FilterExecBaseTransformer] || plan.isInstanceOf[FilterExec] ||
plan.isInstanceOf[BaseAggregateExec]

/**
* Returns whether a plan is preferred as the build side.
* If this plan or its children match the condition, it will be preferred.
* @param plan the left or right plan of join
* @return whether the plan is preferred as the build side
*/
private def isPreferred(plan: SparkPlan): Boolean =
matchCondition(plan) || plan.children.exists(child => matchCondition(child))

// Returns the preferred build side with the consideration of preferring condition.
private lazy val preferredBuildSide: String =
if ((isPreferred(left) && isPreferred(right)) || (!isPreferred(left) && !isPreferred(right))) {
PreferredBuildSide.NON
} else if (isPreferred(left)) {
PreferredBuildSide.LEFT
} else {
PreferredBuildSide.RIGHT
}

/**
* Returns whether the build and stream table should be exchanged with consideration of
* build type, planned build side and the preferred build side.
*/
override lazy val exchangeTable: Boolean = hashJoinType match {
case LeftOuter | LeftSemi => joinBuildSide match {
case BuildLeft =>
// Exchange build and stream side when left side or none is preferred as the build side,
// and RightOuter or RightSemi wil be used.
!(preferredBuildSide == PreferredBuildSide.RIGHT)
case _ =>
// Do not exchange build and stream side when right side or none is preferred
// as the build side, and LeftOuter or LeftSemi wil be used.
preferredBuildSide == PreferredBuildSide.LEFT
}
case RightOuter => joinBuildSide match {
case BuildRight =>
// Do not exchange build and stream side when right side or none is preferred
// as the build side, and RightOuter will be used.
preferredBuildSide == PreferredBuildSide.LEFT
case _ =>
// Exchange build and stream side when left side or none is preferred as the build side,
// and LeftOuter will be used.
!(preferredBuildSide == PreferredBuildSide.RIGHT)
}
case _ => joinBuildSide match {
case BuildLeft => true
case BuildRight => false
}
}

// Direct output order of Substrait join operation.
override protected val substraitJoinType: JoinRel.JoinType = joinType match {
case Inner =>
JoinRel.JoinType.JOIN_TYPE_INNER
case FullOuter =>
JoinRel.JoinType.JOIN_TYPE_OUTER
case LeftOuter => joinBuildSide match {
case BuildLeft => if (preferredBuildSide == PreferredBuildSide.RIGHT) {
JoinRel.JoinType.JOIN_TYPE_LEFT
} else {
JoinRel.JoinType.JOIN_TYPE_RIGHT
}
case _ => if (preferredBuildSide == PreferredBuildSide.LEFT) {
JoinRel.JoinType.JOIN_TYPE_RIGHT
} else {
JoinRel.JoinType.JOIN_TYPE_LEFT
}
}
case RightOuter => joinBuildSide match {
case BuildRight => if (preferredBuildSide == PreferredBuildSide.LEFT) {
JoinRel.JoinType.JOIN_TYPE_LEFT
} else {
JoinRel.JoinType.JOIN_TYPE_RIGHT
}
case _ => if (preferredBuildSide == PreferredBuildSide.RIGHT) {
JoinRel.JoinType.JOIN_TYPE_RIGHT
} else {
JoinRel.JoinType.JOIN_TYPE_LEFT
}
}
case LeftSemi => joinBuildSide match {
case BuildLeft => if (preferredBuildSide == PreferredBuildSide.RIGHT) {
JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
} else {
JoinRel.JoinType.JOIN_TYPE_RIGHT_SEMI
}
case _ => if (preferredBuildSide == PreferredBuildSide.LEFT) {
JoinRel.JoinType.JOIN_TYPE_RIGHT_SEMI
} else {
JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
}
}
case LeftAnti => if (!antiJoinWorkaroundNeeded) {
JoinRel.JoinType.JOIN_TYPE_ANTI
} else {
// Use Left to replace Anti as a workaround.
JoinRel.JoinType.JOIN_TYPE_LEFT
}
case _ =>
// TODO: Support cross join with Cross Rel
// TODO: Support existence join
JoinRel.JoinType.UNRECOGNIZED
}

/**
* Returns whether a workaround for Anti join is needed. True for 'not exists' semantics.
* For SHJ, always returns true for Anti join.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,12 @@ message JoinRel {
JOIN_TYPE_OUTER = 2;
JOIN_TYPE_LEFT = 3;
JOIN_TYPE_RIGHT = 4;
JOIN_TYPE_SEMI = 5;
JOIN_TYPE_ANTI = 6;
JOIN_TYPE_LEFT_SEMI = 5;
JOIN_TYPE_RIGHT_SEMI = 6;
JOIN_TYPE_ANTI = 7;
// This join is useful for nested sub-queries where we need exactly one tuple in output (or throw exception)
// See Section 3.2 of https://15721.courses.cs.cmu.edu/spring2018/papers/16-optimizer2/hyperjoins-btw2017.pdf
JOIN_TYPE_SINGLE = 7;
JOIN_TYPE_SINGLE = 8;
}

substrait.extensions.AdvancedExtension advanced_extension = 10;
Expand Down
Loading

0 comments on commit 654dcd8

Please sign in to comment.