/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.{Text, Writable}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.task.JobContextImpl

import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.input.WholeTextFileInputFormat
/**
 * An RDD that reads whole text files, producing one record per file: the key is the
 * file's path and the value is the file's content, both as Hadoop `Text`.
 */
private[spark] class WholeTextFileRDD(
    sc: SparkContext,
    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
    keyClass: Class[Text],
    valueClass: Class[Text],
    conf: Configuration,
    minPartitions: Int)
  extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) {

  override def getPartitions: Array[Partition] = {
    // Class#newInstance is deprecated; instantiate via the no-arg constructor instead.
    val inputFormat = inputFormatClass.getConstructor().newInstance()
    val conf = getConf
    inputFormat match {
      case configurable: Configurable =>
        configurable.setConf(conf)
      case _ =>
    }
    val jobContext = new JobContextImpl(conf, jobId)
    // Hint the desired minimum number of partitions to the input format, then wrap
    // each resulting Hadoop split in a Spark partition.
    inputFormat.setMinPartitions(jobContext, minPartitions)
    val rawSplits = inputFormat.getSplits(jobContext).toArray
    val result = new Array[Partition](rawSplits.length)
    for (i <- rawSplits.indices) {
      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    }
    result
  }
}
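
// A minimal usage sketch, for illustration only (not part of this file's API surface):
// `SparkContext.wholeTextFiles` builds this RDD along these lines, converting the
// Hadoop `Text` key/value pairs (file path, file contents) into plain Strings.
// `hadoopConf` and `minPartitions` below are hypothetical placeholders for the values
// the caller supplies.
//
//   val rdd = new WholeTextFileRDD(
//       sc,
//       classOf[WholeTextFileInputFormat],
//       classOf[Text],
//       classOf[Text],
//       hadoopConf,
//       minPartitions)
//     .map { case (path, content) => (path.toString, content.toString) }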