/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.slive;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Slive test entry point + main program
 *
 * Given -help, this program outputs a help message describing the options and
 * configuration that affect its runtime. The program takes these options from
 * the configuration and/or the command line, merges them, and then establishes
 * a job that runs a set of mappers & reducers; the output of the reduction is
 * then reported on.
 *
 * The number of maps is specified by "slive.maps".
 * The number of reduces is specified by "slive.reduces".
 */
public class SliveTest implements Tool {

  private static final Log LOG = LogFactory.getLog(SliveTest.class);

  // ensures the hdfs configurations are loaded if they exist
  static {
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
  }

  private Configuration base;

  public SliveTest(Configuration base) {
    this.base = base;
  }

  public int run(String[] args) {
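    // phases: parse the arguments, merge them with the base configuration,
    // dump the resulting options, run the MapReduce job, report on its
    // output, and optionally clean up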
    ParsedOutput parsedOpts = null;
    try {
      ArgumentParser argHolder = new ArgumentParser(args);
      parsedOpts = argHolder.parse();
      if (parsedOpts.shouldOutputHelp()) {
        parsedOpts.outputHelp();
        return 1;
      }
    } catch (Exception e) {
      LOG.error("Unable to parse arguments due to error: ", e);
      return 1;
    }
    LOG.info("Running with option list " + Helper.stringifyArray(args, " "));
    ConfigExtractor config = null;
    try {
      ConfigMerger cfgMerger = new ConfigMerger();
      Configuration cfg = cfgMerger.getMerged(parsedOpts,
                                              new Configuration(base));
      if (cfg != null) {
        config = new ConfigExtractor(cfg);
      }
    } catch (Exception e) {
      LOG.error("Unable to merge config due to error: ", e);
      return 1;
    }
    if (config == null) {
      LOG.error("Unable to merge config & options!");
      return 1;
    }
    try {
      LOG.info("Options are:");
      ConfigExtractor.dumpOptions(config);
    } catch (Exception e) {
      LOG.error("Unable to dump options due to error: ", e);
      return 1;
    }
    boolean jobOk = false;
    try {
      LOG.info("Running job:");
      runJob(config);
      jobOk = true;
    } catch (Exception e) {
      LOG.error("Unable to run job due to error: ", e);
    }
    if (jobOk) {
      try {
        LOG.info("Reporting on job:");
        writeReport(config);
      } catch (Exception e) {
        LOG.error("Unable to report on job due to error: ", e);
      }
    }
    // attempt cleanup (not critical)
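    // note: getBool treats only "true" or "1" (after trimming and
    // lower-casing) as true, so cleanup stays off unless explicitly requested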
    boolean cleanUp = getBool(parsedOpts
        .getValue(ConfigOption.CLEANUP.getOpt()));
    if (cleanUp) {
      try {
        LOG.info("Cleaning up job:");
        cleanup(config);
      } catch (Exception e) {
        LOG.error("Unable to cleanup job due to error: ", e);
      }
    }
    // all mostly worked
    if (jobOk) {
      return 0;
    }
    // maybe didn't work
    return 1;
  }

  /**
   * Parses a string into a boolean value
   *
   * @param val
   *          val to check
   * @return true if val is "true" or "1" (case-insensitive), false otherwise
   */
  private boolean getBool(String val) {
    if (val == null) {
      return false;
    }
    String cleanupOpt = StringUtils.toLowerCase(val).trim();
    if (cleanupOpt.equals("true") || cleanupOpt.equals("1")) {
      return true;
    } else {
      return false;
    }
  }

  /**
   * Sets up a job conf for the given job using the given config object. Ensures
   * that the correct input format is set, the mapper and reducer class and
   * the input and output keys and value classes along with any other job
   * configuration.
   *
   * @param config
   * @return JobConf representing the job to be run
   * @throws IOException
   */
  private JobConf getJob(ConfigExtractor config) throws IOException {
    JobConf job = new JobConf(config.getConfig(), SliveTest.class);
    job.setInputFormat(DummyInputFormat.class);
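    // DummyInputFormat supplies synthetic splits, so the mappers generate
    // filesystem load rather than reading real input files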
    FileOutputFormat.setOutputPath(job, config.getOutputPath());
    job.setMapperClass(SliveMapper.class);
    job.setPartitionerClass(SlivePartitioner.class);
    job.setReducerClass(SliveReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormat(TextOutputFormat.class);
    TextOutputFormat.setCompressOutput(job, false);
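    // compression is left off so that writeReport can read the reducer
    // output back as plain text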
    job.setNumReduceTasks(config.getReducerAmount());
    job.setNumMapTasks(config.getMapAmount());
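    // note: with the classic MapReduce API the map count is only a hint;
    // the actual number of map tasks is driven by the InputFormat's splits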
    return job;
  }

  /**
   * Runs the job given the provided config
   *
   * @param config
   *          the config to run the job with
   *
   * @throws IOException
   *           if the given job can not be run
   */
  private void runJob(ConfigExtractor config) throws IOException {
    JobClient.runJob(getJob(config));
  }

  /**
   * Attempts to write the report to the given output using the specified
   * config. It opens the expected reducer output files, reads in their
   * contents, splits the records up by operation type, and then, for each
   * operation type, generates a report to the specified result file and to
   * the console.
   *
   * @param cfg
   *          the config specifying the files and output
   *
   * @throws Exception
   *           if files can not be opened/closed/read or are invalidly formatted
   */
  private void writeReport(ConfigExtractor cfg) throws Exception {
    Path dn = cfg.getOutputPath();
    LOG.info("Writing report using contents of " + dn);
    FileSystem fs = dn.getFileSystem(cfg.getConfig());
    FileStatus[] reduceFiles = fs.listStatus(dn);
    BufferedReader fileReader = null;
    PrintWriter reportWriter = null;
    try {
      List<OperationOutput> noOperations = new ArrayList<OperationOutput>();
      Map<String, List<OperationOutput>> splitTypes = new TreeMap<String, List<OperationOutput>>();
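      // each reducer output line is expected to be "key<TAB>value"; parsed
      // records are grouped by operation type (a TreeMap keeps the per-type
      // reports in sorted order) and untyped records are collected separately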
      for (FileStatus fn : reduceFiles) {
        if (!fn.getPath().getName().startsWith("part")) {
          continue;
        }
        fileReader = new BufferedReader(new InputStreamReader(
            new DataInputStream(fs.open(fn.getPath()))));
        String line;
        while ((line = fileReader.readLine()) != null) {
          String[] pieces = line.split("\t", 2);
          if (pieces.length == 2) {
            OperationOutput data = new OperationOutput(pieces[0], pieces[1]);
            String op = (data.getOperationType());
            if (op != null) {
              List<OperationOutput> opList = splitTypes.get(op);
              if (opList == null) {
                opList = new ArrayList<OperationOutput>();
              }
              opList.add(data);
              splitTypes.put(op, opList);
            } else {
              noOperations.add(data);
            }
          } else {
            throw new IOException("Unparseable line " + line);
          }
        }
        fileReader.close();
        fileReader = null;
      }
      File resFile = null;
      if (cfg.getResultFile() != null) {
        resFile = new File(cfg.getResultFile());
      }
      if (resFile != null) {
        LOG.info("Report results being placed to logging output and to file "
            + resFile.getCanonicalPath());
        reportWriter = new PrintWriter(new FileOutputStream(resFile));
      } else {
        LOG.info("Report results being placed to logging output");
      }
      ReportWriter reporter = new ReportWriter();
      if (!noOperations.isEmpty()) {
        reporter.basicReport(noOperations, reportWriter);
      }
      for (String opType : splitTypes.keySet()) {
        reporter.opReport(opType, splitTypes.get(opType), reportWriter);
      }
    } finally {
      if (fileReader != null) {
        fileReader.close();
      }
      if (reportWriter != null) {
        reportWriter.close();
      }
    }
  }

  /**
   * Cleans up the base directory by removing it
   *
   * @param cfg
   *          ConfigExtractor which has location of base directory
   *
   * @throws IOException
   */
  private void cleanup(ConfigExtractor cfg) throws IOException {
    Path base = cfg.getBaseDirectory();
    if (base != null) {
      LOG.info("Attempting to recursively delete " + base);
      FileSystem fs = base.getFileSystem(cfg.getConfig());
      fs.delete(base, true);
    }
  }

  /**
   * The main program entry point. Parses the command line options, merges and
   * then dumps them, runs the corresponding map/reduce job that those options
   * represent, and then writes the report for the output of the run.
   *
   * @param args
   *          command line options
   */
  public static void main(String[] args) throws Exception {
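    // ToolRunner strips generic Hadoop options (e.g. -D, -conf) before they
    // reach run(); a typical invocation (illustrative, the exact jar/driver
    // wiring varies by release) is:
    //   hadoop org.apache.hadoop.fs.slive.SliveTest -help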
    Configuration startCfg = new Configuration(true);
    SliveTest runner = new SliveTest(startCfg);
    int ec = ToolRunner.run(runner, args);
    System.exit(ec);
  }

  @Override // Configurable
  public Configuration getConf() {
    return this.base;
  }

  @Override // Configurable
  public void setConf(Configuration conf) {
    this.base = conf;
  }
}