/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io.{File, OutputStream, PrintStream}

import scala.collection.mutable.ArrayBuffer

import org.apache.ivy.core.module.descriptor.MDArtifact
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.resolver.{AbstractResolver, FileSystemResolver, IBiblioResolver}
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.util.Utils

/**
 * Tests for [[SparkSubmitUtils]]: parsing of Maven coordinates, construction of the resolver
 * chain, dependency and exclusion handling on the module descriptor, and resolution of
 * artifacts against repositories created by `IvyTestUtils.withRepository`.
 *
 * While the suite runs, `SparkSubmitUtils.printStream` is replaced by a stream that only
 * buffers `println` output; the previous stream is put back in `afterAll`.
 */
class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

  /** Absolute path of the temporary directory used as a non-default Ivy path. */
  private var tempIvyPath: String = _

  /** Value of `SparkSubmitUtils.printStream` before this suite replaced it, if captured. */
  private var originalPrintStream: Option[PrintStream] = None

  /** Output stream that discards every byte written to it. */
  private val noOpOutputStream = new OutputStream {
    override def write(b: Int): Unit = {}
  }

  /** Simple PrintStream that stores every line passed to `println(String)` in a buffer. */
  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
    val lineBuffer = ArrayBuffer[String]()
    // scalastyle:off println
    override def println(line: String): Unit = {
      lineBuffer += line
    }
    // scalastyle:on println
  }

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Remember the current stream so that afterAll can undo the replacement below.
    originalPrintStream = Option(SparkSubmitUtils.printStream)
    // We don't want to write logs during testing
    SparkSubmitUtils.printStream = new BufferPrintStream
    tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath()
  }

  override def afterAll(): Unit = {
    try {
      // Undo the global mutation made in beforeAll so it does not leak out of this suite.
      originalPrintStream.foreach { stream =>
        SparkSubmitUtils.printStream = stream
      }
    } finally {
      super.afterAll()
    }
  }

  test("incorrect maven coordinate throws error") {
    // Each entry is malformed: a blank or empty component, or fewer than three components.
    val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a")
    for (coordinate <- coordinates) {
      intercept[IllegalArgumentException] {
        SparkSubmitUtils.extractMavenCoordinates(coordinate)
      }
    }
  }

  test("create repo resolvers") {
    val settings = new IvySettings
    val res1 = SparkSubmitUtils.createRepoResolvers(None, settings)
    // should have local-m2-cache, local-ivy-cache, central and spark-packages by default
    assert(res1.getResolvers.size() === 4)
    assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "local-m2-cache")
    assert(res1.getResolvers.get(1).asInstanceOf[FileSystemResolver].getName === "local-ivy-cache")
    assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "central")
    assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "spark-packages")

    val repos = "a/1,b/2,c/3"
    val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos), settings)
    // the three user-supplied repositories plus the four defaults
    assert(resolver2.getResolvers.size() === 7)
    val expected = repos.split(",").map(r => s"$r/")
    resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
      // only the first three entries are checked against the user-supplied repositories
      if (i < 3) {
        assert(resolver.getName === s"repo-${i + 1}")
        assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
      }
    }
  }

  test("add dependencies works correctly") {
    val md = SparkSubmitUtils.getModuleDescriptor
    val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
      "com.databricks:spark-avro_2.10:0.1")

    SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
    assert(md.getDependencies.length === 2)
  }

  test("excludes works correctly") {
    val md = SparkSubmitUtils.getModuleDescriptor
    val excludes = Seq("a:b", "c:d")
    excludes.foreach { e =>
      md.addExcludeRule(SparkSubmitUtils.createExclusion(e + ":*", new IvySettings, "default"))
    }
    val rules = md.getAllExcludeRules
    assert(rules.length === 2)
    val rule1 = rules(0).getId.getModuleId
    assert(rule1.getOrganisation === "a")
    assert(rule1.getName === "b")
    val rule2 = rules(1).getId.getModuleId
    assert(rule2.getOrganisation === "c")
    assert(rule2.getName === "d")
    // a coordinate with four components must be rejected
    intercept[IllegalArgumentException] {
      SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
    }
  }

  test("ivy path works correctly") {
    val md = SparkSubmitUtils.getModuleDescriptor
    val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
    var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath))
    // The Ivy path must occur once per artifact: consume the string past each occurrence.
    for (i <- 0 until 3) {
      val index = jPaths.indexOf(tempIvyPath)
      assert(index >= 0)
      jPaths = jPaths.substring(index + tempIvyPath.length)
    }
    val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
    IvyTestUtils.withRepository(main, None, None) { repo =>
      // end to end
      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
        Option(tempIvyPath), isTest = true)
      assert(jarPath.contains(tempIvyPath), "should use non-default ivy path")
    }
  }

  test("search for artifact at local repositories") {
    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
    val dep = "my.great.dep:mydep:0.5"
    // Local M2 repository
    IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
        isTest = true)
      assert(jarPath.contains("mylib"), "should find artifact")
      assert(jarPath.contains("mydep"), "should find dependency")
    }
    // Local Ivy Repository
    val settings = new IvySettings
    val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
    IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
        isTest = true)
      assert(jarPath.contains("mylib"), "should find artifact")
      assert(jarPath.contains("mydep"), "should find dependency")
    }
    // Local ivy repository with modified home
    val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
    settings.setDefaultIvyUserDir(new File(tempIvyPath))
    IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
      ivySettings = settings) { repo =>
      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
        Some(tempIvyPath), isTest = true)
      assert(jarPath.contains("mylib"), "should find artifact")
      assert(jarPath.contains(tempIvyPath), "should be in new ivy path")
      assert(jarPath.contains("mydep"), "should find dependency")
    }
  }

  test("dependency not found throws RuntimeException") {
    intercept[RuntimeException] {
      SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
    }
  }

  test("neglects Spark and Spark's dependencies") {
    val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

    val coordinates =
      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
      ",org.apache.spark:spark-core_fake:1.2.0"

    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
    assert(path === "", "should return empty path")
    // An artifact under org.apache.spark that is not in the list above must still be returned.
    val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
    IvyTestUtils.withRepository(main, None, None) { repo =>
      val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
        Some(repo), None, isTest = true)
      assert(files.contains(main.artifactId), "Did not return artifact")
    }
  }

  test("exclude dependencies end to end") {
    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
    val dep = "my.great.dep:mydep:0.5"
    IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
      val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
        Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
      assert(files.contains(main.artifactId), "Did not return artifact")
      assert(!files.contains("my.great.dep"), "Returned excluded artifact")
    }
  }
}