1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.fs.slive; 20 21 import java.io.BufferedReader; 22 import java.io.DataInputStream; 23 import java.io.File; 24 import java.io.FileOutputStream; 25 import java.io.IOException; 26 import java.io.InputStreamReader; 27 import java.io.PrintWriter; 28 import java.util.ArrayList; 29 import java.util.List; 30 import java.util.Map; 31 import java.util.TreeMap; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.hadoop.conf.Configuration; 36 import org.apache.hadoop.fs.FileStatus; 37 import org.apache.hadoop.fs.FileSystem; 38 import org.apache.hadoop.fs.Path; 39 import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput; 40 import org.apache.hadoop.io.Text; 41 import org.apache.hadoop.mapred.FileOutputFormat; 42 import org.apache.hadoop.mapred.JobClient; 43 import org.apache.hadoop.mapred.JobConf; 44 import org.apache.hadoop.mapred.TextOutputFormat; 45 import org.apache.hadoop.util.StringUtils; 46 import org.apache.hadoop.util.Tool; 47 import org.apache.hadoop.util.ToolRunner; 48 49 /** 50 * Slive test entry point + main program 51 * 52 * This program will output a help message given -help which can be used to 53 * determine the program options and configuration which will affect the program 54 * runtime. The program will take these options, either from configuration or 55 * command line and process them (and merge) and then establish a job which will 56 * thereafter run a set of mappers & reducers and then the output of the 57 * reduction will be reported on. 58 * 59 * The number of maps is specified by "slive.maps". 60 * The number of reduces is specified by "slive.reduces". 61 */ 62 public class SliveTest implements Tool { 63 64 private static final Log LOG = LogFactory.getLog(SliveTest.class); 65 66 // ensures the hdfs configurations are loaded if they exist 67 static { 68 Configuration.addDefaultResource("hdfs-default.xml"); 69 Configuration.addDefaultResource("hdfs-site.xml"); 70 } 71 72 private Configuration base; 73 SliveTest(Configuration base)74 public SliveTest(Configuration base) { 75 this.base = base; 76 } 77 run(String[] args)78 public int run(String[] args) { 79 ParsedOutput parsedOpts = null; 80 try { 81 ArgumentParser argHolder = new ArgumentParser(args); 82 parsedOpts = argHolder.parse(); 83 if (parsedOpts.shouldOutputHelp()) { 84 parsedOpts.outputHelp(); 85 return 1; 86 } 87 } catch (Exception e) { 88 LOG.error("Unable to parse arguments due to error: ", e); 89 return 1; 90 } 91 LOG.info("Running with option list " + Helper.stringifyArray(args, " ")); 92 ConfigExtractor config = null; 93 try { 94 ConfigMerger cfgMerger = new ConfigMerger(); 95 Configuration cfg = cfgMerger.getMerged(parsedOpts, 96 new Configuration(base)); 97 if (cfg != null) { 98 config = new ConfigExtractor(cfg); 99 } 100 } catch (Exception e) { 101 LOG.error("Unable to merge config due to error: ", e); 102 return 1; 103 } 104 if (config == null) { 105 LOG.error("Unable to merge config & options!"); 106 return 1; 107 } 108 try { 109 LOG.info("Options are:"); 110 ConfigExtractor.dumpOptions(config); 111 } catch (Exception e) { 112 LOG.error("Unable to dump options due to error: ", e); 113 return 1; 114 } 115 boolean jobOk = false; 116 try { 117 LOG.info("Running job:"); 118 runJob(config); 119 jobOk = true; 120 } catch (Exception e) { 121 LOG.error("Unable to run job due to error: ", e); 122 } 123 if (jobOk) { 124 try { 125 LOG.info("Reporting on job:"); 126 writeReport(config); 127 } catch (Exception e) { 128 LOG.error("Unable to report on job due to error: ", e); 129 } 130 } 131 // attempt cleanup (not critical) 132 boolean cleanUp = getBool(parsedOpts 133 .getValue(ConfigOption.CLEANUP.getOpt())); 134 if (cleanUp) { 135 try { 136 LOG.info("Cleaning up job:"); 137 cleanup(config); 138 } catch (Exception e) { 139 LOG.error("Unable to cleanup job due to error: ", e); 140 } 141 } 142 // all mostly worked 143 if (jobOk) { 144 return 0; 145 } 146 // maybe didn't work 147 return 1; 148 } 149 150 /** 151 * Checks if a string is a boolean or not and what type 152 * 153 * @param val 154 * val to check 155 * @return boolean 156 */ getBool(String val)157 private boolean getBool(String val) { 158 if (val == null) { 159 return false; 160 } 161 String cleanupOpt = StringUtils.toLowerCase(val).trim(); 162 if (cleanupOpt.equals("true") || cleanupOpt.equals("1")) { 163 return true; 164 } else { 165 return false; 166 } 167 } 168 169 /** 170 * Sets up a job conf for the given job using the given config object. Ensures 171 * that the correct input format is set, the mapper and and reducer class and 172 * the input and output keys and value classes along with any other job 173 * configuration. 174 * 175 * @param config 176 * @return JobConf representing the job to be ran 177 * @throws IOException 178 */ getJob(ConfigExtractor config)179 private JobConf getJob(ConfigExtractor config) throws IOException { 180 JobConf job = new JobConf(config.getConfig(), SliveTest.class); 181 job.setInputFormat(DummyInputFormat.class); 182 FileOutputFormat.setOutputPath(job, config.getOutputPath()); 183 job.setMapperClass(SliveMapper.class); 184 job.setPartitionerClass(SlivePartitioner.class); 185 job.setReducerClass(SliveReducer.class); 186 job.setOutputKeyClass(Text.class); 187 job.setOutputValueClass(Text.class); 188 job.setOutputFormat(TextOutputFormat.class); 189 TextOutputFormat.setCompressOutput(job, false); 190 job.setNumReduceTasks(config.getReducerAmount()); 191 job.setNumMapTasks(config.getMapAmount()); 192 return job; 193 } 194 195 /** 196 * Runs the job given the provided config 197 * 198 * @param config 199 * the config to run the job with 200 * 201 * @throws IOException 202 * if can not run the given job 203 */ runJob(ConfigExtractor config)204 private void runJob(ConfigExtractor config) throws IOException { 205 JobClient.runJob(getJob(config)); 206 } 207 208 /** 209 * Attempts to write the report to the given output using the specified 210 * config. It will open up the expected reducer output file and read in its 211 * contents and then split up by operation output and sort by operation type 212 * and then for each operation type it will generate a report to the specified 213 * result file and the console. 214 * 215 * @param cfg 216 * the config specifying the files and output 217 * 218 * @throws Exception 219 * if files can not be opened/closed/read or invalid format 220 */ writeReport(ConfigExtractor cfg)221 private void writeReport(ConfigExtractor cfg) throws Exception { 222 Path dn = cfg.getOutputPath(); 223 LOG.info("Writing report using contents of " + dn); 224 FileSystem fs = dn.getFileSystem(cfg.getConfig()); 225 FileStatus[] reduceFiles = fs.listStatus(dn); 226 BufferedReader fileReader = null; 227 PrintWriter reportWriter = null; 228 try { 229 List<OperationOutput> noOperations = new ArrayList<OperationOutput>(); 230 Map<String, List<OperationOutput>> splitTypes = new TreeMap<String, List<OperationOutput>>(); 231 for(FileStatus fn : reduceFiles) { 232 if(!fn.getPath().getName().startsWith("part")) continue; 233 fileReader = new BufferedReader(new InputStreamReader( 234 new DataInputStream(fs.open(fn.getPath())))); 235 String line; 236 while ((line = fileReader.readLine()) != null) { 237 String pieces[] = line.split("\t", 2); 238 if (pieces.length == 2) { 239 OperationOutput data = new OperationOutput(pieces[0], pieces[1]); 240 String op = (data.getOperationType()); 241 if (op != null) { 242 List<OperationOutput> opList = splitTypes.get(op); 243 if (opList == null) { 244 opList = new ArrayList<OperationOutput>(); 245 } 246 opList.add(data); 247 splitTypes.put(op, opList); 248 } else { 249 noOperations.add(data); 250 } 251 } else { 252 throw new IOException("Unparseable line " + line); 253 } 254 } 255 fileReader.close(); 256 fileReader = null; 257 } 258 File resFile = null; 259 if (cfg.getResultFile() != null) { 260 resFile = new File(cfg.getResultFile()); 261 } 262 if (resFile != null) { 263 LOG.info("Report results being placed to logging output and to file " 264 + resFile.getCanonicalPath()); 265 reportWriter = new PrintWriter(new FileOutputStream(resFile)); 266 } else { 267 LOG.info("Report results being placed to logging output"); 268 } 269 ReportWriter reporter = new ReportWriter(); 270 if (!noOperations.isEmpty()) { 271 reporter.basicReport(noOperations, reportWriter); 272 } 273 for (String opType : splitTypes.keySet()) { 274 reporter.opReport(opType, splitTypes.get(opType), reportWriter); 275 } 276 } finally { 277 if (fileReader != null) { 278 fileReader.close(); 279 } 280 if (reportWriter != null) { 281 reportWriter.close(); 282 } 283 } 284 } 285 286 /** 287 * Cleans up the base directory by removing it 288 * 289 * @param cfg 290 * ConfigExtractor which has location of base directory 291 * 292 * @throws IOException 293 */ cleanup(ConfigExtractor cfg)294 private void cleanup(ConfigExtractor cfg) throws IOException { 295 Path base = cfg.getBaseDirectory(); 296 if (base != null) { 297 LOG.info("Attempting to recursively delete " + base); 298 FileSystem fs = base.getFileSystem(cfg.getConfig()); 299 fs.delete(base, true); 300 } 301 } 302 303 /** 304 * The main program entry point. Sets up and parses the command line options, 305 * then merges those options and then dumps those options and the runs the 306 * corresponding map/reduce job that those operations represent and then 307 * writes the report for the output of the run that occurred. 308 * 309 * @param args 310 * command line options 311 */ main(String[] args)312 public static void main(String[] args) throws Exception { 313 Configuration startCfg = new Configuration(true); 314 SliveTest runner = new SliveTest(startCfg); 315 int ec = ToolRunner.run(runner, args); 316 System.exit(ec); 317 } 318 319 @Override // Configurable getConf()320 public Configuration getConf() { 321 return this.base; 322 } 323 324 @Override // Configurable setConf(Configuration conf)325 public void setConf(Configuration conf) { 326 this.base = conf; 327 } 328 } 329