/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.File
import java.net.{MalformedURLException, URI}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.scalatest.Matchers._

import org.apache.spark.util.Utils

class SparkContextSuite extends SparkFunSuite with LocalSparkContext {

  test("Only one SparkContext may be active at a time") {
    // Regression test for SPARK-4180
    val conf = new SparkConf().setAppName("test").setMaster("local")
      .set("spark.driver.allowMultipleContexts", "false")
    sc = new SparkContext(conf)
    val envBefore = SparkEnv.get
    // A SparkContext is already running, so we shouldn't be able to create a second one
    intercept[SparkException] { new SparkContext(conf) }
    val envAfter = SparkEnv.get
    // SparkEnv and other context variables should be the same
    assert(envBefore == envAfter)
    // After stopping the running context, we should be able to create a new one
    resetSparkContext()
    sc = new SparkContext(conf)
  }

  test("Can still construct a new SparkContext after failing to construct a previous one") {
    val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false")
    // This is an invalid configuration (no app name or master URL)
    intercept[SparkException] {
      new SparkContext(conf)
    }
    // Even though those earlier calls failed, we should still be able to create a new context
    sc = new SparkContext(conf.setMaster("local").setAppName("test"))
  }

  test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
    var secondSparkContext: SparkContext = null
    try {
      val conf = new SparkConf().setAppName("test").setMaster("local")
        .set("spark.driver.allowMultipleContexts", "true")
      sc = new SparkContext(conf)
      secondSparkContext = new SparkContext(conf)
    } finally {
      Option(secondSparkContext).foreach(_.stop())
    }
  }

  test("Test getOrCreate") {
    var sc2: SparkContext = null
    SparkContext.clearActiveContext()
    val conf = new SparkConf().setAppName("test").setMaster("local")

    sc = SparkContext.getOrCreate(conf)

    assert(sc.getConf.get("spark.app.name").equals("test"))
    sc2 = SparkContext.getOrCreate(new SparkConf().setAppName("test2").setMaster("local"))
    assert(sc2.getConf.get("spark.app.name").equals("test"))
    assert(sc === sc2)
    assert(sc eq sc2)

    // Try creating second context to confirm that it's still possible, if desired
    sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
      .set("spark.driver.allowMultipleContexts", "true"))

    sc2.stop()
  }

  test("BytesWritable implicit conversion is correct") {
    // Regression test for SPARK-3121
    val bytesWritable = new BytesWritable()
    val inputArray = (1 to 10).map(_.toByte).toArray
    bytesWritable.set(inputArray, 0, 10)
    bytesWritable.set(inputArray, 0, 5)

    val converter = WritableConverter.bytesWritableConverter()
    val byteArray = converter.convert(bytesWritable)
    assert(byteArray.length === 5)

    bytesWritable.set(inputArray, 0, 0)
    val byteArray2 = converter.convert(bytesWritable)
    assert(byteArray2.length === 0)
  }

  test("basic case for addFile and listFiles") {
    val dir = Utils.createTempDir()

    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
    val absolutePath1 = file1.getAbsolutePath

    val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
    val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
    val absolutePath2 = file2.getAbsolutePath

    try {
      Files.write("somewords1", file1, StandardCharsets.UTF_8)
      Files.write("somewords2", file2, StandardCharsets.UTF_8)
      val length1 = file1.length()
      val length2 = file2.length()

      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      sc.addFile(file1.getAbsolutePath)
      sc.addFile(relativePath)
      sc.parallelize(Array(1), 1).map(x => {
        val gotten1 = new File(SparkFiles.get(file1.getName))
        val gotten2 = new File(SparkFiles.get(file2.getName))
        if (!gotten1.exists()) {
          throw new SparkException("file doesn't exist : " + absolutePath1)
        }
        if (!gotten2.exists()) {
          throw new SparkException("file doesn't exist : " + absolutePath2)
        }

        if (length1 != gotten1.length()) {
          throw new SparkException(
            s"file has different length $length1 than added file ${gotten1.length()} : " +
              absolutePath1)
        }
        if (length2 != gotten2.length()) {
          throw new SparkException(
            s"file has different length $length2 than added file ${gotten2.length()} : " +
              absolutePath2)
        }

        if (absolutePath1 == gotten1.getAbsolutePath) {
          throw new SparkException("file should have been copied : " + absolutePath1)
        }
        if (absolutePath2 == gotten2.getAbsolutePath) {
          throw new SparkException("file should have been copied : " + absolutePath2)
        }
        x
      }).count()
      assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
    } finally {
      sc.stop()
    }
  }

  test("add and list jar files") {
    val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      sc.addJar(jarPath.toString)
      assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1)
    } finally {
      sc.stop()
    }
  }

  test("SPARK-17650: malformed url's throw exceptions before bricking Executors") {
    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      Seq("http", "https", "ftp").foreach { scheme =>
        val badURL = s"$scheme://user:pwd/path"
        val e1 = intercept[MalformedURLException] {
          sc.addFile(badURL)
        }
        assert(e1.getMessage.contains(badURL))
        val e2 = intercept[MalformedURLException] {
          sc.addJar(badURL)
        }
        assert(e2.getMessage.contains(badURL))
        assert(sc.addedFiles.isEmpty)
        assert(sc.addedJars.isEmpty)
      }
    } finally {
      sc.stop()
    }
  }

  test("addFile recursive works") {
    val pluto = Utils.createTempDir()
    val neptune = Utils.createTempDir(pluto.getAbsolutePath)
    val saturn = Utils.createTempDir(neptune.getAbsolutePath)
    val alien1 = File.createTempFile("alien", "1", neptune)
    val alien2 = File.createTempFile("alien", "2", saturn)

    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      sc.addFile(neptune.getAbsolutePath, true)
      sc.parallelize(Array(1), 1).map(x => {
        val sep = File.separator
        if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
          throw new SparkException("can't access file under root added directory")
        }
        if (!new File(SparkFiles.get(neptune.getName + sep + saturn.getName + sep + alien2.getName))
          .exists()) {
          throw new SparkException("can't access file in nested directory")
        }
        if (new File(SparkFiles.get(pluto.getName + sep + neptune.getName + sep + alien1.getName))
          .exists()) {
          throw new SparkException("file exists that shouldn't")
        }
        x
      }).count()
    } finally {
      sc.stop()
    }
  }

  test("addFile recursive can't add directories by default") {
    val dir = Utils.createTempDir()

    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      intercept[SparkException] {
        sc.addFile(dir.getAbsolutePath)
      }
    } finally {
      sc.stop()
    }
  }

  test("cannot call addFile with different paths that have the same filename") {
    val dir = Utils.createTempDir()
    try {
      val subdir1 = new File(dir, "subdir1")
      val subdir2 = new File(dir, "subdir2")
      assert(subdir1.mkdir())
      assert(subdir2.mkdir())
      val file1 = new File(subdir1, "file")
      val file2 = new File(subdir2, "file")
      Files.write("old", file1, StandardCharsets.UTF_8)
      Files.write("new", file2, StandardCharsets.UTF_8)
      sc = new SparkContext("local-cluster[1,1,1024]", "test")
      sc.addFile(file1.getAbsolutePath)
      def getAddedFileContents(): String = {
        sc.parallelize(Seq(0)).map { _ =>
          scala.io.Source.fromFile(SparkFiles.get("file")).mkString
        }.first()
      }
      assert(getAddedFileContents() === "old")
      intercept[IllegalArgumentException] {
        sc.addFile(file2.getAbsolutePath)
      }
      assert(getAddedFileContents() === "old")
    } finally {
      Utils.deleteRecursively(dir)
    }
  }

  // Regression tests for SPARK-16787
  for (
    schedulingMode <- Seq("local-mode", "non-local-mode");
    method <- Seq("addJar", "addFile")
  ) {
    val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString
    val master = schedulingMode match {
      case "local-mode" => "local"
      case "non-local-mode" => "local-cluster[1,1,1024]"
    }
    test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") {
      sc = new SparkContext(master, "test")
      method match {
        case "addJar" =>
          sc.addJar(jarPath)
          sc.addJar(jarPath)
        case "addFile" =>
          sc.addFile(jarPath)
          sc.addFile(jarPath)
      }
    }
  }

  test("add jar with invalid path") {
    val tmpDir = Utils.createTempDir()
    val tmpJar = File.createTempFile("test", ".jar", tmpDir)

    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    sc.addJar(tmpJar.getAbsolutePath)

    // An invalid jar path only logs an error; nothing is added to the file server.
    sc.addJar("dummy.jar")
    sc.addJar("")
    sc.addJar(tmpDir.getAbsolutePath)

    sc.listJars().size should be (1)
    sc.listJars().head should include (tmpJar.getName)
  }

  test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
    try {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
      sc.cancelJobGroup("nonExistGroupId")
      Await.ready(future, Duration(2, TimeUnit.SECONDS))

      // Before the SPARK-6414 fix, sc.cancelJobGroup would throw a NullPointerException
      // and shut down the SparkContext, causing the following assertion to fail.
      assert(sc.parallelize(1 to 10).count() == 10L)
    } finally {
      sc.stop()
    }
  }

  test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") {
    // Regression test for SPARK-7155
    // dir1 and dir2 are used for wholeTextFiles and binaryFiles
    val dir1 = Utils.createTempDir()
    val dir2 = Utils.createTempDir()

    val dirpath1 = dir1.getAbsolutePath
    val dirpath2 = dir2.getAbsolutePath

    // file1 and file2 are placed inside dir1; they are also used for
    // textFile, hadoopFile, and newAPIHadoopFile.
    // file3, file4 and file5 are placed inside dir2; they are used for
    // textFile, hadoopFile, and newAPIHadoopFile as well.
    val file1 = new File(dir1, "part-00000")
    val file2 = new File(dir1, "part-00001")
    val file3 = new File(dir2, "part-00000")
    val file4 = new File(dir2, "part-00001")
    val file5 = new File(dir2, "part-00002")

    val filepath1 = file1.getAbsolutePath
    val filepath2 = file2.getAbsolutePath
    val filepath3 = file3.getAbsolutePath
    val filepath4 = file4.getAbsolutePath
    val filepath5 = file5.getAbsolutePath

    try {
      // Create 5 text files.
      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1,
        StandardCharsets.UTF_8)
      Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8)
      Files.write("someline1 in file3", file3, StandardCharsets.UTF_8)
      Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8)
      Files.write("someline1 in file5\nsomeline2 in file5", file5, StandardCharsets.UTF_8)

      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

      // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
      assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
      assert(sc.hadoopFile(filepath1 + "," + filepath2,
        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
      assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)

      // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
      assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
      assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
      assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)

      // Test wholeTextFiles, and binaryFiles for dir1 and dir2
      assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
      assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)

    } finally {
      sc.stop()
    }
  }

  test("Default path for file based RDDs is properly set (SPARK-12517)") {
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

    // Test textFile, wholeTextFiles, binaryFiles, hadoopFile and
    // newAPIHadoopFile for setting the default path as the RDD name
    val mockPath = "default/path/for/"

    var targetPath = mockPath + "textFile"
    assert(sc.textFile(targetPath).name === targetPath)

    targetPath = mockPath + "wholeTextFiles"
    assert(sc.wholeTextFiles(targetPath).name === targetPath)

    targetPath = mockPath + "binaryFiles"
    assert(sc.binaryFiles(targetPath).name === targetPath)

    targetPath = mockPath + "hadoopFile"
    assert(sc.hadoopFile(targetPath).name === targetPath)

    targetPath = mockPath + "newAPIHadoopFile"
    assert(sc.newAPIHadoopFile(targetPath).name === targetPath)

    sc.stop()
  }

  test("calling multiple sc.stop() must not throw any exception") {
    noException should be thrownBy {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
      val cnt = sc.parallelize(1 to 4).count()
      sc.cancelAllJobs()
      sc.stop()
      // call stop second time
      sc.stop()
    }
  }

  test("No exception when both num-executors and dynamic allocation set.") {
    noException should be thrownBy {
      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
        .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
      assert(sc.executorAllocationManager.isEmpty)
      assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
    }
  }

  test("localProperties are inherited by spawned threads.") {
    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    sc.setLocalProperty("testProperty", "testValue")
"testValue") 434 var result = "unset"; 435 val thread = new Thread() { override def run() = {result = sc.getLocalProperty("testProperty")}} 436 thread.start() 437 thread.join() 438 sc.stop() 439 assert(result == "testValue") 440 } 441 442 test("localProperties do not cross-talk between threads.") { 443 sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) 444 var result = "unset"; 445 val thread1 = new Thread() { 446 override def run() = {sc.setLocalProperty("testProperty", "testValue")}} 447 // testProperty should be unset and thus return null 448 val thread2 = new Thread() { 449 override def run() = {result = sc.getLocalProperty("testProperty")}} 450 thread1.start() 451 thread1.join() 452 thread2.start() 453 thread2.join() 454 sc.stop() 455 assert(result == null) 456 } 457 458 test("log level case-insensitive and reset log level") { 459 sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) 460 val originalLevel = org.apache.log4j.Logger.getRootLogger().getLevel 461 try { 462 sc.setLogLevel("debug") 463 assert(org.apache.log4j.Logger.getRootLogger().getLevel === org.apache.log4j.Level.DEBUG) 464 sc.setLogLevel("INfo") 465 assert(org.apache.log4j.Logger.getRootLogger().getLevel === org.apache.log4j.Level.INFO) 466 } finally { 467 sc.setLogLevel(originalLevel.toString) 468 assert(org.apache.log4j.Logger.getRootLogger().getLevel === originalLevel) 469 sc.stop() 470 } 471 } 472 473 test("SPARK-19446: DebugFilesystem.assertNoOpenStreams should report " + 474 "open streams to help debugging") { 475 val fs = new DebugFilesystem() 476 fs.initialize(new URI("file:///"), new Configuration()) 477 val file = File.createTempFile("SPARK19446", "temp") 478 Files.write(Array.ofDim[Byte](1000), file) 479 val path = new Path("file:///" + file.getCanonicalPath) 480 val stream = fs.open(path) 481 val exc = intercept[RuntimeException] { 482 DebugFilesystem.assertNoOpenStreams() 483 } 484 assert(exc != null) 485 assert(exc.getCause() != null) 486 stream.close() 487 } 488} 489