/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.contrib.failmon;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**********************************************************
 * This class takes care of the temporary local storage of
 * gathered metrics before they get uploaded into HDFS. It writes
 * Serialized Records as lines in a temporary file and then
 * compresses and uploads it into HDFS.
46 * 47 **********************************************************/ 48 49 public class LocalStore { 50 51 public final static char FIELD_SEPARATOR = '|'; 52 53 public final static char RECORD_SEPARATOR = '\n'; 54 55 public final static String COMPRESSION_SUFFIX = ".zip"; 56 57 public final static int UPLOAD_INTERVAL = 600; 58 59 String filename; 60 String hdfsDir; 61 62 boolean compress; 63 64 FileWriter fw; 65 66 BufferedWriter writer; 67 68 /** 69 * Create an instance of the class and read the configuration 70 * file to determine some output parameters. Then, initiate the 71 * structured needed for the buffered I/O (so that smal appends 72 * can be handled efficiently). 73 * 74 */ 75 LocalStore()76 public LocalStore() { 77 // determine the local output file name 78 if (Environment.getProperty("local.tmp.filename") == null) 79 Environment.setProperty("local.tmp.filename", "failmon.dat"); 80 81 // local.tmp.dir has been set by the Executor 82 if (Environment.getProperty("local.tmp.dir") == null) 83 Environment.setProperty("local.tmp.dir", System.getProperty("java.io.tmpdir")); 84 85 filename = Environment.getProperty("local.tmp.dir") + "/" + 86 Environment.getProperty("local.tmp.filename"); 87 88 // determine the upload location 89 hdfsDir = Environment.getProperty("hdfs.upload.dir"); 90 if (hdfsDir == null) 91 hdfsDir = "/failmon"; 92 93 // determine if compression is enabled 94 compress = true; 95 if ("false".equalsIgnoreCase(Environment 96 .getProperty("local.tmp.compression"))) 97 compress = false; 98 99 try { 100 fw = new FileWriter(filename, true); 101 writer = new BufferedWriter(fw); 102 } catch (IOException e) { 103 e.printStackTrace(); 104 } 105 } 106 107 /** 108 * Insert an EventRecord to the local storage, after it 109 * gets serialized and anonymized. 
110 * 111 * @param er the EventRecord to be inserted 112 */ 113 insert(EventRecord er)114 public void insert(EventRecord er) { 115 SerializedRecord sr = new SerializedRecord(er); 116 try { 117 Anonymizer.anonymize(sr); 118 } catch (Exception e) { 119 e.printStackTrace(); 120 } 121 append(sr); 122 } 123 124 /** 125 * Insert an array of EventRecords to the local storage, after they 126 * get serialized and anonymized. 127 * 128 * @param ers the array of EventRecords to be inserted 129 */ insert(EventRecord[] ers)130 public void insert(EventRecord[] ers) { 131 for (EventRecord er : ers) 132 insert(er); 133 } 134 append(SerializedRecord sr)135 private void append(SerializedRecord sr) { 136 try { 137 writer.write(pack(sr).toString()); 138 writer.write(RECORD_SEPARATOR); 139 // writer.flush(); 140 } catch (IOException e) { 141 e.printStackTrace(); 142 } 143 } 144 145 /** 146 * Pack a SerializedRecord into an array of bytes 147 * 148 * @param sr the SerializedRecord to be packed 149 */ pack(SerializedRecord sr)150 public static StringBuffer pack(SerializedRecord sr) { 151 StringBuffer sb = new StringBuffer(); 152 153 ArrayList<String> keys = new ArrayList<String>(sr.fields.keySet()); 154 155 if (sr.isValid()) 156 SerializedRecord.arrangeKeys(keys); 157 158 for (int i = 0; i < keys.size(); i++) { 159 String value = sr.fields.get(keys.get(i)); 160 sb.append(keys.get(i) + ":" + value); 161 sb.append(FIELD_SEPARATOR); 162 } 163 return sb; 164 } 165 166 /** 167 * Upload the local file store into HDFS, after it 168 * compressing it. Then a new local file is created 169 * as a temporary record store. 
170 * 171 */ upload()172 public void upload() { 173 try { 174 writer.flush(); 175 if (compress) 176 zipCompress(filename); 177 String remoteName = "failmon-"; 178 if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.hostnames"))) 179 remoteName += Anonymizer.getMD5Hash(InetAddress.getLocalHost().getCanonicalHostName()) + "-"; 180 else 181 remoteName += InetAddress.getLocalHost().getCanonicalHostName() + "-"; 182 remoteName += Calendar.getInstance().getTimeInMillis();//.toString(); 183 if (compress) 184 copyToHDFS(filename + COMPRESSION_SUFFIX, hdfsDir + "/" + remoteName + COMPRESSION_SUFFIX); 185 else 186 copyToHDFS(filename, hdfsDir + "/" + remoteName); 187 } catch (IOException e) { 188 e.printStackTrace(); 189 } 190 191 // delete and re-open 192 try { 193 fw.close(); 194 fw = new FileWriter(filename); 195 writer = new BufferedWriter(fw); 196 } catch (IOException e) { 197 e.printStackTrace(); 198 } 199 } 200 201 /** 202 * Compress a text file using the ZIP compressing algorithm. 
203 * 204 * @param filename the path to the file to be compressed 205 */ zipCompress(String filename)206 public static void zipCompress(String filename) throws IOException { 207 FileOutputStream fos = new FileOutputStream(filename + COMPRESSION_SUFFIX); 208 CheckedOutputStream csum = new CheckedOutputStream(fos, new CRC32()); 209 ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(csum)); 210 out.setComment("Failmon records."); 211 212 BufferedReader in = new BufferedReader(new FileReader(filename)); 213 out.putNextEntry(new ZipEntry(new File(filename).getName())); 214 int c; 215 while ((c = in.read()) != -1) 216 out.write(c); 217 in.close(); 218 219 out.finish(); 220 out.close(); 221 } 222 223 /** 224 * Copy a local file to HDFS 225 * 226 * @param localFile the filename of the local file 227 * @param hdfsFile the HDFS filename to copy to 228 */ copyToHDFS(String localFile, String hdfsFile)229 public static void copyToHDFS(String localFile, String hdfsFile) throws IOException { 230 231 String hadoopConfPath; 232 233 if (Environment.getProperty("hadoop.conf.path") == null) 234 hadoopConfPath = "../../../conf"; 235 else 236 hadoopConfPath = Environment.getProperty("hadoop.conf.path"); 237 238 // Read the configuration for the Hadoop environment 239 Configuration hadoopConf = new Configuration(); 240 hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml")); 241 hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml")); 242 243 // System.out.println(hadoopConf.get("hadoop.tmp.dir")); 244 // System.out.println(hadoopConf.get("fs.default.name")); 245 FileSystem fs = FileSystem.get(hadoopConf); 246 247 // HadoopDFS deals with Path 248 Path inFile = new Path("file://" + localFile); 249 Path outFile = new Path(hadoopConf.get("fs.default.name") + hdfsFile); 250 251 // Read from and write to new file 252 Environment.logInfo("Uploading to HDFS (file " + outFile + ") ..."); 253 fs.copyFromLocalFile(false, inFile, outFile); 254 } 255 
256 /** 257 * Close the temporary local file 258 * 259 */ close()260 public void close() { 261 try { 262 writer.flush(); 263 writer.close(); 264 } catch (IOException e) { 265 e.printStackTrace(); 266 } 267 } 268 } 269