/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.contrib.failmon;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**********************************************************
 * This class takes care of the temporary local storage of
 * gathered metrics before they get uploaded into HDFS. It writes
 * SerializedRecords as lines in a temporary file and then
 * compresses and uploads the file into HDFS.
 *
 **********************************************************/
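/*
 * A minimal usage sketch (hypothetical caller; in FailMon the records come
 * from the log and metric parsers, and uploads are scheduled externally):
 *
 *   LocalStore ls = new LocalStore();
 *   ls.insert(record);   // serialize, anonymize and buffer an EventRecord
 *   ls.upload();         // compress the buffered file and copy it into HDFS
 *   ls.close();          // flush and close the temporary local file
 */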

public class LocalStore {

  public final static char FIELD_SEPARATOR = '|';

  public final static char RECORD_SEPARATOR = '\n';

  public final static String COMPRESSION_SUFFIX = ".zip";

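  // Interval between successive uploads. The unit is assumed to be
  // seconds (600 s = 10 min); the constant is consumed outside this class.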
  public final static int UPLOAD_INTERVAL = 600;

  String filename;
  String hdfsDir;

  boolean compress;

  FileWriter fw;

  BufferedWriter writer;

  /**
   * Create an instance of the class and read the configuration
   * file to determine some output parameters. Then, initialize the
   * structures needed for buffered I/O (so that small appends
   * can be handled efficiently).
   *
   */

  public LocalStore() {
    // determine the local output file name
    if (Environment.getProperty("local.tmp.filename") == null)
      Environment.setProperty("local.tmp.filename", "failmon.dat");

    // local.tmp.dir has been set by the Executor
    if (Environment.getProperty("local.tmp.dir") == null)
      Environment.setProperty("local.tmp.dir", System.getProperty("java.io.tmpdir"));

    filename = Environment.getProperty("local.tmp.dir") + "/" +
      Environment.getProperty("local.tmp.filename");

    // determine the upload location
    hdfsDir = Environment.getProperty("hdfs.upload.dir");
    if (hdfsDir == null)
      hdfsDir = "/failmon";

    // determine if compression is enabled
    compress = true;
    if ("false".equalsIgnoreCase(Environment
        .getProperty("local.tmp.compression")))
      compress = false;

    try {
      fw = new FileWriter(filename, true);
      writer = new BufferedWriter(fw);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Insert an EventRecord into the local store, after it
   * has been serialized and anonymized.
   *
   * @param er the EventRecord to be inserted
   */

  public void insert(EventRecord er) {
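    // Note that if anonymization fails, the exception is only logged and
    // the serialized record is appended anyway.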
    SerializedRecord sr = new SerializedRecord(er);
    try {
      Anonymizer.anonymize(sr);
    } catch (Exception e) {
      e.printStackTrace();
    }
    append(sr);
  }

  /**
   * Insert an array of EventRecords into the local store, after they
   * have been serialized and anonymized.
   *
   * @param ers the array of EventRecords to be inserted
   */
  public void insert(EventRecord[] ers) {
    for (EventRecord er : ers)
      insert(er);
  }

  private void append(SerializedRecord sr) {
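    // Appends are buffered; the data reaches the local file only when the
    // writer is flushed, which happens in upload() and close().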
    try {
      writer.write(pack(sr).toString());
      writer.write(RECORD_SEPARATOR);
      // writer.flush();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Pack a SerializedRecord into a StringBuffer of key:value pairs,
   * separated by FIELD_SEPARATOR.
   *
   * @param sr the SerializedRecord to be packed
   *
   * @return the StringBuffer holding the packed record
   */
  public static StringBuffer pack(SerializedRecord sr) {
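    // The output has the form "key1:value1|key2:value2|...", with every
    // field (including the last) followed by FIELD_SEPARATOR.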
    StringBuffer sb = new StringBuffer();

    ArrayList<String> keys = new ArrayList<String>(sr.fields.keySet());

    if (sr.isValid())
      SerializedRecord.arrangeKeys(keys);

    for (int i = 0; i < keys.size(); i++) {
      String value = sr.fields.get(keys.get(i));
      sb.append(keys.get(i) + ":" + value);
      sb.append(FIELD_SEPARATOR);
    }
    return sb;
  }

  /**
   * Upload the local file store into HDFS, after compressing it.
   * Then a new local file is created as a temporary record store.
   *
   */
  public void upload() {
    try {
      writer.flush();
      if (compress)
        zipCompress(filename);
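      // The remote file is named "failmon-<host>-<timestamp-millis>", where
      // <host> is the canonical local hostname, or its MD5 hash when
      // hostname anonymization is enabled.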
      String remoteName = "failmon-";
      if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.hostnames")))
        remoteName += Anonymizer.getMD5Hash(InetAddress.getLocalHost().getCanonicalHostName()) + "-";
      else
        remoteName += InetAddress.getLocalHost().getCanonicalHostName() + "-";
      remoteName += Calendar.getInstance().getTimeInMillis();
      if (compress)
        copyToHDFS(filename + COMPRESSION_SUFFIX, hdfsDir + "/" + remoteName + COMPRESSION_SUFFIX);
      else
        copyToHDFS(filename, hdfsDir + "/" + remoteName);
    } catch (IOException e) {
      e.printStackTrace();
    }

    // truncate the local file by re-opening it in overwrite mode
    try {
      fw.close();
      fw = new FileWriter(filename);
      writer = new BufferedWriter(fw);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Compress a text file using the ZIP compression algorithm.
   *
   * @param filename the path to the file to be compressed
   */
  public static void zipCompress(String filename) throws IOException {
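    // The output archive is <filename> + COMPRESSION_SUFFIX and holds a
    // single entry named after the input file. The input is copied
    // character by character, which is adequate for the text records
    // produced by pack().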
    FileOutputStream fos = new FileOutputStream(filename + COMPRESSION_SUFFIX);
    CheckedOutputStream csum = new CheckedOutputStream(fos, new CRC32());
    ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(csum));
    out.setComment("Failmon records.");

    BufferedReader in = new BufferedReader(new FileReader(filename));
    out.putNextEntry(new ZipEntry(new File(filename).getName()));
    int c;
    while ((c = in.read()) != -1)
      out.write(c);
    in.close();

    out.finish();
    out.close();
  }

  /**
   * Copy a local file to HDFS.
   *
   * @param localFile the filename of the local file
   * @param hdfsFile the HDFS filename to copy to
   */
  public static void copyToHDFS(String localFile, String hdfsFile) throws IOException {
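    // The Hadoop configuration is read from hadoop-default.xml and
    // hadoop-site.xml under hadoop.conf.path, which defaults to
    // ../../../conf relative to the working directory.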

    String hadoopConfPath;

    if (Environment.getProperty("hadoop.conf.path") == null)
      hadoopConfPath = "../../../conf";
    else
      hadoopConfPath = Environment.getProperty("hadoop.conf.path");

    // Read the configuration for the Hadoop environment
    Configuration hadoopConf = new Configuration();
    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));

    // System.out.println(hadoopConf.get("hadoop.tmp.dir"));
    // System.out.println(hadoopConf.get("fs.default.name"));
    FileSystem fs = FileSystem.get(hadoopConf);

    // HadoopDFS deals with Path
    Path inFile = new Path("file://" + localFile);
    Path outFile = new Path(hadoopConf.get("fs.default.name") + hdfsFile);

    // Read from and write to new file
    Environment.logInfo("Uploading to HDFS (file " + outFile + ") ...");
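    // The 'false' argument keeps the local source file after the copy;
    // upload() itself truncates and re-opens the local store afterwards.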
    fs.copyFromLocalFile(false, inFile, outFile);
  }

  /**
   * Close the temporary local file.
   *
   */
  public void close() {
    try {
      writer.flush();
      writer.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}