1 /** 2 * 3 * Licensed under the Apache License, Version 2.0 4 * (the "License"); you may not use this file except in compliance with 5 * the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 12 * implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 * 15 * @author: Sriram Rao (Kosmix Corp.) 16 * 17 * Implements the Hadoop FS interfaces to allow applications to store 18 *files in Kosmos File System (KFS). 19 */ 20 21 package org.apache.hadoop.fs.kfs; 22 23 import java.io.*; 24 import java.net.*; 25 26 import org.apache.hadoop.conf.Configuration; 27 import org.apache.hadoop.fs.FSDataInputStream; 28 import org.apache.hadoop.fs.FSDataOutputStream; 29 import org.apache.hadoop.fs.FileSystem; 30 import org.apache.hadoop.fs.FileStatus; 31 import org.apache.hadoop.fs.FileUtil; 32 import org.apache.hadoop.fs.Path; 33 import org.apache.hadoop.fs.permission.FsPermission; 34 import org.apache.hadoop.util.Progressable; 35 import org.apache.hadoop.fs.BlockLocation; 36 37 /** 38 * A FileSystem backed by KFS. 39 * 40 */ 41 42 public class KosmosFileSystem extends FileSystem { 43 44 private FileSystem localFs; 45 private IFSImpl kfsImpl = null; 46 private URI uri; 47 private Path workingDir = new Path("/"); 48 KosmosFileSystem()49 public KosmosFileSystem() { 50 51 } 52 KosmosFileSystem(IFSImpl fsimpl)53 KosmosFileSystem(IFSImpl fsimpl) { 54 this.kfsImpl = fsimpl; 55 } 56 getUri()57 public URI getUri() { 58 return uri; 59 } 60 initialize(URI uri, Configuration conf)61 public void initialize(URI uri, Configuration conf) throws IOException { 62 super.initialize(uri, conf); 63 try { 64 if (kfsImpl == null) { 65 if (uri.getHost() == null) { 66 kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""), 67 conf.getInt("fs.kfs.metaServerPort", -1), 68 statistics); 69 } else { 70 kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics); 71 } 72 } 73 74 this.localFs = FileSystem.getLocal(conf); 75 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 76 this.workingDir = new Path("/user", System.getProperty("user.name") 77 ).makeQualified(this); 78 setConf(conf); 79 80 } catch (Exception e) { 81 e.printStackTrace(); 82 System.out.println("Unable to initialize KFS"); 83 System.exit(-1); 84 } 85 } 86 87 @Deprecated getName()88 public String getName() { 89 return getUri().toString(); 90 } 91 getWorkingDirectory()92 public Path getWorkingDirectory() { 93 return workingDir; 94 } 95 setWorkingDirectory(Path dir)96 public void setWorkingDirectory(Path dir) { 97 workingDir = makeAbsolute(dir); 98 } 99 makeAbsolute(Path path)100 private Path makeAbsolute(Path path) { 101 if (path.isAbsolute()) { 102 return path; 103 } 104 return new Path(workingDir, path); 105 } 106 mkdirs(Path path, FsPermission permission )107 public boolean mkdirs(Path path, FsPermission permission 108 ) throws IOException { 109 Path absolute = makeAbsolute(path); 110 String srep = absolute.toUri().getPath(); 111 112 int res; 113 114 // System.out.println("Calling mkdirs on: " + srep); 115 116 res = kfsImpl.mkdirs(srep); 117 118 return res == 0; 119 } 120 121 @Deprecated isDirectory(Path path)122 public boolean isDirectory(Path path) throws IOException { 123 Path absolute = makeAbsolute(path); 124 String srep = absolute.toUri().getPath(); 125 126 // System.out.println("Calling isdir on: " + srep); 127 128 return kfsImpl.isDirectory(srep); 129 } 130 131 @Deprecated isFile(Path path)132 public boolean isFile(Path path) throws IOException { 133 Path absolute = makeAbsolute(path); 134 String srep = absolute.toUri().getPath(); 135 return kfsImpl.isFile(srep); 136 } 137 listStatus(Path path)138 public FileStatus[] listStatus(Path path) throws IOException { 139 Path absolute = makeAbsolute(path); 140 String srep = absolute.toUri().getPath(); 141 142 if (kfsImpl.isFile(srep)) 143 return new FileStatus[] { getFileStatus(path) } ; 144 145 return kfsImpl.readdirplus(absolute); 146 } 147 getFileStatus(Path path)148 public FileStatus getFileStatus(Path path) throws IOException { 149 Path absolute = makeAbsolute(path); 150 String srep = absolute.toUri().getPath(); 151 if (!kfsImpl.exists(srep)) { 152 throw new FileNotFoundException("File " + path + " does not exist."); 153 } 154 if (kfsImpl.isDirectory(srep)) { 155 // System.out.println("Status of path: " + path + " is dir"); 156 return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep), 157 path.makeQualified(this)); 158 } else { 159 // System.out.println("Status of path: " + path + " is file"); 160 return new FileStatus(kfsImpl.filesize(srep), false, 161 kfsImpl.getReplication(srep), 162 getDefaultBlockSize(), 163 kfsImpl.getModificationTime(srep), 164 path.makeQualified(this)); 165 } 166 } 167 168 /** This optional operation is not yet supported. */ append(Path f, int bufferSize, Progressable progress)169 public FSDataOutputStream append(Path f, int bufferSize, 170 Progressable progress) throws IOException { 171 throw new IOException("Not supported"); 172 } 173 create(Path file, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)174 public FSDataOutputStream create(Path file, FsPermission permission, 175 boolean overwrite, int bufferSize, 176 short replication, long blockSize, Progressable progress) 177 throws IOException { 178 179 if (exists(file)) { 180 if (overwrite) { 181 delete(file); 182 } else { 183 throw new IOException("File already exists: " + file); 184 } 185 } 186 187 Path parent = file.getParent(); 188 if (parent != null && !mkdirs(parent)) { 189 throw new IOException("Mkdirs failed to create " + parent); 190 } 191 192 Path absolute = makeAbsolute(file); 193 String srep = absolute.toUri().getPath(); 194 195 return kfsImpl.create(srep, replication, bufferSize); 196 } 197 open(Path path, int bufferSize)198 public FSDataInputStream open(Path path, int bufferSize) throws IOException { 199 if (!exists(path)) 200 throw new IOException("File does not exist: " + path); 201 202 Path absolute = makeAbsolute(path); 203 String srep = absolute.toUri().getPath(); 204 205 return kfsImpl.open(srep, bufferSize); 206 } 207 rename(Path src, Path dst)208 public boolean rename(Path src, Path dst) throws IOException { 209 Path absoluteS = makeAbsolute(src); 210 String srepS = absoluteS.toUri().getPath(); 211 Path absoluteD = makeAbsolute(dst); 212 String srepD = absoluteD.toUri().getPath(); 213 214 // System.out.println("Calling rename on: " + srepS + " -> " + srepD); 215 216 return kfsImpl.rename(srepS, srepD) == 0; 217 } 218 219 // recursively delete the directory and its contents delete(Path path, boolean recursive)220 public boolean delete(Path path, boolean recursive) throws IOException { 221 Path absolute = makeAbsolute(path); 222 String srep = absolute.toUri().getPath(); 223 if (kfsImpl.isFile(srep)) 224 return kfsImpl.remove(srep) == 0; 225 226 FileStatus[] dirEntries = listStatus(absolute); 227 if ((!recursive) && (dirEntries != null) && 228 (dirEntries.length != 0)) { 229 throw new IOException("Directory " + path.toString() + 230 " is not empty."); 231 } 232 if (dirEntries != null) { 233 for (int i = 0; i < dirEntries.length; i++) { 234 delete(new Path(absolute, dirEntries[i].getPath()), recursive); 235 } 236 } 237 return kfsImpl.rmdir(srep) == 0; 238 } 239 240 @Deprecated delete(Path path)241 public boolean delete(Path path) throws IOException { 242 return delete(path, true); 243 } 244 245 @Deprecated getLength(Path path)246 public long getLength(Path path) throws IOException { 247 Path absolute = makeAbsolute(path); 248 String srep = absolute.toUri().getPath(); 249 return kfsImpl.filesize(srep); 250 } 251 252 @Deprecated getReplication(Path path)253 public short getReplication(Path path) throws IOException { 254 Path absolute = makeAbsolute(path); 255 String srep = absolute.toUri().getPath(); 256 return kfsImpl.getReplication(srep); 257 } 258 getDefaultReplication()259 public short getDefaultReplication() { 260 return 3; 261 } 262 setReplication(Path path, short replication)263 public boolean setReplication(Path path, short replication) 264 throws IOException { 265 266 Path absolute = makeAbsolute(path); 267 String srep = absolute.toUri().getPath(); 268 269 int res = kfsImpl.setReplication(srep, replication); 270 return res >= 0; 271 } 272 273 // 64MB is the KFS block size 274 getDefaultBlockSize()275 public long getDefaultBlockSize() { 276 return 1 << 26; 277 } 278 279 @Deprecated lock(Path path, boolean shared)280 public void lock(Path path, boolean shared) throws IOException { 281 282 } 283 284 @Deprecated release(Path path)285 public void release(Path path) throws IOException { 286 287 } 288 289 /** 290 * Return null if the file doesn't exist; otherwise, get the 291 * locations of the various chunks of the file file from KFS. 292 */ 293 @Override getFileBlockLocations(FileStatus file, long start, long len)294 public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 295 long len) throws IOException { 296 297 if (file == null) { 298 return null; 299 } 300 String srep = makeAbsolute(file.getPath()).toUri().getPath(); 301 String[][] hints = kfsImpl.getDataLocation(srep, start, len); 302 if (hints == null) { 303 return null; 304 } 305 BlockLocation[] result = new BlockLocation[hints.length]; 306 long blockSize = getDefaultBlockSize(); 307 long length = len; 308 long blockStart = start; 309 for(int i=0; i < result.length; ++i) { 310 result[i] = new BlockLocation(null, hints[i], blockStart, 311 length < blockSize ? length : blockSize); 312 blockStart += blockSize; 313 length -= blockSize; 314 } 315 return result; 316 } 317 copyFromLocalFile(boolean delSrc, Path src, Path dst)318 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException { 319 FileUtil.copy(localFs, src, this, dst, delSrc, getConf()); 320 } 321 copyToLocalFile(boolean delSrc, Path src, Path dst)322 public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException { 323 FileUtil.copy(this, src, localFs, dst, delSrc, getConf()); 324 } 325 startLocalOutput(Path fsOutputFile, Path tmpLocalFile)326 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 327 throws IOException { 328 return tmpLocalFile; 329 } 330 completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)331 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 332 throws IOException { 333 moveFromLocalFile(tmpLocalFile, fsOutputFile); 334 } 335 } 336