Skip to content

Commit

Permalink
Merge pull request #195 from johnyu0520/master
Browse files Browse the repository at this point in the history
Azkaban Spark job type plugin
  • Loading branch information
hluu committed Sep 9, 2015
2 parents 4610096 + 8ad88b2 commit 62a187a
Show file tree
Hide file tree
Showing 41 changed files with 1,927 additions and 333 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
30 changes: 29 additions & 1 deletion plugins/jobtype/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
<property name="ext.lib.dir" value="${base.dir}/extlib" />
<property name="dist.jar.dir" value="${dist.dir}/jars" />
<property name="dist.classes.dir" value="${dist.dir}/classes" />
<property name="dist.classes.test.dir" value="${dist.dir}/testclasses" />
<property name="dist.packages.dir" value="${dist.dir}/packages" />
<property name="test.report.dir" value="${dist.dir}/report" />
<property name="java.src.dir" value="${basedir}/src" />
<property name="java.src.test.dir" value="${basedir}/test" />
<property name="azkaban-jobtype-jar" value="${dist.jar.dir}/${name}-${version}.jar" />
<property name="jobtypes.dir" value="${basedir}/jobtypes" />
<property name="javatype.dir" value="${basedir}/jobtypes/java" />
Expand Down Expand Up @@ -44,6 +47,7 @@
<delete dir="${dist.jar.dir}" />
<delete dir="${dist.classes.dir}" />
<delete dir="${dist.packages.dir}" />
<delete dir="${test.report.dir}" />
<delete>
<fileset file="${lib.dir}/azkaban-hadoopsecuritymanager-*.jar">
</fileset>
Expand Down Expand Up @@ -78,9 +82,33 @@
<exclude name="**/examples/**"/>
<classpath refid="main.classpath" />
</javac>


<delete dir="${dist.classes.test.dir}" />
<mkdir dir="${dist.classes.test.dir}" />
<javac fork="true" destdir="${dist.classes.test.dir}"
target="1.6" debug="true" deprecation="false" failonerror="true">
<src path="${java.src.test.dir}" />
<classpath refid="main.classpath" />
</javac>

</target>

<!-- Runs the compiled unit tests found under ${dist.classes.test.dir}.
     The "jars" target depends on this one, so a failing or erroring test
     must fail the build instead of only showing up in the printed summary. -->
<target name="junit" depends="build" description="running test cases">
  <!-- Failures and errors are recorded in a property rather than halting at
       once, so the whole suite still runs and every failing class is listed
       before the build is stopped by the fail task below. -->
  <junit printsummary="yes" failureproperty="junit.failed" errorproperty="junit.failed">
    <classpath>
      <path refid="main.classpath" />
      <pathelement path="${dist.classes.dir}" />
      <pathelement path="${dist.classes.test.dir}" />
    </classpath>

    <batchtest fork="yes">
      <!-- Nested and anonymous classes compile to Outer$Inner.class; they are
           not test cases themselves, so keep them out of the batch. -->
      <fileset dir="${dist.classes.test.dir}"
               includes="**/*Test*.class"
               excludes="**/*$*.class" />
    </batchtest>
  </junit>

  <fail if="junit.failed" message="One or more unit tests failed; see the junit summary above." />
</target>

<target name="jars" depends="build" description="Compile main source tree java files">
<target name="jars" depends="junit" description="Compile main source tree java files">

<mkdir dir="${dist.jar.dir}" />
<jar destfile="${azkaban-jobtype-jar}">
Expand Down
1 change: 1 addition & 0 deletions plugins/jobtype/jobtypes/common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
hadoop.home=
#hive.home=
#pig.home=
#spark.home=

#azkaban.should.proxy=
2 changes: 1 addition & 1 deletion plugins/jobtype/jobtypes/commonprivate.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ hadoop.security.manager.class=azkaban.security.HadoopSecurityManager_H_1_0
hadoop.home=
#pig.home=
#hive.home=

#spark.home=
1 change: 1 addition & 0 deletions plugins/jobtype/jobtypes/spark/plugin.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
queue=default
3 changes: 3 additions & 0 deletions plugins/jobtype/jobtypes/spark/private.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Fully qualified name of the job class for this job type.
jobtype.class=azkaban.jobtype.HadoopSparkJob

# Classpath entries for this job type: the Hadoop classpath, then the conf
# directory under spark.home, then everything under its lib directory.
# NOTE(review): hadoop.classpath and spark.home are not defined in this file;
# presumably they come from the shared jobtype properties - confirm they are set.
jobtype.classpath=${hadoop.classpath}:${spark.home}/conf:${spark.home}/lib/*
Binary file added plugins/jobtype/lib/azkaban-common-2.6.2.53.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/commons-io-2.4.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/hadoop-auth-2.3.0.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/hadoop-common-2.3.0.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/hadoop-yarn-api-2.3.0.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/hadoop-yarn-client-2.3.0.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/hadoop-yarn-common-2.3.0.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/hamcrest-core-1.3.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/jcommander-1.27.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/junit-4.11.jar
Binary file not shown.
Binary file added plugins/jobtype/lib/spark-core_2.10-1.4.0.jar
Binary file not shown.
87 changes: 10 additions & 77 deletions plugins/jobtype/src/azkaban/jobtype/HadoopHiveJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.apache.log4j.Logger;

import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.JavaProcessJob;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
Expand All @@ -50,27 +51,24 @@ public class HadoopHiveJob extends JavaProcessJob {

private HadoopSecurityManager hadoopSecurityManager;

private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
"hadoop.security.manager.class";

private boolean debug = false;

public HadoopHiveJob(String jobid, Props sysProps, Props jobProps, Logger log)
throws IOException {
super(jobid, sysProps, jobProps, log);

getJobProps().put("azkaban.job.id", jobid);
getJobProps().put(CommonJobProperties.JOB_ID, jobid);

shouldProxy = getSysProps().getBoolean("azkaban.should.proxy", false);
getJobProps().put("azkaban.should.proxy", Boolean.toString(shouldProxy));
obtainTokens = getSysProps().getBoolean("obtain.binary.token", false);
shouldProxy = getSysProps().getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
getJobProps().put(HadoopSecurityManager.ENABLE_PROXYING, Boolean.toString(shouldProxy));
obtainTokens = getSysProps().getBoolean(HadoopSecurityManager.OBTAIN_BINARY_TOKEN, false);

debug = getJobProps().getBoolean("debug", false);

if (shouldProxy) {
getLog().info("Initiating hadoop security manager.");
try {
hadoopSecurityManager = loadHadoopSecurityManager(sysProps, log);
hadoopSecurityManager = HadoopJobUtils.loadHadoopSecurityManager(getSysProps(), log);
} catch (RuntimeException e) {
throw new RuntimeException("Failed to get hadoop security manager!" + e);
}
Expand All @@ -90,91 +88,26 @@ public void run() throws Exception {
Props props = new Props();
props.putAll(getJobProps());
props.putAll(getSysProps());
tokenFile = getHadoopTokens(props);
tokenFile = HadoopJobUtils.getHadoopTokens(hadoopSecurityManager, props, getLog());
getJobProps().put("env." + HADOOP_TOKEN_FILE_LOCATION,
tokenFile.getAbsolutePath());
}

try {
super.run();
} catch (Exception e) {
e.printStackTrace();
getLog().error("caught exception running the job");
throw new Exception(e);
super.run();
} catch (Throwable t) {
t.printStackTrace();
getLog().error("caught error running the job");
throw new Exception(t);
} finally {
if (tokenFile != null) {
cancelHadoopTokens(tokenFile);
HadoopJobUtils.cancelHadoopTokens(hadoopSecurityManager, userToProxy, tokenFile, getLog());
if (tokenFile.exists()) {
tokenFile.delete();
}
}
}
}

/**
 * Asks the Hadoop security manager to cancel the tokens stored in
 * {@code tokenFile} for {@code userToProxy}.
 *
 * <p>This is best-effort cleanup: any failure is logged, together with its
 * stack trace, and is never propagated to the caller.
 *
 * @param tokenFile file holding the tokens to cancel
 */
private void cancelHadoopTokens(File tokenFile) {
  try {
    hadoopSecurityManager.cancelTokens(tokenFile, userToProxy, getLog());
  } catch (Exception e) {
    // One handler is enough: HadoopSecurityManagerException was handled
    // exactly like every other Exception. Passing the throwable to the logger
    // keeps the full stack trace in the job log instead of on stderr.
    getLog().error(
        "Failed to cancel hadoop tokens in " + tokenFile + " for user "
            + userToProxy, e);
  }
}

/**
 * Loads the Hadoop security manager whose class is named by the
 * {@code hadoop.security.manager.class} property and obtains an instance
 * through that class's static {@code getInstance(Props)} method.
 *
 * @param props properties naming the security manager class; also passed to
 *        {@code getInstance}
 * @param logger not used; messages go to {@link #getLog()}
 * @return the security manager returned by {@code getInstance}
 * @throws RuntimeException if {@code getInstance} cannot be found or invoked,
 *         or if it throws; the underlying failure is kept as the cause
 */
private HadoopSecurityManager loadHadoopSecurityManager(Props props,
    Logger logger) throws RuntimeException {

  Class<?> hadoopSecurityManagerClass =
      props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
          HadoopHiveJob.class.getClassLoader());
  String managerClassName = hadoopSecurityManagerClass.getName();
  getLog().info("Loading hadoop security manager " + managerClassName);

  try {
    Method getInstanceMethod =
        hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
    return (HadoopSecurityManager) getInstanceMethod.invoke(
        hadoopSecurityManagerClass, props);
  } catch (InvocationTargetException e) {
    // getInstance itself threw: unwrap so callers see the real failure.
    getLog().error(
        "Could not instantiate Hadoop Security Manager " + managerClassName,
        e.getCause());
    throw new RuntimeException(e.getCause());
  } catch (Exception e) {
    // Lookup/access failures (e.g. NoSuchMethodException) normally carry no
    // cause, so wrapping e.getCause() would discard the error entirely.
    // Wrap the exception itself instead.
    getLog().error(
        "Could not load Hadoop Security Manager " + managerClassName, e);
    throw new RuntimeException(
        "Could not load Hadoop Security Manager " + managerClassName, e);
  }
}

/**
 * Creates a temporary token file and asks the Hadoop security manager to
 * prefetch tokens into it.
 *
 * @param props properties handed to the security manager's
 *        {@code prefetchToken} call
 * @return the temporary file passed to {@code prefetchToken}
 * @throws HadoopSecurityManagerException if the temporary file cannot be
 *         created or if prefetching the tokens fails
 */
protected File getHadoopTokens(Props props)
    throws HadoopSecurityManagerException {

  File tokenFile = null;
  try {
    tokenFile = File.createTempFile("mr-azkaban", ".token");
  } catch (Exception e) {
    // The cause travels with the wrapping exception, so nothing is lost by
    // not dumping the stack trace to stderr here.
    throw new HadoopSecurityManagerException(
        "Failed to create the token file.", e);
  }

  // If prefetching fails the caller never receives the file handle and so
  // cannot clean it up; remove the temporary file here before the exception
  // propagates.
  boolean fetched = false;
  try {
    hadoopSecurityManager.prefetchToken(tokenFile, props, getLog());
    fetched = true;
  } finally {
    if (!fetched && tokenFile.exists()) {
      tokenFile.delete();
    }
  }

  return tokenFile;
}
}

@Override
protected String getJavaClass() {
Expand Down
99 changes: 18 additions & 81 deletions plugins/jobtype/src/azkaban/jobtype/HadoopJavaJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@

package azkaban.jobtype;

import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.log4j.Logger;

import azkaban.flow.CommonJobProperties;
import azkaban.jobExecutor.JavaProcessJob;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.Props;
import azkaban.jobExecutor.JavaProcessJob;

import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

public class HadoopJavaJob extends JavaProcessJob {

Expand All @@ -43,9 +41,6 @@ public class HadoopJavaJob extends JavaProcessJob {
public static final String DEFAULT_RUN_METHOD = "run";
public static final String DEFAULT_PROGRESS_METHOD = "getProgress";

private static final String HADOOP_SECURITY_MANAGER_CLASS_PARAM =
"hadoop.security.manager.class";

private String _runMethod;
private String _cancelMethod;
private String _progressMethod;
Expand All @@ -63,17 +58,18 @@ public HadoopJavaJob(String jobid, Props sysProps, Props jobProps, Logger log)
throws RuntimeException {
super(jobid, sysProps, jobProps, log);

getJobProps().put("azkaban.job.id", jobid);
shouldProxy = getSysProps().getBoolean("azkaban.should.proxy", false);
getJobProps().put("azkaban.should.proxy", Boolean.toString(shouldProxy));
obtainTokens = getSysProps().getBoolean("obtain.binary.token", false);
getJobProps().put(CommonJobProperties.JOB_ID, jobid);
shouldProxy = getSysProps().getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
getJobProps().put(HadoopSecurityManager.ENABLE_PROXYING, Boolean.toString(shouldProxy));
obtainTokens = getSysProps().getBoolean(HadoopSecurityManager.OBTAIN_BINARY_TOKEN, false);
noUserClasspath =
getSysProps().getBoolean("azkaban.no.user.classpath", false);

if (shouldProxy) {
getLog().info("Initiating hadoop security manager.");
try {
hadoopSecurityManager = loadHadoopSecurityManager(sysProps, log);
hadoopSecurityManager = HadoopJobUtils.loadHadoopSecurityManager(getSysProps(), log);

} catch (RuntimeException e) {
e.printStackTrace();
throw new RuntimeException("Failed to get hadoop security manager!"
Expand All @@ -82,35 +78,6 @@ public HadoopJavaJob(String jobid, Props sysProps, Props jobProps, Logger log)
}
}

/**
 * Loads the Hadoop security manager whose class is named by the
 * {@code hadoop.security.manager.class} property and obtains an instance
 * through that class's static {@code getInstance(Props)} method.
 *
 * @param props properties naming the security manager class; also passed to
 *        {@code getInstance}
 * @param logger not used; messages go to {@link #getLog()}
 * @return the security manager returned by {@code getInstance}
 * @throws RuntimeException if {@code getInstance} cannot be found or invoked,
 *         or if it throws; the underlying failure is kept as the cause
 */
private HadoopSecurityManager loadHadoopSecurityManager(Props props,
    Logger logger) throws RuntimeException {

  Class<?> hadoopSecurityManagerClass =
      props.getClass(HADOOP_SECURITY_MANAGER_CLASS_PARAM, true,
          HadoopJavaJob.class.getClassLoader());
  String managerClassName = hadoopSecurityManagerClass.getName();
  getLog().info("Initializing hadoop security manager " + managerClassName);

  try {
    Method getInstanceMethod =
        hadoopSecurityManagerClass.getMethod("getInstance", Props.class);
    return (HadoopSecurityManager) getInstanceMethod.invoke(
        hadoopSecurityManagerClass, props);
  } catch (InvocationTargetException e) {
    // getInstance itself threw: unwrap so callers see the real failure.
    getLog().error(
        "Could not instantiate Hadoop Security Manager " + managerClassName,
        e.getCause());
    throw new RuntimeException(e.getCause());
  } catch (Exception e) {
    // Lookup/access failures (e.g. NoSuchMethodException) normally carry no
    // cause, so wrapping e.getCause() would discard the error entirely.
    // Wrap the exception itself instead.
    getLog().error(
        "Could not load Hadoop Security Manager " + managerClassName, e);
    throw new RuntimeException(
        "Could not load Hadoop Security Manager " + managerClassName, e);
  }
}

@Override
protected String getJVMArguments() {
Expand Down Expand Up @@ -189,33 +156,33 @@ public void run() throws Exception {
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());

File f = null;
File tokenFile = null;
if (shouldProxy && obtainTokens) {
userToProxy = getJobProps().getString("user.to.proxy");
getLog().info("Need to proxy. Getting tokens.");
Props props = new Props();
props.putAll(getJobProps());
props.putAll(getSysProps());

f = getHadoopTokens(props);
tokenFile = HadoopJobUtils.getHadoopTokens(hadoopSecurityManager, props, getLog());
getJobProps().put("env." + HADOOP_TOKEN_FILE_LOCATION,
f.getAbsolutePath());
tokenFile.getAbsolutePath());
}
try {
super.run();
} catch (Exception e) {
e.printStackTrace();
throw new Exception(e);
} finally {
if (f != null) {
if (tokenFile != null) {
try {
cancelHadoopTokens(f);
HadoopJobUtils.cancelHadoopTokens(hadoopSecurityManager, userToProxy, tokenFile, getLog());
} catch (Throwable t) {
t.printStackTrace();
getLog().error("Failed to cancel tokens.");
}
if (f.exists()) {
f.delete();
if (tokenFile.exists()) {
tokenFile.delete();
}
}
}
Expand All @@ -239,37 +206,7 @@ private static String getSourcePathFromClass(Class<?> containedClass) {
return containedClass.getProtectionDomain().getCodeSource().getLocation()
.getPath();
}
}

/**
 * Creates a temporary token file and asks the Hadoop security manager to
 * prefetch tokens into it.
 *
 * @param props properties handed to the security manager's
 *        {@code prefetchToken} call
 * @return the temporary file passed to {@code prefetchToken}
 * @throws HadoopSecurityManagerException if the temporary file cannot be
 *         created or if prefetching the tokens fails
 */
protected File getHadoopTokens(Props props)
    throws HadoopSecurityManagerException {

  File tokenFile = null;
  try {
    tokenFile = File.createTempFile("mr-azkaban", ".token");
  } catch (Exception e) {
    // The cause travels with the wrapping exception, so nothing is lost by
    // not dumping the stack trace to stderr here.
    throw new HadoopSecurityManagerException(
        "Failed to create the token file.", e);
  }

  // If prefetching fails the caller never receives the file handle and so
  // cannot clean it up; remove the temporary file here before the exception
  // propagates.
  boolean fetched = false;
  try {
    hadoopSecurityManager.prefetchToken(tokenFile, props, getLog());
    fetched = true;
  } finally {
    if (!fetched && tokenFile.exists()) {
      tokenFile.delete();
    }
  }

  return tokenFile;
}

/**
 * Asks the Hadoop security manager to cancel the tokens stored in {@code f}
 * for {@code userToProxy}.
 *
 * <p>This is best-effort cleanup: any failure is logged, together with its
 * stack trace, and is never propagated to the caller.
 *
 * @param f file holding the tokens to cancel
 */
private void cancelHadoopTokens(File f) {
  try {
    hadoopSecurityManager.cancelTokens(f, userToProxy, getLog());
  } catch (Exception e) {
    // One handler is enough: HadoopSecurityManagerException was handled
    // exactly like every other Exception. Passing the throwable to the logger
    // keeps the full stack trace in the job log instead of on stderr.
    getLog().error(
        "Failed to cancel hadoop tokens in " + f + " for user " + userToProxy,
        e);
  }
}
}

@Override
protected String getJavaClass() {
Expand Down
Loading

0 comments on commit 62a187a

Please sign in to comment.