1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.contrib.failmon;
20 
21 import java.util.ArrayList;
22 
23 import org.apache.hadoop.conf.Configuration;
24 
25 /**********************************************************
26  * This class executes monitoring jobs on all nodes of the
27  * cluster, on which we intend to gather failure metrics.
28  * It is basically a thread that sleeps and periodically wakes
29  * up to execute monitoring jobs and ship all gathered data to
30  * a "safe" location, which in most cases will be the HDFS
31  * filesystem of the monitored cluster.
32  *
33  **********************************************************/
34 
35 public class Executor implements Runnable {
36 
37   public static final int DEFAULT_LOG_INTERVAL = 3600;
38 
39   public static final int DEFAULT_POLL_INTERVAL = 360;
40 
41   public static int MIN_INTERVAL = 5;
42 
43   public static int instances = 0;
44 
45   LocalStore lstore;
46 
47   ArrayList<MonitorJob> monitors;
48 
49   int interval;
50 
51   int upload_interval;
52   int upload_counter;
53 
54   /**
55    * Create an instance of the class and read the configuration
56    * file to determine the set of jobs that will be run and the
57    * maximum interval for which the thread can sleep before it
58    * wakes up to execute a monitoring job on the node.
59    *
60    */
61 
Executor(Configuration conf)62   public Executor(Configuration conf) {
63 
64     Environment.prepare("conf/failmon.properties");
65 
66     String localTmpDir;
67 
68     if (conf == null) {
69       // running as a stand-alone application
70       localTmpDir = System.getProperty("java.io.tmpdir");
71       Environment.setProperty("local.tmp.dir", localTmpDir);
72     } else {
73       // running from within Hadoop
74       localTmpDir = conf.get("hadoop.tmp.dir");
75       String hadoopLogPath = System.getProperty("hadoop.log.dir") + "/" + System.getProperty("hadoop.log.file");
76       Environment.setProperty("hadoop.log.file", hadoopLogPath);
77       Environment.setProperty("local.tmp.dir", localTmpDir);
78     }
79 
80     monitors = Environment.getJobs();
81     interval = Environment.getInterval(monitors);
82     upload_interval = LocalStore.UPLOAD_INTERVAL;
83     lstore = new LocalStore();
84 
85     if (Environment.getProperty("local.upload.interval") != null)
86      upload_interval = Integer.parseInt(Environment.getProperty("local.upload.interval"));
87 
88     instances++;
89   }
90 
run()91   public void run() {
92     upload_counter = upload_interval;
93 
94     Environment.logInfo("Failmon Executor thread started successfully.");
95     while (true) {
96       try {
97         Thread.sleep(interval * 1000);
98         for (int i = 0; i < monitors.size(); i++) {
99           monitors.get(i).counter -= interval;
100           if (monitors.get(i).counter <= 0) {
101             monitors.get(i).reset();
102             Environment.logInfo("Calling " + monitors.get(i).job.getInfo() + "...\t");
103             monitors.get(i).job.monitor(lstore);
104           }
105         }
106         upload_counter -= interval;
107         if (upload_counter <= 0) {
108           lstore.upload();
109           upload_counter = upload_interval;
110         }
111       } catch (InterruptedException e) {
112         e.printStackTrace();
113       }
114     }
115   }
116 
cleanup()117   public void cleanup() {
118     instances--;
119   }
120 }
121