1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.streaming;
20 
21 import java.io.IOException;
22 import java.io.UnsupportedEncodingException;
23 import java.util.Iterator;
24 import java.net.URLDecoder;
25 
26 import org.apache.hadoop.mapred.JobConf;
27 import org.apache.hadoop.mapred.Reducer;
28 import org.apache.hadoop.mapred.Reporter;
29 import org.apache.hadoop.mapred.OutputCollector;
30 import org.apache.hadoop.mapred.SkipBadRecords;
31 import org.apache.hadoop.streaming.io.InputWriter;
32 import org.apache.hadoop.streaming.io.OutputReader;
33 import org.apache.hadoop.util.StringUtils;
34 
35 import org.apache.hadoop.io.Writable;
36 
37 /** A generic Reducer bridge.
38  *  It delegates operations to an external program via stdin and stdout.
39  */
40 public class PipeReducer extends PipeMapRed implements Reducer {
41 
42   private byte[] reduceOutFieldSeparator;
43   private byte[] reduceInputFieldSeparator;
44   private int numOfReduceOutputKeyFields = 1;
45   private boolean skipping = false;
46 
getPipeCommand(JobConf job)47   String getPipeCommand(JobConf job) {
48     String str = job.get("stream.reduce.streamprocessor");
49     if (str == null) {
50       return str;
51     }
52     try {
53       return URLDecoder.decode(str, "UTF-8");
54     } catch (UnsupportedEncodingException e) {
55       System.err.println("stream.reduce.streamprocessor in jobconf not found");
56       return null;
57     }
58   }
59 
getDoPipe()60   boolean getDoPipe() {
61     String argv = getPipeCommand(job_);
62     // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
63     return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
64   }
65 
configure(JobConf job)66   public void configure(JobConf job) {
67     super.configure(job);
68     //disable the auto increment of the counter. For streaming, no of
69     //processed records could be different(equal or less) than the no of
70     //records input.
71     SkipBadRecords.setAutoIncrReducerProcCount(job, false);
72     skipping = job.getBoolean("mapred.skip.on", false);
73 
74     try {
75       reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
76       reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
77       this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
78     } catch (UnsupportedEncodingException e) {
79       throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
80     }
81   }
82 
reduce(Object key, Iterator values, OutputCollector output, Reporter reporter)83   public void reduce(Object key, Iterator values, OutputCollector output,
84                      Reporter reporter) throws IOException {
85 
86     // init
87     if (doPipe_ && outThread_ == null) {
88       startOutputThreads(output, reporter);
89     }
90     try {
91       while (values.hasNext()) {
92         Writable val = (Writable) values.next();
93         numRecRead_++;
94         maybeLogRecord();
95         if (doPipe_) {
96           if (outerrThreadsThrowable != null) {
97             mapRedFinished();
98             throw new IOException ("MROutput/MRErrThread failed:"
99                                    + StringUtils.stringifyException(
100                                                                     outerrThreadsThrowable));
101           }
102           inWriter_.writeKey(key);
103           inWriter_.writeValue(val);
104         } else {
105           // "identity reduce"
106           output.collect(key, val);
107         }
108       }
109       if(doPipe_ && skipping) {
110         //flush the streams on every record input if running in skip mode
111         //so that we don't buffer other records surrounding a bad record.
112         clientOut_.flush();
113       }
114     } catch (IOException io) {
115       // a common reason to get here is failure of the subprocess.
116       // Document that fact, if possible.
117       String extraInfo = "";
118       try {
119         int exitVal = sim.exitValue();
120 	if (exitVal == 0) {
121 	  extraInfo = "subprocess exited successfully\n";
122 	} else {
123 	  extraInfo = "subprocess exited with error code " + exitVal + "\n";
124 	};
125       } catch (IllegalThreadStateException e) {
126         // hmm, but child is still running.  go figure.
127 	extraInfo = "subprocess still running\n";
128       };
129       appendLogToJobLog("failure");
130       mapRedFinished();
131       throw new IOException(extraInfo + getContext() + io.getMessage());
132     }
133   }
134 
close()135   public void close() {
136     appendLogToJobLog("success");
137     mapRedFinished();
138   }
139 
140   @Override
getInputSeparator()141   public byte[] getInputSeparator() {
142     return reduceInputFieldSeparator;
143   }
144 
145   @Override
getFieldSeparator()146   public byte[] getFieldSeparator() {
147     return reduceOutFieldSeparator;
148   }
149 
150   @Override
getNumOfKeyFields()151   public int getNumOfKeyFields() {
152     return numOfReduceOutputKeyFields;
153   }
154 
155   @Override
createInputWriter()156   InputWriter createInputWriter() throws IOException {
157     return super.createInputWriter(reduceInputWriterClass_);
158   }
159 
160   @Override
createOutputReader()161   OutputReader createOutputReader() throws IOException {
162     return super.createOutputReader(reduceOutputReaderClass_);
163   }
164 
165 }
166