/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.{Text, Writable}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.task.JobContextImpl

import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.input.WholeTextFileInputFormat

/**
 * An RDD that reads whole text files as single records: each file becomes one
 * record, with the file's path as the key and its content as the value.
 */
private[spark] class WholeTextFileRDD(
    sc: SparkContext,
    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
    keyClass: Class[Text],
    valueClass: Class[Text],
    conf: Configuration,
    minPartitions: Int)
  extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) {

  override def getPartitions: Array[Partition] = {
    val inputFormat = inputFormatClass.newInstance
    val conf = getConf
    // Some input formats need the Hadoop configuration injected before use.
    inputFormat match {
      case configurable: Configurable =>
        configurable.setConf(conf)
      case _ =>
    }
    val jobContext = new JobContextImpl(conf, jobId)
    // Ask the input format to honor the requested minimum number of splits.
    inputFormat.setMinPartitions(jobContext, minPartitions)
    val rawSplits = inputFormat.getSplits(jobContext).toArray
    // Wrap each Hadoop split in a Spark partition, preserving split order.
    val result = new Array[Partition](rawSplits.size)
    for (i <- 0 until rawSplits.size) {
      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    }
    result
  }
}
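
// Usage sketch (illustrative; not part of this file). This RDD is the engine
// behind SparkContext.wholeTextFiles, so application code normally goes
// through that method rather than constructing WholeTextFileRDD directly.
// A minimal sketch, assuming a local master and a hypothetical input
// directory "/data/docs":
//
//   import org.apache.spark.{SparkConf, SparkContext}
//
//   val sc = new SparkContext(
//     new SparkConf().setAppName("whole-text-files").setMaster("local[*]"))
//   // Each record is a (path, content) pair; minPartitions is a hint to the
//   // input format, not a guaranteed partition count.
//   val files = sc.wholeTextFiles("/data/docs", minPartitions = 4)
//   files.take(2).foreach { case (path, content) =>
//     println(s"$path -> ${content.length} chars")
//   }
//   sc.stop()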