diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java b/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java index a597339c..4be24c07 100644 --- a/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java +++ b/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java @@ -29,6 +29,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -441,4 +442,24 @@ public static String javaOptStringFromAzkabanProps(Props props, String key) { } return String.format("-D%s=%s", key, value); } + + /** + *
+ * constructs a javaOpts string based on the Hadoop Configuration, and the key given, will return + * String.format("-D%s=%s", key, value); +* + * + * @param conf + * @param key + * @return will return String.format("-D%s=%s", key, value). Throws RuntimeException if the property is not + * present + */ + public static String javaOptStringFromHadoopConfiguration(Configuration conf, String key) { + String value = conf.get(key); + if (value == null) { + throw new RuntimeException(String.format("Cannot find property [%s] in the Hadoop configuration", + key)); + } + return String.format("-D%s=%s", key, value); + } } diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java index 313c9b39..fb30214d 100644 --- a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java +++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java @@ -27,6 +27,11 @@ import azkaban.security.commons.HadoopSecurityManager; import azkaban.utils.Props; +import static azkaban.flow.CommonJobProperties.ATTEMPT_LINK; +import static azkaban.flow.CommonJobProperties.EXECUTION_LINK; +import static azkaban.flow.CommonJobProperties.JOB_LINK; +import static azkaban.flow.CommonJobProperties.WORKFLOW_LINK; + /** *
* A Spark wrapper (more specifically a spark-submit wrapper) that works with Azkaban. @@ -70,14 +75,14 @@ public static void main(final String[] args) throws Exception { proxyUser.doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - runSpark(args); + runSpark(args, conf); return null; } }); } else { logger.info("Not proxying to execute job. "); - runSpark(args); + runSpark(args, conf); } } @@ -86,7 +91,7 @@ public Void run() throws Exception { * * @param args */ - private static void runSpark(String[] args) { + private static void runSpark(String[] args, Configuration conf) { if (args.length == 0) { throw new RuntimeException("SparkSubmit cannot run with zero args"); @@ -102,6 +107,15 @@ private static void runSpark(String[] args) { final String[] newArgs = concat.toString().split(SparkJobArg.delimiter); logger.info("newArgs: " + Arrays.toString(newArgs)); + + StringBuilder driverJavaOptions = new StringBuilder(newArgs[1]); + String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK }; + for (int i = 0; i < requiredJavaOpts.length; i++) { + driverJavaOptions.append(" ").append(HadoopJobUtils.javaOptStringFromHadoopConfiguration(conf, + requiredJavaOpts[i])); + } + newArgs[1] = driverJavaOptions.toString(); + logger.info("newArgs2: " + Arrays.toString(newArgs)); org.apache.spark.deploy.SparkSubmit$.MODULE$.main(newArgs); } diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java index 32b9e5a6..22c0974c 100644 --- a/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java +++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java @@ -196,13 +196,13 @@ static String testableGetMainArguments(Props jobProps, String workingDir, Logger // special case handling for DRIVER_JAVA_OPTIONS argList.add(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName); StringBuilder driverJavaOptions = new StringBuilder(); - String[] requiredJavaOpts = { 
WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK }; - driverJavaOptions.append(HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, - requiredJavaOpts[0])); - for (int i = 1; i < requiredJavaOpts.length; i++) { - driverJavaOptions.append(" " - + HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, requiredJavaOpts[i])); - } +// String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK }; +// driverJavaOptions.append(HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, +// requiredJavaOpts[0])); +// for (int i = 1; i < requiredJavaOpts.length; i++) { +// driverJavaOptions.append(" " +// + HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, requiredJavaOpts[i])); +// } if (jobProps.containsKey(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName)) { driverJavaOptions .append(" " + jobProps.getString(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName));