/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io._
import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream

import scala.io.Source

import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, JobConf, TextInputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

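// End-to-end tests for saving RDDs to, and reading them back from, the local file system as
// text files, compressed files, SequenceFiles, object files, and raw binary data. Each test
// runs against a fresh temporary directory that is deleted afterwards.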
class FileSuite extends SparkFunSuite with LocalSparkContext {
  var tempDir: File = _

  override def beforeEach() {
    super.beforeEach()
    tempDir = Utils.createTempDir()
  }

  override def afterEach() {
    try {
      Utils.deleteRecursively(tempDir)
    } finally {
      super.afterEach()
    }
  }

  test("text files") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 4)
    nums.saveAsTextFile(outputDir)
    // Read the plain text file and check it's OK
    val outputFile = new File(outputDir, "part-00000")
    val content = Source.fromFile(outputFile).mkString
    assert(content === "1\n2\n3\n4\n")
    // Also try reading it in as a text file RDD
    assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
  }

  test("text files (compressed)") {
    sc = new SparkContext("local", "test")
    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
    val codec = new DefaultCodec()

    val data = sc.parallelize("a" * 10000, 1)
    data.saveAsTextFile(normalDir)
    data.saveAsTextFile(compressedOutputDir, classOf[DefaultCodec])

    val normalFile = new File(normalDir, "part-00000")
    val normalContent = sc.textFile(normalDir).collect
    assert(normalContent === Array.fill(10000)("a"))

    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
    val compressedContent = sc.textFile(compressedOutputDir).collect
    assert(compressedContent === Array.fill(10000)("a"))

    assert(compressedFile.length < normalFile.length)
  }

  test("SequenceFiles") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
    nums.saveAsSequenceFile(outputDir)
    // Try reading the output back as a SequenceFile
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("SequenceFile (compressed)") {
    sc = new SparkContext("local", "test")
    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
    val codec = new DefaultCodec()

    val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
    data.saveAsSequenceFile(normalDir)
    data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))

    val normalFile = new File(normalDir, "part-00000")
    val normalContent = sc.sequenceFile[String, String](normalDir).collect
    assert(normalContent === Array.fill(100)(("abc", "abc")))

    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
    val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
    assert(compressedContent === Array.fill(100)(("abc", "abc")))

    assert(compressedFile.length < normalFile.length)
  }

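  // The next three tests cover saving pairs where the key, the value, or both are already
  // Hadoop Writables rather than plain Scala types.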
  test("SequenceFile with writable key") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
    nums.saveAsSequenceFile(outputDir)
    // Try reading the output back as a SequenceFile
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("SequenceFile with writable value") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
    nums.saveAsSequenceFile(outputDir)
    // Try reading the output back as a SequenceFile
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("SequenceFile with writable key and value") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
    nums.saveAsSequenceFile(outputDir)
    // Try reading the output back as a SequenceFile
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("implicit conversions in reading SequenceFiles") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
    nums.saveAsSequenceFile(outputDir)
    // Similar to the tests above, we read a SequenceFile, but this time we pass type params
    // that are convertible to Writable instead of calling sequenceFile[IntWritable, Text]
    val output1 = sc.sequenceFile[Int, String](outputDir)
    assert(output1.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
    // Also try having one type be a subclass of Writable and one not
    val output2 = sc.sequenceFile[Int, Text](outputDir)
    assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
    val output3 = sc.sequenceFile[IntWritable, String](outputDir)
    assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("object files of ints") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 4)
    nums.saveAsObjectFile(outputDir)
    // Try reading the output back as an object file
    val output = sc.objectFile[Int](outputDir)
    assert(output.collect().toList === List(1, 2, 3, 4))
  }

  test("object files of complex types") {
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
    nums.saveAsObjectFile(outputDir)
    // Try reading the output back as an object file
    val output = sc.objectFile[(Int, String)](outputDir)
    assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
  }

  test("object files of classes from a JAR") {
    // scalastyle:off classforname
    val original = Thread.currentThread().getContextClassLoader
    val className = "FileSuiteObjectFileTest"
    val jar = TestUtils.createJarWithClasses(Seq(className))
    val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
    Thread.currentThread().setContextClassLoader(loader)
    try {
      sc = new SparkContext("local", "test")
      val objs = sc.makeRDD(1 to 3).map { x =>
        val loader = Thread.currentThread().getContextClassLoader
        Class.forName(className, true, loader).newInstance()
      }
      val outputDir = new File(tempDir, "output").getAbsolutePath
      objs.saveAsObjectFile(outputDir)
      // Try reading the output back as an object file
      val ct = reflect.ClassTag[Any](Class.forName(className, true, loader))
      val output = sc.objectFile[Any](outputDir)
      assert(output.collect().size === 3)
      assert(output.collect().head.getClass.getName === className)
    } finally {
      Thread.currentThread().setContextClassLoader(original)
    }
    // scalastyle:on classforname
  }

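  // The next two tests exercise the new (org.apache.hadoop.mapreduce) API variants of
  // SequenceFile output and input.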
  test("write SequenceFile using new Hadoop API") {
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
    nums.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]](outputDir)
    val output = sc.sequenceFile[IntWritable, Text](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

  test("read SequenceFile using new Hadoop API") {
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
    sc = new SparkContext("local", "test")
    val outputDir = new File(tempDir, "output").getAbsolutePath
    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
    nums.saveAsSequenceFile(outputDir)
    val output =
      sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
  }

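  // Writes `testOutputCopies` copies of `testOutput` into a single binary file, shifting the
  // bytes of each copy by its index so the copies are distinguishable, and returns the file.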
  private def writeBinaryData(testOutput: Array[Byte], testOutputCopies: Int): File = {
    val outFile = new File(tempDir, "record-bytestream-00000.bin")
    val file = new FileOutputStream(outFile)
    val channel = file.getChannel
    for (i <- 0 until testOutputCopies) {
      // Shift values by i so that they're different in the output
      val alteredOutput = testOutput.map(b => (b + i).toByte)
      channel.write(ByteBuffer.wrap(alteredOutput))
    }
    channel.close()
    file.close()
    outFile
  }

  test("binary file input as byte array") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val outFile = writeBinaryData(testOutput, 1)
    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
    val (infile, indata) = inRdd.collect().head
    // Make sure the name and array match
    assert(infile.contains(outFile.getAbsolutePath)) // a prefix may get added
    assert(indata.toArray === testOutput)
  }

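  // sc.binaryFiles returns (path, PortableDataStream) pairs; the following tests check that
  // the stream contents survive caching, disk-only persistence, and duplication via flatMap.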
  test("portabledatastream caching tests") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val outFile = writeBinaryData(testOutput, 1)
    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache()
    inRdd.foreach(_._2.toArray()) // force the file to be read
    // The cached RDD should still return the original bytes
    assert(inRdd.values.collect().head.toArray === testOutput)
  }

  test("portabledatastream persist disk storage") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val outFile = writeBinaryData(testOutput, 1)
    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).persist(StorageLevel.DISK_ONLY)
    inRdd.foreach(_._2.toArray()) // force the file to be read
    assert(inRdd.values.collect().head.toArray === testOutput)
  }

  test("portabledatastream flatmap tests") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val outFile = writeBinaryData(testOutput, 1)
    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
    val numOfCopies = 3
    val copyRdd = inRdd.flatMap(curData => (0 until numOfCopies).map(_ => curData._2))
    val copyArr = copyRdd.collect()
    assert(copyArr.length == numOfCopies)
    for (i <- copyArr.indices) {
      assert(copyArr(i).toArray === testOutput)
    }
  }

  test("fixed record length binary file as byte array") {
    sc = new SparkContext("local", "test")
    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
    val testOutputCopies = 10
    val outFile = writeBinaryData(testOutput, testOutputCopies)
    val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length)
    assert(inRdd.count == testOutputCopies)
    val inArr = inRdd.collect()
    for (i <- inArr.indices) {
      assert(inArr(i) === testOutput.map(b => (b + i).toByte))
    }
  }

  test ("negative binary record length should raise an exception") {
    sc = new SparkContext("local", "test")
    val outFile = writeBinaryData(Array[Byte](1, 2, 3, 4, 5, 6), 1)
    intercept[SparkException] {
      sc.binaryRecords(outFile.getAbsolutePath, -1).count()
    }
  }

  test("file caching") {
    sc = new SparkContext("local", "test")
    val out = new FileWriter(tempDir + "/input")
    out.write("Hello world!\n")
    out.write("What's up?\n")
    out.write("Goodbye\n")
    out.close()
    val rdd = sc.textFile(tempDir + "/input").cache()
    assert(rdd.count() === 3)
    assert(rdd.count() === 3)
    assert(rdd.count() === 3)
  }

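  // The following tests cover output-spec validation: by default, saving into an existing
  // output directory fails with FileAlreadyExistsException, and setting
  // spark.hadoop.validateOutputSpecs=false disables the check, for both Hadoop APIs.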
  test ("prevent user from overwriting the empty directory (old Hadoop API)") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
    intercept[FileAlreadyExistsException] {
      randomRDD.saveAsTextFile(tempDir.getPath)
    }
  }

  test ("prevent user from overwriting the non-empty directory (old Hadoop API)") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
    intercept[FileAlreadyExistsException] {
      randomRDD.saveAsTextFile(tempDir.getPath + "/output")
    }
  }

  test ("allow user to disable the output directory existence checking (old Hadoop API)") {
    val sf = new SparkConf()
    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
    sc = new SparkContext(sf)
    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
  }

  test ("prevent user from overwriting the empty directory (new Hadoop API)") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    intercept[FileAlreadyExistsException] {
      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
    }
  }

  test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
      tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
    intercept[FileAlreadyExistsException] {
      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
    }
  }

  test ("allow user to disable the output directory existence checking (new Hadoop API)") {
    val sf = new SparkConf()
    sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
    sc = new SparkContext(sf)
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
      tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
      tempDir.getPath + "/output")
    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
  }

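  // saveAsHadoopDataset and saveAsNewAPIHadoopDataset take a fully configured JobConf /
  // Configuration instead of a path, so the output format, key/value classes, and output
  // directory are all set by hand in the two tests below.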
  test ("save Hadoop Dataset through old Hadoop API") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    val job = new JobConf()
    job.setOutputKeyClass(classOf[String])
    job.setOutputValueClass(classOf[String])
    job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
    job.set("mapred.output.dir", tempDir.getPath + "/outputDataset_old")
    randomRDD.saveAsHadoopDataset(job)
    assert(new File(tempDir.getPath + "/outputDataset_old/part-00000").exists() === true)
  }

  test ("save Hadoop Dataset through new Hadoop API") {
    sc = new SparkContext("local", "test")
    val randomRDD = sc.parallelize(
      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[String])
    job.setOutputValueClass(classOf[String])
    job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
    val jobConfig = job.getConfiguration
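    // "mapred.output.dir" is the legacy property name; Hadoop's deprecated-key handling maps
    // it to "mapreduce.output.fileoutputformat.outputdir" for the new API.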
    jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
    randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
    assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
  }

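  // mapPartitionsWithInputSplit exposes the underlying Hadoop InputSplit, which the two tests
  // below use to recover the concrete paths of the files feeding each partition.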
  test("Get input files via old Hadoop API") {
    sc = new SparkContext("local", "test")
    val outDir = new File(tempDir, "output").getAbsolutePath
    sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)

    val inputPaths =
      sc.hadoopFile(outDir, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
        .asInstanceOf[HadoopRDD[_, _]]
        .mapPartitionsWithInputSplit { (split, part) =>
          Iterator(split.asInstanceOf[FileSplit].getPath.toUri.getPath)
        }.collect()
    assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
  }

  test("Get input files via new Hadoop API") {
    sc = new SparkContext("local", "test")
    val outDir = new File(tempDir, "output").getAbsolutePath
    sc.makeRDD(1 to 4, 2).saveAsTextFile(outDir)

    val inputPaths =
      sc.newAPIHadoopFile(outDir, classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
        .asInstanceOf[NewHadoopRDD[_, _]]
        .mapPartitionsWithInputSplit { (split, part) =>
          Iterator(split.asInstanceOf[NewFileSplit].getPath.toUri.getPath)
        }.collect()
    assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
  }

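  // Creates a gzip file, truncates it to half its length to simulate corruption, and checks
  // that reads fail with an EOFException by default but return no records once
  // spark.files.ignoreCorruptFiles is enabled, for both HadoopRDD and NewHadoopRDD.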
  test("spark.files.ignoreCorruptFiles should work for both HadoopRDD and NewHadoopRDD") {
    val inputFile = File.createTempFile("input-", ".gz")
    try {
      // Create a corrupt gzip file
      val byteOutput = new ByteArrayOutputStream()
      val gzip = new GZIPOutputStream(byteOutput)
      try {
        gzip.write(Array[Byte](1, 2, 3, 4))
      } finally {
        gzip.close()
      }
      val bytes = byteOutput.toByteArray
      val o = new FileOutputStream(inputFile)
      try {
        // It's corrupt since we only write half of the bytes into the file.
        o.write(bytes.take(bytes.length / 2))
      } finally {
        o.close()
      }

      // Reading a corrupt gzip file should throw EOFException
      sc = new SparkContext("local", "test")
      // Test HadoopRDD
      var e = intercept[SparkException] {
        sc.textFile(inputFile.toURI.toString).collect()
      }
      assert(e.getCause.isInstanceOf[EOFException])
      assert(e.getCause.getMessage === "Unexpected end of input stream")
      // Test NewHadoopRDD
      e = intercept[SparkException] {
        sc.newAPIHadoopFile(
          inputFile.toURI.toString,
          classOf[NewTextInputFormat],
          classOf[LongWritable],
          classOf[Text]).collect()
      }
      assert(e.getCause.isInstanceOf[EOFException])
      assert(e.getCause.getMessage === "Unexpected end of input stream")
      sc.stop()

      val conf = new SparkConf().set(IGNORE_CORRUPT_FILES, true)
      sc = new SparkContext("local", "test", conf)
      // Test HadoopRDD
      assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
      // Test NewHadoopRDD
      assert {
        sc.newAPIHadoopFile(
          inputFile.toURI.toString,
          classOf[NewTextInputFormat],
          classOf[LongWritable],
          classOf[Text]).collect().isEmpty
      }
    } finally {
      inputFile.delete()
    }
  }

}