1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.deploy.yarn.security 19 20import scala.reflect.runtime.universe 21import scala.util.control.NonFatal 22 23import org.apache.hadoop.conf.Configuration 24import org.apache.hadoop.security.Credentials 25import org.apache.hadoop.security.token.{Token, TokenIdentifier} 26 27import org.apache.spark.SparkConf 28import org.apache.spark.internal.Logging 29import org.apache.spark.util.Utils 30 31private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging { 32 33 override def serviceName: String = "hbase" 34 35 override def obtainCredentials( 36 hadoopConf: Configuration, 37 sparkConf: SparkConf, 38 creds: Credentials): Option[Long] = { 39 try { 40 val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) 41 val obtainToken = mirror.classLoader. 42 loadClass("org.apache.hadoop.hbase.security.token.TokenUtil"). 43 getMethod("obtainToken", classOf[Configuration]) 44 45 logDebug("Attempting to fetch HBase security token.") 46 val token = obtainToken.invoke(null, hbaseConf(hadoopConf)) 47 .asInstanceOf[Token[_ <: TokenIdentifier]] 48 logInfo(s"Get token from HBase: ${token.toString}") 49 creds.addToken(token.getService, token) 50 } catch { 51 case NonFatal(e) => 52 logDebug(s"Failed to get token from service $serviceName", e) 53 } 54 55 None 56 } 57 58 override def credentialsRequired(hadoopConf: Configuration): Boolean = { 59 hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos" 60 } 61 62 private def hbaseConf(conf: Configuration): Configuration = { 63 try { 64 val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) 65 val confCreate = mirror.classLoader. 66 loadClass("org.apache.hadoop.hbase.HBaseConfiguration"). 67 getMethod("create", classOf[Configuration]) 68 confCreate.invoke(null, conf).asInstanceOf[Configuration] 69 } catch { 70 case NonFatal(e) => 71 logDebug("Fail to invoke HBaseConfiguration", e) 72 conf 73 } 74 } 75} 76