Commit c961788
Merge pull request #198 from johnyu0520/master
changing communication of azkaban properties WORKFLOW_LINK, JOB_LINK,…
hluu committed Sep 21, 2015
2 parents 62a187a + ff2009d commit c961788
Showing 3 changed files with 45 additions and 10 deletions.
21 changes: 21 additions & 0 deletions plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java
@@ -29,6 +29,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -441,4 +442,24 @@ public static String javaOptStringFromAzkabanProps(Props props, String key) {
}
return String.format("-D%s=%s", key, value);
}

/**
 * <pre>
 * Constructs a javaOpts string from the given Hadoop Configuration and the key given; returns
 * String.format("-D%s=%s", key, value);
 * </pre>
 *
 * @param conf the Hadoop Configuration to read the property from
 * @param key the name of the property to look up
 * @return String.format("-D%s=%s", key, value). Throws RuntimeException if the key is not
 *         present in the configuration
 */
public static String javaOptStringFromHadoopConfiguration(Configuration conf, String key) {
String value = conf.get(key);
if (value == null) {
throw new RuntimeException(String.format("Cannot find property [%s], in Hadoop configuration: [%s]",
key, value));
}
return String.format("-D%s=%s", key, value);
}
}
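
For reference, a minimal usage sketch of the new helper (the key and URL below are illustrative, not taken from this commit):

import org.apache.hadoop.conf.Configuration;

import azkaban.jobtype.HadoopJobUtils;

public class JavaOptStringExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Illustrative key and value; the real keys are the link constants
    // defined in azkaban.flow.CommonJobProperties.
    conf.set("azkaban.link.workflow.url", "http://azkaban.example.com/flow");
    // Prints: -Dazkaban.link.workflow.url=http://azkaban.example.com/flow
    System.out.println(HadoopJobUtils.javaOptStringFromHadoopConfiguration(
        conf, "azkaban.link.workflow.url"));
    // A key that is absent from the configuration throws RuntimeException,
    // so callers fail fast instead of emitting "-Dkey=null".
  }
}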
20 changes: 17 additions & 3 deletions plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java
@@ -27,6 +27,11 @@
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.utils.Props;

import static azkaban.flow.CommonJobProperties.ATTEMPT_LINK;
import static azkaban.flow.CommonJobProperties.EXECUTION_LINK;
import static azkaban.flow.CommonJobProperties.JOB_LINK;
import static azkaban.flow.CommonJobProperties.WORKFLOW_LINK;

/**
* <pre>
* A Spark wrapper (more specifically a spark-submit wrapper) that works with Azkaban.
@@ -70,14 +75,14 @@ public static void main(final String[] args) throws Exception {
proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
runSpark(args);
runSpark(args, conf);
return null;
}
});

} else {
logger.info("Not proxying to execute job. ");
runSpark(args);
runSpark(args, conf);
}
}

@@ -86,7 +91,7 @@ public Void run() throws Exception {
*
* @param args
* @param conf the Hadoop Configuration that carries the Azkaban link properties
*/
private static void runSpark(String[] args) {
private static void runSpark(String[] args, Configuration conf) {

if (args.length == 0) {
throw new RuntimeException("SparkSubmit cannot run with zero args");
@@ -102,6 +107,15 @@ private static void runSpark(String[] args) {

final String[] newArgs = concat.toString().split(SparkJobArg.delimiter);
logger.info("newArgs: " + Arrays.toString(newArgs));

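// newArgs[1] is assumed to hold the value of the --driver-java-options pair
// that SparkJobArg emits first; the Azkaban link properties from the Hadoop
// Configuration are appended to it below.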
StringBuilder driverJavaOptions = new StringBuilder(newArgs[1]);
String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK };
for (int i = 0; i < requiredJavaOpts.length; i++) {
driverJavaOptions.append(" ").append(HadoopJobUtils.javaOptStringFromHadoopConfiguration(conf,
requiredJavaOpts[i]));
}
newArgs[1] = driverJavaOptions.toString();
logger.info("newArgs2: " + Arrays.toString(newArgs));

org.apache.spark.deploy.SparkSubmit$.MODULE$.main(newArgs);
}
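
In isolation, the appended block behaves like the following sketch, turning an existing driver-options value such as "-Xmx2g" into one that also carries the link properties (a minimal sketch; the key names are assumptions standing in for the CommonJobProperties constants):

import org.apache.hadoop.conf.Configuration;

public class DriverOptsAppendSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed key names standing in for WORKFLOW_LINK, JOB_LINK, etc.
    conf.set("azkaban.link.workflow.url", "http://azkaban.example.com/flow");
    conf.set("azkaban.link.job.url", "http://azkaban.example.com/job");

    StringBuilder driverOpts = new StringBuilder("-Xmx2g");
    for (String key : new String[] { "azkaban.link.workflow.url", "azkaban.link.job.url" }) {
      // Same "-Dkey=value" formatting as javaOptStringFromHadoopConfiguration.
      driverOpts.append(' ').append(String.format("-D%s=%s", key, conf.get(key)));
    }
    // Prints: -Xmx2g -Dazkaban.link.workflow.url=http://azkaban.example.com/flow
    //         -Dazkaban.link.job.url=http://azkaban.example.com/job
    System.out.println(driverOpts);
  }
}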
14 changes: 7 additions & 7 deletions plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
@@ -196,13 +196,13 @@ static String testableGetMainArguments(Props jobProps, String workingDir, Logger
// special case handling for DRIVER_JAVA_OPTIONS
argList.add(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName);
StringBuilder driverJavaOptions = new StringBuilder();
String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK };
driverJavaOptions.append(HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps,
requiredJavaOpts[0]));
for (int i = 1; i < requiredJavaOpts.length; i++) {
driverJavaOptions.append(" "
+ HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, requiredJavaOpts[i]));
}
// String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK };
// driverJavaOptions.append(HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps,
// requiredJavaOpts[0]));
// for (int i = 1; i < requiredJavaOpts.length; i++) {
// driverJavaOptions.append(" "
// + HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, requiredJavaOpts[i]));
// }
if (jobProps.containsKey(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName)) {
driverJavaOptions
.append(" " + jobProps.getString(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName));
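
The props-based injection above is retired rather than deleted; the wrapper now injects the same links from the Hadoop Configuration, so the Spark driver still receives them as JVM system properties. A minimal driver-side sketch (key name assumed for illustration):

public class DriverLinkLookup {
  public static void main(String[] args) {
    // Each -Dkey=value passed via --driver-java-options surfaces as a JVM
    // system property inside the driver process.
    System.out.println(System.getProperty("azkaban.link.workflow.url"));
  }
}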
