1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.fs; 19 20 import java.io.FileNotFoundException; 21 import java.io.IOException; 22 import java.io.UnsupportedEncodingException; 23 import java.net.URI; 24 import java.net.URISyntaxException; 25 import java.net.URLDecoder; 26 import java.util.ArrayList; 27 import java.util.List; 28 import java.util.Map; 29 import java.util.TreeMap; 30 import java.util.HashMap; 31 import java.util.concurrent.ConcurrentHashMap; 32 33 import org.apache.hadoop.conf.Configuration; 34 import org.apache.hadoop.fs.permission.FsPermission; 35 import org.apache.hadoop.io.Text; 36 import org.apache.hadoop.util.LineReader; 37 import org.apache.hadoop.util.Progressable; 38 39 /** 40 * This is an implementation of the Hadoop Archive 41 * Filesystem. This archive Filesystem has index files 42 * of the form _index* and has contents of the form 43 * part-*. The index files store the indexes of the 44 * real files. The index files are of the form _masterindex 45 * and _index. The master index is a level of indirection 46 * in to the index file to make the look ups faster. the index 47 * file is sorted with hash code of the paths that it contains 48 * and the master index contains pointers to the positions in 49 * index for ranges of hashcodes. 50 */ 51 52 public class HarFileSystem extends FilterFileSystem { 53 public static final int VERSION = 3; 54 55 private static final Map<URI, HarMetaData> harMetaCache 56 = new ConcurrentHashMap<URI, HarMetaData>(); 57 58 // uri representation of this Har filesystem 59 private URI uri; 60 // the top level path of the archive 61 // in the underlying file system 62 private Path archivePath; 63 // the har auth 64 private String harAuth; 65 66 // pointer into the static metadata cache 67 private HarMetaData metadata; 68 69 /** 70 * public construction of harfilesystem 71 * 72 */ HarFileSystem()73 public HarFileSystem() { 74 } 75 76 /** 77 * Constructor to create a HarFileSystem with an 78 * underlying filesystem. 79 * @param fs 80 */ HarFileSystem(FileSystem fs)81 public HarFileSystem(FileSystem fs) { 82 super(fs); 83 } 84 85 /** 86 * Initialize a Har filesystem per har archive. The 87 * archive home directory is the top level directory 88 * in the filesystem that contains the HAR archive. 89 * Be careful with this method, you do not want to go 90 * on creating new Filesystem instances per call to 91 * path.getFileSystem(). 92 * the uri of Har is 93 * har://underlyingfsscheme-host:port/archivepath. 94 * or 95 * har:///archivepath. This assumes the underlying filesystem 96 * to be used in case not specified. 97 */ initialize(URI name, Configuration conf)98 public void initialize(URI name, Configuration conf) throws IOException { 99 // decode the name 100 URI underLyingURI = decodeHarURI(name, conf); 101 // we got the right har Path- now check if this is 102 // truly a har filesystem 103 Path harPath = archivePath( 104 new Path(name.getScheme(), name.getAuthority(), name.getPath())); 105 if (harPath == null) { 106 throw new IOException("Invalid path for the Har Filesystem. " + 107 name.toString()); 108 } 109 if (fs == null) { 110 fs = FileSystem.get(underLyingURI, conf); 111 } 112 uri = harPath.toUri(); 113 archivePath = new Path(uri.getPath()); 114 harAuth = getHarAuth(underLyingURI); 115 //check for the underlying fs containing 116 // the index file 117 Path masterIndexPath = new Path(archivePath, "_masterindex"); 118 Path archiveIndexPath = new Path(archivePath, "_index"); 119 if (!fs.exists(masterIndexPath) || !fs.exists(archiveIndexPath)) { 120 throw new IOException("Invalid path for the Har Filesystem. " + 121 "No index file in " + harPath); 122 } 123 124 metadata = harMetaCache.get(uri); 125 if (metadata != null) { 126 FileStatus mStat = fs.getFileStatus(masterIndexPath); 127 FileStatus aStat = fs.getFileStatus(archiveIndexPath); 128 if (mStat.getModificationTime() != metadata.getMasterIndexTimestamp() || 129 aStat.getModificationTime() != metadata.getArchiveIndexTimestamp()) { 130 // the archive has been overwritten since we last read it 131 // remove the entry from the meta data cache 132 metadata = null; 133 harMetaCache.remove(uri); 134 } 135 } 136 if (metadata == null) { 137 metadata = new HarMetaData(fs, masterIndexPath, archiveIndexPath); 138 metadata.parseMetaData(); 139 harMetaCache.put(uri, metadata); 140 } 141 } 142 143 // get the version of the filesystem from the masterindex file 144 // the version is currently not useful since its the first version 145 // of archives getHarVersion()146 public int getHarVersion() throws IOException { 147 if (metadata != null) { 148 return metadata.getVersion(); 149 } 150 else { 151 throw new IOException("Invalid meta data for the Har Filesystem"); 152 } 153 } 154 155 /* 156 * find the parent path that is the 157 * archive path in the path. The last 158 * path segment that ends with .har is 159 * the path that will be returned. 160 */ archivePath(Path p)161 private Path archivePath(Path p) { 162 Path retPath = null; 163 Path tmp = p; 164 for (int i=0; i< p.depth(); i++) { 165 if (tmp.toString().endsWith(".har")) { 166 retPath = tmp; 167 break; 168 } 169 tmp = tmp.getParent(); 170 } 171 return retPath; 172 } 173 174 /** 175 * decode the raw URI to get the underlying URI 176 * @param rawURI raw Har URI 177 * @return filtered URI of the underlying fileSystem 178 */ decodeHarURI(URI rawURI, Configuration conf)179 private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException { 180 String tmpAuth = rawURI.getAuthority(); 181 //we are using the default file 182 //system in the config 183 //so create a underlying uri and 184 //return it 185 if (tmpAuth == null) { 186 //create a path 187 return FileSystem.getDefaultUri(conf); 188 } 189 String host = rawURI.getHost(); 190 if (host == null) { 191 throw new IOException("URI: " + rawURI 192 + " is an invalid Har URI since host==null." 193 + " Expecting har://<scheme>-<host>/<path>."); 194 } 195 int i = host.indexOf('-'); 196 if (i < 0) { 197 throw new IOException("URI: " + rawURI 198 + " is an invalid Har URI since '-' not found." 199 + " Expecting har://<scheme>-<host>/<path>."); 200 } 201 final String underLyingScheme = host.substring(0, i); 202 i++; 203 final String underLyingHost = i == host.length()? null: host.substring(i); 204 int underLyingPort = rawURI.getPort(); 205 String auth = (underLyingHost == null && underLyingPort == -1)? 206 null:(underLyingHost+":"+underLyingPort); 207 URI tmp = null; 208 if (rawURI.getQuery() != null) { 209 // query component not allowed 210 throw new IOException("query component in Path not supported " + rawURI); 211 } 212 try { 213 tmp = new URI(underLyingScheme, auth, rawURI.getPath(), 214 rawURI.getQuery(), rawURI.getFragment()); 215 } catch (URISyntaxException e) { 216 // do nothing should not happen 217 } 218 return tmp; 219 } 220 decodeString(String str)221 private static String decodeString(String str) 222 throws UnsupportedEncodingException { 223 return URLDecoder.decode(str, "UTF-8"); 224 } 225 decodeFileName(String fname)226 private String decodeFileName(String fname) 227 throws UnsupportedEncodingException { 228 int version = metadata.getVersion(); 229 if (version == 2 || version == 3){ 230 return decodeString(fname); 231 } 232 return fname; 233 } 234 235 /** 236 * return the top level archive. 237 */ getWorkingDirectory()238 public Path getWorkingDirectory() { 239 return new Path(uri.toString()); 240 } 241 242 /** 243 * Create a har specific auth 244 * har-underlyingfs:port 245 * @param underLyingURI the uri of underlying 246 * filesystem 247 * @return har specific auth 248 */ getHarAuth(URI underLyingUri)249 private String getHarAuth(URI underLyingUri) { 250 String auth = underLyingUri.getScheme() + "-"; 251 if (underLyingUri.getHost() != null) { 252 auth += underLyingUri.getHost() + ":"; 253 if (underLyingUri.getPort() != -1) { 254 auth += underLyingUri.getPort(); 255 } 256 } 257 else { 258 auth += ":"; 259 } 260 return auth; 261 } 262 263 /** 264 * Returns the uri of this filesystem. 265 * The uri is of the form 266 * har://underlyingfsschema-host:port/pathintheunderlyingfs 267 */ 268 @Override getUri()269 public URI getUri() { 270 return this.uri; 271 } 272 273 @Override getCanonicalServiceName()274 public String getCanonicalServiceName() { 275 return null; 276 } 277 278 /** 279 * this method returns the path 280 * inside the har filesystem. 281 * this is relative path inside 282 * the har filesystem. 283 * @param path the fully qualified path in the har filesystem. 284 * @return relative path in the filesystem. 285 */ getPathInHar(Path path)286 private Path getPathInHar(Path path) { 287 Path harPath = new Path(path.toUri().getPath()); 288 if (archivePath.compareTo(harPath) == 0) 289 return new Path(Path.SEPARATOR); 290 Path tmp = new Path(harPath.getName()); 291 Path parent = harPath.getParent(); 292 while (!(parent.compareTo(archivePath) == 0)) { 293 if (parent.toString().equals(Path.SEPARATOR)) { 294 tmp = null; 295 break; 296 } 297 tmp = new Path(parent.getName(), tmp); 298 parent = parent.getParent(); 299 } 300 if (tmp != null) 301 tmp = new Path(Path.SEPARATOR, tmp); 302 return tmp; 303 } 304 305 //the relative path of p. basically 306 // getting rid of /. Parsing and doing 307 // string manipulation is not good - so 308 // just use the path api to do it. makeRelative(String initial, Path p)309 private Path makeRelative(String initial, Path p) { 310 String scheme = this.uri.getScheme(); 311 String authority = this.uri.getAuthority(); 312 Path root = new Path(Path.SEPARATOR); 313 if (root.compareTo(p) == 0) 314 return new Path(scheme, authority, initial); 315 Path retPath = new Path(p.getName()); 316 Path parent = p.getParent(); 317 for (int i=0; i < p.depth()-1; i++) { 318 retPath = new Path(parent.getName(), retPath); 319 parent = parent.getParent(); 320 } 321 return new Path(new Path(scheme, authority, initial), 322 retPath.toString()); 323 } 324 325 /* this makes a path qualified in the har filesystem 326 * (non-Javadoc) 327 * @see org.apache.hadoop.fs.FilterFileSystem#makeQualified( 328 * org.apache.hadoop.fs.Path) 329 */ 330 @Override makeQualified(Path path)331 public Path makeQualified(Path path) { 332 // make sure that we just get the 333 // path component 334 Path fsPath = path; 335 if (!path.isAbsolute()) { 336 fsPath = new Path(archivePath, path); 337 } 338 339 URI tmpURI = fsPath.toUri(); 340 //change this to Har uri 341 return new Path(uri.getScheme(), harAuth, tmpURI.getPath()); 342 } 343 344 /** 345 * Fix offset and length of block locations. 346 * Note that this method modifies the original array. 347 * @param locations block locations of har part file 348 * @param start the start of the desired range in the contained file 349 * @param len the length of the desired range 350 * @param fileOffsetInHar the offset of the desired file in the har part file 351 * @return block locations with fixed offset and length 352 */ fixBlockLocations(BlockLocation[] locations, long start, long len, long fileOffsetInHar)353 static BlockLocation[] fixBlockLocations(BlockLocation[] locations, 354 long start, 355 long len, 356 long fileOffsetInHar) { 357 // offset 1 past last byte of desired range 358 long end = start + len; 359 360 for (BlockLocation location : locations) { 361 // offset of part block relative to beginning of desired file 362 // (may be negative if file starts in this part block) 363 long harBlockStart = location.getOffset() - fileOffsetInHar; 364 // offset 1 past last byte of har block relative to beginning of 365 // desired file 366 long harBlockEnd = harBlockStart + location.getLength(); 367 368 if (start > harBlockStart) { 369 // desired range starts after beginning of this har block 370 // fix offset to beginning of relevant range (relative to desired file) 371 location.setOffset(start); 372 // fix length to relevant portion of har block 373 location.setLength(location.getLength() - (start - harBlockStart)); 374 } else { 375 // desired range includes beginning of this har block 376 location.setOffset(harBlockStart); 377 } 378 379 if (harBlockEnd > end) { 380 // range ends before end of this har block 381 // fix length to remove irrelevant portion at the end 382 location.setLength(location.getLength() - (harBlockEnd - end)); 383 } 384 } 385 386 return locations; 387 } 388 389 /** 390 * Get block locations from the underlying fs and fix their 391 * offsets and lengths. 392 * @param file the input filestatus to get block locations 393 * @param start the start of the desired range in the contained file 394 * @param len the length of the desired range 395 * @return block locations for this segment of file 396 * @throws IOException 397 */ 398 @Override getFileBlockLocations(FileStatus file, long start, long len)399 public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 400 long len) throws IOException { 401 HarStatus hstatus = getFileHarStatus(file.getPath()); 402 Path partPath = new Path(archivePath, hstatus.getPartName()); 403 FileStatus partStatus = metadata.getPartFileStatus(partPath); 404 405 // get all part blocks that overlap with the desired file blocks 406 BlockLocation[] locations = 407 fs.getFileBlockLocations(partStatus, 408 hstatus.getStartIndex() + start, len); 409 410 return fixBlockLocations(locations, start, len, hstatus.getStartIndex()); 411 } 412 413 /** 414 * the hash of the path p inside iniside 415 * the filesystem 416 * @param p the path in the harfilesystem 417 * @return the hash code of the path. 418 */ getHarHash(Path p)419 public static int getHarHash(Path p) { 420 return (p.toString().hashCode() & 0x7fffffff); 421 } 422 423 static class Store { Store()424 public Store() { 425 begin = end = startHash = endHash = 0; 426 } Store(long begin, long end, int startHash, int endHash)427 public Store(long begin, long end, int startHash, int endHash) { 428 this.begin = begin; 429 this.end = end; 430 this.startHash = startHash; 431 this.endHash = endHash; 432 } 433 public long begin; 434 public long end; 435 public int startHash; 436 public int endHash; 437 } 438 439 /** 440 * Get filestatuses of all the children of a given directory. This just reads 441 * through index file and reads line by line to get all statuses for children 442 * of a directory. Its a brute force way of getting all such filestatuses 443 * 444 * @param parent 445 * the parent path directory 446 * @param statuses 447 * the list to add the children filestatuses to 448 * @param children 449 * the string list of children for this parent 450 * @param archiveIndexStat 451 * the archive index filestatus 452 */ fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses, List<String> children)453 private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses, 454 List<String> children) throws IOException { 455 String parentString = parent.getName(); 456 if (!parentString.endsWith(Path.SEPARATOR)){ 457 parentString += Path.SEPARATOR; 458 } 459 Path harPath = new Path(parentString); 460 int harlen = harPath.depth(); 461 final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>(); 462 463 for (HarStatus hstatus : metadata.archive.values()) { 464 String child = hstatus.getName(); 465 if ((child.startsWith(parentString))) { 466 Path thisPath = new Path(child); 467 if (thisPath.depth() == harlen + 1) { 468 statuses.add(toFileStatus(hstatus, cache)); 469 } 470 } 471 } 472 } 473 474 /** 475 * Combine the status stored in the index and the underlying status. 476 * @param h status stored in the index 477 * @param cache caching the underlying file statuses 478 * @return the combined file status 479 * @throws IOException 480 */ toFileStatus(HarStatus h, Map<String, FileStatus> cache)481 private FileStatus toFileStatus(HarStatus h, 482 Map<String, FileStatus> cache) throws IOException { 483 FileStatus underlying = null; 484 if (cache != null) { 485 underlying = cache.get(h.partName); 486 } 487 if (underlying == null) { 488 final Path p = h.isDir? archivePath: new Path(archivePath, h.partName); 489 underlying = fs.getFileStatus(p); 490 if (cache != null) { 491 cache.put(h.partName, underlying); 492 } 493 } 494 495 long modTime = 0; 496 int version = metadata.getVersion(); 497 if (version < 3) { 498 modTime = underlying.getModificationTime(); 499 } else if (version == 3) { 500 modTime = h.getModificationTime(); 501 } 502 503 return new FileStatus( 504 h.isDir()? 0L: h.getLength(), 505 h.isDir(), 506 underlying.getReplication(), 507 underlying.getBlockSize(), 508 modTime, 509 underlying.getAccessTime(), 510 underlying.getPermission(), 511 underlying.getOwner(), 512 underlying.getGroup(), 513 makeRelative(this.uri.getPath(), new Path(h.name))); 514 } 515 516 // a single line parser for hadoop archives status 517 // stored in a single line in the index files 518 // the format is of the form 519 // filename "dir"/"file" partFileName startIndex length 520 // <space seperated children> 521 private class HarStatus { 522 boolean isDir; 523 String name; 524 List<String> children; 525 String partName; 526 long startIndex; 527 long length; 528 long modificationTime = 0; 529 HarStatus(String harString)530 public HarStatus(String harString) throws UnsupportedEncodingException { 531 String[] splits = harString.split(" "); 532 this.name = decodeFileName(splits[0]); 533 this.isDir = "dir".equals(splits[1]) ? true: false; 534 // this is equal to "none" if its a directory 535 this.partName = splits[2]; 536 this.startIndex = Long.parseLong(splits[3]); 537 this.length = Long.parseLong(splits[4]); 538 539 int version = metadata.getVersion(); 540 String[] propSplits = null; 541 // propSplits is used to retrieve the metainformation that Har versions 542 // 1 & 2 missed (modification time, permission, owner group). 543 // These fields are stored in an encoded string placed in different 544 // locations depending on whether it's a file or directory entry. 545 // If it's a directory, the string will be placed at the partName 546 // location (directories have no partName because they don't have data 547 // to be stored). This is done because the number of fields in a 548 // directory entry is unbounded (all children are listed at the end) 549 // If it's a file, the string will be the last field. 550 if (isDir) { 551 if (version == 3){ 552 propSplits = decodeString(this.partName).split(" "); 553 } 554 children = new ArrayList<String>(); 555 for (int i = 5; i < splits.length; i++) { 556 children.add(decodeFileName(splits[i])); 557 } 558 } else if (version == 3) { 559 propSplits = decodeString(splits[5]).split(" "); 560 } 561 562 if (propSplits != null && propSplits.length >= 4) { 563 modificationTime = Long.parseLong(propSplits[0]); 564 // the fields below are stored in the file but are currently not used 565 // by HarFileSystem 566 // permission = new FsPermission(Short.parseShort(propSplits[1])); 567 // owner = decodeString(propSplits[2]); 568 // group = decodeString(propSplits[3]); 569 } 570 } isDir()571 public boolean isDir() { 572 return isDir; 573 } 574 getName()575 public String getName() { 576 return name; 577 } 578 getChildren()579 public List<String> getChildren() { 580 return children; 581 } getFileName()582 public String getFileName() { 583 return name; 584 } getPartName()585 public String getPartName() { 586 return partName; 587 } getStartIndex()588 public long getStartIndex() { 589 return startIndex; 590 } getLength()591 public long getLength() { 592 return length; 593 } getModificationTime()594 public long getModificationTime() { 595 return modificationTime; 596 } 597 } 598 599 /** 600 * return the filestatus of files in har archive. 601 * The permission returned are that of the archive 602 * index files. The permissions are not persisted 603 * while creating a hadoop archive. 604 * @param f the path in har filesystem 605 * @return filestatus. 606 * @throws IOException 607 */ 608 @Override getFileStatus(Path f)609 public FileStatus getFileStatus(Path f) throws IOException { 610 HarStatus hstatus = getFileHarStatus(f); 611 return toFileStatus(hstatus, null); 612 } 613 getFileHarStatus(Path f)614 private HarStatus getFileHarStatus(Path f) throws IOException { 615 // get the fs DataInputStream for the underlying file 616 // look up the index. 617 Path p = makeQualified(f); 618 Path harPath = getPathInHar(p); 619 if (harPath == null) { 620 throw new IOException("Invalid file name: " + f + " in " + uri); 621 } 622 HarStatus hstatus = metadata.archive.get(harPath); 623 if (hstatus == null) { 624 throw new FileNotFoundException("File: " + f + " does not exist in " + uri); 625 } 626 return hstatus; 627 } 628 629 /** 630 * @return null since no checksum algorithm is implemented. 631 */ getFileChecksum(Path f)632 public FileChecksum getFileChecksum(Path f) { 633 return null; 634 } 635 636 /** 637 * Returns a har input stream which fakes end of 638 * file. It reads the index files to get the part 639 * file name and the size and start of the file. 640 */ 641 @Override open(Path f, int bufferSize)642 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 643 // get the fs DataInputStream for the underlying file 644 HarStatus hstatus = getFileHarStatus(f); 645 // we got it.. woo hooo!!! 646 if (hstatus.isDir()) { 647 throw new FileNotFoundException(f + " : not a file in " + 648 archivePath); 649 } 650 return new HarFSDataInputStream(fs, new Path(archivePath, 651 hstatus.getPartName()), 652 hstatus.getStartIndex(), hstatus.getLength(), bufferSize); 653 } 654 655 /* 656 * create throws an exception in Har filesystem. 657 * The archive once created cannot be changed. 658 */ create(Path f, int bufferSize)659 public FSDataOutputStream create(Path f, int bufferSize) 660 throws IOException { 661 throw new IOException("Har: Create not allowed"); 662 } 663 create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)664 public FSDataOutputStream create(Path f, 665 FsPermission permission, 666 boolean overwrite, 667 int bufferSize, 668 short replication, 669 long blockSize, 670 Progressable progress) throws IOException { 671 throw new IOException("Har: create not allowed."); 672 } 673 674 @Override close()675 public void close() throws IOException { 676 if (fs != null) { 677 try { 678 fs.close(); 679 } catch(IOException ie) { 680 //this might already be closed 681 // ignore 682 } 683 } 684 } 685 686 /** 687 * Not implemented. 688 */ 689 @Override setReplication(Path src, short replication)690 public boolean setReplication(Path src, short replication) throws IOException{ 691 throw new IOException("Har: setreplication not allowed"); 692 } 693 694 /** 695 * Not implemented. 696 */ 697 @Override delete(Path f, boolean recursive)698 public boolean delete(Path f, boolean recursive) throws IOException { 699 throw new IOException("Har: delete not allowed"); 700 } 701 702 /** 703 * liststatus returns the children of a directory 704 * after looking up the index files. 705 */ 706 @Override listStatus(Path f)707 public FileStatus[] listStatus(Path f) throws IOException { 708 //need to see if the file is an index in file 709 //get the filestatus of the archive directory 710 // we will create fake filestatuses to return 711 // to the client 712 List<FileStatus> statuses = new ArrayList<FileStatus>(); 713 Path tmpPath = makeQualified(f); 714 Path harPath = getPathInHar(tmpPath); 715 HarStatus hstatus = metadata.archive.get(harPath); 716 if (hstatus == null) { 717 throw new FileNotFoundException("File " + f + " not found in " + archivePath); 718 } 719 if (hstatus.isDir()) { 720 fileStatusesInIndex(hstatus, statuses, hstatus.children); 721 } else { 722 statuses.add(toFileStatus(hstatus, null)); 723 } 724 725 return statuses.toArray(new FileStatus[statuses.size()]); 726 } 727 728 /** 729 * return the top level archive path. 730 */ getHomeDirectory()731 public Path getHomeDirectory() { 732 return new Path(uri.toString()); 733 } 734 setWorkingDirectory(Path newDir)735 public void setWorkingDirectory(Path newDir) { 736 //does nothing. 737 } 738 739 /** 740 * not implemented. 741 */ mkdirs(Path f, FsPermission permission)742 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 743 throw new IOException("Har: mkdirs not allowed"); 744 } 745 746 /** 747 * not implemented. 748 */ copyFromLocalFile(boolean delSrc, Path src, Path dst)749 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws 750 IOException { 751 throw new IOException("Har: copyfromlocalfile not allowed"); 752 } 753 754 /** 755 * copies the file in the har filesystem to a local file. 756 */ copyToLocalFile(boolean delSrc, Path src, Path dst)757 public void copyToLocalFile(boolean delSrc, Path src, Path dst) 758 throws IOException { 759 FileUtil.copy(this, src, getLocal(getConf()), dst, false, getConf()); 760 } 761 762 /** 763 * not implemented. 764 */ startLocalOutput(Path fsOutputFile, Path tmpLocalFile)765 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 766 throws IOException { 767 throw new IOException("Har: startLocalOutput not allowed"); 768 } 769 770 /** 771 * not implemented. 772 */ completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)773 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 774 throws IOException { 775 throw new IOException("Har: completeLocalOutput not allowed"); 776 } 777 778 /** 779 * not implemented. 780 */ setOwner(Path p, String username, String groupname)781 public void setOwner(Path p, String username, String groupname) 782 throws IOException { 783 throw new IOException("Har: setowner not allowed"); 784 } 785 786 /** 787 * Not implemented. 788 */ setPermission(Path p, FsPermission permisssion)789 public void setPermission(Path p, FsPermission permisssion) 790 throws IOException { 791 throw new IOException("Har: setPermission not allowed"); 792 } 793 794 /** 795 * Hadoop archives input stream. This input stream fakes EOF 796 * since archive files are part of bigger part files. 797 */ 798 private static class HarFSDataInputStream extends FSDataInputStream { 799 /** 800 * Create an input stream that fakes all the reads/positions/seeking. 801 */ 802 private static class HarFsInputStream extends FSInputStream { 803 private long position, start, end; 804 //The underlying data input stream that the 805 // underlying filesystem will return. 806 private FSDataInputStream underLyingStream; 807 //one byte buffer 808 private byte[] oneBytebuff = new byte[1]; HarFsInputStream(FileSystem fs, Path path, long start, long length, int bufferSize)809 HarFsInputStream(FileSystem fs, Path path, long start, 810 long length, int bufferSize) throws IOException { 811 underLyingStream = fs.open(path, bufferSize); 812 underLyingStream.seek(start); 813 // the start of this file in the part file 814 this.start = start; 815 // the position pointer in the part file 816 this.position = start; 817 // the end pointer in the part file 818 this.end = start + length; 819 } 820 available()821 public synchronized int available() throws IOException { 822 long remaining = end - underLyingStream.getPos(); 823 if (remaining > (long)Integer.MAX_VALUE) { 824 return Integer.MAX_VALUE; 825 } 826 return (int) remaining; 827 } 828 close()829 public synchronized void close() throws IOException { 830 underLyingStream.close(); 831 super.close(); 832 } 833 834 //not implemented 835 @Override mark(int readLimit)836 public void mark(int readLimit) { 837 // do nothing 838 } 839 840 /** 841 * reset is not implemented 842 */ reset()843 public void reset() throws IOException { 844 throw new IOException("reset not implemented."); 845 } 846 read()847 public synchronized int read() throws IOException { 848 int ret = read(oneBytebuff, 0, 1); 849 return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff); 850 } 851 read(byte[] b)852 public synchronized int read(byte[] b) throws IOException { 853 int ret = read(b, 0, b.length); 854 if (ret != -1) { 855 position += ret; 856 } 857 return ret; 858 } 859 860 /** 861 * 862 */ read(byte[] b, int offset, int len)863 public synchronized int read(byte[] b, int offset, int len) 864 throws IOException { 865 int newlen = len; 866 int ret = -1; 867 if (position + len > end) { 868 newlen = (int) (end - position); 869 } 870 // end case 871 if (newlen == 0) 872 return ret; 873 ret = underLyingStream.read(b, offset, newlen); 874 position += ret; 875 return ret; 876 } 877 skip(long n)878 public synchronized long skip(long n) throws IOException { 879 long tmpN = n; 880 if (tmpN > 0) { 881 if (position + tmpN > end) { 882 tmpN = end - position; 883 } 884 underLyingStream.seek(tmpN + position); 885 position += tmpN; 886 return tmpN; 887 } 888 return (tmpN < 0)? -1 : 0; 889 } 890 getPos()891 public synchronized long getPos() throws IOException { 892 return (position - start); 893 } 894 seek(long pos)895 public synchronized void seek(long pos) throws IOException { 896 if (pos < 0 || (start + pos > end)) { 897 throw new IOException("Failed to seek: EOF"); 898 } 899 position = start + pos; 900 underLyingStream.seek(position); 901 } 902 seekToNewSource(long targetPos)903 public boolean seekToNewSource(long targetPos) throws IOException { 904 //do not need to implement this 905 // hdfs in itself does seektonewsource 906 // while reading. 907 return false; 908 } 909 910 /** 911 * implementing position readable. 912 */ read(long pos, byte[] b, int offset, int length)913 public int read(long pos, byte[] b, int offset, int length) 914 throws IOException { 915 int nlength = length; 916 if (start + nlength + pos > end) { 917 nlength = (int) (end - (start + pos)); 918 } 919 return underLyingStream.read(pos + start , b, offset, nlength); 920 } 921 922 /** 923 * position readable again. 924 */ readFully(long pos, byte[] b, int offset, int length)925 public void readFully(long pos, byte[] b, int offset, int length) 926 throws IOException { 927 if (start + length + pos > end) { 928 throw new IOException("Not enough bytes to read."); 929 } 930 underLyingStream.readFully(pos + start, b, offset, length); 931 } 932 readFully(long pos, byte[] b)933 public void readFully(long pos, byte[] b) throws IOException { 934 readFully(pos, b, 0, b.length); 935 } 936 937 } 938 939 /** 940 * constructors for har input stream. 941 * @param fs the underlying filesystem 942 * @param p The path in the underlying filesystem 943 * @param start the start position in the part file 944 * @param length the length of valid data in the part file 945 * @param bufsize the buffer size 946 * @throws IOException 947 */ HarFSDataInputStream(FileSystem fs, Path p, long start, long length, int bufsize)948 public HarFSDataInputStream(FileSystem fs, Path p, long start, 949 long length, int bufsize) throws IOException { 950 super(new HarFsInputStream(fs, p, start, length, bufsize)); 951 } 952 953 /** 954 * constructor for har input stream. 955 * @param fs the underlying filesystem 956 * @param p the path in the underlying file system 957 * @param start the start position in the part file 958 * @param length the length of valid data in the part file. 959 * @throws IOException 960 */ HarFSDataInputStream(FileSystem fs, Path p, long start, long length)961 public HarFSDataInputStream(FileSystem fs, Path p, long start, long length) 962 throws IOException { 963 super(new HarFsInputStream(fs, p, start, length, 0)); 964 } 965 } 966 967 private class HarMetaData { 968 private FileSystem fs; 969 private int version; 970 // the masterIndex of the archive 971 private Path masterIndexPath; 972 // the index file 973 private Path archiveIndexPath; 974 975 private long masterIndexTimestamp; 976 private long archiveIndexTimestamp; 977 978 List<Store> stores = new ArrayList<Store>(); 979 Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>(); 980 private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>(); 981 HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath)982 public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) { 983 this.fs = fs; 984 this.masterIndexPath = masterIndexPath; 985 this.archiveIndexPath = archiveIndexPath; 986 } 987 getPartFileStatus(Path partPath)988 public FileStatus getPartFileStatus(Path partPath) throws IOException { 989 FileStatus status; 990 status = partFileStatuses.get(partPath); 991 if (status == null) { 992 status = fs.getFileStatus(partPath); 993 partFileStatuses.put(partPath, status); 994 } 995 return status; 996 } 997 getMasterIndexTimestamp()998 public long getMasterIndexTimestamp() { 999 return masterIndexTimestamp; 1000 } 1001 getArchiveIndexTimestamp()1002 public long getArchiveIndexTimestamp() { 1003 return archiveIndexTimestamp; 1004 } 1005 getVersion()1006 private int getVersion() { 1007 return version; 1008 } 1009 parseMetaData()1010 private void parseMetaData() throws IOException { 1011 FSDataInputStream in = fs.open(masterIndexPath); 1012 FileStatus masterStat = fs.getFileStatus(masterIndexPath); 1013 masterIndexTimestamp = masterStat.getModificationTime(); 1014 LineReader lin = new LineReader(in, getConf()); 1015 Text line = new Text(); 1016 long read = lin.readLine(line); 1017 1018 // the first line contains the version of the index file 1019 String versionLine = line.toString(); 1020 String[] arr = versionLine.split(" "); 1021 version = Integer.parseInt(arr[0]); 1022 // make it always backwards-compatible 1023 if (this.version > HarFileSystem.VERSION) { 1024 throw new IOException("Invalid version " + 1025 this.version + " expected " + HarFileSystem.VERSION); 1026 } 1027 1028 // each line contains a hashcode range and the index file name 1029 String[] readStr = null; 1030 while(read < masterStat.getLen()) { 1031 int b = lin.readLine(line); 1032 read += b; 1033 readStr = line.toString().split(" "); 1034 int startHash = Integer.parseInt(readStr[0]); 1035 int endHash = Integer.parseInt(readStr[1]); 1036 stores.add(new Store(Long.parseLong(readStr[2]), 1037 Long.parseLong(readStr[3]), startHash, 1038 endHash)); 1039 line.clear(); 1040 } 1041 try { 1042 // close the master index 1043 lin.close(); 1044 } catch(IOException io){ 1045 // do nothing just a read. 1046 } 1047 1048 FSDataInputStream aIn = fs.open(archiveIndexPath); 1049 FileStatus archiveStat = fs.getFileStatus(archiveIndexPath); 1050 archiveIndexTimestamp = archiveStat.getModificationTime(); 1051 LineReader aLin; 1052 1053 // now start reading the real index file 1054 for (Store s: stores) { 1055 read = 0; 1056 aIn.seek(s.begin); 1057 aLin = new LineReader(aIn, getConf()); 1058 while (read + s.begin < s.end) { 1059 int tmp = aLin.readLine(line); 1060 read += tmp; 1061 String lineFeed = line.toString(); 1062 String[] parsed = lineFeed.split(" "); 1063 parsed[0] = decodeFileName(parsed[0]); 1064 archive.put(new Path(parsed[0]), new HarStatus(lineFeed)); 1065 line.clear(); 1066 } 1067 } 1068 try { 1069 // close the archive index 1070 aIn.close(); 1071 } catch(IOException io) { 1072 // do nothing just a read. 1073 } 1074 } 1075 } 1076 } 1077