/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.contrib.failmon;

import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;

/**
 * Concatenates every file found in the HDFS upload directory into a single
 * new file named <code>merge-&lt;current time in millis&gt;.dat</code> in
 * that same directory, deleting each input file once it has been copied.
 *
 * All of the work is performed by the constructor; {@link #main(String[])}
 * simply prepares the {@link Environment} and instantiates the class.
 */
public class HDFSMerger {

  /** Hadoop configuration loaded from hadoop-default.xml / hadoop-site.xml. */
  Configuration hadoopConf;

  /** File system handle obtained from {@link #hadoopConf}. */
  FileSystem hdfs;

  /** Upload directory (property "hdfs.upload.dir", default "/failmon"). */
  String hdfsDir;

  /** Listing of the upload directory taken before the output file is created. */
  FileStatus [] inputFiles;

  /** Path of the merged output file. */
  Path outputFilePath;

  /** Stream to which the contents of all input files are appended. */
  FSDataOutputStream outputFile;

  // The following three fields are not used anywhere in this class; they are
  // kept because they are package-visible and may be referenced elsewhere.
  boolean compress;

  FileWriter fw;

  BufferedWriter writer;

  /**
   * Loads the Hadoop configuration, locates the upload directory and merges
   * all files it contains into one new output file.
   *
   * If the upload path exists but is not a directory the JVM is terminated
   * via <code>System.exit(0)</code>. If the upload path does not exist, the
   * problem is logged and the constructor returns without creating an
   * output file, since there is nothing to merge.
   *
   * @throws IOException if the file system cannot be accessed, the output
   *                     file cannot be created or closed, or an input file
   *                     cannot be opened, closed or deleted
   */
  public HDFSMerger() throws IOException {

    String hadoopConfPath = Environment.getProperty("hadoop.conf.path");
    if (hadoopConfPath == null) {
      hadoopConfPath = "../../../conf";
    }

    // Read the configuration for the Hadoop environment. This assigns the
    // field; a local declaration here would shadow it and leave it null.
    hadoopConf = new Configuration();
    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));

    // determine the local output file name
    if (Environment.getProperty("local.tmp.filename") == null) {
      Environment.setProperty("local.tmp.filename", "failmon.dat");
    }

    // determine the upload location
    hdfsDir = Environment.getProperty("hdfs.upload.dir");
    if (hdfsDir == null) {
      hdfsDir = "/failmon";
    }

    hdfs = FileSystem.get(hadoopConf);

    Path hdfsDirPath = new Path(hadoopConf.get("fs.default.name") + hdfsDir);

    try {
      if (!hdfs.getFileStatus(hdfsDirPath).isDir()) {
        Environment.logInfo("HDFSMerger: Not an HDFS directory: " + hdfsDirPath.toString());
        System.exit(0);
      }
    } catch (FileNotFoundException e) {
      Environment.logInfo("HDFSMerger: Directory not found: " + hdfsDirPath.toString());
      // Nothing to merge. Continuing would create the output file (and with
      // it the missing directory) and then iterate over a missing listing.
      return;
    }

    // The listing is taken before the output file is created, so the output
    // file itself is never one of the inputs.
    inputFiles = hdfs.listStatus(hdfsDirPath);
    if (inputFiles == null) {
      inputFiles = new FileStatus[0];
    }

    outputFilePath = new Path(hdfsDirPath.toString() + "/" + "merge-"
        + Calendar.getInstance().getTimeInMillis() + ".dat");
    outputFile = hdfs.create(outputFilePath);

    try {
      for (FileStatus fstatus : inputFiles) {
        Path inputPath = fstatus.getPath();

        if (fstatus.isDir()) {
          // Only regular files can be appended.
          Environment.logInfo("HDFSMerger: Skipping directory: " + inputPath.toString());
          continue;
        }

        // Delete the source only when it was copied completely; otherwise
        // its contents would be lost.
        if (appendFile(inputPath)) {
          hdfs.delete(inputPath);
        }
      }
    } finally {
      // Always release the output stream, even if an append or delete failed.
      outputFile.close();
    }

    Environment.logInfo("HDFS file merging complete!");
  }

  /**
   * Appends the contents of one HDFS file to {@link #outputFile}. Files whose
   * name ends with {@link LocalStore#COMPRESSION_SUFFIX} are read through a
   * {@link ZipInputStream}, and only the first zip entry is copied.
   *
   * A failure while reading or writing is logged rather than thrown, so that
   * the remaining files can still be processed. Data written before the
   * failure stays in the output file.
   *
   * @param inputPath the HDFS file to append
   * @return <code>true</code> if the whole file was copied,
   *         <code>false</code> if an I/O error interrupted the copy
   * @throws IOException if the input file cannot be opened or closed
   */
  private boolean appendFile (Path inputPath) throws IOException {

    FSDataInputStream anyInputFile = hdfs.open(inputPath);
    InputStream inputFile = anyInputFile;
    byte buffer[] = new byte[4096];

    try {
      if (inputPath.toString().endsWith(LocalStore.COMPRESSION_SUFFIX)) {
        // the file is compressed
        ZipInputStream zipInput = new ZipInputStream(anyInputFile);
        inputFile = zipInput;
        // Position the stream at the first entry. This is inside the try
        // block so that the stream is closed if the call fails.
        zipInput.getNextEntry();
      }

      int bytesRead = 0;
      while ((bytesRead = inputFile.read(buffer)) > 0) {
        outputFile.write(buffer, 0, bytesRead);
      }
      return true;
    } catch (IOException e) {
      Environment.logInfo("Error while copying file:" + inputPath.toString());
      return false;
    } finally {
      inputFile.close();
    }
  }

  /**
   * Command-line entry point: prepares the environment from
   * <code>./conf/failmon.properties</code> and runs the merge.
   *
   * @param args command-line arguments (not used)
   */
  public static void main(String [] args) {

    Environment.prepare("./conf/failmon.properties");

    try {
      new HDFSMerger();
    } catch (IOException e) {
      e.printStackTrace();
    }

  }
}