1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.fs; 20 21 import java.io.*; 22 import java.net.URI; 23 import java.nio.ByteBuffer; 24 import java.util.*; 25 26 import org.apache.hadoop.conf.Configuration; 27 import org.apache.hadoop.fs.permission.*; 28 import org.apache.hadoop.io.nativeio.NativeIO; 29 import org.apache.hadoop.util.Progressable; 30 import org.apache.hadoop.util.StringUtils; 31 import org.apache.hadoop.util.Shell; 32 33 /**************************************************************** 34 * Implement the FileSystem API for the raw local filesystem. 35 * 36 *****************************************************************/ 37 public class RawLocalFileSystem extends FileSystem { 38 static final URI NAME = URI.create("file:///"); 39 private Path workingDir; 40 RawLocalFileSystem()41 public RawLocalFileSystem() { 42 workingDir = new Path(System.getProperty("user.dir")).makeQualified(this); 43 } 44 makeAbsolute(Path f)45 private Path makeAbsolute(Path f) { 46 if (f.isAbsolute()) { 47 return f; 48 } else { 49 return new Path(workingDir, f); 50 } 51 } 52 53 /** Convert a path to a File. */ pathToFile(Path path)54 public File pathToFile(Path path) { 55 checkPath(path); 56 if (!path.isAbsolute()) { 57 path = new Path(getWorkingDirectory(), path); 58 } 59 return new File(path.toUri().getPath()); 60 } 61 getUri()62 public URI getUri() { return NAME; } 63 initialize(URI uri, Configuration conf)64 public void initialize(URI uri, Configuration conf) throws IOException { 65 super.initialize(uri, conf); 66 setConf(conf); 67 } 68 69 class TrackingFileInputStream extends FileInputStream { TrackingFileInputStream(File f)70 public TrackingFileInputStream(File f) throws IOException { 71 super(f); 72 } 73 read()74 public int read() throws IOException { 75 int result = super.read(); 76 if (result != -1) { 77 statistics.incrementBytesRead(1); 78 } 79 return result; 80 } 81 read(byte[] data)82 public int read(byte[] data) throws IOException { 83 int result = super.read(data); 84 if (result != -1) { 85 statistics.incrementBytesRead(result); 86 } 87 return result; 88 } 89 read(byte[] data, int offset, int length)90 public int read(byte[] data, int offset, int length) throws IOException { 91 int result = super.read(data, offset, length); 92 if (result != -1) { 93 statistics.incrementBytesRead(result); 94 } 95 return result; 96 } 97 } 98 99 /******************************************************* 100 * For open()'s FSInputStream 101 *******************************************************/ 102 class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor { 103 FileInputStream fis; 104 private long position; 105 LocalFSFileInputStream(Path f)106 public LocalFSFileInputStream(Path f) throws IOException { 107 this.fis = new TrackingFileInputStream(pathToFile(f)); 108 } 109 seek(long pos)110 public void seek(long pos) throws IOException { 111 fis.getChannel().position(pos); 112 this.position = pos; 113 } 114 getPos()115 public long getPos() throws IOException { 116 return this.position; 117 } 118 seekToNewSource(long targetPos)119 public boolean seekToNewSource(long targetPos) throws IOException { 120 return false; 121 } 122 123 /* 124 * Just forward to the fis 125 */ available()126 public int available() throws IOException { return fis.available(); } close()127 public void close() throws IOException { fis.close(); } markSupport()128 public boolean markSupport() { return false; } 129 read()130 public int read() throws IOException { 131 try { 132 int value = fis.read(); 133 if (value >= 0) { 134 this.position++; 135 } 136 return value; 137 } catch (IOException e) { // unexpected exception 138 throw new FSError(e); // assume native fs error 139 } 140 } 141 read(byte[] b, int off, int len)142 public int read(byte[] b, int off, int len) throws IOException { 143 try { 144 int value = fis.read(b, off, len); 145 if (value > 0) { 146 this.position += value; 147 } 148 return value; 149 } catch (IOException e) { // unexpected exception 150 throw new FSError(e); // assume native fs error 151 } 152 } 153 read(long position, byte[] b, int off, int len)154 public int read(long position, byte[] b, int off, int len) 155 throws IOException { 156 ByteBuffer bb = ByteBuffer.wrap(b, off, len); 157 try { 158 return fis.getChannel().read(bb, position); 159 } catch (IOException e) { 160 throw new FSError(e); 161 } 162 } 163 skip(long n)164 public long skip(long n) throws IOException { 165 long value = fis.skip(n); 166 if (value > 0) { 167 this.position += value; 168 } 169 return value; 170 } 171 172 @Override getFileDescriptor()173 public FileDescriptor getFileDescriptor() throws IOException { 174 return fis.getFD(); 175 } 176 } 177 open(Path f, int bufferSize)178 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 179 if (!exists(f)) { 180 throw new FileNotFoundException(f.toString()); 181 } 182 return new FSDataInputStream(new BufferedFSInputStream( 183 new LocalFSFileInputStream(f), bufferSize)); 184 } 185 186 /********************************************************* 187 * For create()'s FSOutputStream. 188 *********************************************************/ 189 class LocalFSFileOutputStream extends OutputStream implements Syncable { 190 FileOutputStream fos; 191 LocalFSFileOutputStream(Path f, boolean append)192 private LocalFSFileOutputStream(Path f, boolean append) throws IOException { 193 this.fos = new FileOutputStream(pathToFile(f), append); 194 } 195 196 /* 197 * Just forward to the fos 198 */ close()199 public void close() throws IOException { fos.close(); } flush()200 public void flush() throws IOException { fos.flush(); } write(byte[] b, int off, int len)201 public void write(byte[] b, int off, int len) throws IOException { 202 try { 203 fos.write(b, off, len); 204 } catch (IOException e) { // unexpected exception 205 throw new FSError(e); // assume native fs error 206 } 207 } 208 write(int b)209 public void write(int b) throws IOException { 210 try { 211 fos.write(b); 212 } catch (IOException e) { // unexpected exception 213 throw new FSError(e); // assume native fs error 214 } 215 } 216 217 /** {@inheritDoc} */ sync()218 public void sync() throws IOException { 219 fos.getFD().sync(); 220 } 221 } 222 223 /** {@inheritDoc} */ append(Path f, int bufferSize, Progressable progress)224 public FSDataOutputStream append(Path f, int bufferSize, 225 Progressable progress) throws IOException { 226 if (!exists(f)) { 227 throw new FileNotFoundException("File " + f + " not found."); 228 } 229 if (getFileStatus(f).isDir()) { 230 throw new IOException("Cannot append to a diretory (=" + f + " )."); 231 } 232 return new FSDataOutputStream(new BufferedOutputStream( 233 new LocalFSFileOutputStream(f, true), bufferSize), statistics); 234 } 235 236 /** {@inheritDoc} */ 237 @Override create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)238 public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, 239 short replication, long blockSize, Progressable progress) 240 throws IOException { 241 return create(f, overwrite, true, bufferSize, replication, blockSize, progress); 242 } 243 create(Path f, boolean overwrite, boolean createParent, int bufferSize, short replication, long blockSize, Progressable progress)244 private FSDataOutputStream create(Path f, boolean overwrite, 245 boolean createParent, int bufferSize, 246 short replication, long blockSize, Progressable progress) 247 throws IOException { 248 if (exists(f) && !overwrite) { 249 throw new IOException("File already exists:"+f); 250 } 251 Path parent = f.getParent(); 252 if (parent != null) { 253 if (!createParent && !exists(parent)) { 254 throw new FileNotFoundException("Parent directory doesn't exist: " 255 + parent); 256 } else if (!mkdirs(parent)) { 257 throw new IOException("Mkdirs failed to create " + parent); 258 } 259 } 260 return new FSDataOutputStream(new BufferedOutputStream( 261 new LocalFSFileOutputStream(f, false), bufferSize), statistics); 262 } 263 264 /** {@inheritDoc} */ 265 @Override create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)266 public FSDataOutputStream create(Path f, FsPermission permission, 267 boolean overwrite, int bufferSize, short replication, long blockSize, 268 Progressable progress) throws IOException { 269 FSDataOutputStream out = create(f, 270 overwrite, bufferSize, replication, blockSize, progress); 271 setPermission(f, permission); 272 return out; 273 } 274 275 /** {@inheritDoc} */ 276 @Override createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)277 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 278 boolean overwrite, 279 int bufferSize, short replication, long blockSize, 280 Progressable progress) throws IOException { 281 FSDataOutputStream out = create(f, 282 overwrite, false, bufferSize, replication, blockSize, progress); 283 setPermission(f, permission); 284 return out; 285 } 286 rename(Path src, Path dst)287 public boolean rename(Path src, Path dst) throws IOException { 288 if (pathToFile(src).renameTo(pathToFile(dst))) { 289 return true; 290 } 291 LOG.debug("Falling through to a copy of " + src + " to " + dst); 292 return FileUtil.copy(this, src, this, dst, true, getConf()); 293 } 294 295 @Deprecated delete(Path p)296 public boolean delete(Path p) throws IOException { 297 return delete(p, true); 298 } 299 delete(Path p, boolean recursive)300 public boolean delete(Path p, boolean recursive) throws IOException { 301 File f = pathToFile(p); 302 if (f.isFile()) { 303 return f.delete(); 304 } else if ((!recursive) && f.isDirectory() && 305 (FileUtil.listFiles(f).length != 0)) { 306 throw new IOException("Directory " + f.toString() + " is not empty"); 307 } 308 return FileUtil.fullyDelete(f); 309 } 310 listStatus(Path f)311 public FileStatus[] listStatus(Path f) throws IOException { 312 File localf = pathToFile(f); 313 FileStatus[] results; 314 315 if (!localf.exists()) { 316 return null; 317 } 318 if (localf.isFile()) { 319 return new FileStatus[] { 320 new RawLocalFileStatus(localf, getDefaultBlockSize(), this) }; 321 } 322 323 String[] names = localf.list(); 324 if (names == null) { 325 return null; 326 } 327 results = new FileStatus[names.length]; 328 for (int i = 0; i < names.length; i++) { 329 results[i] = getFileStatus(new Path(f, names[i])); 330 } 331 return results; 332 } 333 334 /** 335 * Creates the specified directory hierarchy. Does not 336 * treat existence as an error. 337 */ mkdirs(Path f)338 public boolean mkdirs(Path f) throws IOException { 339 Path parent = f.getParent(); 340 File p2f = pathToFile(f); 341 return (parent == null || mkdirs(parent)) && 342 (p2f.mkdir() || p2f.isDirectory()); 343 } 344 345 /** {@inheritDoc} */ 346 @Override mkdirs(Path f, FsPermission permission)347 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 348 boolean b = mkdirs(f); 349 setPermission(f, permission); 350 return b; 351 } 352 353 @Override getHomeDirectory()354 public Path getHomeDirectory() { 355 return new Path(System.getProperty("user.home")).makeQualified(this); 356 } 357 358 /** 359 * Set the working directory to the given directory. 360 */ 361 @Override setWorkingDirectory(Path newDir)362 public void setWorkingDirectory(Path newDir) { 363 workingDir = makeAbsolute(newDir); 364 checkPath(workingDir); 365 366 } 367 368 @Override getWorkingDirectory()369 public Path getWorkingDirectory() { 370 return workingDir; 371 } 372 373 // In the case of the local filesystem, we can just rename the file. moveFromLocalFile(Path src, Path dst)374 public void moveFromLocalFile(Path src, Path dst) throws IOException { 375 rename(src, dst); 376 } 377 378 // We can write output directly to the final location startLocalOutput(Path fsOutputFile, Path tmpLocalFile)379 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 380 throws IOException { 381 return fsOutputFile; 382 } 383 384 // It's in the right place - nothing to do. completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)385 public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile) 386 throws IOException { 387 } 388 close()389 public void close() throws IOException { 390 super.close(); 391 } 392 toString()393 public String toString() { 394 return "LocalFS"; 395 } 396 getFileStatus(Path f)397 public FileStatus getFileStatus(Path f) throws IOException { 398 File path = pathToFile(f); 399 if (path.exists()) { 400 return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this); 401 } else { 402 throw new FileNotFoundException( "File " + f + " does not exist."); 403 } 404 } 405 406 static class RawLocalFileStatus extends FileStatus { 407 /* We can add extra fields here. It breaks at least CopyFiles.FilePair(). 408 * We recognize if the information is already loaded by check if 409 * onwer.equals(""). 410 */ isPermissionLoaded()411 private boolean isPermissionLoaded() { 412 return !super.getOwner().equals(""); 413 } 414 RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs)415 RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) { 416 super(f.length(), f.isDirectory(), 1, defaultBlockSize, 417 f.lastModified(), new Path(f.getPath()).makeQualified(fs)); 418 } 419 420 @Override getPermission()421 public FsPermission getPermission() { 422 if (!isPermissionLoaded()) { 423 loadPermissionInfo(); 424 } 425 return super.getPermission(); 426 } 427 428 @Override getOwner()429 public String getOwner() { 430 if (!isPermissionLoaded()) { 431 loadPermissionInfo(); 432 } 433 return super.getOwner(); 434 } 435 436 @Override getGroup()437 public String getGroup() { 438 if (!isPermissionLoaded()) { 439 loadPermissionInfo(); 440 } 441 return super.getGroup(); 442 } 443 444 /// loads permissions, owner, and group from `ls -ld` loadPermissionInfo()445 private void loadPermissionInfo() { 446 IOException e = null; 447 try { 448 StringTokenizer t = new StringTokenizer( 449 FileUtil.execCommand(new File(getPath().toUri()), 450 Shell.getGET_PERMISSION_COMMAND())); 451 //expected format 452 //-rw------- 1 username groupname ... 453 String permission = t.nextToken(); 454 if (permission.length() > 10) { //files with ACLs might have a '+' 455 permission = permission.substring(0, 10); 456 } 457 setPermission(FsPermission.valueOf(permission)); 458 t.nextToken(); 459 setOwner(t.nextToken()); 460 setGroup(t.nextToken()); 461 } catch (Shell.ExitCodeException ioe) { 462 if (ioe.getExitCode() != 1) { 463 e = ioe; 464 } else { 465 setPermission(null); 466 setOwner(null); 467 setGroup(null); 468 } 469 } catch (IOException ioe) { 470 e = ioe; 471 } finally { 472 if (e != null) { 473 throw new RuntimeException("Error while running command to get " + 474 "file permissions : " + 475 StringUtils.stringifyException(e)); 476 } 477 } 478 } 479 480 @Override write(DataOutput out)481 public void write(DataOutput out) throws IOException { 482 if (!isPermissionLoaded()) { 483 loadPermissionInfo(); 484 } 485 super.write(out); 486 } 487 } 488 489 /** 490 * Use the command chown to set owner. 491 */ 492 @Override setOwner(Path p, String username, String groupname )493 public void setOwner(Path p, String username, String groupname 494 ) throws IOException { 495 if (username == null && groupname == null) { 496 throw new IOException("username == null && groupname == null"); 497 } 498 499 if (username == null) { 500 FileUtil.execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 501 } else { 502 //OWNER[:[GROUP]] 503 String s = username + (groupname == null? "": ":" + groupname); 504 FileUtil.execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s); 505 } 506 } 507 508 /** 509 * Use the command chmod to set permission. 510 */ 511 @Override setPermission(Path p, FsPermission permission )512 public void setPermission(Path p, FsPermission permission 513 ) throws IOException { 514 FileUtil.setPermission(pathToFile(p), permission); 515 } 516 } 517