1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.tools.mapred;
20 
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.FileStatus;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.fs.permission.FsPermission;
28 import org.apache.hadoop.hdfs.MiniDFSCluster;
29 import org.apache.hadoop.mapreduce.*;
30 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
31 import org.apache.hadoop.mapreduce.task.JobContextImpl;
32 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
33 import org.apache.hadoop.tools.CopyListing;
34 import org.apache.hadoop.tools.DistCpConstants;
35 import org.apache.hadoop.tools.DistCpOptions;
36 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
37 import org.apache.hadoop.tools.GlobbedCopyListing;
38 import org.apache.hadoop.tools.util.TestDistCpUtils;
39 import org.apache.hadoop.security.Credentials;
40 import org.junit.*;
41 
42 import java.io.IOException;
43 import java.util.*;
44 
45 public class TestCopyCommitter {
46   private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);
47 
48   private static final Random rand = new Random();
49 
50   private static final Credentials CREDENTIALS = new Credentials();
51   public static final int PORT = 39737;
52 
53 
54   private static Configuration config;
55   private static MiniDFSCluster cluster;
56 
getJobForClient()57   private static Job getJobForClient() throws IOException {
58     Job job = Job.getInstance(new Configuration());
59     job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
60     job.setInputFormatClass(NullInputFormat.class);
61     job.setOutputFormatClass(NullOutputFormat.class);
62     job.setNumReduceTasks(0);
63     return job;
64   }
65 
66   @BeforeClass
create()67   public static void create() throws IOException {
68     config = getJobForClient().getConfiguration();
69     config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
70     cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
71                       .build();
72   }
73 
74   @AfterClass
destroy()75   public static void destroy() {
76     if (cluster != null) {
77       cluster.shutdown();
78     }
79   }
80 
81   @Before
createMetaFolder()82   public void createMetaFolder() {
83     config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
84     Path meta = new Path("/meta");
85     try {
86       cluster.getFileSystem().mkdirs(meta);
87     } catch (IOException e) {
88       LOG.error("Exception encountered while creating meta folder", e);
89       Assert.fail("Unable to create meta folder");
90     }
91   }
92 
93   @After
cleanupMetaFolder()94   public void cleanupMetaFolder() {
95     Path meta = new Path("/meta");
96     try {
97       if (cluster.getFileSystem().exists(meta)) {
98         cluster.getFileSystem().delete(meta, true);
99         Assert.fail("Expected meta folder to be deleted");
100       }
101     } catch (IOException e) {
102       LOG.error("Exception encountered while cleaning up folder", e);
103       Assert.fail("Unable to clean up meta folder");
104     }
105   }
106 
107   @Test
testNoCommitAction()108   public void testNoCommitAction() {
109     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
110     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
111         taskAttemptContext.getTaskAttemptID().getJobID());
112     try {
113       OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
114       committer.commitJob(jobContext);
115       Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
116 
117       //Test for idempotent commit
118       committer.commitJob(jobContext);
119       Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
120     } catch (IOException e) {
121       LOG.error("Exception encountered ", e);
122       Assert.fail("Commit failed");
123     }
124   }
125 
126   @Test
testPreserveStatus()127   public void testPreserveStatus() {
128     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
129     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
130         taskAttemptContext.getTaskAttemptID().getJobID());
131     Configuration conf = jobContext.getConfiguration();
132 
133 
134     String sourceBase;
135     String targetBase;
136     FileSystem fs = null;
137     try {
138       OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
139       fs = FileSystem.get(conf);
140       FsPermission sourcePerm = new FsPermission((short) 511);
141       FsPermission initialPerm = new FsPermission((short) 448);
142       sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
143       targetBase = TestDistCpUtils.createTestSetup(fs, initialPerm);
144 
145       DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
146           new Path("/out"));
147       options.preserve(FileAttribute.PERMISSION);
148       options.appendToConf(conf);
149       options.setTargetPathExists(false);
150 
151       CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
152       Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
153       listing.buildListing(listingFile, options);
154 
155       conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
156 
157       committer.commitJob(jobContext);
158       if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
159         Assert.fail("Permission don't match");
160       }
161 
162       //Test for idempotent commit
163       committer.commitJob(jobContext);
164       if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
165         Assert.fail("Permission don't match");
166       }
167 
168     } catch (IOException e) {
169       LOG.error("Exception encountered while testing for preserve status", e);
170       Assert.fail("Preserve status failure");
171     } finally {
172       TestDistCpUtils.delete(fs, "/tmp1");
173       conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
174     }
175 
176   }
177 
178   @Test
testDeleteMissing()179   public void testDeleteMissing() {
180     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
181     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
182         taskAttemptContext.getTaskAttemptID().getJobID());
183     Configuration conf = jobContext.getConfiguration();
184 
185     String sourceBase;
186     String targetBase;
187     FileSystem fs = null;
188     try {
189       OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
190       fs = FileSystem.get(conf);
191       sourceBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
192       targetBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
193       String targetBaseAdd = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
194       fs.rename(new Path(targetBaseAdd), new Path(targetBase));
195 
196       DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
197           new Path("/out"));
198       options.setSyncFolder(true);
199       options.setDeleteMissing(true);
200       options.appendToConf(conf);
201 
202       CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
203       Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
204       listing.buildListing(listingFile, options);
205 
206       conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
207       conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
208 
209       committer.commitJob(jobContext);
210       if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
211         Assert.fail("Source and target folders are not in sync");
212       }
213       if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
214         Assert.fail("Source and target folders are not in sync");
215       }
216 
217       //Test for idempotent commit
218       committer.commitJob(jobContext);
219       if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
220         Assert.fail("Source and target folders are not in sync");
221       }
222       if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
223         Assert.fail("Source and target folders are not in sync");
224       }
225     } catch (Throwable e) {
226       LOG.error("Exception encountered while testing for delete missing", e);
227       Assert.fail("Delete missing failure");
228     } finally {
229       TestDistCpUtils.delete(fs, "/tmp1");
230       conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
231     }
232   }
233 
234   @Test
testDeleteMissingFlatInterleavedFiles()235   public void testDeleteMissingFlatInterleavedFiles() {
236     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
237     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
238         taskAttemptContext.getTaskAttemptID().getJobID());
239     Configuration conf = jobContext.getConfiguration();
240 
241 
242     String sourceBase;
243     String targetBase;
244     FileSystem fs = null;
245     try {
246       OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
247       fs = FileSystem.get(conf);
248       sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
249       targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
250       TestDistCpUtils.createFile(fs, sourceBase + "/1");
251       TestDistCpUtils.createFile(fs, sourceBase + "/3");
252       TestDistCpUtils.createFile(fs, sourceBase + "/4");
253       TestDistCpUtils.createFile(fs, sourceBase + "/5");
254       TestDistCpUtils.createFile(fs, sourceBase + "/7");
255       TestDistCpUtils.createFile(fs, sourceBase + "/8");
256       TestDistCpUtils.createFile(fs, sourceBase + "/9");
257 
258       TestDistCpUtils.createFile(fs, targetBase + "/2");
259       TestDistCpUtils.createFile(fs, targetBase + "/4");
260       TestDistCpUtils.createFile(fs, targetBase + "/5");
261       TestDistCpUtils.createFile(fs, targetBase + "/7");
262       TestDistCpUtils.createFile(fs, targetBase + "/9");
263       TestDistCpUtils.createFile(fs, targetBase + "/A");
264 
265       DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
266           new Path("/out"));
267       options.setSyncFolder(true);
268       options.setDeleteMissing(true);
269       options.appendToConf(conf);
270 
271       CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
272       Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong()));
273       listing.buildListing(listingFile, options);
274 
275       conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
276       conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
277 
278       committer.commitJob(jobContext);
279       if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
280         Assert.fail("Source and target folders are not in sync");
281       }
282       Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
283 
284       //Test for idempotent commit
285       committer.commitJob(jobContext);
286       if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
287         Assert.fail("Source and target folders are not in sync");
288       }
289       Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
290     } catch (IOException e) {
291       LOG.error("Exception encountered while testing for delete missing", e);
292       Assert.fail("Delete missing failure");
293     } finally {
294       TestDistCpUtils.delete(fs, "/tmp1");
295       conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
296     }
297 
298   }
299 
300   @Test
testAtomicCommitMissingFinal()301   public void testAtomicCommitMissingFinal() {
302     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
303     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
304         taskAttemptContext.getTaskAttemptID().getJobID());
305     Configuration conf = jobContext.getConfiguration();
306 
307     String workPath = "/tmp1/" + String.valueOf(rand.nextLong());
308     String finalPath = "/tmp1/" + String.valueOf(rand.nextLong());
309     FileSystem fs = null;
310     try {
311       OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
312       fs = FileSystem.get(conf);
313       fs.mkdirs(new Path(workPath));
314 
315       conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
316       conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
317       conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
318 
319       Assert.assertTrue(fs.exists(new Path(workPath)));
320       Assert.assertFalse(fs.exists(new Path(finalPath)));
321       committer.commitJob(jobContext);
322       Assert.assertFalse(fs.exists(new Path(workPath)));
323       Assert.assertTrue(fs.exists(new Path(finalPath)));
324 
325       //Test for idempotent commit
326       committer.commitJob(jobContext);
327       Assert.assertFalse(fs.exists(new Path(workPath)));
328       Assert.assertTrue(fs.exists(new Path(finalPath)));
329 
330     } catch (IOException e) {
331       LOG.error("Exception encountered while testing for preserve status", e);
332       Assert.fail("Atomic commit failure");
333     } finally {
334       TestDistCpUtils.delete(fs, workPath);
335       TestDistCpUtils.delete(fs, finalPath);
336       conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false);
337     }
338   }
339 
340   @Test
testAtomicCommitExistingFinal()341   public void testAtomicCommitExistingFinal() {
342     TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
343     JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
344         taskAttemptContext.getTaskAttemptID().getJobID());
345     Configuration conf = jobContext.getConfiguration();
346 
347 
348     String workPath = "/tmp1/" + String.valueOf(rand.nextLong());
349     String finalPath = "/tmp1/" + String.valueOf(rand.nextLong());
350     FileSystem fs = null;
351     try {
352       OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
353       fs = FileSystem.get(conf);
354       fs.mkdirs(new Path(workPath));
355       fs.mkdirs(new Path(finalPath));
356 
357       conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
358       conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
359       conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
360 
361       Assert.assertTrue(fs.exists(new Path(workPath)));
362       Assert.assertTrue(fs.exists(new Path(finalPath)));
363       try {
364         committer.commitJob(jobContext);
365         Assert.fail("Should not be able to atomic-commit to pre-existing path.");
366       } catch(Exception exception) {
367         Assert.assertTrue(fs.exists(new Path(workPath)));
368         Assert.assertTrue(fs.exists(new Path(finalPath)));
369         LOG.info("Atomic-commit Test pass.");
370       }
371 
372     } catch (IOException e) {
373       LOG.error("Exception encountered while testing for atomic commit.", e);
374       Assert.fail("Atomic commit failure");
375     } finally {
376       TestDistCpUtils.delete(fs, workPath);
377       TestDistCpUtils.delete(fs, finalPath);
378       conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false);
379     }
380   }
381 
getTaskAttemptContext(Configuration conf)382   private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
383     return new TaskAttemptContextImpl(conf,
384         new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
385   }
386 
checkDirectoryPermissions(FileSystem fs, String targetBase, FsPermission sourcePerm)387   private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
388                                             FsPermission sourcePerm) throws IOException {
389     Path base = new Path(targetBase);
390 
391     Stack<Path> stack = new Stack<Path>();
392     stack.push(base);
393     while (!stack.isEmpty()) {
394       Path file = stack.pop();
395       if (!fs.exists(file)) continue;
396       FileStatus[] fStatus = fs.listStatus(file);
397       if (fStatus == null || fStatus.length == 0) continue;
398 
399       for (FileStatus status : fStatus) {
400         if (status.isDirectory()) {
401           stack.push(status.getPath());
402           Assert.assertEquals(status.getPermission(), sourcePerm);
403         }
404       }
405     }
406     return true;
407   }
408 
409   private static class NullInputFormat extends InputFormat {
410     @Override
getSplits(JobContext context)411     public List getSplits(JobContext context)
412         throws IOException, InterruptedException {
413       return Collections.EMPTY_LIST;
414     }
415 
416     @Override
createRecordReader(InputSplit split, TaskAttemptContext context)417     public RecordReader createRecordReader(InputSplit split,
418                                            TaskAttemptContext context)
419         throws IOException, InterruptedException {
420       return null;
421     }
422   }
423 }
424