/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.tools.mapred;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.GlobbedCopyListing;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.apache.hadoop.security.Credentials;
import org.junit.*;

import java.io.IOException;
import java.util.*;

/**
 * Tests for {@code CopyCommitter.commitJob(...)}, run against a
 * single-datanode {@link MiniDFSCluster} that is shared by all tests in
 * this class.
 *
 * Every test invokes {@code commitJob} twice to check that a repeated
 * commit leaves the same observable state as the first one.
 */
public class TestCopyCommitter {
  private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);

  private static final Random rand = new Random();

  private static final Credentials CREDENTIALS = new Credentials();
  public static final int PORT = 39737;


  private static Configuration config;
  private static MiniDFSCluster cluster;

  /**
   * Builds a map-only job (zero reducers) wired to {@link NullInputFormat}
   * and {@link NullOutputFormat}; only its configuration is used by the tests.
   *
   * @return the newly created job
   * @throws IOException if the job instance cannot be created
   */
  private static Job getJobForClient() throws IOException {
    Job job = Job.getInstance(new Configuration());
    job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
    job.setInputFormatClass(NullInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Creates the shared configuration and starts the mini DFS cluster once
   * for the whole class.
   *
   * @throws IOException if the job or the cluster cannot be created
   */
  @BeforeClass
  public static void create() throws IOException {
    config = getJobForClient().getConfiguration();
    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
    cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).format(true)
        .build();
  }

  /** Shuts the mini DFS cluster down, if it was started. */
  @AfterClass
  public static void destroy() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Points the shared configuration at {@code /meta} and creates that
   * directory before each test.
   */
  @Before
  public void createMetaFolder() {
    config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
    Path meta = new Path("/meta");
    try {
      cluster.getFileSystem().mkdirs(meta);
    } catch (IOException e) {
      LOG.error("Exception encountered while creating meta folder", e);
      Assert.fail("Unable to create meta folder");
    }
  }

  /**
   * Fails the test if {@code /meta} still exists once the test body has
   * finished. The folder is removed first so that the next test starts from
   * a clean state.
   *
   * NOTE(review): presumably the committer is expected to remove the meta
   * folder as part of commitJob - confirm against CopyCommitter.
   */
  @After
  public void cleanupMetaFolder() {
    Path meta = new Path("/meta");
    try {
      if (cluster.getFileSystem().exists(meta)) {
        cluster.getFileSystem().delete(meta, true);
        Assert.fail("Expected meta folder to be deleted");
      }
    } catch (IOException e) {
      LOG.error("Exception encountered while cleaning up folder", e);
      Assert.fail("Unable to clean up meta folder");
    }
  }

  /**
   * Commits with nothing but the base configuration set and checks that the
   * task attempt status reads "Commit Successful" after each of two commits.
   */
  @Test
  public void testNoCommitAction() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      committer.commitJob(jobContext);
      Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());

      //Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
    } catch (IOException e) {
      LOG.error("Exception encountered ", e);
      Assert.fail("Commit failed");
    }
  }

  /**
   * Creates a source tree with permission 0777 and a target tree with
   * permission 0700, commits with permission preservation requested, and
   * checks that the directories under the target now carry the source
   * permission.
   */
  @Test
  public void testPreserveStatus() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      // Octal literals: 0777 == 511 and 0700 == 448.
      FsPermission sourcePerm = new FsPermission((short) 0777);
      FsPermission initialPerm = new FsPermission((short) 0700);
      String sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
      String targetBase = TestDistCpUtils.createTestSetup(fs, initialPerm);

      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
          new Path("/out"));
      options.preserve(FileAttribute.PERMISSION);
      options.appendToConf(conf);
      options.setTargetPathExists(false);

      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
      Path listingFile = new Path("/tmp1/" + rand.nextLong());
      listing.buildListing(listingFile, options);

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);

      committer.commitJob(jobContext);
      Assert.assertTrue("Permission don't match",
          checkDirectoryPermissions(fs, targetBase, sourcePerm));

      //Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertTrue("Permission don't match",
          checkDirectoryPermissions(fs, targetBase, sourcePerm));

    } catch (IOException e) {
      LOG.error("Exception encountered while testing for preserve status", e);
      Assert.fail("Preserve status failure");
    } finally {
      TestDistCpUtils.delete(fs, "/tmp1");
      conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
    }

  }

  /**
   * Populates the target with an extra generated tree that has no
   * counterpart in the source, commits with sync-folder and delete-missing
   * enabled, and checks in both directions that source and target are in
   * sync afterwards.
   */
  @Test
  public void testDeleteMissing() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      String sourceBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
      String targetBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
      String targetBaseAdd = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
      fs.rename(new Path(targetBaseAdd), new Path(targetBase));

      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
          new Path("/out"));
      options.setSyncFolder(true);
      options.setDeleteMissing(true);
      options.appendToConf(conf);

      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
      Path listingFile = new Path("/tmp1/" + rand.nextLong());
      listing.buildListing(listingFile, options);

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);

      committer.commitJob(jobContext);
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase));
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase));

      //Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase));
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase));
    } catch (IOException e) {
      // Only I/O problems are translated here; assertion failures raised
      // above propagate with their own, more specific message.
      LOG.error("Exception encountered while testing for delete missing", e);
      Assert.fail("Delete missing failure");
    } finally {
      TestDistCpUtils.delete(fs, "/tmp1");
      conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
    }
  }

  /**
   * Same scenario as {@link #testDeleteMissing()} but with flat directories
   * whose file names interleave: the source holds 1,3,4,5,7,8,9 and the
   * target holds 2,4,5,7,9,A. After the commit the target is expected to
   * contain exactly four entries.
   */
  @Test
  public void testDeleteMissingFlatInterleavedFiles() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      String sourceBase = "/tmp1/" + rand.nextLong();
      String targetBase = "/tmp1/" + rand.nextLong();
      TestDistCpUtils.createFile(fs, sourceBase + "/1");
      TestDistCpUtils.createFile(fs, sourceBase + "/3");
      TestDistCpUtils.createFile(fs, sourceBase + "/4");
      TestDistCpUtils.createFile(fs, sourceBase + "/5");
      TestDistCpUtils.createFile(fs, sourceBase + "/7");
      TestDistCpUtils.createFile(fs, sourceBase + "/8");
      TestDistCpUtils.createFile(fs, sourceBase + "/9");

      TestDistCpUtils.createFile(fs, targetBase + "/2");
      TestDistCpUtils.createFile(fs, targetBase + "/4");
      TestDistCpUtils.createFile(fs, targetBase + "/5");
      TestDistCpUtils.createFile(fs, targetBase + "/7");
      TestDistCpUtils.createFile(fs, targetBase + "/9");
      TestDistCpUtils.createFile(fs, targetBase + "/A");

      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
          new Path("/out"));
      options.setSyncFolder(true);
      options.setDeleteMissing(true);
      options.appendToConf(conf);

      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
      Path listingFile = new Path("/tmp1/" + rand.nextLong());
      listing.buildListing(listingFile, options);

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);

      committer.commitJob(jobContext);
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase));
      Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);

      //Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase));
      Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
    } catch (IOException e) {
      LOG.error("Exception encountered while testing for delete missing", e);
      Assert.fail("Delete missing failure");
    } finally {
      TestDistCpUtils.delete(fs, "/tmp1");
      conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
    }

  }

  /**
   * With atomic copy enabled and no pre-existing final path, checks that
   * after the commit the work path is gone and the final path exists.
   */
  @Test
  public void testAtomicCommitMissingFinal() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    String workPath = "/tmp1/" + rand.nextLong();
    String finalPath = "/tmp1/" + rand.nextLong();
    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      fs.mkdirs(new Path(workPath));

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

      Assert.assertTrue(fs.exists(new Path(workPath)));
      Assert.assertFalse(fs.exists(new Path(finalPath)));
      committer.commitJob(jobContext);
      Assert.assertFalse(fs.exists(new Path(workPath)));
      Assert.assertTrue(fs.exists(new Path(finalPath)));

      //Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertFalse(fs.exists(new Path(workPath)));
      Assert.assertTrue(fs.exists(new Path(finalPath)));

    } catch (IOException e) {
      LOG.error("Exception encountered while testing for atomic commit.", e);
      Assert.fail("Atomic commit failure");
    } finally {
      TestDistCpUtils.delete(fs, workPath);
      TestDistCpUtils.delete(fs, finalPath);
      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false);
    }
  }

  /**
   * With atomic copy enabled and a final path that already exists, checks
   * that the commit throws and that both the work path and the final path
   * are still present afterwards.
   */
  @Test
  public void testAtomicCommitExistingFinal() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    String workPath = "/tmp1/" + rand.nextLong();
    String finalPath = "/tmp1/" + rand.nextLong();
    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      fs.mkdirs(new Path(workPath));
      fs.mkdirs(new Path(finalPath));

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

      Assert.assertTrue(fs.exists(new Path(workPath)));
      Assert.assertTrue(fs.exists(new Path(finalPath)));
      try {
        committer.commitJob(jobContext);
        // Assert.fail raises an AssertionError, which is not an Exception
        // and therefore is not swallowed by the catch below.
        Assert.fail("Should not be able to atomic-commit to pre-existing path.");
      } catch (Exception exception) {
        Assert.assertTrue(fs.exists(new Path(workPath)));
        Assert.assertTrue(fs.exists(new Path(finalPath)));
        LOG.info("Atomic-commit Test pass.");
      }

    } catch (IOException e) {
      LOG.error("Exception encountered while testing for atomic commit.", e);
      Assert.fail("Atomic commit failure");
    } finally {
      TestDistCpUtils.delete(fs, workPath);
      TestDistCpUtils.delete(fs, finalPath);
      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false);
    }
  }

  /**
   * Creates a task attempt context over {@code conf} with a fixed map task
   * attempt id.
   *
   * @param conf configuration handed to the context
   * @return the new task attempt context
   */
  private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
    return new TaskAttemptContextImpl(conf,
        new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
  }

  /**
   * Creates a job context that uses the configuration and the job id of the
   * given task attempt context.
   *
   * @param taskAttemptContext context to derive the job context from
   * @return the new job context
   */
  private JobContext getJobContext(TaskAttemptContext taskAttemptContext) {
    return new JobContextImpl(taskAttemptContext.getConfiguration(),
        taskAttemptContext.getTaskAttemptID().getJobID());
  }

  /**
   * Walks the tree below {@code targetBase} and asserts that every directory
   * found beneath it has permission {@code sourcePerm}. The base directory
   * itself is not checked, and plain files are ignored.
   *
   * @param fs file system to inspect
   * @param targetBase root of the tree to walk
   * @param sourcePerm permission every sub-directory must carry
   * @return always {@code true}; a mismatch surfaces as an assertion failure
   * @throws IOException if the file system cannot be queried
   */
  private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
                                            FsPermission sourcePerm) throws IOException {
    Path base = new Path(targetBase);

    // ArrayDeque used as a LIFO stack: same traversal order as java.util.Stack.
    Deque<Path> pending = new ArrayDeque<Path>();
    pending.push(base);
    while (!pending.isEmpty()) {
      Path file = pending.pop();
      if (!fs.exists(file)) {
        continue;
      }
      FileStatus[] fStatus = fs.listStatus(file);
      if (fStatus == null || fStatus.length == 0) {
        continue;
      }

      for (FileStatus status : fStatus) {
        if (status.isDirectory()) {
          pending.push(status.getPath());
          Assert.assertEquals(sourcePerm, status.getPermission());
        }
      }
    }
    return true;
  }

  /**
   * Input format that yields no splits and no record reader; it exists only
   * so that the job built in {@link #getJobForClient()} has an input format
   * class configured.
   */
  private static class NullInputFormat extends InputFormat<Object, Object> {
    /** @return an empty list of splits */
    @Override
    public List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException {
      return Collections.emptyList();
    }

    /** @return always {@code null} */
    @Override
    public RecordReader<Object, Object> createRecordReader(InputSplit split,
                                                           TaskAttemptContext context)
        throws IOException, InterruptedException {
      return null;
    }
  }
}