/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.File
import java.net.{MalformedURLException, URI}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.scalatest.Matchers._

import org.apache.spark.util.Utils

class SparkContextSuite extends SparkFunSuite with LocalSparkContext {

  test("Only one SparkContext may be active at a time") {
    // Regression test for SPARK-4180
    val conf = new SparkConf().setAppName("test").setMaster("local")
      .set("spark.driver.allowMultipleContexts", "false")
    sc = new SparkContext(conf)
    val envBefore = SparkEnv.get
    // A SparkContext is already running, so we shouldn't be able to create a second one
    intercept[SparkException] { new SparkContext(conf) }
    val envAfter = SparkEnv.get
    // SparkEnv and other context variables should be the same
    assert(envBefore == envAfter)
    // After stopping the running context, we should be able to create a new one
    resetSparkContext()
    sc = new SparkContext(conf)
  }

  test("Can still construct a new SparkContext after failing to construct a previous one") {
    val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false")
    // This is an invalid configuration (no app name or master URL)
    intercept[SparkException] {
      new SparkContext(conf)
    }
    // Even though those earlier calls failed, we should still be able to create a new context
    sc = new SparkContext(conf.setMaster("local").setAppName("test"))
  }

  test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
    var secondSparkContext: SparkContext = null
    try {
      val conf = new SparkConf().setAppName("test").setMaster("local")
        .set("spark.driver.allowMultipleContexts", "true")
      sc = new SparkContext(conf)
      secondSparkContext = new SparkContext(conf)
    } finally {
      Option(secondSparkContext).foreach(_.stop())
    }
  }

  test("Test getOrCreate") {
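    // Note: getOrCreate returns the already-active context when one exists, ignoring the new
    // conf, which is why the second call below still reports the app name "test".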
    var sc2: SparkContext = null
    SparkContext.clearActiveContext()
    val conf = new SparkConf().setAppName("test").setMaster("local")

    sc = SparkContext.getOrCreate(conf)

    assert(sc.getConf.get("spark.app.name").equals("test"))
    sc2 = SparkContext.getOrCreate(new SparkConf().setAppName("test2").setMaster("local"))
    assert(sc2.getConf.get("spark.app.name").equals("test"))
    assert(sc === sc2)
    assert(sc eq sc2)

    // Try creating a second context to confirm that it's still possible, if desired
    sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
        .set("spark.driver.allowMultipleContexts", "true"))

    sc2.stop()
  }

  test("BytesWritable implicit conversion is correct") {
    // Regression test for SPARK-3121
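    // BytesWritable.getBytes returns the whole backing array, which may be longer than the
    // logical length, so the converter must copy only the first getLength() bytes.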
    val bytesWritable = new BytesWritable()
    val inputArray = (1 to 10).map(_.toByte).toArray
    bytesWritable.set(inputArray, 0, 10)
    bytesWritable.set(inputArray, 0, 5)

    val converter = WritableConverter.bytesWritableConverter()
    val byteArray = converter.convert(bytesWritable)
    assert(byteArray.length === 5)

    bytesWritable.set(inputArray, 0, 0)
    val byteArray2 = converter.convert(bytesWritable)
    assert(byteArray2.length === 0)
  }

  test("basic case for addFile and listFiles") {
    val dir = Utils.createTempDir()

    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
    val absolutePath1 = file1.getAbsolutePath

    val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
    val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
    val absolutePath2 = file2.getAbsolutePath

    try {
      Files.write("somewords1", file1, StandardCharsets.UTF_8)
      Files.write("somewords2", file2, StandardCharsets.UTF_8)
      val length1 = file1.length()
      val length2 = file2.length()

      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      sc.addFile(file1.getAbsolutePath)
      sc.addFile(relativePath)
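      // SparkFiles.get resolves a file name to the local copy fetched by addFile, so the
      // resolved paths below should differ from the original absolute paths.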
      sc.parallelize(Array(1), 1).map(x => {
        val gotten1 = new File(SparkFiles.get(file1.getName))
        val gotten2 = new File(SparkFiles.get(file2.getName))
        if (!gotten1.exists()) {
          throw new SparkException("file doesn't exist : " + absolutePath1)
        }
        if (!gotten2.exists()) {
          throw new SparkException("file doesn't exist : " + absolutePath2)
        }

        if (length1 != gotten1.length()) {
          throw new SparkException(
            s"file has different length $length1 than added file ${gotten1.length()} : " +
              absolutePath1)
        }
        if (length2 != gotten2.length()) {
          throw new SparkException(
            s"file has different length $length2 than added file ${gotten2.length()} : " +
              absolutePath2)
        }

        if (absolutePath1 == gotten1.getAbsolutePath) {
          throw new SparkException("file should have been copied :" + absolutePath1)
        }
        if (absolutePath2 == gotten2.getAbsolutePath) {
          throw new SparkException("file should have been copied : " + absolutePath2)
        }
        x
      }).count()
      assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
    } finally {
      sc.stop()
    }
  }

  test("add and list jar files") {
    val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      sc.addJar(jarPath.toString)
      assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1)
    } finally {
      sc.stop()
    }
  }

  test("SPARK-17650: malformed URLs throw exceptions before bricking Executors") {
    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
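      // A malformed URL must be rejected up front, before it is registered; otherwise
      // executors would fail later when trying to fetch the file or jar.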
      Seq("http", "https", "ftp").foreach { scheme =>
        val badURL = s"$scheme://user:pwd/path"
        val e1 = intercept[MalformedURLException] {
          sc.addFile(badURL)
        }
        assert(e1.getMessage.contains(badURL))
        val e2 = intercept[MalformedURLException] {
          sc.addJar(badURL)
        }
        assert(e2.getMessage.contains(badURL))
        assert(sc.addedFiles.isEmpty)
        assert(sc.addedJars.isEmpty)
      }
    } finally {
      sc.stop()
    }
  }

  test("addFile recursive works") {
    val pluto = Utils.createTempDir()
    val neptune = Utils.createTempDir(pluto.getAbsolutePath)
    val saturn = Utils.createTempDir(neptune.getAbsolutePath)
    val alien1 = File.createTempFile("alien", "1", neptune)
    val alien2 = File.createTempFile("alien", "2", saturn)

    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      sc.addFile(neptune.getAbsolutePath, true)
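      // Files are exposed relative to the added directory (neptune), so a path that includes
      // its parent (pluto) should not resolve.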
      sc.parallelize(Array(1), 1).map(x => {
        val sep = File.separator
        if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
          throw new SparkException("can't access file under root added directory")
        }
        if (!new File(SparkFiles.get(neptune.getName + sep + saturn.getName + sep + alien2.getName))
            .exists()) {
          throw new SparkException("can't access file in nested directory")
        }
        if (new File(SparkFiles.get(pluto.getName + sep + neptune.getName + sep + alien1.getName))
            .exists()) {
          throw new SparkException("file exists that shouldn't")
        }
        x
      }).count()
    } finally {
      sc.stop()
    }
  }

  test("addFile recursive can't add directories by default") {
    val dir = Utils.createTempDir()

    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      intercept[SparkException] {
        sc.addFile(dir.getAbsolutePath)
      }
    } finally {
      sc.stop()
    }
  }

  test("cannot call addFile with different paths that have the same filename") {
    val dir = Utils.createTempDir()
    try {
      val subdir1 = new File(dir, "subdir1")
      val subdir2 = new File(dir, "subdir2")
      assert(subdir1.mkdir())
      assert(subdir2.mkdir())
      val file1 = new File(subdir1, "file")
      val file2 = new File(subdir2, "file")
      Files.write("old", file1, StandardCharsets.UTF_8)
      Files.write("new", file2, StandardCharsets.UTF_8)
      sc = new SparkContext("local-cluster[1,1,1024]", "test")
      sc.addFile(file1.getAbsolutePath)
      def getAddedFileContents(): String = {
        sc.parallelize(Seq(0)).map { _ =>
          scala.io.Source.fromFile(SparkFiles.get("file")).mkString
        }.first()
      }
      assert(getAddedFileContents() === "old")
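      // Adding a different file under the same name should be rejected, and the file that was
      // added first should remain in place.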
      intercept[IllegalArgumentException] {
        sc.addFile(file2.getAbsolutePath)
      }
      assert(getAddedFileContents() === "old")
    } finally {
      Utils.deleteRecursively(dir)
    }
  }

  // Regression tests for SPARK-16787
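  // One test is generated for each combination of scheduling mode and method.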
  for (
    schedulingMode <- Seq("local-mode", "non-local-mode");
    method <- Seq("addJar", "addFile")
  ) {
    val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString
    val master = schedulingMode match {
      case "local-mode" => "local"
      case "non-local-mode" => "local-cluster[1,1,1024]"
    }
    test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") {
      sc = new SparkContext(master, "test")
      method match {
        case "addJar" =>
          sc.addJar(jarPath)
          sc.addJar(jarPath)
        case "addFile" =>
          sc.addFile(jarPath)
          sc.addFile(jarPath)
      }
    }
  }

  test("add jar with invalid path") {
    val tmpDir = Utils.createTempDir()
    val tmpJar = File.createTempFile("test", ".jar", tmpDir)

    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    sc.addJar(tmpJar.getAbsolutePath)

    // An invalid jar path only logs an error; nothing is added to the file server.
    sc.addJar("dummy.jar")
    sc.addJar("")
    sc.addJar(tmpDir.getAbsolutePath)

    sc.listJars().size should be (1)
    sc.listJars().head should include (tmpJar.getName)
  }

  test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
      sc.cancelJobGroup("nonExistGroupId")
      Await.ready(future, Duration(2, TimeUnit.SECONDS))

      // Before the fix for SPARK-6414, sc.cancelJobGroup would throw a NullPointerException
      // and shut down the SparkContext, causing the following assertion to fail.
      assert(sc.parallelize(1 to 10).count() == 10L)
    } finally {
      sc.stop()
    }
  }

  test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") {
    // Regression test for SPARK-7155
    // dir1 and dir2 are used for wholeTextFiles and binaryFiles
    val dir1 = Utils.createTempDir()
    val dir2 = Utils.createTempDir()

    val dirpath1 = dir1.getAbsolutePath
    val dirpath2 = dir2.getAbsolutePath

    // file1 and file2 are placed inside dir1, they are also used for
    // textFile, hadoopFile, and newAPIHadoopFile
    // file3, file4 and file5 are placed inside dir2, they are used for
    // textFile, hadoopFile, and newAPIHadoopFile as well
    val file1 = new File(dir1, "part-00000")
    val file2 = new File(dir1, "part-00001")
    val file3 = new File(dir2, "part-00000")
    val file4 = new File(dir2, "part-00001")
    val file5 = new File(dir2, "part-00002")

    val filepath1 = file1.getAbsolutePath
    val filepath2 = file2.getAbsolutePath
    val filepath3 = file3.getAbsolutePath
    val filepath4 = file4.getAbsolutePath
    val filepath5 = file5.getAbsolutePath

    try {
      // Create 5 text files.
      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1,
        StandardCharsets.UTF_8)
      Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8)
      Files.write("someline1 in file3", file3, StandardCharsets.UTF_8)
      Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8)
      Files.write("someline1 in file5\nsomeline2 in file5", file5, StandardCharsets.UTF_8)

      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
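      // Each of these APIs accepts a comma separated list of paths and should read every part.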

      // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
      assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
      assert(sc.hadoopFile(filepath1 + "," + filepath2,
        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
      assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)

      // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
      assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
      assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
               classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
      assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
               classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)

      // Test wholeTextFiles, and binaryFiles for dir1 and dir2
      assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
      assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)

    } finally {
      sc.stop()
    }
  }

  test("Default path for file based RDDs is properly set (SPARK-12517)") {
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

    // Test textFile, wholeTextFiles, binaryFiles, hadoopFile and
    // newAPIHadoopFile for setting the default path as the RDD name
    val mockPath = "default/path/for/"

    var targetPath = mockPath + "textFile"
    assert(sc.textFile(targetPath).name === targetPath)

    targetPath = mockPath + "wholeTextFiles"
    assert(sc.wholeTextFiles(targetPath).name === targetPath)

    targetPath = mockPath + "binaryFiles"
    assert(sc.binaryFiles(targetPath).name === targetPath)

    targetPath = mockPath + "hadoopFile"
    assert(sc.hadoopFile(targetPath).name === targetPath)

    targetPath = mockPath + "newAPIHadoopFile"
    assert(sc.newAPIHadoopFile(targetPath).name === targetPath)

    sc.stop()
  }

  test("calling multiple sc.stop() must not throw any exception") {
    noException should be thrownBy {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      val cnt = sc.parallelize(1 to 4).count()
      sc.cancelAllJobs()
      sc.stop()
      // call stop second time
      sc.stop()
    }
  }

  test("No exception when both num-executors and dynamic allocation set.") {
    noException should be thrownBy {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
        .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
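      // When spark.executor.instances is set explicitly, dynamic allocation is expected to be
      // disabled (with a warning) rather than to fail, so no ExecutorAllocationManager is created.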
      assert(sc.executorAllocationManager.isEmpty)
      assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
    }
  }

  test("localProperties are inherited by spawned threads.") {
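    // Local properties are kept in an inheritable thread-local, so a thread started after the
    // property is set should see the parent thread's value.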
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    sc.setLocalProperty("testProperty", "testValue")
    var result = "unset"
    val thread = new Thread() {
      override def run() = { result = sc.getLocalProperty("testProperty") }
    }
    thread.start()
    thread.join()
    sc.stop()
    assert(result == "testValue")
  }

  test("localProperties do not cross-talk between threads.") {
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    var result = "unset"
    val thread1 = new Thread() {
      override def run() = {sc.setLocalProperty("testProperty", "testValue")}}
    // testProperty should be unset and thus return null
    val thread2 = new Thread() {
      override def run() = {result = sc.getLocalProperty("testProperty")}}
    thread1.start()
    thread1.join()
    thread2.start()
    thread2.join()
    sc.stop()
    assert(result == null)
  }

  test("log level case-insensitive and reset log level") {
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    val originalLevel = org.apache.log4j.Logger.getRootLogger().getLevel
    try {
      sc.setLogLevel("debug")
      assert(org.apache.log4j.Logger.getRootLogger().getLevel === org.apache.log4j.Level.DEBUG)
      sc.setLogLevel("INfo")
      assert(org.apache.log4j.Logger.getRootLogger().getLevel === org.apache.log4j.Level.INFO)
    } finally {
      sc.setLogLevel(originalLevel.toString)
      assert(org.apache.log4j.Logger.getRootLogger().getLevel === originalLevel)
      sc.stop()
    }
  }

  test("SPARK-19446: DebugFilesystem.assertNoOpenStreams should report " +
    "open streams to help debugging") {
    val fs = new DebugFilesystem()
    fs.initialize(new URI("file:///"), new Configuration())
    val file = File.createTempFile("SPARK19446", "temp")
    Files.write(Array.ofDim[Byte](1000), file)
    val path = new Path("file:///" + file.getCanonicalPath)
    val stream = fs.open(path)
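    // With a stream still open, assertNoOpenStreams should throw, and the cause should carry
    // the stack trace captured when the leaked stream was opened.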
    val exc = intercept[RuntimeException] {
      DebugFilesystem.assertNoOpenStreams()
    }
    assert(exc != null)
    assert(exc.getCause() != null)
    stream.close()
  }
}