/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io._
import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream

import scala.io.Source

import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class FileSuite extends SparkFunSuite with LocalSparkContext {
  var tempDir: File = _

  override def beforeEach() {
    super.beforeEach()
    tempDir = Utils.createTempDir()
  }

  override def afterEach() {
    try {
      Utils.deleteRecursively(tempDir)
    } finally {
      super.afterEach()
    }
  }

  test("text files") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 4)
    nums.saveAsTextFile(outputDir)
    // Read the plain text file and check it's OK
    val outputFile = new File(outputDir, "part-00000")
    val content = Source.fromFile(outputFile).mkString
    assert(content === "1\n2\n3\n4\n")
    // Also try reading it in as a text file RDD
    assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
  }

  test("text files (compressed)") {
    sc = new SparkContext("local", "test")
    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
    val codec = new DefaultCodec()

    val data = sc.parallelize("a" * 10000, 1)
    data.saveAsTextFile(normalDir)
    data.saveAsTextFile(compressedOutputDir, classOf[DefaultCodec])

    val normalFile = new File(normalDir, "part-00000")
    val normalContent = sc.textFile(normalDir).collect
    assert(normalContent === Array.fill(10000)("a"))

    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
    val compressedContent = sc.textFile(compressedOutputDir).collect
    assert(compressedContent === Array.fill(10000)("a"))

    assert(compressedFile.length < normalFile.length)
  }

  test("SequenceFiles") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
    nums.saveAsSequenceFile(outputDir)
    // Try reading the output back as a SequenceFile
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("SequenceFile (compressed)") {
    sc = new SparkContext("local", "test")
    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
    val codec = new DefaultCodec()

    val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
    data.saveAsSequenceFile(normalDir)
    data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))

    val normalFile = new File(normalDir, "part-00000")
    val normalContent = sc.sequenceFile[String, String](normalDir).collect
    assert(normalContent === Array.fill(100)("abc", "abc"))

    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
    val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
    assert(compressedContent === Array.fill(100)("abc", "abc"))

    assert(compressedFile.length < normalFile.length)
  }

  test("SequenceFile with writable key") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
    nums.saveAsSequenceFile(outputDir)
    // Try reading the output back as a SequenceFile
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("SequenceFile with writable value") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
    nums.saveAsSequenceFile(outputDir)
    // Try reading the output back as a SequenceFile
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("SequenceFile with writable key and value") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
    nums.saveAsSequenceFile(outputDir)
    // Try reading the output back as a SequenceFile
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("implicit conversions in reading SequenceFiles") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
    nums.saveAsSequenceFile(outputDir)
    // Similar to the tests above, we read a SequenceFile, but this time we pass type params
    // that are convertible to Writable instead of calling sequenceFile[IntWritable, Text]
    val output1 = sc.sequenceFile[Int, String](outputDir)
    assert(output1.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
    // Also try having one type be a subclass of Writable and one not
    val output2 = sc.sequenceFile[Int, Text](outputDir)
    assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
    val output3 = sc.sequenceFile[IntWritable, String](outputDir)
    assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("object files of ints") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 4)
    nums.saveAsObjectFile(outputDir)
    // Try reading the output back as an object file
    val output = sc.objectFile[Int](outputDir)
    assert(output.collect().toList === List(1, 2, 3, 4))
  }

  test("object files of complex types") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
    nums.saveAsObjectFile(outputDir)
    // Try reading the output back as an object file
    val output = sc.objectFile[(Int, String)](outputDir)
    assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
  }

  test("object files of classes from a JAR") {
    // scalastyle:off classforname
    val original = Thread.currentThread().getContextClassLoader
    val className = "FileSuiteObjectFileTest"
    val jar = TestUtils.createJarWithClasses(Seq(className))
    val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
    Thread.currentThread().setContextClassLoader(loader)
    try {
      sc = new SparkContext("local", "test")
      val objs = sc.makeRDD(1 to 3).map { x =>
        val loader = Thread.currentThread().getContextClassLoader
        Class.forName(className, true, loader).newInstance()
      }
      val outputDir = new File(tempDir, "output").getAbsolutePath
      objs.saveAsObjectFile(outputDir)
      // Try reading the output back as an object file
      val ct = reflect.ClassTag[Any](Class.forName(className, true, loader))
      val output = sc.objectFile[Any](outputDir)
      assert(output.collect().size === 3)
      assert(output.collect().head.getClass.getName === className)
    }
    finally {
      Thread.currentThread().setContextClassLoader(original)
    }
    // scalastyle:on classforname
  }

  test("write SequenceFile using new Hadoop API") {
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
    nums.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]](
      outputDir)
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("read SequenceFile using new Hadoop API") {
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
    nums.saveAsSequenceFile(outputDir)
    val output =
      sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  private def writeBinaryData(testOutput: Array[Byte], testOutputCopies: Int): File = {
    val outFile = new File(tempDir, "record-bytestream-00000.bin")
    val file = new FileOutputStream(outFile)
    val channel = file.getChannel
    for (i <- 0 until testOutputCopies) {
      // Shift values by i so that they're different in the output
      val alteredOutput = testOutput.map(b => (b + i).toByte)
      channel.write(ByteBuffer.wrap(alteredOutput))
    }
    channel.close()
    file.close()
    outFile
  }

  test("binary file input as byte array") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val outFile = writeBinaryData(testOutput, 1)
    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
    val (infile, indata) = inRdd.collect().head
    // Make sure the name and array match
    assert(infile.contains(outFile.getAbsolutePath)) // a prefix may get added
    assert(indata.toArray === testOutput)
  }

  test("portabledatastream caching tests") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val outFile = writeBinaryData(testOutput, 1)
    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache()
    inRdd.foreach(_._2.toArray()) // force the file to be read
    // Check that the cached data matches what was written
    assert(inRdd.values.collect().head.toArray === testOutput)
  }

  test("portabledatastream persist disk storage") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val outFile = writeBinaryData(testOutput, 1)
    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).persist(StorageLevel.DISK_ONLY)
    inRdd.foreach(_._2.toArray()) // force the file to be read
    assert(inRdd.values.collect().head.toArray === testOutput)
  }

  test("portabledatastream flatmap tests") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val outFile = writeBinaryData(testOutput, 1)
    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
    val numOfCopies = 3
    val copyRdd = inRdd.flatMap(curData => (0 until numOfCopies).map(_ => curData._2))
    val copyArr = copyRdd.collect()
    assert(copyArr.length == numOfCopies)
    for (i <- copyArr.indices) {
      assert(copyArr(i).toArray === testOutput)
    }
  }

  test("fixed record length binary file as byte array") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val testOutputCopies = 10
    val outFile = writeBinaryData(testOutput, testOutputCopies)
    val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length)
    assert(inRdd.count == testOutputCopies)
    val inArr = inRdd.collect()
    for (i <- inArr.indices) {
      assert(inArr(i) === testOutput.map(b => (b + i).toByte))
    }
  }

  test ("negative binary record length should raise an exception") {
    sc = new SparkContext("local", "test")
    val outFile = writeBinaryData(Array[Byte](1, 2, 3, 4, 5, 6), 1)
    intercept[SparkException] {
      sc.binaryRecords(outFile.getAbsolutePath, -1).count()
    }
  }

  test("file caching") {
    sc = new SparkContext("local", "test")
    val out = new FileWriter(tempDir + "/input")
    out.write("Hello world!\n")
    out.write("What's up?\n")
    out.write("Goodbye\n")
    out.close()
    val rdd = sc.textFile(tempDir + "/input").cache()
    assert(rdd.count() === 3)
    assert(rdd.count() === 3)
    assert(rdd.count() === 3)
  }

  test ("prevent user from overwriting the empty directory (old Hadoop API)") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
    intercept[FileAlreadyExistsException] {
      randomRDD.saveAsTextFile(tempDir.getPath)
    }
  }

  test ("prevent user from overwriting the non-empty directory (old Hadoop API)") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
    intercept[FileAlreadyExistsException] {
      randomRDD.saveAsTextFile(tempDir.getPath + "/output")
    }
  }

  test ("allow user to disable the output directory existence checking (old Hadoop API)") {
    val sf = new SparkConf()
    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
    sc = new SparkContext(sf)
    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
  }

  test ("prevent user from overwriting the empty directory (new Hadoop API)") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    intercept[FileAlreadyExistsException] {
      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
    }
  }

  test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
      tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
    intercept[FileAlreadyExistsException] {
      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
    }
  }

  test ("allow user to disable the output directory existence checking (new Hadoop API)") {
    val sf = new SparkConf()
    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
    sc = new SparkContext(sf)
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
      tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
      tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
  }

  test ("save Hadoop Dataset through old Hadoop API") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    val job = new JobConf()
    job.setOutputKeyClass(classOf[String])
    job.setOutputValueClass(classOf[String])
    job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
    job.set("mapred.output.dir", tempDir.getPath + "/outputDataset_old")
    randomRDD.saveAsHadoopDataset(job)
    assert(new File(tempDir.getPath + "/outputDataset_old/part-00000").exists() === true)
  }

  test ("save Hadoop Dataset through new Hadoop API") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[String])
    job.setOutputValueClass(classOf[String])
    job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
    val jobConfig = job.getConfiguration
    jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
    randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
    assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
  }

  test("Get input files via old Hadoop API") {
    sc = new SparkContext("local", "test")
    val outDir = new File(tempDir, "output").getAbsolutePath
    sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)

    val inputPaths =
      sc.hadoopFile(outDir, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
        .asInstanceOf[HadoopRDD[_, _]]
        .mapPartitionsWithInputSplit { (split, part) =>
          Iterator(split.asInstanceOf[FileSplit].getPath.toUri.getPath)
        }.collect()
    assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
  }

  test("Get input files via new Hadoop API") {
    sc = new SparkContext("local", "test")
    val outDir = new File(tempDir, "output").getAbsolutePath
    sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)

    val inputPaths =
      sc.newAPIHadoopFile(outDir, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
        .asInstanceOf[NewHadoopRDD[_, _]]
        .mapPartitionsWithInputSplit { (split, part) =>
          Iterator(split.asInstanceOf[NewFileSplit].getPath.toUri.getPath)
        }.collect()
    assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
  }

  test("spark.files.ignoreCorruptFiles should work for both HadoopRDD and NewHadoopRDD") {
    val inputFile = File.createTempFile("input-", ".gz")
    try {
      // Create a corrupt gzip file
      val byteOutput = new ByteArrayOutputStream()
      val gzip = new GZIPOutputStream(byteOutput)
      try {
        gzip.write(Array[Byte](1, 2, 3, 4))
      } finally {
        gzip.close()
      }
      val bytes = byteOutput.toByteArray
      val o = new FileOutputStream(inputFile)
      try {
        // It's corrupt since we only write half of the bytes into the file.
        o.write(bytes.take(bytes.length / 2))
      } finally {
        o.close()
      }

      // Reading a corrupt gzip file should throw EOFException
      sc = new SparkContext("local", "test")
      // Test HadoopRDD
      var e = intercept[SparkException] {
        sc.textFile(inputFile.toURI.toString).collect()
      }
      assert(e.getCause.isInstanceOf[EOFException])
      assert(e.getCause.getMessage === "Unexpected end of input stream")
      // Test NewHadoopRDD
      e = intercept[SparkException] {
        sc.newAPIHadoopFile(
          inputFile.toURI.toString,
          classOf[NewTextInputFormat],
          classOf[LongWritable],
          classOf[Text]).collect()
      }
      assert(e.getCause.isInstanceOf[EOFException])
      assert(e.getCause.getMessage === "Unexpected end of input stream")
      sc.stop()

      val conf = new SparkConf().set(IGNORE_CORRUPT_FILES, true)
      sc = new SparkContext("local", "test", conf)
      // Test HadoopRDD
      assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
      // Test NewHadoopRDD
      assert {
        sc.newAPIHadoopFile(
          inputFile.toURI.toString,
          classOf[NewTextInputFormat],
          classOf[LongWritable],
          classOf[Text]).collect().isEmpty
      }
    } finally {
      inputFile.delete()
    }
  }

}