/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.client

import java.io.File
import java.lang.reflect.InvocationTargetException
import java.net.{URL, URLClassLoader}
import java.util

import scala.language.reflectiveCalls
import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
import org.apache.spark.util.{MutableURLClassLoader, Utils}

/** Factory for `IsolatedClientLoader` with specific versions of hive. */
private[hive] object IsolatedClientLoader extends Logging {

  /** Hadoop version downloaded when the requested Hadoop version cannot be resolved. */
  private val FallbackHadoopVersion = "2.4.0"

  /**
   * Creates isolated Hive client loaders by downloading the requested version from maven.
   *
   * Spark first tries to download `hadoop-client` at `hadoopVersion` and share Hadoop classes
   * with the Hive client. If those Hadoop artifacts cannot be resolved, Hadoop 2.4.0 is
   * downloaded instead and Hadoop classes are NOT shared. The outcome, including a fallback, is
   * cached per (Hive version, requested Hadoop version) pair, so a failed resolution is not
   * retried on later calls.
   *
   * @param hiveMetastoreVersion Hive metastore version string; see [[hiveVersion]].
   * @param hadoopVersion Version of `org.apache.hadoop:hadoop-client` to download.
   * @param sparkConf Spark configuration handed to the created loader.
   * @param hadoopConf Hadoop configuration handed to the created loader.
   * @param config Options added to the HiveConf of the constructed client.
   * @param ivyPath Optional Ivy path passed to the Maven coordinate resolution.
   * @param sharedPrefixes Extra class-name prefixes treated as shared classes.
   * @param barrierPrefixes Extra class-name prefixes treated as barrier classes.
   * @throws UnsupportedOperationException if `hiveMetastoreVersion` is not supported.
   */
  def forVersion(
      hiveMetastoreVersion: String,
      hadoopVersion: String,
      sparkConf: SparkConf,
      hadoopConf: Configuration,
      config: Map[String, String] = Map.empty,
      ivyPath: Option[String] = None,
      sharedPrefixes: Seq[String] = Seq.empty,
      barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
    val resolvedVersion = hiveVersion(hiveMetastoreVersion)
    // Cache under the *requested* Hadoop version so that a fallback is remembered too;
    // otherwise every call with an unresolvable vendor version would retry the download.
    val (files, sharesHadoopClasses) = resolvedVersions.getOrElseUpdate(
      (resolvedVersion, hadoopVersion),
      resolveJars(resolvedVersion, hadoopVersion, ivyPath))

    new IsolatedClientLoader(
      resolvedVersion,
      sparkConf,
      execJars = files,
      hadoopConf = hadoopConf,
      config = config,
      sharesHadoopClasses = sharesHadoopClasses,
      sharedPrefixes = sharedPrefixes,
      barrierPrefixes = barrierPrefixes)
  }

  /**
   * Maps a user-supplied Hive metastore version string to a supported [[HiveVersion]].
   *
   * @throws UnsupportedOperationException if `version` is not a supported metastore version.
   */
  def hiveVersion(version: String): HiveVersion = version match {
    case "12" | "0.12" | "0.12.0" => hive.v12
    case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13
    case "14" | "0.14" | "0.14.0" => hive.v14
    case "1.0" | "1.0.0" => hive.v1_0
    case "1.1" | "1.1.0" => hive.v1_1
    case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
    case unsupported =>
      throw new UnsupportedOperationException(
        s"Unsupported Hive Metastore version ($unsupported). Please set " +
          s"${HiveUtils.HIVE_METASTORE_VERSION.key} with a valid version.")
  }

  /**
   * Downloads the jars for Hive `version` together with Hadoop `hadoopVersion`. If the Hadoop
   * artifacts cannot be resolved, it downloads Hadoop 2.4.0 instead.
   *
   * @return the downloaded jars, and whether Hadoop classes may be shared between Spark and
   *         the Hive client (false only when the fallback Hadoop version was used).
   */
  private def resolveJars(
      version: HiveVersion,
      hadoopVersion: String,
      ivyPath: Option[String]): (Seq[URL], Boolean) = {
    try {
      (downloadVersion(version, hadoopVersion, ivyPath), true)
    } catch {
      // If the error message contains hadoop, it is probably because the hadoop
      // version cannot be resolved (e.g. it is a vendor specific version like
      // 2.0.0-cdh4.1.1). If it is the case, we will try just
      // "org.apache.hadoop:hadoop-client:2.4.0". "org.apache.hadoop:hadoop-client:2.4.0"
      // is used just because we used to hard code it as the hadoop artifact to download.
      // getMessage may be null, which must not turn into an NPE that hides the real error.
      case e: RuntimeException if Option(e.getMessage).exists(_.contains("hadoop")) =>
        logWarning(s"Failed to resolve Hadoop artifacts for the version ${hadoopVersion}. " +
          s"We will change the hadoop version from ${hadoopVersion} to " +
          s"$FallbackHadoopVersion and try again. " +
          "Hadoop classes will not be shared between Spark and Hive metastore client. " +
          "It is recommended to set jars used by Hive metastore client through " +
          s"${HiveUtils.HIVE_METASTORE_JARS.key} in the production environment.")
        (downloadVersion(version, FallbackHadoopVersion, ivyPath), false)
    }
  }

  /**
   * Resolves Hive `version` (its extra deps plus hive-metastore, hive-exec, hive-common,
   * hive-serde), Guava 14.0.1 and `hadoop-client` at `hadoopVersion` through
   * `SparkSubmitUtils.resolveMavenCoordinates`. It then copies the resolved files into a fresh
   * temporary directory.
   *
   * @return URLs of the jars in the temporary directory.
   */
  private def downloadVersion(
      version: HiveVersion,
      hadoopVersion: String,
      ivyPath: Option[String]): Seq[URL] = {
    val hiveArtifacts = version.extraDeps ++
      Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
        .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
      Seq("com.google.guava:guava:14.0.1",
        s"org.apache.hadoop:hadoop-client:$hadoopVersion")

    val classpath = quietly {
      SparkSubmitUtils.resolveMavenCoordinates(
        hiveArtifacts.mkString(","),
        // NOTE(review): this extra repository is accessed over plain HTTP.
        Some("http://www.datanucleus.org/downloads/maven2"),
        ivyPath,
        exclusions = version.exclusions)
    }
    val allFiles = classpath.split(",").map(new File(_)).toSet

    // TODO: Remove copy logic.
    val tempDir = Utils.createTempDir(namePrefix = s"hive-${version}")
    allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
    tempDir.listFiles().map(_.toURI.toURL).toSeq
  }

  // A map from (HiveVersion, requested Hadoop version) to the downloaded jar files and whether
  // Hadoop classes may be shared. It is only used by forVersion, which is synchronized.
  private val resolvedVersions =
    new scala.collection.mutable.HashMap[(HiveVersion, String), (Seq[URL], Boolean)]
}

/**
 * Creates a [[HiveClient]] using a classloader that works according to the following rules:
 *  - Shared classes: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`
 *    allowing the results of calls to the [[HiveClient]] to be visible externally.
 *  - Hive classes: new instances are loaded from `execJars`.  These classes are not
 *    accessible externally due to their custom loading.
 *  - [[HiveClientImpl]]: a new copy is created for each instance of `IsolatedClientLoader`.
 *    This new instance is able to see a specific version of hive without using reflection. Since
 *    this is a unique instance, it is not visible externally other than as a generic
 *    [[HiveClient]], unless `isolationOn` is set to `false`.
 *
 * @param version The version of hive on the classpath. Used to pick specific function
 *                signatures that are not compatible across versions.
 * @param sparkConf Spark configuration passed to the created [[HiveClientImpl]].
 * @param hadoopConf Hadoop configuration passed to the created [[HiveClientImpl]].
 * @param execJars A collection of jar files that must include hive and hadoop.
 * @param config A set of options that will be added to the HiveConf of the constructed client.
 * @param isolationOn When true, custom versions of barrier classes will be constructed. Must be
 *                    true unless loading the version of hive that is on Spark's classloader.
 * @param sharesHadoopClasses When true, we will share Hadoop classes between Spark and the
 *                            Hive metastore client.
 * @param rootClassLoader The system root classloader. Must not know about Hive classes.
 * @param baseClassLoader The spark classloader that is used to load shared classes.
 * @param sharedPrefixes Additional class-name prefixes that are treated as shared classes.
 * @param barrierPrefixes Additional class-name prefixes that are treated as barrier classes.
153 */ 154private[hive] class IsolatedClientLoader( 155 val version: HiveVersion, 156 val sparkConf: SparkConf, 157 val hadoopConf: Configuration, 158 val execJars: Seq[URL] = Seq.empty, 159 val config: Map[String, String] = Map.empty, 160 val isolationOn: Boolean = true, 161 val sharesHadoopClasses: Boolean = true, 162 val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent, 163 val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader, 164 val sharedPrefixes: Seq[String] = Seq.empty, 165 val barrierPrefixes: Seq[String] = Seq.empty) 166 extends Logging { 167 168 // Check to make sure that the root classloader does not know about Hive. 169 assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) 170 171 /** All jars used by the hive specific classloader. */ 172 protected def allJars = execJars.toArray 173 174 protected def isSharedClass(name: String): Boolean = { 175 val isHadoopClass = 176 name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.") 177 178 name.contains("slf4j") || 179 name.contains("log4j") || 180 name.startsWith("org.apache.spark.") || 181 (sharesHadoopClasses && isHadoopClass) || 182 name.startsWith("scala.") || 183 (name.startsWith("com.google") && !name.startsWith("com.google.cloud")) || 184 name.startsWith("java.lang.") || 185 name.startsWith("java.net") || 186 sharedPrefixes.exists(name.startsWith) 187 } 188 189 /** True if `name` refers to a spark class that must see specific version of Hive. */ 190 protected def isBarrierClass(name: String): Boolean = 191 name.startsWith(classOf[HiveClientImpl].getName) || 192 name.startsWith(classOf[Shim].getName) || 193 barrierPrefixes.exists(name.startsWith) 194 195 protected def classToPath(name: String): String = 196 name.replaceAll("\\.", "/") + ".class" 197 198 /** 199 * The classloader that is used to load an isolated version of Hive. 
200 * This classloader is a special URLClassLoader that exposes the addURL method. 201 * So, when we add jar, we can add this new jar directly through the addURL method 202 * instead of stacking a new URLClassLoader on top of it. 203 */ 204 private[hive] val classLoader: MutableURLClassLoader = { 205 val isolatedClassLoader = 206 if (isolationOn) { 207 new URLClassLoader(allJars, rootClassLoader) { 208 override def loadClass(name: String, resolve: Boolean): Class[_] = { 209 val loaded = findLoadedClass(name) 210 if (loaded == null) doLoadClass(name, resolve) else loaded 211 } 212 def doLoadClass(name: String, resolve: Boolean): Class[_] = { 213 val classFileName = name.replaceAll("\\.", "/") + ".class" 214 if (isBarrierClass(name)) { 215 // For barrier classes, we construct a new copy of the class. 216 val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) 217 logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") 218 defineClass(name, bytes, 0, bytes.length) 219 } else if (!isSharedClass(name)) { 220 logDebug(s"hive class: $name - ${getResource(classToPath(name))}") 221 super.loadClass(name, resolve) 222 } else { 223 // For shared classes, we delegate to baseClassLoader, but fall back in case the 224 // class is not found. 225 logDebug(s"shared class: $name") 226 try { 227 baseClassLoader.loadClass(name) 228 } catch { 229 case _: ClassNotFoundException => 230 super.loadClass(name, resolve) 231 } 232 } 233 } 234 } 235 } else { 236 baseClassLoader 237 } 238 // Right now, we create a URLClassLoader that gives preference to isolatedClassLoader 239 // over its own URLs when it loads classes and resources. 240 // We may want to use ChildFirstURLClassLoader based on 241 // the configuration of spark.executor.userClassPathFirst, which gives preference 242 // to its own URLs over the parent class loader (see Executor's createClassLoader method). 
243 new NonClosableMutableURLClassLoader(isolatedClassLoader) 244 } 245 246 private[hive] def addJar(path: URL): Unit = synchronized { 247 classLoader.addURL(path) 248 } 249 250 /** The isolated client interface to Hive. */ 251 private[hive] def createClient(): HiveClient = { 252 if (!isolationOn) { 253 return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this) 254 } 255 // Pre-reflective instantiation setup. 256 logDebug("Initializing the logger to avoid disaster...") 257 val origLoader = Thread.currentThread().getContextClassLoader 258 Thread.currentThread.setContextClassLoader(classLoader) 259 260 try { 261 classLoader 262 .loadClass(classOf[HiveClientImpl].getName) 263 .getConstructors.head 264 .newInstance(version, sparkConf, hadoopConf, config, classLoader, this) 265 .asInstanceOf[HiveClient] 266 } catch { 267 case e: InvocationTargetException => 268 if (e.getCause().isInstanceOf[NoClassDefFoundError]) { 269 val cnf = e.getCause().asInstanceOf[NoClassDefFoundError] 270 throw new ClassNotFoundException( 271 s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" + 272 "Please make sure that jars for your version of hive and hadoop are included in the " + 273 s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e) 274 } else { 275 throw e 276 } 277 } finally { 278 Thread.currentThread.setContextClassLoader(origLoader) 279 } 280 } 281 282 /** 283 * The place holder for shared Hive client for all the HiveContext sessions (they share an 284 * IsolatedClientLoader). 285 */ 286 private[hive] var cachedHive: Any = null 287} 288