1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.fs;
20 
21 import java.io.*;
22 import java.net.URI;
23 import java.nio.ByteBuffer;
24 import java.util.*;
25 
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.permission.*;
28 import org.apache.hadoop.io.nativeio.NativeIO;
29 import org.apache.hadoop.util.Progressable;
30 import org.apache.hadoop.util.StringUtils;
31 import org.apache.hadoop.util.Shell;
32 
33 /****************************************************************
34  * Implement the FileSystem API for the raw local filesystem.
35  *
36  *****************************************************************/
37 public class RawLocalFileSystem extends FileSystem {
38   static final URI NAME = URI.create("file:///");
39   private Path workingDir;
40 
RawLocalFileSystem()41   public RawLocalFileSystem() {
42     workingDir = new Path(System.getProperty("user.dir")).makeQualified(this);
43   }
44 
makeAbsolute(Path f)45   private Path makeAbsolute(Path f) {
46     if (f.isAbsolute()) {
47       return f;
48     } else {
49       return new Path(workingDir, f);
50     }
51   }
52 
53   /** Convert a path to a File. */
pathToFile(Path path)54   public File pathToFile(Path path) {
55     checkPath(path);
56     if (!path.isAbsolute()) {
57       path = new Path(getWorkingDirectory(), path);
58     }
59     return new File(path.toUri().getPath());
60   }
61 
getUri()62   public URI getUri() { return NAME; }
63 
initialize(URI uri, Configuration conf)64   public void initialize(URI uri, Configuration conf) throws IOException {
65     super.initialize(uri, conf);
66     setConf(conf);
67   }
68 
69   class TrackingFileInputStream extends FileInputStream {
TrackingFileInputStream(File f)70     public TrackingFileInputStream(File f) throws IOException {
71       super(f);
72     }
73 
read()74     public int read() throws IOException {
75       int result = super.read();
76       if (result != -1) {
77         statistics.incrementBytesRead(1);
78       }
79       return result;
80     }
81 
read(byte[] data)82     public int read(byte[] data) throws IOException {
83       int result = super.read(data);
84       if (result != -1) {
85         statistics.incrementBytesRead(result);
86       }
87       return result;
88     }
89 
read(byte[] data, int offset, int length)90     public int read(byte[] data, int offset, int length) throws IOException {
91       int result = super.read(data, offset, length);
92       if (result != -1) {
93         statistics.incrementBytesRead(result);
94       }
95       return result;
96     }
97   }
98 
99   /*******************************************************
100    * For open()'s FSInputStream
101    *******************************************************/
102   class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
103     FileInputStream fis;
104     private long position;
105 
LocalFSFileInputStream(Path f)106     public LocalFSFileInputStream(Path f) throws IOException {
107       this.fis = new TrackingFileInputStream(pathToFile(f));
108     }
109 
seek(long pos)110     public void seek(long pos) throws IOException {
111       fis.getChannel().position(pos);
112       this.position = pos;
113     }
114 
getPos()115     public long getPos() throws IOException {
116       return this.position;
117     }
118 
seekToNewSource(long targetPos)119     public boolean seekToNewSource(long targetPos) throws IOException {
120       return false;
121     }
122 
123     /*
124      * Just forward to the fis
125      */
available()126     public int available() throws IOException { return fis.available(); }
close()127     public void close() throws IOException { fis.close(); }
markSupport()128     public boolean markSupport() { return false; }
129 
read()130     public int read() throws IOException {
131       try {
132         int value = fis.read();
133         if (value >= 0) {
134           this.position++;
135         }
136         return value;
137       } catch (IOException e) {                 // unexpected exception
138         throw new FSError(e);                   // assume native fs error
139       }
140     }
141 
read(byte[] b, int off, int len)142     public int read(byte[] b, int off, int len) throws IOException {
143       try {
144         int value = fis.read(b, off, len);
145         if (value > 0) {
146           this.position += value;
147         }
148         return value;
149       } catch (IOException e) {                 // unexpected exception
150         throw new FSError(e);                   // assume native fs error
151       }
152     }
153 
read(long position, byte[] b, int off, int len)154     public int read(long position, byte[] b, int off, int len)
155       throws IOException {
156       ByteBuffer bb = ByteBuffer.wrap(b, off, len);
157       try {
158         return fis.getChannel().read(bb, position);
159       } catch (IOException e) {
160         throw new FSError(e);
161       }
162     }
163 
skip(long n)164     public long skip(long n) throws IOException {
165       long value = fis.skip(n);
166       if (value > 0) {
167         this.position += value;
168       }
169       return value;
170     }
171 
172     @Override
getFileDescriptor()173     public FileDescriptor getFileDescriptor() throws IOException {
174       return fis.getFD();
175     }
176   }
177 
open(Path f, int bufferSize)178   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
179     if (!exists(f)) {
180       throw new FileNotFoundException(f.toString());
181     }
182     return new FSDataInputStream(new BufferedFSInputStream(
183         new LocalFSFileInputStream(f), bufferSize));
184   }
185 
186   /*********************************************************
187    * For create()'s FSOutputStream.
188    *********************************************************/
189   class LocalFSFileOutputStream extends OutputStream implements Syncable {
190     FileOutputStream fos;
191 
LocalFSFileOutputStream(Path f, boolean append)192     private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
193       this.fos = new FileOutputStream(pathToFile(f), append);
194     }
195 
196     /*
197      * Just forward to the fos
198      */
close()199     public void close() throws IOException { fos.close(); }
flush()200     public void flush() throws IOException { fos.flush(); }
write(byte[] b, int off, int len)201     public void write(byte[] b, int off, int len) throws IOException {
202       try {
203         fos.write(b, off, len);
204       } catch (IOException e) {                // unexpected exception
205         throw new FSError(e);                  // assume native fs error
206       }
207     }
208 
write(int b)209     public void write(int b) throws IOException {
210       try {
211         fos.write(b);
212       } catch (IOException e) {              // unexpected exception
213         throw new FSError(e);                // assume native fs error
214       }
215     }
216 
217     /** {@inheritDoc} */
sync()218     public void sync() throws IOException {
219       fos.getFD().sync();
220     }
221   }
222 
223   /** {@inheritDoc} */
append(Path f, int bufferSize, Progressable progress)224   public FSDataOutputStream append(Path f, int bufferSize,
225       Progressable progress) throws IOException {
226     if (!exists(f)) {
227       throw new FileNotFoundException("File " + f + " not found.");
228     }
229     if (getFileStatus(f).isDir()) {
230       throw new IOException("Cannot append to a diretory (=" + f + " ).");
231     }
232     return new FSDataOutputStream(new BufferedOutputStream(
233         new LocalFSFileOutputStream(f, true), bufferSize), statistics);
234   }
235 
236   /** {@inheritDoc} */
237   @Override
create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)238   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
239                                    short replication, long blockSize, Progressable progress)
240   throws IOException {
241     return create(f, overwrite, true, bufferSize, replication, blockSize, progress);
242   }
243 
create(Path f, boolean overwrite, boolean createParent, int bufferSize, short replication, long blockSize, Progressable progress)244   private FSDataOutputStream create(Path f, boolean overwrite,
245       boolean createParent, int bufferSize,
246       short replication, long blockSize, Progressable progress)
247     throws IOException {
248     if (exists(f) && !overwrite) {
249       throw new IOException("File already exists:"+f);
250     }
251     Path parent = f.getParent();
252     if (parent != null) {
253       if (!createParent && !exists(parent)) {
254         throw new FileNotFoundException("Parent directory doesn't exist: "
255             + parent);
256       } else if (!mkdirs(parent)) {
257         throw new IOException("Mkdirs failed to create " + parent);
258       }
259     }
260     return new FSDataOutputStream(new BufferedOutputStream(
261         new LocalFSFileOutputStream(f, false), bufferSize), statistics);
262   }
263 
264   /** {@inheritDoc} */
265   @Override
create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)266   public FSDataOutputStream create(Path f, FsPermission permission,
267       boolean overwrite, int bufferSize, short replication, long blockSize,
268       Progressable progress) throws IOException {
269     FSDataOutputStream out = create(f,
270         overwrite, bufferSize, replication, blockSize, progress);
271     setPermission(f, permission);
272     return out;
273   }
274 
275   /** {@inheritDoc} */
276   @Override
createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)277   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
278       boolean overwrite,
279       int bufferSize, short replication, long blockSize,
280       Progressable progress) throws IOException {
281     FSDataOutputStream out = create(f,
282         overwrite, false, bufferSize, replication, blockSize, progress);
283     setPermission(f, permission);
284     return out;
285   }
286 
rename(Path src, Path dst)287   public boolean rename(Path src, Path dst) throws IOException {
288     if (pathToFile(src).renameTo(pathToFile(dst))) {
289       return true;
290     }
291     LOG.debug("Falling through to a copy of " + src + " to " + dst);
292     return FileUtil.copy(this, src, this, dst, true, getConf());
293   }
294 
295   @Deprecated
delete(Path p)296   public boolean delete(Path p) throws IOException {
297     return delete(p, true);
298   }
299 
delete(Path p, boolean recursive)300   public boolean delete(Path p, boolean recursive) throws IOException {
301     File f = pathToFile(p);
302     if (f.isFile()) {
303       return f.delete();
304     } else if ((!recursive) && f.isDirectory() &&
305         (FileUtil.listFiles(f).length != 0)) {
306       throw new IOException("Directory " + f.toString() + " is not empty");
307     }
308     return FileUtil.fullyDelete(f);
309   }
310 
listStatus(Path f)311   public FileStatus[] listStatus(Path f) throws IOException {
312     File localf = pathToFile(f);
313     FileStatus[] results;
314 
315     if (!localf.exists()) {
316       return null;
317     }
318     if (localf.isFile()) {
319       return new FileStatus[] {
320           new RawLocalFileStatus(localf, getDefaultBlockSize(), this) };
321     }
322 
323     String[] names = localf.list();
324     if (names == null) {
325       return null;
326     }
327     results = new FileStatus[names.length];
328     for (int i = 0; i < names.length; i++) {
329       results[i] = getFileStatus(new Path(f, names[i]));
330     }
331     return results;
332   }
333 
334   /**
335    * Creates the specified directory hierarchy. Does not
336    * treat existence as an error.
337    */
mkdirs(Path f)338   public boolean mkdirs(Path f) throws IOException {
339     Path parent = f.getParent();
340     File p2f = pathToFile(f);
341     return (parent == null || mkdirs(parent)) &&
342       (p2f.mkdir() || p2f.isDirectory());
343   }
344 
345   /** {@inheritDoc} */
346   @Override
mkdirs(Path f, FsPermission permission)347   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
348     boolean b = mkdirs(f);
349     setPermission(f, permission);
350     return b;
351   }
352 
353   @Override
getHomeDirectory()354   public Path getHomeDirectory() {
355     return new Path(System.getProperty("user.home")).makeQualified(this);
356   }
357 
358   /**
359    * Set the working directory to the given directory.
360    */
361   @Override
setWorkingDirectory(Path newDir)362   public void setWorkingDirectory(Path newDir) {
363     workingDir = makeAbsolute(newDir);
364     checkPath(workingDir);
365 
366   }
367 
368   @Override
getWorkingDirectory()369   public Path getWorkingDirectory() {
370     return workingDir;
371   }
372 
373   // In the case of the local filesystem, we can just rename the file.
moveFromLocalFile(Path src, Path dst)374   public void moveFromLocalFile(Path src, Path dst) throws IOException {
375     rename(src, dst);
376   }
377 
378   // We can write output directly to the final location
startLocalOutput(Path fsOutputFile, Path tmpLocalFile)379   public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
380     throws IOException {
381     return fsOutputFile;
382   }
383 
384   // It's in the right place - nothing to do.
completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)385   public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
386     throws IOException {
387   }
388 
close()389   public void close() throws IOException {
390     super.close();
391   }
392 
toString()393   public String toString() {
394     return "LocalFS";
395   }
396 
getFileStatus(Path f)397   public FileStatus getFileStatus(Path f) throws IOException {
398     File path = pathToFile(f);
399     if (path.exists()) {
400       return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
401     } else {
402       throw new FileNotFoundException( "File " + f + " does not exist.");
403     }
404   }
405 
406   static class RawLocalFileStatus extends FileStatus {
407     /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
408      * We recognize if the information is already loaded by check if
409      * onwer.equals("").
410      */
isPermissionLoaded()411     private boolean isPermissionLoaded() {
412       return !super.getOwner().equals("");
413     }
414 
RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs)415     RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
416       super(f.length(), f.isDirectory(), 1, defaultBlockSize,
417             f.lastModified(), new Path(f.getPath()).makeQualified(fs));
418     }
419 
420     @Override
getPermission()421     public FsPermission getPermission() {
422       if (!isPermissionLoaded()) {
423         loadPermissionInfo();
424       }
425       return super.getPermission();
426     }
427 
428     @Override
getOwner()429     public String getOwner() {
430       if (!isPermissionLoaded()) {
431         loadPermissionInfo();
432       }
433       return super.getOwner();
434     }
435 
436     @Override
getGroup()437     public String getGroup() {
438       if (!isPermissionLoaded()) {
439         loadPermissionInfo();
440       }
441       return super.getGroup();
442     }
443 
444     /// loads permissions, owner, and group from `ls -ld`
loadPermissionInfo()445     private void loadPermissionInfo() {
446       IOException e = null;
447       try {
448         StringTokenizer t = new StringTokenizer(
449             FileUtil.execCommand(new File(getPath().toUri()),
450                                  Shell.getGET_PERMISSION_COMMAND()));
451         //expected format
452         //-rw-------    1 username groupname ...
453         String permission = t.nextToken();
454         if (permission.length() > 10) { //files with ACLs might have a '+'
455           permission = permission.substring(0, 10);
456         }
457         setPermission(FsPermission.valueOf(permission));
458         t.nextToken();
459         setOwner(t.nextToken());
460         setGroup(t.nextToken());
461       } catch (Shell.ExitCodeException ioe) {
462         if (ioe.getExitCode() != 1) {
463           e = ioe;
464         } else {
465           setPermission(null);
466           setOwner(null);
467           setGroup(null);
468         }
469       } catch (IOException ioe) {
470         e = ioe;
471       } finally {
472         if (e != null) {
473           throw new RuntimeException("Error while running command to get " +
474                                      "file permissions : " +
475                                      StringUtils.stringifyException(e));
476         }
477       }
478     }
479 
480     @Override
write(DataOutput out)481     public void write(DataOutput out) throws IOException {
482       if (!isPermissionLoaded()) {
483         loadPermissionInfo();
484       }
485       super.write(out);
486     }
487   }
488 
489   /**
490    * Use the command chown to set owner.
491    */
492   @Override
setOwner(Path p, String username, String groupname )493   public void setOwner(Path p, String username, String groupname
494       ) throws IOException {
495     if (username == null && groupname == null) {
496       throw new IOException("username == null && groupname == null");
497     }
498 
499     if (username == null) {
500       FileUtil.execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
501     } else {
502       //OWNER[:[GROUP]]
503       String s = username + (groupname == null? "": ":" + groupname);
504       FileUtil.execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
505     }
506   }
507 
508   /**
509    * Use the command chmod to set permission.
510    */
511   @Override
setPermission(Path p, FsPermission permission )512   public void setPermission(Path p, FsPermission permission
513                             ) throws IOException {
514     FileUtil.setPermission(pathToFile(p), permission);
515   }
516 }
517