1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapreduce.lib.input;
20 
21 import java.io.*;
22 import java.lang.reflect.*;
23 
24 import org.apache.hadoop.mapreduce.*;
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configuration;
28 
29 /**
30  * A generic RecordReader that can hand out different recordReaders
31  * for each chunk in a {@link CombineFileSplit}.
32  * A CombineFileSplit can combine data chunks from multiple files.
33  * This class allows using different RecordReaders for processing
34  * these data chunks from different files.
35  * @see CombineFileSplit
36  */
37 @InterfaceAudience.Public
38 @InterfaceStability.Stable
39 public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
40 
41   static final Class [] constructorSignature = new Class []
42                                          {CombineFileSplit.class,
43                                           TaskAttemptContext.class,
44                                           Integer.class};
45 
46   protected CombineFileSplit split;
47   protected Constructor<? extends RecordReader<K,V>> rrConstructor;
48   protected TaskAttemptContext context;
49 
50   protected int idx;
51   protected long progress;
52   protected RecordReader<K, V> curReader;
53 
initialize(InputSplit split, TaskAttemptContext context)54   public void initialize(InputSplit split,
55       TaskAttemptContext context) throws IOException, InterruptedException {
56     this.split = (CombineFileSplit)split;
57     this.context = context;
58     if (null != this.curReader) {
59       this.curReader.initialize(split, context);
60     }
61   }
62 
nextKeyValue()63   public boolean nextKeyValue() throws IOException, InterruptedException {
64 
65     while ((curReader == null) || !curReader.nextKeyValue()) {
66       if (!initNextRecordReader()) {
67         return false;
68       }
69     }
70     return true;
71   }
72 
getCurrentKey()73   public K getCurrentKey() throws IOException, InterruptedException {
74     return curReader.getCurrentKey();
75   }
76 
getCurrentValue()77   public V getCurrentValue() throws IOException, InterruptedException {
78     return curReader.getCurrentValue();
79   }
80 
close()81   public void close() throws IOException {
82     if (curReader != null) {
83       curReader.close();
84       curReader = null;
85     }
86   }
87 
88   /**
89    * return progress based on the amount of data processed so far.
90    */
getProgress()91   public float getProgress() throws IOException, InterruptedException {
92     long subprogress = 0;    // bytes processed in current split
93     if (null != curReader) {
94       // idx is always one past the current subsplit's true index.
95       subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
96     }
97     return Math.min(1.0f,  (progress + subprogress)/(float)(split.getLength()));
98   }
99 
100   /**
101    * A generic RecordReader that can hand out different recordReaders
102    * for each chunk in the CombineFileSplit.
103    */
CombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Class<? extends RecordReader<K,V>> rrClass)104   public CombineFileRecordReader(CombineFileSplit split,
105                                  TaskAttemptContext context,
106                                  Class<? extends RecordReader<K,V>> rrClass)
107     throws IOException {
108     this.split = split;
109     this.context = context;
110     this.idx = 0;
111     this.curReader = null;
112     this.progress = 0;
113 
114     try {
115       rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
116       rrConstructor.setAccessible(true);
117     } catch (Exception e) {
118       throw new RuntimeException(rrClass.getName() +
119                                  " does not have valid constructor", e);
120     }
121     initNextRecordReader();
122   }
123 
124   /**
125    * Get the record reader for the next chunk in this CombineFileSplit.
126    */
initNextRecordReader()127   protected boolean initNextRecordReader() throws IOException {
128 
129     if (curReader != null) {
130       curReader.close();
131       curReader = null;
132       if (idx > 0) {
133         progress += split.getLength(idx-1);    // done processing so far
134       }
135     }
136 
137     // if all chunks have been processed, nothing more to do.
138     if (idx == split.getNumPaths()) {
139       return false;
140     }
141 
142     context.progress();
143 
144     // get a record reader for the idx-th chunk
145     try {
146       Configuration conf = context.getConfiguration();
147       // setup some helper config variables.
148       conf.set(MRJobConfig.MAP_INPUT_FILE, split.getPath(idx).toString());
149       conf.setLong(MRJobConfig.MAP_INPUT_START, split.getOffset(idx));
150       conf.setLong(MRJobConfig.MAP_INPUT_PATH, split.getLength(idx));
151 
152       curReader =  rrConstructor.newInstance(new Object []
153                             {split, context, Integer.valueOf(idx)});
154 
155       if (idx > 0) {
156         // initialize() for the first RecordReader will be called by MapTask;
157         // we're responsible for initializing subsequent RecordReaders.
158         curReader.initialize(split, context);
159       }
160     } catch (Exception e) {
161       throw new RuntimeException (e);
162     }
163     idx++;
164     return true;
165   }
166 }
167