/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.mover;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
/**
 * Test the data migration tool (for Archival Storage).
 */
public class TestStorageMover {
  static final Log LOG = LogFactory.getLog(TestStorageMover.class);
  static {
    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)
        ).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LogFactory.getLog(Dispatcher.class)
        ).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LogFactory.getLog(DataTransferProtocol.class)).getLogger()
        .setLevel(Level.ALL);
  }

  private static final int BLOCK_SIZE = 1024;
  private static final short REPL = 3;
  private static final int NUM_DATANODES = 6;
  private static final Configuration DEFAULT_CONF = new HdfsConfiguration();
  private static final BlockStoragePolicySuite DEFAULT_POLICIES;
  private static final BlockStoragePolicy HOT;
  private static final BlockStoragePolicy WARM;
  private static final BlockStoragePolicy COLD;

  static {
    DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
    DEFAULT_CONF.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
        2L);
    DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);

    DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite();
    HOT = DEFAULT_POLICIES.getPolicy(HdfsConstants.HOT_STORAGE_POLICY_NAME);
    WARM = DEFAULT_POLICIES.getPolicy(HdfsConstants.WARM_STORAGE_POLICY_NAME);
    COLD = DEFAULT_POLICIES.getPolicy(HdfsConstants.COLD_STORAGE_POLICY_NAME);
    TestBalancer.initTestSetup();
    Dispatcher.setDelayAfterErrors(1000L);
  }

  /**
   * This scheme defines files/directories and their block storage policies. It
   * also defines snapshots.
   */
  static class NamespaceScheme {
    final List<Path> dirs;
    final List<Path> files;
    final long fileSize;
    final Map<Path, List<String>> snapshotMap;
    final Map<Path, BlockStoragePolicy> policyMap;

    NamespaceScheme(List<Path> dirs, List<Path> files, long fileSize,
                    Map<Path, List<String>> snapshotMap,
                    Map<Path, BlockStoragePolicy> policyMap) {
      this.dirs = dirs == null ? Collections.<Path>emptyList() : dirs;
      this.files = files == null ? Collections.<Path>emptyList() : files;
      this.fileSize = fileSize;
      this.snapshotMap = snapshotMap == null ?
          Collections.<Path, List<String>>emptyMap() : snapshotMap;
      this.policyMap = policyMap;
    }

    /**
     * Create files/directories/snapshots.
     */
    void prepare(DistributedFileSystem dfs, short repl) throws Exception {
      for (Path d : dirs) {
        dfs.mkdirs(d);
      }
      for (Path file : files) {
        DFSTestUtil.createFile(dfs, file, fileSize, repl, 0L);
      }
      for (Map.Entry<Path, List<String>> entry : snapshotMap.entrySet()) {
        for (String snapshot : entry.getValue()) {
          SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
        }
      }
    }

    /**
     * Set storage policies according to the corresponding scheme.
     */
    void setStoragePolicy(DistributedFileSystem dfs) throws Exception {
      for (Map.Entry<Path, BlockStoragePolicy> entry : policyMap.entrySet()) {
        dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
      }
    }
  }

  /**
   * This scheme defines DataNodes and their storage, including storage types
   * and remaining capacities.
   */
  static class ClusterScheme {
    final Configuration conf;
    final int numDataNodes;
    final short repl;
    final StorageType[][] storageTypes;
    final long[][] storageCapacities;

    ClusterScheme() {
      this(DEFAULT_CONF, NUM_DATANODES, REPL,
          genStorageTypes(NUM_DATANODES), null);
    }

    ClusterScheme(Configuration conf, int numDataNodes, short repl,
        StorageType[][] types, long[][] capacities) {
      Preconditions.checkArgument(types == null || types.length == numDataNodes);
      Preconditions.checkArgument(capacities == null || capacities.length ==
          numDataNodes);
      this.conf = conf;
      this.numDataNodes = numDataNodes;
      this.repl = repl;
      this.storageTypes = types;
      this.storageCapacities = capacities;
    }
  }

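  /**
   * Drives a single migration scenario: brings up a MiniDFSCluster according
   * to the ClusterScheme, populates the namespace according to the
   * NamespaceScheme, runs the Mover, and verifies the resulting block
   * placement.
   */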
  class MigrationTest {
    private final ClusterScheme clusterScheme;
    private final NamespaceScheme nsScheme;
    private final Configuration conf;

    private MiniDFSCluster cluster;
    private DistributedFileSystem dfs;
    private final BlockStoragePolicySuite policies;

    MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) {
      this.clusterScheme = cScheme;
      this.nsScheme = nsScheme;
      this.conf = clusterScheme.conf;
      this.policies = DEFAULT_POLICIES;
    }

    /**
     * Set up the cluster and start NameNode and DataNodes according to the
     * corresponding scheme.
     */
    void setupCluster() throws Exception {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(clusterScheme
          .numDataNodes).storageTypes(clusterScheme.storageTypes)
          .storageCapacities(clusterScheme.storageCapacities).build();
      cluster.waitActive();
      dfs = cluster.getFileSystem();
    }

    private void runBasicTest(boolean shutdown) throws Exception {
      setupCluster();
      try {
        prepareNamespace();
        verify(true);

        setStoragePolicy();
        migrate();
        verify(true);
      } finally {
        if (shutdown) {
          shutdownCluster();
        }
      }
    }

    void shutdownCluster() throws Exception {
      IOUtils.cleanup(null, dfs);
      if (cluster != null) {
        cluster.shutdown();
      }
    }

    /**
     * Create files/directories and set their storage policies according to the
     * corresponding scheme.
     */
    void prepareNamespace() throws Exception {
      nsScheme.prepare(dfs, clusterScheme.repl);
    }

    void setStoragePolicy() throws Exception {
      nsScheme.setStoragePolicy(dfs);
    }

    /**
     * Run the migration tool.
     */
    void migrate() throws Exception {
      runMover();
      Thread.sleep(5000); // let the NN finish deletion
    }

    /**
     * Verify block locations after running the migration tool.
     */
    void verify(boolean verifyAll) throws Exception {
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.triggerBlockReport(dn);
      }
      if (verifyAll) {
        verifyNamespace();
      }
    }

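    /**
     * Run the Mover against every NameNode in the configuration. The null
     * path list registered for each NameNode asks the Mover to scan that
     * NameNode's entire namespace.
     */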
    private void runMover() throws Exception {
      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
      Map<URI, List<Path>> nnMap = Maps.newHashMap();
      for (URI nn : namenodes) {
        nnMap.put(nn, null);
      }
      int result = Mover.run(nnMap, conf);
      Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
    }

    private void verifyNamespace() throws Exception {
      HdfsFileStatus status = dfs.getClient().getFileInfo("/");
      verifyRecursively(null, status);
    }

    private void verifyRecursively(final Path parent,
        final HdfsFileStatus status) throws Exception {
      if (status.isDir()) {
        Path fullPath = parent == null ?
            new Path("/") : status.getFullPath(parent);
        DirectoryListing children = dfs.getClient().listPaths(
            fullPath.toString(), HdfsFileStatus.EMPTY_NAME, true);
        for (HdfsFileStatus child : children.getPartialListing()) {
          verifyRecursively(fullPath, child);
        }
      } else if (!status.isSymlink()) { // is file
        verifyFile(parent, status, null);
      }
    }

    void verifyFile(final Path file, final Byte expectedPolicyId)
        throws Exception {
      final Path parent = file.getParent();
      DirectoryListing children = dfs.getClient().listPaths(
          parent.toString(), HdfsFileStatus.EMPTY_NAME, true);
      for (HdfsFileStatus child : children.getPartialListing()) {
        if (child.getLocalName().equals(file.getName())) {
          verifyFile(parent, child, expectedPolicyId);
          return;
        }
      }
      Assert.fail("File " + file + " not found.");
    }

    private void verifyFile(final Path parent, final HdfsFileStatus status,
        final Byte expectedPolicyId) throws Exception {
      HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
      byte policyId = fileStatus.getStoragePolicy();
      BlockStoragePolicy policy = policies.getPolicy(policyId);
      if (expectedPolicyId != null) {
        Assert.assertEquals((byte)expectedPolicyId, policy.getId());
      }
      final List<StorageType> types = policy.chooseStorageTypes(
          status.getReplication());
      for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
        final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
            lb.getStorageTypes());
        Assert.assertTrue(fileStatus.getFullName(parent.toString())
            + " with policy " + policy + " has non-empty overlap: " + diff
            + ", the corresponding block is " + lb.getBlock().getLocalBlock(),
            diff.removeOverlap(true));
      }
    }

    Replication getReplication(Path file) throws IOException {
      return getOrVerifyReplication(file, null);
    }

    Replication verifyReplication(Path file, int expectedDiskCount,
        int expectedArchiveCount) throws IOException {
      final Replication r = new Replication();
      r.disk = expectedDiskCount;
      r.archive = expectedArchiveCount;
      return getOrVerifyReplication(file, r);
    }

    private Replication getOrVerifyReplication(Path file, Replication expected)
        throws IOException {
      final List<LocatedBlock> lbs = dfs.getClient().getLocatedBlocks(
          file.toString(), 0).getLocatedBlocks();
      Assert.assertEquals(1, lbs.size());

      LocatedBlock lb = lbs.get(0);
      StringBuilder types = new StringBuilder();
      final Replication r = new Replication();
      for(StorageType t : lb.getStorageTypes()) {
        types.append(t).append(", ");
        if (t == StorageType.DISK) {
          r.disk++;
        } else if (t == StorageType.ARCHIVE) {
          r.archive++;
        } else {
          Assert.fail("Unexpected storage type " + t);
        }
      }

      if (expected != null) {
        final String s = "file = " + file + "\n  types = [" + types + "]";
        Assert.assertEquals(s, expected, r);
      }
      return r;
    }
  }

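  /** Per-block counts of replicas stored on DISK and on ARCHIVE storage. */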
  static class Replication {
    int disk;
    int archive;

    @Override
    public int hashCode() {
      return disk ^ archive;
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == this) {
        return true;
      } else if (obj == null || !(obj instanceof Replication)) {
        return false;
      }
      final Replication that = (Replication)obj;
      return this.disk == that.disk && this.archive == that.archive;
    }

    @Override
    public String toString() {
      return "[disk=" + disk + ", archive=" + archive + "]";
    }
  }

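  // Storage type layout used by the tests: the first numRamDisk DataNodes get
  // [RAM_DISK, DISK] volumes, the next numAllDisk get [DISK, DISK], the next
  // numAllArchive get [ARCHIVE, ARCHIVE], and all remaining DataNodes get
  // [DISK, ARCHIVE].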
  private static StorageType[][] genStorageTypes(int numDataNodes) {
    return genStorageTypes(numDataNodes, 0, 0, 0);
  }

  private static StorageType[][] genStorageTypes(int numDataNodes,
      int numAllDisk, int numAllArchive) {
    return genStorageTypes(numDataNodes, numAllDisk, numAllArchive, 0);
  }

  private static StorageType[][] genStorageTypes(int numDataNodes,
      int numAllDisk, int numAllArchive, int numRamDisk) {
    Preconditions.checkArgument(
        (numAllDisk + numAllArchive + numRamDisk) <= numDataNodes);

    StorageType[][] types = new StorageType[numDataNodes][];
    int i = 0;
    for (; i < numRamDisk; i++) {
      types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK};
    }
    for (; i < numRamDisk + numAllDisk; i++) {
      types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
    }
    for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
      types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
    }
    for (; i < types.length; i++) {
      types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
    }
    return types;
  }

  private static long[][] genCapacities(int nDatanodes, int numAllDisk,
      int numAllArchive, int numRamDisk, long diskCapacity,
      long archiveCapacity, long ramDiskCapacity) {
    final long[][] capacities = new long[nDatanodes][];
    int i = 0;
    for (; i < numRamDisk; i++) {
      capacities[i] = new long[]{ramDiskCapacity, diskCapacity};
    }
    for (; i < numRamDisk + numAllDisk; i++) {
      capacities[i] = new long[]{diskCapacity, diskCapacity};
    }
    for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
      capacities[i] = new long[]{archiveCapacity, archiveCapacity};
    }
    for (; i < capacities.length; i++) {
      capacities[i] = new long[]{diskCapacity, archiveCapacity};
    }
    return capacities;
  }

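  /**
   * Maps the /hot, /warm and /cold test directories to the HOT, WARM and COLD
   * storage policies and generates the test files placed under each directory.
   */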
  private static class PathPolicyMap {
    final Map<Path, BlockStoragePolicy> map = Maps.newHashMap();
    final Path hot = new Path("/hot");
    final Path warm = new Path("/warm");
    final Path cold = new Path("/cold");
    final List<Path> files;

    PathPolicyMap(int filesPerDir) {
      map.put(hot, HOT);
      map.put(warm, WARM);
      map.put(cold, COLD);
      files = new ArrayList<Path>();
      for (Path dir : map.keySet()) {
        for (int i = 0; i < filesPerDir; i++) {
          files.add(new Path(dir, "file" + i));
        }
      }
    }

    NamespaceScheme newNamespaceScheme() {
      return new NamespaceScheme(Arrays.asList(hot, warm, cold),
          files, BLOCK_SIZE/2, null, map);
    }

    /**
     * Move hot files to warm and cold, warm files to hot and cold,
     * and cold files to hot and warm.
     */
    void moveAround(DistributedFileSystem dfs) throws Exception {
      for (Path srcDir : map.keySet()) {
        int i = 0;
        for (Path dstDir : map.keySet()) {
          if (!srcDir.equals(dstDir)) {
            final Path src = new Path(srcDir, "file" + i++);
            final Path dst = new Path(dstDir, srcDir.getName() + "2" + dstDir.getName());
            LOG.info("rename " + src + " to " + dst);
            dfs.rename(src, dst);
          }
        }
      }
    }
  }

  /**
   * A normal case for Mover: move a file into archival storage.
   */
  @Test
  public void testMigrateFileToArchival() throws Exception {
    LOG.info("testMigrateFileToArchival");
    final Path foo = new Path("/foo");
    Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
    policyMap.put(foo, COLD);
    NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo),
        2*BLOCK_SIZE, null, policyMap);
    ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
    new MigrationTest(clusterScheme, nsScheme).runBasicTest(true);
  }

  /**
   * Print a big banner in the test log to make debugging easier.
   */
  static void banner(String string) {
    LOG.info("\n\n\n\n================================================\n" +
        string + "\n" +
        "==================================================\n\n");
  }

  /**
   * Run Mover with arguments specifying files and directories.
   */
  @Test
  public void testMoveSpecificPaths() throws Exception {
    LOG.info("testMoveSpecificPaths");
    final Path foo = new Path("/foo");
    final Path barFile = new Path(foo, "bar");
    final Path foo2 = new Path("/foo2");
    final Path bar2File = new Path(foo2, "bar2");
    Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
    policyMap.put(foo, COLD);
    policyMap.put(foo2, WARM);
    NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo, foo2),
        Arrays.asList(barFile, bar2File), BLOCK_SIZE, null, policyMap);
    ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
    MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
    test.setupCluster();

    try {
      test.prepareNamespace();
      test.setStoragePolicy();

      Map<URI, List<Path>> map = Mover.Cli.getNameNodePathsToMove(test.conf,
          "-p", "/foo/bar", "/foo2");
      int result = Mover.run(map, test.conf);
      Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);

      Thread.sleep(5000);
      test.verify(true);
    } finally {
      test.shutdownCluster();
    }
  }

  /**
   * Move an open file into archival storage.
   */
  @Test
  public void testMigrateOpenFileToArchival() throws Exception {
    LOG.info("testMigrateOpenFileToArchival");
    final Path fooDir = new Path("/foo");
    Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
    policyMap.put(fooDir, COLD);
    NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(fooDir), null,
        BLOCK_SIZE, null, policyMap);
    ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
    MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
    test.setupCluster();

    // create an open file
    banner("writing to file /foo/bar");
    final Path barFile = new Path(fooDir, "bar");
    DFSTestUtil.createFile(test.dfs, barFile, BLOCK_SIZE, (short) 1, 0L);
    FSDataOutputStream out = test.dfs.append(barFile);
    out.writeBytes("hello, ");
    ((DFSOutputStream) out.getWrappedStream()).hsync();

    try {
      banner("start data migration");
      test.setStoragePolicy(); // set /foo to COLD
      test.migrate();

      // make sure the under-construction block has not been migrated
      LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
          barFile.toString(), BLOCK_SIZE);
      LOG.info("Locations: " + lbs);
      List<LocatedBlock> blks = lbs.getLocatedBlocks();
      Assert.assertEquals(1, blks.size());
      Assert.assertEquals(1, blks.get(0).getLocations().length);

      banner("finish the migration, continue writing");
      // make sure writing can continue
      out.writeBytes("world!");
      ((DFSOutputStream) out.getWrappedStream()).hsync();
      IOUtils.cleanup(LOG, out);

      lbs = test.dfs.getClient().getLocatedBlocks(
          barFile.toString(), BLOCK_SIZE);
      LOG.info("Locations: " + lbs);
      blks = lbs.getLocatedBlocks();
      Assert.assertEquals(1, blks.size());
      Assert.assertEquals(1, blks.get(0).getLocations().length);

      banner("finish writing, start reading");
      // check the content of /foo/bar
      FSDataInputStream in = test.dfs.open(barFile);
      byte[] buf = new byte[13];
      // read the 13 appended bytes starting at offset 1024 (BLOCK_SIZE)
      in.readFully(BLOCK_SIZE, buf, 0, buf.length);
      IOUtils.cleanup(LOG, in);
      Assert.assertEquals("hello, world!", new String(buf));
    } finally {
      test.shutdownCluster();
    }
  }

  /**
   * Test directories with Hot, Warm and Cold policies.
   */
  @Test
  public void testHotWarmColdDirs() throws Exception {
    LOG.info("testHotWarmColdDirs");
    PathPolicyMap pathPolicyMap = new PathPolicyMap(3);
    NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
    ClusterScheme clusterScheme = new ClusterScheme();
    MigrationTest test = new MigrationTest(clusterScheme, nsScheme);

    try {
      test.runBasicTest(false);
      pathPolicyMap.moveAround(test.dfs);
      test.migrate();

      test.verify(true);
    } finally {
      test.shutdownCluster();
    }
  }

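  /**
   * Poll (for up to about five seconds) until the first block of the given
   * file reports at least the expected number of replica locations. Returns
   * silently if the replicas do not show up in time.
   */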
  private void waitForAllReplicas(int expectedReplicaNum, Path file,
      DistributedFileSystem dfs) throws Exception {
    for (int i = 0; i < 5; i++) {
      LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(file.toString(), 0,
          BLOCK_SIZE);
      LocatedBlock lb = lbs.get(0);
      if (lb.getLocations().length >= expectedReplicaNum) {
        return;
      } else {
        Thread.sleep(1000);
      }
    }
  }

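  /**
   * Mark every volume of the given storage type on the DataNode as full by
   * setting its capacity to zero, so subsequent writes of that type have to
   * fall back to other storage types.
   */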
  private void setVolumeFull(DataNode dn, StorageType type) {
    List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes();
    for (FsVolumeSpi v : volumes) {
      FsVolumeImpl volume = (FsVolumeImpl) v;
      if (volume.getStorageType() == type) {
        LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
            + volume.getStorageID());
        volume.setCapacityForTesting(0);
      }
    }
  }

  /**
   * Test when DISK is running out of space.
   */
  @Test
  public void testNoSpaceDisk() throws Exception {
    LOG.info("testNoSpaceDisk");
    final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
    final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();

    Configuration conf = new Configuration(DEFAULT_CONF);
    final ClusterScheme clusterScheme = new ClusterScheme(conf,
        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
    final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);

    try {
      test.runBasicTest(false);

      // create 2 hot files with replication 3
      final short replication = 3;
      for (int i = 0; i < 2; i++) {
        final Path p = new Path(pathPolicyMap.hot, "file" + i);
        DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
        waitForAllReplicas(replication, p, test.dfs);
      }

      // set all the DISK volumes to full
      for (DataNode dn : test.cluster.getDataNodes()) {
        setVolumeFull(dn, StorageType.DISK);
        DataNodeTestUtils.triggerHeartbeat(dn);
      }

      // test increasing replication.  Since DISK is full,
      // new replicas should be stored in ARCHIVE as a fallback storage.
      final Path file0 = new Path(pathPolicyMap.hot, "file0");
      final Replication r = test.getReplication(file0);
      final short newReplication = (short) 5;
      test.dfs.setReplication(file0, newReplication);
      Thread.sleep(10000);
      test.verifyReplication(file0, r.disk, newReplication - r.disk);

      // test creating a cold file and then increasing its replication
      final Path p = new Path(pathPolicyMap.cold, "foo");
      DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
      test.verifyReplication(p, 0, replication);

      test.dfs.setReplication(p, newReplication);
      Thread.sleep(10000);
      test.verifyReplication(p, 0, newReplication);

      // test moving a hot file to warm
      final Path file1 = new Path(pathPolicyMap.hot, "file1");
      test.dfs.rename(file1, pathPolicyMap.warm);
      test.migrate();
      test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
    } finally {
      test.shutdownCluster();
    }
  }

  /**
   * Test when ARCHIVE is running out of space.
   */
  @Test
  public void testNoSpaceArchive() throws Exception {
    LOG.info("testNoSpaceArchive");
    final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
    final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();

    final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
    final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);

    try {
      test.runBasicTest(false);

      // create 2 cold files with replication 3
      final short replication = 3;
      for (int i = 0; i < 2; i++) {
        final Path p = new Path(pathPolicyMap.cold, "file" + i);
        DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
        waitForAllReplicas(replication, p, test.dfs);
      }

      // set all the ARCHIVE volumes to full
      for (DataNode dn : test.cluster.getDataNodes()) {
        setVolumeFull(dn, StorageType.ARCHIVE);
        DataNodeTestUtils.triggerHeartbeat(dn);
      }

      { // test increasing replication: new replicas cannot be created
        // since there is no more ARCHIVE space.
        final Path file0 = new Path(pathPolicyMap.cold, "file0");
        final Replication r = test.getReplication(file0);
        Assert.assertEquals(0, r.disk);

        final short newReplication = (short) 5;
        test.dfs.setReplication(file0, newReplication);
        Thread.sleep(10000);

        test.verifyReplication(file0, 0, r.archive);
      }

      { // test creating a hot file
        final Path p = new Path(pathPolicyMap.hot, "foo");
        DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 3, 0L);
      }

      { // test moving a cold file to warm
        final Path file1 = new Path(pathPolicyMap.cold, "file1");
        test.dfs.rename(file1, pathPolicyMap.warm);
        test.migrate();
        test.verify(true);
      }
    } finally {
      test.shutdownCluster();
    }
  }
}