1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import static org.junit.Assert.assertFalse; 22 import static org.junit.Assert.assertTrue; 23 import static org.mockito.Matchers.any; 24 import static org.mockito.Matchers.anyInt; 25 import static org.mockito.Mockito.mock; 26 import static org.mockito.Mockito.when; 27 28 import java.io.ByteArrayInputStream; 29 import java.io.File; 30 import java.io.FileNotFoundException; 31 import java.io.IOException; 32 import java.net.URI; 33 34 import org.apache.hadoop.fs.FSDataInputStream; 35 import org.apache.hadoop.fs.FileStatus; 36 import org.apache.hadoop.fs.FileSystem; 37 import org.apache.hadoop.fs.FilterFileSystem; 38 import org.apache.hadoop.fs.Path; 39 import org.apache.hadoop.fs.PositionedReadable; 40 import org.apache.hadoop.fs.Seekable; 41 import org.apache.hadoop.fs.permission.FsPermission; 42 import org.apache.hadoop.mapreduce.MRConfig; 43 import org.apache.hadoop.mapreduce.MRJobConfig; 44 import org.apache.hadoop.mapreduce.filecache.DistributedCache; 45 import org.junit.After; 46 import org.junit.Before; 47 import org.junit.Test; 48 import org.mockito.invocation.InvocationOnMock; 49 import org.mockito.stubbing.Answer; 50 51 @SuppressWarnings("deprecation") 52 public class TestLocalDistributedCacheManager { 53 54 private static FileSystem mockfs; 55 56 public static class MockFileSystem extends FilterFileSystem { MockFileSystem()57 public MockFileSystem() { 58 super(mockfs); 59 } 60 } 61 62 private File localDir; 63 delete(File file)64 private static void delete(File file) throws IOException { 65 if (file.getAbsolutePath().length() < 5) { 66 throw new IllegalArgumentException( 67 "Path [" + file + "] is too short, not deleting"); 68 } 69 if (file.exists()) { 70 if (file.isDirectory()) { 71 File[] children = file.listFiles(); 72 if (children != null) { 73 for (File child : children) { 74 delete(child); 75 } 76 } 77 } 78 if (!file.delete()) { 79 throw new RuntimeException( 80 "Could not delete path [" + file + "]"); 81 } 82 } 83 } 84 85 @Before setup()86 public void setup() throws Exception { 87 mockfs = mock(FileSystem.class); 88 localDir = new File(System.getProperty("test.build.dir", "target/test-dir"), 89 TestLocalDistributedCacheManager.class.getName()); 90 delete(localDir); 91 localDir.mkdirs(); 92 } 93 94 @After cleanup()95 public void cleanup() throws Exception { 96 delete(localDir); 97 } 98 99 /** 100 * Mock input stream based on a byte array so that it can be used by a 101 * FSDataInputStream. 102 */ 103 private static class MockInputStream extends ByteArrayInputStream 104 implements Seekable, PositionedReadable { MockInputStream(byte[] buf)105 public MockInputStream(byte[] buf) { 106 super(buf); 107 } 108 109 // empty implementation for unused methods read(long position, byte[] buffer, int offset, int length)110 public int read(long position, byte[] buffer, int offset, int length) { return -1; } readFully(long position, byte[] buffer, int offset, int length)111 public void readFully(long position, byte[] buffer, int offset, int length) {} readFully(long position, byte[] buffer)112 public void readFully(long position, byte[] buffer) {} seek(long position)113 public void seek(long position) {} getPos()114 public long getPos() { return 0; } seekToNewSource(long targetPos)115 public boolean seekToNewSource(long targetPos) { return false; } 116 } 117 118 @Test testDownload()119 public void testDownload() throws Exception { 120 JobConf conf = new JobConf(); 121 conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class); 122 123 URI mockBase = new URI("mock://test-nn1/"); 124 when(mockfs.getUri()).thenReturn(mockBase); 125 Path working = new Path("mock://test-nn1/user/me/"); 126 when(mockfs.getWorkingDirectory()).thenReturn(working); 127 when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() { 128 @Override 129 public Path answer(InvocationOnMock args) throws Throwable { 130 return (Path) args.getArguments()[0]; 131 } 132 }); 133 134 final URI file = new URI("mock://test-nn1/user/me/file.txt#link"); 135 final Path filePath = new Path(file); 136 File link = new File("link"); 137 138 when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() { 139 @Override 140 public FileStatus answer(InvocationOnMock args) throws Throwable { 141 Path p = (Path)args.getArguments()[0]; 142 if("file.txt".equals(p.getName())) { 143 return new FileStatus(201, false, 1, 500, 101, 101, 144 FsPermission.getDefault(), "me", "me", filePath); 145 } else { 146 throw new FileNotFoundException(p+" not supported by mocking"); 147 } 148 } 149 }); 150 151 when(mockfs.getConf()).thenReturn(conf); 152 final FSDataInputStream in = 153 new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes())); 154 when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() { 155 @Override 156 public FSDataInputStream answer(InvocationOnMock args) throws Throwable { 157 Path src = (Path)args.getArguments()[0]; 158 if ("file.txt".equals(src.getName())) { 159 return in; 160 } else { 161 throw new FileNotFoundException(src+" not supported by mocking"); 162 } 163 } 164 }); 165 166 DistributedCache.addCacheFile(file, conf); 167 conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101"); 168 conf.set(MRJobConfig.CACHE_FILES_SIZES, "201"); 169 conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false"); 170 conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); 171 LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); 172 try { 173 manager.setup(conf); 174 assertTrue(link.exists()); 175 } finally { 176 manager.close(); 177 } 178 assertFalse(link.exists()); 179 } 180 181 @Test testEmptyDownload()182 public void testEmptyDownload() throws Exception { 183 JobConf conf = new JobConf(); 184 conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class); 185 186 URI mockBase = new URI("mock://test-nn1/"); 187 when(mockfs.getUri()).thenReturn(mockBase); 188 Path working = new Path("mock://test-nn1/user/me/"); 189 when(mockfs.getWorkingDirectory()).thenReturn(working); 190 when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() { 191 @Override 192 public Path answer(InvocationOnMock args) throws Throwable { 193 return (Path) args.getArguments()[0]; 194 } 195 }); 196 197 when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() { 198 @Override 199 public FileStatus answer(InvocationOnMock args) throws Throwable { 200 Path p = (Path)args.getArguments()[0]; 201 throw new FileNotFoundException(p+" not supported by mocking"); 202 } 203 }); 204 205 when(mockfs.getConf()).thenReturn(conf); 206 when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() { 207 @Override 208 public FSDataInputStream answer(InvocationOnMock args) throws Throwable { 209 Path src = (Path)args.getArguments()[0]; 210 throw new FileNotFoundException(src+" not supported by mocking"); 211 } 212 }); 213 214 conf.set(MRJobConfig.CACHE_FILES, ""); 215 conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); 216 LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); 217 try { 218 manager.setup(conf); 219 } finally { 220 manager.close(); 221 } 222 } 223 224 225 @Test testDuplicateDownload()226 public void testDuplicateDownload() throws Exception { 227 JobConf conf = new JobConf(); 228 conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class); 229 230 URI mockBase = new URI("mock://test-nn1/"); 231 when(mockfs.getUri()).thenReturn(mockBase); 232 Path working = new Path("mock://test-nn1/user/me/"); 233 when(mockfs.getWorkingDirectory()).thenReturn(working); 234 when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() { 235 @Override 236 public Path answer(InvocationOnMock args) throws Throwable { 237 return (Path) args.getArguments()[0]; 238 } 239 }); 240 241 final URI file = new URI("mock://test-nn1/user/me/file.txt#link"); 242 final Path filePath = new Path(file); 243 File link = new File("link"); 244 245 when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() { 246 @Override 247 public FileStatus answer(InvocationOnMock args) throws Throwable { 248 Path p = (Path)args.getArguments()[0]; 249 if("file.txt".equals(p.getName())) { 250 return new FileStatus(201, false, 1, 500, 101, 101, 251 FsPermission.getDefault(), "me", "me", filePath); 252 } else { 253 throw new FileNotFoundException(p+" not supported by mocking"); 254 } 255 } 256 }); 257 258 when(mockfs.getConf()).thenReturn(conf); 259 final FSDataInputStream in = 260 new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes())); 261 when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() { 262 @Override 263 public FSDataInputStream answer(InvocationOnMock args) throws Throwable { 264 Path src = (Path)args.getArguments()[0]; 265 if ("file.txt".equals(src.getName())) { 266 return in; 267 } else { 268 throw new FileNotFoundException(src+" not supported by mocking"); 269 } 270 } 271 }); 272 273 DistributedCache.addCacheFile(file, conf); 274 DistributedCache.addCacheFile(file, conf); 275 conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101"); 276 conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201"); 277 conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false"); 278 conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); 279 LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); 280 try { 281 manager.setup(conf); 282 assertTrue(link.exists()); 283 } finally { 284 manager.close(); 285 } 286 assertFalse(link.exists()); 287 } 288 } 289