1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.fs;
19 
20 import java.io.FileNotFoundException;
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.util.ArrayList;
25 import java.util.EnumSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NoSuchElementException;
29 
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.classification.InterfaceStability;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.permission.AclEntry;
34 import org.apache.hadoop.fs.permission.AclStatus;
35 import org.apache.hadoop.fs.permission.FsAction;
36 import org.apache.hadoop.fs.permission.FsPermission;
37 import org.apache.hadoop.fs.Options.ChecksumOpt;
38 import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
39 import org.apache.hadoop.hdfs.DFSClient;
40 import org.apache.hadoop.hdfs.DFSInputStream;
41 import org.apache.hadoop.hdfs.DFSOutputStream;
42 import org.apache.hadoop.hdfs.HdfsConfiguration;
43 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
44 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
45 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
46 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
47 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
48 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
49 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
50 import org.apache.hadoop.hdfs.server.namenode.NameNode;
51 import org.apache.hadoop.io.Text;
52 import org.apache.hadoop.security.AccessControlException;
53 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
54 import org.apache.hadoop.security.token.Token;
55 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
56 import org.apache.hadoop.util.Progressable;
57 
58 @InterfaceAudience.Private
59 @InterfaceStability.Evolving
60 public class Hdfs extends AbstractFileSystem {
61 
62   DFSClient dfs;
63   private boolean verifyChecksum = true;
64 
65   static {
HdfsConfiguration.init()66     HdfsConfiguration.init();
67   }
68 
69   /**
70    * This constructor has the signature needed by
71    * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}
72    *
73    * @param theUri which must be that of Hdfs
74    * @param conf configuration
75    * @throws IOException
76    */
Hdfs(final URI theUri, final Configuration conf)77   Hdfs(final URI theUri, final Configuration conf) throws IOException, URISyntaxException {
78     super(theUri, HdfsConstants.HDFS_URI_SCHEME, true, NameNode.DEFAULT_PORT);
79 
80     if (!theUri.getScheme().equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) {
81       throw new IllegalArgumentException("Passed URI's scheme is not for Hdfs");
82     }
83     String host = theUri.getHost();
84     if (host == null) {
85       throw new IOException("Incomplete HDFS URI, no host: " + theUri);
86     }
87 
88     this.dfs = new DFSClient(theUri, conf, getStatistics());
89   }
90 
91   @Override
getUriDefaultPort()92   public int getUriDefaultPort() {
93     return NameNode.DEFAULT_PORT;
94   }
95 
96   @Override
createInternal(Path f, EnumSet<CreateFlag> createFlag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt, boolean createParent)97   public HdfsDataOutputStream createInternal(Path f,
98       EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
99       int bufferSize, short replication, long blockSize, Progressable progress,
100       ChecksumOpt checksumOpt, boolean createParent) throws IOException {
101 
102     final DFSOutputStream dfsos = dfs.primitiveCreate(getUriPath(f),
103       absolutePermission, createFlag, createParent, replication, blockSize,
104       progress, bufferSize, checksumOpt);
105     return dfs.createWrappedOutputStream(dfsos, statistics,
106         dfsos.getInitialLen());
107   }
108 
109   @Override
delete(Path f, boolean recursive)110   public boolean delete(Path f, boolean recursive)
111       throws IOException, UnresolvedLinkException {
112     return dfs.delete(getUriPath(f), recursive);
113   }
114 
115   @Override
getFileBlockLocations(Path p, long start, long len)116   public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
117       throws IOException, UnresolvedLinkException {
118     return dfs.getBlockLocations(getUriPath(p), start, len);
119   }
120 
121   @Override
getFileChecksum(Path f)122   public FileChecksum getFileChecksum(Path f)
123       throws IOException, UnresolvedLinkException {
124     return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
125   }
126 
127   @Override
getFileStatus(Path f)128   public FileStatus getFileStatus(Path f)
129       throws IOException, UnresolvedLinkException {
130     HdfsFileStatus fi = dfs.getFileInfo(getUriPath(f));
131     if (fi != null) {
132       return fi.makeQualified(getUri(), f);
133     } else {
134       throw new FileNotFoundException("File does not exist: " + f.toString());
135     }
136   }
137 
138   @Override
getFileLinkStatus(Path f)139   public FileStatus getFileLinkStatus(Path f)
140       throws IOException, UnresolvedLinkException {
141     HdfsFileStatus fi = dfs.getFileLinkInfo(getUriPath(f));
142     if (fi != null) {
143       return fi.makeQualified(getUri(), f);
144     } else {
145       throw new FileNotFoundException("File does not exist: " + f);
146     }
147   }
148 
149   @Override
getFsStatus()150   public FsStatus getFsStatus() throws IOException {
151     return dfs.getDiskStatus();
152   }
153 
154   @Override
getServerDefaults()155   public FsServerDefaults getServerDefaults() throws IOException {
156     return dfs.getServerDefaults();
157   }
158 
159   @Override
listLocatedStatus( final Path p)160   public RemoteIterator<LocatedFileStatus> listLocatedStatus(
161       final Path p)
162       throws FileNotFoundException, IOException {
163     return new DirListingIterator<LocatedFileStatus>(p, true) {
164 
165       @Override
166       public LocatedFileStatus next() throws IOException {
167         return ((HdfsLocatedFileStatus)getNext()).makeQualifiedLocated(
168             getUri(), p);
169       }
170     };
171   }
172 
173   @Override
174   public RemoteIterator<FileStatus> listStatusIterator(final Path f)
175     throws AccessControlException, FileNotFoundException,
176     UnresolvedLinkException, IOException {
177     return new DirListingIterator<FileStatus>(f, false) {
178 
179       @Override
180       public FileStatus next() throws IOException {
181         return getNext().makeQualified(getUri(), f);
182       }
183     };
184   }
185 
186   /**
187    * This class defines an iterator that returns
188    * the file status of each file/subdirectory of a directory
189    *
190    * if needLocation, status contains block location if it is a file
191    * throws a RuntimeException with the error as its cause.
192    *
193    * @param <T> the type of the file status
194    */
195   abstract private class  DirListingIterator<T extends FileStatus>
196   implements RemoteIterator<T> {
197     private DirectoryListing thisListing;
198     private int i;
199     final private String src;
200     final private boolean needLocation;  // if status
201 
202     private DirListingIterator(Path p, boolean needLocation)
203       throws IOException {
204       this.src = Hdfs.this.getUriPath(p);
205       this.needLocation = needLocation;
206 
207       // fetch the first batch of entries in the directory
208       thisListing = dfs.listPaths(
209           src, HdfsFileStatus.EMPTY_NAME, needLocation);
210       if (thisListing == null) { // the directory does not exist
211         throw new FileNotFoundException("File " + src + " does not exist.");
212       }
213     }
214 
215     @Override
216     public boolean hasNext() throws IOException {
217       if (thisListing == null) {
218         return false;
219       }
220       if (i>=thisListing.getPartialListing().length
221           && thisListing.hasMore()) {
222         // current listing is exhausted & fetch a new listing
223         thisListing = dfs.listPaths(src, thisListing.getLastName(),
224             needLocation);
225         if (thisListing == null) {
226           return false; // the directory is deleted
227         }
228         i = 0;
229       }
230       return (i<thisListing.getPartialListing().length);
231     }
232 
233     /**
234      * Get the next item in the list
235      * @return the next item in the list
236      *
237      * @throws IOException if there is any error
238      * @throws NoSuchElmentException if no more entry is available
239      */
240     public HdfsFileStatus getNext() throws IOException {
241       if (hasNext()) {
242         return thisListing.getPartialListing()[i++];
243       }
244       throw new NoSuchElementException("No more entry in " + src);
245     }
246   }
247 
248   @Override
249   public FileStatus[] listStatus(Path f)
250       throws IOException, UnresolvedLinkException {
251     String src = getUriPath(f);
252 
253     // fetch the first batch of entries in the directory
254     DirectoryListing thisListing = dfs.listPaths(
255         src, HdfsFileStatus.EMPTY_NAME);
256 
257     if (thisListing == null) { // the directory does not exist
258       throw new FileNotFoundException("File " + f + " does not exist.");
259     }
260 
261     HdfsFileStatus[] partialListing = thisListing.getPartialListing();
262     if (!thisListing.hasMore()) { // got all entries of the directory
263       FileStatus[] stats = new FileStatus[partialListing.length];
264       for (int i = 0; i < partialListing.length; i++) {
265         stats[i] = partialListing[i].makeQualified(getUri(), f);
266       }
267       return stats;
268     }
269 
270     // The directory size is too big that it needs to fetch more
271     // estimate the total number of entries in the directory
272     int totalNumEntries =
273       partialListing.length + thisListing.getRemainingEntries();
274     ArrayList<FileStatus> listing =
275       new ArrayList<FileStatus>(totalNumEntries);
276     // add the first batch of entries to the array list
277     for (HdfsFileStatus fileStatus : partialListing) {
278       listing.add(fileStatus.makeQualified(getUri(), f));
279     }
280 
281     // now fetch more entries
282     do {
283       thisListing = dfs.listPaths(src, thisListing.getLastName());
284 
285       if (thisListing == null) {
286         // the directory is deleted
287         throw new FileNotFoundException("File " + f + " does not exist.");
288       }
289 
290       partialListing = thisListing.getPartialListing();
291       for (HdfsFileStatus fileStatus : partialListing) {
292         listing.add(fileStatus.makeQualified(getUri(), f));
293       }
294     } while (thisListing.hasMore());
295 
296     return listing.toArray(new FileStatus[listing.size()]);
297   }
298 
299   @Override
300   public RemoteIterator<Path> listCorruptFileBlocks(Path path)
301     throws IOException {
302     return new CorruptFileBlockIterator(dfs, path);
303   }
304 
305   @Override
306   public void mkdir(Path dir, FsPermission permission, boolean createParent)
307     throws IOException, UnresolvedLinkException {
308     dfs.primitiveMkdir(getUriPath(dir), permission, createParent);
309   }
310 
311   @SuppressWarnings("deprecation")
312   @Override
313   public HdfsDataInputStream open(Path f, int bufferSize)
314       throws IOException, UnresolvedLinkException {
315     final DFSInputStream dfsis = dfs.open(getUriPath(f),
316       bufferSize, verifyChecksum);
317     return dfs.createWrappedInputStream(dfsis);
318   }
319 
320   @Override
321   public boolean truncate(Path f, long newLength)
322       throws IOException, UnresolvedLinkException {
323     return dfs.truncate(getUriPath(f), newLength);
324   }
325 
326   @Override
327   public void renameInternal(Path src, Path dst)
328     throws IOException, UnresolvedLinkException {
329     dfs.rename(getUriPath(src), getUriPath(dst), Options.Rename.NONE);
330   }
331 
332   @Override
333   public void renameInternal(Path src, Path dst, boolean overwrite)
334       throws IOException, UnresolvedLinkException {
335     dfs.rename(getUriPath(src), getUriPath(dst),
336         overwrite ? Options.Rename.OVERWRITE : Options.Rename.NONE);
337   }
338 
339   @Override
340   public void setOwner(Path f, String username, String groupname)
341     throws IOException, UnresolvedLinkException {
342     dfs.setOwner(getUriPath(f), username, groupname);
343   }
344 
345   @Override
346   public void setPermission(Path f, FsPermission permission)
347     throws IOException, UnresolvedLinkException {
348     dfs.setPermission(getUriPath(f), permission);
349   }
350 
351   @Override
352   public boolean setReplication(Path f, short replication)
353     throws IOException, UnresolvedLinkException {
354     return dfs.setReplication(getUriPath(f), replication);
355   }
356 
357   @Override
358   public void setTimes(Path f, long mtime, long atime)
359     throws IOException, UnresolvedLinkException {
360     dfs.setTimes(getUriPath(f), mtime, atime);
361   }
362 
363   @Override
364   public void setVerifyChecksum(boolean verifyChecksum)
365     throws IOException {
366     this.verifyChecksum = verifyChecksum;
367   }
368 
369   @Override
370   public boolean supportsSymlinks() {
371     return true;
372   }
373 
374   @Override
375   public void createSymlink(Path target, Path link, boolean createParent)
376     throws IOException, UnresolvedLinkException {
377     dfs.createSymlink(target.toString(), getUriPath(link), createParent);
378   }
379 
380   @Override
381   public Path getLinkTarget(Path p) throws IOException {
382     return new Path(dfs.getLinkTarget(getUriPath(p)));
383   }
384 
385   @Override
386   public String getCanonicalServiceName() {
387     return dfs.getCanonicalServiceName();
388   }
389 
390   @Override //AbstractFileSystem
391   public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
392     Token<DelegationTokenIdentifier> result = dfs
393         .getDelegationToken(renewer == null ? null : new Text(renewer));
394     List<Token<?>> tokenList = new ArrayList<Token<?>>();
395     tokenList.add(result);
396     return tokenList;
397   }
398 
399   @Override
400   public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
401       throws IOException {
402     dfs.modifyAclEntries(getUriPath(path), aclSpec);
403   }
404 
405   @Override
406   public void removeAclEntries(Path path, List<AclEntry> aclSpec)
407       throws IOException {
408     dfs.removeAclEntries(getUriPath(path), aclSpec);
409   }
410 
411   @Override
412   public void removeDefaultAcl(Path path) throws IOException {
413     dfs.removeDefaultAcl(getUriPath(path));
414   }
415 
416   @Override
417   public void removeAcl(Path path) throws IOException {
418     dfs.removeAcl(getUriPath(path));
419   }
420 
421   @Override
422   public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
423     dfs.setAcl(getUriPath(path), aclSpec);
424   }
425 
426   @Override
427   public AclStatus getAclStatus(Path path) throws IOException {
428     return dfs.getAclStatus(getUriPath(path));
429   }
430 
431   @Override
432   public void setXAttr(Path path, String name, byte[] value,
433       EnumSet<XAttrSetFlag> flag) throws IOException {
434     dfs.setXAttr(getUriPath(path), name, value, flag);
435   }
436 
437   @Override
438   public byte[] getXAttr(Path path, String name) throws IOException {
439     return dfs.getXAttr(getUriPath(path), name);
440   }
441 
442   @Override
443   public Map<String, byte[]> getXAttrs(Path path) throws IOException {
444     return dfs.getXAttrs(getUriPath(path));
445   }
446 
447   @Override
448   public Map<String, byte[]> getXAttrs(Path path, List<String> names)
449       throws IOException {
450     return dfs.getXAttrs(getUriPath(path), names);
451   }
452 
453   @Override
454   public List<String> listXAttrs(Path path) throws IOException {
455     return dfs.listXAttrs(getUriPath(path));
456   }
457 
458   @Override
459   public void removeXAttr(Path path, String name) throws IOException {
460     dfs.removeXAttr(getUriPath(path), name);
461   }
462 
463   @Override
464   public void access(Path path, final FsAction mode) throws IOException {
465     dfs.checkAccess(getUriPath(path), mode);
466   }
467 
468   /**
469    * Renew an existing delegation token.
470    *
471    * @param token delegation token obtained earlier
472    * @return the new expiration time
473    * @throws InvalidToken
474    * @throws IOException
475    * @deprecated Use Token.renew instead.
476    */
477   @SuppressWarnings("unchecked")
478   public long renewDelegationToken(
479       Token<? extends AbstractDelegationTokenIdentifier> token)
480       throws InvalidToken, IOException {
481     return dfs.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
482   }
483 
484   /**
485    * Cancel an existing delegation token.
486    *
487    * @param token delegation token
488    * @throws InvalidToken
489    * @throws IOException
490    * @deprecated Use Token.cancel instead.
491    */
492   @SuppressWarnings("unchecked")
493   public void cancelDelegationToken(
494       Token<? extends AbstractDelegationTokenIdentifier> token)
495       throws InvalidToken, IOException {
496     dfs.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
497   }
498 }
499