/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.Progressable;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class Hdfs extends AbstractFileSystem {

  DFSClient dfs;
  private boolean verifyChecksum = true;

  static {
    HdfsConfiguration.init();
  }

  /**
   * This constructor has the signature needed by
   * {@link AbstractFileSystem#createFileSystem(URI, Configuration)}.
   *
   * @param theUri the URI of the file system; its scheme must be "hdfs"
   * @param conf the configuration
   * @throws IOException if the URI has no host or the client cannot be created
   */
  Hdfs(final URI theUri, final Configuration conf) throws IOException, URISyntaxException {
    super(theUri, HdfsConstants.HDFS_URI_SCHEME, true, NameNode.DEFAULT_PORT);

    if (!theUri.getScheme().equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) {
      throw new IllegalArgumentException("Passed URI's scheme is not for Hdfs");
    }
    String host = theUri.getHost();
    if (host == null) {
      throw new IOException("Incomplete HDFS URI, no host: " + theUri);
    }

    this.dfs = new DFSClient(theUri, conf, getStatistics());
  }
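  /*
   * A minimal usage sketch, assuming a NameNode reachable at the placeholder
   * address namenode:8020 and a placeholder path /user/alice. Client code
   * normally reaches this file system through FileContext, which invokes the
   * constructor above reflectively via
   * AbstractFileSystem#createFileSystem(URI, Configuration), rather than
   * instantiating it directly:
   *
   *   Configuration conf = new HdfsConfiguration();
   *   FileContext fc = FileContext.getFileContext(
   *       URI.create("hdfs://namenode:8020"), conf);
   *   FileStatus status = fc.getFileStatus(new Path("/user/alice"));
   */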
  @Override
  public int getUriDefaultPort() {
    return NameNode.DEFAULT_PORT;
  }

  @Override
  public HdfsDataOutputStream createInternal(Path f,
      EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
      int bufferSize, short replication, long blockSize, Progressable progress,
      ChecksumOpt checksumOpt, boolean createParent) throws IOException {

    final DFSOutputStream dfsos = dfs.primitiveCreate(getUriPath(f),
        absolutePermission, createFlag, createParent, replication, blockSize,
        progress, bufferSize, checksumOpt);
    return dfs.createWrappedOutputStream(dfsos, statistics,
        dfsos.getInitialLen());
  }

  @Override
  public boolean delete(Path f, boolean recursive)
      throws IOException, UnresolvedLinkException {
    return dfs.delete(getUriPath(f), recursive);
  }

  @Override
  public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
      throws IOException, UnresolvedLinkException {
    return dfs.getBlockLocations(getUriPath(p), start, len);
  }

  @Override
  public FileChecksum getFileChecksum(Path f)
      throws IOException, UnresolvedLinkException {
    return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
  }

  @Override
  public FileStatus getFileStatus(Path f)
      throws IOException, UnresolvedLinkException {
    HdfsFileStatus fi = dfs.getFileInfo(getUriPath(f));
    if (fi != null) {
      return fi.makeQualified(getUri(), f);
    } else {
      throw new FileNotFoundException("File does not exist: " + f.toString());
    }
  }

  @Override
  public FileStatus getFileLinkStatus(Path f)
      throws IOException, UnresolvedLinkException {
    HdfsFileStatus fi = dfs.getFileLinkInfo(getUriPath(f));
    if (fi != null) {
      return fi.makeQualified(getUri(), f);
    } else {
      throw new FileNotFoundException("File does not exist: " + f);
    }
  }

  @Override
  public FsStatus getFsStatus() throws IOException {
    return dfs.getDiskStatus();
  }

  @Override
  public FsServerDefaults getServerDefaults() throws IOException {
    return dfs.getServerDefaults();
  }

  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p)
      throws FileNotFoundException, IOException {
    return new DirListingIterator<LocatedFileStatus>(p, true) {

      @Override
      public LocatedFileStatus next() throws IOException {
        return ((HdfsLocatedFileStatus)getNext()).makeQualifiedLocated(
            getUri(), p);
      }
    };
  }

  @Override
  public RemoteIterator<FileStatus> listStatusIterator(final Path f)
      throws AccessControlException, FileNotFoundException,
      UnresolvedLinkException, IOException {
    return new DirListingIterator<FileStatus>(f, false) {

      @Override
      public FileStatus next() throws IOException {
        return getNext().makeQualified(getUri(), f);
      }
    };
  }
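  /*
   * A consumption sketch for the two iterator methods above, assuming "fc"
   * is a FileContext bound to this file system and "/logs" is a placeholder
   * directory. Each hasNext() call may transparently fetch a further batch
   * of entries from the NameNode:
   *
   *   RemoteIterator<FileStatus> it = fc.listStatus(new Path("/logs"));
   *   while (it.hasNext()) {
   *     FileStatus stat = it.next();
   *     System.out.println(stat.getPath());
   *   }
   */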
  /**
   * An iterator that returns the file status of each file and subdirectory
   * of a directory. If needLocation is true, the status of a file also
   * contains its block locations. Errors raised while fetching further
   * batches are thrown as IOException.
   *
   * @param <T> the type of the file status
   */
  abstract private class DirListingIterator<T extends FileStatus>
      implements RemoteIterator<T> {
    private DirectoryListing thisListing;
    private int i;
    final private String src;
    final private boolean needLocation; // if true, files' status includes block locations

    private DirListingIterator(Path p, boolean needLocation)
        throws IOException {
      this.src = Hdfs.this.getUriPath(p);
      this.needLocation = needLocation;

      // fetch the first batch of entries in the directory
      thisListing = dfs.listPaths(
          src, HdfsFileStatus.EMPTY_NAME, needLocation);
      if (thisListing == null) { // the directory does not exist
        throw new FileNotFoundException("File " + src + " does not exist.");
      }
    }

    @Override
    public boolean hasNext() throws IOException {
      if (thisListing == null) {
        return false;
      }
      if (i >= thisListing.getPartialListing().length
          && thisListing.hasMore()) {
        // the current listing is exhausted; fetch a new one
        thisListing = dfs.listPaths(src, thisListing.getLastName(),
            needLocation);
        if (thisListing == null) {
          return false; // the directory is deleted
        }
        i = 0;
      }
      return (i < thisListing.getPartialListing().length);
    }

    /**
     * Get the next item in the list.
     * @return the next item in the list
     *
     * @throws IOException if there is any error
     * @throws NoSuchElementException if no more entries are available
     */
    public HdfsFileStatus getNext() throws IOException {
      if (hasNext()) {
        return thisListing.getPartialListing()[i++];
      }
      throw new NoSuchElementException("No more entries in " + src);
    }
  }

  @Override
  public FileStatus[] listStatus(Path f)
      throws IOException, UnresolvedLinkException {
    String src = getUriPath(f);

    // fetch the first batch of entries in the directory
    DirectoryListing thisListing = dfs.listPaths(
        src, HdfsFileStatus.EMPTY_NAME);

    if (thisListing == null) { // the directory does not exist
      throw new FileNotFoundException("File " + f + " does not exist.");
    }

    HdfsFileStatus[] partialListing = thisListing.getPartialListing();
    if (!thisListing.hasMore()) { // got all entries of the directory
      FileStatus[] stats = new FileStatus[partialListing.length];
      for (int i = 0; i < partialListing.length; i++) {
        stats[i] = partialListing[i].makeQualified(getUri(), f);
      }
      return stats;
    }

    // The directory is too large to list in a single call.
    // Estimate the total number of entries in the directory.
    int totalNumEntries =
        partialListing.length + thisListing.getRemainingEntries();
    ArrayList<FileStatus> listing =
        new ArrayList<FileStatus>(totalNumEntries);
    // add the first batch of entries to the array list
    for (HdfsFileStatus fileStatus : partialListing) {
      listing.add(fileStatus.makeQualified(getUri(), f));
    }

    // now fetch more entries
    do {
      thisListing = dfs.listPaths(src, thisListing.getLastName());

      if (thisListing == null) {
        // the directory is deleted
        throw new FileNotFoundException("File " + f + " does not exist.");
      }

      partialListing = thisListing.getPartialListing();
      for (HdfsFileStatus fileStatus : partialListing) {
        listing.add(fileStatus.makeQualified(getUri(), f));
      }
    } while (thisListing.hasMore());

    return listing.toArray(new FileStatus[listing.size()]);
  }
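  /*
   * The paging contract used by listStatus above, assuming the NameNode caps
   * each response at dfs.ls.limit entries: every DirectoryListing carries a
   * partial array plus the last returned name, which seeds the next
   * dfs.listPaths call until hasMore() turns false. Stripped of entry
   * processing, the loop reduces to:
   *
   *   DirectoryListing batch = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME);
   *   while (batch != null && batch.hasMore()) {
   *     batch = dfs.listPaths(src, batch.getLastName());
   *   }
   */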
  @Override
  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
      throws IOException {
    return new CorruptFileBlockIterator(dfs, path);
  }

  @Override
  public void mkdir(Path dir, FsPermission permission, boolean createParent)
      throws IOException, UnresolvedLinkException {
    dfs.primitiveMkdir(getUriPath(dir), permission, createParent);
  }

  @SuppressWarnings("deprecation")
  @Override
  public HdfsDataInputStream open(Path f, int bufferSize)
      throws IOException, UnresolvedLinkException {
    final DFSInputStream dfsis = dfs.open(getUriPath(f),
        bufferSize, verifyChecksum);
    return dfs.createWrappedInputStream(dfsis);
  }

  @Override
  public boolean truncate(Path f, long newLength)
      throws IOException, UnresolvedLinkException {
    return dfs.truncate(getUriPath(f), newLength);
  }

  @Override
  public void renameInternal(Path src, Path dst)
      throws IOException, UnresolvedLinkException {
    dfs.rename(getUriPath(src), getUriPath(dst), Options.Rename.NONE);
  }

  @Override
  public void renameInternal(Path src, Path dst, boolean overwrite)
      throws IOException, UnresolvedLinkException {
    dfs.rename(getUriPath(src), getUriPath(dst),
        overwrite ? Options.Rename.OVERWRITE : Options.Rename.NONE);
  }

  @Override
  public void setOwner(Path f, String username, String groupname)
      throws IOException, UnresolvedLinkException {
    dfs.setOwner(getUriPath(f), username, groupname);
  }

  @Override
  public void setPermission(Path f, FsPermission permission)
      throws IOException, UnresolvedLinkException {
    dfs.setPermission(getUriPath(f), permission);
  }

  @Override
  public boolean setReplication(Path f, short replication)
      throws IOException, UnresolvedLinkException {
    return dfs.setReplication(getUriPath(f), replication);
  }

  @Override
  public void setTimes(Path f, long mtime, long atime)
      throws IOException, UnresolvedLinkException {
    dfs.setTimes(getUriPath(f), mtime, atime);
  }

  @Override
  public void setVerifyChecksum(boolean verifyChecksum)
      throws IOException {
    this.verifyChecksum = verifyChecksum;
  }

  @Override
  public boolean supportsSymlinks() {
    return true;
  }

  @Override
  public void createSymlink(Path target, Path link, boolean createParent)
      throws IOException, UnresolvedLinkException {
    dfs.createSymlink(target.toString(), getUriPath(link), createParent);
  }

  @Override
  public Path getLinkTarget(Path p) throws IOException {
    return new Path(dfs.getLinkTarget(getUriPath(p)));
  }

  @Override
  public String getCanonicalServiceName() {
    return dfs.getCanonicalServiceName();
  }

  @Override // AbstractFileSystem
  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
    Token<DelegationTokenIdentifier> result = dfs
        .getDelegationToken(renewer == null ? null : new Text(renewer));
    List<Token<?>> tokenList = new ArrayList<Token<?>>();
    tokenList.add(result);
    return tokenList;
  }
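  /*
   * A minimal sketch of fetching and renewing a delegation token through the
   * method above, assuming "fs" is an instance of this class and the caller
   * already holds Kerberos credentials; the renewer principal "yarn" is a
   * placeholder. Token#renew is the supported path, rather than the
   * deprecated renewDelegationToken further below:
   *
   *   List<Token<?>> tokens = fs.getDelegationTokens("yarn");
   *   Token<?> token = tokens.get(0);
   *   long newExpiry = token.renew(conf); // also throws InterruptedException
   */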
  @Override
  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
      throws IOException {
    dfs.modifyAclEntries(getUriPath(path), aclSpec);
  }

  @Override
  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
      throws IOException {
    dfs.removeAclEntries(getUriPath(path), aclSpec);
  }

  @Override
  public void removeDefaultAcl(Path path) throws IOException {
    dfs.removeDefaultAcl(getUriPath(path));
  }

  @Override
  public void removeAcl(Path path) throws IOException {
    dfs.removeAcl(getUriPath(path));
  }

  @Override
  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
    dfs.setAcl(getUriPath(path), aclSpec);
  }

  @Override
  public AclStatus getAclStatus(Path path) throws IOException {
    return dfs.getAclStatus(getUriPath(path));
  }

  @Override
  public void setXAttr(Path path, String name, byte[] value,
      EnumSet<XAttrSetFlag> flag) throws IOException {
    dfs.setXAttr(getUriPath(path), name, value, flag);
  }

  @Override
  public byte[] getXAttr(Path path, String name) throws IOException {
    return dfs.getXAttr(getUriPath(path), name);
  }

  @Override
  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
    return dfs.getXAttrs(getUriPath(path));
  }

  @Override
  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
      throws IOException {
    return dfs.getXAttrs(getUriPath(path), names);
  }

  @Override
  public List<String> listXAttrs(Path path) throws IOException {
    return dfs.listXAttrs(getUriPath(path));
  }

  @Override
  public void removeXAttr(Path path, String name) throws IOException {
    dfs.removeXAttr(getUriPath(path), name);
  }

  @Override
  public void access(Path path, final FsAction mode) throws IOException {
    dfs.checkAccess(getUriPath(path), mode);
  }

  /**
   * Renew an existing delegation token.
   *
   * @param token delegation token obtained earlier
   * @return the new expiration time
   * @throws InvalidToken if the token is invalid
   * @throws IOException if renewal fails
   * @deprecated Use Token.renew instead.
   */
  @Deprecated
  @SuppressWarnings("unchecked")
  public long renewDelegationToken(
      Token<? extends AbstractDelegationTokenIdentifier> token)
      throws InvalidToken, IOException {
    return dfs.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
  }

  /**
   * Cancel an existing delegation token.
   *
   * @param token delegation token
   * @throws InvalidToken if the token is invalid
   * @throws IOException if cancellation fails
   * @deprecated Use Token.cancel instead.
   */
  @Deprecated
  @SuppressWarnings("unchecked")
  public void cancelDelegationToken(
      Token<? extends AbstractDelegationTokenIdentifier> token)
      throws InvalidToken, IOException {
    dfs.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
  }
}
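/*
 * A round-trip sketch for the extended-attribute methods above, assuming
 * "fc" is a FileContext bound to an HDFS instance and "path" is an existing
 * file; HDFS requires a namespace prefix such as "user." on attribute names:
 *
 *   fc.setXAttr(path, "user.checksum",
 *       "abc123".getBytes(StandardCharsets.UTF_8));
 *   byte[] value = fc.getXAttr(path, "user.checksum");
 *   fc.removeXAttr(path, "user.checksum");
 */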