[SPARK-47383][CORE] Support spark.shutdown.timeout config

### What changes were proposed in this pull request?
Make the shutdown hook timeout configurable. If the new config is not set, Spark falls back to the existing behavior: a default timeout of 30 seconds, or whatever is defined for the `hadoop.service.shutdown.timeout` property in core-site.xml.

### Why are the changes needed?
Spark sometimes times out during the shutdown process. When that happens, data still sitting in the queues is dropped, causing metadata loss (e.g. event logs, anything written by custom listeners).

Before this change the timeout is not easily configurable. The underlying `org.apache.hadoop.util.ShutdownHookManager` has a default timeout of 30 seconds. It can be configured by setting `hadoop.service.shutdown.timeout`, but only in core-site.xml/core-default.xml, because the hook manager creates a fresh Hadoop configuration object internally and there is no opportunity to modify it programmatically.

### Does this PR introduce _any_ user-facing change?
Yes, a new config `spark.shutdown.timeout` is added.
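Per the doc string added in this patch, the value must be supplied as a JVM system property, e.g. `spark.driver.extraJavaOptions="-Dspark.shutdown.timeout=60s"`, so that it is already visible when the shutdown hook is installed.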

### How was this patch tested?
Manual testing in spark-shell; it is not practical to cover this behavior with a unit test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45504 from robreeves/sc_shutdown_timeout.

Authored-by: Rob Reeves <roreeves@linkedin.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
robreeves authored and dongjoon-hyun committed Mar 18, 2024
1 parent 8bd42cb commit ce93c9f
Showing 2 changed files with 27 additions and 2 deletions.
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2683,4 +2683,14 @@ package object config {
       .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SPARK_SHUTDOWN_TIMEOUT_MS =
+    ConfigBuilder("spark.shutdown.timeout")
+      .internal()
+      .doc("Defines the timeout period to wait for all shutdown hooks to be executed. " +
+        "This must be passed as a system property argument in the Java options, for example " +
+        "spark.driver.extraJavaOptions=\"-Dspark.shutdown.timeout=60s\".")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
 }
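The new entry is an optional time config, so reading it yields an `Option` of milliseconds. A minimal sketch of how such a read behaves (not part of the patch; it assumes the JVM was started with the system property set, and it only compiles inside Spark's own code base because `SparkConf.get(ConfigEntry)` is `private[spark]`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS

// new SparkConf() loads every "spark.*" JVM system property by default, so with
// -Dspark.shutdown.timeout=60s the optional time config resolves to Some(60000L)
// milliseconds; without the property it resolves to None, and the caller falls
// back to Hadoop's default shutdown-timeout handling.
val timeoutMs: Option[Long] = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
```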
17 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -19,12 +19,16 @@ package org.apache.spark.util
 
 import java.io.File
 import java.util.PriorityQueue
+import java.util.concurrent.TimeUnit
 
 import scala.util.Try
 
 import org.apache.hadoop.fs.FileSystem
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
+
 
 /**
  * Various utility methods used by Spark.
@@ -177,8 +181,19 @@ private [util] class SparkShutdownHookManager {
     val hookTask = new Runnable() {
       override def run(): Unit = runAll()
     }
-    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
-      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
+    val priority = FileSystem.SHUTDOWN_HOOK_PRIORITY + 30
+    // The timeout property must be passed as a Java system property because this
+    // is initialized before Spark configurations are registered as system
+    // properties later in initialization.
+    val timeout = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
+
+    timeout.fold {
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority)
+    } { t =>
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority, t, TimeUnit.MILLISECONDS)
+    }
   }
 
   def runAll(): Unit = {
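For reference, the patch relies on two existing overloads of Hadoop's `ShutdownHookManager.addShutdownHook`: one that takes only a priority (Hadoop then applies its own default timeout) and one that also takes an explicit timeout. A standalone sketch of the same pattern, with a hypothetical hook, priority, and timeout value:

```scala
import java.util.concurrent.TimeUnit

import org.apache.hadoop.util.ShutdownHookManager

object ShutdownTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val hook = new Runnable {
      // Hypothetical hook body; in Spark the registered task is runAll(), which
      // drains the queue of registered SparkShutdownHooks.
      override def run(): Unit = println("running shutdown hooks")
    }
    // Without an explicit timeout, Hadoop applies its default (30 seconds, or
    // hadoop.service.shutdown.timeout from core-site.xml):
    //   ShutdownHookManager.get().addShutdownHook(hook, 40)
    // With the overload the patch uses when spark.shutdown.timeout is set, the
    // timeout is passed explicitly (here an arbitrary demo priority of 40 and
    // a timeout of 60 seconds):
    ShutdownHookManager.get().addShutdownHook(hook, 40, 60000L, TimeUnit.MILLISECONDS)
  }
}
```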
