1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.tools; 19 20 import org.apache.hadoop.conf.Configuration; 21 import org.apache.hadoop.fs.FileStatus; 22 import org.apache.hadoop.fs.Path; 23 import org.apache.hadoop.hdfs.DFSTestUtil; 24 import org.apache.hadoop.hdfs.DistributedFileSystem; 25 import org.apache.hadoop.hdfs.HdfsConfiguration; 26 import org.apache.hadoop.hdfs.MiniDFSCluster; 27 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 28 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; 29 import org.apache.hadoop.io.IOUtils; 30 import org.apache.hadoop.io.SequenceFile; 31 import org.apache.hadoop.io.Text; 32 import org.apache.hadoop.mapreduce.Mapper; 33 import org.apache.hadoop.security.Credentials; 34 import org.apache.hadoop.tools.mapred.CopyMapper; 35 import org.junit.After; 36 import org.junit.Assert; 37 import org.junit.Before; 38 import org.junit.Test; 39 40 import java.util.Arrays; 41 import java.util.HashMap; 42 import java.util.Map; 43 44 public class TestDistCpSync { 45 private MiniDFSCluster cluster; 46 private final Configuration conf = new HdfsConfiguration(); 47 private DistributedFileSystem dfs; 48 private DistCpOptions options; 49 private final Path source = new Path("/source"); 50 private final Path target = new Path("/target"); 51 private final long BLOCK_SIZE = 1024; 52 private final short DATA_NUM = 1; 53 54 @Before setUp()55 public void setUp() throws Exception { 56 cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build(); 57 cluster.waitActive(); 58 59 dfs = cluster.getFileSystem(); 60 dfs.mkdirs(source); 61 dfs.mkdirs(target); 62 63 options = new DistCpOptions(Arrays.asList(source), target); 64 options.setSyncFolder(true); 65 options.setDeleteMissing(true); 66 options.setUseDiff(true, "s1", "s2"); 67 options.appendToConf(conf); 68 69 conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString()); 70 conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString()); 71 } 72 73 @After tearDown()74 public void tearDown() throws Exception { 75 IOUtils.cleanup(null, dfs); 76 if (cluster != null) { 77 cluster.shutdown(); 78 } 79 } 80 81 /** 82 * Test the sync returns false in the following scenarios: 83 * 1. the source/target dir are not snapshottable dir 84 * 2. the source/target does not have the given snapshots 85 * 3. changes have been made in target 86 */ 87 @Test testFallback()88 public void testFallback() throws Exception { 89 // the source/target dir are not snapshottable dir 90 Assert.assertFalse(DistCpSync.sync(options, conf)); 91 // make sure the source path has been updated to the snapshot path 92 final Path spath = new Path(source, 93 HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2"); 94 Assert.assertEquals(spath, options.getSourcePaths().get(0)); 95 96 // reset source path in options 97 options.setSourcePaths(Arrays.asList(source)); 98 // the source/target does not have the given snapshots 99 dfs.allowSnapshot(source); 100 dfs.allowSnapshot(target); 101 Assert.assertFalse(DistCpSync.sync(options, conf)); 102 Assert.assertEquals(spath, options.getSourcePaths().get(0)); 103 104 // reset source path in options 105 options.setSourcePaths(Arrays.asList(source)); 106 dfs.createSnapshot(source, "s1"); 107 dfs.createSnapshot(source, "s2"); 108 dfs.createSnapshot(target, "s1"); 109 Assert.assertTrue(DistCpSync.sync(options, conf)); 110 111 // reset source paths in options 112 options.setSourcePaths(Arrays.asList(source)); 113 // changes have been made in target 114 final Path subTarget = new Path(target, "sub"); 115 dfs.mkdirs(subTarget); 116 Assert.assertFalse(DistCpSync.sync(options, conf)); 117 // make sure the source path has been updated to the snapshot path 118 Assert.assertEquals(spath, options.getSourcePaths().get(0)); 119 120 // reset source paths in options 121 options.setSourcePaths(Arrays.asList(source)); 122 dfs.delete(subTarget, true); 123 Assert.assertTrue(DistCpSync.sync(options, conf)); 124 } 125 126 /** 127 * create some files and directories under the given directory. 128 * the final subtree looks like this: 129 * dir/ 130 * foo/ bar/ 131 * d1/ f1 d2/ f2 132 * f3 f4 133 */ initData(Path dir)134 private void initData(Path dir) throws Exception { 135 final Path foo = new Path(dir, "foo"); 136 final Path bar = new Path(dir, "bar"); 137 final Path d1 = new Path(foo, "d1"); 138 final Path f1 = new Path(foo, "f1"); 139 final Path d2 = new Path(bar, "d2"); 140 final Path f2 = new Path(bar, "f2"); 141 final Path f3 = new Path(d1, "f3"); 142 final Path f4 = new Path(d2, "f4"); 143 144 DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0); 145 DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0); 146 DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 0); 147 DFSTestUtil.createFile(dfs, f4, BLOCK_SIZE, DATA_NUM, 0); 148 } 149 150 /** 151 * make some changes under the given directory (created in the above way). 152 * 1. rename dir/foo/d1 to dir/bar/d1 153 * 2. delete dir/bar/d1/f3 154 * 3. rename dir/foo to /dir/bar/d1/foo 155 * 4. delete dir/bar/d1/foo/f1 156 * 5. create file dir/bar/d1/foo/f1 whose size is 2*BLOCK_SIZE 157 * 6. append one BLOCK to file dir/bar/f2 158 * 7. rename dir/bar to dir/foo 159 * 160 * Thus after all these ops the subtree looks like this: 161 * dir/ 162 * foo/ 163 * d1/ f2(A) d2/ 164 * foo/ f4 165 * f1(new) 166 */ changeData(Path dir)167 private void changeData(Path dir) throws Exception { 168 final Path foo = new Path(dir, "foo"); 169 final Path bar = new Path(dir, "bar"); 170 final Path d1 = new Path(foo, "d1"); 171 final Path f2 = new Path(bar, "f2"); 172 173 final Path bar_d1 = new Path(bar, "d1"); 174 dfs.rename(d1, bar_d1); 175 final Path f3 = new Path(bar_d1, "f3"); 176 dfs.delete(f3, true); 177 final Path newfoo = new Path(bar_d1, "foo"); 178 dfs.rename(foo, newfoo); 179 final Path f1 = new Path(newfoo, "f1"); 180 dfs.delete(f1, true); 181 DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0); 182 DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE); 183 dfs.rename(bar, new Path(dir, "foo")); 184 } 185 186 /** 187 * Test the basic functionality. 188 */ 189 @Test testSync()190 public void testSync() throws Exception { 191 initData(source); 192 initData(target); 193 dfs.allowSnapshot(source); 194 dfs.allowSnapshot(target); 195 dfs.createSnapshot(source, "s1"); 196 dfs.createSnapshot(target, "s1"); 197 198 // make changes under source 199 changeData(source); 200 dfs.createSnapshot(source, "s2"); 201 202 // before sync, make some further changes on source. this should not affect 203 // the later distcp since we're copying (s2-s1) to target 204 final Path toDelete = new Path(source, "foo/d1/foo/f1"); 205 dfs.delete(toDelete, true); 206 final Path newdir = new Path(source, "foo/d1/foo/newdir"); 207 dfs.mkdirs(newdir); 208 209 // do the sync 210 Assert.assertTrue(DistCpSync.sync(options, conf)); 211 212 // make sure the source path has been updated to the snapshot path 213 final Path spath = new Path(source, 214 HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2"); 215 Assert.assertEquals(spath, options.getSourcePaths().get(0)); 216 217 // build copy listing 218 final Path listingPath = new Path("/tmp/META/fileList.seq"); 219 CopyListing listing = new GlobbedCopyListing(conf, new Credentials()); 220 listing.buildListing(listingPath, options); 221 222 Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath); 223 CopyMapper copyMapper = new CopyMapper(); 224 StubContext stubContext = new StubContext(conf, null, 0); 225 Mapper<Text, CopyListingFileStatus, Text, Text>.Context context = 226 stubContext.getContext(); 227 // Enable append 228 context.getConfiguration().setBoolean( 229 DistCpOptionSwitch.APPEND.getConfigLabel(), true); 230 copyMapper.setup(context); 231 for (Map.Entry<Text, CopyListingFileStatus> entry : copyListing.entrySet()) { 232 copyMapper.map(entry.getKey(), entry.getValue(), context); 233 } 234 235 // verify that we only copied new appended data of f2 and the new file f1 236 Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter() 237 .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue()); 238 239 // verify the source and target now has the same structure 240 verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false); 241 } 242 getListing(Path listingPath)243 private Map<Text, CopyListingFileStatus> getListing(Path listingPath) 244 throws Exception { 245 SequenceFile.Reader reader = new SequenceFile.Reader(conf, 246 SequenceFile.Reader.file(listingPath)); 247 Text key = new Text(); 248 CopyListingFileStatus value = new CopyListingFileStatus(); 249 Map<Text, CopyListingFileStatus> values = new HashMap<>(); 250 while (reader.next(key, value)) { 251 values.put(key, value); 252 key = new Text(); 253 value = new CopyListingFileStatus(); 254 } 255 return values; 256 } 257 verifyCopy(FileStatus s, FileStatus t, boolean compareName)258 private void verifyCopy(FileStatus s, FileStatus t, boolean compareName) 259 throws Exception { 260 Assert.assertEquals(s.isDirectory(), t.isDirectory()); 261 if (compareName) { 262 Assert.assertEquals(s.getPath().getName(), t.getPath().getName()); 263 } 264 if (!s.isDirectory()) { 265 // verify the file content is the same 266 byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath()); 267 byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath()); 268 Assert.assertArrayEquals(sbytes, tbytes); 269 } else { 270 FileStatus[] slist = dfs.listStatus(s.getPath()); 271 FileStatus[] tlist = dfs.listStatus(t.getPath()); 272 Assert.assertEquals(slist.length, tlist.length); 273 for (int i = 0; i < slist.length; i++) { 274 verifyCopy(slist[i], tlist[i], true); 275 } 276 } 277 } 278 279 /** 280 * Similar test with testSync, but the "to" snapshot is specified as "." 281 * @throws Exception 282 */ 283 @Test testSyncWithCurrent()284 public void testSyncWithCurrent() throws Exception { 285 options.setUseDiff(true, "s1", "."); 286 initData(source); 287 initData(target); 288 dfs.allowSnapshot(source); 289 dfs.allowSnapshot(target); 290 dfs.createSnapshot(source, "s1"); 291 dfs.createSnapshot(target, "s1"); 292 293 // make changes under source 294 changeData(source); 295 296 // do the sync 297 Assert.assertTrue(DistCpSync.sync(options, conf)); 298 // make sure the source path is still unchanged 299 Assert.assertEquals(source, options.getSourcePaths().get(0)); 300 } 301 initData2(Path dir)302 private void initData2(Path dir) throws Exception { 303 final Path test = new Path(dir, "test"); 304 final Path foo = new Path(dir, "foo"); 305 final Path bar = new Path(dir, "bar"); 306 final Path f1 = new Path(test, "f1"); 307 final Path f2 = new Path(foo, "f2"); 308 final Path f3 = new Path(bar, "f3"); 309 310 DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L); 311 DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 1L); 312 DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 2L); 313 } 314 changeData2(Path dir)315 private void changeData2(Path dir) throws Exception { 316 final Path tmpFoo = new Path(dir, "tmpFoo"); 317 final Path test = new Path(dir, "test"); 318 final Path foo = new Path(dir, "foo"); 319 final Path bar = new Path(dir, "bar"); 320 321 dfs.rename(test, tmpFoo); 322 dfs.rename(foo, test); 323 dfs.rename(bar, foo); 324 dfs.rename(tmpFoo, bar); 325 } 326 327 @Test testSync2()328 public void testSync2() throws Exception { 329 initData2(source); 330 initData2(target); 331 dfs.allowSnapshot(source); 332 dfs.allowSnapshot(target); 333 dfs.createSnapshot(source, "s1"); 334 dfs.createSnapshot(target, "s1"); 335 336 // make changes under source 337 changeData2(source); 338 dfs.createSnapshot(source, "s2"); 339 340 SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); 341 System.out.println(report); 342 343 // do the sync 344 Assert.assertTrue(DistCpSync.sync(options, conf)); 345 verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); 346 } 347 initData3(Path dir)348 private void initData3(Path dir) throws Exception { 349 final Path test = new Path(dir, "test"); 350 final Path foo = new Path(dir, "foo"); 351 final Path bar = new Path(dir, "bar"); 352 final Path f1 = new Path(test, "file"); 353 final Path f2 = new Path(foo, "file"); 354 final Path f3 = new Path(bar, "file"); 355 356 DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L); 357 DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE * 2, DATA_NUM, 1L); 358 DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE * 3, DATA_NUM, 2L); 359 } 360 changeData3(Path dir)361 private void changeData3(Path dir) throws Exception { 362 final Path test = new Path(dir, "test"); 363 final Path foo = new Path(dir, "foo"); 364 final Path bar = new Path(dir, "bar"); 365 final Path f1 = new Path(test, "file"); 366 final Path f2 = new Path(foo, "file"); 367 final Path f3 = new Path(bar, "file"); 368 final Path newf1 = new Path(test, "newfile"); 369 final Path newf2 = new Path(foo, "newfile"); 370 final Path newf3 = new Path(bar, "newfile"); 371 372 dfs.rename(f1, newf1); 373 dfs.rename(f2, newf2); 374 dfs.rename(f3, newf3); 375 } 376 377 /** 378 * Test a case where there are multiple source files with the same name 379 */ 380 @Test testSync3()381 public void testSync3() throws Exception { 382 initData3(source); 383 initData3(target); 384 dfs.allowSnapshot(source); 385 dfs.allowSnapshot(target); 386 dfs.createSnapshot(source, "s1"); 387 dfs.createSnapshot(target, "s1"); 388 389 // make changes under source 390 changeData3(source); 391 dfs.createSnapshot(source, "s2"); 392 393 SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); 394 System.out.println(report); 395 396 // do the sync 397 Assert.assertTrue(DistCpSync.sync(options, conf)); 398 verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); 399 } 400 } 401