1 /**
2  *
3  * Licensed under the Apache License, Version 2.0
4  * (the "License"); you may not use this file except in compliance with
5  * the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12  * implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  *
15  * @author: Sriram Rao (Kosmix Corp.)
16  *
17  * Implements the Hadoop FS interfaces to allow applications to store
18  *files in Kosmos File System (KFS).
19  */
20 
21 package org.apache.hadoop.fs.kfs;
22 
23 import java.io.*;
24 import java.net.*;
25 
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.FSDataInputStream;
28 import org.apache.hadoop.fs.FSDataOutputStream;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.FileStatus;
31 import org.apache.hadoop.fs.FileUtil;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.fs.permission.FsPermission;
34 import org.apache.hadoop.util.Progressable;
35 import org.apache.hadoop.fs.BlockLocation;
36 
37 /**
38  * A FileSystem backed by KFS.
39  *
40  */
41 
42 public class KosmosFileSystem extends FileSystem {
43 
44     private FileSystem localFs;
45     private IFSImpl kfsImpl = null;
46     private URI uri;
47     private Path workingDir = new Path("/");
48 
KosmosFileSystem()49     public KosmosFileSystem() {
50 
51     }
52 
KosmosFileSystem(IFSImpl fsimpl)53     KosmosFileSystem(IFSImpl fsimpl) {
54         this.kfsImpl = fsimpl;
55     }
56 
getUri()57     public URI getUri() {
58 	return uri;
59     }
60 
initialize(URI uri, Configuration conf)61     public void initialize(URI uri, Configuration conf) throws IOException {
62       super.initialize(uri, conf);
63       try {
64         if (kfsImpl == null) {
65           if (uri.getHost() == null) {
66             kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
67                                   conf.getInt("fs.kfs.metaServerPort", -1),
68                                   statistics);
69           } else {
70             kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
71           }
72         }
73 
74         this.localFs = FileSystem.getLocal(conf);
75         this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
76         this.workingDir = new Path("/user", System.getProperty("user.name")
77                                    ).makeQualified(this);
78         setConf(conf);
79 
80       } catch (Exception e) {
81         e.printStackTrace();
82         System.out.println("Unable to initialize KFS");
83         System.exit(-1);
84       }
85     }
86 
87     @Deprecated
getName()88     public String getName() {
89 	return getUri().toString();
90     }
91 
getWorkingDirectory()92     public Path getWorkingDirectory() {
93 	return workingDir;
94     }
95 
setWorkingDirectory(Path dir)96     public void setWorkingDirectory(Path dir) {
97 	workingDir = makeAbsolute(dir);
98     }
99 
makeAbsolute(Path path)100     private Path makeAbsolute(Path path) {
101 	if (path.isAbsolute()) {
102 	    return path;
103 	}
104 	return new Path(workingDir, path);
105     }
106 
mkdirs(Path path, FsPermission permission )107     public boolean mkdirs(Path path, FsPermission permission
108         ) throws IOException {
109 	Path absolute = makeAbsolute(path);
110         String srep = absolute.toUri().getPath();
111 
112 	int res;
113 
114 	// System.out.println("Calling mkdirs on: " + srep);
115 
116 	res = kfsImpl.mkdirs(srep);
117 
118 	return res == 0;
119     }
120 
121     @Deprecated
isDirectory(Path path)122     public boolean isDirectory(Path path) throws IOException {
123 	Path absolute = makeAbsolute(path);
124         String srep = absolute.toUri().getPath();
125 
126 	// System.out.println("Calling isdir on: " + srep);
127 
128         return kfsImpl.isDirectory(srep);
129     }
130 
131     @Deprecated
isFile(Path path)132     public boolean isFile(Path path) throws IOException {
133 	Path absolute = makeAbsolute(path);
134         String srep = absolute.toUri().getPath();
135         return kfsImpl.isFile(srep);
136     }
137 
listStatus(Path path)138     public FileStatus[] listStatus(Path path) throws IOException {
139         Path absolute = makeAbsolute(path);
140         String srep = absolute.toUri().getPath();
141 
142         if (kfsImpl.isFile(srep))
143                 return new FileStatus[] { getFileStatus(path) } ;
144 
145         return kfsImpl.readdirplus(absolute);
146     }
147 
getFileStatus(Path path)148     public FileStatus getFileStatus(Path path) throws IOException {
149 	Path absolute = makeAbsolute(path);
150         String srep = absolute.toUri().getPath();
151         if (!kfsImpl.exists(srep)) {
152           throw new FileNotFoundException("File " + path + " does not exist.");
153         }
154         if (kfsImpl.isDirectory(srep)) {
155             // System.out.println("Status of path: " + path + " is dir");
156             return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep),
157                                   path.makeQualified(this));
158         } else {
159             // System.out.println("Status of path: " + path + " is file");
160             return new FileStatus(kfsImpl.filesize(srep), false,
161                                   kfsImpl.getReplication(srep),
162                                   getDefaultBlockSize(),
163                                   kfsImpl.getModificationTime(srep),
164                                   path.makeQualified(this));
165         }
166     }
167 
168     /** This optional operation is not yet supported. */
append(Path f, int bufferSize, Progressable progress)169     public FSDataOutputStream append(Path f, int bufferSize,
170         Progressable progress) throws IOException {
171       throw new IOException("Not supported");
172     }
173 
create(Path file, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)174     public FSDataOutputStream create(Path file, FsPermission permission,
175                                      boolean overwrite, int bufferSize,
176 				     short replication, long blockSize, Progressable progress)
177 	throws IOException {
178 
179         if (exists(file)) {
180             if (overwrite) {
181                 delete(file);
182             } else {
183                 throw new IOException("File already exists: " + file);
184             }
185         }
186 
187 	Path parent = file.getParent();
188 	if (parent != null && !mkdirs(parent)) {
189 	    throw new IOException("Mkdirs failed to create " + parent);
190 	}
191 
192         Path absolute = makeAbsolute(file);
193         String srep = absolute.toUri().getPath();
194 
195         return kfsImpl.create(srep, replication, bufferSize);
196     }
197 
open(Path path, int bufferSize)198     public FSDataInputStream open(Path path, int bufferSize) throws IOException {
199         if (!exists(path))
200             throw new IOException("File does not exist: " + path);
201 
202         Path absolute = makeAbsolute(path);
203         String srep = absolute.toUri().getPath();
204 
205         return kfsImpl.open(srep, bufferSize);
206     }
207 
rename(Path src, Path dst)208     public boolean rename(Path src, Path dst) throws IOException {
209 	Path absoluteS = makeAbsolute(src);
210         String srepS = absoluteS.toUri().getPath();
211 	Path absoluteD = makeAbsolute(dst);
212         String srepD = absoluteD.toUri().getPath();
213 
214         // System.out.println("Calling rename on: " + srepS + " -> " + srepD);
215 
216         return kfsImpl.rename(srepS, srepD) == 0;
217     }
218 
219     // recursively delete the directory and its contents
delete(Path path, boolean recursive)220     public boolean delete(Path path, boolean recursive) throws IOException {
221       Path absolute = makeAbsolute(path);
222       String srep = absolute.toUri().getPath();
223       if (kfsImpl.isFile(srep))
224         return kfsImpl.remove(srep) == 0;
225 
226       FileStatus[] dirEntries = listStatus(absolute);
227       if ((!recursive) && (dirEntries != null) &&
228             (dirEntries.length != 0)) {
229         throw new IOException("Directory " + path.toString() +
230         " is not empty.");
231       }
232       if (dirEntries != null) {
233         for (int i = 0; i < dirEntries.length; i++) {
234           delete(new Path(absolute, dirEntries[i].getPath()), recursive);
235         }
236       }
237       return kfsImpl.rmdir(srep) == 0;
238     }
239 
240     @Deprecated
delete(Path path)241     public boolean delete(Path path) throws IOException {
242       return delete(path, true);
243     }
244 
245     @Deprecated
getLength(Path path)246     public long getLength(Path path) throws IOException {
247 	Path absolute = makeAbsolute(path);
248         String srep = absolute.toUri().getPath();
249         return kfsImpl.filesize(srep);
250     }
251 
252     @Deprecated
getReplication(Path path)253     public short getReplication(Path path) throws IOException {
254 	Path absolute = makeAbsolute(path);
255         String srep = absolute.toUri().getPath();
256         return kfsImpl.getReplication(srep);
257     }
258 
getDefaultReplication()259     public short getDefaultReplication() {
260 	return 3;
261     }
262 
setReplication(Path path, short replication)263     public boolean setReplication(Path path, short replication)
264 	throws IOException {
265 
266 	Path absolute = makeAbsolute(path);
267         String srep = absolute.toUri().getPath();
268 
269         int res = kfsImpl.setReplication(srep, replication);
270         return res >= 0;
271     }
272 
273     // 64MB is the KFS block size
274 
getDefaultBlockSize()275     public long getDefaultBlockSize() {
276 	return 1 << 26;
277     }
278 
279     @Deprecated
lock(Path path, boolean shared)280     public void lock(Path path, boolean shared) throws IOException {
281 
282     }
283 
284     @Deprecated
release(Path path)285     public void release(Path path) throws IOException {
286 
287     }
288 
289     /**
290      * Return null if the file doesn't exist; otherwise, get the
291      * locations of the various chunks of the file file from KFS.
292      */
293     @Override
getFileBlockLocations(FileStatus file, long start, long len)294     public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
295         long len) throws IOException {
296 
297       if (file == null) {
298         return null;
299       }
300       String srep = makeAbsolute(file.getPath()).toUri().getPath();
301       String[][] hints = kfsImpl.getDataLocation(srep, start, len);
302       if (hints == null) {
303         return null;
304       }
305       BlockLocation[] result = new BlockLocation[hints.length];
306       long blockSize = getDefaultBlockSize();
307       long length = len;
308       long blockStart = start;
309       for(int i=0; i < result.length; ++i) {
310         result[i] = new BlockLocation(null, hints[i], blockStart,
311                                       length < blockSize ? length : blockSize);
312         blockStart += blockSize;
313         length -= blockSize;
314       }
315       return result;
316     }
317 
copyFromLocalFile(boolean delSrc, Path src, Path dst)318     public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
319 	FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
320     }
321 
copyToLocalFile(boolean delSrc, Path src, Path dst)322     public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
323 	FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
324     }
325 
startLocalOutput(Path fsOutputFile, Path tmpLocalFile)326     public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
327 	throws IOException {
328 	return tmpLocalFile;
329     }
330 
completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)331     public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
332 	throws IOException {
333 	moveFromLocalFile(tmpLocalFile, fsOutputFile);
334     }
335 }
336