1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.tools;
19 
20 import org.apache.hadoop.conf.Configuration;
21 import org.apache.hadoop.fs.FileStatus;
22 import org.apache.hadoop.fs.Path;
23 import org.apache.hadoop.hdfs.DFSTestUtil;
24 import org.apache.hadoop.hdfs.DistributedFileSystem;
25 import org.apache.hadoop.hdfs.HdfsConfiguration;
26 import org.apache.hadoop.hdfs.MiniDFSCluster;
27 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
28 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
29 import org.apache.hadoop.io.IOUtils;
30 import org.apache.hadoop.io.SequenceFile;
31 import org.apache.hadoop.io.Text;
32 import org.apache.hadoop.mapreduce.Mapper;
33 import org.apache.hadoop.security.Credentials;
34 import org.apache.hadoop.tools.mapred.CopyMapper;
35 import org.junit.After;
36 import org.junit.Assert;
37 import org.junit.Before;
38 import org.junit.Test;
39 
40 import java.util.Arrays;
41 import java.util.HashMap;
42 import java.util.Map;
43 
44 public class TestDistCpSync {
45   private MiniDFSCluster cluster;
46   private final Configuration conf = new HdfsConfiguration();
47   private DistributedFileSystem dfs;
48   private DistCpOptions options;
49   private final Path source = new Path("/source");
50   private final Path target = new Path("/target");
51   private final long BLOCK_SIZE = 1024;
52   private final short DATA_NUM = 1;
53 
54   @Before
setUp()55   public void setUp() throws Exception {
56     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build();
57     cluster.waitActive();
58 
59     dfs = cluster.getFileSystem();
60     dfs.mkdirs(source);
61     dfs.mkdirs(target);
62 
63     options = new DistCpOptions(Arrays.asList(source), target);
64     options.setSyncFolder(true);
65     options.setDeleteMissing(true);
66     options.setUseDiff(true, "s1", "s2");
67     options.appendToConf(conf);
68 
69     conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
70     conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
71   }
72 
73   @After
tearDown()74   public void tearDown() throws Exception {
75     IOUtils.cleanup(null, dfs);
76     if (cluster != null) {
77       cluster.shutdown();
78     }
79   }
80 
81   /**
82    * Test the sync returns false in the following scenarios:
83    * 1. the source/target dir are not snapshottable dir
84    * 2. the source/target does not have the given snapshots
85    * 3. changes have been made in target
86    */
87   @Test
testFallback()88   public void testFallback() throws Exception {
89     // the source/target dir are not snapshottable dir
90     Assert.assertFalse(DistCpSync.sync(options, conf));
91     // make sure the source path has been updated to the snapshot path
92     final Path spath = new Path(source,
93         HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
94     Assert.assertEquals(spath, options.getSourcePaths().get(0));
95 
96     // reset source path in options
97     options.setSourcePaths(Arrays.asList(source));
98     // the source/target does not have the given snapshots
99     dfs.allowSnapshot(source);
100     dfs.allowSnapshot(target);
101     Assert.assertFalse(DistCpSync.sync(options, conf));
102     Assert.assertEquals(spath, options.getSourcePaths().get(0));
103 
104     // reset source path in options
105     options.setSourcePaths(Arrays.asList(source));
106     dfs.createSnapshot(source, "s1");
107     dfs.createSnapshot(source, "s2");
108     dfs.createSnapshot(target, "s1");
109     Assert.assertTrue(DistCpSync.sync(options, conf));
110 
111     // reset source paths in options
112     options.setSourcePaths(Arrays.asList(source));
113     // changes have been made in target
114     final Path subTarget = new Path(target, "sub");
115     dfs.mkdirs(subTarget);
116     Assert.assertFalse(DistCpSync.sync(options, conf));
117     // make sure the source path has been updated to the snapshot path
118     Assert.assertEquals(spath, options.getSourcePaths().get(0));
119 
120     // reset source paths in options
121     options.setSourcePaths(Arrays.asList(source));
122     dfs.delete(subTarget, true);
123     Assert.assertTrue(DistCpSync.sync(options, conf));
124   }
125 
126   /**
127    * create some files and directories under the given directory.
128    * the final subtree looks like this:
129    *                     dir/
130    *              foo/          bar/
131    *           d1/    f1     d2/    f2
132    *         f3            f4
133    */
initData(Path dir)134   private void initData(Path dir) throws Exception {
135     final Path foo = new Path(dir, "foo");
136     final Path bar = new Path(dir, "bar");
137     final Path d1 = new Path(foo, "d1");
138     final Path f1 = new Path(foo, "f1");
139     final Path d2 = new Path(bar, "d2");
140     final Path f2 = new Path(bar, "f2");
141     final Path f3 = new Path(d1, "f3");
142     final Path f4 = new Path(d2, "f4");
143 
144     DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0);
145     DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0);
146     DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 0);
147     DFSTestUtil.createFile(dfs, f4, BLOCK_SIZE, DATA_NUM, 0);
148   }
149 
150   /**
151    * make some changes under the given directory (created in the above way).
152    * 1. rename dir/foo/d1 to dir/bar/d1
153    * 2. delete dir/bar/d1/f3
154    * 3. rename dir/foo to /dir/bar/d1/foo
155    * 4. delete dir/bar/d1/foo/f1
156    * 5. create file dir/bar/d1/foo/f1 whose size is 2*BLOCK_SIZE
157    * 6. append one BLOCK to file dir/bar/f2
158    * 7. rename dir/bar to dir/foo
159    *
160    * Thus after all these ops the subtree looks like this:
161    *                       dir/
162    *                       foo/
163    *                 d1/    f2(A)    d2/
164    *                foo/             f4
165    *                f1(new)
166    */
changeData(Path dir)167   private void changeData(Path dir) throws Exception {
168     final Path foo = new Path(dir, "foo");
169     final Path bar = new Path(dir, "bar");
170     final Path d1 = new Path(foo, "d1");
171     final Path f2 = new Path(bar, "f2");
172 
173     final Path bar_d1 = new Path(bar, "d1");
174     dfs.rename(d1, bar_d1);
175     final Path f3 = new Path(bar_d1, "f3");
176     dfs.delete(f3, true);
177     final Path newfoo = new Path(bar_d1, "foo");
178     dfs.rename(foo, newfoo);
179     final Path f1 = new Path(newfoo, "f1");
180     dfs.delete(f1, true);
181     DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0);
182     DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE);
183     dfs.rename(bar, new Path(dir, "foo"));
184   }
185 
186   /**
187    * Test the basic functionality.
188    */
189   @Test
testSync()190   public void testSync() throws Exception {
191     initData(source);
192     initData(target);
193     dfs.allowSnapshot(source);
194     dfs.allowSnapshot(target);
195     dfs.createSnapshot(source, "s1");
196     dfs.createSnapshot(target, "s1");
197 
198     // make changes under source
199     changeData(source);
200     dfs.createSnapshot(source, "s2");
201 
202     // before sync, make some further changes on source. this should not affect
203     // the later distcp since we're copying (s2-s1) to target
204     final Path toDelete = new Path(source, "foo/d1/foo/f1");
205     dfs.delete(toDelete, true);
206     final Path newdir = new Path(source, "foo/d1/foo/newdir");
207     dfs.mkdirs(newdir);
208 
209     // do the sync
210     Assert.assertTrue(DistCpSync.sync(options, conf));
211 
212     // make sure the source path has been updated to the snapshot path
213     final Path spath = new Path(source,
214         HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
215     Assert.assertEquals(spath, options.getSourcePaths().get(0));
216 
217     // build copy listing
218     final Path listingPath = new Path("/tmp/META/fileList.seq");
219     CopyListing listing = new GlobbedCopyListing(conf, new Credentials());
220     listing.buildListing(listingPath, options);
221 
222     Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
223     CopyMapper copyMapper = new CopyMapper();
224     StubContext stubContext = new StubContext(conf, null, 0);
225     Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
226         stubContext.getContext();
227     // Enable append
228     context.getConfiguration().setBoolean(
229         DistCpOptionSwitch.APPEND.getConfigLabel(), true);
230     copyMapper.setup(context);
231     for (Map.Entry<Text, CopyListingFileStatus> entry : copyListing.entrySet()) {
232       copyMapper.map(entry.getKey(), entry.getValue(), context);
233     }
234 
235     // verify that we only copied new appended data of f2 and the new file f1
236     Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter()
237         .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
238 
239     // verify the source and target now has the same structure
240     verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
241   }
242 
getListing(Path listingPath)243   private Map<Text, CopyListingFileStatus> getListing(Path listingPath)
244       throws Exception {
245     SequenceFile.Reader reader = new SequenceFile.Reader(conf,
246         SequenceFile.Reader.file(listingPath));
247     Text key = new Text();
248     CopyListingFileStatus value = new CopyListingFileStatus();
249     Map<Text, CopyListingFileStatus> values = new HashMap<>();
250     while (reader.next(key, value)) {
251       values.put(key, value);
252       key = new Text();
253       value = new CopyListingFileStatus();
254     }
255     return values;
256   }
257 
verifyCopy(FileStatus s, FileStatus t, boolean compareName)258   private void verifyCopy(FileStatus s, FileStatus t, boolean compareName)
259       throws Exception {
260     Assert.assertEquals(s.isDirectory(), t.isDirectory());
261     if (compareName) {
262       Assert.assertEquals(s.getPath().getName(), t.getPath().getName());
263     }
264     if (!s.isDirectory()) {
265       // verify the file content is the same
266       byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath());
267       byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath());
268       Assert.assertArrayEquals(sbytes, tbytes);
269     } else {
270       FileStatus[] slist = dfs.listStatus(s.getPath());
271       FileStatus[] tlist = dfs.listStatus(t.getPath());
272       Assert.assertEquals(slist.length, tlist.length);
273       for (int i = 0; i < slist.length; i++) {
274         verifyCopy(slist[i], tlist[i], true);
275       }
276     }
277   }
278 
279   /**
280    * Similar test with testSync, but the "to" snapshot is specified as "."
281    * @throws Exception
282    */
283   @Test
testSyncWithCurrent()284   public void testSyncWithCurrent() throws Exception {
285     options.setUseDiff(true, "s1", ".");
286     initData(source);
287     initData(target);
288     dfs.allowSnapshot(source);
289     dfs.allowSnapshot(target);
290     dfs.createSnapshot(source, "s1");
291     dfs.createSnapshot(target, "s1");
292 
293     // make changes under source
294     changeData(source);
295 
296     // do the sync
297     Assert.assertTrue(DistCpSync.sync(options, conf));
298     // make sure the source path is still unchanged
299     Assert.assertEquals(source, options.getSourcePaths().get(0));
300   }
301 
initData2(Path dir)302   private void initData2(Path dir) throws Exception {
303     final Path test = new Path(dir, "test");
304     final Path foo = new Path(dir, "foo");
305     final Path bar = new Path(dir, "bar");
306     final Path f1 = new Path(test, "f1");
307     final Path f2 = new Path(foo, "f2");
308     final Path f3 = new Path(bar, "f3");
309 
310     DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
311     DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 1L);
312     DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 2L);
313   }
314 
changeData2(Path dir)315   private void changeData2(Path dir) throws Exception {
316     final Path tmpFoo = new Path(dir, "tmpFoo");
317     final Path test = new Path(dir, "test");
318     final Path foo = new Path(dir, "foo");
319     final Path bar = new Path(dir, "bar");
320 
321     dfs.rename(test, tmpFoo);
322     dfs.rename(foo, test);
323     dfs.rename(bar, foo);
324     dfs.rename(tmpFoo, bar);
325   }
326 
327   @Test
testSync2()328   public void testSync2() throws Exception {
329     initData2(source);
330     initData2(target);
331     dfs.allowSnapshot(source);
332     dfs.allowSnapshot(target);
333     dfs.createSnapshot(source, "s1");
334     dfs.createSnapshot(target, "s1");
335 
336     // make changes under source
337     changeData2(source);
338     dfs.createSnapshot(source, "s2");
339 
340     SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
341     System.out.println(report);
342 
343     // do the sync
344     Assert.assertTrue(DistCpSync.sync(options, conf));
345     verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
346   }
347 
initData3(Path dir)348   private void initData3(Path dir) throws Exception {
349     final Path test = new Path(dir, "test");
350     final Path foo = new Path(dir, "foo");
351     final Path bar = new Path(dir, "bar");
352     final Path f1 = new Path(test, "file");
353     final Path f2 = new Path(foo, "file");
354     final Path f3 = new Path(bar, "file");
355 
356     DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
357     DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE * 2, DATA_NUM, 1L);
358     DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE * 3, DATA_NUM, 2L);
359   }
360 
changeData3(Path dir)361   private void changeData3(Path dir) throws Exception {
362     final Path test = new Path(dir, "test");
363     final Path foo = new Path(dir, "foo");
364     final Path bar = new Path(dir, "bar");
365     final Path f1 = new Path(test, "file");
366     final Path f2 = new Path(foo, "file");
367     final Path f3 = new Path(bar, "file");
368     final Path newf1 = new Path(test, "newfile");
369     final Path newf2 = new Path(foo, "newfile");
370     final Path newf3 = new Path(bar, "newfile");
371 
372     dfs.rename(f1, newf1);
373     dfs.rename(f2, newf2);
374     dfs.rename(f3, newf3);
375   }
376 
377   /**
378    * Test a case where there are multiple source files with the same name
379    */
380   @Test
testSync3()381   public void testSync3() throws Exception {
382     initData3(source);
383     initData3(target);
384     dfs.allowSnapshot(source);
385     dfs.allowSnapshot(target);
386     dfs.createSnapshot(source, "s1");
387     dfs.createSnapshot(target, "s1");
388 
389     // make changes under source
390     changeData3(source);
391     dfs.createSnapshot(source, "s2");
392 
393     SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
394     System.out.println(report);
395 
396     // do the sync
397     Assert.assertTrue(DistCpSync.sync(options, conf));
398     verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
399   }
400 }
401