/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.client

import java.io.File
import java.lang.reflect.InvocationTargetException
import java.net.{URL, URLClassLoader}
import java.util

import scala.language.reflectiveCalls
import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
import org.apache.spark.util.{MutableURLClassLoader, Utils}

/** Factory for `IsolatedClientLoader` with specific versions of Hive. */
private[hive] object IsolatedClientLoader extends Logging {
  /**
   * Creates isolated Hive client loaders by downloading the requested version from Maven.
   */
  def forVersion(
      hiveMetastoreVersion: String,
      hadoopVersion: String,
      sparkConf: SparkConf,
      hadoopConf: Configuration,
      config: Map[String, String] = Map.empty,
      ivyPath: Option[String] = None,
      sharedPrefixes: Seq[String] = Seq.empty,
      barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
    val resolvedVersion = hiveVersion(hiveMetastoreVersion)
    // We first try to share Hadoop classes. If the Hadoop artifact with the given version
    // cannot be resolved, we fall back to Hadoop 2.4.0 and stop sharing Hadoop classes.
    var sharesHadoopClasses = true
    val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
      resolvedVersions((resolvedVersion, hadoopVersion))
    } else {
      val (downloadedFiles, actualHadoopVersion) =
        try {
          (downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion)
        } catch {
          case e: RuntimeException if e.getMessage.contains("hadoop") =>
            // If the error message contains "hadoop", it is probably because the Hadoop
            // version cannot be resolved (e.g. it is a vendor-specific version like
            // 2.0.0-cdh4.1.1). In that case, we try "org.apache.hadoop:hadoop-client:2.4.0",
            // which is used only because we used to hard-code it as the Hadoop artifact
            // to download.
            logWarning(s"Failed to resolve Hadoop artifacts for the version ${hadoopVersion}. " +
              s"We will change the hadoop version from ${hadoopVersion} to 2.4.0 and try again. " +
              "Hadoop classes will not be shared between Spark and Hive metastore client. " +
              "It is recommended to set jars used by Hive metastore client through " +
              "spark.sql.hive.metastore.jars in the production environment.")
            sharesHadoopClasses = false
            (downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
        }
      resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
      resolvedVersions((resolvedVersion, actualHadoopVersion))
    }

    new IsolatedClientLoader(
      resolvedVersion,
      sparkConf,
      execJars = files,
      hadoopConf = hadoopConf,
      config = config,
      sharesHadoopClasses = sharesHadoopClasses,
      sharedPrefixes = sharedPrefixes,
      barrierPrefixes = barrierPrefixes)
  }
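
  // A minimal usage sketch. The version strings below are hypothetical and must name jars
  // resolvable from Maven; resolved jars are cached per (HiveVersion, hadoopVersion) pair
  // in `resolvedVersions`, so repeated calls with the same versions skip the download:
  //
  //   val loader = IsolatedClientLoader.forVersion(
  //     hiveMetastoreVersion = "1.2.1",
  //     hadoopVersion = "2.4.0",
  //     sparkConf = new SparkConf(),
  //     hadoopConf = new Configuration())
  //   val client = loader.createClient()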

  def hiveVersion(version: String): HiveVersion = version match {
    case "12" | "0.12" | "0.12.0" => hive.v12
    case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13
    case "14" | "0.14" | "0.14.0" => hive.v14
    case "1.0" | "1.0.0" => hive.v1_0
    case "1.1" | "1.1.0" => hive.v1_1
    case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
  }
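
  // For example, hiveVersion("13"), hiveVersion("0.13") and hiveVersion("0.13.1") all
  // resolve to hive.v13. Note that an unrecognized version string falls through the match
  // above and surfaces as a scala.MatchError rather than a descriptive error.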

  private def downloadVersion(
      version: HiveVersion,
      hadoopVersion: String,
      ivyPath: Option[String]): Seq[URL] = {
    val hiveArtifacts = version.extraDeps ++
      Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
        .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
      Seq("com.google.guava:guava:14.0.1",
        s"org.apache.hadoop:hadoop-client:$hadoopVersion")

    val classpath = quietly {
      SparkSubmitUtils.resolveMavenCoordinates(
        hiveArtifacts.mkString(","),
        Some("http://www.datanucleus.org/downloads/maven2"),
        ivyPath,
        exclusions = version.exclusions)
    }
    val allFiles = classpath.split(",").map(new File(_)).toSet

    // TODO: Remove copy logic.
    val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}")
    allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
    tempDir.listFiles().map(_.toURI.toURL)
  }
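
  // As an illustration (assuming hive.v1_2.fullVersion is "1.2.1"), a call like
  // downloadVersion(hive.v1_2, "2.4.0", None) resolves "group:artifact:version"
  // coordinates such as "org.apache.hive:hive-metastore:1.2.1" and
  // "org.apache.hadoop:hadoop-client:2.4.0", copies the resolved jars into a temporary
  // directory, and returns their URLs for use as the isolated classpath.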

  // A map from a given pair of HiveVersion and Hadoop version to jar files.
  // It is only used by forVersion.
  private val resolvedVersions =
    new scala.collection.mutable.HashMap[(HiveVersion, String), Seq[URL]]
}

/**
 * Creates a [[HiveClient]] using a classloader that works according to the following rules:
 *  - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`,
 *    allowing the results of calls to the [[HiveClient]] to be visible externally.
 *  - Hive classes: new instances are loaded from `execJars`.  These classes are not
 *    accessible externally due to their custom loading.
 *  - [[HiveClientImpl]]: a new copy is created for each instance of `IsolatedClientLoader`.
 *    This new instance is able to see a specific version of hive without using reflection. Since
 *    this is a unique instance, it is not visible externally other than as a generic
 *    [[HiveClient]], unless `isolationOn` is set to `false`.
 *
 * @param version The version of hive on the classpath.  Used to pick specific function signatures
 *                that are not compatible across versions.
 * @param execJars A collection of jar files that must include hive and hadoop.
 * @param config   A set of options that will be added to the HiveConf of the constructed client.
 * @param isolationOn When true, custom versions of barrier classes will be constructed.  Must be
 *                    true unless loading the version of hive that is on Spark's classloader.
 * @param sharesHadoopClasses When true, we will share Hadoop classes between Spark and the
 *                            Hive metastore client.
 * @param rootClassLoader The system root classloader. Must not know about Hive classes.
 * @param baseClassLoader The spark classloader that is used to load shared classes.
 */
private[hive] class IsolatedClientLoader(
    val version: HiveVersion,
    val sparkConf: SparkConf,
    val hadoopConf: Configuration,
    val execJars: Seq[URL] = Seq.empty,
    val config: Map[String, String] = Map.empty,
    val isolationOn: Boolean = true,
    val sharesHadoopClasses: Boolean = true,
    val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
    val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
    val sharedPrefixes: Seq[String] = Seq.empty,
    val barrierPrefixes: Seq[String] = Seq.empty)
  extends Logging {

  // Check to make sure that the root classloader does not know about Hive.
  assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)

  /** All jars used by the hive-specific classloader. */
  protected def allJars = execJars.toArray

  protected def isSharedClass(name: String): Boolean = {
    val isHadoopClass =
      name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")

    name.contains("slf4j") ||
    name.contains("log4j") ||
    name.startsWith("org.apache.spark.") ||
    (sharesHadoopClasses && isHadoopClass) ||
    name.startsWith("scala.") ||
    (name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
    name.startsWith("java.lang.") ||
    name.startsWith("java.net") ||
    sharedPrefixes.exists(name.startsWith)
  }
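
  // For example, "org.apache.hadoop.fs.FileSystem" is shared when sharesHadoopClasses is
  // true, whereas "org.apache.hadoop.hive.conf.HiveConf" is excluded by the isHadoopClass
  // check above, so Hive's own classes are always loaded from execJars when isolation is on.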

  /** True if `name` refers to a spark class that must see a specific version of Hive. */
  protected def isBarrierClass(name: String): Boolean =
    name.startsWith(classOf[HiveClientImpl].getName) ||
    name.startsWith(classOf[Shim].getName) ||
    barrierPrefixes.exists(name.startsWith)
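
  // Note that barrier classes are checked before shared classes in doLoadClass below. This
  // ordering is what allows HiveClientImpl, which would otherwise be shared as an
  // "org.apache.spark." class, to be redefined once per isolated classloader.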

  protected def classToPath(name: String): String =
    name.replaceAll("\\.", "/") + ".class"
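
  // For example, classToPath("org.apache.hive.Foo") returns "org/apache/hive/Foo.class",
  // the resource path used to locate the class bytes ("Foo" here is a hypothetical name).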

  /**
   * The classloader that is used to load an isolated version of Hive.
   * This classloader is a special URLClassLoader that exposes the addURL method.
   * So, when we add a jar, we can add it directly through the addURL method
   * instead of stacking a new URLClassLoader on top of it.
   */
  private[hive] val classLoader: MutableURLClassLoader = {
    val isolatedClassLoader =
      if (isolationOn) {
        new URLClassLoader(allJars, rootClassLoader) {
          override def loadClass(name: String, resolve: Boolean): Class[_] = {
            val loaded = findLoadedClass(name)
            if (loaded == null) doLoadClass(name, resolve) else loaded
          }
          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
            val classFileName = name.replaceAll("\\.", "/") + ".class"
            if (isBarrierClass(name)) {
              // For barrier classes, we construct a new copy of the class.
              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
              logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
              defineClass(name, bytes, 0, bytes.length)
            } else if (!isSharedClass(name)) {
              logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
              super.loadClass(name, resolve)
            } else {
              // For shared classes, we delegate to baseClassLoader, but fall back in case the
              // class is not found.
              logDebug(s"shared class: $name")
              try {
                baseClassLoader.loadClass(name)
              } catch {
                case _: ClassNotFoundException =>
                  super.loadClass(name, resolve)
              }
            }
          }
        }
      } else {
        baseClassLoader
      }
    // Right now, we create a URLClassLoader that gives preference to isolatedClassLoader
    // over its own URLs when it loads classes and resources.
    // We may want to use ChildFirstURLClassLoader based on
    // the configuration of spark.executor.userClassPathFirst, which gives preference
    // to its own URLs over the parent class loader (see Executor's createClassLoader method).
    new NonClosableMutableURLClassLoader(isolatedClassLoader)
  }
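
  // To summarize the resolution order when isolationOn is true, for a class `name`:
  //   1. already loaded by this loader  -> the cached Class is reused;
  //   2. isBarrierClass(name)           -> the bytes are read from baseClassLoader and
  //                                        redefined inside this loader;
  //   3. !isSharedClass(name)           -> loaded from execJars (the Hive jars);
  //   4. otherwise                      -> delegated to baseClassLoader, falling back to
  //                                        execJars on ClassNotFoundException.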

  private[hive] def addJar(path: URL): Unit = synchronized {
    classLoader.addURL(path)
  }
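
  // A hedged usage sketch (the jar path below is hypothetical): jars added this way become
  // visible to the isolated classloader without rebuilding it.
  //
  //   loader.addJar(new java.io.File("/tmp/extra-serde.jar").toURI.toURL)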

  /** The isolated client interface to Hive. */
  private[hive] def createClient(): HiveClient = {
    if (!isolationOn) {
      return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
    }
    // Pre-reflective instantiation setup.
    logDebug("Initializing the logger to avoid disaster...")
    val origLoader = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(classLoader)

    try {
      classLoader
        .loadClass(classOf[HiveClientImpl].getName)
        .getConstructors.head
        .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
        .asInstanceOf[HiveClient]
    } catch {
      case e: InvocationTargetException =>
        e.getCause match {
          case cnf: NoClassDefFoundError =>
            throw new ClassNotFoundException(
              s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
              "Please make sure that jars for your version of hive and hadoop are included in " +
              s"the paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e)
          case _ => throw e
        }
    } finally {
      Thread.currentThread().setContextClassLoader(origLoader)
    }
  }

  /**
   * The placeholder for the shared Hive client for all the HiveContext sessions (they share an
   * IsolatedClientLoader).
   */
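  // Typed as Any because the cached object is an instance of a class loaded by the
  // isolated classloader, so its concrete type is not visible to shared code.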
  private[hive] var cachedHive: Any = null
}