1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.fs;
19 
20 import org.apache.hadoop.classification.InterfaceAudience;
21 import org.apache.hadoop.classification.InterfaceStability;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.fs.CommonConfigurationKeys;
24 import org.apache.hadoop.util.Shell;
25 
26 import java.io.BufferedReader;
27 import java.io.File;
28 import java.io.IOException;
29 import java.util.concurrent.atomic.AtomicLong;
30 
31 /** Filesystem disk space usage statistics.  Uses the unix 'du' program*/
32 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
33 @InterfaceStability.Evolving
34 public class DU extends Shell {
35   private String  dirPath;
36 
37   private AtomicLong used = new AtomicLong();
38   private volatile boolean shouldRun = true;
39   private Thread refreshUsed;
40   private IOException duException = null;
41   private long refreshInterval;
42 
43   /**
44    * Keeps track of disk usage.
45    * @param path the path to check disk usage in
46    * @param interval refresh the disk usage at this interval
47    * @throws IOException if we fail to refresh the disk usage
48    */
DU(File path, long interval)49   public DU(File path, long interval) throws IOException {
50     this(path, interval, -1L);
51   }
52 
53   /**
54    * Keeps track of disk usage.
55    * @param path the path to check disk usage in
56    * @param interval refresh the disk usage at this interval
57    * @param initialUsed use this value until next refresh
58    * @throws IOException if we fail to refresh the disk usage
59    */
DU(File path, long interval, long initialUsed)60   public DU(File path, long interval, long initialUsed) throws IOException {
61     super(0);
62 
63     //we set the Shell interval to 0 so it will always run our command
64     //and use this one to set the thread sleep interval
65     this.refreshInterval = interval;
66     this.dirPath = path.getCanonicalPath();
67 
68     //populate the used variable if the initial value is not specified.
69     if (initialUsed < 0) {
70       run();
71     } else {
72       this.used.set(initialUsed);
73     }
74   }
75 
76   /**
77    * Keeps track of disk usage.
78    * @param path the path to check disk usage in
79    * @param conf configuration object
80    * @throws IOException if we fail to refresh the disk usage
81    */
DU(File path, Configuration conf)82   public DU(File path, Configuration conf) throws IOException {
83     this(path, conf, -1L);
84   }
85 
86   /**
87    * Keeps track of disk usage.
88    * @param path the path to check disk usage in
89    * @param conf configuration object
90    * @param initialUsed use it until the next refresh.
91    * @throws IOException if we fail to refresh the disk usage
92    */
DU(File path, Configuration conf, long initialUsed)93   public DU(File path, Configuration conf, long initialUsed)
94       throws IOException {
95     this(path, conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY,
96                 CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT), initialUsed);
97   }
98 
99 
100 
101   /**
102    * This thread refreshes the "used" variable.
103    *
104    * Future improvements could be to not permanently
105    * run this thread, instead run when getUsed is called.
106    **/
107   class DURefreshThread implements Runnable {
108 
109     @Override
run()110     public void run() {
111 
112       while(shouldRun) {
113 
114         try {
115           Thread.sleep(refreshInterval);
116 
117           try {
118             //update the used variable
119             DU.this.run();
120           } catch (IOException e) {
121             synchronized (DU.this) {
122               //save the latest exception so we can return it in getUsed()
123               duException = e;
124             }
125 
126             LOG.warn("Could not get disk usage information", e);
127           }
128         } catch (InterruptedException e) {
129         }
130       }
131     }
132   }
133 
134   /**
135    * Decrease how much disk space we use.
136    * @param value decrease by this value
137    */
decDfsUsed(long value)138   public void decDfsUsed(long value) {
139     used.addAndGet(-value);
140   }
141 
142   /**
143    * Increase how much disk space we use.
144    * @param value increase by this value
145    */
incDfsUsed(long value)146   public void incDfsUsed(long value) {
147     used.addAndGet(value);
148   }
149 
150   /**
151    * @return disk space used
152    * @throws IOException if the shell command fails
153    */
getUsed()154   public long getUsed() throws IOException {
155     //if the updating thread isn't started, update on demand
156     if(refreshUsed == null) {
157       run();
158     } else {
159       synchronized (DU.this) {
160         //if an exception was thrown in the last run, rethrow
161         if(duException != null) {
162           IOException tmp = duException;
163           duException = null;
164           throw tmp;
165         }
166       }
167     }
168 
169     return Math.max(used.longValue(), 0L);
170   }
171 
172   /**
173    * @return the path of which we're keeping track of disk usage
174    */
getDirPath()175   public String getDirPath() {
176     return dirPath;
177   }
178 
179 
180   /**
181    * Override to hook in DUHelper class. Maybe this can be used more
182    * generally as well on Unix/Linux based systems
183    */
184   @Override
run()185   protected void run() throws IOException {
186     if (WINDOWS) {
187       used.set(DUHelper.getFolderUsage(dirPath));
188       return;
189     }
190     super.run();
191   }
192 
193   /**
194    * Start the disk usage checking thread.
195    */
start()196   public void start() {
197     //only start the thread if the interval is sane
198     if(refreshInterval > 0) {
199       refreshUsed = new Thread(new DURefreshThread(),
200           "refreshUsed-"+dirPath);
201       refreshUsed.setDaemon(true);
202       refreshUsed.start();
203     }
204   }
205 
206   /**
207    * Shut down the refreshing thread.
208    */
shutdown()209   public void shutdown() {
210     this.shouldRun = false;
211 
212     if(this.refreshUsed != null) {
213       this.refreshUsed.interrupt();
214     }
215   }
216 
217   @Override
toString()218   public String toString() {
219     return
220       "du -sk " + dirPath +"\n" +
221       used + "\t" + dirPath;
222   }
223 
224   @Override
getExecString()225   protected String[] getExecString() {
226     return new String[] {"du", "-sk", dirPath};
227   }
228 
229   @Override
parseExecResult(BufferedReader lines)230   protected void parseExecResult(BufferedReader lines) throws IOException {
231     String line = lines.readLine();
232     if (line == null) {
233       throw new IOException("Expecting a line not the end of stream");
234     }
235     String[] tokens = line.split("\t");
236     if(tokens.length == 0) {
237       throw new IOException("Illegal du output");
238     }
239     this.used.set(Long.parseLong(tokens[0])*1024);
240   }
241 
main(String[] args)242   public static void main(String[] args) throws Exception {
243     String path = ".";
244     if (args.length > 0) {
245       path = args[0];
246     }
247 
248     System.out.println(new DU(new File(path), new Configuration()).toString());
249   }
250 }
251