1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.mapreduce.lib.input; 19 20 import java.io.FileNotFoundException; 21 import java.io.IOException; 22 import java.util.Arrays; 23 import java.util.Collection; 24 import java.util.List; 25 import java.util.Set; 26 27 import javax.annotation.Nullable; 28 29 import org.junit.Assert; 30 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.hadoop.conf.Configuration; 34 import org.apache.hadoop.fs.BlockLocation; 35 import org.apache.hadoop.fs.FileStatus; 36 import org.apache.hadoop.fs.FileSystem; 37 import org.apache.hadoop.fs.LocatedFileStatus; 38 import org.apache.hadoop.fs.Path; 39 import org.apache.hadoop.fs.PathFilter; 40 import org.apache.hadoop.fs.RawLocalFileSystem; 41 import org.apache.hadoop.fs.RemoteIterator; 42 import org.apache.hadoop.mapred.SplitLocationInfo; 43 import org.apache.hadoop.mapreduce.InputSplit; 44 import org.apache.hadoop.mapreduce.Job; 45 import org.junit.After; 46 import org.junit.Before; 47 import org.junit.Test; 48 import org.junit.runner.RunWith; 49 import org.junit.runners.Parameterized; 50 import org.junit.runners.Parameterized.Parameters; 51 52 import com.google.common.base.Function; 53 import com.google.common.collect.Iterables; 54 import com.google.common.collect.Lists; 55 import com.google.common.collect.Sets; 56 57 @RunWith(value = Parameterized.class) 58 public class TestFileInputFormat { 59 60 private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class); 61 62 private static String testTmpDir = System.getProperty("test.build.data", "/tmp"); 63 private static final Path TEST_ROOT_DIR = new Path(testTmpDir, "TestFIF"); 64 65 private static FileSystem localFs; 66 67 private int numThreads; 68 TestFileInputFormat(int numThreads)69 public TestFileInputFormat(int numThreads) { 70 this.numThreads = numThreads; 71 LOG.info("Running with numThreads: " + numThreads); 72 } 73 74 @Parameters data()75 public static Collection<Object[]> data() { 76 Object[][] data = new Object[][] { { 1 }, { 5 }}; 77 return Arrays.asList(data); 78 } 79 80 @Before setup()81 public void setup() throws IOException { 82 LOG.info("Using Test Dir: " + TEST_ROOT_DIR); 83 localFs = FileSystem.getLocal(new Configuration()); 84 localFs.delete(TEST_ROOT_DIR, true); 85 localFs.mkdirs(TEST_ROOT_DIR); 86 } 87 88 @After cleanup()89 public void cleanup() throws IOException { 90 localFs.delete(TEST_ROOT_DIR, true); 91 } 92 93 @Test testNumInputFilesRecursively()94 public void testNumInputFilesRecursively() throws Exception { 95 Configuration conf = getConfiguration(); 96 conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true"); 97 conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); 98 Job job = Job.getInstance(conf); 99 FileInputFormat<?, ?> fileInputFormat = new TextInputFormat(); 100 List<InputSplit> splits = fileInputFormat.getSplits(job); 101 Assert.assertEquals("Input splits are not correct", 3, splits.size()); 102 verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3", 103 "test:/a1/file1"), splits); 104 105 // Using the deprecated configuration 106 conf = getConfiguration(); 107 conf.set("mapred.input.dir.recursive", "true"); 108 job = Job.getInstance(conf); 109 splits = fileInputFormat.getSplits(job); 110 verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3", 111 "test:/a1/file1"), splits); 112 } 113 114 @Test testNumInputFilesWithoutRecursively()115 public void testNumInputFilesWithoutRecursively() throws Exception { 116 Configuration conf = getConfiguration(); 117 conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); 118 Job job = Job.getInstance(conf); 119 FileInputFormat<?, ?> fileInputFormat = new TextInputFormat(); 120 List<InputSplit> splits = fileInputFormat.getSplits(job); 121 Assert.assertEquals("Input splits are not correct", 2, splits.size()); 122 verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits); 123 } 124 125 @Test testListLocatedStatus()126 public void testListLocatedStatus() throws Exception { 127 Configuration conf = getConfiguration(); 128 conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); 129 conf.setBoolean("fs.test.impl.disable.cache", false); 130 conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2"); 131 MockFileSystem mockFs = 132 (MockFileSystem) new Path("test:///").getFileSystem(conf); 133 Assert.assertEquals("listLocatedStatus already called", 134 0, mockFs.numListLocatedStatusCalls); 135 Job job = Job.getInstance(conf); 136 FileInputFormat<?, ?> fileInputFormat = new TextInputFormat(); 137 List<InputSplit> splits = fileInputFormat.getSplits(job); 138 Assert.assertEquals("Input splits are not correct", 2, splits.size()); 139 Assert.assertEquals("listLocatedStatuss calls", 140 1, mockFs.numListLocatedStatusCalls); 141 FileSystem.closeAll(); 142 } 143 144 @Test testSplitLocationInfo()145 public void testSplitLocationInfo() throws Exception { 146 Configuration conf = getConfiguration(); 147 conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, 148 "test:///a1/a2"); 149 Job job = Job.getInstance(conf); 150 TextInputFormat fileInputFormat = new TextInputFormat(); 151 List<InputSplit> splits = fileInputFormat.getSplits(job); 152 String[] locations = splits.get(0).getLocations(); 153 Assert.assertEquals(2, locations.length); 154 SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo(); 155 Assert.assertEquals(2, locationInfo.length); 156 SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? 157 locationInfo[0] : locationInfo[1]; 158 SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? 159 locationInfo[0] : locationInfo[1]; 160 Assert.assertTrue(localhostInfo.isOnDisk()); 161 Assert.assertTrue(localhostInfo.isInMemory()); 162 Assert.assertTrue(otherhostInfo.isOnDisk()); 163 Assert.assertFalse(otherhostInfo.isInMemory()); 164 } 165 166 @Test testListStatusSimple()167 public void testListStatusSimple() throws IOException { 168 Configuration conf = new Configuration(); 169 conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); 170 171 List<Path> expectedPaths = configureTestSimple(conf, localFs); 172 173 Job job = Job.getInstance(conf); 174 FileInputFormat<?, ?> fif = new TextInputFormat(); 175 List<FileStatus> statuses = fif.listStatus(job); 176 177 verifyFileStatuses(expectedPaths, statuses, localFs); 178 } 179 180 @Test testListStatusNestedRecursive()181 public void testListStatusNestedRecursive() throws IOException { 182 Configuration conf = new Configuration(); 183 conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); 184 185 List<Path> expectedPaths = configureTestNestedRecursive(conf, localFs); 186 Job job = Job.getInstance(conf); 187 FileInputFormat<?, ?> fif = new TextInputFormat(); 188 List<FileStatus> statuses = fif.listStatus(job); 189 190 verifyFileStatuses(expectedPaths, statuses, localFs); 191 } 192 193 194 @Test testListStatusNestedNonRecursive()195 public void testListStatusNestedNonRecursive() throws IOException { 196 Configuration conf = new Configuration(); 197 conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); 198 199 List<Path> expectedPaths = configureTestNestedNonRecursive(conf, localFs); 200 Job job = Job.getInstance(conf); 201 FileInputFormat<?, ?> fif = new TextInputFormat(); 202 List<FileStatus> statuses = fif.listStatus(job); 203 204 verifyFileStatuses(expectedPaths, statuses, localFs); 205 } 206 207 @Test testListStatusErrorOnNonExistantDir()208 public void testListStatusErrorOnNonExistantDir() throws IOException { 209 Configuration conf = new Configuration(); 210 conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); 211 212 configureTestErrorOnNonExistantDir(conf, localFs); 213 Job job = Job.getInstance(conf); 214 FileInputFormat<?, ?> fif = new TextInputFormat(); 215 try { 216 fif.listStatus(job); 217 Assert.fail("Expecting an IOException for a missing Input path"); 218 } catch (IOException e) { 219 Path expectedExceptionPath = new Path(TEST_ROOT_DIR, "input2"); 220 expectedExceptionPath = localFs.makeQualified(expectedExceptionPath); 221 Assert.assertTrue(e instanceof InvalidInputException); 222 Assert.assertEquals( 223 "Input path does not exist: " + expectedExceptionPath.toString(), 224 e.getMessage()); 225 } 226 } 227 configureTestSimple(Configuration conf, FileSystem localFs)228 public static List<Path> configureTestSimple(Configuration conf, FileSystem localFs) 229 throws IOException { 230 Path base1 = new Path(TEST_ROOT_DIR, "input1"); 231 Path base2 = new Path(TEST_ROOT_DIR, "input2"); 232 conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, 233 localFs.makeQualified(base1) + "," + localFs.makeQualified(base2)); 234 localFs.mkdirs(base1); 235 localFs.mkdirs(base2); 236 237 Path in1File1 = new Path(base1, "file1"); 238 Path in1File2 = new Path(base1, "file2"); 239 localFs.createNewFile(in1File1); 240 localFs.createNewFile(in1File2); 241 242 Path in2File1 = new Path(base2, "file1"); 243 Path in2File2 = new Path(base2, "file2"); 244 localFs.createNewFile(in2File1); 245 localFs.createNewFile(in2File2); 246 List<Path> expectedPaths = Lists.newArrayList(in1File1, in1File2, in2File1, 247 in2File2); 248 return expectedPaths; 249 } 250 configureTestNestedRecursive(Configuration conf, FileSystem localFs)251 public static List<Path> configureTestNestedRecursive(Configuration conf, 252 FileSystem localFs) throws IOException { 253 Path base1 = new Path(TEST_ROOT_DIR, "input1"); 254 conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, 255 localFs.makeQualified(base1).toString()); 256 conf.setBoolean( 257 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, 258 true); 259 localFs.mkdirs(base1); 260 261 Path inDir1 = new Path(base1, "dir1"); 262 Path inDir2 = new Path(base1, "dir2"); 263 Path inFile1 = new Path(base1, "file1"); 264 265 Path dir1File1 = new Path(inDir1, "file1"); 266 Path dir1File2 = new Path(inDir1, "file2"); 267 268 Path dir2File1 = new Path(inDir2, "file1"); 269 Path dir2File2 = new Path(inDir2, "file2"); 270 271 localFs.mkdirs(inDir1); 272 localFs.mkdirs(inDir2); 273 274 localFs.createNewFile(inFile1); 275 localFs.createNewFile(dir1File1); 276 localFs.createNewFile(dir1File2); 277 localFs.createNewFile(dir2File1); 278 localFs.createNewFile(dir2File2); 279 280 List<Path> expectedPaths = Lists.newArrayList(inFile1, dir1File1, 281 dir1File2, dir2File1, dir2File2); 282 return expectedPaths; 283 } 284 configureTestNestedNonRecursive(Configuration conf, FileSystem localFs)285 public static List<Path> configureTestNestedNonRecursive(Configuration conf, 286 FileSystem localFs) throws IOException { 287 Path base1 = new Path(TEST_ROOT_DIR, "input1"); 288 conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, 289 localFs.makeQualified(base1).toString()); 290 conf.setBoolean( 291 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, 292 false); 293 localFs.mkdirs(base1); 294 295 Path inDir1 = new Path(base1, "dir1"); 296 Path inDir2 = new Path(base1, "dir2"); 297 Path inFile1 = new Path(base1, "file1"); 298 299 Path dir1File1 = new Path(inDir1, "file1"); 300 Path dir1File2 = new Path(inDir1, "file2"); 301 302 Path dir2File1 = new Path(inDir2, "file1"); 303 Path dir2File2 = new Path(inDir2, "file2"); 304 305 localFs.mkdirs(inDir1); 306 localFs.mkdirs(inDir2); 307 308 localFs.createNewFile(inFile1); 309 localFs.createNewFile(dir1File1); 310 localFs.createNewFile(dir1File2); 311 localFs.createNewFile(dir2File1); 312 localFs.createNewFile(dir2File2); 313 314 List<Path> expectedPaths = Lists.newArrayList(inFile1, inDir1, inDir2); 315 return expectedPaths; 316 } 317 configureTestErrorOnNonExistantDir(Configuration conf, FileSystem localFs)318 public static List<Path> configureTestErrorOnNonExistantDir(Configuration conf, 319 FileSystem localFs) throws IOException { 320 Path base1 = new Path(TEST_ROOT_DIR, "input1"); 321 Path base2 = new Path(TEST_ROOT_DIR, "input2"); 322 conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, 323 localFs.makeQualified(base1) + "," + localFs.makeQualified(base2)); 324 conf.setBoolean( 325 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE, 326 true); 327 localFs.mkdirs(base1); 328 329 Path inFile1 = new Path(base1, "file1"); 330 Path inFile2 = new Path(base1, "file2"); 331 332 localFs.createNewFile(inFile1); 333 localFs.createNewFile(inFile2); 334 335 List<Path> expectedPaths = Lists.newArrayList(); 336 return expectedPaths; 337 } 338 verifyFileStatuses(List<Path> expectedPaths, List<FileStatus> fetchedStatuses, final FileSystem localFs)339 public static void verifyFileStatuses(List<Path> expectedPaths, 340 List<FileStatus> fetchedStatuses, final FileSystem localFs) { 341 Assert.assertEquals(expectedPaths.size(), fetchedStatuses.size()); 342 343 Iterable<Path> fqExpectedPaths = Iterables.transform(expectedPaths, 344 new Function<Path, Path>() { 345 @Override 346 public Path apply(Path input) { 347 return localFs.makeQualified(input); 348 } 349 }); 350 351 Set<Path> expectedPathSet = Sets.newHashSet(fqExpectedPaths); 352 for (FileStatus fileStatus : fetchedStatuses) { 353 if (!expectedPathSet.remove(localFs.makeQualified(fileStatus.getPath()))) { 354 Assert.fail("Found extra fetched status: " + fileStatus.getPath()); 355 } 356 } 357 Assert.assertEquals( 358 "Not all expectedPaths matched: " + expectedPathSet.toString(), 0, 359 expectedPathSet.size()); 360 } 361 362 verifySplits(List<String> expected, List<InputSplit> splits)363 private void verifySplits(List<String> expected, List<InputSplit> splits) { 364 Iterable<String> pathsFromSplits = Iterables.transform(splits, 365 new Function<InputSplit, String>() { 366 @Override 367 public String apply(@Nullable InputSplit input) { 368 return ((FileSplit) input).getPath().toString(); 369 } 370 }); 371 372 Set<String> expectedSet = Sets.newHashSet(expected); 373 for (String splitPathString : pathsFromSplits) { 374 if (!expectedSet.remove(splitPathString)) { 375 Assert.fail("Found extra split: " + splitPathString); 376 } 377 } 378 Assert.assertEquals( 379 "Not all expectedPaths matched: " + expectedSet.toString(), 0, 380 expectedSet.size()); 381 } 382 getConfiguration()383 private Configuration getConfiguration() { 384 Configuration conf = new Configuration(); 385 conf.set("fs.test.impl.disable.cache", "true"); 386 conf.setClass("fs.test.impl", MockFileSystem.class, FileSystem.class); 387 conf.set(FileInputFormat.INPUT_DIR, "test:///a1"); 388 return conf; 389 } 390 391 static class MockFileSystem extends RawLocalFileSystem { 392 int numListLocatedStatusCalls = 0; 393 394 @Override listStatus(Path f)395 public FileStatus[] listStatus(Path f) throws FileNotFoundException, 396 IOException { 397 if (f.toString().equals("test:/a1")) { 398 return new FileStatus[] { 399 new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")), 400 new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) }; 401 } else if (f.toString().equals("test:/a1/a2")) { 402 return new FileStatus[] { 403 new FileStatus(10, false, 1, 150, 150, 404 new Path("test:/a1/a2/file2")), 405 new FileStatus(10, false, 1, 151, 150, 406 new Path("test:/a1/a2/file3")) }; 407 } 408 return new FileStatus[0]; 409 } 410 411 @Override globStatus(Path pathPattern, PathFilter filter)412 public FileStatus[] globStatus(Path pathPattern, PathFilter filter) 413 throws IOException { 414 return new FileStatus[] { new FileStatus(10, true, 1, 150, 150, 415 pathPattern) }; 416 } 417 418 @Override listStatus(Path f, PathFilter filter)419 public FileStatus[] listStatus(Path f, PathFilter filter) 420 throws FileNotFoundException, IOException { 421 return this.listStatus(f); 422 } 423 424 @Override getFileBlockLocations(Path p, long start, long len)425 public BlockLocation[] getFileBlockLocations(Path p, long start, long len) 426 throws IOException { 427 return new BlockLocation[] { 428 new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, 429 new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, 430 new String[0], 0, len, false) }; } 431 432 @Override listLocatedStatus(Path f, PathFilter filter)433 protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f, 434 PathFilter filter) throws FileNotFoundException, IOException { 435 ++numListLocatedStatusCalls; 436 return super.listLocatedStatus(f, filter); 437 } 438 } 439 } 440