/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io.{File, OutputStream, PrintStream}

import scala.collection.mutable.ArrayBuffer

import org.apache.ivy.core.module.descriptor.MDArtifact
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.resolver.{AbstractResolver, FileSystemResolver, IBiblioResolver}
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.util.Utils

class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

  private var tempIvyPath: String = _

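  // OutputStream that silently discards everything written to it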
  private val noOpOutputStream = new OutputStream {
    def write(b: Int) = {}
  }

  /** Simple PrintStream that captures printed lines into a buffer */
  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
    var lineBuffer = ArrayBuffer[String]()
    // scalastyle:off println
    override def println(line: String) {
      lineBuffer += line
    }
    // scalastyle:on println
  }

  override def beforeAll() {
    super.beforeAll()
    // We don't want to write logs during testing
    SparkSubmitUtils.printStream = new BufferPrintStream
    tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath()
  }

  test("incorrect maven coordinate throws error") {
    val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", "::", "a:b", "a")
    for (coordinate <- coordinates) {
      intercept[IllegalArgumentException] {
        SparkSubmitUtils.extractMavenCoordinates(coordinate)
      }
    }
  }

  test("create repo resolvers") {
    val settings = new IvySettings
    val res1 = SparkSubmitUtils.createRepoResolvers(None, settings)
    // should have local-m2-cache, local-ivy-cache, central and spark-packages by default
    assert(res1.getResolvers.size() === 4)
    assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === "local-m2-cache")
    assert(res1.getResolvers.get(1).asInstanceOf[FileSystemResolver].getName === "local-ivy-cache")
    assert(res1.getResolvers.get(2).asInstanceOf[IBiblioResolver].getName === "central")
    assert(res1.getResolvers.get(3).asInstanceOf[IBiblioResolver].getName === "spark-packages")

    val repos = "a/1,b/2,c/3"
    val resolver2 = SparkSubmitUtils.createRepoResolvers(Option(repos), settings)
    assert(resolver2.getResolvers.size() === 7)
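    // the three user-provided repositories should come first, in the order given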
    val expected = repos.split(",").map(r => s"$r/")
    resolver2.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
      if (i < 3) {
        assert(resolver.getName === s"repo-${i + 1}")
        assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i))
      }
    }
  }

  test("add dependencies works correctly") {
    val md = SparkSubmitUtils.getModuleDescriptor
    val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.10:0.1," +
      "com.databricks:spark-avro_2.10:0.1")

    SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
    assert(md.getDependencies.length === 2)
  }

  test("excludes works correctly") {
    val md = SparkSubmitUtils.getModuleDescriptor
    val excludes = Seq("a:b", "c:d")
    excludes.foreach { e =>
      md.addExcludeRule(SparkSubmitUtils.createExclusion(e + ":*", new IvySettings, "default"))
    }
    val rules = md.getAllExcludeRules
    assert(rules.length === 2)
    val rule1 = rules(0).getId.getModuleId
    assert(rule1.getOrganisation === "a")
    assert(rule1.getName === "b")
    val rule2 = rules(1).getId.getModuleId
    assert(rule2.getOrganisation === "c")
    assert(rule2.getName === "d")
    intercept[IllegalArgumentException] {
      SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
    }
  }

  test("ivy path works correctly") {
    val md = SparkSubmitUtils.getModuleDescriptor
    val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
    var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath))
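    // each of the three resolved paths should contain the custom ivy path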
    for (i <- 0 until 3) {
      val index = jPaths.indexOf(tempIvyPath)
      assert(index >= 0)
      jPaths = jPaths.substring(index + tempIvyPath.length)
    }
    val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
    IvyTestUtils.withRepository(main, None, None) { repo =>
      // end to end
      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
        Option(tempIvyPath), isTest = true)
      assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
    }
  }

  test("search for artifact at local repositories") {
    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
    val dep = "my.great.dep:mydep:0.5"
    // Local M2 repository
    IvyTestUtils.withRepository(main, Some(dep), Some(SparkSubmitUtils.m2Path)) { repo =>
      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
        isTest = true)
      assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
      assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
    }
    // Local Ivy Repository
    val settings = new IvySettings
    val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + File.separator)
    IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout = true) { repo =>
      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None, None,
        isTest = true)
      assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
      assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
    }
    // Local ivy repository with modified home
    val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
    settings.setDefaultIvyUserDir(new File(tempIvyPath))
    IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), useIvyLayout = true,
      ivySettings = settings) { repo =>
      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
        Some(tempIvyPath), isTest = true)
      assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
      assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
      assert(jarPath.indexOf("mydep") >= 0, "should find dependency")
    }
  }

  test("dependency not found throws RuntimeException") {
    intercept[RuntimeException] {
      SparkSubmitUtils.resolveMavenCoordinates("a:b:c", None, None, isTest = true)
    }
  }

  test("neglects Spark and Spark's dependencies") {
    val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")

    val coordinates =
      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
      ",org.apache.spark:spark-core_fake:1.2.0"

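    // every coordinate above names a Spark component, so resolution should return an empty path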
    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, isTest = true)
    assert(path === "", "should return empty path")
    val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.10", "1.2.0")
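    // spark-streaming-kafka-assembly is not one of the neglected components, so it should still be resolved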
    IvyTestUtils.withRepository(main, None, None) { repo =>
      val files = SparkSubmitUtils.resolveMavenCoordinates(coordinates + "," + main.toString,
        Some(repo), None, isTest = true)
      assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
    }
  }

  test("exclude dependencies end to end") {
    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
    val dep = "my.great.dep:mydep:0.5"
    IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
      val files = SparkSubmitUtils.resolveMavenCoordinates(main.toString,
        Some(repo), None, Seq("my.great.dep:mydep"), isTest = true)
      assert(files.indexOf(main.artifactId) >= 0, "Did not return artifact")
      assert(files.indexOf("my.great.dep") < 0, "Returned excluded artifact")
    }
  }
}