1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.deploy.yarn.security
19
20import scala.reflect.runtime.universe
21import scala.util.control.NonFatal
22
23import org.apache.hadoop.conf.Configuration
24import org.apache.hadoop.security.Credentials
25import org.apache.hadoop.security.token.{Token, TokenIdentifier}
26
27import org.apache.spark.SparkConf
28import org.apache.spark.internal.Logging
29import org.apache.spark.util.Utils
30
31private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
32
33  override def serviceName: String = "hbase"
34
35  override def obtainCredentials(
36      hadoopConf: Configuration,
37      sparkConf: SparkConf,
38      creds: Credentials): Option[Long] = {
39    try {
40      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
41      val obtainToken = mirror.classLoader.
42        loadClass("org.apache.hadoop.hbase.security.token.TokenUtil").
43        getMethod("obtainToken", classOf[Configuration])
44
45      logDebug("Attempting to fetch HBase security token.")
46      val token = obtainToken.invoke(null, hbaseConf(hadoopConf))
47        .asInstanceOf[Token[_ <: TokenIdentifier]]
48      logInfo(s"Get token from HBase: ${token.toString}")
49      creds.addToken(token.getService, token)
50    } catch {
51      case NonFatal(e) =>
52        logDebug(s"Failed to get token from service $serviceName", e)
53    }
54
55    None
56  }
57
58  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
59    hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
60  }
61
62  private def hbaseConf(conf: Configuration): Configuration = {
63    try {
64      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
65      val confCreate = mirror.classLoader.
66        loadClass("org.apache.hadoop.hbase.HBaseConfiguration").
67        getMethod("create", classOf[Configuration])
68      confCreate.invoke(null, conf).asInstanceOf[Configuration]
69    } catch {
70      case NonFatal(e) =>
71        logDebug("Fail to invoke HBaseConfiguration", e)
72        conf
73    }
74  }
75}
76