Merge pull request #148 from hluu/master
Hive param substitution on Azkaban issue #145
hluu committed Oct 10, 2014
2 parents 139a0b0 + 484a009 commit 0f6b8f5
Showing 2 changed files with 135 additions and 18 deletions.
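
For orientation, a minimal sketch of how the new hivevar support is meant to be used (the job file and script below are hypothetical; the hive.script property and the hiveconf./hivevar. prefixes come from the diff, and ${hivevar:...} is Hive's standard variable-substitution syntax):

    # report.job (hypothetical Azkaban job file)
    type=hive
    hive.script=scripts/report.hql
    hiveconf.mapred.job.queue.name=default
    hivevar.zipcode=10

    -- scripts/report.hql (hypothetical)
    SELECT * FROM addresses WHERE zip = '${hivevar:zipcode}';

Each hivevar.* property is turned into a -hivevar 'name=value' argument for the secure wrapper, which strips the quotes and hands the resulting map to Hive's CliDriver via setHiveVariables, as the diff below shows.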
33 changes: 24 additions & 9 deletions plugins/jobtype/src/azkaban/jobtype/HadoopHiveJob.java
@@ -16,6 +16,8 @@

package azkaban.jobtype;

import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -28,19 +30,17 @@

import org.apache.log4j.Logger;

import azkaban.jobExecutor.JavaProcessJob;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;

import azkaban.jobExecutor.JavaProcessJob;
import azkaban.utils.Props;
import azkaban.utils.StringUtils;

import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

public class HadoopHiveJob extends JavaProcessJob {

public static final String HIVE_SCRIPT = "hive.script";
private static final String HIVE_PARAM_PREFIX = "hiveconf.";
private static final String HIVECONF_PARAM_PREFIX = "hiveconf.";
private static final String HIVEVAR_PARAM_PREFIX = "hivevar.";
public static final String HADOOP_SECURE_HIVE_WRAPPER =
"azkaban.jobtype.HadoopSecureHiveWrapper";

@@ -231,12 +231,13 @@ protected String getJVMArguments() {
protected String getMainArguments() {
ArrayList<String> list = new ArrayList<String>();

// for hiveconf
Map<String, String> map = getHiveConf();
if (map != null) {
for (Map.Entry<String, String> entry : map.entrySet()) {
list.add("-hiveconf "
+ StringUtils.shellQuote(entry.getKey() + "=" + entry.getValue(),
StringUtils.SINGLE_QUOTE));
list.add("-hiveconf");
list.add(StringUtils.shellQuote(
entry.getKey() + "=" + entry.getValue(), StringUtils.SINGLE_QUOTE));
}
}

@@ -245,6 +246,16 @@ protected String getMainArguments() {
list.add("hive.root.logger=INFO,console");
}

// for hivevar
Map<String, String> hiveVarMap = getHiveVar();
if (hiveVarMap != null) {
for (Map.Entry<String, String> entry : hiveVarMap.entrySet()) {
list.add("-hivevar");
list.add(StringUtils.shellQuote(
entry.getKey() + "=" + entry.getValue(), StringUtils.SINGLE_QUOTE));
}
}

list.add("-f");
list.add(getScript());

@@ -297,7 +308,11 @@ protected String getScript() {
}

protected Map<String, String> getHiveConf() {
return getJobProps().getMapByPrefix(HIVE_PARAM_PREFIX);
return getJobProps().getMapByPrefix(HIVECONF_PARAM_PREFIX);
}

protected Map<String, String> getHiveVar() {
return getJobProps().getMapByPrefix(HIVEVAR_PARAM_PREFIX);
}

private static String getSourcePathFromClass(Class<?> containedClass) {
120 changes: 111 additions & 9 deletions plugins/jobtype/src/azkaban/jobtype/HadoopSecureHiveWrapper.java
@@ -16,10 +16,22 @@
package azkaban.jobtype;

import static azkaban.security.commons.SecurityUtils.MAPREDUCE_JOB_CREDENTIALS_BINARY;
import static azkaban.utils.StringUtils.DOUBLE_QUOTE;
import static azkaban.utils.StringUtils.SINGLE_QUOTE;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEAUXJARS;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.cli.CliDriver;
@@ -28,7 +40,6 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
@@ -37,15 +48,13 @@
import azkaban.jobtype.hiveutils.HiveQueryExecutionException;
import azkaban.security.commons.HadoopSecurityManager;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;

public class HadoopSecureHiveWrapper {

private static final String DOUBLE_QUOTE_STRING = Character
.toString(DOUBLE_QUOTE);
private static final String SINGLE_QUOTE_STRING = Character
.toString(SINGLE_QUOTE);

private static boolean securityEnabled;
private static final Logger logger = Logger.getRootLogger();

@@ -124,6 +133,8 @@ public static void runHive(String[] args) throws Exception {

final HiveConf hiveConf = new HiveConf(SessionState.class);

populateHiveConf(hiveConf, args);

if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
System.out.println("Setting hadoop tokens ... ");
hiveConf.set(MAPREDUCE_JOB_CREDENTIALS_BINARY,
@@ -160,7 +171,8 @@ public static void runHive(String[] args) throws Exception {
logger.info("Got auxJars = " + auxJars);

if (StringUtils.isNotBlank(auxJars)) {
loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
loader =
Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
}
hiveConf.setClassLoader(loader);
Thread.currentThread().setContextClassLoader(loader);
@@ -188,6 +200,14 @@ public static void runHive(String[] args) throws Exception {
logger.info("Executing query: " + hiveScript);

CliDriver cli = new CliDriver();
Map<String, String> hiveVarMap = getHiveVarMap(args);

logger.info("hiveVarMap: " + hiveVarMap);

if (!hiveVarMap.isEmpty()) {
cli.setHiveVariables(getHiveVarMap(args));
}

int returnCode = cli.processFile(hiveScript);
if (returnCode != 0) {
logger.warn("Got exception " + returnCode + " from line: " + hiveScript);
@@ -238,4 +258,86 @@ static String filesToURIString(File[] files) throws IOException {
return sb.toString();
}

/**
* Extract hiveconf from command line arguments and populate them into
* HiveConf
*
* An example: -hiveconf 'zipcode=10', -hiveconf hive.root.logger=INFO,console
*
* @param hiveConf
* @param args
*/
private static void populateHiveConf(HiveConf hiveConf, String[] args) {

if (args == null) {
return;
}

int index = 0;
for (; index < args.length; index++) {
if ("-hiveconf".equals(args[index])) {
String hiveConfParam = stripSingleDoubleQuote(args[++index]);

String[] tokens = hiveConfParam.split("=");
if (tokens.length == 2) {
String name = tokens[0];
String value = tokens[1];
logger.info("Setting: " + name + "=" + value + " to hiveConf");
hiveConf.set(name, value);
} else {
logger.warn("Invalid hiveconf: " + hiveConfParam);
}
}
}
}

private static Map<String, String> getHiveVarMap(String[] args) {

if (args == null) {
return Collections.emptyMap();
}

Map<String, String> hiveVarMap = new HashMap<String, String>();
int index = 0;
for (; index < args.length; index++) {
if ("-hivevar".equals(args[index])) {
String hiveVarParam = stripSingleDoubleQuote(args[++index]);

String[] tokens = hiveVarParam.split("=");
if (tokens.length == 2) {
String name = tokens[0];
String value = tokens[1];
logger.info("Setting hivevar: " + name + "=" + value);
hiveVarMap.put(name, value);
} else {
logger.warn("Invalid hivevar: " + hiveVarParam);
}
}
}
return hiveVarMap;
}

/**
* Strip single quote or double quote at either end of the string
*
* @param input
* @return string without leading or trailing single or double quote
*/
private static String stripSingleDoubleQuote(String input) {
if (StringUtils.isEmpty(input)) {
return input;
}

if (input.startsWith(SINGLE_QUOTE_STRING)
|| input.startsWith(DOUBLE_QUOTE_STRING)) {
input = input.substring(1);
}

if (input.endsWith(SINGLE_QUOTE_STRING)
|| input.endsWith(DOUBLE_QUOTE_STRING)) {
input = input.substring(0, input.length() - 1);
}

return input;
}
}
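
For readers who want to see the argument handling in isolation, here is a small standalone sketch (not part of the commit; class and method names are my own) that mirrors the getHiveVarMap and stripSingleDoubleQuote logic above on a sample argument vector:

import java.util.HashMap;
import java.util.Map;

public class HiveVarParseSketch {

  // Mirrors stripSingleDoubleQuote: drop one leading and one trailing ' or ".
  static String stripQuote(String in) {
    if (in == null || in.isEmpty()) {
      return in;
    }
    if (in.startsWith("'") || in.startsWith("\"")) {
      in = in.substring(1);
    }
    if (in.endsWith("'") || in.endsWith("\"")) {
      in = in.substring(0, in.length() - 1);
    }
    return in;
  }

  // Mirrors getHiveVarMap: collect name=value pairs that follow -hivevar flags.
  static Map<String, String> parseHiveVars(String[] args) {
    Map<String, String> vars = new HashMap<String, String>();
    for (int i = 0; i < args.length - 1; i++) {
      if ("-hivevar".equals(args[i])) {
        String[] tokens = stripQuote(args[++i]).split("=");
        if (tokens.length == 2) {
          vars.put(tokens[0], tokens[1]);
        }
      }
    }
    return vars;
  }

  public static void main(String[] unused) {
    String[] args = { "-hiveconf", "'mapred.job.queue.name=default'",
        "-hivevar", "'zipcode=10'", "-f", "scripts/report.hql" };
    // Prints {zipcode=10}; this is the kind of map the wrapper passes to
    // CliDriver.setHiveVariables before the script runs.
    System.out.println(parseHiveVars(args));
  }
}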
