/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable;

/**
 * This is an implementation of the Hadoop Archive
 * Filesystem. This archive Filesystem has index files
 * of the form _index* and has contents of the form
 * part-*. The index files store the indexes of the
 * real files. The index files are of the form _masterindex
 * and _index. The master index is a level of indirection
 * into the index file to make lookups faster. The index
 * file is sorted by the hash code of the paths that it contains
 * and the master index contains pointers to the positions in
 * the index for ranges of hashcodes.
 */

public class HarFileSystem extends FilterFileSystem {
  public static final int VERSION = 3;

  private static final Map<URI, HarMetaData> harMetaCache
    = new ConcurrentHashMap<URI, HarMetaData>();

  // uri representation of this Har filesystem
  private URI uri;
  // the top level path of the archive
  // in the underlying file system
  private Path archivePath;
  // the har auth
  private String harAuth;

  // pointer into the static metadata cache
  private HarMetaData metadata;

  /**
   * Public no-argument constructor.
   */
  public HarFileSystem() {
  }

  /**
   * Constructor to create a HarFileSystem with an
   * underlying filesystem.
   * @param fs the underlying filesystem
   */
  public HarFileSystem(FileSystem fs) {
    super(fs);
  }

  /**
   * Initialize a Har filesystem per har archive. The
   * archive home directory is the top level directory
   * in the filesystem that contains the HAR archive.
   * Be careful with this method: you do not want to keep
   * creating new FileSystem instances with every call to
   * path.getFileSystem().
   * The URI of a Har filesystem is of the form
   * har://underlyingfsscheme-host:port/archivepath
   * or
   * har:///archivepath. The latter form assumes the default
   * filesystem from the configuration as the underlying filesystem.
   */
  public void initialize(URI name, Configuration conf) throws IOException {
    // decode the name
    URI underLyingURI = decodeHarURI(name, conf);
    // we got the right har Path- now check if this is
    // truly a har filesystem
    Path harPath = archivePath(
      new Path(name.getScheme(), name.getAuthority(), name.getPath()));
    if (harPath == null) {
      throw new IOException("Invalid path for the Har Filesystem. " +
                           name.toString());
    }
    if (fs == null) {
      fs = FileSystem.get(underLyingURI, conf);
    }
    uri = harPath.toUri();
    archivePath = new Path(uri.getPath());
    harAuth = getHarAuth(underLyingURI);
    //check for the underlying fs containing
    // the index file
    Path masterIndexPath = new Path(archivePath, "_masterindex");
    Path archiveIndexPath = new Path(archivePath, "_index");
    if (!fs.exists(masterIndexPath) || !fs.exists(archiveIndexPath)) {
      throw new IOException("Invalid path for the Har Filesystem. " +
          "No index file in " + harPath);
    }

    metadata = harMetaCache.get(uri);
    if (metadata != null) {
      FileStatus mStat = fs.getFileStatus(masterIndexPath);
      FileStatus aStat = fs.getFileStatus(archiveIndexPath);
      if (mStat.getModificationTime() != metadata.getMasterIndexTimestamp() ||
          aStat.getModificationTime() != metadata.getArchiveIndexTimestamp()) {
        // the archive has been overwritten since we last read it
        // remove the entry from the meta data cache
        metadata = null;
        harMetaCache.remove(uri);
      }
    }
    if (metadata == null) {
      metadata = new HarMetaData(fs, masterIndexPath, archiveIndexPath);
      metadata.parseMetaData();
      harMetaCache.put(uri, metadata);
    }
  }
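
  // A minimal usage sketch (illustrative only; the archive location and the
  // NameNode address below are made-up examples, not part of this class):
  //
  //   Configuration conf = new Configuration();
  //   // explicit underlying scheme and authority ...
  //   Path p = new Path("har://hdfs-namenode:8020/user/foo/data.har/dir/file.txt");
  //   // ... or fall back to the default filesystem from the configuration
  //   Path q = new Path("har:///user/foo/data.har/dir/file.txt");
  //   FileSystem harFs = p.getFileSystem(conf);   // a HarFileSystem instance
  //   FSDataInputStream in = harFs.open(p);       // read-only access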

  // get the version of the filesystem from the masterindex file
  // the version is currently not useful since it is the first version
  // of archives
  public int getHarVersion() throws IOException {
    if (metadata != null) {
      return metadata.getVersion();
    }
    else {
      throw new IOException("Invalid meta data for the Har Filesystem");
    }
  }

  /*
   * Find the archive path within the given path:
   * walking up from p, the deepest prefix whose
   * last segment ends with .har is returned, or
   * null if there is no such prefix.
   */
  private Path archivePath(Path p) {
    Path retPath = null;
    Path tmp = p;
    for (int i=0; i< p.depth(); i++) {
      if (tmp.toString().endsWith(".har")) {
        retPath = tmp;
        break;
      }
      tmp = tmp.getParent();
    }
    return retPath;
  }

  /**
   * Decode the raw URI to get the underlying URI.
   * @param rawURI raw Har URI
   * @param conf the configuration; used to look up the default filesystem
   *        when the Har URI does not name one
   * @return filtered URI of the underlying fileSystem
   */
  private URI decodeHarURI(URI rawURI, Configuration conf) throws IOException {
    String tmpAuth = rawURI.getAuthority();
    //we are using the default file
    //system in the config
    //so create an underlying uri and
    //return it
    if (tmpAuth == null) {
      //create a path
      return FileSystem.getDefaultUri(conf);
    }
    String host = rawURI.getHost();
    if (host == null) {
      throw new IOException("URI: " + rawURI
          + " is an invalid Har URI since host==null."
          + "  Expecting har://<scheme>-<host>/<path>.");
    }
    int i = host.indexOf('-');
    if (i < 0) {
      throw new IOException("URI: " + rawURI
          + " is an invalid Har URI since '-' not found."
          + "  Expecting har://<scheme>-<host>/<path>.");
    }
    final String underLyingScheme = host.substring(0, i);
    i++;
    final String underLyingHost = i == host.length()? null: host.substring(i);
    int underLyingPort = rawURI.getPort();
    String auth = (underLyingHost == null && underLyingPort == -1)?
                  null:(underLyingHost+":"+underLyingPort);
    URI tmp = null;
    if (rawURI.getQuery() != null) {
      // query component not allowed
      throw new IOException("query component in Path not supported  " + rawURI);
    }
    try {
      tmp = new URI(underLyingScheme, auth, rawURI.getPath(),
            rawURI.getQuery(), rawURI.getFragment());
    } catch (URISyntaxException e) {
        // do nothing; this should not happen
    }
    return tmp;
  }
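
  // For illustration (hypothetical addresses): the raw URI
  //   har://hdfs-namenode:8020/user/foo/data.har
  // decodes to the underlying URI hdfs://namenode:8020/user/foo/data.har,
  // while har:///user/foo/data.har (no authority) simply resolves to the
  // default filesystem URI from the configuration.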

  private static String decodeString(String str)
    throws UnsupportedEncodingException {
    return URLDecoder.decode(str, "UTF-8");
  }

  private String decodeFileName(String fname)
    throws UnsupportedEncodingException {
    int version = metadata.getVersion();
    if (version == 2 || version == 3){
      return decodeString(fname);
    }
    return fname;
  }

  /**
   * Return the top level archive path.
   */
  public Path getWorkingDirectory() {
    return new Path(uri.toString());
  }

  /**
   * Create a har specific auth of the form
   * underlyingfsscheme-host:port.
   * @param underLyingUri the uri of the underlying
   * filesystem
   * @return har specific auth
   */
  private String getHarAuth(URI underLyingUri) {
    String auth = underLyingUri.getScheme() + "-";
    if (underLyingUri.getHost() != null) {
      auth += underLyingUri.getHost() + ":";
      if (underLyingUri.getPort() != -1) {
        auth +=  underLyingUri.getPort();
      }
    }
    else {
      auth += ":";
    }
    return auth;
  }
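
  // Example (hypothetical underlying URIs): hdfs://namenode:8020 yields the
  // auth "hdfs-namenode:8020"; a URI with no host, such as file:///, yields
  // "file-:".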

  /**
   * Returns the uri of this filesystem.
   * The uri is of the form
   * har://underlyingfsscheme-host:port/pathintheunderlyingfs
   */
  @Override
  public URI getUri() {
    return this.uri;
  }

  @Override
  public String getCanonicalServiceName() {
    return null;
  }

  /**
   * Get the path inside the har filesystem, i.e. the
   * path relative to the root of the har filesystem.
   * @param path the fully qualified path in the har filesystem.
   * @return relative path in the filesystem.
   */
  private Path getPathInHar(Path path) {
    Path harPath = new Path(path.toUri().getPath());
    if (archivePath.compareTo(harPath) == 0)
      return new Path(Path.SEPARATOR);
    Path tmp = new Path(harPath.getName());
    Path parent = harPath.getParent();
    while (!(parent.compareTo(archivePath) == 0)) {
      if (parent.toString().equals(Path.SEPARATOR)) {
        tmp = null;
        break;
      }
      tmp = new Path(parent.getName(), tmp);
      parent = parent.getParent();
    }
    if (tmp != null)
      tmp = new Path(Path.SEPARATOR, tmp);
    return tmp;
  }
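
  // Example (hypothetical paths): with archivePath = /user/foo/data.har,
  // a qualified path /user/foo/data.har/dir/file maps to /dir/file inside
  // the archive, and the archive root itself maps to "/".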

  // the relative path of p: basically getting
  // rid of the leading '/'. Parsing and doing
  // string manipulation is error prone, so
  // just use the Path api to do it.
  private Path makeRelative(String initial, Path p) {
    String scheme = this.uri.getScheme();
    String authority = this.uri.getAuthority();
    Path root = new Path(Path.SEPARATOR);
    if (root.compareTo(p) == 0)
      return new Path(scheme, authority, initial);
    Path retPath = new Path(p.getName());
    Path parent = p.getParent();
    for (int i=0; i < p.depth()-1; i++) {
      retPath = new Path(parent.getName(), retPath);
      parent = parent.getParent();
    }
    return new Path(new Path(scheme, authority, initial),
      retPath.toString());
  }

  /* this makes a path qualified in the har filesystem
   * (non-Javadoc)
   * @see org.apache.hadoop.fs.FilterFileSystem#makeQualified(
   * org.apache.hadoop.fs.Path)
   */
  @Override
  public Path makeQualified(Path path) {
    // make sure that we just get the
    // path component
    Path fsPath = path;
    if (!path.isAbsolute()) {
      fsPath = new Path(archivePath, path);
    }

    URI tmpURI = fsPath.toUri();
    //change this to Har uri
    return new Path(uri.getScheme(), harAuth, tmpURI.getPath());
  }

  /**
   * Fix offset and length of block locations.
   * Note that this method modifies the original array.
   * @param locations block locations of har part file
   * @param start the start of the desired range in the contained file
   * @param len the length of the desired range
   * @param fileOffsetInHar the offset of the desired file in the har part file
   * @return block locations with fixed offset and length
   */
  static BlockLocation[] fixBlockLocations(BlockLocation[] locations,
                                          long start,
                                          long len,
                                          long fileOffsetInHar) {
    // offset 1 past last byte of desired range
    long end = start + len;

    for (BlockLocation location : locations) {
      // offset of part block relative to beginning of desired file
      // (may be negative if file starts in this part block)
      long harBlockStart = location.getOffset() - fileOffsetInHar;
      // offset 1 past last byte of har block relative to beginning of
      // desired file
      long harBlockEnd = harBlockStart + location.getLength();

      if (start > harBlockStart) {
        // desired range starts after beginning of this har block
        // fix offset to beginning of relevant range (relative to desired file)
        location.setOffset(start);
        // fix length to relevant portion of har block
        location.setLength(location.getLength() - (start - harBlockStart));
      } else {
        // desired range includes beginning of this har block
        location.setOffset(harBlockStart);
      }

      if (harBlockEnd > end) {
        // range ends before end of this har block
        // fix length to remove irrelevant portion at the end
        location.setLength(location.getLength() - (harBlockEnd - end));
      }
    }

    return locations;
  }
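
  /*
   * Worked example (made-up numbers): say the archived file begins at offset
   * 300 in its part file (fileOffsetInHar = 300), the part file has a single
   * block at offset 0 with length 512, and the caller asks for start = 0,
   * len = 100 (so end = 100).
   *   harBlockStart = 0 - 300 = -300   (the block begins before the file)
   *   harBlockEnd   = -300 + 512 = 212
   *   start (0) > harBlockStart, so offset becomes 0 and length 512 - 300 = 212
   *   harBlockEnd (212) > end (100), so length is trimmed by 112 down to 100
   * leaving one location that covers exactly bytes [0, 100) of the desired file.
   */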

  /**
   * Get block locations from the underlying fs and fix their
   * offsets and lengths.
   * @param file the input filestatus to get block locations
   * @param start the start of the desired range in the contained file
   * @param len the length of the desired range
   * @return block locations for this segment of file
   * @throws IOException
   */
  @Override
  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
                                               long len) throws IOException {
    HarStatus hstatus = getFileHarStatus(file.getPath());
    Path partPath = new Path(archivePath, hstatus.getPartName());
    FileStatus partStatus = metadata.getPartFileStatus(partPath);

    // get all part blocks that overlap with the desired file blocks
    BlockLocation[] locations =
      fs.getFileBlockLocations(partStatus,
                               hstatus.getStartIndex() + start, len);

    return fixBlockLocations(locations, start, len, hstatus.getStartIndex());
  }

  /**
   * The hash of the path p inside
   * the filesystem.
   * @param p the path in the harfilesystem
   * @return the hash code of the path.
   */
  public static int getHarHash(Path p) {
    return (p.toString().hashCode() & 0x7fffffff);
  }

  static class Store {
    public Store() {
      begin = end = startHash = endHash = 0;
    }
    public Store(long begin, long end, int startHash, int endHash) {
      this.begin = begin;
      this.end = end;
      this.startHash = startHash;
      this.endHash = endHash;
    }
    public long begin;
    public long end;
    public int startHash;
    public int endHash;
  }

  /**
   * Get filestatuses of all the children of a given directory. This just reads
   * through the index file, line by line, to get all statuses for the children
   * of a directory. It is a brute force way of getting all such filestatuses.
   *
   * @param parent
   *          the parent path directory
   * @param statuses
   *          the list to add the children filestatuses to
   * @param children
   *          the string list of children for this parent
   */
  private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses,
      List<String> children) throws IOException {
    String parentString = parent.getName();
    if (!parentString.endsWith(Path.SEPARATOR)){
        parentString += Path.SEPARATOR;
    }
    Path harPath = new Path(parentString);
    int harlen = harPath.depth();
    final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>();

    for (HarStatus hstatus : metadata.archive.values()) {
      String child = hstatus.getName();
      if ((child.startsWith(parentString))) {
        Path thisPath = new Path(child);
        if (thisPath.depth() == harlen + 1) {
          statuses.add(toFileStatus(hstatus, cache));
        }
      }
    }
  }

  /**
   * Combine the status stored in the index and the underlying status.
   * @param h status stored in the index
   * @param cache caching the underlying file statuses
   * @return the combined file status
   * @throws IOException
   */
  private FileStatus toFileStatus(HarStatus h,
      Map<String, FileStatus> cache) throws IOException {
    FileStatus underlying = null;
    if (cache != null) {
      underlying = cache.get(h.partName);
    }
    if (underlying == null) {
      final Path p = h.isDir? archivePath: new Path(archivePath, h.partName);
      underlying = fs.getFileStatus(p);
      if (cache != null) {
        cache.put(h.partName, underlying);
      }
    }

    long modTime = 0;
    int version = metadata.getVersion();
    if (version < 3) {
      modTime = underlying.getModificationTime();
    } else if (version == 3) {
      modTime = h.getModificationTime();
    }

    return new FileStatus(
        h.isDir()? 0L: h.getLength(),
        h.isDir(),
        underlying.getReplication(),
        underlying.getBlockSize(),
        modTime,
        underlying.getAccessTime(),
        underlying.getPermission(),
        underlying.getOwner(),
        underlying.getGroup(),
        makeRelative(this.uri.getPath(), new Path(h.name)));
  }

  // A single line parser for hadoop archives status,
  // stored as a single line in the index files.
  // The format is of the form
  // filename "dir"/"file" partFileName startIndex length
  // <space separated children>
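  // An illustrative (made-up) file entry:      /dir/f.txt file part-0 0 1024
  // and a matching directory entry:            /dir dir none 0 0 f.txt g.txt
  // In versions 2 and 3 the names are URL-encoded, and version 3 adds an
  // encoded "modtime permission owner group" property string: the last field
  // for files, or stored in the partName slot for directories (which carry
  // no data of their own).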
  private class HarStatus {
    boolean isDir;
    String name;
    List<String> children;
    String partName;
    long startIndex;
    long length;
    long modificationTime = 0;

    public HarStatus(String harString) throws UnsupportedEncodingException {
      String[] splits = harString.split(" ");
      this.name = decodeFileName(splits[0]);
      this.isDir = "dir".equals(splits[1]);
      // this is equal to "none" if it's a directory
      this.partName = splits[2];
      this.startIndex = Long.parseLong(splits[3]);
      this.length = Long.parseLong(splits[4]);

      int version = metadata.getVersion();
      String[] propSplits = null;
      // propSplits is used to retrieve the metainformation that Har versions
      // 1 & 2 missed (modification time, permission, owner group).
      // These fields are stored in an encoded string placed in different
      // locations depending on whether it's a file or directory entry.
      // If it's a directory, the string will be placed at the partName
      // location (directories have no partName because they don't have data
      // to be stored). This is done because the number of fields in a
      // directory entry is unbounded (all children are listed at the end).
      // If it's a file, the string will be the last field.
      if (isDir) {
        if (version == 3){
          propSplits = decodeString(this.partName).split(" ");
        }
        children = new ArrayList<String>();
        for (int i = 5; i < splits.length; i++) {
          children.add(decodeFileName(splits[i]));
        }
      } else if (version == 3) {
        propSplits = decodeString(splits[5]).split(" ");
      }

      if (propSplits != null && propSplits.length >= 4) {
        modificationTime = Long.parseLong(propSplits[0]);
        // the fields below are stored in the file but are currently not used
        // by HarFileSystem
        // permission = new FsPermission(Short.parseShort(propSplits[1]));
        // owner = decodeString(propSplits[2]);
        // group = decodeString(propSplits[3]);
      }
    }
    public boolean isDir() {
      return isDir;
    }

    public String getName() {
      return name;
    }

    public List<String> getChildren() {
      return children;
    }
    public String getFileName() {
      return name;
    }
    public String getPartName() {
      return partName;
    }
    public long getStartIndex() {
      return startIndex;
    }
    public long getLength() {
      return length;
    }
    public long getModificationTime() {
      return modificationTime;
    }
  }

  /**
   * Return the filestatus of files in the har archive.
   * The permissions returned are those of the archive
   * index files. The permissions are not persisted
   * while creating a hadoop archive.
   * @param f the path in har filesystem
   * @return filestatus.
   * @throws IOException
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    HarStatus hstatus = getFileHarStatus(f);
    return toFileStatus(hstatus, null);
  }

  private HarStatus getFileHarStatus(Path f) throws IOException {
    // get the fs DataInputStream for the underlying file
    // look up the index.
    Path p = makeQualified(f);
    Path harPath = getPathInHar(p);
    if (harPath == null) {
      throw new IOException("Invalid file name: " + f + " in " + uri);
    }
    HarStatus hstatus = metadata.archive.get(harPath);
    if (hstatus == null) {
      throw new FileNotFoundException("File: " +  f + " does not exist in " + uri);
    }
    return hstatus;
  }

  /**
   * @return null since no checksum algorithm is implemented.
   */
  public FileChecksum getFileChecksum(Path f) {
    return null;
  }

  /**
   * Returns a har input stream which fakes end of
   * file. It reads the index files to get the part
   * file name and the size and start of the file.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    // get the fs DataInputStream for the underlying file
    HarStatus hstatus = getFileHarStatus(f);
    // we got it.. woo hooo!!!
    if (hstatus.isDir()) {
      throw new FileNotFoundException(f + " : not a file in " +
                archivePath);
    }
    return new HarFSDataInputStream(fs, new Path(archivePath,
        hstatus.getPartName()),
        hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
  }
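
  // Sketch of the resulting behaviour (illustrative numbers): for an archived
  // file of length 1024 stored at offset 4096 in its part file, the returned
  // stream reports getPos() relative to the archived file (0 right after
  // open) and will not read or seek past byte 1024, even though the
  // underlying part file continues beyond that point.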

  /*
   * create throws an exception in Har filesystem.
   * The archive once created cannot be changed.
   */
  public FSDataOutputStream create(Path f, int bufferSize)
                                    throws IOException {
    throw new IOException("Har: Create not allowed");
  }

  public FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException {
    throw new IOException("Har: create not allowed.");
  }

  @Override
  public void close() throws IOException {
    if (fs != null) {
      try {
        fs.close();
      } catch(IOException ie) {
        //this might already be closed
        // ignore
      }
    }
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean setReplication(Path src, short replication) throws IOException{
    throw new IOException("Har: setreplication not allowed");
  }

  /**
   * Not implemented.
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    throw new IOException("Har: delete not allowed");
  }

  /**
   * liststatus returns the children of a directory
   * after looking up the index files.
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    //need to see if the file is an index in file
    //get the filestatus of the archive directory
    // we will create fake filestatuses to return
    // to the client
    List<FileStatus> statuses = new ArrayList<FileStatus>();
    Path tmpPath = makeQualified(f);
    Path harPath = getPathInHar(tmpPath);
    HarStatus hstatus = metadata.archive.get(harPath);
    if (hstatus == null) {
      throw new FileNotFoundException("File " + f + " not found in " + archivePath);
    }
    if (hstatus.isDir()) {
      fileStatusesInIndex(hstatus, statuses, hstatus.children);
    } else {
      statuses.add(toFileStatus(hstatus, null));
    }

    return statuses.toArray(new FileStatus[statuses.size()]);
  }

  /**
   * return the top level archive path.
   */
  public Path getHomeDirectory() {
    return new Path(uri.toString());
  }

  public void setWorkingDirectory(Path newDir) {
    //does nothing.
  }

  /**
   * not implemented.
   */
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    throw new IOException("Har: mkdirs not allowed");
  }

  /**
   * not implemented.
   */
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws
        IOException {
    throw new IOException("Har: copyfromlocalfile not allowed");
  }

  /**
   * copies the file in the har filesystem to a local file.
   */
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    FileUtil.copy(this, src, getLocal(getConf()), dst, false, getConf());
  }

  /**
   * not implemented.
   */
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    throw new IOException("Har: startLocalOutput not allowed");
  }

  /**
   * not implemented.
   */
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    throw new IOException("Har: completeLocalOutput not allowed");
  }

  /**
   * not implemented.
   */
  public void setOwner(Path p, String username, String groupname)
    throws IOException {
    throw new IOException("Har: setowner not allowed");
  }

  /**
   * Not implemented.
   */
  public void setPermission(Path p, FsPermission permission)
    throws IOException {
    throw new IOException("Har: setPermission not allowed");
  }

  /**
   * Hadoop archives input stream. This input stream fakes EOF
   * since archive files are part of bigger part files.
   */
  private static class HarFSDataInputStream extends FSDataInputStream {
    /**
     * Create an input stream that fakes all the reads/positions/seeking.
     */
    private static class HarFsInputStream extends FSInputStream {
      private long position, start, end;
      //The underlying data input stream that the
      // underlying filesystem will return.
      private FSDataInputStream underLyingStream;
      //one byte buffer
      private byte[] oneBytebuff = new byte[1];
      HarFsInputStream(FileSystem fs, Path path, long start,
          long length, int bufferSize) throws IOException {
        underLyingStream = fs.open(path, bufferSize);
        underLyingStream.seek(start);
        // the start of this file in the part file
        this.start = start;
        // the position pointer in the part file
        this.position = start;
        // the end pointer in the part file
        this.end = start + length;
      }

      public synchronized int available() throws IOException {
        long remaining = end - underLyingStream.getPos();
        if (remaining > (long)Integer.MAX_VALUE) {
          return Integer.MAX_VALUE;
        }
        return (int) remaining;
      }

      public synchronized void close() throws IOException {
        underLyingStream.close();
        super.close();
      }

      //not implemented
      @Override
      public void mark(int readLimit) {
        // do nothing
      }

      /**
       * reset is not implemented
       */
      public void reset() throws IOException {
        throw new IOException("reset not implemented.");
      }

      public synchronized int read() throws IOException {
        int ret = read(oneBytebuff, 0, 1);
        return (ret <= 0) ? -1: (oneBytebuff[0] & 0xff);
      }

      public synchronized int read(byte[] b) throws IOException {
        // position is already advanced by the three-argument read,
        // so do not add ret to it a second time here
        return read(b, 0, b.length);
      }

      /**
       * Read up to len bytes, but never past the fake end of file.
       */
      public synchronized int read(byte[] b, int offset, int len)
        throws IOException {
        int newlen = len;
        int ret = -1;
        if (position + len > end) {
          newlen = (int) (end - position);
        }
        // end case
        if (newlen == 0)
          return ret;
        ret = underLyingStream.read(b, offset, newlen);
        if (ret > 0) {
          position += ret;
        }
        return ret;
      }

      public synchronized long skip(long n) throws IOException {
        long tmpN = n;
        if (tmpN > 0) {
          if (position + tmpN > end) {
            tmpN = end - position;
          }
          underLyingStream.seek(tmpN + position);
          position += tmpN;
          return tmpN;
        }
        return (tmpN < 0)? -1 : 0;
      }

      public synchronized long getPos() throws IOException {
        return (position - start);
      }

      public synchronized void seek(long pos) throws IOException {
        if (pos < 0 || (start + pos > end)) {
          throw new IOException("Failed to seek: EOF");
        }
        position = start + pos;
        underLyingStream.seek(position);
      }

      public boolean seekToNewSource(long targetPos) throws IOException {
        //do not need to implement this
        // hdfs in itself does seektonewsource
        // while reading.
        return false;
      }

      /**
       * implementing position readable.
       */
      public int read(long pos, byte[] b, int offset, int length)
      throws IOException {
        int nlength = length;
        if (start + nlength + pos > end) {
          nlength = (int) (end - (start + pos));
        }
        return underLyingStream.read(pos + start , b, offset, nlength);
      }

      /**
       * position readable again.
       */
      public void readFully(long pos, byte[] b, int offset, int length)
      throws IOException {
        if (start + length + pos > end) {
          throw new IOException("Not enough bytes to read.");
        }
        underLyingStream.readFully(pos + start, b, offset, length);
      }

      public void readFully(long pos, byte[] b) throws IOException {
          readFully(pos, b, 0, b.length);
      }

    }

    /**
     * Constructor for har input stream.
     * @param fs the underlying filesystem
     * @param p the path in the underlying filesystem
     * @param start the start position in the part file
     * @param length the length of valid data in the part file
     * @param bufsize the buffer size
     * @throws IOException
     */
    public HarFSDataInputStream(FileSystem fs, Path  p, long start,
        long length, int bufsize) throws IOException {
        super(new HarFsInputStream(fs, p, start, length, bufsize));
    }

    /**
     * Constructor for har input stream.
     * @param fs the underlying filesystem
     * @param p the path in the underlying file system
     * @param start the start position in the part file
     * @param length the length of valid data in the part file.
     * @throws IOException
     */
    public HarFSDataInputStream(FileSystem fs, Path  p, long start, long length)
      throws IOException {
        super(new HarFsInputStream(fs, p, start, length, 0));
    }
  }

  private class HarMetaData {
    private FileSystem fs;
    private int version;
    // the masterIndex of the archive
    private Path masterIndexPath;
    // the index file
    private Path archiveIndexPath;

    private long masterIndexTimestamp;
    private long archiveIndexTimestamp;

    List<Store> stores = new ArrayList<Store>();
    Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>();
    private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>();

    public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
      this.fs = fs;
      this.masterIndexPath = masterIndexPath;
      this.archiveIndexPath = archiveIndexPath;
    }

    public FileStatus getPartFileStatus(Path partPath) throws IOException {
      FileStatus status;
      status = partFileStatuses.get(partPath);
      if (status == null) {
        status = fs.getFileStatus(partPath);
        partFileStatuses.put(partPath, status);
      }
      return status;
    }

    public long getMasterIndexTimestamp() {
      return masterIndexTimestamp;
    }

    public long getArchiveIndexTimestamp() {
      return archiveIndexTimestamp;
    }

    private int getVersion() {
      return version;
    }

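    // Illustrative master index layout (made-up numbers): a version line
    // followed by "startHash endHash begin end" records, e.g.
    //   3
    //   0 2147483647 0 2353
    // meaning entries whose path hash falls in [0, 2147483647] live in the
    // byte range [0, 2353) of the _index file, which holds one HarStatus
    // line per archived file or directory.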
    private void parseMetaData() throws IOException {
      FSDataInputStream in = fs.open(masterIndexPath);
      FileStatus masterStat = fs.getFileStatus(masterIndexPath);
      masterIndexTimestamp = masterStat.getModificationTime();
      LineReader lin = new LineReader(in, getConf());
      Text line = new Text();
      long read = lin.readLine(line);

      // the first line contains the version of the index file
      String versionLine = line.toString();
      String[] arr = versionLine.split(" ");
      version = Integer.parseInt(arr[0]);
      // make it always backwards-compatible
      if (this.version > HarFileSystem.VERSION) {
        throw new IOException("Invalid version " +
            this.version + " expected " + HarFileSystem.VERSION);
      }

      // each line contains a hashcode range and the index file name
      String[] readStr = null;
      while(read < masterStat.getLen()) {
        int b = lin.readLine(line);
        read += b;
        readStr = line.toString().split(" ");
        int startHash = Integer.parseInt(readStr[0]);
        int endHash  = Integer.parseInt(readStr[1]);
        stores.add(new Store(Long.parseLong(readStr[2]),
            Long.parseLong(readStr[3]), startHash,
            endHash));
        line.clear();
      }
      try {
        // close the master index
        lin.close();
      } catch(IOException io){
        // do nothing just a read.
      }

      FSDataInputStream aIn = fs.open(archiveIndexPath);
      FileStatus archiveStat = fs.getFileStatus(archiveIndexPath);
      archiveIndexTimestamp = archiveStat.getModificationTime();
      LineReader aLin;

      // now start reading the real index file
      for (Store s: stores) {
        read = 0;
        aIn.seek(s.begin);
        aLin = new LineReader(aIn, getConf());
        while (read + s.begin < s.end) {
          int tmp = aLin.readLine(line);
          read += tmp;
          String lineFeed = line.toString();
          String[] parsed = lineFeed.split(" ");
          parsed[0] = decodeFileName(parsed[0]);
          archive.put(new Path(parsed[0]), new HarStatus(lineFeed));
          line.clear();
        }
      }
      try {
        // close the archive index
        aIn.close();
      } catch(IOException io) {
        // do nothing just a read.
      }
    }
  }
}