diff --git a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
index 73be0580c9d8a..5d051c7e33156 100644
--- a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
+++ b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -19,6 +19,9 @@ package org.apache.spark.internal.io.cloud
 
 import java.io.IOException
 
+import scala.collection.mutable
+import scala.util.Try
+
 import org.apache.hadoop.fs.{Path, StreamCapabilities}
 import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -77,6 +80,12 @@ class PathOutputCommitProtocol(
   /** The destination path. This is serializable in Hadoop 3. */
   private[cloud] val destPath: Path = new Path(destination)
 
+  private val hasValidPath = Try { new Path(destination) }.isSuccess
+
+  private var addedAbsPathFiles: mutable.Map[String, String] = null
+
+  private var partitionPaths: mutable.Set[String] = null
+
   logTrace(s"Instantiated committer with job ID=$jobId;" +
     s" destination=$destPath;" +
     s" stagingDirOverwrite=$stagingDirOverwrite")
diff --git a/pom.xml b/pom.xml
index 65b1c1632c6b5..f1a7a1618073a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,7 +230,7 @@
     <leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
-    <kubernetes-client.version>5.12.2</kubernetes-client.version>
+    <kubernetes-client.version>6.7.2</kubernetes-client.version>
     <test.java.home>${java.home}</test.java.home>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 804f3b2c7fbf7..873b3974edf6f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -72,21 +72,5 @@ private[spark] object Config extends Logging {
     .booleanConf
     .createWithDefault(true)
 
-  val KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY =
-    ConfigBuilder("spark.kubernetes.driver.service.ipFamilyPolicy")
-      .doc("K8s IP Family Policy for Driver Service")
-      .version("3.4.0")
-      .stringConf
-      .checkValues(Set("SingleStack", "PreferDualStack", "RequireDualStack"))
-      .createWithDefault("SingleStack")
-
-  val KUBERNETES_DRIVER_SERVICE_IP_FAMILIES =
-    ConfigBuilder("spark.kubernetes.driver.service.ipFamilies")
-      .doc("A list of IP families for K8s Driver Service")
-      .version("3.4.0")
-      .stringConf
-      .checkValues(Set("IPv4", "IPv6", "IPv4,IPv6", "IPv6,IPv4"))
-      .createWithDefault("IPv4")
-
   val KUBERNETES_DRIVER_OWN_PVC =
     ConfigBuilder("spark.kubernetes.driver.ownPersistentVolumeClaim")
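[review note] On the Config.scala change above: the two IP-family options are removed outright, so the driver service spec built by DriverServiceFeatureStep (next file) carries no ipFamilyPolicy/ipFamilies hints and the API server applies its cluster defaults (SingleStack/IPv4 on typical clusters) at admission time. A minimal sketch of the client-side effect, using fabric8 model classes only; this is illustrative, not code from the patch, and assumes recent fabric8 behavior where an unset list field defaults to empty:

    import io.fabric8.kubernetes.api.model.ServiceBuilder

    // Built the way DriverServiceFeatureStep now builds it: headless, no IP-family hints.
    val service = new ServiceBuilder()
      .withNewSpec()
        .withClusterIP("None")
      .endSpec()
      .build()

    // Unset on the client-side object; the API server fills in cluster defaults.
    assert(service.getSpec.getIpFamilyPolicy == null)
    assert(service.getSpec.getIpFamilies.isEmpty)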
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index 37dfe8ec07a4c..ac1824b0859bc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
 import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
-import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.{Clock, SystemClock}
@@ -50,10 +50,6 @@ private[spark] class DriverServiceFeatureStep(
       s"$shorterServiceName as the driver service's name.")
     shorterServiceName
   }
-  private val ipFamilyPolicy =
-    kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
-  private val ipFamilies =
-    kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES).split(",").toList.asJava
 
   private val driverPort = kubernetesConf.sparkConf.getInt(
     config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
@@ -80,23 +76,24 @@ private[spark] class DriverServiceFeatureStep(
         .endMetadata()
       .withNewSpec()
         .withClusterIP("None")
-        .withIpFamilyPolicy(ipFamilyPolicy)
-        .withIpFamilies(ipFamilies)
         .withSelector(kubernetesConf.labels.asJava)
         .addNewPort()
           .withName(DRIVER_PORT_NAME)
           .withPort(driverPort)
          .withNewTargetPort(driverPort)
+          .withProtocol("TCP")
           .endPort()
         .addNewPort()
           .withName(BLOCK_MANAGER_PORT_NAME)
           .withPort(driverBlockManagerPort)
           .withNewTargetPort(driverBlockManagerPort)
+          .withProtocol("TCP")
           .endPort()
         .addNewPort()
           .withName(UI_PORT_NAME)
           .withPort(driverUIPort)
           .withNewTargetPort(driverUIPort)
+          .withProtocol("TCP")
           .endPort()
         .endSpec()
       .build()
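[review note] On DriverServiceFeatureStep above: Kubernetes already defaults a ServicePort's protocol to TCP, so the added .withProtocol("TCP") calls do not change the service as served; they make the locally built spec match what the server returns, avoiding spurious diffs when the resource is compared or patched. A standalone sketch of one port; the name and numbers are illustrative stand-ins for DRIVER_PORT_NAME and the configured driver port:

    import io.fabric8.kubernetes.api.model.ServicePortBuilder

    val rpcPort = new ServicePortBuilder()
      .withName("driver-rpc-port")  // stand-in for DRIVER_PORT_NAME
      .withPort(7078)               // stand-in for the configured driver port
      .withNewTargetPort(7078)
      .withProtocol("TCP")          // explicit, although TCP is the K8s default
      .build()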
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 53adc77d7e76a..1f8885ce16d9a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -183,24 +183,30 @@ private[spark] class Client(
     registerStopOnSignal(driverPodName)
 
     val sId = Seq(conf.namespace, driverPodName).mkString(":")
-    breakable {
-      while (true) {
-        val podWithName = kubernetesClient
-          .pods()
-          .withName(driverPodName)
-        // Reset resource to old before we start the watch, this is important for race conditions
-        watcher.reset()
-        watch = podWithName.watch(watcher)
+    if (conf.get(WAIT_FOR_APP_COMPLETION)) {
+      breakable {
+        while (true) {
+          val podWithName = kubernetesClient
+            .pods()
+            .inNamespace(conf.namespace)
+            .withName(driverPodName)
+          // Reset resource to old before we start the watch, this is important for race conditions
+          watcher.reset()
+          watch = podWithName.watch(watcher)
 
-        // Send the latest pod state we know to the watcher to make sure we didn't miss anything
-        watcher.eventReceived(Action.MODIFIED, podWithName.get())
+          // Send the latest pod state we know to the watcher to make sure we didn't miss anything
+          watcher.eventReceived(Action.MODIFIED, podWithName.get())
 
-        // Break the while loop if the pod is completed or we don't want to wait
-        if (watcher.watchOrStop(sId)) {
-          watch.close()
-          break
+          // Break the while loop if the pod is completed or we don't want to wait
+          if (watcher.watchOrStop(sId)) {
+            watch.close()
+            break
+          }
         }
       }
+    } else {
+      logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appId} " +
+        s"and submission ID $sId into Kubernetes")
     }
   }
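[review note] On KubernetesClientApplication above: the watch loop is now entered only when WAIT_FOR_APP_COMPLETION (spark.kubernetes.submission.waitAppCompletion, default true) is set, so disabling it turns submission into fire-and-forget with just the log line from the else branch. The explicit .inNamespace(conf.namespace) follows the fabric8 6.x convention of selecting the namespace per operation rather than relying on the client's default. A sketch of a fire-and-forget submission; the config key is the real one, the surrounding setup is illustrative:

    import org.apache.spark.SparkConf

    // With this set, Client.run() logs the submission id and returns
    // immediately instead of watching the driver pod to completion.
    val conf = new SparkConf()
      .set("spark.kubernetes.submission.waitAppCompletion", "false")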
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 609c80f27c3da..2b7e1c2b06034 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -183,50 +183,5 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
     assert(driverService.getSpec.getIpFamilies.get(0) == "IPv4")
   }
 
-  test("Support ipFamilies spec with SingleStack and IPv6") {
-    val sparkConf = new SparkConf(false)
-      .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, "IPv6")
-    val kconf = KubernetesTestConf.createDriverConf(
-      sparkConf = sparkConf,
-      labels = DRIVER_LABELS,
-      serviceLabels = DRIVER_SERVICE_LABELS,
-      serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
-    val configurationStep = new DriverServiceFeatureStep(kconf)
-    assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
-    val driverService = configurationStep
-      .getAdditionalKubernetesResources()
-      .head
-      .asInstanceOf[Service]
-    assert(driverService.getSpec.getIpFamilyPolicy() == "SingleStack")
-    assert(driverService.getSpec.getIpFamilies.size() === 1)
-    assert(driverService.getSpec.getIpFamilies.get(0) == "IPv6")
-  }
-
-  test("Support DualStack") {
-    Seq("PreferDualStack", "RequireDualStack").foreach { stack =>
-      val configAndAnswers = Seq(
-        ("IPv4,IPv6", Seq("IPv4", "IPv6")),
-        ("IPv6,IPv4", Seq("IPv6", "IPv4")))
-      configAndAnswers.foreach { case (config, answer) =>
-        val sparkConf = new SparkConf(false)
-          .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY, stack)
-          .set(KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, config)
-        val kconf = KubernetesTestConf.createDriverConf(
-          sparkConf = sparkConf,
-          labels = DRIVER_LABELS,
-          serviceLabels = DRIVER_SERVICE_LABELS,
-          serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
-        val configurationStep = new DriverServiceFeatureStep(kconf)
-        assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
-        val driverService = configurationStep
-          .getAdditionalKubernetesResources()
-          .head
-          .asInstanceOf[Service]
-        assert(driverService.getSpec.getIpFamilyPolicy() == stack)
-        assert(driverService.getSpec.getIpFamilies === answer.asJava)
-      }
-    }
-  }
-
   private def verifyService(
     driverPort: Int,
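[review note] On the test suite above: the two ipFamilies tests are deleted together with the feature, but the new explicit TCP declarations have no coverage. A hypothetical follow-up test, not part of this patch, written against the suite's existing fixtures and assuming the suite already imports JavaConverters:

    test("All driver service ports declare TCP explicitly") {
      val kconf = KubernetesTestConf.createDriverConf(
        sparkConf = new SparkConf(false),
        labels = DRIVER_LABELS,
        serviceLabels = DRIVER_SERVICE_LABELS,
        serviceAnnotations = DRIVER_SERVICE_ANNOTATIONS)
      val driverService = new DriverServiceFeatureStep(kconf)
        .getAdditionalKubernetesResources()
        .head
        .asInstanceOf[Service]
      driverService.getSpec.getPorts.asScala.foreach { port =>
        assert(port.getProtocol === "TCP")
      }
    }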