/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

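/**
 * Tests {@link BlockPlacementPolicyWithNodeGroup} replica placement on a
 * node-group-aware topology. A minimal configuration sketch, mirroring what
 * {@link #setUp()} does below:
 *
 * <pre>
 * Configuration conf = new HdfsConfiguration();
 * // use the node-group-aware placement policy and topology implementation
 * conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
 *     BlockPlacementPolicyWithNodeGroup.class.getName());
 * conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
 *     NetworkTopologyWithNodeGroup.class.getName());
 * </pre>
 */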
public class TestReplicationPolicyWithNodeGroup {
  private static final int BLOCK_SIZE = 1024;
  private static final int NUM_OF_DATANODES = 8;
  private static final int NUM_OF_DATANODES_BOUNDARY = 6;
  private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
  private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6;
  private final Configuration CONF = new HdfsConfiguration();
  private NetworkTopology cluster;
  private NameNode namenode;
  private BlockPlacementPolicy replicator;
  private static final String filename = "/dummyfile.txt";

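  // Network locations below follow the /<data center>/<rack>/<node group>
  // layout expected by NetworkTopologyWithNodeGroup; the last path component
  // is the node group.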
  private static final DatanodeStorageInfo[] storages;
  private static final DatanodeDescriptor[] dataNodes;
  static {
    final String[] racks = {
        "/d1/r1/n1",
        "/d1/r1/n1",
        "/d1/r1/n2",
        "/d1/r2/n3",
        "/d1/r2/n3",
        "/d1/r2/n4",
        "/d2/r3/n5",
        "/d2/r3/n6"
    };
    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
  }

  private static final DatanodeStorageInfo[] storagesInBoundaryCase;
  private static final DatanodeDescriptor[] dataNodesInBoundaryCase;
  static {
    final String[] racksInBoundaryCase = {
        "/d1/r1/n1",
        "/d1/r1/n1",
        "/d1/r1/n1",
        "/d1/r1/n2",
        "/d1/r2/n3",
        "/d1/r2/n3"
    };
    storagesInBoundaryCase =
        DFSTestUtil.createDatanodeStorageInfos(racksInBoundaryCase);
    dataNodesInBoundaryCase =
        DFSTestUtil.toDatanodeDescriptor(storagesInBoundaryCase);
  }

  private static final DatanodeStorageInfo[] storagesInMoreTargetsCase;
  private static final DatanodeDescriptor[] dataNodesInMoreTargetsCase;
  static {
    final String[] racksInMoreTargetsCase = {
        "/r1/n1",
        "/r1/n1",
        "/r1/n2",
        "/r1/n2",
        "/r1/n3",
        "/r1/n3",
        "/r2/n4",
        "/r2/n4",
        "/r2/n5",
        "/r2/n5",
        "/r2/n6",
        "/r2/n6"
    };
    storagesInMoreTargetsCase =
        DFSTestUtil.createDatanodeStorageInfos(racksInMoreTargetsCase);
    dataNodesInMoreTargetsCase =
        DFSTestUtil.toDatanodeDescriptor(storagesInMoreTargetsCase);
  }

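  // A writer that is not part of the cluster topology (used by
  // testChooseTarget5 to model a client outside the file system).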
  private static final DatanodeDescriptor NODE =
      new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));

  private static final DatanodeStorageInfo[] storagesForDependencies;
  private static final DatanodeDescriptor[] dataNodesForDependencies;
  static {
    final String[] racksForDependencies = {
        "/d1/r1/n1",
        "/d1/r1/n1",
        "/d1/r1/n2",
        "/d1/r1/n2",
        "/d1/r1/n3",
        "/d1/r1/n4"
    };
    final String[] hostNamesForDependencies = {
        "h1",
        "h2",
        "h3",
        "h4",
        "h5",
        "h6"
    };

    storagesForDependencies = DFSTestUtil.createDatanodeStorageInfos(
        racksForDependencies, hostNamesForDependencies);
    dataNodesForDependencies =
        DFSTestUtil.toDatanodeDescriptor(storagesForDependencies);
  }

  @Before
  public void setUp() throws Exception {
    FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
    CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
    // Set properties to make HDFS aware of NodeGroup.
    CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        BlockPlacementPolicyWithNodeGroup.class.getName());
    CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
        NetworkTopologyWithNodeGroup.class.getName());

    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);

    File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class);

    CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
        new File(baseDir, "name").getPath());

    DFSTestUtil.formatNameNode(CONF);
    namenode = new NameNode(CONF);
    final BlockManager bm = namenode.getNamesystem().getBlockManager();
    replicator = bm.getBlockPlacementPolicy();
    cluster = bm.getDatanodeManager().getNetworkTopology();
    // construct network topology
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.add(dataNodes[i]);
    }
    setupDataNodeCapacity();
  }

  @After
  public void tearDown() throws Exception {
    namenode.stop();
  }

  private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
      long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
      int volFailures) {
    dn.getStorageInfos()[0].setUtilizationForTesting(
        capacity, dfsUsed, remaining, blockPoolUsed);
    dn.updateHeartbeat(
        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
  }

  private static void setupDataNodeCapacity() {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }
  }

  /**
   * Scan the targets list: all targets should be on different NodeGroups.
   * Return false if two targets are found on the same NodeGroup.
   */
  private static boolean checkTargetsOnDifferentNodeGroup(
      DatanodeStorageInfo[] targets) {
    if(targets.length == 0) {
      return true;
    }
    Set<String> targetSet = new HashSet<String>();
    for(DatanodeStorageInfo storage:targets) {
      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
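      // getLastHalf returns the last path component of the network location
      // (e.g. "/n1" for "/d1/r1/n1"), which identifies the node group here.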
      String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
      if(targetSet.contains(nodeGroup)) {
        return false;
      } else {
        targetSet.add(nodeGroup);
      }
    }
    return true;
  }

  private boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
    return isOnSameRack(left.getDatanodeDescriptor(), right);
  }

  private boolean isOnSameRack(DatanodeDescriptor left, DatanodeStorageInfo right) {
    return cluster.isOnSameRack(left, right.getDatanodeDescriptor());
  }

  private boolean isOnSameNodeGroup(DatanodeStorageInfo left, DatanodeStorageInfo right) {
    return isOnSameNodeGroup(left.getDatanodeDescriptor(), right);
  }

  private boolean isOnSameNodeGroup(DatanodeDescriptor left, DatanodeStorageInfo right) {
    return cluster.isOnSameNodeGroup(left, right.getDatanodeDescriptor());
  }

  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
    return chooseTarget(numOfReplicas, dataNodes[0]);
  }

  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
      DatanodeDescriptor writer) {
    return chooseTarget(numOfReplicas, writer,
        new ArrayList<DatanodeStorageInfo>());
  }

  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
      List<DatanodeStorageInfo> chosenNodes) {
    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
  }

  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
      DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
    return chooseTarget(numOfReplicas, writer, chosenNodes, null);
  }

  private DatanodeStorageInfo[] chooseTarget(
      int numOfReplicas,
      DatanodeDescriptor writer,
      List<DatanodeStorageInfo> chosenNodes,
      Set<Node> excludedNodes) {
    return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
        false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
  }

  /**
   * In this testcase, client is dataNodes[0]. So the 1st replica should be
   * placed on dataNodes[0], the 2nd replica should be placed on a different
   * rack, and the 3rd should be placed on a different node (and node group)
   * of the rack chosen for the 2nd replica.
   * The only exception is when <i>numOfReplicas</i> is 2: then the 1st is on
   * dataNodes[0] and the 2nd is on a different rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget1() throws Exception {
    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        0L, 0L, 4, 0); // overloaded

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertEquals(storages[0], targets[0]);

    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertEquals(storages[0], targets[0]);

    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    assertEquals(storages[0], targets[0]);

    assertFalse(isOnSameRack(targets[0], targets[1]));
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameNodeGroup(targets[1], targets[2]));

    targets = chooseTarget(4);
    assertEquals(targets.length, 4);
    assertEquals(storages[0], targets[0]);

    assertTrue(isOnSameRack(targets[1], targets[2]) ||
               isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
    // Make sure no two replicas are on the same node group.
    verifyNoTwoTargetsOnSameNodeGroup(targets);

    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
  }

  private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
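    // With NetworkTopologyWithNodeGroup, a datanode's network location
    // includes its node group, so the locations are distinct exactly when
    // the node groups are.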
    Set<String> nodeGroupSet = new HashSet<String>();
    for (DatanodeStorageInfo target: targets) {
      nodeGroupSet.add(target.getDatanodeDescriptor().getNetworkLocation());
    }
    assertEquals(nodeGroupSet.size(), targets.length);
  }

  /**
   * In this testcase, client is dataNodes[0], but dataNodes[1] is not allowed
   * to be chosen. So the 1st replica should be placed on dataNodes[0], the
   * 2nd replica should be placed on a different rack, the 3rd should be on
   * the same rack as the 2nd replica but in a different node group, and the
   * rest should be placed on a third rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget2() throws Exception {
    DatanodeStorageInfo[] targets;
    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();

    Set<Node> excludedNodes = new HashSet<Node>();
    excludedNodes.add(dataNodes[1]);
    targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
    assertEquals(targets.length, 4);
    assertEquals(storages[0], targets[0]);

    assertTrue(cluster.isNodeGroupAware());
    // Make sure no replicas are on the same node group.
    for (int i=1;i<4;i++) {
      assertFalse(isOnSameNodeGroup(targets[0], targets[i]));
    }
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
               isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[1], targets[3]));

    excludedNodes.clear();
    chosenNodes.clear();
    excludedNodes.add(dataNodes[1]);
    chosenNodes.add(storages[2]);
    targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
    System.out.println("targets=" + Arrays.asList(targets));
    assertEquals(2, targets.length);
    // make sure that the chosen node is among the targets
    int i = 0;
    for(; i < targets.length && !storages[2].equals(targets[i]); i++);
    assertTrue(i < targets.length);
  }

  /**
   * In this testcase, client is dataNodes[0], but dataNodes[0] is not
   * qualified to be chosen. So the 1st replica should be placed on
   * dataNodes[1], the 2nd replica should be placed on a different rack, the
   * 3rd replica should be placed on the same rack as the 2nd replica but in a
   * different node group, and the rest should be placed on a third rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget3() throws Exception {
    // make data node 0 not qualified to be chosen
    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
        0L, 0L, 0, 0); // no space

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertEquals(storages[1], targets[0]);

    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertEquals(storages[1], targets[0]);
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    assertEquals(storages[1], targets[0]);
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(4);
    assertEquals(targets.length, 4);
    assertEquals(storages[1], targets[0]);
    assertTrue(cluster.isNodeGroupAware());
    verifyNoTwoTargetsOnSameNodeGroup(targets);
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
               isOnSameRack(targets[2], targets[3]));

    updateHeartbeatWithUsage(dataNodes[0],
        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
  }

  /**
   * In this testcase, client is dataNodes[0], but none of the nodes on rack 1
   * is qualified to be chosen. So the 1st replica should be placed on either
   * rack 2 or rack 3, the 2nd replica should be placed on a different rack,
   * and the 3rd replica should be placed on the same rack as the 1st replica
   * but in a different node group.
   * @throws Exception
   */
  @Test
  public void testChooseTarget4() throws Exception {
    // make data nodes 0-2 not qualified to be chosen: not enough disk space
    for(int i=0; i<3; i++) {
      updateHeartbeatWithUsage(dataNodes[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));

    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    for(int i=0; i<3; i++) {
      assertFalse(isOnSameRack(dataNodes[0], targets[i]));
    }
    verifyNoTwoTargetsOnSameNodeGroup(targets);
    assertTrue(isOnSameRack(targets[0], targets[1]) ||
               isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
  }

  /**
   * In this testcase, client is a node outside of the file system.
   * So the 1st replica can be placed on any node,
   * the 2nd replica should be placed on a different rack,
   * and the 3rd replica should be placed on the same rack as the 2nd replica.
   * @throws Exception
   */
  @Test
  public void testChooseTarget5() throws Exception {
    setupDataNodeCapacity();
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, NODE);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, NODE);
    assertEquals(targets.length, 1);

    targets = chooseTarget(2, NODE);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3, NODE);
    assertEquals(targets.length, 3);
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[1]));
    verifyNoTwoTargetsOnSameNodeGroup(targets);
  }

  /**
   * This testcase tests re-replication when dataNodes[0] is already chosen.
   * So the 1st replica can be placed on a random rack.
   * The 2nd replica should be placed on a different node and node group, but
   * on the same rack as the 1st replica. The 3rd replica can be placed
   * randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate1() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
    chosenNodes.add(storages[0]);
    DatanodeStorageInfo[] targets;

    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));

    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3, chosenNodes);
    assertEquals(targets.length, 3);
    assertTrue(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameNodeGroup(dataNodes[0], targets[0]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
  }

  /**
   * This testcase tests re-replication
   * when dataNodes[0] and dataNodes[1] are already chosen.
   * So the 1st replica should be placed on a rack other than rack 1,
   * and the rest of the replicas can be placed randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate2() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
    chosenNodes.add(storages[0]);
    chosenNodes.add(storages[1]);

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));

    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(dataNodes[0], targets[0]) &&
        isOnSameRack(dataNodes[0], targets[1]));
  }

  /**
   * This testcase tests re-replication
   * when dataNodes[0] and dataNodes[3] are already chosen.
   * So the 1st replica should be placed on the rack where the writer resides,
   * and the rest of the replicas can be placed randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate3() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
    chosenNodes.add(storages[0]);
    chosenNodes.add(storages[3]);

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertTrue(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameRack(dataNodes[3], targets[0]));

    targets = chooseTarget(1, dataNodes[3], chosenNodes);
    assertEquals(targets.length, 1);
    assertTrue(isOnSameRack(dataNodes[3], targets[0]));
    assertFalse(isOnSameNodeGroup(dataNodes[3], targets[0]));
    assertFalse(isOnSameRack(dataNodes[0], targets[0]));

    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(isOnSameNodeGroup(dataNodes[0], targets[0]));

    targets = chooseTarget(2, dataNodes[3], chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(dataNodes[3], targets[0]));
  }

  /**
   * Test that chooseReplicaToDelete picks replicas to remove based on
   * block locality and free space.
   */
  @Test
  public void testChooseReplicaToDelete() throws Exception {
    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
    final Map<String, List<DatanodeStorageInfo>> rackMap
        = new HashMap<String, List<DatanodeStorageInfo>>();
    dataNodes[0].setRemaining(4*1024*1024);
    replicaList.add(storages[0]);

    dataNodes[1].setRemaining(3*1024*1024);
    replicaList.add(storages[1]);

    dataNodes[2].setRemaining(2*1024*1024);
    replicaList.add(storages[2]);

    dataNodes[5].setRemaining(1*1024*1024);
    replicaList.add(storages[5]);

    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
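    // splitNodesWithRack partitions the replicas: "first" gets those on racks
    // holding more than one replica, "second" those that are the only replica
    // on their rack (storages[0..2] are on rack r1, storages[5] on r2).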
    replicator.splitNodesWithRack(
        replicaList, rackMap, first, second);
    assertEquals(3, first.size());
    assertEquals(1, second.size());
    List<StorageType> excessTypes = new ArrayList<StorageType>();
    excessTypes.add(StorageType.DEFAULT);
    DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
        null, null, (short)3, first, second, excessTypes);
    // Within the first set {dataNodes[0], dataNodes[1], dataNodes[2]},
    // dataNodes[0] and dataNodes[1] are in the same node group, and
    // dataNodes[1] is chosen because it has less free space.
    assertEquals(chosen, storages[1]);

    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(2, first.size());
    assertEquals(1, second.size());
    // Within the first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is
    // chosen because it has less free space.
    excessTypes.add(StorageType.DEFAULT);
    chosen = replicator.chooseReplicaToDelete(
        null, null, (short)2, first, second, excessTypes);
    assertEquals(chosen, storages[2]);

    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(0, first.size());
    assertEquals(2, second.size());
    // Within the second set, dataNodes[5] is chosen because it has less
    // free space.
    excessTypes.add(StorageType.DEFAULT);
    chosen = replicator.chooseReplicaToDelete(
        null, null, (short)1, first, second, excessTypes);
    assertEquals(chosen, storages[5]);
  }

  /**
   * Test the replica placement policy in the boundary-topology case.
   * Rack 2 has only one node group and therefore cannot hold two replicas.
   * The 1st replica should be placed on the writer.
   * The 2nd replica should be placed on a different rack.
   * The 3rd replica should be placed on the same rack as the writer, but in
   * a different node group.
   */
  @Test
  public void testChooseTargetsOnBoundaryTopology() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.remove(dataNodes[i]);
    }

    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      cluster.add(dataNodesInBoundaryCase[i]);
    }
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      updateHeartbeatWithUsage(dataNodes[0],
                2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
                (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE,
                0L, 0L, 0L, 0, 0);

      updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, dataNodesInBoundaryCase[0]);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, dataNodesInBoundaryCase[0]);
    assertEquals(targets.length, 1);

    targets = chooseTarget(2, dataNodesInBoundaryCase[0]);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3, dataNodesInBoundaryCase[0]);
    assertEquals(targets.length, 3);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
  }

  /**
   * Test the re-replication policy in the boundary-topology case.
   * Rack 2 has only one node group, and the node in this node group is
   * chosen. Rack 1 has two node groups, and one of them is chosen.
   * The replica policy should choose a node from the other node group of
   * rack 1, i.e. not from the same node group as the chosen nodes.
   */
  @Test
  public void testRereplicateOnBoundaryTopology() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      updateHeartbeatWithUsage(dataNodesInBoundaryCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
    chosenNodes.add(storagesInBoundaryCase[0]);
    chosenNodes.add(storagesInBoundaryCase[5]);
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(1, dataNodesInBoundaryCase[0], chosenNodes);
    assertFalse(isOnSameNodeGroup(dataNodesInBoundaryCase[0], targets[0]));
    assertFalse(isOnSameNodeGroup(dataNodesInBoundaryCase[5], targets[0]));
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
  }

  /**
   * Test the replica placement policy when more targets are requested than
   * there are node groups.
   * The 12-node cluster has only 6 node groups, but in some cases, such as
   * placing a submitted job file, there is a requirement to choose more
   * targets (here 10) for placing replicas. Verify that only 6 targets can
   * be returned.
   */
  @Test
  public void testChooseMoreTargetsThanNodeGroups() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.remove(dataNodes[i]);
    }
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      DatanodeDescriptor node = dataNodesInBoundaryCase[i];
      if (cluster.contains(node)) {
        cluster.remove(node);
      }
    }

    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
      cluster.add(dataNodesInMoreTargetsCase[i]);
    }

    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
      updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }

    DatanodeStorageInfo[] targets;
    // Test the normal case -- 3 replicas
    targets = chooseTarget(3, dataNodesInMoreTargetsCase[0]);
    assertEquals(targets.length, 3);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));

    // Test the special case -- the replica count exceeds the number of
    // node groups.
    targets = chooseTarget(10, dataNodesInMoreTargetsCase[0]);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
    // Verify that it can only find 6 targets for placing replicas.
    assertEquals(targets.length, 6);
  }

  @Test
  public void testChooseTargetWithDependencies() throws Exception {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.remove(dataNodes[i]);
    }

    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
      DatanodeDescriptor node = dataNodesInMoreTargetsCase[i];
      if (cluster.contains(node)) {
        cluster.remove(node);
      }
    }

    Host2NodesMap host2DatanodeMap = namenode.getNamesystem()
        .getBlockManager()
        .getDatanodeManager().getHost2DatanodeMap();
    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
      cluster.add(dataNodesForDependencies[i]);
      host2DatanodeMap.add(dataNodesForDependencies[i]);
    }

    // add dependencies (dataNodesForDependencies[1] <-> [2],
    // and [3] <-> [4])
    dataNodesForDependencies[1].addDependentHostName(
        dataNodesForDependencies[2].getHostName());
    dataNodesForDependencies[2].addDependentHostName(
        dataNodesForDependencies[1].getHostName());
    dataNodesForDependencies[3].addDependentHostName(
        dataNodesForDependencies[4].getHostName());
    dataNodesForDependencies[4].addDependentHostName(
        dataNodesForDependencies[3].getHostName());

    // update heartbeat
    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
      updateHeartbeatWithUsage(dataNodesForDependencies[i],
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }

    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();

    DatanodeStorageInfo[] targets;
    Set<Node> excludedNodes = new HashSet<Node>();
    excludedNodes.add(dataNodesForDependencies[5]);

    // try to select three targets, as three node groups are available
    // (node 5 is excluded)
    targets = chooseTarget(3, dataNodesForDependencies[1], chosenNodes,
        excludedNodes);

    // Even though there are three node groups, verify that only two targets
    // are selected due to dependencies.
    assertEquals(targets.length, 2);
    assertEquals(targets[0], storagesForDependencies[1]);
    assertTrue(targets[1].equals(storagesForDependencies[3]) ||
        targets[1].equals(storagesForDependencies[4]));

    // verify that all data nodes are in the excluded list
    assertEquals(excludedNodes.size(), NUM_OF_DATANODES_FOR_DEPENDENCIES);
    for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) {
      assertTrue(excludedNodes.contains(dataNodesForDependencies[i]));
    }
  }
}