/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.contrib.failmon;

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;

/**********************************************************
 * This class executes monitoring jobs on all nodes of the
 * cluster, on which we intend to gather failure metrics.
 * It is basically a thread that sleeps and periodically wakes
 * up to execute monitoring jobs and ship all gathered data to
 * a "safe" location, which in most cases will be the HDFS
 * filesystem of the monitored cluster.
 *
 **********************************************************/

public class Executor implements Runnable {

  public static final int DEFAULT_LOG_INTERVAL = 3600;

  public static final int DEFAULT_POLL_INTERVAL = 360;

  public static int MIN_INTERVAL = 5;

  public static int instances = 0;

  LocalStore lstore;

  ArrayList<MonitorJob> monitors;

  int interval;

  int upload_interval;
  int upload_counter;

  /**
   * Create an instance of the class and read the configuration
   * file to determine the set of jobs that will be run and the
   * maximum interval for which the thread can sleep before it
   * wakes up to execute a monitoring job on the node.
   *
   */

  public Executor(Configuration conf) {

    Environment.prepare("conf/failmon.properties");

    String localTmpDir;

    if (conf == null) {
      // running as a stand-alone application
      localTmpDir = System.getProperty("java.io.tmpdir");
      Environment.setProperty("local.tmp.dir", localTmpDir);
    } else {
      // running from within Hadoop
      localTmpDir = conf.get("hadoop.tmp.dir");
      String hadoopLogPath = System.getProperty("hadoop.log.dir") + "/" +
          System.getProperty("hadoop.log.file");
      Environment.setProperty("hadoop.log.file", hadoopLogPath);
      Environment.setProperty("local.tmp.dir", localTmpDir);
    }

    monitors = Environment.getJobs();
    interval = Environment.getInterval(monitors);
    upload_interval = LocalStore.UPLOAD_INTERVAL;
    lstore = new LocalStore();

    if (Environment.getProperty("local.upload.interval") != null)
      upload_interval = Integer.parseInt(Environment.getProperty("local.upload.interval"));

    instances++;
  }

  public void run() {
    upload_counter = upload_interval;

    Environment.logInfo("Failmon Executor thread started successfully.");
    while (true) {
      try {
        Thread.sleep(interval * 1000);
        // decrement each job's counter; a job whose counter reaches zero is due
        for (int i = 0; i < monitors.size(); i++) {
          monitors.get(i).counter -= interval;
          if (monitors.get(i).counter <= 0) {
            monitors.get(i).reset();
            Environment.logInfo("Calling " + monitors.get(i).job.getInfo() + "...\t");
            monitors.get(i).job.monitor(lstore);
          }
        }
        // periodically ship the locally gathered data to the safe location
        upload_counter -= interval;
        if (upload_counter <= 0) {
          lstore.upload();
          upload_counter = upload_interval;
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public void cleanup() {
    instances--;
  }
}
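// --------------------------------------------------------------------------
// Usage sketch (not part of the original class): one way a caller might wire
// up the Executor. The thread name and the choice to run it as a daemon are
// illustrative assumptions, not something the failmon code prescribes.
//
//   Configuration conf = new Configuration();   // or pass null when stand-alone
//   Executor executor = new Executor(conf);
//   Thread monitorThread = new Thread(executor, "failmon-executor");
//   monitorThread.setDaemon(true);
//   monitorThread.start();
//   ...
//   executor.cleanup();   // when monitoring is no longer needed
// --------------------------------------------------------------------------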