1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs;
19 
20 import static org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite.ID_UNSPECIFIED;
21 
22 import java.io.File;
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.util.*;
26 
27 import com.google.common.collect.Lists;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.FSDataOutputStream;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.fs.StorageType;
33 import org.apache.hadoop.hdfs.protocol.*;
34 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
35 import org.apache.hadoop.hdfs.server.blockmanagement.*;
36 import org.apache.hadoop.hdfs.server.datanode.DataNode;
37 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
38 import org.apache.hadoop.hdfs.server.namenode.NameNode;
39 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
40 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
41 import org.apache.hadoop.io.IOUtils;
42 import org.apache.hadoop.ipc.RemoteException;
43 import org.apache.hadoop.net.NetworkTopology;
44 import org.apache.hadoop.net.Node;
45 import org.apache.hadoop.test.GenericTestUtils;
46 import org.apache.hadoop.test.PathUtils;
47 import org.junit.Assert;
48 import org.junit.Test;
49 
50 /** Test {@link BlockStoragePolicy} */
51 public class TestBlockStoragePolicy {
52   public static final BlockStoragePolicySuite POLICY_SUITE;
53   public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY;
54   public static final Configuration conf;
55 
56   static {
57     conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1)58     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1)59     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
60     POLICY_SUITE = BlockStoragePolicySuite.createDefaultSuite();
61     DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy();
62   }
63 
64   static final EnumSet<StorageType> none = EnumSet.noneOf(StorageType.class);
65   static final EnumSet<StorageType> archive = EnumSet.of(StorageType.ARCHIVE);
66   static final EnumSet<StorageType> disk = EnumSet.of(StorageType.DISK);
67   static final EnumSet<StorageType> both = EnumSet.of(StorageType.DISK, StorageType.ARCHIVE);
68 
69   static final long FILE_LEN = 1024;
70   static final short REPLICATION = 3;
71 
72   static final byte COLD = HdfsConstants.COLD_STORAGE_POLICY_ID;
73   static final byte WARM = HdfsConstants.WARM_STORAGE_POLICY_ID;
74   static final byte HOT  = HdfsConstants.HOT_STORAGE_POLICY_ID;
75   static final byte ONESSD  = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
76   static final byte ALLSSD  = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
77   static final byte LAZY_PERSIST  = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
78 
79   @Test (timeout=300000)
testConfigKeyEnabled()80   public void testConfigKeyEnabled() throws IOException {
81     Configuration conf = new HdfsConfiguration();
82     conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true);
83     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
84         .numDataNodes(1).build();
85     try {
86       cluster.waitActive();
87       cluster.getFileSystem().setStoragePolicy(new Path("/"),
88           HdfsConstants.COLD_STORAGE_POLICY_NAME);
89     } finally {
90       cluster.shutdown();
91     }
92   }
93 
94   /**
95    * Ensure that setStoragePolicy throws IOException when
96    * dfs.storage.policy.enabled is set to false.
97    * @throws IOException
98    */
99   @Test (timeout=300000, expected=IOException.class)
testConfigKeyDisabled()100   public void testConfigKeyDisabled() throws IOException {
101     Configuration conf = new HdfsConfiguration();
102     conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
103     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
104         .numDataNodes(1).build();
105     try {
106       cluster.waitActive();
107       cluster.getFileSystem().setStoragePolicy(new Path("/"),
108           HdfsConstants.COLD_STORAGE_POLICY_NAME);
109     } finally {
110       cluster.shutdown();
111     }
112   }
113 
114   @Test
testDefaultPolicies()115   public void testDefaultPolicies() {
116     final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
117     expectedPolicyStrings.put(COLD,
118         "BlockStoragePolicy{COLD:" + COLD + ", storageTypes=[ARCHIVE], " +
119             "creationFallbacks=[], replicationFallbacks=[]}");
120     expectedPolicyStrings.put(WARM,
121         "BlockStoragePolicy{WARM:" + WARM + ", storageTypes=[DISK, ARCHIVE], " +
122             "creationFallbacks=[DISK, ARCHIVE], " +
123             "replicationFallbacks=[DISK, ARCHIVE]}");
124     expectedPolicyStrings.put(HOT,
125         "BlockStoragePolicy{HOT:" + HOT + ", storageTypes=[DISK], " +
126             "creationFallbacks=[], replicationFallbacks=[ARCHIVE]}");
127     expectedPolicyStrings.put(ONESSD, "BlockStoragePolicy{ONE_SSD:" + ONESSD +
128         ", storageTypes=[SSD, DISK], creationFallbacks=[SSD, DISK], " +
129         "replicationFallbacks=[SSD, DISK]}");
130     expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD +
131         ", storageTypes=[SSD], creationFallbacks=[DISK], " +
132         "replicationFallbacks=[DISK]}");
133     expectedPolicyStrings.put(LAZY_PERSIST,
134         "BlockStoragePolicy{LAZY_PERSIST:" + LAZY_PERSIST + ", storageTypes=[RAM_DISK, DISK], " +
135             "creationFallbacks=[DISK], replicationFallbacks=[DISK]}");
136 
137     for(byte i = 1; i < 16; i++) {
138       final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);
139       if (policy != null) {
140         final String s = policy.toString();
141         Assert.assertEquals(expectedPolicyStrings.get(i), s);
142       }
143     }
144     Assert.assertEquals(POLICY_SUITE.getPolicy(HOT), POLICY_SUITE.getDefaultPolicy());
145 
146     { // check Cold policy
147       final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
148       for(short replication = 1; replication < 6; replication++) {
149         final List<StorageType> computed = cold.chooseStorageTypes(replication);
150         assertStorageType(computed, replication, StorageType.ARCHIVE);
151       }
152       assertCreationFallback(cold, null, null, null);
153       assertReplicationFallback(cold, null, null, null);
154     }
155 
156     { // check Warm policy
157       final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
158       for(short replication = 1; replication < 6; replication++) {
159         final List<StorageType> computed = warm.chooseStorageTypes(replication);
160         assertStorageType(computed, replication, StorageType.DISK, StorageType.ARCHIVE);
161       }
162       assertCreationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
163       assertReplicationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
164     }
165 
166     { // check Hot policy
167       final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
168       for(short replication = 1; replication < 6; replication++) {
169         final List<StorageType> computed = hot.chooseStorageTypes(replication);
170         assertStorageType(computed, replication, StorageType.DISK);
171       }
172       assertCreationFallback(hot, null, null, null);
173       assertReplicationFallback(hot, StorageType.ARCHIVE, null, StorageType.ARCHIVE);
174     }
175   }
176 
newStorageTypes(int nDisk, int nArchive)177   static StorageType[] newStorageTypes(int nDisk, int nArchive) {
178     final StorageType[] t = new StorageType[nDisk + nArchive];
179     Arrays.fill(t, 0, nDisk, StorageType.DISK);
180     Arrays.fill(t, nDisk, t.length, StorageType.ARCHIVE);
181     return t;
182   }
183 
asList(int nDisk, int nArchive)184   static List<StorageType> asList(int nDisk, int nArchive) {
185     return Arrays.asList(newStorageTypes(nDisk, nArchive));
186   }
187 
assertStorageType(List<StorageType> computed, short replication, StorageType... answers)188   static void assertStorageType(List<StorageType> computed, short replication,
189       StorageType... answers) {
190     Assert.assertEquals(replication, computed.size());
191     final StorageType last = answers[answers.length - 1];
192     for(int i = 0; i < computed.size(); i++) {
193       final StorageType expected = i < answers.length? answers[i]: last;
194       Assert.assertEquals(expected, computed.get(i));
195     }
196   }
197 
198   static void assertCreationFallback(BlockStoragePolicy policy, StorageType noneExpected,
199       StorageType archiveExpected, StorageType diskExpected) {
200     Assert.assertEquals(noneExpected, policy.getCreationFallback(none));
201     Assert.assertEquals(archiveExpected, policy.getCreationFallback(archive));
202     Assert.assertEquals(diskExpected, policy.getCreationFallback(disk));
203     Assert.assertEquals(null, policy.getCreationFallback(both));
204   }
205 
206   static void assertReplicationFallback(BlockStoragePolicy policy, StorageType noneExpected,
207       StorageType archiveExpected, StorageType diskExpected) {
208     Assert.assertEquals(noneExpected, policy.getReplicationFallback(none));
209     Assert.assertEquals(archiveExpected, policy.getReplicationFallback(archive));
210     Assert.assertEquals(diskExpected, policy.getReplicationFallback(disk));
211     Assert.assertEquals(null, policy.getReplicationFallback(both));
212   }
213 
214   private static interface CheckChooseStorageTypes {
215     public void checkChooseStorageTypes(BlockStoragePolicy p, short replication,
216         List<StorageType> chosen, StorageType... expected);
217 
218     /** Basic case: pass only replication and chosen */
219     static final CheckChooseStorageTypes Basic = new CheckChooseStorageTypes() {
220       @Override
221       public void checkChooseStorageTypes(BlockStoragePolicy p, short replication,
222           List<StorageType> chosen, StorageType... expected) {
223         final List<StorageType> types = p.chooseStorageTypes(replication, chosen);
224         assertStorageTypes(types, expected);
225       }
226     };
227 
228     /** With empty unavailables and isNewBlock=true */
229     static final CheckChooseStorageTypes EmptyUnavailablesAndNewBlock
230         = new CheckChooseStorageTypes() {
231       @Override
232       public void checkChooseStorageTypes(BlockStoragePolicy p,
233           short replication, List<StorageType> chosen, StorageType... expected) {
234         final List<StorageType> types = p.chooseStorageTypes(replication,
235             chosen, none, true);
236         assertStorageTypes(types, expected);
237       }
238     };
239 
240     /** With empty unavailables and isNewBlock=false */
241     static final CheckChooseStorageTypes EmptyUnavailablesAndNonNewBlock
242         = new CheckChooseStorageTypes() {
243       @Override
244       public void checkChooseStorageTypes(BlockStoragePolicy p,
245           short replication, List<StorageType> chosen, StorageType... expected) {
246         final List<StorageType> types = p.chooseStorageTypes(replication,
247             chosen, none, false);
248         assertStorageTypes(types, expected);
249       }
250     };
251 
252     /** With both DISK and ARCHIVE unavailables and isNewBlock=true */
253     static final CheckChooseStorageTypes BothUnavailableAndNewBlock
254         = new CheckChooseStorageTypes() {
255       @Override
256       public void checkChooseStorageTypes(BlockStoragePolicy p,
257           short replication, List<StorageType> chosen, StorageType... expected) {
258         final List<StorageType> types = p.chooseStorageTypes(replication,
259             chosen, both, true);
260         assertStorageTypes(types, expected);
261       }
262     };
263 
264     /** With both DISK and ARCHIVE unavailable and isNewBlock=false */
265     static final CheckChooseStorageTypes BothUnavailableAndNonNewBlock
266         = new CheckChooseStorageTypes() {
267       @Override
268       public void checkChooseStorageTypes(BlockStoragePolicy p,
269           short replication, List<StorageType> chosen, StorageType... expected) {
270         final List<StorageType> types = p.chooseStorageTypes(replication,
271             chosen, both, false);
272         assertStorageTypes(types, expected);
273       }
274     };
275 
276     /** With ARCHIVE unavailable and isNewBlock=true */
277     static final CheckChooseStorageTypes ArchivalUnavailableAndNewBlock
278         = new CheckChooseStorageTypes() {
279       @Override
280       public void checkChooseStorageTypes(BlockStoragePolicy p,
281           short replication, List<StorageType> chosen, StorageType... expected) {
282         final List<StorageType> types = p.chooseStorageTypes(replication,
283             chosen, archive, true);
284         assertStorageTypes(types, expected);
285       }
286     };
287 
288     /** With ARCHIVE unavailable and isNewBlock=true */
289     static final CheckChooseStorageTypes ArchivalUnavailableAndNonNewBlock
290         = new CheckChooseStorageTypes() {
291       @Override
292       public void checkChooseStorageTypes(BlockStoragePolicy p,
293           short replication, List<StorageType> chosen, StorageType... expected) {
294         final List<StorageType> types = p.chooseStorageTypes(replication,
295             chosen, archive, false);
296         assertStorageTypes(types, expected);
297       }
298     };
299   }
300 
301   @Test
302   public void testChooseStorageTypes() {
303     run(CheckChooseStorageTypes.Basic);
304     run(CheckChooseStorageTypes.EmptyUnavailablesAndNewBlock);
305     run(CheckChooseStorageTypes.EmptyUnavailablesAndNonNewBlock);
306   }
307 
308   private static void run(CheckChooseStorageTypes method) {
309     final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
310     final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
311     final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
312 
313     final short replication = 3;
314     {
315       final List<StorageType> chosen = Lists.newArrayList();
316       method.checkChooseStorageTypes(hot, replication, chosen,
317           StorageType.DISK, StorageType.DISK, StorageType.DISK);
318       method.checkChooseStorageTypes(warm, replication, chosen,
319           StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE);
320       method.checkChooseStorageTypes(cold, replication, chosen,
321           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
322     }
323 
324     {
325       final List<StorageType> chosen = Arrays.asList(StorageType.DISK);
326       method.checkChooseStorageTypes(hot, replication, chosen,
327           StorageType.DISK, StorageType.DISK);
328       method.checkChooseStorageTypes(warm, replication, chosen,
329           StorageType.ARCHIVE, StorageType.ARCHIVE);
330       method.checkChooseStorageTypes(cold, replication, chosen,
331           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
332     }
333 
334     {
335       final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE);
336       method.checkChooseStorageTypes(hot, replication, chosen,
337           StorageType.DISK, StorageType.DISK, StorageType.DISK);
338       method.checkChooseStorageTypes(warm, replication, chosen,
339           StorageType.DISK, StorageType.ARCHIVE);
340       method.checkChooseStorageTypes(cold, replication, chosen,
341           StorageType.ARCHIVE, StorageType.ARCHIVE);
342     }
343 
344     {
345       final List<StorageType> chosen = Arrays.asList(
346           StorageType.DISK, StorageType.DISK);
347       method.checkChooseStorageTypes(hot, replication, chosen,
348           StorageType.DISK);
349       method.checkChooseStorageTypes(warm, replication, chosen,
350           StorageType.ARCHIVE, StorageType.ARCHIVE);
351       method.checkChooseStorageTypes(cold, replication, chosen,
352           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
353     }
354 
355     {
356       final List<StorageType> chosen = Arrays.asList(
357           StorageType.DISK, StorageType.ARCHIVE);
358       method.checkChooseStorageTypes(hot, replication, chosen,
359           StorageType.DISK, StorageType.DISK);
360       method.checkChooseStorageTypes(warm, replication, chosen,
361           StorageType.ARCHIVE);
362       method.checkChooseStorageTypes(cold, replication, chosen,
363           StorageType.ARCHIVE, StorageType.ARCHIVE);
364     }
365 
366     {
367       final List<StorageType> chosen = Arrays.asList(
368           StorageType.ARCHIVE, StorageType.ARCHIVE);
369       method.checkChooseStorageTypes(hot, replication, chosen,
370           StorageType.DISK, StorageType.DISK, StorageType.DISK);
371       method.checkChooseStorageTypes(warm, replication, chosen,
372           StorageType.DISK);
373       method.checkChooseStorageTypes(cold, replication, chosen,
374           StorageType.ARCHIVE);
375     }
376 
377     {
378       final List<StorageType> chosen = Arrays.asList(
379           StorageType.DISK, StorageType.DISK, StorageType.DISK);
380       method.checkChooseStorageTypes(hot, replication, chosen);
381       method.checkChooseStorageTypes(warm, replication, chosen,
382           StorageType.ARCHIVE, StorageType.ARCHIVE);
383       method.checkChooseStorageTypes(cold, replication, chosen,
384           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
385     }
386 
387     {
388       final List<StorageType> chosen = Arrays.asList(
389           StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
390       method.checkChooseStorageTypes(hot, replication, chosen,
391           StorageType.DISK);
392       method.checkChooseStorageTypes(warm, replication, chosen,
393           StorageType.ARCHIVE);
394       method.checkChooseStorageTypes(cold, replication, chosen,
395           StorageType.ARCHIVE, StorageType.ARCHIVE);
396     }
397 
398     {
399       final List<StorageType> chosen = Arrays.asList(
400           StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE);
401       method.checkChooseStorageTypes(hot, replication, chosen,
402           StorageType.DISK, StorageType.DISK);
403       method.checkChooseStorageTypes(warm, replication, chosen);
404       method.checkChooseStorageTypes(cold, replication, chosen,
405           StorageType.ARCHIVE);
406     }
407 
408     {
409       final List<StorageType> chosen = Arrays.asList(
410           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
411       method.checkChooseStorageTypes(hot, replication, chosen,
412           StorageType.DISK, StorageType.DISK, StorageType.DISK);
413       method.checkChooseStorageTypes(warm, replication, chosen,
414           StorageType.DISK);
415       method.checkChooseStorageTypes(cold, replication, chosen);
416     }
417   }
418 
419   @Test
420   public void testChooseStorageTypesWithBothUnavailable() {
421     runWithBothUnavailable(CheckChooseStorageTypes.BothUnavailableAndNewBlock);
422     runWithBothUnavailable(CheckChooseStorageTypes.BothUnavailableAndNonNewBlock);
423   }
424 
425   private static void runWithBothUnavailable(CheckChooseStorageTypes method) {
426     final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
427     final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
428     final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
429 
430     final short replication = 3;
431     for(int n = 0; n <= 3; n++) {
432       for(int d = 0; d <= n; d++) {
433         final int a = n - d;
434         final List<StorageType> chosen = asList(d, a);
435         method.checkChooseStorageTypes(hot, replication, chosen);
436         method.checkChooseStorageTypes(warm, replication, chosen);
437         method.checkChooseStorageTypes(cold, replication, chosen);
438       }
439     }
440   }
441 
442   @Test
443   public void testChooseStorageTypesWithDiskUnavailableAndNewBlock() {
444     final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
445     final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
446     final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
447 
448     final short replication = 3;
449     final EnumSet<StorageType> unavailables = disk;
450     final boolean isNewBlock = true;
451     {
452       final List<StorageType> chosen = Lists.newArrayList();
453       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
454       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
455           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
456       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
457           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
458     }
459 
460     {
461       final List<StorageType> chosen = Arrays.asList(StorageType.DISK);
462       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
463       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
464           StorageType.ARCHIVE, StorageType.ARCHIVE);
465       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
466           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
467     }
468 
469     {
470       final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE);
471       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
472       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
473           StorageType.ARCHIVE, StorageType.ARCHIVE);
474       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
475           StorageType.ARCHIVE, StorageType.ARCHIVE);
476     }
477 
478     {
479       final List<StorageType> chosen = Arrays.asList(
480           StorageType.DISK, StorageType.DISK);
481       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
482       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
483           StorageType.ARCHIVE, StorageType.ARCHIVE);
484       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
485           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
486     }
487 
488     {
489       final List<StorageType> chosen = Arrays.asList(
490           StorageType.DISK, StorageType.ARCHIVE);
491       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
492       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
493           StorageType.ARCHIVE);
494       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
495           StorageType.ARCHIVE, StorageType.ARCHIVE);
496     }
497 
498     {
499       final List<StorageType> chosen = Arrays.asList(
500           StorageType.ARCHIVE, StorageType.ARCHIVE);
501       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
502       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
503           StorageType.ARCHIVE);
504       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
505           StorageType.ARCHIVE);
506     }
507 
508     {
509       final List<StorageType> chosen = Arrays.asList(
510           StorageType.DISK, StorageType.DISK, StorageType.DISK);
511       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
512       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
513           StorageType.ARCHIVE, StorageType.ARCHIVE);
514       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
515           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
516     }
517 
518     {
519       final List<StorageType> chosen = Arrays.asList(
520           StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
521       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
522       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
523           StorageType.ARCHIVE);
524       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
525           StorageType.ARCHIVE, StorageType.ARCHIVE);
526     }
527 
528     {
529       final List<StorageType> chosen = Arrays.asList(
530           StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE);
531       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
532       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock);
533       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
534           StorageType.ARCHIVE);
535     }
536 
537     {
538       final List<StorageType> chosen = Arrays.asList(
539           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
540       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
541       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock);
542       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock);
543     }
544   }
545 
546   @Test
547   public void testChooseStorageTypesWithArchiveUnavailable() {
548     runWithArchiveUnavailable(CheckChooseStorageTypes.ArchivalUnavailableAndNewBlock);
549     runWithArchiveUnavailable(CheckChooseStorageTypes.ArchivalUnavailableAndNonNewBlock);
550   }
551 
552   private static void runWithArchiveUnavailable(CheckChooseStorageTypes method) {
553     final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
554     final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
555     final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
556 
557     final short replication = 3;
558     {
559       final List<StorageType> chosen = Lists.newArrayList();
560       method.checkChooseStorageTypes(hot, replication, chosen,
561           StorageType.DISK, StorageType.DISK, StorageType.DISK);
562       method.checkChooseStorageTypes(warm, replication, chosen,
563           StorageType.DISK, StorageType.DISK, StorageType.DISK);
564       method.checkChooseStorageTypes(cold, replication, chosen);
565     }
566 
567     {
568       final List<StorageType> chosen = Arrays.asList(StorageType.DISK);
569       method.checkChooseStorageTypes(hot, replication, chosen,
570           StorageType.DISK, StorageType.DISK);
571       method.checkChooseStorageTypes(warm, replication, chosen,
572           StorageType.DISK, StorageType.DISK);
573       method.checkChooseStorageTypes(cold, replication, chosen);
574     }
575 
576     {
577       final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE);
578       method.checkChooseStorageTypes(hot, replication, chosen,
579           StorageType.DISK, StorageType.DISK, StorageType.DISK);
580       method.checkChooseStorageTypes(warm, replication, chosen,
581           StorageType.DISK, StorageType.DISK);
582       method.checkChooseStorageTypes(cold, replication, chosen);
583     }
584 
585     {
586       final List<StorageType> chosen = Arrays.asList(
587           StorageType.DISK, StorageType.DISK);
588       method.checkChooseStorageTypes(hot, replication, chosen,
589           StorageType.DISK);
590       method.checkChooseStorageTypes(warm, replication, chosen,
591           StorageType.DISK);
592       method.checkChooseStorageTypes(cold, replication, chosen);
593     }
594 
595     {
596       final List<StorageType> chosen = Arrays.asList(
597           StorageType.DISK, StorageType.ARCHIVE);
598       method.checkChooseStorageTypes(hot, replication, chosen,
599           StorageType.DISK, StorageType.DISK);
600       method.checkChooseStorageTypes(warm, replication, chosen,
601           StorageType.DISK);
602       method.checkChooseStorageTypes(cold, replication, chosen);
603     }
604 
605     {
606       final List<StorageType> chosen = Arrays.asList(
607           StorageType.ARCHIVE, StorageType.ARCHIVE);
608       method.checkChooseStorageTypes(hot, replication, chosen,
609           StorageType.DISK, StorageType.DISK, StorageType.DISK);
610       method.checkChooseStorageTypes(warm, replication, chosen,
611           StorageType.DISK);
612       method.checkChooseStorageTypes(cold, replication, chosen);
613     }
614 
615     {
616       final List<StorageType> chosen = Arrays.asList(
617           StorageType.DISK, StorageType.DISK, StorageType.DISK);
618       method.checkChooseStorageTypes(hot, replication, chosen);
619       method.checkChooseStorageTypes(warm, replication, chosen);
620       method.checkChooseStorageTypes(cold, replication, chosen);
621     }
622 
623     {
624       final List<StorageType> chosen = Arrays.asList(
625           StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
626       method.checkChooseStorageTypes(hot, replication, chosen,
627           StorageType.DISK);
628       method.checkChooseStorageTypes(warm, replication, chosen);
629       method.checkChooseStorageTypes(cold, replication, chosen);
630     }
631 
632     {
633       final List<StorageType> chosen = Arrays.asList(
634           StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE);
635       method.checkChooseStorageTypes(hot, replication, chosen,
636           StorageType.DISK, StorageType.DISK);
637       method.checkChooseStorageTypes(warm, replication, chosen);
638       method.checkChooseStorageTypes(cold, replication, chosen);
639     }
640 
641     {
642       final List<StorageType> chosen = Arrays.asList(
643           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
644       method.checkChooseStorageTypes(hot, replication, chosen,
645           StorageType.DISK, StorageType.DISK, StorageType.DISK);
646       method.checkChooseStorageTypes(warm, replication, chosen,
647           StorageType.DISK);
648       method.checkChooseStorageTypes(cold, replication, chosen);
649     }
650   }
651 
652   @Test
653   public void testChooseStorageTypesWithDiskUnavailableAndNonNewBlock() {
654     final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
655     final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
656     final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
657 
658     final short replication = 3;
659     final EnumSet<StorageType> unavailables = disk;
660     final boolean isNewBlock = false;
661     {
662       final List<StorageType> chosen = Lists.newArrayList();
663       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
664           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
665       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
666           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
667       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
668           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
669     }
670 
671     {
672       final List<StorageType> chosen = Arrays.asList(StorageType.DISK);
673       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
674           StorageType.ARCHIVE, StorageType.ARCHIVE);
675       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
676           StorageType.ARCHIVE, StorageType.ARCHIVE);
677       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
678           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
679     }
680 
681     {
682       final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE);
683       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
684           StorageType.ARCHIVE, StorageType.ARCHIVE);
685       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
686           StorageType.ARCHIVE, StorageType.ARCHIVE);
687       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
688           StorageType.ARCHIVE, StorageType.ARCHIVE);
689     }
690 
691     {
692       final List<StorageType> chosen = Arrays.asList(
693           StorageType.DISK, StorageType.DISK);
694       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
695           StorageType.ARCHIVE);
696       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
697           StorageType.ARCHIVE, StorageType.ARCHIVE);
698       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
699           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
700     }
701 
702     {
703       final List<StorageType> chosen = Arrays.asList(
704           StorageType.DISK, StorageType.ARCHIVE);
705       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
706           StorageType.ARCHIVE);
707       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
708           StorageType.ARCHIVE);
709       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
710           StorageType.ARCHIVE, StorageType.ARCHIVE);
711     }
712 
713     {
714       final List<StorageType> chosen = Arrays.asList(
715           StorageType.ARCHIVE, StorageType.ARCHIVE);
716       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock,
717           StorageType.ARCHIVE);
718       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
719           StorageType.ARCHIVE);
720       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
721           StorageType.ARCHIVE);
722     }
723 
724     {
725       final List<StorageType> chosen = Arrays.asList(
726           StorageType.DISK, StorageType.DISK, StorageType.DISK);
727       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
728       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
729           StorageType.ARCHIVE, StorageType.ARCHIVE);
730       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
731           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
732     }
733 
734     {
735       final List<StorageType> chosen = Arrays.asList(
736           StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE);
737       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
738       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock,
739           StorageType.ARCHIVE);
740       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
741           StorageType.ARCHIVE, StorageType.ARCHIVE);
742     }
743 
744     {
745       final List<StorageType> chosen = Arrays.asList(
746           StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE);
747       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
748       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock);
749       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock,
750           StorageType.ARCHIVE);
751     }
752 
753     {
754       final List<StorageType> chosen = Arrays.asList(
755           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
756       checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock);
757       checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock);
758       checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock);
759     }
760   }
761 
762   static void checkChooseStorageTypes(BlockStoragePolicy p, short replication,
763       List<StorageType> chosen, EnumSet<StorageType> unavailables,
764       boolean isNewBlock, StorageType... expected) {
765     final List<StorageType> types = p.chooseStorageTypes(replication, chosen,
766         unavailables, isNewBlock);
767     assertStorageTypes(types, expected);
768   }
769 
770   static void assertStorageTypes(List<StorageType> computed, StorageType... expected) {
771     assertStorageTypes(computed.toArray(StorageType.EMPTY_ARRAY), expected);
772   }
773 
774   static void assertStorageTypes(StorageType[] computed, StorageType... expected) {
775     Arrays.sort(expected);
776     Arrays.sort(computed);
777     Assert.assertArrayEquals(expected, computed);
778   }
779 
780   @Test
781   public void testChooseExcess() {
782     final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
783     final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
784     final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
785 
786     final short replication = 3;
787     for(int n = 0; n <= 6; n++) {
788       for(int d = 0; d <= n; d++) {
789         final int a = n - d;
790         final List<StorageType> chosen = asList(d, a);
791         {
792           final int nDisk = Math.max(0, d - replication);
793           final int nArchive = a;
794           final StorageType[] expected = newStorageTypes(nDisk, nArchive);
795           checkChooseExcess(hot, replication, chosen, expected);
796         }
797 
798         {
799           final int nDisk = Math.max(0, d - 1);
800           final int nArchive = Math.max(0, a - replication + 1);
801           final StorageType[] expected = newStorageTypes(nDisk, nArchive);
802           checkChooseExcess(warm, replication, chosen, expected);
803         }
804 
805         {
806           final int nDisk = d;
807           final int nArchive = Math.max(0, a - replication );
808           final StorageType[] expected = newStorageTypes(nDisk, nArchive);
809           checkChooseExcess(cold, replication, chosen, expected);
810         }
811       }
812     }
813   }
814 
815   static void checkChooseExcess(BlockStoragePolicy p, short replication,
816       List<StorageType> chosen, StorageType... expected) {
817     final List<StorageType> types = p.chooseExcess(replication, chosen);
818     assertStorageTypes(types, expected);
819   }
820 
821   private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) {
822     Assert.assertEquals(stats.length, policies.length);
823     for (int i = 0; i < stats.length; i++) {
824       Assert.assertEquals(stats[i].getStoragePolicy(), policies[i]);
825     }
826   }
827 
828   @Test
829   public void testSetStoragePolicy() throws Exception {
830     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
831         .numDataNodes(REPLICATION).build();
832     cluster.waitActive();
833     final DistributedFileSystem fs = cluster.getFileSystem();
834     try {
835       final Path dir = new Path("/testSetStoragePolicy");
836       final Path fooFile = new Path(dir, "foo");
837       final Path barDir = new Path(dir, "bar");
838       final Path barFile1= new Path(barDir, "f1");
839       final Path barFile2= new Path(barDir, "f2");
840       DFSTestUtil.createFile(fs, fooFile, FILE_LEN, REPLICATION, 0L);
841       DFSTestUtil.createFile(fs, barFile1, FILE_LEN, REPLICATION, 0L);
842       DFSTestUtil.createFile(fs, barFile2, FILE_LEN, REPLICATION, 0L);
843 
844       final String invalidPolicyName = "INVALID-POLICY";
845       try {
846         fs.setStoragePolicy(fooFile, invalidPolicyName);
847         Assert.fail("Should throw a HadoopIllegalArgumentException");
848       } catch (RemoteException e) {
849         GenericTestUtils.assertExceptionContains(invalidPolicyName, e);
850       }
851 
852       // check storage policy
853       HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
854           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
855       HdfsFileStatus[] barList = fs.getClient().listPaths(barDir.toString(),
856           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
857       checkDirectoryListing(dirList, ID_UNSPECIFIED, ID_UNSPECIFIED);
858       checkDirectoryListing(barList, ID_UNSPECIFIED, ID_UNSPECIFIED);
859 
860       final Path invalidPath = new Path("/invalidPath");
861       try {
862         fs.setStoragePolicy(invalidPath, HdfsConstants.WARM_STORAGE_POLICY_NAME);
863         Assert.fail("Should throw a FileNotFoundException");
864       } catch (FileNotFoundException e) {
865         GenericTestUtils.assertExceptionContains(invalidPath.toString(), e);
866       }
867 
868       fs.setStoragePolicy(fooFile, HdfsConstants.COLD_STORAGE_POLICY_NAME);
869       fs.setStoragePolicy(barDir, HdfsConstants.WARM_STORAGE_POLICY_NAME);
870       fs.setStoragePolicy(barFile2, HdfsConstants.HOT_STORAGE_POLICY_NAME);
871 
872       dirList = fs.getClient().listPaths(dir.toString(),
873           HdfsFileStatus.EMPTY_NAME).getPartialListing();
874       barList = fs.getClient().listPaths(barDir.toString(),
875           HdfsFileStatus.EMPTY_NAME).getPartialListing();
876       checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
877       checkDirectoryListing(barList, WARM, HOT);
878 
879       // restart namenode to make sure the editlog is correct
880       cluster.restartNameNode(true);
881       dirList = fs.getClient().listPaths(dir.toString(),
882           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
883       barList = fs.getClient().listPaths(barDir.toString(),
884           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
885       checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
886       checkDirectoryListing(barList, WARM, HOT);
887 
888       // restart namenode with checkpoint to make sure the fsimage is correct
889       fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
890       fs.saveNamespace();
891       fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
892       cluster.restartNameNode(true);
893       dirList = fs.getClient().listPaths(dir.toString(),
894           HdfsFileStatus.EMPTY_NAME).getPartialListing();
895       barList = fs.getClient().listPaths(barDir.toString(),
896           HdfsFileStatus.EMPTY_NAME).getPartialListing();
897       checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
898       checkDirectoryListing(barList, WARM, HOT);
899     } finally {
900       cluster.shutdown();
901     }
902   }
903 
904   @Test
905   public void testSetStoragePolicyWithSnapshot() throws Exception {
906     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
907         .numDataNodes(REPLICATION).build();
908     cluster.waitActive();
909     final DistributedFileSystem fs = cluster.getFileSystem();
910     try {
911       final Path dir = new Path("/testSetStoragePolicyWithSnapshot");
912       final Path fooDir = new Path(dir, "foo");
913       final Path fooFile1= new Path(fooDir, "f1");
914       final Path fooFile2= new Path(fooDir, "f2");
915       DFSTestUtil.createFile(fs, fooFile1, FILE_LEN, REPLICATION, 0L);
916       DFSTestUtil.createFile(fs, fooFile2, FILE_LEN, REPLICATION, 0L);
917 
918       fs.setStoragePolicy(fooDir, HdfsConstants.WARM_STORAGE_POLICY_NAME);
919 
920       HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(),
921           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
922       checkDirectoryListing(dirList, WARM);
923       HdfsFileStatus[] fooList = fs.getClient().listPaths(fooDir.toString(),
924           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
925       checkDirectoryListing(fooList, WARM, WARM);
926 
927       // take snapshot
928       SnapshotTestHelper.createSnapshot(fs, dir, "s1");
929       // change the storage policy of fooFile1
930       fs.setStoragePolicy(fooFile1, HdfsConstants.COLD_STORAGE_POLICY_NAME);
931 
932       fooList = fs.getClient().listPaths(fooDir.toString(),
933           HdfsFileStatus.EMPTY_NAME).getPartialListing();
934       checkDirectoryListing(fooList, COLD, WARM);
935 
936       // check the policy for /dir/.snapshot/s1/foo/f1. Note we always return
937       // the latest storage policy for a file/directory.
938       Path s1f1 = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo/f1");
939       DirectoryListing f1Listing = fs.getClient().listPaths(s1f1.toString(),
940           HdfsFileStatus.EMPTY_NAME);
941       checkDirectoryListing(f1Listing.getPartialListing(), COLD);
942 
943       // delete f1
944       fs.delete(fooFile1, true);
945       fooList = fs.getClient().listPaths(fooDir.toString(),
946           HdfsFileStatus.EMPTY_NAME).getPartialListing();
947       checkDirectoryListing(fooList, WARM);
948       // check the policy for /dir/.snapshot/s1/foo/f1 again after the deletion
949       checkDirectoryListing(fs.getClient().listPaths(s1f1.toString(),
950           HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD);
951 
952       // change the storage policy of foo dir
953       fs.setStoragePolicy(fooDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
954       // /dir/foo is now hot
955       dirList = fs.getClient().listPaths(dir.toString(),
956           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
957       checkDirectoryListing(dirList, HOT);
958       // /dir/foo/f2 is hot
959       fooList = fs.getClient().listPaths(fooDir.toString(),
960           HdfsFileStatus.EMPTY_NAME).getPartialListing();
961       checkDirectoryListing(fooList, HOT);
962 
963       // check storage policy of snapshot path
964       Path s1 = SnapshotTestHelper.getSnapshotRoot(dir, "s1");
965       Path s1foo = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo");
966       checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
967           HdfsFileStatus.EMPTY_NAME).getPartialListing(), HOT);
968       // /dir/.snapshot/.s1/foo/f1 and /dir/.snapshot/.s1/foo/f2 should still
969       // follow the latest
970       checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
971           HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT);
972 
973       // delete foo
974       fs.delete(fooDir, true);
975       checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
976           HdfsFileStatus.EMPTY_NAME).getPartialListing(), HOT);
977       checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
978           HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT);
979     } finally {
980       cluster.shutdown();
981     }
982   }
983 
984   private static StorageType[][] genStorageTypes(int numDataNodes) {
985     StorageType[][] types = new StorageType[numDataNodes][];
986     for (int i = 0; i < types.length; i++) {
987       types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
988     }
989     return types;
990   }
991 
992   private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum,
993                                   int replicaNum, StorageType... types) {
994     List<StorageType> typeList = Lists.newArrayList();
995     Collections.addAll(typeList, types);
996     LocatedBlocks lbs = status.getBlockLocations();
997     Assert.assertEquals(blockNum, lbs.getLocatedBlocks().size());
998     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
999       Assert.assertEquals(replicaNum, lb.getStorageTypes().length);
1000       for (StorageType type : lb.getStorageTypes()) {
1001         Assert.assertTrue(typeList.remove(type));
1002       }
1003     }
1004     Assert.assertTrue(typeList.isEmpty());
1005   }
1006 
1007   private void testChangeFileRep(String policyName, byte policyId,
1008                                    StorageType[] before,
1009                                    StorageType[] after) throws Exception {
1010     final int numDataNodes = 5;
1011     final StorageType[][] types = genStorageTypes(numDataNodes);
1012     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
1013         .numDataNodes(numDataNodes).storageTypes(types).build();
1014     cluster.waitActive();
1015     final DistributedFileSystem fs = cluster.getFileSystem();
1016     try {
1017       final Path dir = new Path("/test");
1018       fs.mkdirs(dir);
1019       fs.setStoragePolicy(dir, policyName);
1020 
1021       final Path foo = new Path(dir, "foo");
1022       DFSTestUtil.createFile(fs, foo, FILE_LEN, REPLICATION, 0L);
1023 
1024       HdfsFileStatus[] status = fs.getClient().listPaths(foo.toString(),
1025           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
1026       checkDirectoryListing(status, policyId);
1027       HdfsLocatedFileStatus fooStatus = (HdfsLocatedFileStatus) status[0];
1028       checkLocatedBlocks(fooStatus, 1, 3, before);
1029 
1030       // change the replication factor to 5
1031       fs.setReplication(foo, (short) numDataNodes);
1032       Thread.sleep(1000);
1033       for (DataNode dn : cluster.getDataNodes()) {
1034         DataNodeTestUtils.triggerHeartbeat(dn);
1035       }
1036       Thread.sleep(1000);
1037       status = fs.getClient().listPaths(foo.toString(),
1038           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
1039       checkDirectoryListing(status, policyId);
1040       fooStatus = (HdfsLocatedFileStatus) status[0];
1041       checkLocatedBlocks(fooStatus, 1, numDataNodes, after);
1042 
1043       // change the replication factor back to 3
1044       fs.setReplication(foo, REPLICATION);
1045       Thread.sleep(1000);
1046       for (DataNode dn : cluster.getDataNodes()) {
1047         DataNodeTestUtils.triggerHeartbeat(dn);
1048       }
1049       Thread.sleep(1000);
1050       for (DataNode dn : cluster.getDataNodes()) {
1051         DataNodeTestUtils.triggerBlockReport(dn);
1052       }
1053       Thread.sleep(1000);
1054       status = fs.getClient().listPaths(foo.toString(),
1055           HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
1056       checkDirectoryListing(status, policyId);
1057       fooStatus = (HdfsLocatedFileStatus) status[0];
1058       checkLocatedBlocks(fooStatus, 1, REPLICATION, before);
1059     } finally {
1060       cluster.shutdown();
1061     }
1062   }
1063 
1064   /**
1065    * Consider a File with Hot storage policy. Increase replication factor of
1066    * that file from 3 to 5. Make sure all replications are created in DISKS.
1067    */
1068   @Test
1069   public void testChangeHotFileRep() throws Exception {
1070     testChangeFileRep(HdfsConstants.HOT_STORAGE_POLICY_NAME, HOT,
1071         new StorageType[]{StorageType.DISK, StorageType.DISK,
1072             StorageType.DISK},
1073         new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK,
1074             StorageType.DISK, StorageType.DISK});
1075   }
1076 
1077   /**
1078    * Consider a File with Warm temperature. Increase replication factor of
1079    * that file from 3 to 5. Make sure all replicas are created in DISKS
1080    * and ARCHIVE.
1081    */
1082   @Test
1083   public void testChangeWarmRep() throws Exception {
1084     testChangeFileRep(HdfsConstants.WARM_STORAGE_POLICY_NAME, WARM,
1085         new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
1086             StorageType.ARCHIVE},
1087         new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
1088             StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
1089   }
1090 
1091   /**
1092    * Consider a File with Cold temperature. Increase replication factor of
1093    * that file from 3 to 5. Make sure all replicas are created in ARCHIVE.
1094    */
1095   @Test
1096   public void testChangeColdRep() throws Exception {
1097     testChangeFileRep(HdfsConstants.COLD_STORAGE_POLICY_NAME, COLD,
1098         new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
1099             StorageType.ARCHIVE},
1100         new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
1101             StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
1102   }
1103 
1104   @Test
1105   public void testChooseTargetWithTopology() throws Exception {
1106     BlockStoragePolicy policy1 = new BlockStoragePolicy((byte) 9, "TEST1",
1107         new StorageType[]{StorageType.SSD, StorageType.DISK,
1108             StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});
1109     BlockStoragePolicy policy2 = new BlockStoragePolicy((byte) 11, "TEST2",
1110         new StorageType[]{StorageType.DISK, StorageType.SSD,
1111             StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});
1112 
1113     final String[] racks = {"/d1/r1", "/d1/r2", "/d1/r2"};
1114     final String[] hosts = {"host1", "host2", "host3"};
1115     final StorageType[] types = {StorageType.DISK, StorageType.SSD,
1116         StorageType.ARCHIVE};
1117 
1118     final DatanodeStorageInfo[] storages = DFSTestUtil
1119         .createDatanodeStorageInfos(3, racks, hosts, types);
1120     final DatanodeDescriptor[] dataNodes = DFSTestUtil
1121         .toDatanodeDescriptor(storages);
1122 
1123     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
1124     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
1125     File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
1126     conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
1127         new File(baseDir, "name").getPath());
1128     DFSTestUtil.formatNameNode(conf);
1129     NameNode namenode = new NameNode(conf);
1130 
1131     final BlockManager bm = namenode.getNamesystem().getBlockManager();
1132     BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy();
1133     NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
1134     for (DatanodeDescriptor datanode : dataNodes) {
1135       cluster.add(datanode);
1136     }
1137 
1138     DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
1139         dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
1140         new HashSet<Node>(), 0, policy1);
1141     System.out.println(Arrays.asList(targets));
1142     Assert.assertEquals(3, targets.length);
1143     targets = replicator.chooseTarget("/foo", 3,
1144         dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
1145         new HashSet<Node>(), 0, policy2);
1146     System.out.println(Arrays.asList(targets));
1147     Assert.assertEquals(3, targets.length);
1148   }
1149 
1150   /**
1151    * Test getting all the storage policies from the namenode
1152    */
1153   @Test
1154   public void testGetAllStoragePolicies() throws Exception {
1155     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
1156         .numDataNodes(0).build();
1157     cluster.waitActive();
1158     final DistributedFileSystem fs = cluster.getFileSystem();
1159     try {
1160       BlockStoragePolicy[] policies = fs.getStoragePolicies();
1161       Assert.assertEquals(6, policies.length);
1162       Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
1163           policies[0].toString());
1164       Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
1165           policies[1].toString());
1166       Assert.assertEquals(POLICY_SUITE.getPolicy(HOT).toString(),
1167           policies[2].toString());
1168       Assert.assertEquals(POLICY_SUITE.getPolicy(ONESSD).toString(),
1169           policies[3].toString());
1170       Assert.assertEquals(POLICY_SUITE.getPolicy(ALLSSD).toString(),
1171           policies[4].toString());
1172       Assert.assertEquals(POLICY_SUITE.getPolicy(LAZY_PERSIST).toString(),
1173           policies[5].toString());
1174     } finally {
1175       IOUtils.cleanup(null, fs);
1176       cluster.shutdown();
1177     }
1178   }
1179 
1180   @Test
1181   public void testChooseSsdOverDisk() throws Exception {
1182     BlockStoragePolicy policy = new BlockStoragePolicy((byte) 9, "TEST1",
1183         new StorageType[]{StorageType.SSD, StorageType.DISK,
1184             StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});
1185 
1186     final String[] racks = {"/d1/r1", "/d1/r1", "/d1/r1"};
1187     final String[] hosts = {"host1", "host2", "host3"};
1188     final StorageType[] disks = {StorageType.DISK, StorageType.DISK, StorageType.DISK};
1189 
1190     final DatanodeStorageInfo[] diskStorages
1191         = DFSTestUtil.createDatanodeStorageInfos(3, racks, hosts, disks);
1192     final DatanodeDescriptor[] dataNodes
1193         = DFSTestUtil.toDatanodeDescriptor(diskStorages);
1194     for(int i = 0; i < dataNodes.length; i++) {
1195       BlockManagerTestUtil.updateStorage(dataNodes[i],
1196           new DatanodeStorage("ssd" + i, DatanodeStorage.State.NORMAL,
1197               StorageType.SSD));
1198     }
1199 
1200     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
1201     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
1202     File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
1203     conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
1204         new File(baseDir, "name").getPath());
1205     DFSTestUtil.formatNameNode(conf);
1206     NameNode namenode = new NameNode(conf);
1207 
1208     final BlockManager bm = namenode.getNamesystem().getBlockManager();
1209     BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy();
1210     NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
1211     for (DatanodeDescriptor datanode : dataNodes) {
1212       cluster.add(datanode);
1213     }
1214 
1215     DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
1216         dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
1217         new HashSet<Node>(), 0, policy);
1218     System.out.println(policy.getName() + ": " + Arrays.asList(targets));
1219     Assert.assertEquals(2, targets.length);
1220     Assert.assertEquals(StorageType.SSD, targets[0].getStorageType());
1221     Assert.assertEquals(StorageType.DISK, targets[1].getStorageType());
1222   }
1223 
1224   @Test
1225   public void testStorageType() {
1226     final EnumMap<StorageType, Integer> map = new EnumMap<>(StorageType.class);
1227 
1228     //put storage type is reversed order
1229     map.put(StorageType.ARCHIVE, 1);
1230     map.put(StorageType.DISK, 1);
1231     map.put(StorageType.SSD, 1);
1232     map.put(StorageType.RAM_DISK, 1);
1233 
1234     {
1235       final Iterator<StorageType> i = map.keySet().iterator();
1236       Assert.assertEquals(StorageType.RAM_DISK, i.next());
1237       Assert.assertEquals(StorageType.SSD, i.next());
1238       Assert.assertEquals(StorageType.DISK, i.next());
1239       Assert.assertEquals(StorageType.ARCHIVE, i.next());
1240     }
1241 
1242     {
1243       final Iterator<Map.Entry<StorageType, Integer>> i
1244           = map.entrySet().iterator();
1245       Assert.assertEquals(StorageType.RAM_DISK, i.next().getKey());
1246       Assert.assertEquals(StorageType.SSD, i.next().getKey());
1247       Assert.assertEquals(StorageType.DISK, i.next().getKey());
1248       Assert.assertEquals(StorageType.ARCHIVE, i.next().getKey());
1249     }
1250   }
1251 
1252   public void testGetFileStoragePolicyAfterRestartNN() throws Exception {
1253     //HDFS8219
1254     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
1255         .numDataNodes(REPLICATION)
1256         .storageTypes(
1257             new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
1258         .build();
1259     cluster.waitActive();
1260     final DistributedFileSystem fs = cluster.getFileSystem();
1261     try {
1262       final String file = "/testScheduleWithinSameNode/file";
1263       Path dir = new Path("/testScheduleWithinSameNode");
1264       fs.mkdirs(dir);
1265       // 2. Set Dir policy
1266       fs.setStoragePolicy(dir, "COLD");
1267       // 3. Create file
1268       final FSDataOutputStream out = fs.create(new Path(file));
1269       out.writeChars("testScheduleWithinSameNode");
1270       out.close();
1271       // 4. Set Dir policy
1272       fs.setStoragePolicy(dir, "HOT");
1273       HdfsFileStatus status = fs.getClient().getFileInfo(file);
1274       // 5. get file policy, it should be parent policy.
1275       Assert
1276           .assertTrue(
1277               "File storage policy should be HOT",
1278               status.getStoragePolicy()
1279               == HdfsConstants.HOT_STORAGE_POLICY_ID);
1280       // 6. restart NameNode for reloading edits logs.
1281       cluster.restartNameNode(true);
1282       // 7. get file policy, it should be parent policy.
1283       status = fs.getClient().getFileInfo(file);
1284       Assert
1285           .assertTrue(
1286               "File storage policy should be HOT",
1287               status.getStoragePolicy()
1288               == HdfsConstants.HOT_STORAGE_POLICY_ID);
1289 
1290     } finally {
1291       cluster.shutdown();
1292     }
1293   }
1294 }
1295