1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.contrib.failmon;
20 
21 import java.io.BufferedOutputStream;
22 import java.io.InputStream;
23 import java.io.BufferedReader;
24 import java.io.BufferedWriter;
25 import java.io.File;
26 import java.io.FileOutputStream;
27 import java.io.FileReader;
28 import java.io.FileWriter;
29 import java.io.IOException;
30 import java.io.FileNotFoundException;
31 import java.net.InetAddress;
32 import java.util.ArrayList;
33 import java.util.Calendar;
34 import java.util.zip.CRC32;
35 import java.util.zip.CheckedOutputStream;
36 import java.util.zip.ZipEntry;
37 import java.util.zip.ZipInputStream;
38 
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.FileStatus;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FSDataInputStream;
45 
46 public class HDFSMerger {
47 
48   Configuration hadoopConf;
49   FileSystem hdfs;
50 
51   String hdfsDir;
52 
53   FileStatus [] inputFiles;
54 
55   Path outputFilePath;
56   FSDataOutputStream outputFile;
57 
58   boolean compress;
59 
60   FileWriter fw;
61 
62   BufferedWriter writer;
63 
HDFSMerger()64   public HDFSMerger() throws IOException {
65 
66     String hadoopConfPath;
67 
68     if (Environment.getProperty("hadoop.conf.path") == null)
69       hadoopConfPath = "../../../conf";
70     else
71       hadoopConfPath = Environment.getProperty("hadoop.conf.path");
72 
73     // Read the configuration for the Hadoop environment
74     Configuration hadoopConf = new Configuration();
75     hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
76     hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
77 
78     // determine the local output file name
79     if (Environment.getProperty("local.tmp.filename") == null)
80       Environment.setProperty("local.tmp.filename", "failmon.dat");
81 
82     // determine the upload location
83     hdfsDir = Environment.getProperty("hdfs.upload.dir");
84     if (hdfsDir == null)
85       hdfsDir = "/failmon";
86 
87     hdfs = FileSystem.get(hadoopConf);
88 
89     Path hdfsDirPath = new Path(hadoopConf.get("fs.default.name") + hdfsDir);
90 
91     try {
92       if (!hdfs.getFileStatus(hdfsDirPath).isDir()) {
93 	Environment.logInfo("HDFSMerger: Not an HDFS directory: " + hdfsDirPath.toString());
94 	System.exit(0);
95       }
96     } catch (FileNotFoundException e) {
97       Environment.logInfo("HDFSMerger: Directory not found: " + hdfsDirPath.toString());
98     }
99 
100     inputFiles = hdfs.listStatus(hdfsDirPath);
101 
102     outputFilePath = new Path(hdfsDirPath.toString() + "/" + "merge-"
103 			  + Calendar.getInstance().getTimeInMillis() + ".dat");
104     outputFile = hdfs.create(outputFilePath);
105 
106     for (FileStatus fstatus : inputFiles) {
107       appendFile(fstatus.getPath());
108       hdfs.delete(fstatus.getPath());
109     }
110 
111     outputFile.close();
112 
113     Environment.logInfo("HDFS file merging complete!");
114   }
115 
appendFile(Path inputPath)116   private void appendFile (Path inputPath) throws IOException {
117 
118     FSDataInputStream anyInputFile = hdfs.open(inputPath);
119     InputStream inputFile;
120     byte buffer[] = new byte[4096];
121 
122     if (inputPath.toString().endsWith(LocalStore.COMPRESSION_SUFFIX)) {
123       // the file is compressed
124       inputFile = new ZipInputStream(anyInputFile);
125       ((ZipInputStream) inputFile).getNextEntry();
126     } else {
127       inputFile = anyInputFile;
128     }
129 
130     try {
131       int bytesRead = 0;
132       while ((bytesRead = inputFile.read(buffer)) > 0) {
133 	outputFile.write(buffer, 0, bytesRead);
134       }
135     } catch (IOException e) {
136       Environment.logInfo("Error while copying file:" + inputPath.toString());
137     } finally {
138       inputFile.close();
139     }
140   }
141 
142 
main(String [] args)143   public static void main(String [] args) {
144 
145     Environment.prepare("./conf/failmon.properties");
146 
147     try {
148       new HDFSMerger();
149     } catch (IOException e) {
150       e.printStackTrace();
151       }
152 
153   }
154 }
155