
Commit

fix concurrent
sumeet-db committed Aug 20, 2024
1 parent 40d94b6 commit a3a03d6
Showing 1 changed file with 33 additions and 10 deletions.
@@ -24,6 +24,7 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
@@ -34,6 +35,7 @@ import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.handleConcurrentCreateExceptions
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
@@ -55,6 +57,19 @@ case class WriteToDataSourceV2(
     copy(query = newChild)
 }
 
+object WriteToDataSourceV2Exec {
+  // Returns None if it detects a table created by a concurrent command and ifNotExists is true.
+  def handleConcurrentCreateExceptions[T](ifNotExists: Boolean)(block: => T): Option[T] = {
+    try {
+      Some(block)
+    } catch {
+      // Committing to the table failed because of a concurrent table creation in the catalog,
+      // but we ignore the exception when IF NOT EXISTS is true.
+      case _: TableAlreadyExistsException if ifNotExists => None
+    }
+  }
+}
+
 /**
  * Physical plan node for v2 create table as select when the catalog does not support staging
  * the table creation.
@@ -82,10 +97,13 @@ case class CreateTableAsSelectExec(
       }
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val table = Option(catalog.createTable(
-      ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
-      partitioning.toArray, properties.asJava)).getOrElse(catalog.loadTable(ident))
-    writeToTable(catalog, table, writeOptions, ident, query)
+    val table = handleConcurrentCreateExceptions[Table](ifNotExists) {
+      Option(catalog.createTable(
+        ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
+        partitioning.toArray, properties.asJava)
+      ).getOrElse(catalog.loadTable(ident))
+    }.getOrElse { return Nil }
+    writeToTable(catalog, table, writeOptions, ident, query, ifNotExists)
   }
 }
 
@@ -116,10 +134,12 @@ case class AtomicCreateTableAsSelectExec(
       }
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val stagedTable = catalog.stageCreate(
-      ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
-      partitioning.toArray, properties.asJava)
-    writeToTable(catalog, stagedTable, writeOptions, ident, query)
+    val stagedTable = handleConcurrentCreateExceptions[StagedTable](ifNotExists) {
+      catalog.stageCreate(
+        ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
+        partitioning.toArray, properties.asJava)
+    }.getOrElse { return Nil }
+    writeToTable(catalog, stagedTable, writeOptions, ident, query, ifNotExists)
   }
 }
 
@@ -605,15 +625,18 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
       table: Table,
       writeOptions: Map[String, String],
       ident: Identifier,
-      query: LogicalPlan): Seq[InternalRow] = {
+      query: LogicalPlan,
+      ifNotExists: Boolean = false): Seq[InternalRow] = {
     Utils.tryWithSafeFinallyAndFailureCallbacks({
       val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
       val append = AppendData.byPosition(relation, query, writeOptions)
       val qe = session.sessionState.executePlan(append)
       qe.assertCommandExecuted()
 
       table match {
-        case st: StagedTable => st.commitStagedChanges()
+        case st: StagedTable => handleConcurrentCreateExceptions(ifNotExists) {
+          st.commitStagedChanges()
+        }
         case _ =>
       }
 
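For reference, the snippet below is a minimal, self-contained sketch (not part of the commit) of the pattern introduced here: run the create or commit step, and swallow a concurrent TableAlreadyExistsException only when the command was issued with IF NOT EXISTS. The ConcurrentCreateSketch object, the createTable stub, and the local TableAlreadyExistsException class are stand-ins so the example compiles on its own; in Spark the real exception comes from org.apache.spark.sql.catalyst.analysis and the real helper lives on WriteToDataSourceV2Exec, as shown in the diff above.

// Standalone sketch illustrating the concurrent-create handling added in this commit.
object ConcurrentCreateSketch {
  // Stub standing in for org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException.
  class TableAlreadyExistsException(msg: String) extends RuntimeException(msg)

  // Same shape as the helper added above: run `block`, and return None instead of failing
  // when a concurrent command already created the table and IF NOT EXISTS was specified.
  def handleConcurrentCreateExceptions[T](ifNotExists: Boolean)(block: => T): Option[T] = {
    try {
      Some(block)
    } catch {
      case _: TableAlreadyExistsException if ifNotExists => None
    }
  }

  def main(args: Array[String]): Unit = {
    // Simulates catalog.createTable racing with a concurrent CREATE TABLE of the same name.
    def createTable(): String =
      throw new TableAlreadyExistsException("table was created by a concurrent command")

    // CTAS ... IF NOT EXISTS: the race is tolerated and the write is skipped.
    println(handleConcurrentCreateExceptions(ifNotExists = true)(createTable())) // None

    // Plain CTAS: the exception still propagates to the caller.
    try {
      handleConcurrentCreateExceptions(ifNotExists = false)(createTable())
    } catch {
      case e: TableAlreadyExistsException => println(s"rethrown: ${e.getMessage}")
    }
  }
}

Returning Option[T] rather than rethrowing lets each call site decide what to do with a tolerated race: the two CTAS exec nodes short-circuit with return Nil, while the staged-table commit simply drops the result.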
