/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.Shell;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Filesystem disk space usage statistics. Uses the unix 'du' program
 * (<code>du -sk &lt;dir&gt;</code>), or {@link DUHelper} on Windows.
 *
 * The last measured value is kept in an {@link AtomicLong}. It is refreshed
 * either on demand by {@link #getUsed()} or, once {@link #start()} has been
 * called with a positive refresh interval, periodically by a daemon thread.
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class DU extends Shell {
  /** Canonical path of the directory being measured. */
  private final String dirPath;

  /** Last known usage in bytes, adjusted by inc/decDfsUsed between refreshes. */
  private final AtomicLong used = new AtomicLong();
  /** Cleared by {@link #shutdown()} to make the refresh loop exit. */
  private volatile boolean shouldRun = true;
  /** Background refresher; null until {@link #start()} creates it. */
  private Thread refreshUsed;
  /** Latest failure seen by the refresher; guarded by synchronized (this). */
  private IOException duException = null;
  /** Sleep time of the refresh thread between two runs, in milliseconds. */
  private final long refreshInterval;

  /**
   * Keeps track of disk usage.
   * @param path the path to check disk usage in
   * @param interval refresh the disk usage at this interval
   * @throws IOException if we fail to refresh the disk usage
   */
  public DU(File path, long interval) throws IOException {
    this(path, interval, -1L);
  }

  /**
   * Keeps track of disk usage.
   * @param path the path to check disk usage in
   * @param interval refresh the disk usage at this interval
   * @param initialUsed use this value until next refresh; a negative value
   *        means "unknown" and triggers an immediate measurement
   * @throws IOException if we fail to refresh the disk usage
   */
  public DU(File path, long interval, long initialUsed) throws IOException {
    //we set the Shell interval to 0 so it will always run our command
    //and use our own refreshInterval to set the thread sleep interval
    super(0);

    this.refreshInterval = interval;
    this.dirPath = path.getCanonicalPath();

    //populate the used variable if the initial value is not specified.
    if (initialUsed < 0) {
      run();
    } else {
      this.used.set(initialUsed);
    }
  }

  /**
   * Keeps track of disk usage.
   * @param path the path to check disk usage in
   * @param conf configuration object
   * @throws IOException if we fail to refresh the disk usage
   */
  public DU(File path, Configuration conf) throws IOException {
    this(path, conf, -1L);
  }

  /**
   * Keeps track of disk usage. The refresh interval is read from
   * {@link CommonConfigurationKeys#FS_DU_INTERVAL_KEY}.
   * @param path the path to check disk usage in
   * @param conf configuration object
   * @param initialUsed use it until the next refresh.
   * @throws IOException if we fail to refresh the disk usage
   */
  public DU(File path, Configuration conf, long initialUsed)
      throws IOException {
    this(path, conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY,
        CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT), initialUsed);
  }

  /**
   * This thread refreshes the "used" variable.
   *
   * Future improvements could be to not permanently
   * run this thread, instead run when getUsed is called.
   **/
  class DURefreshThread implements Runnable {

    @Override
    public void run() {

      while (shouldRun) {

        try {
          Thread.sleep(refreshInterval);

          try {
            //update the used variable
            DU.this.run();
          } catch (IOException e) {
            synchronized (DU.this) {
              //save the latest exception so we can return it in getUsed()
              duException = e;
            }

            LOG.warn("Could not get disk usage information", e);
          }
        } catch (InterruptedException e) {
          //Deliberately ignored: shutdown() clears shouldRun and then
          //interrupts this thread, so the loop condition ends the thread.
          //Re-asserting the interrupt flag here would make the next sleep()
          //throw immediately and spin if shouldRun were still true.
        }
      }
    }
  }

  /**
   * Decrease how much disk space we use.
   * @param value decrease by this value
   */
  public void decDfsUsed(long value) {
    used.addAndGet(-value);
  }

  /**
   * Increase how much disk space we use.
   * @param value increase by this value
   */
  public void incDfsUsed(long value) {
    used.addAndGet(value);
  }

  /**
   * Returns the current usage figure. If the refresh thread has not been
   * started the value is measured on demand; otherwise the cached value is
   * returned, unless the last background refresh failed, in which case that
   * failure is rethrown once and then cleared.
   *
   * @return disk space used, never negative
   * @throws IOException if the shell command fails
   */
  public long getUsed() throws IOException {
    //if the updating thread isn't started, update on demand
    if (refreshUsed == null) {
      run();
    } else {
      synchronized (DU.this) {
        //if an exception was thrown in the last run, rethrow
        if (duException != null) {
          IOException tmp = duException;
          duException = null;
          throw tmp;
        }
      }
    }

    //decDfsUsed() may have pushed the counter below zero; clamp it
    return Math.max(used.longValue(), 0L);
  }

  /**
   * @return the path of which we're keeping track of disk usage
   */
  public String getDirPath() {
    return dirPath;
  }

  /**
   * Override to hook in DUHelper class. Maybe this can be used more
   * generally as well on Unix/Linux based systems
   *
   * @throws IOException if the measurement fails
   */
  @Override
  protected void run() throws IOException {
    if (WINDOWS) {
      used.set(DUHelper.getFolderUsage(dirPath));
      return;
    }
    super.run();
  }

  /**
   * Start the disk usage checking thread. Does nothing when the refresh
   * interval is not positive; {@link #getUsed()} then measures on demand.
   */
  public void start() {
    //only start the thread if the interval is sane
    if (refreshInterval > 0) {
      refreshUsed = new Thread(new DURefreshThread(),
          "refreshUsed-" + dirPath);
      refreshUsed.setDaemon(true);
      refreshUsed.start();
    }
  }

  /**
   * Shut down the refreshing thread.
   */
  public void shutdown() {
    this.shouldRun = false;

    //wake the refresher if it is sleeping so it notices shouldRun promptly
    if (this.refreshUsed != null) {
      this.refreshUsed.interrupt();
    }
  }

  /**
   * @return the command line followed by the current value and the path
   */
  @Override
  public String toString() {
    return
      "du -sk " + dirPath + "\n" +
      used + "\t" + dirPath;
  }

  /**
   * @return the command executed to measure the directory
   */
  @Override
  protected String[] getExecString() {
    return new String[] {"du", "-sk", dirPath};
  }

  /**
   * Parses the first line of the command output, taking the text before the
   * first tab as the size in units of 1024 bytes, and stores the result in
   * bytes.
   *
   * @param lines the output of the command
   * @throws IOException if the output is empty or does not start with a
   *         number
   */
  @Override
  protected void parseExecResult(BufferedReader lines) throws IOException {
    String line = lines.readLine();
    if (line == null) {
      throw new IOException("Expecting a line not the end of stream");
    }
    String[] tokens = line.split("\t");
    if (tokens.length == 0) {
      throw new IOException("Illegal du output");
    }
    try {
      this.used.set(Long.parseLong(tokens[0]) * 1024);
    } catch (NumberFormatException e) {
      //An empty or non-numeric first field must surface as the checked
      //IOException callers handle; an unchecked exception would escape
      //the refresh loop and terminate the refresh thread.
      throw new IOException("Illegal du output: " + line, e);
    }
  }

  /**
   * Prints the disk usage of the directory given as first argument, or of
   * the current directory when no argument is given.
   *
   * @param args optional path to measure
   * @throws Exception if the measurement fails
   */
  public static void main(String[] args) throws Exception {
    String path = ".";
    if (args.length > 0) {
      path = args[0];
    }

    System.out.println(new DU(new File(path), new Configuration()).toString());
  }
}