/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.streaming;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.net.URLDecoder;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.streaming.io.InputWriter;
import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.util.StringUtils;

import org.apache.hadoop.io.Writable;

/** A generic Reducer bridge.
 *  It delegates operations to an external program via stdin and stdout.
 */
public class PipeReducer extends PipeMapRed implements Reducer {

  /** UTF-8 bytes of "stream.reduce.output.field.separator" (default tab). */
  private byte[] reduceOutFieldSeparator;
  /** UTF-8 bytes of "stream.reduce.input.field.separator" (default tab). */
  private byte[] reduceInputFieldSeparator;
  /** Value of "stream.num.reduce.output.key.fields" (default 1). */
  private int numOfReduceOutputKeyFields = 1;
  /** Value of "mapred.skip.on"; when true the client stream is flushed in reduce(). */
  private boolean skipping = false;

  /**
   * Returns the reduce command configured under
   * "stream.reduce.streamprocessor", URL-decoded as UTF-8.
   *
   * @param job the job configuration to read the property from
   * @return the decoded command, or null when the property is unset or
   *         cannot be decoded
   */
  String getPipeCommand(JobConf job) {
    String str = job.get("stream.reduce.streamprocessor");
    if (str == null) {
      return str;
    }
    try {
      return URLDecoder.decode(str, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      // NOTE(review): the message below is misleading -- this branch means the
      // "UTF-8" charset name was rejected, not that the property is missing.
      // The text is kept unchanged because it is runtime output.
      System.err.println("stream.reduce.streamprocessor in jobconf not found");
      return null;
    }
  }

  /**
   * Tells whether records should be piped to an external command.
   *
   * @return true when a reduce command is configured and it is not
   *         {@code StreamJob.REDUCE_NONE}
   */
  boolean getDoPipe() {
    String argv = getPipeCommand(job_);
    // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
    return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
  }

  /**
   * Configures the reducer: delegates to the superclass, disables the
   * automatic reducer processed-record counter, and reads the skip flag,
   * the field separators and the number of output key fields.
   *
   * @param job the job configuration
   * @throws RuntimeException if the "UTF-8" encoding is reported as unsupported
   */
  public void configure(JobConf job) {
    super.configure(job);
    //disable the auto increment of the counter. For streaming, no of
    //processed records could be different(equal or less) than the no of
    //records input.
    SkipBadRecords.setAutoIncrReducerProcCount(job, false);
    skipping = job.getBoolean("mapred.skip.on", false);

    try {
      reduceOutFieldSeparator =
          job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
      reduceInputFieldSeparator =
          job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
      this.numOfReduceOutputKeyFields =
          job_.getInt("stream.num.reduce.output.key.fields", 1);
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
    }
  }

  /**
   * Handles all values of one key. In pipe mode each key/value pair is
   * written to the input writer; otherwise ("identity reduce") each pair is
   * passed straight to {@code output}.
   *
   * @param key      the key shared by all {@code values}
   * @param values   the values for {@code key}; each element is cast to Writable
   * @param output   collector used for identity reduce and handed to the
   *                 output threads when they are started
   * @param reporter reporter handed to the output threads when they are started
   * @throws IOException if writing or collecting fails, or if the output/error
   *                     threads recorded a failure; the message is prefixed
   *                     with the subprocess state when one is available and
   *                     the original exception is attached as the cause
   */
  public void reduce(Object key, Iterator values, OutputCollector output,
                     Reporter reporter) throws IOException {

    // init: start the output threads on first use
    if (doPipe_ && outThread_ == null) {
      startOutputThreads(output, reporter);
    }
    try {
      while (values.hasNext()) {
        Writable val = (Writable) values.next();
        numRecRead_++;
        maybeLogRecord();
        if (doPipe_) {
          if (outerrThreadsThrowable != null) {
            mapRedFinished();
            throw new IOException("MROutput/MRErrThread failed:"
                + StringUtils.stringifyException(outerrThreadsThrowable));
          }
          inWriter_.writeKey(key);
          inWriter_.writeValue(val);
        } else {
          // "identity reduce"
          output.collect(key, val);
        }
      }
      if (doPipe_ && skipping) {
        // In skip mode, flush once all values of this key have been written
        // so that we don't buffer other records surrounding a bad record.
        clientOut_.flush();
      }
    } catch (IOException io) {
      // a common reason to get here is failure of the subprocess.
      // Document that fact, if possible.
      String extraInfo = "";
      // NOTE(review): presumably no subprocess handle exists on the identity
      // reduce path -- confirm in PipeMapRed. The guard keeps a
      // NullPointerException from replacing the real IOException.
      if (sim != null) {
        try {
          int exitVal = sim.exitValue();
          if (exitVal == 0) {
            extraInfo = "subprocess exited successfully\n";
          } else {
            extraInfo = "subprocess exited with error code " + exitVal + "\n";
          }
        } catch (IllegalThreadStateException e) {
          // hmm, but child is still running.  go figure.
          extraInfo = "subprocess still running\n";
        }
      }
      appendLogToJobLog("failure");
      mapRedFinished();
      // Chain the original exception so its stack trace is not lost.
      throw new IOException(extraInfo + getContext() + io.getMessage(), io);
    }
  }

  /**
   * Appends "success" to the job log and calls {@code mapRedFinished()}.
   */
  public void close() {
    appendLogToJobLog("success");
    mapRedFinished();
  }

  /**
   * @return the configured reduce input field separator bytes
   */
  @Override
  public byte[] getInputSeparator() {
    return reduceInputFieldSeparator;
  }

  /**
   * @return the configured reduce output field separator bytes
   */
  @Override
  public byte[] getFieldSeparator() {
    return reduceOutFieldSeparator;
  }

  /**
   * @return the configured number of reduce output key fields
   */
  @Override
  public int getNumOfKeyFields() {
    return numOfReduceOutputKeyFields;
  }

  /**
   * Creates the input writer from {@code reduceInputWriterClass_}.
   *
   * @return the writer built by the superclass
   * @throws IOException if the superclass fails to create the writer
   */
  @Override
  InputWriter createInputWriter() throws IOException {
    return super.createInputWriter(reduceInputWriterClass_);
  }

  /**
   * Creates the output reader from {@code reduceOutputReaderClass_}.
   *
   * @return the reader built by the superclass
   * @throws IOException if the superclass fails to create the reader
   */
  @Override
  OutputReader createOutputReader() throws IOException {
    return super.createOutputReader(reduceOutputReaderClass_);
  }

}