1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.mapred;
20 
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23 import static org.mockito.Matchers.any;
24 import static org.mockito.Matchers.anyInt;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.when;
27 
28 import java.io.ByteArrayInputStream;
29 import java.io.File;
30 import java.io.FileNotFoundException;
31 import java.io.IOException;
32 import java.net.URI;
33 
34 import org.apache.hadoop.fs.FSDataInputStream;
35 import org.apache.hadoop.fs.FileStatus;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.FilterFileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.fs.PositionedReadable;
40 import org.apache.hadoop.fs.Seekable;
41 import org.apache.hadoop.fs.permission.FsPermission;
42 import org.apache.hadoop.mapreduce.MRConfig;
43 import org.apache.hadoop.mapreduce.MRJobConfig;
44 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.mockito.invocation.InvocationOnMock;
49 import org.mockito.stubbing.Answer;
50 
51 @SuppressWarnings("deprecation")
52 public class TestLocalDistributedCacheManager {
53 
54   private static FileSystem mockfs;
55 
56   public static class MockFileSystem extends FilterFileSystem {
MockFileSystem()57     public MockFileSystem() {
58       super(mockfs);
59     }
60   }
61 
62   private File localDir;
63 
delete(File file)64   private static void delete(File file) throws IOException {
65     if (file.getAbsolutePath().length() < 5) {
66       throw new IllegalArgumentException(
67           "Path [" + file + "] is too short, not deleting");
68     }
69     if (file.exists()) {
70       if (file.isDirectory()) {
71         File[] children = file.listFiles();
72         if (children != null) {
73           for (File child : children) {
74             delete(child);
75           }
76         }
77       }
78       if (!file.delete()) {
79         throw new RuntimeException(
80           "Could not delete path [" + file + "]");
81       }
82     }
83   }
84 
85   @Before
setup()86   public void setup() throws Exception {
87     mockfs = mock(FileSystem.class);
88     localDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
89         TestLocalDistributedCacheManager.class.getName());
90     delete(localDir);
91     localDir.mkdirs();
92   }
93 
94   @After
cleanup()95   public void cleanup() throws Exception {
96     delete(localDir);
97   }
98 
99   /**
100    * Mock input stream based on a byte array so that it can be used by a
101    * FSDataInputStream.
102    */
103   private static class MockInputStream extends ByteArrayInputStream
104       implements Seekable, PositionedReadable {
MockInputStream(byte[] buf)105     public MockInputStream(byte[] buf) {
106       super(buf);
107     }
108 
109     // empty implementation for unused methods
read(long position, byte[] buffer, int offset, int length)110     public int read(long position, byte[] buffer, int offset, int length) { return -1; }
readFully(long position, byte[] buffer, int offset, int length)111     public void readFully(long position, byte[] buffer, int offset, int length) {}
readFully(long position, byte[] buffer)112     public void readFully(long position, byte[] buffer) {}
seek(long position)113     public void seek(long position) {}
getPos()114     public long getPos() { return 0; }
seekToNewSource(long targetPos)115     public boolean seekToNewSource(long targetPos) { return false; }
116   }
117 
118   @Test
testDownload()119   public void testDownload() throws Exception {
120     JobConf conf = new JobConf();
121     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
122 
123     URI mockBase = new URI("mock://test-nn1/");
124     when(mockfs.getUri()).thenReturn(mockBase);
125     Path working = new Path("mock://test-nn1/user/me/");
126     when(mockfs.getWorkingDirectory()).thenReturn(working);
127     when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
128       @Override
129       public Path answer(InvocationOnMock args) throws Throwable {
130         return (Path) args.getArguments()[0];
131       }
132     });
133 
134     final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
135     final Path filePath = new Path(file);
136     File link = new File("link");
137 
138     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
139       @Override
140       public FileStatus answer(InvocationOnMock args) throws Throwable {
141         Path p = (Path)args.getArguments()[0];
142         if("file.txt".equals(p.getName())) {
143          return new FileStatus(201, false, 1, 500, 101, 101,
144              FsPermission.getDefault(), "me", "me", filePath);
145         }  else {
146           throw new FileNotFoundException(p+" not supported by mocking");
147         }
148       }
149     });
150 
151     when(mockfs.getConf()).thenReturn(conf);
152     final FSDataInputStream in =
153         new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
154     when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
155       @Override
156       public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
157         Path src = (Path)args.getArguments()[0];
158         if ("file.txt".equals(src.getName())) {
159           return in;
160         } else {
161           throw new FileNotFoundException(src+" not supported by mocking");
162         }
163       }
164     });
165 
166     DistributedCache.addCacheFile(file, conf);
167     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
168     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
169     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
170     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
171     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
172     try {
173       manager.setup(conf);
174       assertTrue(link.exists());
175     } finally {
176       manager.close();
177     }
178     assertFalse(link.exists());
179   }
180 
181   @Test
testEmptyDownload()182   public void testEmptyDownload() throws Exception {
183     JobConf conf = new JobConf();
184     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
185 
186     URI mockBase = new URI("mock://test-nn1/");
187     when(mockfs.getUri()).thenReturn(mockBase);
188     Path working = new Path("mock://test-nn1/user/me/");
189     when(mockfs.getWorkingDirectory()).thenReturn(working);
190     when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
191       @Override
192       public Path answer(InvocationOnMock args) throws Throwable {
193         return (Path) args.getArguments()[0];
194       }
195     });
196 
197     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
198       @Override
199       public FileStatus answer(InvocationOnMock args) throws Throwable {
200         Path p = (Path)args.getArguments()[0];
201         throw new FileNotFoundException(p+" not supported by mocking");
202       }
203     });
204 
205     when(mockfs.getConf()).thenReturn(conf);
206     when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
207       @Override
208       public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
209         Path src = (Path)args.getArguments()[0];
210         throw new FileNotFoundException(src+" not supported by mocking");
211       }
212     });
213 
214     conf.set(MRJobConfig.CACHE_FILES, "");
215     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
216     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
217     try {
218       manager.setup(conf);
219     } finally {
220       manager.close();
221     }
222   }
223 
224 
225   @Test
testDuplicateDownload()226   public void testDuplicateDownload() throws Exception {
227     JobConf conf = new JobConf();
228     conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
229 
230     URI mockBase = new URI("mock://test-nn1/");
231     when(mockfs.getUri()).thenReturn(mockBase);
232     Path working = new Path("mock://test-nn1/user/me/");
233     when(mockfs.getWorkingDirectory()).thenReturn(working);
234     when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
235       @Override
236       public Path answer(InvocationOnMock args) throws Throwable {
237         return (Path) args.getArguments()[0];
238       }
239     });
240 
241     final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
242     final Path filePath = new Path(file);
243     File link = new File("link");
244 
245     when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
246       @Override
247       public FileStatus answer(InvocationOnMock args) throws Throwable {
248         Path p = (Path)args.getArguments()[0];
249         if("file.txt".equals(p.getName())) {
250          return new FileStatus(201, false, 1, 500, 101, 101,
251              FsPermission.getDefault(), "me", "me", filePath);
252         }  else {
253           throw new FileNotFoundException(p+" not supported by mocking");
254         }
255       }
256     });
257 
258     when(mockfs.getConf()).thenReturn(conf);
259     final FSDataInputStream in =
260         new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
261     when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
262       @Override
263       public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
264         Path src = (Path)args.getArguments()[0];
265         if ("file.txt".equals(src.getName())) {
266           return in;
267         } else {
268           throw new FileNotFoundException(src+" not supported by mocking");
269         }
270       }
271     });
272 
273     DistributedCache.addCacheFile(file, conf);
274     DistributedCache.addCacheFile(file, conf);
275     conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
276     conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
277     conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");
278     conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
279     LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
280     try {
281       manager.setup(conf);
282       assertTrue(link.exists());
283     } finally {
284       manager.close();
285     }
286     assertFalse(link.exists());
287   }
288 }
289