diff --git a/build.xml b/build.xml
index 5db8a120..cc0dbf55 100644
--- a/build.xml
+++ b/build.xml
@@ -77,14 +77,6 @@ ${current.time}
-
-
-
-
-
-
-
-
@@ -104,10 +96,6 @@ ${current.time}
-
-
-
-
diff --git a/plugins/hadoopsecuritymanager-common/build.properties b/plugins/hadoopsecuritymanager-common/build.properties
deleted file mode 100644
index bfe96c25..00000000
--- a/plugins/hadoopsecuritymanager-common/build.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-name=azkaban-hadoopsecuritymanager-common
-version=3.0.0
-
diff --git a/plugins/hadoopsecuritymanager-common/build.xml b/plugins/hadoopsecuritymanager-common/build.xml
deleted file mode 100644
index cee50ad1..00000000
--- a/plugins/hadoopsecuritymanager-common/build.xml
+++ /dev/null
@@ -1,62 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/plugins/hadoopsecuritymanager-common/lib/hadoop-core-1.0.3.jar b/plugins/hadoopsecuritymanager-common/lib/hadoop-core-1.0.3.jar
deleted file mode 100644
index 85092fea..00000000
Binary files a/plugins/hadoopsecuritymanager-common/lib/hadoop-core-1.0.3.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/HadoopSecurityManager.java b/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/HadoopSecurityManager.java
deleted file mode 100644
index d6578d2f..00000000
--- a/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/HadoopSecurityManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2011 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package azkaban.security.commons;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-
-import azkaban.utils.Props;
-
-public abstract class HadoopSecurityManager {
-
- public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
-
- public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
- public static final String PROXY_USER = "proxy.user";
- public static final String USER_TO_PROXY = "user.to.proxy";
- public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
- public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
- "mapreduce.job.credentials.binary";
-
- public static final String OBTAIN_JOBTRACKER_TOKEN =
- "obtain.jobtracker.token";
- public static final String OBTAIN_NAMENODE_TOKEN = "obtain.namenode.token";
- public static final String OBTAIN_HCAT_TOKEN = "obtain.hcat.token";
-
- public boolean isHadoopSecurityEnabled()
- throws HadoopSecurityManagerException {
- return false;
- }
-
- public void reloginFromKeytab() throws IOException {
- UserGroupInformation.getLoginUser().reloginFromKeytab();
- }
-
- /**
- * Create a proxied user based on the explicit user name, taking other
- * parameters necessary from properties file.
- */
- public abstract UserGroupInformation getProxiedUser(String toProxy)
- throws HadoopSecurityManagerException;
-
- /**
- * Create a proxied user, taking all parameters, including which user to proxy
- * from provided Properties.
- */
- public abstract UserGroupInformation getProxiedUser(Props prop)
- throws HadoopSecurityManagerException;
-
- public abstract FileSystem getFSAsUser(String user)
- throws HadoopSecurityManagerException;
-
- public static boolean shouldProxy(Properties prop) {
- String shouldProxy = prop.getProperty(ENABLE_PROXYING);
-
- return shouldProxy != null && shouldProxy.equals("true");
- }
-
- public abstract void prefetchToken(File tokenFile, String userToProxy,
- Logger logger) throws HadoopSecurityManagerException;
-
- public abstract void cancelTokens(File tokenFile, String userToProxy,
- Logger logger) throws HadoopSecurityManagerException;
-
- public abstract void prefetchToken(File tokenFile, Props props, Logger logger)
- throws HadoopSecurityManagerException;
-
-}
diff --git a/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/HadoopSecurityManagerException.java b/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/HadoopSecurityManagerException.java
deleted file mode 100644
index 5b55d5f8..00000000
--- a/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/HadoopSecurityManagerException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2012 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package azkaban.security.commons;
-
-public class HadoopSecurityManagerException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public HadoopSecurityManagerException(String message) {
- super(message);
- }
-
- public HadoopSecurityManagerException(String message, Throwable cause) {
- super(message, cause);
- }
-}
diff --git a/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/SecurityUtils.java b/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/SecurityUtils.java
deleted file mode 100644
index ef4cc35a..00000000
--- a/plugins/hadoopsecuritymanager-common/src/azkaban/security/commons/SecurityUtils.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Copyright 2011 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package azkaban.security.commons;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Logger;
-
-import azkaban.utils.Props;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Properties;
-
-public class SecurityUtils {
- // Secure Hadoop proxy user params
- public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
- public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
- public static final String PROXY_USER = "proxy.user";
- public static final String TO_PROXY = "user.to.proxy";
-
- private static UserGroupInformation loginUser = null;
-
- /**
- * Create a proxied user based on the explicit user name, taking other
- * parameters necessary from properties file.
- */
- public static synchronized UserGroupInformation getProxiedUser(
- String toProxy, Properties prop, Logger log, Configuration conf)
- throws IOException {
-
- if (conf == null) {
- throw new IllegalArgumentException("conf can't be null");
- }
- UserGroupInformation.setConfiguration(conf);
-
- if (toProxy == null) {
- throw new IllegalArgumentException("toProxy can't be null");
- }
-
- if (loginUser == null) {
- log.info("No login user. Creating login user");
- String keytab = verifySecureProperty(prop, PROXY_KEYTAB_LOCATION, log);
- String proxyUser = verifySecureProperty(prop, PROXY_USER, log);
- UserGroupInformation.loginUserFromKeytab(proxyUser, keytab);
- loginUser = UserGroupInformation.getLoginUser();
- log.info("Logged in with user " + loginUser);
- } else {
- log.info("loginUser (" + loginUser + ") already created, refreshing tgt.");
- loginUser.checkTGTAndReloginFromKeytab();
- }
-
- return UserGroupInformation.createProxyUser(toProxy, loginUser);
- }
-
- /**
- * Create a proxied user, taking all parameters, including which user to proxy
- * from provided Properties.
- */
- public static UserGroupInformation getProxiedUser(Properties prop,
- Logger log, Configuration conf) throws IOException {
- String toProxy = verifySecureProperty(prop, TO_PROXY, log);
- UserGroupInformation user = getProxiedUser(toProxy, prop, log, conf);
- if (user == null)
- throw new IOException(
- "Proxy as any user in unsecured grid is not supported!"
- + prop.toString());
- log.info("created proxy user for " + user.getUserName() + user.toString());
- return user;
- }
-
- public static String verifySecureProperty(Properties properties, String s,
- Logger l) throws IOException {
- String value = properties.getProperty(s);
-
- if (value == null)
- throw new IOException(s
- + " not set in properties. Cannot use secure proxy");
- l.info("Secure proxy configuration: Property " + s + " = " + value);
- return value;
- }
-
- public static boolean shouldProxy(Properties prop) {
- String shouldProxy = prop.getProperty(ENABLE_PROXYING);
-
- return shouldProxy != null && shouldProxy.equals("true");
- }
-
- public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
- public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
- "mapreduce.job.credentials.binary";
-
- public static synchronized void prefetchToken(final File tokenFile,
- final Props p, final Logger logger) throws InterruptedException,
- IOException {
-
- final Configuration conf = new Configuration();
- logger.info("Getting proxy user for " + p.getString(TO_PROXY));
- logger.info("Getting proxy user for " + p.toString());
-
- getProxiedUser(p.toProperties(), logger, conf).doAs(
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- getToken(p);
- return null;
- }
-
- private void getToken(Props p) throws InterruptedException,
- IOException {
- String shouldPrefetch = p.getString(OBTAIN_BINARY_TOKEN);
- if (shouldPrefetch != null && shouldPrefetch.equals("true")) {
- logger.info("Pre-fetching token");
-
- logger.info("Pre-fetching fs token");
- FileSystem fs = FileSystem.get(conf);
- Token<?> fsToken =
- fs.getDelegationToken(p.getString("user.to.proxy"));
- logger.info("Created token: " + fsToken.toString());
-
- Job job =
- new Job(conf, "totally phony, extremely fake, not real job");
- JobConf jc = new JobConf(conf);
- JobClient jobClient = new JobClient(jc);
- logger.info("Pre-fetching job token: Got new JobClient: " + jc);
- Token<DelegationTokenIdentifier> mrdt =
- jobClient.getDelegationToken(new Text("hi"));
- logger.info("Created token: " + mrdt.toString());
-
- job.getCredentials().addToken(new Text("howdy"), mrdt);
- job.getCredentials().addToken(fsToken.getService(), fsToken);
-
- FileOutputStream fos = null;
- DataOutputStream dos = null;
- try {
- fos = new FileOutputStream(tokenFile);
- dos = new DataOutputStream(fos);
- job.getCredentials().writeTokenStorageToStream(dos);
- } finally {
- if (dos != null) {
- dos.close();
- }
- if (fos != null) {
- fos.close();
- }
- }
- logger.info("Loading hadoop tokens into "
- + tokenFile.getAbsolutePath());
- p.put("HadoopTokenFileLoc", tokenFile.getAbsolutePath());
- } else {
- logger.info("Not pre-fetching token");
- }
- }
- });
- }
-}
diff --git a/plugins/hadoopsecuritymanager-yarn/build.properties b/plugins/hadoopsecuritymanager-yarn/build.properties
deleted file mode 100644
index dcfc5efe..00000000
--- a/plugins/hadoopsecuritymanager-yarn/build.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-name=azkaban-hadoopsecuritymanager-yarn
-version=3.0.0
-
diff --git a/plugins/hadoopsecuritymanager-yarn/build.xml b/plugins/hadoopsecuritymanager-yarn/build.xml
deleted file mode 100644
index 1267bfed..00000000
--- a/plugins/hadoopsecuritymanager-yarn/build.xml
+++ /dev/null
@@ -1,109 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-common-2.3.0.11.jar b/plugins/hadoopsecuritymanager-yarn/lib/hadoop-common-2.3.0.11.jar
deleted file mode 100644
index 299db48d..00000000
Binary files a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-common-2.3.0.11.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-mapreduce-client-common-2.3.0.11.jar b/plugins/hadoopsecuritymanager-yarn/lib/hadoop-mapreduce-client-common-2.3.0.11.jar
deleted file mode 100644
index 5a4d4d86..00000000
Binary files a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-mapreduce-client-common-2.3.0.11.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-mapreduce-client-core-2.3.0.11.jar b/plugins/hadoopsecuritymanager-yarn/lib/hadoop-mapreduce-client-core-2.3.0.11.jar
deleted file mode 100644
index 772ac104..00000000
Binary files a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-mapreduce-client-core-2.3.0.11.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-yarn-api-2.3.0.11.jar b/plugins/hadoopsecuritymanager-yarn/lib/hadoop-yarn-api-2.3.0.11.jar
deleted file mode 100644
index ff3e500c..00000000
Binary files a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-yarn-api-2.3.0.11.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-yarn-common-2.3.0.11.jar b/plugins/hadoopsecuritymanager-yarn/lib/hadoop-yarn-common-2.3.0.11.jar
deleted file mode 100644
index 39195691..00000000
Binary files a/plugins/hadoopsecuritymanager-yarn/lib/hadoop-yarn-common-2.3.0.11.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager-yarn/lib/hive-metastore-0.12.0.22.jar b/plugins/hadoopsecuritymanager-yarn/lib/hive-metastore-0.12.0.22.jar
deleted file mode 100644
index 68652b6b..00000000
Binary files a/plugins/hadoopsecuritymanager-yarn/lib/hive-metastore-0.12.0.22.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager-yarn/src/azkaban/security/HadoopSecurityManager_H_2_0.java b/plugins/hadoopsecuritymanager-yarn/src/azkaban/security/HadoopSecurityManager_H_2_0.java
deleted file mode 100644
index 9c7757ad..00000000
--- a/plugins/hadoopsecuritymanager-yarn/src/azkaban/security/HadoopSecurityManager_H_2_0.java
+++ /dev/null
@@ -1,857 +0,0 @@
-/*
- * Copyright 2011 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package azkaban.security;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Master;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-
-import azkaban.security.commons.HadoopSecurityManager;
-import azkaban.security.commons.HadoopSecurityManagerException;
-import azkaban.utils.Props;
-import azkaban.utils.UndefinedPropertyException;
-
-public class HadoopSecurityManager_H_2_0 extends HadoopSecurityManager {
-
- private static final String FS_HDFS_IMPL_DISABLE_CACHE =
- "fs.hdfs.impl.disable.cache";
-
- /** The Kerberos principal for the job tracker. */
- public static final String JT_PRINCIPAL = JTConfig.JT_USER_NAME;
- // "mapreduce.jobtracker.kerberos.principal";
- /** The Kerberos principal for the resource manager. */
- public static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
-
- public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
- public static final String HADOOP_JOB_TRACKER_2 =
- "mapreduce.jobtracker.address";
- public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
-
- private static final String OTHER_NAMENODES_TO_GET_TOKEN = "other_namenodes";
-
- /**
- * the settings to be defined by user indicating if there are hcat locations
- * other than the default one the system should pre-fetch hcat token from.
- * Note: Multiple thrift uris are supported, use comma to separate the values,
- * values are case insensitive.
- * */
- private static final String EXTRA_HCAT_LOCATION = "other_hcat_location";
-
- /**
- * the key that will be used to set proper signature for each of the hcat
- * token when multiple hcat tokens are required to be fetched.
- * */
- public static final String HIVE_TOKEN_SIGNATURE_KEY =
- "hive.metastore.token.signature";
-
- public static final Text DEFAULT_RENEWER = new Text("azkaban mr tokens");
-
- private static final String AZKABAN_KEYTAB_LOCATION = "proxy.keytab.location";
- private static final String AZKABAN_PRINCIPAL = "proxy.user";
- private static final String OBTAIN_JOBHISTORYSERVER_TOKEN =
- "obtain.jobhistoryserver.token";
-
- private UserGroupInformation loginUser = null;
- private final static Logger logger = Logger
- .getLogger(HadoopSecurityManager_H_2_0.class);
- private Configuration conf;
-
- private String keytabLocation;
- private String keytabPrincipal;
- private boolean shouldProxy = false;
- private boolean securityEnabled = false;
-
- private static HadoopSecurityManager hsmInstance = null;
- private ConcurrentMap<String, UserGroupInformation> userUgiMap;
-
- private static URLClassLoader ucl;
-
- private final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
- private HadoopSecurityManager_H_2_0(Props props)
- throws HadoopSecurityManagerException, IOException {
-
- // for now, assume the same/compatible native library, the same/compatible
- // hadoop-core jar
- String hadoopHome = props.getString("hadoop.home", null);
- String hadoopConfDir = props.getString("hadoop.conf.dir", null);
-
- if (hadoopHome == null) {
- hadoopHome = System.getenv("HADOOP_HOME");
- }
- if (hadoopConfDir == null) {
- hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
- }
-
- List<URL> resources = new ArrayList<URL>();
- URL urlToHadoop = null;
- if (hadoopConfDir != null) {
- urlToHadoop = new File(hadoopConfDir).toURI().toURL();
- logger.info("Using hadoop config found in " + urlToHadoop);
- resources.add(urlToHadoop);
- } else if (hadoopHome != null) {
- urlToHadoop = new File(hadoopHome, "conf").toURI().toURL();
- logger.info("Using hadoop config found in " + urlToHadoop);
- resources.add(urlToHadoop);
- } else {
- logger.info("HADOOP_HOME not set, using default hadoop config.");
- }
-
- ucl = new URLClassLoader(resources.toArray(new URL[resources.size()]));
-
- conf = new Configuration();
- conf.setClassLoader(ucl);
-
- if (props.containsKey(FS_HDFS_IMPL_DISABLE_CACHE)) {
- logger.info("Setting " + FS_HDFS_IMPL_DISABLE_CACHE + " to "
- + props.get(FS_HDFS_IMPL_DISABLE_CACHE));
- conf.setBoolean(FS_HDFS_IMPL_DISABLE_CACHE,
- Boolean.valueOf(props.get(FS_HDFS_IMPL_DISABLE_CACHE)));
- }
-
- logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + ": "
- + conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
- logger.info(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION + ": "
- + conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION));
- logger.info(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY + ": "
- + conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
-
- UserGroupInformation.setConfiguration(conf);
-
- securityEnabled = UserGroupInformation.isSecurityEnabled();
- if (securityEnabled) {
- logger.info("The Hadoop cluster has enabled security");
- shouldProxy = true;
- try {
-
- keytabLocation = props.getString(AZKABAN_KEYTAB_LOCATION);
- keytabPrincipal = props.getString(AZKABAN_PRINCIPAL);
- } catch (UndefinedPropertyException e) {
- throw new HadoopSecurityManagerException(e.getMessage());
- }
-
- // try login
- try {
- if (loginUser == null) {
- logger.info("No login user. Creating login user");
- logger.info("Using principal from " + keytabPrincipal + " and "
- + keytabLocation);
- UserGroupInformation.loginUserFromKeytab(keytabPrincipal,
- keytabLocation);
- loginUser = UserGroupInformation.getLoginUser();
- logger.info("Logged in with user " + loginUser);
- } else {
- logger.info("loginUser (" + loginUser
- + ") already created, refreshing tgt.");
- loginUser.checkTGTAndReloginFromKeytab();
- }
- } catch (IOException e) {
- throw new HadoopSecurityManagerException(
- "Failed to login with kerberos ", e);
- }
-
- }
-
- userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();
-
- logger.info("Hadoop Security Manager initialized");
- }
-
- public static HadoopSecurityManager getInstance(Props props)
- throws HadoopSecurityManagerException, IOException {
- if (hsmInstance == null) {
- synchronized (HadoopSecurityManager_H_2_0.class) {
- if (hsmInstance == null) {
- logger.info("getting new instance");
- hsmInstance = new HadoopSecurityManager_H_2_0(props);
- }
- }
- }
-
- logger.debug("Relogging in from keytab if necessary.");
- hsmInstance.reloginFromKeytab();
-
- return hsmInstance;
- }
-
- /**
- * Create a proxied user based on the explicit user name, taking other
- * parameters necessary from properties file.
- *
- * @throws IOException
- */
- @Override
- public synchronized UserGroupInformation getProxiedUser(String userToProxy)
- throws HadoopSecurityManagerException {
-
- if (userToProxy == null) {
- throw new HadoopSecurityManagerException("userToProxy can't be null");
- }
-
- UserGroupInformation ugi = userUgiMap.get(userToProxy);
- if (ugi == null) {
- logger.info("proxy user " + userToProxy
- + " not exist. Creating new proxy user");
- if (shouldProxy) {
- try {
- ugi =
- UserGroupInformation.createProxyUser(userToProxy,
- UserGroupInformation.getLoginUser());
- } catch (IOException e) {
- throw new HadoopSecurityManagerException(
- "Failed to create proxy user", e);
- }
- } else {
- ugi = UserGroupInformation.createRemoteUser(userToProxy);
- }
- userUgiMap.putIfAbsent(userToProxy, ugi);
- }
- return ugi;
- }
-
- /**
- * Create a proxied user, taking all parameters, including which user to proxy
- * from provided Properties.
- */
- @Override
- public UserGroupInformation getProxiedUser(Props userProp)
- throws HadoopSecurityManagerException {
- String userToProxy = verifySecureProperty(userProp, USER_TO_PROXY);
- UserGroupInformation user = getProxiedUser(userToProxy);
- if (user == null) {
- throw new HadoopSecurityManagerException(
- "Proxy as any user in unsecured grid is not supported!");
- }
- return user;
- }
-
- public String verifySecureProperty(Props props, String s)
- throws HadoopSecurityManagerException {
- String value = props.getString(s);
- if (value == null) {
- throw new HadoopSecurityManagerException(s + " not set in properties.");
- }
- return value;
- }
-
- @Override
- public FileSystem getFSAsUser(String user)
- throws HadoopSecurityManagerException {
- FileSystem fs;
- try {
- logger.info("Getting file system as " + user);
- UserGroupInformation ugi = getProxiedUser(user);
-
- if (ugi != null) {
- fs = ugi.doAs(new PrivilegedAction<FileSystem>() {
-
- @Override
- public FileSystem run() {
- try {
- return FileSystem.get(conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
- } else {
- fs = FileSystem.get(conf);
- }
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to get FileSystem. ", e);
- }
- return fs;
- }
-
- public boolean shouldProxy() {
- return shouldProxy;
- }
-
- @Override
- public boolean isHadoopSecurityEnabled() {
- return securityEnabled;
- }
-
- /*
- * Gets hadoop tokens for a user to run mapred/pig jobs on a secured cluster
- */
- @Override
- public synchronized void prefetchToken(final File tokenFile,
- final String userToProxy, final Logger logger)
- throws HadoopSecurityManagerException {
-
- logger.info("Getting hadoop tokens for " + userToProxy);
-
- try {
- getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- getToken(userToProxy);
- return null;
- }
-
- private void getToken(String userToProxy) throws InterruptedException,
- IOException, HadoopSecurityManagerException {
-
- FileSystem fs = FileSystem.get(conf);
- // check if we get the correct FS, and most importantly, the conf
- logger.info("Getting DFS token from " + fs.getCanonicalServiceName()
- + fs.getUri());
- Token<?> fsToken = fs.getDelegationToken(userToProxy);
- if (fsToken == null) {
- logger.error("Failed to fetch DFS token for ");
- throw new HadoopSecurityManagerException(
- "Failed to fetch DFS token for " + userToProxy);
- }
- logger.info("Created DFS token: " + fsToken.toString());
- logger.info("Token kind: " + fsToken.getKind());
- logger.info("Token id: " + fsToken.getIdentifier());
- logger.info("Token service: " + fsToken.getService());
-
- JobConf jc = new JobConf(conf);
- JobClient jobClient = new JobClient(jc);
- logger.info("Pre-fetching JT token: Got new JobClient: " + jc);
-
- Token<DelegationTokenIdentifier> mrdt =
- jobClient.getDelegationToken(new Text("mr token"));
- if (mrdt == null) {
- logger.error("Failed to fetch JT token for ");
- throw new HadoopSecurityManagerException(
- "Failed to fetch JT token for " + userToProxy);
- }
- logger.info("Created JT token: " + mrdt.toString());
- logger.info("Token kind: " + mrdt.getKind());
- logger.info("Token id: " + mrdt.getIdentifier());
- logger.info("Token service: " + mrdt.getService());
-
- jc.getCredentials().addToken(mrdt.getService(), mrdt);
- jc.getCredentials().addToken(fsToken.getService(), fsToken);
-
- FileOutputStream fos = null;
- DataOutputStream dos = null;
- try {
- fos = new FileOutputStream(tokenFile);
- dos = new DataOutputStream(fos);
- jc.getCredentials().writeTokenStorageToStream(dos);
- } finally {
- if (dos != null) {
- try {
- dos.close();
- } catch (Throwable t) {
- // best effort
- logger
- .error(
- "encountered exception while closing DataOutputStream of the tokenFile",
- t);
- }
- }
- if (fos != null) {
- fos.close();
- }
- }
- // stash them to cancel after use.
- logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
- }
- });
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
- + e.getMessage() + e.getCause());
-
- }
- }
-
- private void cancelNameNodeToken(final Token<? extends TokenIdentifier> t,
- String userToProxy) throws HadoopSecurityManagerException {
- try {
- getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- cancelToken(t);
- return null;
- }
-
- private void cancelToken(Token<?> nt) throws IOException,
- InterruptedException {
- nt.cancel(conf);
- }
- });
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to cancel token. "
- + e.getMessage() + e.getCause(), e);
- }
- }
-
- private void cancelMRJobTrackerToken(
- final Token<? extends TokenIdentifier> t, String userToProxy)
- throws HadoopSecurityManagerException {
- try {
- getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
- @SuppressWarnings("unchecked")
- @Override
- public Void run() throws Exception {
- cancelToken((Token<DelegationTokenIdentifier>) t);
- return null;
- }
-
- private void cancelToken(Token<DelegationTokenIdentifier> jt)
- throws IOException, InterruptedException {
- JobConf jc = new JobConf(conf);
- JobClient jobClient = new JobClient(jc);
- jobClient.cancelDelegationToken(jt);
- }
- });
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to cancel token. "
- + e.getMessage() + e.getCause(), e);
- }
- }
-
- private void cancelJhsToken(final Token<? extends TokenIdentifier> t,
- String userToProxy) throws HadoopSecurityManagerException {
- // it appears yarn would clean up this token after app finish, after a long
- // while though.
- org.apache.hadoop.yarn.api.records.Token token =
- org.apache.hadoop.yarn.api.records.Token.newInstance(t.getIdentifier(),
- t.getKind().toString(), t.getPassword(), t.getService().toString());
- final YarnRPC rpc = YarnRPC.create(conf);
- final InetSocketAddress jhsAddress = SecurityUtil.getTokenServiceAddr(t);
- MRClientProtocol jhsProxy = null;
- try {
- jhsProxy =
- UserGroupInformation.getCurrentUser().doAs(
- new PrivilegedAction<MRClientProtocol>() {
- @Override
- public MRClientProtocol run() {
- return (MRClientProtocol) rpc.getProxy(
- HSClientProtocol.class, jhsAddress, conf);
- }
- });
- CancelDelegationTokenRequest request =
- Records.newRecord(CancelDelegationTokenRequest.class);
- request.setDelegationToken(token);
- jhsProxy.cancelDelegationToken(request);
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to cancel token. "
- + e.getMessage() + e.getCause(), e);
- } finally {
- RPC.stopProxy(jhsProxy);
- }
-
- }
-
- private void cancelHiveToken(final Token extends TokenIdentifier> t,
- String userToProxy) throws HadoopSecurityManagerException {
- try {
- HiveConf hiveConf = new HiveConf();
- HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
- hiveClient.cancelDelegationToken(t.encodeToUrlString());
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to cancel Token. "
- + e.getMessage() + e.getCause(), e);
- }
- }
-
- @Override
- public void cancelTokens(File tokenFile, String userToProxy, Logger logger)
- throws HadoopSecurityManagerException {
- // nntoken
- Credentials cred = null;
- try {
- cred =
- Credentials.readTokenStorageFile(new Path(tokenFile.toURI()),
- new Configuration());
- for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
-
- logger.info("Got token: " + t.toString());
- logger.info("Token kind: " + t.getKind());
- logger.info("Token id: " + new String(t.getIdentifier()));
- logger.info("Token service: " + t.getService());
-
- if (t.getKind().equals(new Text("HIVE_DELEGATION_TOKEN"))) {
- logger.info("Cancelling hive token " + new String(t.getIdentifier()));
- cancelHiveToken(t, userToProxy);
- } else if (t.getKind().equals(new Text("RM_DELEGATION_TOKEN"))) {
- logger.info("Cancelling mr job tracker token "
- + new String(t.getIdentifier()));
- // cancelMRJobTrackerToken(t, userToProxy);
- } else if (t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
- logger.info("Cancelling namenode token "
- + new String(t.getIdentifier()));
- // cancelNameNodeToken(t, userToProxy);
- } else if (t.getKind().equals(new Text("MR_DELEGATION_TOKEN"))) {
- logger.info("Cancelling jobhistoryserver mr token "
- + new String(t.getIdentifier()));
- // cancelJhsToken(t, userToProxy);
- } else {
- logger.info("unknown token type " + t.getKind());
- }
- }
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to cancel tokens "
- + e.getMessage() + e.getCause(), e);
- }
-
- }
-
- /**
- * function to fetch hcat token as per the specified hive configuration and
- * then store the token in to the credential store specified .
- *
- * @param userToProxy String value indicating the name of the user the token
- * will be fetched for.
- * @param hiveConf the configuration based off which the hive client will be
- * initialized.
- * @param logger the logger instance which writes the logging content to the
- * job logs.
- *
- * @throws IOException
- * @throws TException
- * @throws MetaException
- *
- * */
- private Token<DelegationTokenIdentifier> fetchHcatToken(String userToProxy,
- HiveConf hiveConf, String tokenSignatureOverwrite, final Logger logger)
- throws IOException, MetaException, TException {
-
- logger.info(HiveConf.ConfVars.METASTOREURIS.varname + ": "
- + hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
-
- logger.info(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname + ": "
- + hiveConf.get(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname));
-
- logger.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": "
- + hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
-
- HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
- String hcatTokenStr =
- hiveClient.getDelegationToken(userToProxy, UserGroupInformation
- .getLoginUser().getShortUserName());
- Token<DelegationTokenIdentifier> hcatToken =
- new Token<DelegationTokenIdentifier>();
- hcatToken.decodeFromUrlString(hcatTokenStr);
-
- // overwrite the value of the service property of the token if the signature
- // override is specified.
- if (tokenSignatureOverwrite != null
- && tokenSignatureOverwrite.trim().length() > 0) {
- hcatToken.setService(new Text(tokenSignatureOverwrite.trim()
- .toLowerCase()));
-
- logger.info(HIVE_TOKEN_SIGNATURE_KEY + ":"
- + (tokenSignatureOverwrite == null ? "" : tokenSignatureOverwrite));
- }
-
- logger.info("Created hive metastore token: " + hcatTokenStr);
- logger.info("Token kind: " + hcatToken.getKind());
- logger.info("Token id: " + hcatToken.getIdentifier());
- logger.info("Token service: " + hcatToken.getService());
- return hcatToken;
- }
-
- /*
- * Gets hadoop tokens for a user to run mapred/hive jobs on a secured cluster
- */
- @Override
- public synchronized void prefetchToken(final File tokenFile,
- final Props props, final Logger logger)
- throws HadoopSecurityManagerException {
-
- final String userToProxy = props.getString(USER_TO_PROXY);
-
- logger.info("Getting hadoop tokens based on props for " + userToProxy);
-
- final Credentials cred = new Credentials();
-
- if (props.getBoolean(OBTAIN_HCAT_TOKEN, false)) {
- try {
-
- // first we fetch and save the default hcat token.
- logger.info("Pre-fetching default Hive MetaStore token from hive");
-
- HiveConf hiveConf = new HiveConf();
- Token<DelegationTokenIdentifier> hcatToken =
- fetchHcatToken(userToProxy, hiveConf, null, logger);
-
- cred.addToken(hcatToken.getService(), hcatToken);
-
- // check and see if user specified the extra hcat locations we need to
- // look at and fetch token.
- final List<String> extraHcatLocations =
- props.getStringList(EXTRA_HCAT_LOCATION);
- if (Collections.EMPTY_LIST != extraHcatLocations) {
- logger.info("Need to pre-fetch extra metaStore tokens from hive.");
-
- // start to process the user inputs.
- for (String thriftUrl : extraHcatLocations) {
- logger.info("Pre-fetching metaStore token from : " + thriftUrl);
-
- hiveConf = new HiveConf();
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl);
- hcatToken =
- fetchHcatToken(userToProxy, hiveConf, thriftUrl, logger);
- cred.addToken(hcatToken.getService(), hcatToken);
- }
-
- }
-
- } catch (Throwable t) {
- String message =
- "Failed to get hive metastore token." + t.getMessage()
- + t.getCause();
- logger.error(message, t);
- throw new HadoopSecurityManagerException(message);
- }
- }
-
- if (props.getBoolean(OBTAIN_JOBHISTORYSERVER_TOKEN, false)) {
- YarnRPC rpc = YarnRPC.create(conf);
- final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
-
- logger.debug("Connecting to HistoryServer at: " + serviceAddr);
- HSClientProtocol hsProxy =
- (HSClientProtocol) rpc.getProxy(HSClientProtocol.class,
- NetUtils.createSocketAddr(serviceAddr), conf);
- logger.info("Pre-fetching JH token from job history server");
-
- Token<?> jhsdt = null;
- try {
- jhsdt = getDelegationTokenFromHS(hsProxy);
- } catch (Exception e) {
- logger.error("Failed to fetch JH token", e);
- throw new HadoopSecurityManagerException(
- "Failed to fetch JH token for " + userToProxy);
- }
-
- if (jhsdt == null) {
- logger.error("getDelegationTokenFromHS() returned null");
- throw new HadoopSecurityManagerException(
- "Unable to fetch JH token for " + userToProxy);
- }
-
- logger.info("Created JH token: " + jhsdt.toString());
- logger.info("Token kind: " + jhsdt.getKind());
- logger.info("Token id: " + jhsdt.getIdentifier());
- logger.info("Token service: " + jhsdt.getService());
-
- cred.addToken(jhsdt.getService(), jhsdt);
- }
-
- try {
- getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- getToken(userToProxy);
- return null;
- }
-
- private void getToken(String userToProxy) throws InterruptedException,
- IOException, HadoopSecurityManagerException {
- logger.info("Here is the props for " + OBTAIN_NAMENODE_TOKEN + ": "
- + props.getBoolean(OBTAIN_NAMENODE_TOKEN));
- if (props.getBoolean(OBTAIN_NAMENODE_TOKEN, false)) {
- FileSystem fs = FileSystem.get(conf);
- // check if we get the correct FS, and most importantly, the
- // conf
- logger.info("Getting DFS token from " + fs.getUri());
- Token<?> fsToken =
- fs.getDelegationToken(getMRTokenRenewerInternal(new JobConf())
- .toString());
- if (fsToken == null) {
- logger.error("Failed to fetch DFS token for ");
- throw new HadoopSecurityManagerException(
- "Failed to fetch DFS token for " + userToProxy);
- }
- logger.info("Created DFS token: " + fsToken.toString());
- logger.info("Token kind: " + fsToken.getKind());
- logger.info("Token id: " + fsToken.getIdentifier());
- logger.info("Token service: " + fsToken.getService());
-
- cred.addToken(fsToken.getService(), fsToken);
-
- // getting additional name nodes tokens
- String otherNamenodes = props.get(OTHER_NAMENODES_TO_GET_TOKEN);
- if ((otherNamenodes != null) && (otherNamenodes.length() > 0)) {
- logger.info(OTHER_NAMENODES_TO_GET_TOKEN + ": '" + otherNamenodes
- + "'");
- String[] nameNodeArr = otherNamenodes.split(",");
- Path[] ps = new Path[nameNodeArr.length];
- for (int i = 0; i < ps.length; i++) {
- ps[i] = new Path(nameNodeArr[i].trim());
- }
- TokenCache.obtainTokensForNamenodes(cred, ps, conf);
- logger.info("Successfully fetched tokens for: " + otherNamenodes);
- } else {
- logger.info(OTHER_NAMENODES_TO_GET_TOKEN + " was not configured");
- }
- }
-
- if (props.getBoolean(OBTAIN_JOBTRACKER_TOKEN, false)) {
- JobConf jobConf = new JobConf();
- JobClient jobClient = new JobClient(jobConf);
- logger.info("Pre-fetching JT token from JobTracker");
-
- Token<DelegationTokenIdentifier> mrdt =
- jobClient
- .getDelegationToken(getMRTokenRenewerInternal(jobConf));
- if (mrdt == null) {
- logger.error("Failed to fetch JT token");
- throw new HadoopSecurityManagerException(
- "Failed to fetch JT token for " + userToProxy);
- }
- logger.info("Created JT token: " + mrdt.toString());
- logger.info("Token kind: " + mrdt.getKind());
- logger.info("Token id: " + mrdt.getIdentifier());
- logger.info("Token service: " + mrdt.getService());
- cred.addToken(mrdt.getService(), mrdt);
- }
-
- }
- });
-
- FileOutputStream fos = null;
- DataOutputStream dos = null;
- try {
- fos = new FileOutputStream(tokenFile);
- dos = new DataOutputStream(fos);
- cred.writeTokenStorageToStream(dos);
- } finally {
- if (dos != null) {
- try {
- dos.close();
- } catch (Throwable t) {
- // best effort
- logger
- .error(
- "encountered exception while closing DataOutputStream of the tokenFile",
- t);
- }
- }
- if (fos != null) {
- fos.close();
- }
- }
- // stash them to cancel after use.
-
- logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
-
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
- + e.getMessage() + e.getCause(), e);
- } catch (Throwable t) {
- throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
- + t.getMessage() + t.getCause(), t);
- }
-
- }
-
- private Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException {
- // Taken from Oozie
- //
- // Getting renewer correctly for JT principal also though JT in hadoop
- // 1.x does not have
- // support for renewing/cancelling tokens
- String servicePrincipal =
- jobConf.get(RM_PRINCIPAL, jobConf.get(JT_PRINCIPAL));
- Text renewer;
- if (servicePrincipal != null) {
- String target =
- jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
- if (target == null) {
- target = jobConf.get(HADOOP_JOB_TRACKER);
- }
-
- String addr = NetUtils.createSocketAddr(target).getHostName();
- renewer =
- new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
- } else {
- // No security
- renewer = DEFAULT_RENEWER;
- }
-
- return renewer;
- }
-
- private Token<?> getDelegationTokenFromHS(HSClientProtocol hsProxy)
- throws IOException, InterruptedException {
- GetDelegationTokenRequest request =
- recordFactory.newRecordInstance(GetDelegationTokenRequest.class);
- request.setRenewer(Master.getMasterPrincipal(conf));
- org.apache.hadoop.yarn.api.records.Token mrDelegationToken;
- mrDelegationToken =
- hsProxy.getDelegationToken(request).getDelegationToken();
- return ConverterUtils.convertFromYarn(mrDelegationToken,
- hsProxy.getConnectAddress());
- }
-
- private void cancelDelegationTokenFromHS(
- final org.apache.hadoop.yarn.api.records.Token t, HSClientProtocol hsProxy)
- throws IOException, InterruptedException {
- CancelDelegationTokenRequest request =
- recordFactory.newRecordInstance(CancelDelegationTokenRequest.class);
- request.setDelegationToken(t);
- hsProxy.cancelDelegationToken(request);
- }
-
-}
diff --git a/plugins/hadoopsecuritymanager/build.properties b/plugins/hadoopsecuritymanager/build.properties
deleted file mode 100644
index 7c3115fc..00000000
--- a/plugins/hadoopsecuritymanager/build.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-name=azkaban-hadoopsecuritymanager
-version=3.0.0
-
diff --git a/plugins/hadoopsecuritymanager/build.xml b/plugins/hadoopsecuritymanager/build.xml
deleted file mode 100644
index ea6f7f05..00000000
--- a/plugins/hadoopsecuritymanager/build.xml
+++ /dev/null
@@ -1,89 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/plugins/hadoopsecuritymanager/lib/hadoop-core-1.0.3.jar b/plugins/hadoopsecuritymanager/lib/hadoop-core-1.0.3.jar
deleted file mode 100644
index 85092fea..00000000
Binary files a/plugins/hadoopsecuritymanager/lib/hadoop-core-1.0.3.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager/lib/hive-common-0.10.0.jar b/plugins/hadoopsecuritymanager/lib/hive-common-0.10.0.jar
deleted file mode 100644
index 956eff90..00000000
Binary files a/plugins/hadoopsecuritymanager/lib/hive-common-0.10.0.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager/lib/hive-metastore-0.10.0.jar b/plugins/hadoopsecuritymanager/lib/hive-metastore-0.10.0.jar
deleted file mode 100644
index a54851d9..00000000
Binary files a/plugins/hadoopsecuritymanager/lib/hive-metastore-0.10.0.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager/lib/libthrift-0.9.0.jar b/plugins/hadoopsecuritymanager/lib/libthrift-0.9.0.jar
deleted file mode 100644
index 58d16357..00000000
Binary files a/plugins/hadoopsecuritymanager/lib/libthrift-0.9.0.jar and /dev/null differ
diff --git a/plugins/hadoopsecuritymanager/src/azkaban/security/DefaultHadoopSecurityManager.java b/plugins/hadoopsecuritymanager/src/azkaban/security/DefaultHadoopSecurityManager.java
deleted file mode 100644
index bfdbca1e..00000000
--- a/plugins/hadoopsecuritymanager/src/azkaban/security/DefaultHadoopSecurityManager.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copright 2011 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package azkaban.security;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-
-import azkaban.security.commons.HadoopSecurityManager;
-import azkaban.security.commons.HadoopSecurityManagerException;
-import azkaban.utils.Props;
-
-/**
- * This is just get by compile, or in the case of no hadoop installation.
- * Otherwise, there needs to be a real HadoopSecurityManger plugin for the
- * hadoop installation, even if the hadoop installation is not security enabled.
- */
-public class DefaultHadoopSecurityManager extends HadoopSecurityManager {
-
- private static final Logger logger = Logger
- .getLogger(DefaultHadoopSecurityManager.class);
-
- private static HadoopSecurityManager hsmInstance = null;
-
- private DefaultHadoopSecurityManager(Props props) {
- logger.info("Default Hadoop Security Manager is used. Only do this on "
- + "a non-hadoop cluster!");
- }
-
- public static HadoopSecurityManager getInstance(Props props)
- throws HadoopSecurityManagerException, IOException {
- if (hsmInstance == null) {
- synchronized (DefaultHadoopSecurityManager.class) {
- if (hsmInstance == null) {
- logger.info("getting new instance");
- hsmInstance = new DefaultHadoopSecurityManager(props);
- }
- }
- }
- return hsmInstance;
- }
-
- @Override
- public UserGroupInformation getProxiedUser(String toProxy)
- throws HadoopSecurityManagerException {
- throw new HadoopSecurityManagerException(
- "No real Hadoop Security Manager is set!");
- }
-
- /**
- * Create a proxied user, taking all parameters, including which user to proxy
- * from provided Properties.
- */
- @Override
- public UserGroupInformation getProxiedUser(Props prop)
- throws HadoopSecurityManagerException {
- throw new HadoopSecurityManagerException(
- "No real Hadoop Security Manager is set!");
- }
-
- @Override
- public boolean isHadoopSecurityEnabled()
- throws HadoopSecurityManagerException {
- throw new HadoopSecurityManagerException(
- "No real Hadoop Security Manager is set!");
- }
-
- @Override
- public FileSystem getFSAsUser(String user)
- throws HadoopSecurityManagerException {
- throw new HadoopSecurityManagerException(
- "No real Hadoop Security Manager is set!");
- }
-
- @Override
- public void prefetchToken(File tokenFile, String userToProxy, Logger logger)
- throws HadoopSecurityManagerException {
- throw new HadoopSecurityManagerException(
- "No real Hadoop Security Manager is set!");
- }
-
- @Override
- public void cancelTokens(File tokenFile, String userToProxy, Logger logger)
- throws HadoopSecurityManagerException {
- }
-
- @Override
- public void prefetchToken(File tokenFile, Props props, Logger logger)
- throws HadoopSecurityManagerException {
- throw new HadoopSecurityManagerException(
- "No real Hadoop Security Manager is set!");
- }
-
-}
diff --git a/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManager.java b/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManager.java
deleted file mode 100644
index 91a9f64c..00000000
--- a/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManager.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2011 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package azkaban.security;
-
-import java.io.File;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-
-import azkaban.utils.Props;
-
-public abstract class HadoopSecurityManager {
-
- public static final String ENABLE_PROXYING = "azkaban.should.proxy"; // boolean
-
- public static final String PROXY_KEYTAB_LOCATION = "proxy.keytab.location";
- public static final String PROXY_USER = "proxy.user";
- public static final String USER_TO_PROXY = "user.to.proxy";
- public static final String OBTAIN_BINARY_TOKEN = "obtain.binary.token";
- public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY =
- "mapreduce.job.credentials.binary";
-
- public boolean isHadoopSecurityEnabled()
- throws HadoopSecurityManagerException {
- return false;
- }
-
- /**
- * Create a proxied user based on the explicit user name, taking other
- * parameters necessary from properties file.
- */
- public abstract UserGroupInformation getProxiedUser(String toProxy)
- throws HadoopSecurityManagerException;
-
- /**
- * Create a proxied user, taking all parameters, including which user to proxy
- * from provided Properties.
- */
- public abstract UserGroupInformation getProxiedUser(Props prop)
- throws HadoopSecurityManagerException;
-
- public abstract FileSystem getFSAsUser(String user)
- throws HadoopSecurityManagerException;
-
- public static boolean shouldProxy(Properties prop) {
- String shouldProxy = prop.getProperty(ENABLE_PROXYING);
-
- return shouldProxy != null && shouldProxy.equals("true");
- }
-
- public abstract void prefetchToken(File tokenFile, String userToProxy,
- Logger logger) throws HadoopSecurityManagerException;
-
- public abstract void cancelTokens(File tokenFile, String userToProxy,
- Logger logger) throws HadoopSecurityManagerException;
-
-}
diff --git a/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManagerException.java b/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManagerException.java
deleted file mode 100644
index bb7699e4..00000000
--- a/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManagerException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2012 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package azkaban.security;
-
-public class HadoopSecurityManagerException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public HadoopSecurityManagerException(String message) {
- super(message);
- }
-
- public HadoopSecurityManagerException(String message, Throwable cause) {
- super(message, cause);
- }
-}
diff --git a/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManager_H_1_0.java b/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManager_H_1_0.java
deleted file mode 100644
index 3c66f5ec..00000000
--- a/plugins/hadoopsecuritymanager/src/azkaban/security/HadoopSecurityManager_H_1_0.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/*
- * Copyright 2011 LinkedIn Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package azkaban.security;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.log4j.Logger;
-
-import azkaban.security.commons.HadoopSecurityManager;
-import azkaban.security.commons.HadoopSecurityManagerException;
-import azkaban.utils.Props;
-import azkaban.utils.UndefinedPropertyException;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class HadoopSecurityManager_H_1_0 extends HadoopSecurityManager {
-
- private UserGroupInformation loginUser = null;
- private final static Logger logger = Logger
- .getLogger(HadoopSecurityManager.class);
- private Configuration conf;
-
- private String keytabLocation;
- private String keytabPrincipal;
- private boolean shouldProxy = false;
- private boolean securityEnabled = false;
-
- private static HadoopSecurityManager hsmInstance = null;
- private ConcurrentMap<String, UserGroupInformation> userUgiMap;
-
- private static final String HIVE_METASTORE_SASL_ENABLED =
- "hive.metastore.sasl.enabled";
- private static final String HIVE_METASTORE_KERBEROS_PRINCIPAL =
- "hive.metastore.kerberos.principal";
- private static final String HIVE_METASTORE_LOCAL = "hive.metastore.local";
-
- private static URLClassLoader ucl;
-
- private HadoopSecurityManager_H_1_0(Props props)
- throws HadoopSecurityManagerException, IOException {
-
- // for now, assume the same/compatible native library, the same/compatible
- // hadoop-core jar
- String hadoopHome = props.getString("hadoop.home", null);
- String hadoopConfDir = props.getString("hadoop.conf.dir", null);
-
- if (hadoopHome == null) {
- hadoopHome = System.getenv("HADOOP_HOME");
- }
- if (hadoopConfDir == null) {
- hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
- }
-
- List<URL> resources = new ArrayList<URL>();
- if (hadoopConfDir != null) {
- logger.info("Using hadoop config found in "
- + new File(hadoopConfDir).toURI().toURL());
- resources.add(new File(hadoopConfDir).toURI().toURL());
- } else if (hadoopHome != null) {
- logger.info("Using hadoop config found in "
- + new File(hadoopHome, "conf").toURI().toURL());
- resources.add(new File(hadoopHome, "conf").toURI().toURL());
- } else {
- logger.info("HADOOP_HOME not set, using default hadoop config.");
- }
-
- ucl = new URLClassLoader(resources.toArray(new URL[resources.size()]));
-
- conf = new Configuration();
- conf.setClassLoader(ucl);
-
- if (props.containsKey("fs.hdfs.impl.disable.cache")) {
- logger.info("Setting fs.hdfs.impl.disable.cache to "
- + props.get("fs.hdfs.impl.disable.cache"));
- conf.setBoolean("fs.hdfs.impl.disable.cache",
- Boolean.valueOf(props.get("fs.hdfs.impl.disable.cache")));
- }
-
- logger.info("hadoop.security.authentication set to "
- + conf.get("hadoop.security.authentication"));
- logger.info("hadoop.security.authorization set to "
- + conf.get("hadoop.security.authorization"));
- logger.info("DFS name " + conf.get("fs.default.name"));
-
- UserGroupInformation.setConfiguration(conf);
-
- securityEnabled = UserGroupInformation.isSecurityEnabled();
- if (securityEnabled) {
- logger.info("The Hadoop cluster has enabled security");
- shouldProxy = true;
- try {
- keytabLocation = props.getString(PROXY_KEYTAB_LOCATION);
- keytabPrincipal = props.getString(PROXY_USER);
- } catch (UndefinedPropertyException e) {
- throw new HadoopSecurityManagerException(e.getMessage());
- }
-
- // try login
- try {
- if (loginUser == null) {
- logger.info("No login user. Creating login user");
- logger.info("Logging with " + keytabPrincipal + " and "
- + keytabLocation);
- UserGroupInformation.loginUserFromKeytab(keytabPrincipal,
- keytabLocation);
- loginUser = UserGroupInformation.getLoginUser();
- logger.info("Logged in with user " + loginUser);
- } else {
- logger.info("loginUser (" + loginUser
- + ") already created, refreshing tgt.");
- loginUser.checkTGTAndReloginFromKeytab();
- }
- } catch (IOException e) {
- throw new HadoopSecurityManagerException(
- "Failed to login with kerberos ", e);
- }
-
- }
-
- userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();
-
- logger.info("Hadoop Security Manager Initiated");
- }
-
- public static HadoopSecurityManager getInstance(Props props)
- throws HadoopSecurityManagerException, IOException {
- if (hsmInstance == null) {
- synchronized (HadoopSecurityManager_H_1_0.class) {
- if (hsmInstance == null) {
- logger.info("getting new instance");
- hsmInstance = new HadoopSecurityManager_H_1_0(props);
- }
- }
- }
- return hsmInstance;
- }
-
- /**
- * Create a proxied user based on the explicit user name, taking other
- * parameters necessary from properties file.
- *
- * @throws IOException
- */
- @Override
- public synchronized UserGroupInformation getProxiedUser(String userToProxy)
- throws HadoopSecurityManagerException {
-
- if (userToProxy == null) {
- throw new HadoopSecurityManagerException("userToProxy can't be null");
- }
-
- UserGroupInformation ugi = userUgiMap.get(userToProxy);
- if (ugi == null) {
- logger.info("proxy user " + userToProxy
- + " not exist. Creating new proxy user");
- if (shouldProxy) {
- try {
- ugi =
- UserGroupInformation.createProxyUser(userToProxy,
- UserGroupInformation.getLoginUser());
- } catch (IOException e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException(
- "Failed to create proxy user", e);
- }
- } else {
- ugi = UserGroupInformation.createRemoteUser(userToProxy);
- }
- userUgiMap.putIfAbsent(userToProxy, ugi);
- }
- return ugi;
- }
-
- /**
- * Create a proxied user, taking all parameters, including which user to proxy
- * from provided Properties.
- */
- @Override
- public UserGroupInformation getProxiedUser(Props userProp)
- throws HadoopSecurityManagerException {
- String userToProxy = verifySecureProperty(userProp, USER_TO_PROXY);
- UserGroupInformation user = getProxiedUser(userToProxy);
- if (user == null)
- throw new HadoopSecurityManagerException(
- "Proxy as any user in unsecured grid is not supported!");
- return user;
- }
-
- public String verifySecureProperty(Props props, String s)
- throws HadoopSecurityManagerException {
- String value = props.getString(s);
- if (value == null) {
- throw new HadoopSecurityManagerException(s + " not set in properties.");
- }
- // logger.info("Secure proxy configuration: Property " + s + " = " + value);
- return value;
- }
-
- @Override
- public FileSystem getFSAsUser(String user)
- throws HadoopSecurityManagerException {
- FileSystem fs;
- try {
- logger.info("Getting file system as " + user);
- UserGroupInformation ugi = getProxiedUser(user);
-
- if (ugi != null) {
- fs = ugi.doAs(new PrivilegedAction<FileSystem>() {
-
- @Override
- public FileSystem run() {
- try {
- return FileSystem.get(conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
- } else {
- fs = FileSystem.get(conf);
- }
- } catch (Exception e) {
- throw new HadoopSecurityManagerException("Failed to get FileSystem. ", e);
- }
- return fs;
- }
-
- public boolean shouldProxy() {
- return shouldProxy;
- }
-
- @Override
- public boolean isHadoopSecurityEnabled() {
- return securityEnabled;
- }
-
- /*
- * Gets hadoop tokens for a user to run mapred/pig jobs on a secured cluster
- */
- @Override
- public synchronized void prefetchToken(final File tokenFile,
- final String userToProxy, final Logger logger)
- throws HadoopSecurityManagerException {
-
- logger.info("Getting hadoop tokens for " + userToProxy);
-
- try {
- getProxiedUser(userToProxy).doAs(
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- getToken(userToProxy);
- return null;
- }
-
- private void getToken(String userToProxy)
- throws InterruptedException, IOException,
- HadoopSecurityManagerException {
-
- FileSystem fs = FileSystem.get(conf);
- // check if we get the correct FS, and most importantly, the conf
- logger.info("Getting DFS token from "
- + fs.getCanonicalServiceName() + fs.getUri());
- Token<?> fsToken = fs.getDelegationToken(userToProxy);
- if (fsToken == null) {
- logger.error("Failed to fetch DFS token for ");
- throw new HadoopSecurityManagerException(
- "Failed to fetch DFS token for " + userToProxy);
- }
- logger.info("Created DFS token: " + fsToken.toString());
- logger.info("Token kind: " + fsToken.getKind());
- logger.info("Token id: " + fsToken.getIdentifier());
- logger.info("Token service: " + fsToken.getService());
-
- JobConf jc = new JobConf(conf);
- JobClient jobClient = new JobClient(jc);
- logger.info("Pre-fetching JT token: Got new JobClient: " + jc);
-
- Token<DelegationTokenIdentifier> mrdt =
- jobClient.getDelegationToken(new Text("mr token"));
- if (mrdt == null) {
- logger.error("Failed to fetch JT token for ");
- throw new HadoopSecurityManagerException(
- "Failed to fetch JT token for " + userToProxy);
- }
- logger.info("Created JT token: " + mrdt.toString());
- logger.info("Token kind: " + mrdt.getKind());
- logger.info("Token id: " + mrdt.getIdentifier());
- logger.info("Token service: " + mrdt.getService());
-
- jc.getCredentials().addToken(mrdt.getService(), mrdt);
- jc.getCredentials().addToken(fsToken.getService(), fsToken);
-
- FileOutputStream fos = null;
- DataOutputStream dos = null;
- try {
- fos = new FileOutputStream(tokenFile);
- dos = new DataOutputStream(fos);
- jc.getCredentials().writeTokenStorageToStream(dos);
- } finally {
- if (dos != null) {
- dos.close();
- }
- if (fos != null) {
- fos.close();
- }
- }
- // stash them to cancel after use.
- logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
- + e.getMessage() + e.getCause());
-
- }
- }
-
- private void cancelNameNodeToken(final Token<? extends TokenIdentifier> t,
- String userToProxy) throws HadoopSecurityManagerException {
- try {
- getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- cancelToken(t);
- return null;
- }
-
- private void cancelToken(Token<?> nt) throws IOException,
- InterruptedException {
- nt.cancel(conf);
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException("Failed to cancel Token. "
- + e.getMessage() + e.getCause());
- }
- }
-
- private void cancelMRJobTrackerToken(
- final Token<? extends TokenIdentifier> t, String userToProxy)
- throws HadoopSecurityManagerException {
- try {
- getProxiedUser(userToProxy).doAs(new PrivilegedExceptionAction<Void>() {
- @SuppressWarnings("unchecked")
- @Override
- public Void run() throws Exception {
- cancelToken((Token<DelegationTokenIdentifier>) t);
- return null;
- }
-
- private void cancelToken(Token<DelegationTokenIdentifier> jt)
- throws IOException, InterruptedException {
- JobConf jc = new JobConf(conf);
- JobClient jobClient = new JobClient(jc);
- jobClient.cancelDelegationToken(jt);
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException("Failed to cancel Token. "
- + e.getMessage() + e.getCause());
- }
- }
-
- private void cancelHiveToken(final Token<? extends TokenIdentifier> t,
- String userToProxy) throws HadoopSecurityManagerException {
- try {
- HiveConf hiveConf = new HiveConf();
- HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
- hiveClient.cancelDelegationToken(t.encodeToUrlString());
- } catch (Exception e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException("Failed to cancel Token. "
- + e.getMessage() + e.getCause());
- }
- }
-
- @Override
- public void cancelTokens(File tokenFile, String userToProxy, Logger logger)
- throws HadoopSecurityManagerException {
- // nntoken
- Credentials cred = null;
- try {
- cred =
- Credentials.readTokenStorageFile(new Path(tokenFile.toURI()),
- new Configuration());
- for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
- logger.info("Got token: " + t.toString());
- logger.info("Token kind: " + t.getKind());
- logger.info("Token id: " + new String(t.getIdentifier()));
- logger.info("Token service: " + t.getService());
- if (t.getKind().equals(new Text("HIVE_DELEGATION_TOKEN"))) {
- logger.info("Cancelling hive token " + new String(t.getIdentifier()));
- cancelHiveToken(t, userToProxy);
- } else if (t.getKind().equals(new Text("MAPREDUCE_DELEGATION_TOKEN"))) {
- logger.info("Cancelling mr job tracker token "
- + new String(t.getIdentifier()));
- cancelMRJobTrackerToken(t, userToProxy);
- } else if (t.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
- logger.info("Cancelling namenode token "
- + new String(t.getIdentifier()));
- cancelNameNodeToken(t, userToProxy);
- } else {
- logger.info("unknown token type " + t.getKind());
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- /*
- * Gets hadoop tokens for a user to run mapred/hive jobs on a secured cluster
- */
- @Override
- public synchronized void prefetchToken(final File tokenFile,
- final Props props, final Logger logger)
- throws HadoopSecurityManagerException {
-
- final String userToProxy = props.getString(USER_TO_PROXY);
-
- logger.info("Getting hadoop tokens for " + userToProxy);
-
- final Credentials cred = new Credentials();
-
- if (props.getBoolean(OBTAIN_HCAT_TOKEN, false)) {
- try {
- logger.info("Pre-fetching Hive MetaStore token from hive");
-
- HiveConf hiveConf = new HiveConf();
- logger.info("HiveConf.ConfVars.METASTOREURIS.varname "
- + hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
- logger.info("HIVE_METASTORE_SASL_ENABLED "
- + hiveConf.get(HIVE_METASTORE_SASL_ENABLED));
- logger.info("HIVE_METASTORE_KERBEROS_PRINCIPAL "
- + hiveConf.get(HIVE_METASTORE_KERBEROS_PRINCIPAL));
- logger.info("HIVE_METASTORE_LOCAL "
- + hiveConf.get(HIVE_METASTORE_LOCAL));
-
- HiveMetaStoreClient hiveClient = new HiveMetaStoreClient(hiveConf);
- String hcatTokenStr =
- hiveClient.getDelegationToken(userToProxy, UserGroupInformation
- .getLoginUser().getShortUserName());
- Token<DelegationTokenIdentifier> hcatToken =
- new Token<DelegationTokenIdentifier>();
- hcatToken.decodeFromUrlString(hcatTokenStr);
- logger.info("Created hive metastore token: " + hcatTokenStr);
- logger.info("Token kind: " + hcatToken.getKind());
- logger.info("Token id: " + hcatToken.getIdentifier());
- logger.info("Token service: " + hcatToken.getService());
- cred.addToken(hcatToken.getService(), hcatToken);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("Failed to get hive metastore token." + e.getMessage()
- + e.getCause());
- } catch (Throwable t) {
- t.printStackTrace();
- logger.error("Failed to get hive metastore token." + t.getMessage()
- + t.getCause());
- }
- }
-
- try {
- getProxiedUser(userToProxy).doAs(
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- getToken(userToProxy);
- return null;
- }
-
- private void getToken(String userToProxy)
- throws InterruptedException, IOException,
- HadoopSecurityManagerException {
- logger.info("Here is the props for " + OBTAIN_NAMENODE_TOKEN
- + ": " + props.getBoolean(OBTAIN_NAMENODE_TOKEN));
- if (props.getBoolean(OBTAIN_NAMENODE_TOKEN, false)) {
- FileSystem fs = FileSystem.get(conf);
- // check if we get the correct FS, and most importantly, the
- // conf
- logger.info("Getting DFS token from " + fs.getUri());
- Token<?> fsToken = fs.getDelegationToken(userToProxy);
- if (fsToken == null) {
- logger.error("Failed to fetch DFS token for ");
- throw new HadoopSecurityManagerException(
- "Failed to fetch DFS token for " + userToProxy);
- }
- logger.info("Created DFS token: " + fsToken.toString());
- logger.info("Token kind: " + fsToken.getKind());
- logger.info("Token id: " + fsToken.getIdentifier());
- logger.info("Token service: " + fsToken.getService());
- cred.addToken(fsToken.getService(), fsToken);
- }
-
- if (props.getBoolean(OBTAIN_JOBTRACKER_TOKEN, false)) {
- JobClient jobClient = new JobClient(new JobConf());
- logger.info("Pre-fetching JT token from JobTracker");
-
- Token<DelegationTokenIdentifier> mrdt =
- jobClient.getDelegationToken(new Text("mr token"));
- if (mrdt == null) {
- logger.error("Failed to fetch JT token");
- throw new HadoopSecurityManagerException(
- "Failed to fetch JT token for " + userToProxy);
- }
- logger.info("Created JT token: " + mrdt.toString());
- logger.info("Token kind: " + mrdt.getKind());
- logger.info("Token id: " + mrdt.getIdentifier());
- logger.info("Token service: " + mrdt.getService());
- cred.addToken(mrdt.getService(), mrdt);
- }
- }
- });
-
- FileOutputStream fos = null;
- DataOutputStream dos = null;
- try {
- fos = new FileOutputStream(tokenFile);
- dos = new DataOutputStream(fos);
- cred.writeTokenStorageToStream(dos);
- } finally {
- if (dos != null) {
- dos.close();
- }
- if (fos != null) {
- fos.close();
- }
- }
-
- // stash them to cancel after use.
- logger.info("Tokens loaded in " + tokenFile.getAbsolutePath());
-
- } catch (Exception e) {
- e.printStackTrace();
- throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
- + e.getMessage() + e.getCause());
- } catch (Throwable t) {
- t.printStackTrace();
- throw new HadoopSecurityManagerException("Failed to get hadoop tokens! "
- + t.getMessage() + t.getCause());
- }
- }
-}
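
To make the removed flow easier to follow, here is a minimal caller-side sketch (not part of this patch; the wrapper class and method names are assumptions): a launcher obtains the singleton and prefetches delegation tokens into a file, and the launched job process later reads that file back and attaches the tokens to its own UGI so FileSystem and JobClient calls can authenticate.

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.log4j.Logger;

import azkaban.security.HadoopSecurityManager_H_1_0;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.utils.Props;

public class TokenPrefetchSketch {
  private static final Logger logger = Logger.getLogger(TokenPrefetchSketch.class);

  // Launcher side: obtain the singleton and write delegation tokens for the proxied user.
  public static void prefetch(Props props, File tokenFile) throws Exception {
    HadoopSecurityManager hsm = HadoopSecurityManager_H_1_0.getInstance(props);
    if (hsm.isHadoopSecurityEnabled()) {
      hsm.prefetchToken(tokenFile, props, logger);
    }
  }

  // Job side: read the token file back and attach the tokens to the current user.
  public static void loadTokens(File tokenFile) throws Exception {
    Credentials cred =
        Credentials.readTokenStorageFile(new Path(tokenFile.toURI()), new Configuration());
    for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) {
      UserGroupInformation.getCurrentUser().addToken(t);
    }
  }
}
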
diff --git a/plugins/jobtype/build.xml b/plugins/jobtype/build.xml
index 84ddb4b8..39246e58 100644
--- a/plugins/jobtype/build.xml
+++ b/plugins/jobtype/build.xml
@@ -71,15 +71,6 @@
-
-
-
-
-
-
-
-
-
@@ -177,22 +168,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-