diff --git a/plugins/jobtype/jobtypes/common.properties b/plugins/jobtype/jobtypes/common.properties
index e84ebe4d..988d02fa 100644
--- a/plugins/jobtype/jobtypes/common.properties
+++ b/plugins/jobtype/jobtypes/common.properties
@@ -3,5 +3,6 @@
hadoop.home=
#hive.home=
#pig.home=
+#spark.home=
#azkaban.should.proxy=
diff --git a/plugins/jobtype/jobtypes/commonprivate.properties b/plugins/jobtype/jobtypes/commonprivate.properties
index 910b3a44..9d1de059 100644
--- a/plugins/jobtype/jobtypes/commonprivate.properties
+++ b/plugins/jobtype/jobtypes/commonprivate.properties
@@ -21,4 +21,4 @@ hadoop.security.manager.class=azkaban.security.HadoopSecurityManager_H_1_0
hadoop.home=
#pig.home=
#hive.home=
-
+#spark.home=
diff --git a/plugins/jobtype/jobtypes/spark/plugin.properties b/plugins/jobtype/jobtypes/spark/plugin.properties
new file mode 100644
index 00000000..370bbf39
--- /dev/null
+++ b/plugins/jobtype/jobtypes/spark/plugin.properties
@@ -0,0 +1 @@
+queue=default
diff --git a/plugins/jobtype/jobtypes/spark/private.properties b/plugins/jobtype/jobtypes/spark/private.properties
new file mode 100644
index 00000000..53483488
--- /dev/null
+++ b/plugins/jobtype/jobtypes/spark/private.properties
@@ -0,0 +1,3 @@
+jobtype.class=azkaban.jobtype.HadoopSparkJob
+
+jobtype.classpath=${hadoop.classpath}:${spark.home}/conf:${spark.home}/lib/*
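+
+# Illustrative note (assumed install path): with spark.home=/opt/spark, the classpath above
+# expands to the Hadoop classpath plus /opt/spark/conf and every jar under /opt/spark/lib.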
diff --git a/plugins/jobtype/lib/azkaban-common-2.6.2.53.jar b/plugins/jobtype/lib/azkaban-common-2.6.2.53.jar
new file mode 100644
index 00000000..67e55dc0
Binary files /dev/null and b/plugins/jobtype/lib/azkaban-common-2.6.2.53.jar differ
diff --git a/plugins/jobtype/lib/commons-io-2.4.jar b/plugins/jobtype/lib/commons-io-2.4.jar
new file mode 100644
index 00000000..90035a4f
Binary files /dev/null and b/plugins/jobtype/lib/commons-io-2.4.jar differ
diff --git a/plugins/jobtype/lib/hadoop-auth-2.3.0.jar b/plugins/jobtype/lib/hadoop-auth-2.3.0.jar
new file mode 100644
index 00000000..4c7a0442
Binary files /dev/null and b/plugins/jobtype/lib/hadoop-auth-2.3.0.jar differ
diff --git a/plugins/jobtype/lib/hadoop-common-2.3.0.jar b/plugins/jobtype/lib/hadoop-common-2.3.0.jar
new file mode 100644
index 00000000..e5aeceed
Binary files /dev/null and b/plugins/jobtype/lib/hadoop-common-2.3.0.jar differ
diff --git a/plugins/jobtype/lib/hadoop-yarn-api-2.3.0.jar b/plugins/jobtype/lib/hadoop-yarn-api-2.3.0.jar
new file mode 100644
index 00000000..9b3d8b1a
Binary files /dev/null and b/plugins/jobtype/lib/hadoop-yarn-api-2.3.0.jar differ
diff --git a/plugins/jobtype/lib/hadoop-yarn-client-2.3.0.jar b/plugins/jobtype/lib/hadoop-yarn-client-2.3.0.jar
new file mode 100644
index 00000000..aad117a0
Binary files /dev/null and b/plugins/jobtype/lib/hadoop-yarn-client-2.3.0.jar differ
diff --git a/plugins/jobtype/lib/hadoop-yarn-common-2.3.0.jar b/plugins/jobtype/lib/hadoop-yarn-common-2.3.0.jar
new file mode 100644
index 00000000..8ef77a82
Binary files /dev/null and b/plugins/jobtype/lib/hadoop-yarn-common-2.3.0.jar differ
diff --git a/plugins/jobtype/lib/hamcrest-core-1.3.jar b/plugins/jobtype/lib/hamcrest-core-1.3.jar
new file mode 100644
index 00000000..9d5fe16e
Binary files /dev/null and b/plugins/jobtype/lib/hamcrest-core-1.3.jar differ
diff --git a/plugins/jobtype/lib/jcommander-1.27.jar b/plugins/jobtype/lib/jcommander-1.27.jar
new file mode 100644
index 00000000..f9490902
Binary files /dev/null and b/plugins/jobtype/lib/jcommander-1.27.jar differ
diff --git a/plugins/jobtype/lib/junit-4.11.jar b/plugins/jobtype/lib/junit-4.11.jar
new file mode 100644
index 00000000..aaf74448
Binary files /dev/null and b/plugins/jobtype/lib/junit-4.11.jar differ
diff --git a/plugins/jobtype/lib/spark-core_2.10-1.4.0.jar b/plugins/jobtype/lib/spark-core_2.10-1.4.0.jar
new file mode 100644
index 00000000..fec15fdb
Binary files /dev/null and b/plugins/jobtype/lib/spark-core_2.10-1.4.0.jar differ
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopHiveJob.java b/plugins/jobtype/src/azkaban/jobtype/HadoopHiveJob.java
index 0bae33c1..bc5645ce 100644
--- a/plugins/jobtype/src/azkaban/jobtype/HadoopHiveJob.java
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopHiveJob.java
@@ -30,6 +30,7 @@
import org.apache.log4j.Logger;
+import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.JavaProcessJob;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
@@ -50,27 +51,24 @@ public class HadoopHiveJob extends JavaProcessJob {
private HadoopSecurityManager hadoopSecurityManager;
- private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
- "hadoop.security.manager.class";
-
private boolean debug = false;
public HadoopHiveJob(String jobid, Props sysProps, Props jobProps, Logger log)
throws IOException {
super(jobid, sysProps, jobProps, log);
- getJobProps().put("azkaban.job.id", jobid);
+ getJobProps().put(CommonJobProperties.JOB_ID, jobid);
- shouldProxy = getSysProps().getBoolean("azkaban.should.proxy", false);
- getJobProps().put("azkaban.should.proxy", Boolean.toString(shouldProxy));
- obtainTokens = getSysProps().getBoolean("obtain.binary.token", false);
+ shouldProxy = getSysProps().getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
+ getJobProps().put(HadoopSecurityManager.ENABLE_PROXYING, Boolean.toString(shouldProxy));
+ obtainTokens = getSysProps().getBoolean(HadoopSecurityManager.OBTAIN_BINARY_TOKEN, false);
debug = getJobProps().getBoolean("debug", false);
if (shouldProxy) {
getLog().info("Initiating hadoop security manager.");
try {
- hadoopSecurityManager = loadHadoopSecurityManager(sysProps, log);
+ hadoopSecurityManager = HadoopJobUtils.loadHadoopSecurityManager(getSysProps(), log);
} catch (RuntimeException e) {
throw new RuntimeException("Failed to get hadoop security manager!" + e);
}
@@ -90,91 +88,26 @@ public void run() throws Exception {
Props props = new Props();
props.putAll(getJobProps());
props.putAll(getSysProps());
- tokenFile = getHadoopTokens(props);
+ tokenFile = HadoopJobUtils.getHadoopTokens(hadoopSecurityManager, props, getLog());
getJobProps().put("env." + HADOOP_TOKEN_FILE_LOCATION,
tokenFile.getAbsolutePath());
}
try {
- super.run();
- } catch (Exception e) {
- e.printStackTrace();
- getLog().error("caught exception running the job");
- throw new Exception(e);
+ super.run();
} catch (Throwable t) {
t.printStackTrace();
getLog().error("caught error running the job");
throw new Exception(t);
} finally {
if (tokenFile != null) {
- cancelHadoopTokens(tokenFile);
+ HadoopJobUtils.cancelHadoopTokens(hadoopSecurityManager, userToProxy, tokenFile, getLog());
if (tokenFile.exists()) {
tokenFile.delete();
}
}
}
- }
-
- private void cancelHadoopTokens(File tokenFile) {
- try {
- hadoopSecurityManager.cancelTokens(tokenFile, userToProxy, getLog());
- } catch (HadoopSecurityManagerException e) {
- e.printStackTrace();
- getLog().error(e.getCause() + e.getMessage());
- } catch (Exception e) {
- e.printStackTrace();
- getLog().error(e.getCause() + e.getMessage());
- }
-
- }
-
- private HadoopSecurityManager loadHadoopSecurityManager(Props props,
- Logger logger) throws RuntimeException {
-
- Class<?> hadoopSecurityManagerClass =
- props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
- HadoopHiveJob.class.getClassLoader());
- getLog().info(
- "Loading hadoop security manager "
- + hadoopSecurityManagerClass.getName());
- HadoopSecurityManager hadoopSecurityManager = null;
-
- try {
- Method getInstanceMethod =
- hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
- hadoopSecurityManager =
- (HadoopSecurityManager) getInstanceMethod.invoke(
- hadoopSecurityManagerClass, props);
- } catch (InvocationTargetException e) {
- getLog().error(
- "Could not instantiate Hadoop Security Manager "
- + hadoopSecurityManagerClass.getName() + e.getCause());
- throw new RuntimeException(e.getCause());
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e.getCause());
- }
-
- return hadoopSecurityManager;
-
- }
-
- protected File getHadoopTokens(Props props)
- throws HadoopSecurityManagerException {
-
- File tokenFile = null;
- try {
- tokenFile = File.createTempFile("mr-azkaban", ".token");
- } catch (Exception e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException(
- "Failed to create the token file.", e);
- }
-
- hadoopSecurityManager.prefetchToken(tokenFile, props, getLog());
-
- return tokenFile;
- }
+ }
@Override
protected String getJavaClass() {
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopJavaJob.java b/plugins/jobtype/src/azkaban/jobtype/HadoopJavaJob.java
index e33a8c6d..20b94d8e 100644
--- a/plugins/jobtype/src/azkaban/jobtype/HadoopJavaJob.java
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopJavaJob.java
@@ -16,21 +16,19 @@
package azkaban.jobtype;
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
import java.io.File;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.log4j.Logger;
+import azkaban.flow.CommonJobProperties;
+import azkaban.jobExecutor.JavaProcessJob;
import azkaban.security.commons.HadoopSecurityManager;
-import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.Props;
-import azkaban.jobExecutor.JavaProcessJob;
-
-import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
public class HadoopJavaJob extends JavaProcessJob {
@@ -43,9 +41,6 @@ public class HadoopJavaJob extends JavaProcessJob {
public static final String DEFAULT_RUN_METHOD = "run";
public static final String DEFAULT_PROGRESS_METHOD = "getProgress";
- private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
- "hadoop.security.manager.class";
-
private String _runMethod;
private String _cancelMethod;
private String _progressMethod;
@@ -63,17 +58,18 @@ public HadoopJavaJob(String jobid, Props sysProps, Props jobProps, Logger log)
throws RuntimeException {
super(jobid, sysProps, jobProps, log);
- getJobProps().put("azkaban.job.id", jobid);
- shouldProxy = getSysProps().getBoolean("azkaban.should.proxy", false);
- getJobProps().put("azkaban.should.proxy", Boolean.toString(shouldProxy));
- obtainTokens = getSysProps().getBoolean("obtain.binary.token", false);
+ getJobProps().put(CommonJobProperties.JOB_ID, jobid);
+ shouldProxy = getSysProps().getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
+ getJobProps().put(HadoopSecurityManager.ENABLE_PROXYING, Boolean.toString(shouldProxy));
+ obtainTokens = getSysProps().getBoolean(HadoopSecurityManager.OBTAIN_BINARY_TOKEN, false);
noUserClasspath =
getSysProps().getBoolean("azkaban.no.user.classpath", false);
if (shouldProxy) {
getLog().info("Initiating hadoop security manager.");
try {
- hadoopSecurityManager = loadHadoopSecurityManager(sysProps, log);
+ hadoopSecurityManager = HadoopJobUtils.loadHadoopSecurityManager(getSysProps(), log);
+
} catch (RuntimeException e) {
e.printStackTrace();
throw new RuntimeException("Failed to get hadoop security manager!"
@@ -82,35 +78,6 @@ public HadoopJavaJob(String jobid, Props sysProps, Props jobProps, Logger log)
}
}
- private HadoopSecurityManager loadHadoopSecurityManager(Props props,
- Logger logger) throws RuntimeException {
-
- Class<?> hadoopSecurityManagerClass =
- props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
- HadoopJavaJob.class.getClassLoader());
- getLog().info(
- "Initializing hadoop security manager "
- + hadoopSecurityManagerClass.getName());
- HadoopSecurityManager hadoopSecurityManager = null;
-
- try {
- Method getInstanceMethod =
- hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
- hadoopSecurityManager =
- (HadoopSecurityManager) getInstanceMethod.invoke(
- hadoopSecurityManagerClass, props);
- } catch (InvocationTargetException e) {
- getLog().error(
- "Could not instantiate Hadoop Security Manager "
- + hadoopSecurityManagerClass.getName() + e.getCause());
- throw new RuntimeException(e.getCause());
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e.getCause());
- }
-
- return hadoopSecurityManager;
- }
@Override
protected String getJVMArguments() {
@@ -189,7 +156,7 @@ public void run() throws Exception {
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());
- File f = null;
+ File tokenFile = null;
if (shouldProxy && obtainTokens) {
userToProxy = getJobProps().getString("user.to.proxy");
getLog().info("Need to proxy. Getting tokens.");
@@ -197,9 +164,9 @@ public void run() throws Exception {
props.putAll(getJobProps());
props.putAll(getSysProps());
- f = getHadoopTokens(props);
+ tokenFile = HadoopJobUtils.getHadoopTokens(hadoopSecurityManager, props, getLog());
getJobProps().put("env." + HADOOP_TOKEN_FILE_LOCATION,
- f.getAbsolutePath());
+ tokenFile.getAbsolutePath());
}
try {
super.run();
@@ -207,15 +174,15 @@ public void run() throws Exception {
e.printStackTrace();
throw new Exception(e);
} finally {
- if (f != null) {
+ if (tokenFile != null) {
try {
- cancelHadoopTokens(f);
+ HadoopJobUtils.cancelHadoopTokens(hadoopSecurityManager, userToProxy, tokenFile, getLog());
} catch (Throwable t) {
t.printStackTrace();
getLog().error("Failed to cancel tokens.");
}
- if (f.exists()) {
- f.delete();
+ if (tokenFile.exists()) {
+ tokenFile.delete();
}
}
}
@@ -239,37 +206,7 @@ private static String getSourcePathFromClass(Class<?> containedClass) {
return containedClass.getProtectionDomain().getCodeSource().getLocation()
.getPath();
}
- }
-
- protected File getHadoopTokens(Props props)
- throws HadoopSecurityManagerException {
-
- File tokenFile = null;
- try {
- tokenFile = File.createTempFile("mr-azkaban", ".token");
- } catch (Exception e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException(
- "Failed to create the token file.", e);
- }
-
- hadoopSecurityManager.prefetchToken(tokenFile, props, getLog());
-
- return tokenFile;
- }
-
- private void cancelHadoopTokens(File f) {
- try {
- hadoopSecurityManager.cancelTokens(f, userToProxy, getLog());
- } catch (HadoopSecurityManagerException e) {
- e.printStackTrace();
- getLog().error(e.getCause() + e.getMessage());
- } catch (Exception e) {
- e.printStackTrace();
- getLog().error(e.getCause() + e.getMessage());
- }
-
- }
+ }
@Override
protected String getJavaClass() {
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java b/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java
new file mode 100644
index 00000000..a597339c
--- /dev/null
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopJobUtils.java
@@ -0,0 +1,444 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.log4j.Logger;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.security.commons.HadoopSecurityManagerException;
+import azkaban.utils.Props;
+
+/**
+ *
+ * Common methods required by the Hadoop*Job classes are consolidated here.
+ *
+ * Methods here include getting/setting Hadoop tokens,
+ * manipulating lib folder and jar paths passed in from the Azkaban properties file,
+ * and parsing logs for application ids so that the corresponding applications can be
+ * killed via YARN (very helpful during the cancel method).
+ *
+ * @see azkaban.jobtype.HadoopSparkJob
+ * @see azkaban.jobtype.HadoopHiveJob
+ * @see azkaban.jobtype.HadoopPigJob
+ * @see azkaban.jobtype.HadoopJavaJob
+ */
+
+public class HadoopJobUtils {
+
+ public static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM = "hadoop.security.manager.class";
+
+ // the regex used to find application ids in the hadoop log
+ public static final Pattern APPLICATION_ID_PATTERN = Pattern
+ .compile(".* (application_\\d+_\\d+).*");
+
+ // Azkaban built in property name
+ public static final String JOBTYPE_GLOBAL_JVM_ARGS = "jobtype.global.jvm.args";
+
+ // Azkaban built in property name
+ public static final String JOBTYPE_JVM_ARGS = "jobtype.jvm.args";
+
+ // Azkaban built in property name
+ public static final String JVM_ARGS = "jvm.args";
+
+ /**
+ * Invalidates a Hadoop authentication token file
+ *
+ * @param hadoopSecurityManager
+ * @param userToProxy
+ * @param tokenFile
+ * @param log
+ */
+ public static void cancelHadoopTokens(HadoopSecurityManager hadoopSecurityManager,
+ String userToProxy, File tokenFile, Logger log) {
+ try {
+ hadoopSecurityManager.cancelTokens(tokenFile, userToProxy, log);
+ } catch (HadoopSecurityManagerException e) {
+ log.error(e.getCause() + e.getMessage());
+ } catch (Exception e) {
+ log.error(e.getCause() + e.getMessage());
+ }
+ }
+
+ /**
+ * Based on the HADOOP_SECURITY_MANAGER_CLASS_PARAM setting in the incoming props, finds the
+ * correct HadoopSecurityManager Java class
+ *
+ * @param props
+ * @param log
+ * @return a HadoopSecurityManager instance
+ * @throws RuntimeException
+ * if the class cannot be found or instantiated
+ */
+ public static HadoopSecurityManager loadHadoopSecurityManager(Props props, Logger log)
+ throws RuntimeException {
+
+ Class<?> hadoopSecurityManagerClass = props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
+ HadoopJobUtils.class.getClassLoader());
+ log.info("Loading hadoop security manager " + hadoopSecurityManagerClass.getName());
+ HadoopSecurityManager hadoopSecurityManager = null;
+
+ try {
+ Method getInstanceMethod = hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
+ hadoopSecurityManager = (HadoopSecurityManager) getInstanceMethod.invoke(
+ hadoopSecurityManagerClass, props);
+ } catch (InvocationTargetException e) {
+ String errMsg = "Could not instantiate Hadoop Security Manager "
+ + hadoopSecurityManagerClass.getName() + e.getCause();
+ log.error(errMsg);
+ throw new RuntimeException(errMsg, e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return hadoopSecurityManager;
+
+ }
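+
+ // Illustrative example: with hadoop.security.manager.class=azkaban.security.HadoopSecurityManager_H_1_0
+ // (the value shipped in jobtypes/commonprivate.properties), this reflectively calls
+ // HadoopSecurityManager_H_1_0.getInstance(props) and returns the resulting instance.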
+
+ /**
+ * Fetches Hadoop tokens as the Azkaban user and writes them to a temporary token file
+ *
+ * @param hadoopSecurityManager
+ * @param props
+ * @param log
+ * @return the file containing the prefetched tokens
+ * @throws HadoopSecurityManagerException
+ */
+ public static File getHadoopTokens(HadoopSecurityManager hadoopSecurityManager, Props props,
+ Logger log) throws HadoopSecurityManagerException {
+
+ File tokenFile = null;
+ try {
+ tokenFile = File.createTempFile("mr-azkaban", ".token");
+ } catch (Exception e) {
+ throw new HadoopSecurityManagerException("Failed to create the token file.", e);
+ }
+
+ hadoopSecurityManager.prefetchToken(tokenFile, props, log);
+
+ return tokenFile;
+ }
+
+ /**
+ *
+ * If there is a * specification in the "jar" argument (e.g. jar=./lib/*,./lib2/*),
+ * this method resolves each * into the actual jar names inside that folder, in order.
+ * This is needed because Spark 1.4 does not appear to perform this resolution for users.
+ *
+ * @param unresolvedJarSpec
+ * @return jar file list, comma separated, all .../* expanded into actual jar names in order
+ *
+ */
+ public static String resolveWildCardForJarSpec(String workingDirectory, String unresolvedJarSpec,
+ Logger log) {
+
+ log.debug("resolveWildCardForJarSpec: unresolved jar specification: " + unresolvedJarSpec);
+ log.debug("working directory: " + workingDirectory);
+
+ if (unresolvedJarSpec == null || unresolvedJarSpec.isEmpty())
+ return "";
+
+ StringBuilder resolvedJarSpec = new StringBuilder();
+
+ String[] unresolvedJarSpecList = unresolvedJarSpec.split(",");
+ for (String s : unresolvedJarSpecList) {
+ // if need resolution
+ if (s.endsWith("*")) {
+ // remove last 2 characters to get to the folder
+ String dirName = String.format("%s/%s", workingDirectory, s.substring(0, s.length() - 2));
+
+ File[] jars = null;
+ try {
+ jars = getFilesInFolderByRegex(new File(dirName), ".*jar");
+ } catch (FileNotFoundException fnfe) {
+ log.warn("folder does not exist: " + dirName);
+ continue;
+ }
+
+ // if the folder is there, add them to the jar list
+ for (File jar : jars) {
+ resolvedJarSpec.append(jar.toString()).append(",");
+ }
+ } else { // no need for resolution
+ resolvedJarSpec.append(s).append(",");
+ }
+ }
+
+ log.debug("resolveWildCardForJarSpec: resolvedJarSpec: " + resolvedJarSpec);
+
+ // remove the trailing comma
+ int lastCharIndex = resolvedJarSpec.length() - 1;
+ if (lastCharIndex >= 0 && resolvedJarSpec.charAt(lastCharIndex) == ',') {
+ resolvedJarSpec.deleteCharAt(lastCharIndex);
+ }
+
+ return resolvedJarSpec.toString();
+ }
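+
+ // Illustrative example (assumed paths): with workingDirectory=/tmp/wd and an input of
+ // "lib/*,other.jar", where /tmp/wd/lib contains a.jar and b.jar, this returns
+ // "/tmp/wd/lib/a.jar,/tmp/wd/lib/b.jar,other.jar".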
+
+ /**
+ *
+ * This method looks for the proper user execution jar.
+ * The user input is expected in the following 2 formats:
+ * 1. ./lib/abc
+ * 2. ./lib/abc.jar
+ *
+ * This method uses prefix matching to find any jar of the form abc*.jar,
+ * so that users can bump jar versions without modifying their Hadoop DSL.
+ *
+ * This method throws an exception if more than one jar matching the prefix is found
+ *
+ * @param workingDirectory
+ * @param userSpecifiedJarName
+ * @return the resolved actual jar file name to execute
+ */
+ public static String resolveExecutionJarName(String workingDirectory,
+ String userSpecifiedJarName, Logger log) {
+
+ if (log.isDebugEnabled()) {
+ String debugMsg = String.format(
+ "Resolving execution jar name: working directory: %s, user specified name: %s",
+ workingDirectory, userSpecifiedJarName);
+ log.debug(debugMsg);
+ }
+
+ // in case user decides to specify with abc.jar, instead of only abc
+ if (userSpecifiedJarName.endsWith(".jar"))
+ userSpecifiedJarName = userSpecifiedJarName.replace(".jar", "");
+
+ // can't use java 1.7 stuff, reverting to a slightly ugly implementation
+ String userSpecifiedJarPath = String.format("%s/%s", workingDirectory, userSpecifiedJarName);
+ int lastIndexOfSlash = userSpecifiedJarPath.lastIndexOf("/");
+ final String jarPrefix = userSpecifiedJarPath.substring(lastIndexOfSlash + 1);
+ final String dirName = userSpecifiedJarPath.substring(0, lastIndexOfSlash);
+
+ if (log.isDebugEnabled()) {
+ String debugMsg = String.format("Resolving execution jar name: dirname: %s, jar name: %s",
+ dirName, jarPrefix);
+ log.debug(debugMsg);
+ }
+
+ File[] potentialExecutionJarList;
+ try {
+ potentialExecutionJarList = getFilesInFolderByRegex(new File(dirName), jarPrefix + ".*jar");
+ } catch (FileNotFoundException e) {
+ throw new IllegalStateException(
+ "execution jar is suppose to be in this folder, but the folder doesn't exist: "
+ + dirName);
+ }
+
+ if (potentialExecutionJarList.length == 0) {
+ throw new IllegalStateException("unable to find execution jar for Spark at path: "
+ + userSpecifiedJarPath + "*.jar");
+ } else if (potentialExecutionJarList.length > 1) {
+ throw new IllegalStateException(
+ "I find more than one matching instance of the execution jar at the path, don't know which one to use: "
+ + userSpecifiedJarPath + "*.jar");
+ }
+
+ String resolvedJarName = potentialExecutionJarList[0].toString();
+ log.debug("Resolving execution jar name: resolvedJarName: " + resolvedJarName);
+ return resolvedJarName;
+ }
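+
+ // Illustrative example (assumed paths): with workingDirectory=/tmp/wd and
+ // userSpecifiedJarName="lib/abc" (or "lib/abc.jar"), where /tmp/wd/lib contains abc-1.0.jar,
+ // this resolves to "/tmp/wd/lib/abc-1.0.jar".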
+
+ /**
+ *
+ * @return the files in the given folder that match the regex. May be empty, but will
+ * never be null
+ * @throws FileNotFoundException
+ */
+ private static File[] getFilesInFolderByRegex(File folder, final String regex)
+ throws FileNotFoundException {
+ // sanity check
+
+ if (!folder.exists()) {
+ throw new FileNotFoundException();
+
+ }
+ if (!folder.isDirectory()) {
+ throw new IllegalStateException(
+ "execution jar is suppose to be in this folder, but the object present is not a directory: "
+ + folder);
+ }
+
+ File[] matchingFiles = folder.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.matches(regex);
+ }
+ });
+
+ if (matchingFiles == null) {
+ throw new IllegalStateException(
+ "the File[] matchingFiles variable is null. This means an IOException occured while doing listFiles. Please check disk availability and retry again");
+ }
+
+ return matchingFiles;
+ }
+
+ /**
+ * Given a log file, this method finds all the Hadoop applications the job has launched and kills them
+ *
+ * Only works with Hadoop2
+ *
+ * @param logFilePath
+ * @param log
+ * @return a Set<String> containing the applicationIds that this job tried to kill.
+ */
+ public static Set<String> killAllSpawnedHadoopJobs(String logFilePath, Logger log) {
+ Set<String> allSpawnedJobs = findApplicationIdFromLog(logFilePath, log);
+ log.info("applicationIds to kill: " + allSpawnedJobs);
+
+ for (String appId : allSpawnedJobs) {
+ try {
+ killJobOnCluster(appId, log);
+ } catch (Throwable t) {
+ log.warn("something happened while trying to kill this job: " + appId, t);
+ }
+ }
+
+ return allSpawnedJobs;
+ }
+
+ /**
+ *
+ * Takes in a log file and greps every line for the application_id pattern.
+ * If it finds multiple ids, it returns all of them, de-duplicated (this is possible in the case of Pig jobs).
+ * This can be used in conjunction with the killJobOnCluster method in this file.
+ *
+ * @param logFilePath
+ * @return a Set<String>. May be empty, but will never be null
+ */
+ public static Set<String> findApplicationIdFromLog(String logFilePath, Logger log) {
+
+ File logFile = new File(logFilePath);
+
+ if (!logFile.exists()) {
+ throw new IllegalArgumentException("the logFilePath does not exist: " + logFilePath);
+ }
+ if (!logFile.isFile()) {
+ throw new IllegalArgumentException("the logFilePath specified is not a valid file: "
+ + logFilePath);
+ }
+ if (!logFile.canRead()) {
+ throw new IllegalArgumentException("unable to read the logFilePath specified: " + logFilePath);
+ }
+
+ BufferedReader br = null;
+ Set<String> applicationIds = new HashSet<String>();
+
+ try {
+ br = new BufferedReader(new FileReader(logFile));
+ String input;
+
+ // finds all the application IDs
+ while ((input = br.readLine()) != null) {
+ Matcher m = APPLICATION_ID_PATTERN.matcher(input);
+ if (m.find()) {
+ String appId = m.group(1);
+ applicationIds.add(appId);
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error while trying to find applicationId for Spark log", e);
+ } finally {
+ try {
+ if (br != null)
+ br.close();
+ } catch (Exception e) {
+ // do nothing
+ }
+ }
+ return applicationIds;
+ }
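+
+ // Illustrative example (hypothetical log line): a line such as
+ // "INFO Client: Submitting application application_1440264346270_0012 to ResourceManager"
+ // matches APPLICATION_ID_PATTERN and yields "application_1440264346270_0012"; repeated
+ // mentions of the same id are collapsed by the Set.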
+
+ /**
+ *
+ * Uses YarnClient to kill the application on the cluster.
+ * Using JobClient only works partially:
+ * if the YARN container has started but the Spark job hasn't, it will kill the job;
+ * if the Spark job has started, the cancel will hang until the Spark job is complete;
+ * if the Spark job is complete, it returns immediately, with a job-not-found on the job tracker
+ *
+ *
+ * @param applicationId
+ * @throws IOException
+ * @throws YarnException
+ */
+ public static void killJobOnCluster(String applicationId, Logger log) throws YarnException,
+ IOException {
+
+ YarnConfiguration yarnConf = new YarnConfiguration();
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(yarnConf);
+ yarnClient.start();
+
+ String[] split = applicationId.split("_");
+ ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]),
+ Integer.parseInt(split[2]));
+
+ log.info("start klling application: " + aid);
+ yarnClient.killApplication(aid);
+ log.info("successfully killed application: " + aid);
+ }
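+
+ // Illustrative example: for applicationId "application_1440264346270_0012", split("_") yields
+ // ["application", "1440264346270", "0012"], producing ApplicationId.newInstance(1440264346270L, 12).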
+
+ /**
+ *
+ * Constructs a javaOpts string from the given Props and key; returns
+ * String.format("-D%s=%s", key, value)
+ *
+ * @param props
+ * @param key
+ * @return String.format("-D%s=%s", key, value). Throws RuntimeException if the property is not
+ * present
+ */
+ public static String javaOptStringFromAzkabanProps(Props props, String key) {
+ String value = props.get(key);
+ if (value == null) {
+ throw new RuntimeException(String.format("Cannot find property [%s], in azkaban props: [%s]",
+ key, value));
+ }
+ return String.format("-D%s=%s", key, value);
+ }
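+
+ // Illustrative usage: HadoopSparkJob calls this for the Azkaban link properties
+ // (WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK) and forwards the resulting
+ // -D strings to spark-submit as driver-java-options.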
+}
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopPigJob.java b/plugins/jobtype/src/azkaban/jobtype/HadoopPigJob.java
index 3b6e502b..2b7b8a03 100644
--- a/plugins/jobtype/src/azkaban/jobtype/HadoopPigJob.java
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopPigJob.java
@@ -20,8 +20,6 @@
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -31,9 +29,9 @@
import org.apache.log4j.Logger;
import org.apache.pig.PigRunner;
+import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.JavaProcessJob;
import azkaban.security.commons.HadoopSecurityManager;
-import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.Props;
import azkaban.utils.StringUtils;
@@ -70,25 +68,22 @@ public class HadoopPigJob extends JavaProcessJob {
private File pigLogFile = null;
- private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
- "hadoop.security.manager.class";
-
public HadoopPigJob(String jobid, Props sysProps, Props jobProps, Logger log)
throws IOException {
super(jobid, sysProps, jobProps, log);
HADOOP_SECURE_PIG_WRAPPER = HadoopSecurePigWrapper.class.getName();
- getJobProps().put("azkaban.job.id", jobid);
- shouldProxy = getSysProps().getBoolean("azkaban.should.proxy", false);
- getJobProps().put("azkaban.should.proxy", Boolean.toString(shouldProxy));
- obtainTokens = getSysProps().getBoolean("obtain.binary.token", false);
+ getJobProps().put(CommonJobProperties.JOB_ID, jobid);
+ shouldProxy = getSysProps().getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
+ getJobProps().put(HadoopSecurityManager.ENABLE_PROXYING, Boolean.toString(shouldProxy));
+ obtainTokens = getSysProps().getBoolean(HadoopSecurityManager.OBTAIN_BINARY_TOKEN, false);
userPigJar = getJobProps().getBoolean("use.user.pig.jar", false);
if (shouldProxy) {
getLog().info("Initiating hadoop security manager.");
try {
- hadoopSecurityManager = loadHadoopSecurityManager(getSysProps(), log);
+ hadoopSecurityManager = HadoopJobUtils.loadHadoopSecurityManager(getSysProps(), log);
} catch (RuntimeException e) {
throw new RuntimeException("Failed to get hadoop security manager!" + e);
}
@@ -100,7 +95,7 @@ public void run() throws Exception {
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());
- File f = null;
+ File tokenFile = null;
if (shouldProxy && obtainTokens) {
userToProxy = getJobProps().getString("user.to.proxy");
getLog().info("Need to proxy. Getting tokens.");
@@ -108,90 +103,25 @@ public void run() throws Exception {
Props props = new Props();
props.putAll(getJobProps());
props.putAll(getSysProps());
- f = getHadoopTokens(props);
+ tokenFile = HadoopJobUtils.getHadoopTokens(hadoopSecurityManager, props, getLog());
getJobProps().put("env." + HADOOP_TOKEN_FILE_LOCATION,
- f.getAbsolutePath());
+ tokenFile.getAbsolutePath());
}
try {
super.run();
- } catch (Exception e) {
- e.printStackTrace();
- getLog().error("caught exception running the job", e);
- throw new Exception(e);
} catch (Throwable t) {
t.printStackTrace();
getLog().error("caught error running the job", t);
throw new Exception(t);
} finally {
- if (f != null) {
- cancelHadoopTokens(f);
- if (f.exists()) {
- f.delete();
+ if (tokenFile != null) {
+ HadoopJobUtils.cancelHadoopTokens(hadoopSecurityManager, userToProxy, tokenFile, getLog());
+ if (tokenFile.exists()) {
+ tokenFile.delete();
}
}
}
- }
-
- private void cancelHadoopTokens(File f) {
- try {
- hadoopSecurityManager.cancelTokens(f, userToProxy, getLog());
- } catch (HadoopSecurityManagerException e) {
- e.printStackTrace();
- getLog().error(e.getCause() + e.getMessage());
- } catch (Exception e) {
- e.printStackTrace();
- getLog().error(e.getCause() + e.getMessage());
- }
-
- }
-
- private HadoopSecurityManager loadHadoopSecurityManager(Props props,
- Logger logger) throws RuntimeException {
-
- Class<?> hadoopSecurityManagerClass =
- props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
- HadoopPigJob.class.getClassLoader());
- getLog().info(
- "Loading hadoop security manager "
- + hadoopSecurityManagerClass.getName());
- HadoopSecurityManager hadoopSecurityManager = null;
-
- try {
- Method getInstanceMethod =
- hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
- hadoopSecurityManager =
- (HadoopSecurityManager) getInstanceMethod.invoke(
- hadoopSecurityManagerClass, props);
- } catch (InvocationTargetException e) {
- getLog().error(
- "Could not instantiate Hadoop Security Manager "
- + hadoopSecurityManagerClass.getName() + e.getCause());
- throw new RuntimeException(e.getCause());
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e.getCause());
- }
-
- return hadoopSecurityManager;
-
- }
-
- protected File getHadoopTokens(Props props)
- throws HadoopSecurityManagerException {
-
- File tokenFile = null;
- try {
- tokenFile = File.createTempFile("mr-azkaban", ".token");
- } catch (Exception e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException(
- "Failed to create the token file.", e);
- }
-
- hadoopSecurityManager.prefetchToken(tokenFile, props, getLog());
-
- return tokenFile;
- }
+ }
@Override
protected String getJavaClass() {
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureHiveWrapper.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureHiveWrapper.java
index 75ea667e..c041fafd 100644
--- a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureHiveWrapper.java
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureHiveWrapper.java
@@ -22,9 +22,7 @@
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.FileReader;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
@@ -41,12 +39,9 @@
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
-import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.hiveutils.HiveQueryExecutionException;
-import azkaban.security.commons.HadoopSecurityManager;
import azkaban.utils.Props;
public class HadoopSecureHiveWrapper {
@@ -63,11 +58,8 @@ public class HadoopSecureHiveWrapper {
private static String hiveScript;
public static void main(final String[] args) throws Exception {
-
- String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
- Properties props = new Properties();
- props.load(new BufferedReader(new FileReader(propsFile)));
-
+
+ Properties props = HadoopSecureWrapperUtils.loadAzkabanProps();
HadoopConfigurationInjector.injectResources(new Props(null, props));
hiveScript = props.getProperty("hive.script");
@@ -77,39 +69,11 @@ public static void main(final String[] args) throws Exception {
UserGroupInformation.setConfiguration(conf);
securityEnabled = UserGroupInformation.isSecurityEnabled();
- if (shouldProxy(props)) {
+ if (HadoopSecureWrapperUtils.shouldProxy(props)) {
UserGroupInformation proxyUser = null;
String userToProxy = props.getProperty("user.to.proxy");
if (securityEnabled) {
- String filelocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
- if (filelocation == null) {
- throw new RuntimeException("hadoop token information not set.");
- }
- if (!new File(filelocation).exists()) {
- throw new RuntimeException("hadoop token file doesn't exist.");
- }
-
- logger.info("Found token file " + filelocation);
-
- logger.info("Setting "
- + HadoopSecurityManager.MAPREDUCE_JOB_CREDENTIALS_BINARY + " to "
- + filelocation);
- System.setProperty(
- HadoopSecurityManager.MAPREDUCE_JOB_CREDENTIALS_BINARY,
- filelocation);
-
- UserGroupInformation loginUser = null;
-
- loginUser = UserGroupInformation.getLoginUser();
- logger.info("Current logged in user is " + loginUser.getUserName());
-
- logger.info("Creating proxy user.");
- proxyUser =
- UserGroupInformation.createProxyUser(userToProxy, loginUser);
-
- for (Token<?> token : loginUser.getTokens()) {
- proxyUser.addToken(token);
- }
+ proxyUser = HadoopSecureWrapperUtils.createSecurityEnabledProxyUser(userToProxy, logger);
} else {
proxyUser = UserGroupInformation.createRemoteUser(userToProxy);
}
@@ -216,13 +180,6 @@ public static void runHive(String[] args) throws Exception {
}
}
- public static boolean shouldProxy(Properties props) {
- String shouldProxy =
- props.getProperty(HadoopSecurityManager.ENABLE_PROXYING);
-
- return shouldProxy != null && shouldProxy.equals("true");
- }
-
/**
* Normally hive.aux.jars.path is expanded from just being a path to the full
* list of files in the directory by the hive shell script. Since we normally
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSecurePigWrapper.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSecurePigWrapper.java
index c6dd27d3..4fb91873 100644
--- a/plugins/jobtype/src/azkaban/jobtype/HadoopSecurePigWrapper.java
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSecurePigWrapper.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.JobStats;
@@ -35,7 +34,6 @@
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import azkaban.jobExecutor.ProcessJob;
-import azkaban.security.commons.HadoopSecurityManager;
import azkaban.utils.Props;
public class HadoopSecurePigWrapper {
@@ -54,38 +52,6 @@ public class HadoopSecurePigWrapper {
logger = Logger.getRootLogger();
}
- private static UserGroupInformation getSecureProxyUser(String userToProxy)
- throws Exception {
- String filelocation =
- System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- if (filelocation == null) {
- throw new RuntimeException("hadoop token information not set.");
- }
- if (!new File(filelocation).exists()) {
- throw new RuntimeException("hadoop token file doesn't exist.");
- }
-
- logger.info("Found token file " + filelocation);
- logger.info("Setting "
- + HadoopSecurityManager.MAPREDUCE_JOB_CREDENTIALS_BINARY + " to "
- + filelocation);
-
- System.setProperty(HadoopSecurityManager.MAPREDUCE_JOB_CREDENTIALS_BINARY,
- filelocation);
- UserGroupInformation loginUser = null;
- loginUser = UserGroupInformation.getLoginUser();
-
- logger.info("Current logged in user is " + loginUser.getUserName());
- logger.info("Creating proxy user.");
-
- UserGroupInformation proxyUser =
- UserGroupInformation.createProxyUser(userToProxy, loginUser);
- for (Token<?> token : loginUser.getTokens()) {
- proxyUser.addToken(token);
- }
- return proxyUser;
- }
-
public static void main(final String[] args) throws Exception {
String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
props = new Props(null, new File(propsFile));
@@ -97,7 +63,7 @@ public static void main(final String[] args) throws Exception {
UserGroupInformation.setConfiguration(conf);
securityEnabled = UserGroupInformation.isSecurityEnabled();
pigLogFile = new File(System.getenv("PIG_LOG_FILE"));
- if (!shouldProxy(props)) {
+ if (!HadoopSecureWrapperUtils.shouldProxy(props.toProperties())) {
logger.info("Not proxying.");
runPigJob(args);
return;
@@ -106,7 +72,7 @@ public static void main(final String[] args) throws Exception {
UserGroupInformation proxyUser = null;
String userToProxy = props.getString("user.to.proxy");
if (securityEnabled) {
- proxyUser = getSecureProxyUser(userToProxy);
+ proxyUser = HadoopSecureWrapperUtils.createSecurityEnabledProxyUser(userToProxy, logger);
} else {
proxyUser = UserGroupInformation.createRemoteUser(userToProxy);
}
@@ -214,9 +180,4 @@ private static void handleError(File pigLog) throws Exception {
System.err.println("pig log file: " + pigLog + " not found.");
}
}
-
- public static boolean shouldProxy(Props props) {
- String shouldProxy = props.getString(HadoopSecurityManager.ENABLE_PROXYING);
- return shouldProxy != null && shouldProxy.equals("true");
- }
}
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java
new file mode 100644
index 00000000..313c9b39
--- /dev/null
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.utils.Props;
+
+/**
+ *
+ * A Spark wrapper (more specifically a spark-submit wrapper) that works with Azkaban.
+ * This class receives its input from {@link azkaban.jobtype.HadoopSparkJob} and passes it on to spark-submit.
+ *
+ * @see azkaban.jobtype.HadoopSparkJob
+ */
+public class HadoopSecureSparkWrapper {
+
+ private static boolean securityEnabled;
+
+ private static final Logger logger = Logger.getRootLogger();
+
+ /**
+ * Entry point: a Java wrapper to the spark-submit command
+ *
+ * @param args
+ * @throws Exception
+ */
+ public static void main(final String[] args) throws Exception {
+
+ Properties jobProps = HadoopSecureWrapperUtils.loadAzkabanProps();
+ HadoopConfigurationInjector.injectResources(new Props(null, jobProps));
+
+ // set up hadoop related configurations
+ final Configuration conf = new Configuration();
+ UserGroupInformation.setConfiguration(conf);
+ securityEnabled = UserGroupInformation.isSecurityEnabled();
+
+ if (HadoopSecureWrapperUtils.shouldProxy(jobProps)) {
+ UserGroupInformation proxyUser = null;
+ String userToProxy = jobProps.getProperty(HadoopSecurityManager.USER_TO_PROXY);
+ if (securityEnabled) {
+ proxyUser = HadoopSecureWrapperUtils.createSecurityEnabledProxyUser(userToProxy, logger);
+ } else {
+ proxyUser = UserGroupInformation.createRemoteUser(userToProxy);
+ }
+ logger.info("Proxying to execute job. Proxied as user " + userToProxy);
+
+ proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ runSpark(args);
+ return null;
+ }
+ });
+
+ } else {
+ logger.info("Not proxying to execute job. ");
+ runSpark(args);
+ }
+ }
+
+ /**
+ * Actually calling the spark-submit command
+ *
+ * @param args
+ */
+ private static void runSpark(String[] args) {
+
+ if (args.length == 0) {
+ throw new RuntimeException("SparkSubmit cannot run with zero args");
+ }
+
+ // join everything back together, then re-split on our ^Z delimiter character
+ // (SparkJobArg.delimiter) instead of the default "space" character
+ StringBuilder concat = new StringBuilder();
+ concat.append(args[0]);
+ for (int i = 1; i < args.length; i++) {
+ concat.append(" " + args[i]);
+ }
+
+ final String[] newArgs = concat.toString().split(SparkJobArg.delimiter);
+ logger.info("newArgs: " + Arrays.toString(newArgs));
+
+ org.apache.spark.deploy.SparkSubmit$.MODULE$.main(newArgs);
+ }
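+
+ // Illustrative example (hypothetical arguments): if the job handed over
+ // "--driver-java-options^Z-Da=1 -Db=2^Z--master^Zyarn-cluster" (^Z = SparkJobArg.delimiter),
+ // the shell first splits it on spaces, and the rejoin/re-split above recovers
+ // ["--driver-java-options", "-Da=1 -Db=2", "--master", "yarn-cluster"], preserving the
+ // spaces inside the driver options value.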
+
+}
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureWrapperUtils.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureWrapperUtils.java
new file mode 100644
index 00000000..573f586a
--- /dev/null
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureWrapperUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2015 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+
+import azkaban.jobExecutor.ProcessJob;
+import azkaban.security.commons.HadoopSecurityManager;
+
+/**
+ *
+ * Common methods required by the HadoopSecure*Wrapper classes are consolidated here.
+ *
+ *
+ * @see azkaban.jobtype.HadoopSecurePigWrapper
+ * @see azkaban.jobtype.HadoopSecureHiveWrapper
+ * @see azkaban.jobtype.HadoopSecureSparkWrapper
+ */
+public class HadoopSecureWrapperUtils {
+
+ /**
+ * Performs the steps required to create the proxy user on a security-enabled (Kerberized) grid
+ *
+ * @param userToProxy
+ * @return a UserGroupInformation object for the specified userToProxy, which will also contain
+ * the logged in user's tokens
+ * @throws IOException
+ */
+ public static UserGroupInformation createSecurityEnabledProxyUser(String userToProxy, Logger log)
+ throws IOException {
+
+ String filelocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+ if (filelocation == null) {
+ throw new RuntimeException("hadoop token information not set.");
+ }
+ if (!new File(filelocation).exists()) {
+ throw new RuntimeException("hadoop token file doesn't exist.");
+ }
+
+ log.info("Found token file. Setting " + HadoopSecurityManager.MAPREDUCE_JOB_CREDENTIALS_BINARY
+ + " to " + filelocation);
+ System.setProperty(HadoopSecurityManager.MAPREDUCE_JOB_CREDENTIALS_BINARY, filelocation);
+
+ UserGroupInformation loginUser = null;
+
+ loginUser = UserGroupInformation.getLoginUser();
+ log.info("Current logged in user is " + loginUser.getUserName());
+
+ UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(userToProxy, loginUser);
+
+ for (Token<?> token : loginUser.getTokens()) {
+ proxyUser.addToken(token);
+ }
+ return proxyUser;
+ }
+
+ /**
+ * Loads the Azkaban properties file (a combination of the jobProps and sysProps files) from the
+ * location given by the JOB_PROP_ENV environment variable
+ *
+ * @return a Properties object combining the jobProps and sysProps files
+ * @throws IOException
+ * @throws FileNotFoundException
+ */
+ public static Properties loadAzkabanProps() throws IOException, FileNotFoundException {
+ String propsFile = System.getenv(ProcessJob.JOB_PROP_ENV);
+ Properties props = new Properties();
+ props.load(new BufferedReader(new FileReader(propsFile)));
+ return props;
+ }
+
+ /**
+ * Looks for particular properties inside the Properties object passed in, and determines whether
+ * proxying should happen or not
+ *
+ * @param props
+ * @return a boolean value of whether the job should proxy or not
+ */
+ public static boolean shouldProxy(Properties props) {
+ String shouldProxy = props.getProperty(HadoopSecurityManager.ENABLE_PROXYING);
+ return shouldProxy != null && shouldProxy.equals("true");
+ }
+
+}
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
new file mode 100644
index 00000000..32b9e5a6
--- /dev/null
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2012 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package azkaban.jobtype;
+
+import static azkaban.flow.CommonJobProperties.ATTEMPT_LINK;
+import static azkaban.flow.CommonJobProperties.EXECUTION_LINK;
+import static azkaban.flow.CommonJobProperties.JOB_LINK;
+import static azkaban.flow.CommonJobProperties.WORKFLOW_LINK;
+import static azkaban.security.commons.HadoopSecurityManager.ENABLE_PROXYING;
+import static azkaban.security.commons.HadoopSecurityManager.OBTAIN_BINARY_TOKEN;
+import static azkaban.security.commons.HadoopSecurityManager.USER_TO_PROXY;
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.StringTokenizer;
+
+import javolution.testing.AssertionException;
+
+import org.apache.log4j.Logger;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.utils.Props;
+import azkaban.utils.StringUtils;
+
+/**
+ *
+ * The Azkaban adaptor for running a Spark Submit job.
+ * Use this in conjunction with {@link azkaban.jobtype.HadoopSecureSparkWrapper}
+ *
+ *
+ *
+ * @see azkaban.jobtype.HadoopSecureSparkWrapper
+ */
+public class HadoopSparkJob extends JavaProcessJob {
+
+ // Azkaban/Java params
+ private static final String HADOOP_SECURE_SPARK_WRAPPER = HadoopSecureSparkWrapper.class
+ .getName();
+
+ // Spark params
+
+ public static final String DRIVER_JAVA_OPTIONS = "driver-java-options";
+
+ // security variables
+ private String userToProxy = null;
+
+ private boolean shouldProxy = false;
+
+ private boolean obtainTokens = false;
+
+ private HadoopSecurityManager hadoopSecurityManager;
+
+ public HadoopSparkJob(String jobid, Props sysProps, Props jobProps, Logger log) {
+ super(jobid, sysProps, jobProps, log);
+
+ getJobProps().put(CommonJobProperties.JOB_ID, jobid);
+
+ shouldProxy = getSysProps().getBoolean(ENABLE_PROXYING, false);
+ getJobProps().put(ENABLE_PROXYING, Boolean.toString(shouldProxy));
+ obtainTokens = getSysProps().getBoolean(OBTAIN_BINARY_TOKEN, false);
+
+ if (shouldProxy) {
+ getLog().info("Initiating hadoop security manager.");
+ try {
+ hadoopSecurityManager = HadoopJobUtils.loadHadoopSecurityManager(getSysProps(), log);
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Failed to get hadoop security manager!" + e);
+ }
+ }
+ }
+
+ @Override
+ public void run() throws Exception {
+ HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(), getWorkingDirectory());
+
+ File tokenFile = null;
+ if (shouldProxy && obtainTokens) {
+ userToProxy = getJobProps().getString(USER_TO_PROXY);
+ getLog().info("Need to proxy. Getting tokens.");
+ // get tokens in to a file, and put the location in props
+ Props props = new Props();
+ props.putAll(getJobProps());
+ props.putAll(getSysProps());
+ tokenFile = HadoopJobUtils.getHadoopTokens(hadoopSecurityManager, props, getLog());
+ getJobProps().put("env." + HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
+ }
+
+ try {
+ super.run();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ getLog().error("caught error running the job");
+ throw new Exception(t);
+ } finally {
+ if (tokenFile != null) {
+ HadoopJobUtils.cancelHadoopTokens(hadoopSecurityManager, userToProxy, tokenFile, getLog());
+ if (tokenFile.exists()) {
+ tokenFile.delete();
+ }
+ }
+ }
+ }
+
+ @Override
+ protected String getJavaClass() {
+ return HADOOP_SECURE_SPARK_WRAPPER;
+ }
+
+ @Override
+ protected String getJVMArguments() {
+ String args = super.getJVMArguments();
+
+ String typeUserGlobalJVMArgs = getJobProps().getString(HadoopJobUtils.JOBTYPE_GLOBAL_JVM_ARGS,
+ null);
+ if (typeUserGlobalJVMArgs != null) {
+ args += " " + typeUserGlobalJVMArgs;
+ }
+ String typeSysGlobalJVMArgs = getSysProps().getString(HadoopJobUtils.JOBTYPE_GLOBAL_JVM_ARGS,
+ null);
+ if (typeSysGlobalJVMArgs != null) {
+ args += " " + typeSysGlobalJVMArgs;
+ }
+ String typeUserJVMArgs = getJobProps().getString(HadoopJobUtils.JOBTYPE_JVM_ARGS, null);
+ if (typeUserJVMArgs != null) {
+ args += " " + typeUserJVMArgs;
+ }
+ String typeSysJVMArgs = getSysProps().getString(HadoopJobUtils.JOBTYPE_JVM_ARGS, null);
+ if (typeSysJVMArgs != null) {
+ args += " " + typeSysJVMArgs;
+ }
+
+ String typeUserJVMArgs2 = getJobProps().getString(HadoopJobUtils.JVM_ARGS, null);
+ if (typeUserJVMArgs2 != null) {
+ args += " " + typeUserJVMArgs2;
+ }
+ String typeSysJVMArgs2 = getSysProps().getString(HadoopJobUtils.JVM_ARGS, null);
+ if (typeSysJVMArgs2 != null) {
+ args += " " + typeSysJVMArgs2;
+ }
+
+ if (shouldProxy) {
+ info("Setting up secure proxy info for child process");
+ String secure;
+ secure = " -D" + HadoopSecurityManager.USER_TO_PROXY + "="
+ + getJobProps().getString(HadoopSecurityManager.USER_TO_PROXY);
+ String extraToken = getSysProps().getString(HadoopSecurityManager.OBTAIN_BINARY_TOKEN,
+ "false");
+ if (extraToken != null) {
+ secure += " -D" + HadoopSecurityManager.OBTAIN_BINARY_TOKEN + "=" + extraToken;
+ }
+ info("Secure settings = " + secure);
+ args += secure;
+ } else {
+ info("Not setting up secure proxy info for child process");
+ }
+
+ return args;
+ }
+
+ @Override
+ protected String getMainArguments() {
+ return testableGetMainArguments(jobProps, getWorkingDirectory(), getLog());
+ }
+
+ static String testableGetMainArguments(Props jobProps, String workingDir, Logger log) {
+
+ // if we ever need to recreate a failure scenario in the test case
+ log.debug(jobProps);
+ log.debug(workingDir);
+
+ List<String> argList = new ArrayList<String>();
+
+ // special case handling for DRIVER_JAVA_OPTIONS
+ argList.add(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName);
+ StringBuilder driverJavaOptions = new StringBuilder();
+ String[] requiredJavaOpts = { WORKFLOW_LINK, JOB_LINK, EXECUTION_LINK, ATTEMPT_LINK };
+ driverJavaOptions.append(HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps,
+ requiredJavaOpts[0]));
+ for (int i = 1; i < requiredJavaOpts.length; i++) {
+ driverJavaOptions.append(" "
+ + HadoopJobUtils.javaOptStringFromAzkabanProps(jobProps, requiredJavaOpts[i]));
+ }
+ if (jobProps.containsKey(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName)) {
+ driverJavaOptions
+ .append(" " + jobProps.getString(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName));
+ }
+ argList.add(driverJavaOptions.toString());
+
+ for (SparkJobArg sparkJobArg : SparkJobArg.values()) {
+ if (!sparkJobArg.needSpecialTreatment) {
+ handleStandardArgument(jobProps, argList, sparkJobArg);
+ } else if (sparkJobArg.equals(SparkJobArg.SPARK_JARS)) {
+ sparkJarsHelper(jobProps, workingDir, log, argList);
+ } else if (sparkJobArg.equals(SparkJobArg.SPARK_CONF_PREFIX)) {
+ sparkConfPrefixHelper(jobProps, argList);
+ } else if (sparkJobArg.equals(SparkJobArg.DRIVER_JAVA_OPTIONS)) {
+ // do nothing because already handled above
+ } else if (sparkJobArg.equals(SparkJobArg.SPARK_FLAG_PREFIX)) {
+ sparkFlagPrefixHelper(jobProps, argList);
+ } else if (sparkJobArg.equals(SparkJobArg.EXECUTION_JAR)) {
+ executionJarHelper(jobProps, workingDir, log, argList);
+ } else if (sparkJobArg.equals(SparkJobArg.PARAMS)) {
+ paramsHelper(jobProps, argList);
+ }
+ }
+ return StringUtils.join((Collection<String>) argList, SparkJobArg.delimiter);
+ }
+
+ private static void paramsHelper(Props jobProps, List<String> argList) {
+ if (jobProps.containsKey(SparkJobArg.PARAMS.azPropName)) {
+ String params = jobProps.getString(SparkJobArg.PARAMS.azPropName);
+ String[] paramsList = params.split(" ");
+ for (String s : paramsList) {
+ argList.add(s);
+ }
+ }
+ }
+
+ private static void executionJarHelper(Props jobProps, String workingDir, Logger log,
+ List<String> argList) {
+ if (jobProps.containsKey(SparkJobArg.EXECUTION_JAR.azPropName)) {
+ String executionJarName = HadoopJobUtils.resolveExecutionJarName(workingDir,
+ jobProps.getString(SparkJobArg.EXECUTION_JAR.azPropName), log);
+ argList.add(executionJarName);
+ }
+ }
+
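+ // Properties under the SPARK_FLAG_PREFIX ("flag.") whose value is "true" are emitted as bare
+ // spark-submit switches, e.g. flag.verbose=true becomes "--verbose".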
+ private static void sparkFlagPrefixHelper(Props jobProps, List<String> argList) {
+ for (Entry<String, String> entry : jobProps.getMapByPrefix(
+ SparkJobArg.SPARK_FLAG_PREFIX.azPropName).entrySet()) {
+ if ("true".equalsIgnoreCase(entry.getValue()))
+ argList.add(SparkJobArg.SPARK_FLAG_PREFIX.sparkParamName + entry.getKey());
+ }
+ }
+
+ private static void sparkJarsHelper(Props jobProps, String workingDir, Logger log,
+ List<String> argList) {
+ String jarList = HadoopJobUtils.resolveWildCardForJarSpec(workingDir, jobProps.getString(
+ SparkJobArg.SPARK_JARS.azPropName, SparkJobArg.SPARK_JARS.defaultValue), log);
+ if (jarList.length() > 0) {
+ argList.add(SparkJobArg.SPARK_JARS.sparkParamName);
+ argList.add(jarList);
+ }
+ }
+
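+ // Properties under the SPARK_CONF_PREFIX ("conf.") are expanded into repeated
+ // "--conf key=value" pairs for spark-submit.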
+ private static void sparkConfPrefixHelper(Props jobProps, List<String> argList) {
+ for (Entry<String, String> entry : jobProps.getMapByPrefix(
+ SparkJobArg.SPARK_CONF_PREFIX.azPropName).entrySet()) {
+ argList.add(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName);
+ String sparkConfKeyVal = String.format("%s=%s", entry.getKey(), entry.getValue());
+ argList.add(sparkConfKeyVal);
+ }
+ }
+
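+ // For arguments that need no special treatment, emit the spark-submit switch followed by the
+ // user-supplied value, or by the enum's default value when one is defined.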
+ private static void handleStandardArgument(Props jobProps, List<String> argList,
+ SparkJobArg sparkJobArg) {
+ if (jobProps.containsKey(sparkJobArg.azPropName)) {
+ argList.add(sparkJobArg.sparkParamName);
+ argList.add(jobProps.getString(sparkJobArg.azPropName));
+ } else {
+ String defaultValue = sparkJobArg.defaultValue;
+ if (defaultValue.length() == 0) {
+ // do nothing
+ } else {
+ argList.add(sparkJobArg.sparkParamName);
+ argList.add(sparkJobArg.defaultValue);
+ }
+ }
+ }
+
+ @Override
+ protected List<String> getClassPaths() {
+
+ List<String> classPath = super.getClassPaths();
+
+ classPath.add(getSourcePathFromClass(Props.class));
+ classPath.add(getSourcePathFromClass(HadoopSecureHiveWrapper.class));
+ classPath.add(getSourcePathFromClass(HadoopSecurityManager.class));
+
+ classPath.add(HadoopConfigurationInjector.getPath(getJobProps(), getWorkingDirectory()));
+ List<String> typeClassPath = getSysProps().getStringList("jobtype.classpath", null, ",");
+ if (typeClassPath != null) {
+ // plugin.dir is filled in when this jobtype is loaded
+ String pluginDir = getSysProps().get("plugin.dir");
+ for (String jar : typeClassPath) {
+ File jarFile = new File(jar);
+ if (!jarFile.isAbsolute()) {
+ jarFile = new File(pluginDir + File.separatorChar + jar);
+ }
+
+ if (!classPath.contains(jarFile.getAbsolutePath())) {
+ classPath.add(jarFile.getAbsolutePath());
+ }
+ }
+ }
+
+ List<String> typeGlobalClassPath = getSysProps().getStringList("jobtype.global.classpath",
+ null, ",");
+ if (typeGlobalClassPath != null) {
+ for (String jar : typeGlobalClassPath) {
+ if (!classPath.contains(jar)) {
+ classPath.add(jar);
+ }
+ }
+ }
+
+ return classPath;
+ }
+
+ private static String getSourcePathFromClass(Class<?> containedClass) {
+ File file = new File(containedClass.getProtectionDomain().getCodeSource().getLocation()
+ .getPath());
+
+ if (!file.isDirectory() && file.getName().endsWith(".class")) {
+ String name = containedClass.getName();
+ StringTokenizer tokenizer = new StringTokenizer(name, ".");
+ while (tokenizer.hasMoreTokens()) {
+ tokenizer.nextElement();
+ file = file.getParentFile();
+ }
+
+ return file.getPath();
+ } else {
+ return containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
+ }
+ }
+
+ /**
+ * This cancel method, in addition to the default canceling behavior, also kills the Spark job on
+ * Hadoop
+ */
+ @Override
+ public void cancel() throws InterruptedException {
+ super.cancel();
+
+ info("Cancel called. Killing the Spark job on the cluster");
+
+ String azExecId = jobProps.getString("azkaban.flow.execid");
+ String logFilePath = String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
+ getId());
+ info("log file path is: " + logFilePath);
+
+ try {
+ HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, getLog());
+ } catch (Throwable t) {
+ warn("something happened while trying to kill all spawned jobs", t);
+ }
+ }
+}
diff --git a/plugins/jobtype/src/azkaban/jobtype/JavaJob.java b/plugins/jobtype/src/azkaban/jobtype/JavaJob.java
index dd077d4d..c6e58a75 100644
--- a/plugins/jobtype/src/azkaban/jobtype/JavaJob.java
+++ b/plugins/jobtype/src/azkaban/jobtype/JavaJob.java
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;
+import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.JavaProcessJob;
import azkaban.security.commons.SecurityUtils;
import azkaban.utils.Props;
@@ -47,7 +48,7 @@ public class JavaJob extends JavaProcessJob {
public JavaJob(String jobid, Props sysProps, Props jobProps, Logger log) {
super(jobid, sysProps, new Props(sysProps, jobProps), log);
- getJobProps().put("azkaban.job.id", jobid);
+ getJobProps().put(CommonJobProperties.JOB_ID, jobid);
}
@Override
diff --git a/plugins/jobtype/src/azkaban/jobtype/SparkJobArg.java b/plugins/jobtype/src/azkaban/jobtype/SparkJobArg.java
new file mode 100644
index 00000000..b9d29947
--- /dev/null
+++ b/plugins/jobtype/src/azkaban/jobtype/SparkJobArg.java
@@ -0,0 +1,64 @@
+package azkaban.jobtype;
+
+public enum SparkJobArg {
+
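+ // Each constant maps an Azkaban job property (azPropName) to a spark-submit switch
+ // (sparkParamName). Constants marked needSpecialTreatment are expanded by dedicated helper
+ // methods in HadoopSparkJob rather than as a plain "--switch value" pair.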
+ // standard spark submit arguments, ordered in the spark-submit --help order
+ MASTER("master", "yarn-cluster", false), // just to trick the eclipse formatter
+ DEPLOY_MODE("deploy-mode", false), //
+ CLASS("class", false), //
+ NAME("name", false), //
+ SPARK_JARS("jars", "./lib/*",true), //
+ PACKAGES("packages", false), //
+ REPOSITORIES("repositories", false), //
+ PY_FILES("py-files", false), //
+ FILES("files", false), //
+ SPARK_CONF_PREFIX("conf.", "--conf", "",true), //
+ PROPERTIES_FILE("properties-file", false), //
+ DRIVER_MEMORY("driver-memory", "512M", false), //
+ DRIVER_JAVA_OPTIONS("driver-java-options", true), //
+ DRIVER_LIBRARY_PATH("driver-library-path", false), //
+ DRIVER_CLASS_PATH("driver-class-path", false), //
+ EXECUTOR_MEMORY("executor-memory", "1g", false), //
+ PROXY_USER("proxy-user", false), //
+ SPARK_FLAG_PREFIX("flag.", "--", "",true), // --help, --verbose, --supervise, --version
+
+ // Yarn only Arguments
+ EXECUTOR_CORES("executor-cores", "1", false), //
+ DRIVER_CORES("driver-cores", "1", false), //
+ QUEUE("queue", "marathon", false), //
+ NUM_EXECUTORS("num-executors", "2", false), //
+ ARCHIVES("archives", false), //
+ PRINCIPAL("principal", false), //
+ KEYTAB("keytab", false), //
+
+ // Not SparkSubmit arguments: only exists in azkaban
+ EXECUTION_JAR("execution-jar", null, null,true), //
+ PARAMS("params", null, null,true), //
+ ;
+
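+ // Arguments are joined with the ASCII SUB control character rather than a space so that a
+ // single argument (e.g. driver-java-options) may itself contain spaces.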
+ public static final String delimiter = "\u001A";
+
+ SparkJobArg(String propName, boolean specialTreatment) {
+ this(propName, "--" + propName, "",specialTreatment);
+ }
+
+ SparkJobArg(String propName, String defaultValue, boolean specialTreatment) {
+ this(propName, "--" + propName, defaultValue,specialTreatment);
+ }
+
+ SparkJobArg(String azPropName, String sparkParamName, String defaultValue, boolean specialTreatment) {
+ this.azPropName = azPropName;
+ this.sparkParamName = sparkParamName;
+ this.defaultValue = defaultValue;
+ this.needSpecialTreatment = specialTreatment;
+ }
+
+ final String azPropName;
+
+ final String sparkParamName;
+
+ final String defaultValue;
+
+ final boolean needSpecialTreatment;
+
+}
\ No newline at end of file
diff --git a/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsExecutionJar.java b/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsExecutionJar.java
new file mode 100644
index 00000000..451df99b
--- /dev/null
+++ b/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsExecutionJar.java
@@ -0,0 +1,89 @@
+package azkaban.jobtype;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+
+public class TestHadoopJobUtilsExecutionJar {
+ Props jobProps = null;
+
+ Logger logger = Logger.getRootLogger();
+
+ String workingDirString = "/tmp/TestHadoopSpark";
+
+ File workingDirFile = new File(workingDirString);
+
+ File libFolderFile = new File(workingDirFile, "lib");
+
+ String executionJarName = "hadoop-spark-job-test-execution-x.y.z-a.b.c.jar";
+
+ File executionJarFile = new File(libFolderFile, executionJarName);
+
+ File libraryJarFile = new File(libFolderFile, "library.jar");
+
+ String delim = SparkJobArg.delimiter;
+
+ @Before
+ public void beforeMethod() throws IOException {
+ if (workingDirFile.exists())
+ FileUtils.deleteDirectory(workingDirFile);
+ workingDirFile.mkdirs();
+ libFolderFile.mkdirs();
+ executionJarFile.createNewFile();
+ libraryJarFile.createNewFile();
+
+ }
+
+ // expect an empty jar list when the lib folder does not exist
+ @Test
+ public void testNoLibFolder() throws IOException {
+ FileUtils.deleteDirectory(libFolderFile);
+ String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*", logger);
+
+ Assert.assertEquals(retval, "");
+ }
+
+ // expect an empty jar list when the lib folder is empty
+ @Test
+ public void testLibFolderHasNothingInIt() throws IOException {
+ FileUtils.deleteDirectory(libFolderFile);
+ libFolderFile.mkdirs();
+ String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*", logger);
+
+ Assert.assertEquals(retval, "");
+ }
+
+
+ @Test
+ public void testOneLibFolderExpansion() throws IOException {
+
+ String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*", logger);
+
+ Assert.assertEquals(
+ retval,
+ "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar");
+ }
+
+ @Test
+ public void testTwoLibFolderExpansion() throws IOException {
+ File lib2FolderFile = new File(workingDirFile, "lib2");
+ lib2FolderFile.mkdirs();
+ File lib2test1Jar = new File(lib2FolderFile, "test1.jar");
+ lib2test1Jar.createNewFile();
+ File lib2test2Jar = new File(lib2FolderFile, "test2.jar");
+ lib2test2Jar.createNewFile();
+ String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*,./lib2/*",
+ logger);
+
+ Assert.assertEquals(
+ retval,
+ "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar,/tmp/TestHadoopSpark/./lib2/test1.jar,/tmp/TestHadoopSpark/./lib2/test2.jar");
+ }
+}
\ No newline at end of file
diff --git a/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsFindApplicationIdFromLog.java b/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsFindApplicationIdFromLog.java
new file mode 100644
index 00000000..ed0f8e8a
--- /dev/null
+++ b/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsFindApplicationIdFromLog.java
@@ -0,0 +1,120 @@
+package azkaban.jobtype;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
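+/**
+ * HadoopJobUtils.findApplicationIdFromLog is expected to scan a job log file and return the
+ * distinct set of YARN application IDs (application_<cluster timestamp>_<sequence>) that
+ * appear in it.
+ */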
+public class TestHadoopJobUtilsFindApplicationIdFromLog {
+
+ File tempFile = null;
+
+ BufferedWriter bw = null;
+
+ Logger logger = Logger.getRootLogger();
+
+ @Before
+ public void beforeMethod() throws IOException {
+ tempFile = File.createTempFile("test_hadoop_job_utils_find_application_id_from_log", null);
+ bw = new BufferedWriter(new FileWriter(tempFile));
+
+ }
+
+ @Test
+ public void testNoApplicationId() throws IOException {
+ bw.write("28-08-2015 14:05:24 PDT spark INFO - 15/08/28 21:05:24 INFO client.RMProxy: Connecting to ResourceManager at eat1-nertzrm02.grid.linkedin.com/172.20.158.95:8032\n");
+ bw.write("28-08-2015 14:05:24 PDT spark INFO - 15/08/28 21:05:24 INFO yarn.Client: Requesting a new application from cluster with 134 NodeManagers\n");
+ bw.write("28-08-2015 14:05:24 PDT spark INFO - 15/08/28 21:05:24 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (55296 MB per container)\n");
+ bw.write("28-08-2015 14:05:24 PDT spark INFO - 15/08/28 21:05:24 INFO yarn.Client: Will allocate AM container, with 4505 MB memory including 409 MB overhead\n");
+ bw.write("28-08-2015 14:05:24 PDT spark INFO - 15/08/28 21:05:24 INFO yarn.Client: Setting up container launch context for our AM\n");
+ bw.write("28-08-2015 14:05:24 PDT spark INFO - 15/08/28 21:05:24 INFO yarn.Client: Preparing resources for our AM container\n");
+ bw.close();
+
+ Set<String> appId = HadoopJobUtils.findApplicationIdFromLog(tempFile.toString(), logger);
+
+ Assert.assertEquals(0, appId.size());
+
+ }
+
+ @Test
+ public void testOneApplicationId() throws IOException {
+ bw.write("28-08-2015 14:05:32 PDT spark INFO - 15/08/28 21:05:32 INFO spark.SecurityManager: SecurityManager: authentication enabled; ui acls enabled; users with view permissions: Set(*); users with modify permissions: Set(azkaban, jyu)\n");
+ bw.write("28-08-2015 14:05:32 PDT spark INFO - 15/08/28 21:05:32 INFO yarn.Client: Submitting application 3099 to ResourceManager\n");
+ bw.write("28-08-2015 14:05:33 PDT spark INFO - 15/08/28 21:05:33 INFO impl.YarnClientImpl: Submitted application application_1440264346270_3099\n");
+ bw.close();
+
+ Set<String> appId = HadoopJobUtils.findApplicationIdFromLog(tempFile.toString(), logger);
+
+ Assert.assertEquals(1, appId.size());
+ Assert.assertTrue(appId.contains("application_1440264346270_3099"));
+ }
+
+ @Test
+ public void testMultipleSameApplicationIdWhenSparkStarts() throws IOException {
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - 15/08/28 21:05:34 INFO yarn.Client: Application report for application_1440264346270_3099 (state: ACCEPTED)\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - 15/08/28 21:05:34 INFO yarn.Client: \n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - client token: Token { kind: YARN_CLIENT_TOKEN, service: }\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - diagnostics: N/A\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - ApplicationMaster host: N/A\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - ApplicationMaster RPC port: -1\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - queue: default\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - start time: 1440795932813\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - final status: UNDEFINED\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - tracking URL: http://eat1-nertzwp02.grid.linkedin.com:8080/proxy/application_1440264346270_3099/\n");
+ bw.write("28-08-2015 14:05:34 PDT spark INFO - user: jyu\n");
+ bw.write("28-08-2015 14:05:35 PDT spark INFO - 15/08/28 21:05:35 INFO yarn.Client: Application report for application_1440264346270_3099 (state: ACCEPTED)\n");
+ bw.close();
+
+ Set<String> appId = HadoopJobUtils.findApplicationIdFromLog(tempFile.toString(), logger);
+
+ Assert.assertEquals(1, appId.size());
+ Assert.assertTrue(appId.contains("application_1440264346270_3099"));
+ }
+
+ @Test
+ public void testMultipleSameApplicationIdForSparkAfterRunningFor17Hours() throws IOException {
+ bw.write("28-08-2015 14:11:50 PDT spark INFO - 15/08/28 21:11:50 INFO yarn.Client: Application report for application_1440264346270_3099 (state: RUNNING)\n");
+ bw.write("28-08-2015 14:11:51 PDT spark INFO - 15/08/28 21:11:51 INFO yarn.Client: Application report for application_1440264346270_3099 (state: RUNNING)\n");
+ bw.write("28-08-2015 14:11:52 PDT spark INFO - 15/08/28 21:11:52 INFO yarn.Client: Application report for application_1440264346270_3099 (state: RUNNING)\n");
+ bw.write("28-08-2015 14:11:53 PDT spark INFO - 15/08/28 21:11:53 INFO yarn.Client: Application report for application_1440264346270_3099 (state: RUNNING)\n");
+ bw.write("28-08-2015 14:11:54 PDT spark INFO - 15/08/28 21:11:54 INFO yarn.Client: Application report for application_1440264346270_3099 (state: RUNNING)\n");
+ bw.close();
+
+ Set<String> appId = HadoopJobUtils.findApplicationIdFromLog(tempFile.toString(), logger);
+
+ Assert.assertEquals(1, appId.size());
+ Assert.assertTrue(appId.contains("application_1440264346270_3099"));
+ }
+
+ @Test
+ public void testLogWithMultipleApplicationIdsAppearingMultipleTimes() throws IOException {
+ bw.write("28-08-2015 12:29:38 PDT Training_clickSelectFeatures INFO - INFO Submitted application application_1440264346270_3044\n");
+ bw.write("28-08-2015 12:29:38 PDT Training_clickSelectFeatures INFO - INFO The url to track the job: http://eat1-nertzwp02.grid.linkedin.com:8080/proxy/application_1440264346270_3044/\n");
+ bw.write("28-08-2015 12:29:38 PDT Training_clickSelectFeatures INFO - INFO See http://eat1-nertzwp02.grid.linkedin.com:8080/proxy/application_1440264346270_3044/ for details.\n");
+ bw.write("28-08-2015 12:29:38 PDT Training_clickSelectFeatures INFO - INFO Running job: job_1440264346270_3044\n");
+ bw.write("28-08-2015 12:30:21 PDT Training_clickSelectFeatures INFO - INFO Closing idle connection Socket[addr=eat1-hcl5481.grid.linkedin.com/172.20.138.228,port=42492,localport=42382] to server eat1-hcl5481.grid.linkedin.com/172.20.138.228:42492\n");
+ bw.write("28-08-2015 12:30:37 PDT Training_clickSelectFeatures INFO - INFO Closing idle connection Socket[addr=eat1-nertznn01.grid.linkedin.com/172.20.158.57,port=9000,localport=30453] to server eat1-nertznn01.grid.linkedin.com/172.20.158.57:9000\n");
+ bw.write("28-08-2015 12:31:09 PDT Training_clickSelectFeatures INFO - INFO Job job_1440264346270_3044 running in uber mode : false\n");
+ bw.write("28-08-2015 12:29:38 PDT Training_clickSelectFeatures INFO - INFO Submitted application application_1440264346270_3088\n");
+ bw.write("28-08-2015 12:29:38 PDT Training_clickSelectFeatures INFO - INFO The url to track the job: http://eat1-nertzwp02.grid.linkedin.com:8080/proxy/application_1440264346270_3088/\n");
+ bw.write("28-08-2015 12:29:38 PDT Training_clickSelectFeatures INFO - INFO See http://eat1-nertzwp02.grid.linkedin.com:8080/proxy/application_1440264346270_3088/ for details.\n");
+ bw.write("28-08-2015 12:29:38 PDT Training_clickSelectFeatures INFO - INFO Running job: job_1440264346270_3088\n");
+ bw.write("28-08-2015 12:30:21 PDT Training_clickSelectFeatures INFO - INFO Closing idle connection Socket[addr=eat1-hcl5481.grid.linkedin.com/172.20.138.228,port=42492,localport=42382] to server eat1-hcl5481.grid.linkedin.com/172.20.138.228:42492\n");
+ bw.write("28-08-2015 12:30:37 PDT Training_clickSelectFeatures INFO - INFO Closing idle connection Socket[addr=eat1-nertznn01.grid.linkedin.com/172.20.158.57,port=9000,localport=30453] to server eat1-nertznn01.grid.linkedin.com/172.20.158.57:9000\n");
+ bw.write("28-08-2015 12:31:09 PDT Training_clickSelectFeatures INFO - INFO Job job_1440264346270_3088 running in uber mode : false\n");
+ bw.close();
+
+ Set<String> appId = HadoopJobUtils.findApplicationIdFromLog(tempFile.toString(), logger);
+
+ Assert.assertEquals(2, appId.size());
+ Assert.assertTrue(appId.contains("application_1440264346270_3044"));
+ Assert.assertTrue(appId.contains("application_1440264346270_3088"));
+ }
+
+}
\ No newline at end of file
diff --git a/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsResolveJarSpec.java b/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsResolveJarSpec.java
new file mode 100644
index 00000000..cfb29e23
--- /dev/null
+++ b/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsResolveJarSpec.java
@@ -0,0 +1,73 @@
+package azkaban.jobtype;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+
+public class TestHadoopJobUtilsResolveJarSpec {
+ Props jobProps = null;
+
+ Logger logger = Logger.getRootLogger();
+
+ String workingDirString = "/tmp/TestHadoopSpark";
+
+ File workingDirFile = new File(workingDirString);
+
+ File libFolderFile = new File(workingDirFile, "lib");
+
+ String executionJarName = "hadoop-spark-job-test-execution-x.y.z-a.b.c.jar";
+
+ File executionJarFile = new File(libFolderFile, executionJarName);
+
+ File libraryJarFile = new File(libFolderFile, "library.jar");
+
+ String delim = SparkJobArg.delimiter;
+
+ @Before
+ public void beforeMethod() throws IOException {
+ if (workingDirFile.exists())
+ FileUtils.deleteDirectory(workingDirFile);
+ workingDirFile.mkdirs();
+ libFolderFile.mkdirs();
+ executionJarFile.createNewFile();
+ libraryJarFile.createNewFile();
+
+ }
+
+ // should throw because no jar matching the spec exists
+ @Test(expected = IllegalStateException.class)
+ public void testJarDoesNotExist() throws IOException {
+ HadoopJobUtils.resolveExecutionJarName(workingDirString, "./lib/abc.jar", logger);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testNoLibFolder() throws IOException {
+ FileUtils.deleteDirectory(libFolderFile);
+ HadoopJobUtils.resolveExecutionJarName(workingDirString, "./lib/abc.jar", logger);
+ }
+
+ @Test
+ public void testSpecificationXXXjar() throws IOException {
+ String retval = HadoopJobUtils.resolveExecutionJarName(workingDirString,
+ "./lib/hadoop-spark.jar", logger);
+
+ Assert.assertEquals(retval,
+ "/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar");
+ }
+
+ @Test
+ public void testSpecificationXXXprefix() throws IOException {
+ String retval = HadoopJobUtils.resolveExecutionJarName(workingDirString, "./lib/hadoop-spark",
+ logger);
+
+ Assert.assertEquals(retval,
+ "/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar");
+ }
+}
\ No newline at end of file
diff --git a/plugins/jobtype/test/azkaban/jobtype/TestHadoopSparkJobGetMainArguments.java b/plugins/jobtype/test/azkaban/jobtype/TestHadoopSparkJobGetMainArguments.java
new file mode 100644
index 00000000..554e7a58
--- /dev/null
+++ b/plugins/jobtype/test/azkaban/jobtype/TestHadoopSparkJobGetMainArguments.java
@@ -0,0 +1,462 @@
+package azkaban.jobtype;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import azkaban.utils.Props;
+
+public class TestHadoopSparkJobGetMainArguments {
+ Props jobProps = null;
+
+ Logger logger = Logger.getRootLogger();
+
+ String workingDirString = "/tmp/TestHadoopSpark";
+
+ File workingDirFile = new File(workingDirString);
+
+ File libFolderFile = new File(workingDirFile, "lib");
+
+ String executionJarName = "hadoop-spark-job-test-execution-x.y.z-a.b.c.jar";
+
+ File executionJarFile = new File(libFolderFile, executionJarName);
+
+ File libraryJarFile = new File(libFolderFile, "library.jar");
+
+ String delim = SparkJobArg.delimiter;
+
+ @Before
+ public void beforeMethod() throws IOException {
+ if (workingDirFile.exists())
+ FileUtils.deleteDirectory(workingDirFile);
+ workingDirFile.mkdirs();
+ libFolderFile.mkdirs();
+ executionJarFile.createNewFile();
+ libraryJarFile.createNewFile();
+
+ jobProps = new Props();
+ jobProps.put("azkaban.link.workflow.url", "http://azkaban.link.workflow.url");
+ jobProps.put("azkaban.link.job.url", "http://azkaban.link.job.url");
+ jobProps.put("azkaban.link.execution.url", "http://azkaban.link.execution.url");
+ jobProps.put("azkaban.link.jobexec.url", "http://azkaban.link.jobexec.url");
+ jobProps.put("azkaban.link.attempt.url", "http://azkaban.link.attempt.url");
+ jobProps.put(SparkJobArg.CLASS.azPropName, "hadoop.spark.job.test.ExecutionClass");
+ jobProps.put(SparkJobArg.EXECUTION_JAR.azPropName, "./lib/hadoop-spark-job-test-execution.jar");
+
+ }
+
+ @Test
+ public void testDefault() {
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ // the first one, so no delimiter at front
+ Assert.assertTrue(retval.contains(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName + delim));
+ Assert.assertTrue(retval
+ .contains(delim
+ + "-Dazkaban.link.workflow.url=http://azkaban.link.workflow.url -Dazkaban.link.job.url=http://azkaban.link.job.url -Dazkaban.link.execution.url=http://azkaban.link.execution.url -Dazkaban.link.attempt.url=http://azkaban.link.attempt.url"
+ + delim));
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.MASTER.sparkParamName + delim
+ + "yarn-cluster" + delim));
+ Assert.assertTrue(retval
+ .contains(delim
+ + SparkJobArg.SPARK_JARS.sparkParamName
+ + delim
+ + "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"
+ + delim));
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.CLASS.sparkParamName + delim
+ + "hadoop.spark.job.test.ExecutionClass" + delim));
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.NUM_EXECUTORS.sparkParamName + delim
+ + "2" + delim));
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.EXECUTOR_CORES.sparkParamName + delim
+ + "1" + delim));
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.QUEUE.sparkParamName + delim + "marathon"
+ + delim));
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.DRIVER_MEMORY.sparkParamName + delim
+ + "512M" + delim));
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.EXECUTOR_MEMORY.sparkParamName + delim
+ + "1g" + delim));
+ // last one, no delimiter at back
+ Assert.assertTrue(retval.contains(delim
+ + "/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"));
+
+ // test flag values such as verbose do not come in by default
+ Assert.assertFalse(retval.contains("--verbose"));
+ Assert.assertFalse(retval.contains("--help"));
+ Assert.assertFalse(retval.contains("--version"));
+ }
+
+ @Test
+ public void testDefaultWithExecutionJarSpecification2() {
+ jobProps.put(SparkJobArg.EXECUTION_JAR.azPropName, "./lib/hadoop-spark-job-test-execution");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim
+ + "/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"));
+ }
+
+ @Test
+ public void testNoClass() {
+ jobProps.removeLocal(SparkJobArg.CLASS.azPropName);
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertFalse(retval.contains(delim + SparkJobArg.CLASS.sparkParamName + delim));
+ }
+
+ @Test
+ public void testChangeMaster() {
+ jobProps.put(SparkJobArg.MASTER.azPropName, "NEW_SPARK_MASTER");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.MASTER.sparkParamName + delim
+ + "NEW_SPARK_MASTER" + delim));
+ Assert.assertFalse(retval.contains(delim + SparkJobArg.MASTER.sparkParamName + delim
+ + "yarn-cluster" + delim));
+
+ }
+
+ @Test
+ public void testDeployMode() {
+ jobProps.put(SparkJobArg.DEPLOY_MODE.azPropName, "NEW_DEPLOY_MODE");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.DEPLOY_MODE.sparkParamName + delim
+ + "NEW_DEPLOY_MODE" + delim));
+ }
+
+ @Test
+ public void testExecutionClass() throws IOException {
+
+ jobProps.put(SparkJobArg.CLASS.azPropName, "new.ExecutionClass");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.CLASS.sparkParamName + delim
+ + "new.ExecutionClass" + delim));
+ Assert.assertFalse(retval.contains(delim + SparkJobArg.CLASS.sparkParamName + delim
+ + "hadoop.spark.job.test.ExecutionClass" + delim));
+
+ }
+
+ @Test
+ public void testName() throws IOException {
+
+ jobProps.put(SparkJobArg.NAME.azPropName, "NEW_NAME");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.NAME.sparkParamName + delim + "NEW_NAME"
+ + delim));
+ }
+
+ @Test
+ public void testChangeSparkJar() throws IOException {
+ String topLevelJarString = "topLevelJar.jar";
+ File toplevelJarFile = new File(workingDirFile, topLevelJarString);
+ toplevelJarFile.createNewFile();
+ jobProps.put(SparkJobArg.SPARK_JARS.azPropName, "./*");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.SPARK_JARS.sparkParamName + delim
+ + "/tmp/TestHadoopSpark/./" + topLevelJarString + delim));
+ Assert.assertFalse(retval
+ .contains(delim
+ + SparkJobArg.SPARK_JARS.sparkParamName
+ + delim
+ + "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"
+ + delim));
+
+ }
+
+ @Test
+ public void testPackages() throws IOException {
+
+ jobProps.put(SparkJobArg.PACKAGES.azPropName, "a:b:c,d:e:f");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.PACKAGES.sparkParamName + delim
+ + "a:b:c,d:e:f" + delim));
+ }
+
+ @Test
+ public void testRepositories() throws IOException {
+
+ jobProps.put(SparkJobArg.REPOSITORIES.azPropName, "repo1,repo2");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.REPOSITORIES.sparkParamName + delim
+ + "repo1,repo2" + delim));
+ }
+
+ @Test
+ public void testPyFiles() {
+ jobProps.put(SparkJobArg.PY_FILES.azPropName, "file1.py,file2.egg,file3.zip");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.PY_FILES.sparkParamName + delim
+ + "file1.py,file2.egg,file3.zip" + delim));
+ }
+
+ @Test
+ public void testFiles() {
+ jobProps.put(SparkJobArg.FILES.azPropName, "file1.py,file2.egg,file3.zip");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.FILES.sparkParamName + delim
+ + "file1.py,file2.egg,file3.zip" + delim));
+ }
+
+ @Test
+ public void testSparkConf() throws IOException {
+
+ jobProps.put(SparkJobArg.SPARK_CONF_PREFIX.azPropName + "conf1", "confValue1");
+ jobProps.put(SparkJobArg.SPARK_CONF_PREFIX.azPropName + "conf2", "confValue2");
+ jobProps.put(SparkJobArg.SPARK_CONF_PREFIX.azPropName + "conf3", "confValue3");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ for (int i = 1; i <= 3; i++) {
+ String confAnswer = String.format(delim + "%s" + delim + "%s%d=%s%d" + delim,
+ SparkJobArg.SPARK_CONF_PREFIX.sparkParamName, "conf", i, "confValue", i);
+ System.out.println("looking for: " + confAnswer);
+ Assert.assertTrue(retval.contains(confAnswer));
+ }
+
+ }
+
+ @Test
+ public void testPropertiesFile() {
+ jobProps.put(SparkJobArg.PROPERTIES_FILE.azPropName, "NEW_PROPERTIES_FILE");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.PROPERTIES_FILE.sparkParamName + delim
+ + "NEW_PROPERTIES_FILE" + delim));
+ }
+
+ @Test
+ public void testDriverMemory() throws IOException {
+
+ jobProps.put(SparkJobArg.DRIVER_MEMORY.azPropName, "1t");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.DRIVER_MEMORY.sparkParamName + delim
+ + "1t" + delim));
+ Assert.assertFalse(retval.contains(delim + SparkJobArg.DRIVER_MEMORY.sparkParamName + delim
+ + "2g" + delim));
+
+ }
+
+ /*
+ * Note that --driver-java-options already contains default values for this test, so we
+ * verify that the user-specified options are properly appended to them.
+ */
+ @Test
+ public void testDriverJavaOptions() {
+ jobProps.put(SparkJobArg.DRIVER_JAVA_OPTIONS.azPropName, "-Dabc=def -Dfgh=ijk");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ // only on the ending side has the delimiter
+ Assert.assertTrue(retval
+ .contains(" -Dazkaban.link.attempt.url=http://azkaban.link.attempt.url -Dabc=def -Dfgh=ijk"
+ + delim));
+ }
+
+ @Test
+ public void testDriverLibraryPath() {
+ String libraryPathSpec = "/this/is/library/path:/this/is/library/path/too";
+ jobProps.put(SparkJobArg.DRIVER_LIBRARY_PATH.azPropName, libraryPathSpec);
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.DRIVER_LIBRARY_PATH.sparkParamName
+ + delim + libraryPathSpec + delim));
+ }
+
+ @Test
+ public void testDriverClassPath() {
+ String classPathSpec = "/this/is/class/path:/this/is/class/path/too";
+ jobProps.put(SparkJobArg.DRIVER_CLASS_PATH.azPropName, classPathSpec);
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.DRIVER_CLASS_PATH.sparkParamName + delim
+ + classPathSpec + delim));
+ }
+
+ @Test
+ public void testExecutorMemory() throws IOException {
+
+ jobProps.put(SparkJobArg.EXECUTOR_MEMORY.azPropName, "1t");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.EXECUTOR_MEMORY.sparkParamName + delim
+ + "1t" + delim));
+ Assert.assertFalse(retval.contains(delim + SparkJobArg.EXECUTOR_MEMORY.sparkParamName + delim
+ + "1g" + delim));
+ }
+
+ @Test
+ public void testProxyUser() throws IOException {
+
+ jobProps.put(SparkJobArg.PROXY_USER.azPropName, "NEW_PROXY_USER");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.PROXY_USER.sparkParamName + delim
+ + "NEW_PROXY_USER" + delim));
+ }
+
+ @Test
+ public void testSparkFlagOn() {
+ jobProps.put(SparkJobArg.SPARK_FLAG_PREFIX.azPropName + "verbose", "true");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains("--verbose"));
+ Assert.assertFalse(retval.contains("true"));
+ }
+
+ @Test
+ public void testSparkFlagOffIfValueIsNotTrue() {
+ jobProps.put(SparkJobArg.SPARK_FLAG_PREFIX.azPropName + "verbose", "I am a value, and I do not .equals true");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertFalse(retval.contains("--verbose"));
+ }
+
+ /*
+ * End of general SparkSubmit argument section, start of Yarn-specific SparkSubmit arguments
+ */
+
+ @Test
+ public void testExecutorCores() throws IOException {
+
+ jobProps.put(SparkJobArg.EXECUTOR_CORES.azPropName, "2000");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.EXECUTOR_CORES.sparkParamName + delim
+ + "2000" + delim));
+ Assert.assertFalse(retval.contains(delim + SparkJobArg.EXECUTOR_CORES.sparkParamName + delim
+ + "1" + delim));
+
+ }
+
+ @Test
+ public void testDriverCores() throws IOException {
+
+ jobProps.put(SparkJobArg.DRIVER_CORES.azPropName, "2000");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.DRIVER_CORES.sparkParamName + delim
+ + "2000" + delim));
+ }
+
+ @Test
+ public void testQueue() throws IOException {
+
+ jobProps.put(SparkJobArg.QUEUE.azPropName, "my_own_queue");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.QUEUE.sparkParamName + delim
+ + "my_own_queue" + delim));
+ Assert.assertFalse(retval.contains(delim + SparkJobArg.QUEUE.sparkParamName + delim
+ + "marathon" + delim));
+
+ }
+
+ @Test
+ public void testNumExecutors() throws IOException {
+
+ jobProps.put(SparkJobArg.NUM_EXECUTORS.azPropName, "1000");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.NUM_EXECUTORS.sparkParamName + delim
+ + "1000" + delim));
+ Assert.assertFalse(retval.contains(delim + SparkJobArg.NUM_EXECUTORS.sparkParamName + delim
+ + "2" + delim));
+ }
+
+ @Test
+ public void testArchives() throws IOException {
+ String archiveSpec = "archive1,archive2";
+ jobProps.put(SparkJobArg.ARCHIVES.azPropName, archiveSpec);
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.ARCHIVES.sparkParamName + delim
+ + archiveSpec + delim));
+ }
+
+ @Test
+ public void testPrincipal() throws IOException {
+
+ jobProps.put(SparkJobArg.PRINCIPAL.azPropName, "NEW_PRINCIPAL");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.PRINCIPAL.sparkParamName + delim
+ + "NEW_PRINCIPAL" + delim));
+ }
+
+ @Test
+ public void testKeytab() throws IOException {
+
+ jobProps.put(SparkJobArg.KEYTAB.azPropName, "NEW_KEYTAB");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + SparkJobArg.KEYTAB.sparkParamName + delim
+ + "NEW_KEYTAB" + delim));
+ }
+
+ /*
+ * End of Yarn-specific SparkSubmit argument section, start of Azkaban-only arguments
+ */
+
+ @Test
+ public void testExecutionJar() throws IOException {
+
+ jobProps.put(SparkJobArg.EXECUTION_JAR.azPropName, "./lib/library");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim + "/tmp/TestHadoopSpark/./lib/library.jar" + delim));
+
+ }
+
+ @Test
+ public void testParams() throws IOException {
+
+ jobProps.put(SparkJobArg.PARAMS.azPropName, "param1 param2 param3 param4");
+
+ String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
+
+ Assert.assertTrue(retval.contains(delim
+ + "/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar" + delim
+ + "param1" + delim + "param2" + delim + "param3" + delim + "param4"));
+
+ }
+
+}
\ No newline at end of file