Skip to content

Commit

Permalink
fix k8s parameter incompatibility error
Browse files Browse the repository at this point in the history
  • Loading branch information
guyu committed Jul 15, 2024
1 parent 8c02156 commit 3b46ce3
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package org.apache.spark.internal.io.cloud

import java.io.IOException

import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.fs.{Path, StreamCapabilities}
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
Expand Down Expand Up @@ -77,6 +80,12 @@ class PathOutputCommitProtocol(
/** The destination path. This is serializable in Hadoop 3. */
private[cloud] val destPath: Path = new Path(destination)

// True when `destination` can be turned into a Hadoop Path without an exception being thrown.
// NOTE(review): destPath above builds the same Path outside any Try, so if both initializers
// run in the same constructor this looks like it can never be false here — confirm it is needed.
private val hasValidPath = Try { new Path(destination) }.isSuccess

// Starts out as null; nothing in the visible part of the class assigns it.
// NOTE(review): presumably tracks files written to absolute paths and where they finally
// belong, and is initialised during task setup — confirm against the code that uses it.
private var addedAbsPathFiles: mutable.Map[String, String] = null

// Starts out as null; nothing in the visible part of the class assigns it.
// NOTE(review): presumably collects the partition paths a task writes to — confirm.
private var partitionPaths: mutable.Set[String] = null

logTrace(s"Instantiated committer with job ID=$jobId;" +
s" destination=$destPath;" +
s" stagingDirOverwrite=$stagingDirOverwrite")
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@

<!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
<leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
<kubernetes-client.version>5.12.2</kubernetes-client.version>
<kubernetes-client.version>6.7.2</kubernetes-client.version>

<test.java.home>${java.home}</test.java.home>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,21 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(true)

// String config "spark.kubernetes.driver.service.ipFamilyPolicy": the IP family policy for
// the driver service. checkValues restricts it to "SingleStack", "PreferDualStack" or
// "RequireDualStack"; the default is "SingleStack".
val KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY =
ConfigBuilder("spark.kubernetes.driver.service.ipFamilyPolicy")
.doc("K8s IP Family Policy for Driver Service")
.version("3.4.0")
.stringConf
.checkValues(Set("SingleStack", "PreferDualStack", "RequireDualStack"))
.createWithDefault("SingleStack")

// String config "spark.kubernetes.driver.service.ipFamilies": the IP families for the driver
// service, written as one comma-separated string. Order is significant, which is why both
// "IPv4,IPv6" and "IPv6,IPv4" are listed as distinct allowed values; the default is "IPv4".
val KUBERNETES_DRIVER_SERVICE_IP_FAMILIES =
ConfigBuilder("spark.kubernetes.driver.service.ipFamilies")
.doc("A list of IP families for K8s Driver Service")
.version("3.4.0")
.stringConf
.checkValues(Set("IPv4", "IPv6", "IPv4,IPv6", "IPv6,IPv4"))
.createWithDefault("IPv4")
// val KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY =
// ConfigBuilder("spark.kubernetes.driver.service.ipFamilyPolicy")
// .doc("K8s IP Family Policy for Driver Service")
// .version("3.4.0")
// .stringConf
// .checkValues(Set("SingleStack", "PreferDualStack", "RequireDualStack"))
// .createWithDefault("SingleStack")

// val KUBERNETES_DRIVER_SERVICE_IP_FAMILIES =
// ConfigBuilder("spark.kubernetes.driver.service.ipFamilies")
// .doc("A list of IP families for K8s Driver Service")
// .version("3.4.0")
// .stringConf
// .checkValues(Set("IPv4", "IPv6", "IPv4,IPv6", "IPv6,IPv4"))
// .createWithDefault("IPv4")

val KUBERNETES_DRIVER_OWN_PVC =
ConfigBuilder("spark.kubernetes.driver.ownPersistentVolumeClaim")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}

import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{Clock, SystemClock}
Expand Down Expand Up @@ -50,10 +50,6 @@ private[spark] class DriverServiceFeatureStep(
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
// IP family policy for the driver service, read from the Spark conf.
private val ipFamilyPolicy =
kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
// Configured IP families: the comma-separated conf value is split on "," and exposed as a
// java.util.List so it can be handed to the fabric8 service builder.
private val ipFamilies =
kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES).split(",").toList.asJava

// Driver port from the Spark conf; DEFAULT_DRIVER_PORT is used when the key is not set.
private val driverPort = kubernetesConf.sparkConf.getInt(
config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
Expand All @@ -80,23 +76,24 @@ private[spark] class DriverServiceFeatureStep(
.endMetadata()
.withNewSpec()
.withClusterIP("None")
.withIpFamilyPolicy(ipFamilyPolicy)
.withIpFamilies(ipFamilies)
.withSelector(kubernetesConf.labels.asJava)
.addNewPort()
.withName(DRIVER_PORT_NAME)
.withPort(driverPort)
.withNewTargetPort(driverPort)
.withProtocol("TCP")
.endPort()
.addNewPort()
.withName(BLOCK_MANAGER_PORT_NAME)
.withPort(driverBlockManagerPort)
.withNewTargetPort(driverBlockManagerPort)
.withProtocol("TCP")
.endPort()
.addNewPort()
.withName(UI_PORT_NAME)
.withPort(driverUIPort)
.withNewTargetPort(driverUIPort)
.withProtocol("TCP")
.endPort()
.endSpec()
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,30 @@ private[spark] class Client(
registerStopOnSignal(driverPodName)

val sId = Seq(conf.namespace, driverPodName).mkString(":")
breakable {
while (true) {
val podWithName = kubernetesClient
.pods()
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
watch = podWithName.watch(watcher)
if (conf.get(WAIT_FOR_APP_COMPLETION)) {
breakable {
while (true) {
val podWithName = kubernetesClient
.pods()
.inNamespace(conf.namespace)
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
watch = podWithName.watch(watcher)

// Send the latest pod state we know to the watcher to make sure we didn't miss anything
watcher.eventReceived(Action.MODIFIED, podWithName.get())
// Send the latest pod state we know to the watcher to make sure we didn't miss anything
watcher.eventReceived(Action.MODIFIED, podWithName.get())

// Break the while loop if the pod is completed or we don't want to wait
if (watcher.watchOrStop(sId)) {
watch.close()
break
// Break the while loop if the pod is completed or we don't want to wait
if (watcher.watchOrStop(sId)) {
watch.close()
break
}
}
}
} else {
logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appId} " +
s"and submission ID $sId into Kubernetes")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,50 +183,50 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
assert(driverService.getSpec.getIpFamilies.get(0) == "IPv4")
}

// Overrides only ipFamilies (to IPv6) and checks that the generated driver service reports
// the SingleStack policy together with exactly that one family.
test("Support ipFamilies spec with SingleStack and IPv6") {
  val ipv6Conf = new SparkConf(false).set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, "IPv6")
  val step = new DriverServiceFeatureStep(
    KubernetesTestConf.createDriverConf(
      sparkConf = ipv6Conf,
      labels = DRIVER_LABELS,
      serviceLabels = DRIVER_SERVICE_LABELS,
      serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS))

  // The step must hand the pod back unchanged.
  assert(step.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())

  val serviceSpec = step
    .getAdditionalKubernetesResources()
    .head
    .asInstanceOf[Service]
    .getSpec
  assert(serviceSpec.getIpFamilyPolicy() == "SingleStack")

  val families = serviceSpec.getIpFamilies
  assert(families.size() === 1)
  assert(families.get(0) == "IPv6")
}
// test("Support ipFamilies spec with SingleStack and IPv6") {
// val sparkConf = new SparkConf(false)
// .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, "IPv6")
// val kconf = KubernetesTestConf.createDriverConf(
// sparkConf = sparkConf,
// labels = DRIVER_LABELS,
// serviceLabels = DRIVER_SERVICE_LABELS,
// serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
// val configurationStep = new DriverServiceFeatureStep(kconf)
// assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
// val driverService = configurationStep
// .getAdditionalKubernetesResources()
// .head
// .asInstanceOf[Service]
// assert(driverService.getSpec.getIpFamilyPolicy() == "SingleStack")
// assert(driverService.getSpec.getIpFamilies.size() === 1)
// assert(driverService.getSpec.getIpFamilies.get(0) == "IPv6")
// }

// Runs every dual-stack policy against both family orderings and checks that the driver
// service carries the policy and the families in the configured order.
test("Support DualStack") {
  // Conf value -> the family list the service spec is expected to expose.
  val familyOrderings = Seq(
    "IPv4,IPv6" -> Seq("IPv4", "IPv6"),
    "IPv6,IPv4" -> Seq("IPv6", "IPv4"))

  for {
    policy <- Seq("PreferDualStack", "RequireDualStack")
    (families, expected) <- familyOrderings
  } {
    val dualStackConf = new SparkConf(false)
      .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY, policy)
      .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, families)
    val step = new DriverServiceFeatureStep(
      KubernetesTestConf.createDriverConf(
        sparkConf = dualStackConf,
        labels = DRIVER_LABELS,
        serviceLabels = DRIVER_SERVICE_LABELS,
        serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS))

    // The step must hand the pod back unchanged.
    assert(step.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())

    val serviceSpec = step
      .getAdditionalKubernetesResources()
      .head
      .asInstanceOf[Service]
      .getSpec
    assert(serviceSpec.getIpFamilyPolicy() == policy)
    assert(serviceSpec.getIpFamilies === expected.asJava)
  }
}
// test("Support DualStack") {
// Seq("PreferDualStack", "RequireDualStack").foreach { stack =>
// val configAndAnswers = Seq(
// ("IPv4,IPv6", Seq("IPv4", "IPv6")),
// ("IPv6,IPv4", Seq("IPv6", "IPv4")))
// configAndAnswers.foreach { case (config, answer) =>
// val sparkConf = new SparkConf(false)
// .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY, stack)
// .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, config)
// val kconf = KubernetesTestConf.createDriverConf(
// sparkConf = sparkConf,
// labels = DRIVER_LABELS,
// serviceLabels = DRIVER_SERVICE_LABELS,
// serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
// val configurationStep = new DriverServiceFeatureStep(kconf)
// assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
// val driverService = configurationStep
// .getAdditionalKubernetesResources()
// .head
// .asInstanceOf[Service]
// assert(driverService.getSpec.getIpFamilyPolicy() == stack)
// assert(driverService.getSpec.getIpFamilies === answer.asJava)
// }
// }
// }

private def verifyService(
driverPort: Int,
Expand Down

0 comments on commit 3b46ce3

Please sign in to comment.