From ff2009dca85998119ad8fd08aa7bcc0e6e29829d Mon Sep 17 00:00:00 2001 From: John Yu Date: Wed, 16 Sep 2015 12:33:30 -0700 Subject: [PATCH] changing communication of azkaban properties WORKFLOW_LINK, JOB_LINK, etc through the hadoop-inject conf file instead of command line --- .../src/azkaban/jobtype/HadoopJobUtils.java | 21 +++++++++++++++++++ .../jobtype/HadoopSecureSparkWrapper.java | 20 +++++++++++++++--- .../src/azkaban/jobtype/HadoopSparkJob.java | 14 ++++++------- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java b/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java index a597339c..4be24c07 100644 --- a/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java +++ b/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java @@ -29,6 +29,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -441,4 +442,24 @@ public static String javaOptStringFromAzkabanProps(Props props, String key) { } return String.format("-D%s=%s", key, value); } + + /** + *
+   * constructs a javaOpts string based on the Hadoop Configuration and the key given; will return 
+   *  String.format("-D%s=%s", key, value);
+   * 
+ * + * @param conf + * @param key + * @return will return String.format("-D%s=%s", key, value). Throws RuntimeException if props not + * present + */ + public static String javaOptStringFromHadoopConfiguration(Configuration conf, String key) { + String value = conf.get(key); + if (value == null) { + throw new RuntimeException(String.format("Cannot find property [%s], in Hadoop configuration: [%s]", + key, value)); + } + return String.format("-D%s=%s", key, value); + } } diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java index 313c9b39..fb30214d 100644 --- a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java +++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java @@ -27,6 +27,11 @@ import azkaban.security.commons.HadoopSecurityManager; import azkaban.utils.Props; +import static azkaban.flow.CommonJobProperties.ATTEMPT_LINK; +import static azkaban.flow.CommonJobProperties.EXECUTION_LINK; +import static azkaban.flow.CommonJobProperties.JOB_LINK; +import static azkaban.flow.CommonJobProperties.WORKFLOW_LINK; + /** *
  * A Spark wrapper (more specifically a spark-submit wrapper) that works with Azkaban.
@@ -70,14 +75,14 @@ public static void main(final String[] args) throws Exception {
       proxyUser.doAs(new PrivilegedExceptionAction() {
         @Override
         public Void run() throws Exception {
-          runSpark(args);
+          runSpark(args, conf);
           return null;
         }
       });
 
     } else {
       logger.info("Not proxying to execute job. ");
-      runSpark(args);
+      runSpark(args, conf);
     }
   }
 
@@ -86,7 +91,7 @@ public Void run() throws Exception {
    * 
    * @param args
    */
-  private static void runSpark(String[] args) {
+  private static void runSpark(String[] args, Configuration conf) {
 
     if (args.length == 0) {
       throw new RuntimeException("SparkSubmit cannot run with zero args");
@@ -102,6 +107,15 @@ private static void runSpark(String[] args) {
 
     final String[] newArgs = concat.toString().split(SparkJobArg.delimiter);
     logger.info("newArgs: " + Arrays.toString(newArgs));
+    
+    StringBuilder driverJavaOptions = new StringBuilder(newArgs[1]);
+    String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK };
+    for (String requiredJavaOpt : requiredJavaOpts) {
+      driverJavaOptions.append(" ").append(
+          HadoopJobUtils.javaOptStringFromHadoopConfiguration(conf, requiredJavaOpt));
+    }
+    newArgs[1] = driverJavaOptions.toString();
+    logger.info("newArgs with driver java options appended: " + Arrays.toString(newArgs));
 
     org.apache.spark.deploy.SparkSubmit$.MODULE$.main(newArgs);
   }
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
index 32b9e5a6..22c0974c 100644
--- a/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
@@ -196,13 +196,13 @@ static String testableGetMainArguments(Props jobProps, String workingDir, Logger
     // special case handling for DRIVER_JAVA_OPTIONS
     argList.add(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName);
     StringBuilder driverJavaOptions = new StringBuilder();
-    String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK };
-    driverJavaOptions.append(HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps,
-            requiredJavaOpts[0]));
-    for (int i = 1; i < requiredJavaOpts.length; i++) {
-      driverJavaOptions.append(" "
-              + HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, requiredJavaOpts[i]));
-    }
+//    String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK };
+//    driverJavaOptions.append(HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps,
+//            requiredJavaOpts[0]));
+//    for (int i = 1; i < requiredJavaOpts.length; i++) {
+//      driverJavaOptions.append(" "
+//              + HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, requiredJavaOpts[i]));
+//    }
     if (jobProps.containsKey(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName)) {
       driverJavaOptions
               .append(" " + jobProps.getString(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName));