1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.namenode;
19 
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23 
24 import java.io.File;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.hdfs.DFSConfigKeys;
37 import org.apache.hadoop.hdfs.DFSTestUtil;
38 import org.apache.hadoop.hdfs.DFSUtil;
39 import org.apache.hadoop.hdfs.protocol.FSConstants;
40 import org.apache.hadoop.net.NetworkTopology;
41 import org.apache.hadoop.net.Node;
42 import org.junit.Test;
43 
/**
 * Tests for node-group-aware block placement
 * ({@code BlockPlacementPolicyWithNodeGroup}): verifies replica placement,
 * re-replication, and replica-deletion choices when the network topology has
 * a node-group layer between the rack and the host.
 *
 * NOTE(review): the test methods share the static {@code cluster} topology and
 * the static {@code dataNodes*} arrays, and several tests mutate them (nodes
 * are removed/added, heartbeats changed). The tests therefore appear to depend
 * on JUnit's in-class execution order — confirm before reordering or running
 * methods in isolation.
 */
public class TestReplicationPolicyWithNodeGroup {
  private static final int BLOCK_SIZE = 1024;
  // Sizes of the three datanode arrays declared below.
  private static final int NUM_OF_DATANODES = 8;
  private static final int NUM_OF_DATANODES_BOUNDARY = 6;
  private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
  private static final Configuration CONF = new Configuration();
  // These three are assigned exactly once, in the static initializer, from the
  // namenode started there.
  private static final NetworkTopology cluster;
  private static final NameNode namenode;
  private static final BlockPlacementPolicy replicator;
  private static final String filename = "/dummyfile.txt";

  // Default topology used by most tests: paths are /<datacenter>/<rack>/<nodegroup>.
  // 2 data centers (d1, d2), 3 racks (r1-r3), 6 node groups (n1-n6).
  private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
      DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
      DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
  };

  // Boundary topology: rack r2 contains only one node group (n3), so it cannot
  // host two replicas on distinct node groups.
  private final static DatanodeDescriptor dataNodesInBoundaryCase[] =
          new DatanodeDescriptor[] {
      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
  };

  // 12-node topology with only 6 node groups (2 nodes per group), used to test
  // requesting more targets than there are node groups.
  private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
          new DatanodeDescriptor[] {
      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
      DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
      DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
      DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
      DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
      DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
      DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
  };

  // A writer node that is NOT part of the cluster topology (used by
  // testChooseTarget5 to simulate a client outside the file system).
  private final static DatanodeDescriptor NODE =
      new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));

  // One-time setup: configure a node-group-aware namenode, start it, and build
  // the default 8-node topology. Runs before any @Test method.
  static {
    try {
      FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
      CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
      // Set properties to make HDFS aware of NodeGroup.
      CONF.set("dfs.block.replicator.classname",
          "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
      CONF.set("net.topology.impl",
          "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");

      File baseDir = new File(System.getProperty(
           "test.build.data", "build/test/data"), "dfs/");
      CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
           new File(baseDir, "name").getPath());

      NameNode.format(CONF);
      namenode = new NameNode(CONF);
    } catch (IOException e) {
      e.printStackTrace();
      // Static initializers cannot throw checked exceptions; rethrow as
      // unchecked while preserving the original cause.
      throw (RuntimeException)new RuntimeException().initCause(e);
    }
    // Borrow the replicator and topology map from the live namesystem so the
    // tests exercise the exact objects the namenode would use.
    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
    replicator = fsNamesystem.replicator;
    cluster = fsNamesystem.clusterMap;
    // construct network topology
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.add(dataNodes[i]);
    }
    setupDataNodeCapacity();
  }

  /**
   * Reset every node in {@link #dataNodes} to a healthy heartbeat with enough
   * remaining space for writes and zero active transfers. Tests call this to
   * undo capacity changes made by earlier tests.
   */
  private static void setupDataNodeCapacity() {
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      dataNodes[i].updateHeartbeat(
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
    }
  }

  /**
   * Scan the targets list: all targets should be on different NodeGroups.
   * Return false if two targets are found on the same NodeGroup.
   */
  private static boolean checkTargetsOnDifferentNodeGroup(
      DatanodeDescriptor[] targets) {
    if(targets.length == 0)
      return true;
    Set<String> targetSet = new HashSet<String>();
    for(DatanodeDescriptor node:targets) {
      // getLastHalf extracts the node-group portion of the network location.
      String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
      if(targetSet.contains(nodeGroup)) {
        return false;
      } else {
        targetSet.add(nodeGroup);
      }
    }
    return true;
  }

  /**
   * In this testcase, client is dataNodes[0]. So the 1st replica should be
   * placed on dataNodes[0], the 2nd replica should be placed on
   * different rack and third should be placed on different node (and node group)
   * of rack chosen for 2nd node.
   * The only exception is when the <i>numOfReplicas</i> is 2,
   * the 1st is on dataNodes[0] and the 2nd is on a different rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget1() throws Exception {
    // Mark dataNodes[0] as overloaded (4 active transfers); it should still be
    // chosen as the local (1st) replica since the writer is local.
    dataNodes[0].updateHeartbeat(
        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded

    DatanodeDescriptor[] targets;
    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 0);

    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 1);
    assertEquals(targets[0], dataNodes[0]);

    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 2);
    assertEquals(targets[0], dataNodes[0]);
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));

    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 3);
    assertEquals(targets[0], dataNodes[0]);
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
    // 2nd and 3rd replicas share a rack but must be on different node groups.
    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
    assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));

    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 4);
    assertEquals(targets[0], dataNodes[0]);
    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
               cluster.isOnSameRack(targets[2], targets[3]));
    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
    // Make sure no more than one replicas are on the same nodegroup
    verifyNoTwoTargetsOnSameNodeGroup(targets);

    // Restore dataNodes[0] to a non-overloaded heartbeat for later tests.
    dataNodes[0].updateHeartbeat(
        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
  }

  /**
   * Assert that every target occupies a distinct node group (network
   * location): the set of locations must be as large as the target array.
   */
  private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
    Set<String> nodeGroupSet = new HashSet<String>();
    for (DatanodeDescriptor target: targets) {
      nodeGroupSet.add(target.getNetworkLocation());
    }
    assertEquals(nodeGroupSet.size(), targets.length);
  }

  /**
   * In this testcase, client is dataNodes[0], but the dataNodes[1] is
   * not allowed to be chosen. So the 1st replica should be
   * placed on dataNodes[0], the 2nd replica should be placed on a different
   * rack, the 3rd should be on same rack as the 2nd replica but in different
   * node group, and the rest should be placed on a third rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget2() throws Exception {
    HashMap<Node, Node> excludedNodes;
    DatanodeDescriptor[] targets;
    // Cast to the default policy to reach the excludedNodes-aware overload.
    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();

    excludedNodes = new HashMap<Node, Node>();
    excludedNodes.put(dataNodes[1], dataNodes[1]);
    targets = repl.chooseTarget(4, dataNodes[0], chosenNodes,
        excludedNodes, BLOCK_SIZE);
    assertEquals(targets.length, 4);
    assertEquals(targets[0], dataNodes[0]);
    assertTrue(cluster.isNodeGroupAware());
    // Make sure no replicas are on the same nodegroup
    for (int i=1;i<4;i++) {
      assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
    }
    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
               cluster.isOnSameRack(targets[2], targets[3]));
    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
  }

  /**
   * In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified
   * to be chosen. So the 1st replica should be placed on dataNodes[1],
   * the 2nd replica should be placed on a different rack,
   * the 3rd replica should be placed on the same rack as the 2nd replica but in different nodegroup,
   * and the rest should be placed on the third rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget3() throws Exception {
    // make data node 0 to be not qualified to choose
    dataNodes[0].updateHeartbeat(
        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); // no space

    DatanodeDescriptor[] targets;
    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 0);

    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 1);
    // Falls back to dataNodes[1] — same node group as the disqualified writer.
    assertEquals(targets[0], dataNodes[1]);

    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 2);
    assertEquals(targets[0], dataNodes[1]);
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));

    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 3);
    assertEquals(targets[0], dataNodes[1]);
    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));

    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 4);
    assertEquals(targets[0], dataNodes[1]);
    assertTrue(cluster.isNodeGroupAware());
    verifyNoTwoTargetsOnSameNodeGroup(targets);
    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
               cluster.isOnSameRack(targets[2], targets[3]));

    // Restore dataNodes[0]'s capacity for later tests.
    dataNodes[0].updateHeartbeat(
        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
  }

  /**
   * In this testcase, client is dataNodes[0], but none of the nodes on rack 1
   * is qualified to be chosen. So the 1st replica should be placed on either
   * rack 2 or rack 3.
   * the 2nd replica should be placed on a different rack,
   * the 3rd replica should be placed on the same rack as the 1st replica, but
   * in different node group.
   * @throws Exception
   */
  @Test
  public void testChooseTarget4() throws Exception {
    // make data node 0-2 to be not qualified to choose: not enough disk space
    for(int i=0; i<3; i++) {
      dataNodes[i].updateHeartbeat(
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);
    }

    DatanodeDescriptor[] targets;
    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 0);

    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 1);
    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));

    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 2);
    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));

    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 3);
    // No target may land on the writer's (fully disqualified) rack.
    for(int i=0; i<3; i++) {
      assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
    }
    verifyNoTwoTargetsOnSameNodeGroup(targets);
    assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
               cluster.isOnSameRack(targets[1], targets[2]));
    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
    // NOTE(review): capacities are not restored here; later tests call
    // setupDataNodeCapacity() themselves — confirm if adding tests in between.
  }

  /**
   * In this testcase, client is a node outside of the file system.
   * So the 1st replica can be placed on any node.
   * the 2nd replica should be placed on a different rack,
   * the 3rd replica should be placed on the same rack as the 2nd replica,
   * @throws Exception
   */
  @Test
  public void testChooseTarget5() throws Exception {
    setupDataNodeCapacity();
    DatanodeDescriptor[] targets;
    targets = replicator.chooseTarget(filename, 0, NODE,
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 0);

    targets = replicator.chooseTarget(filename, 1, NODE,
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 1);

    targets = replicator.chooseTarget(filename, 2, NODE,
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 2);
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));

    targets = replicator.chooseTarget(filename, 3, NODE,
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 3);
    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
    verifyNoTwoTargetsOnSameNodeGroup(targets);
  }

  /**
   * This testcase tests re-replication, when dataNodes[0] is already chosen.
   * So the 1st replica can be placed on random rack.
   * the 2nd replica should be placed on different node and nodegroup by same rack as
   * the 1st replica. The 3rd replica can be placed randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate1() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
    chosenNodes.add(dataNodes[0]);
    DatanodeDescriptor[] targets;

    targets = replicator.chooseTarget(filename,
                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 0);

    targets = replicator.chooseTarget(filename,
                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 1);
    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));

    targets = replicator.chooseTarget(filename,
                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 2);
    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));

    targets = replicator.chooseTarget(filename,
                                      3, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 3);
    // Local-rack target must still avoid the existing replica's node group.
    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
  }

  /**
   * This testcase tests re-replication,
   * when dataNodes[0] and dataNodes[1] are already chosen.
   * So the 1st replica should be placed on a different rack of rack 1.
   * the rest replicas can be placed randomly,
   * @throws Exception
   */
  @Test
  public void testRereplicate2() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
    chosenNodes.add(dataNodes[0]);
    chosenNodes.add(dataNodes[1]);

    DatanodeDescriptor[] targets;
    targets = replicator.chooseTarget(filename,
                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 0);

    targets = replicator.chooseTarget(filename,
                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 1);
    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));

    targets = replicator.chooseTarget(filename,
                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 2);
    // At least one of the two new targets must leave the writer's rack.
    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) &&
        cluster.isOnSameRack(dataNodes[0], targets[1]));
  }

  /**
   * This testcase tests re-replication,
   * when dataNodes[0] and dataNodes[3] are already chosen.
   * So the 1st replica should be placed on the rack that the writer resides.
   * the rest replicas can be placed randomly,
   * @throws Exception
   */
  @Test
  public void testRereplicate3() throws Exception {
    setupDataNodeCapacity();
    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
    chosenNodes.add(dataNodes[0]);
    chosenNodes.add(dataNodes[3]);

    DatanodeDescriptor[] targets;
    targets = replicator.chooseTarget(filename,
                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 0);

    // Writer on rack r1: new replica stays on the writer's rack.
    targets = replicator.chooseTarget(filename,
                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 1);
    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));

    // Writer on rack r2: same expectation, from the other rack's viewpoint.
    targets = replicator.chooseTarget(filename,
                               1, dataNodes[3], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 1);
    assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
    assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));

    targets = replicator.chooseTarget(filename,
                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 2);
    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
    assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
    targets = replicator.chooseTarget(filename,
                               2, dataNodes[3], chosenNodes, BLOCK_SIZE);
    assertEquals(targets.length, 2);
    assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
  }

  /**
   * Test for the chooseReplicaToDelete are processed based on
   * block locality and free space
   */
  @Test
  public void testChooseReplicaToDelete() throws Exception {
    List<DatanodeDescriptor> replicaNodeList =
        new ArrayList<DatanodeDescriptor>();
    final Map<String, List<DatanodeDescriptor>> rackMap =
        new HashMap<String, List<DatanodeDescriptor>>();
    // Give each replica holder a distinct amount of free space so the
    // less-free-space tie-breaker is deterministic.
    dataNodes[0].setRemaining(4*1024*1024);
    replicaNodeList.add(dataNodes[0]);

    dataNodes[1].setRemaining(3*1024*1024);
    replicaNodeList.add(dataNodes[1]);

    dataNodes[2].setRemaining(2*1024*1024);
    replicaNodeList.add(dataNodes[2]);

    dataNodes[5].setRemaining(1*1024*1024);
    replicaNodeList.add(dataNodes[5]);

    // "first" holds nodes on racks with more than one replica; "second" holds
    // nodes that are their rack's only replica.
    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
    replicator.splitNodesWithRack(
        replicaNodeList, rackMap, first, second);
    assertEquals(3, first.size());
    assertEquals(1, second.size());
    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
        null, null, (short)3, first, second);
    // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
    // dataNodes[0] and dataNodes[1] are in the same nodegroup,
    // but dataNodes[1] is chosen as less free space
    assertEquals(chosenNode, dataNodes[1]);

    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
    assertEquals(2, first.size());
    assertEquals(1, second.size());
    // Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
    // as less free space
    chosenNode = replicator.chooseReplicaToDelete(
        null, null, (short)2, first, second);
    assertEquals(chosenNode, dataNodes[2]);

    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
    assertEquals(0, first.size());
    assertEquals(2, second.size());
    // Within second set, dataNodes[5] with less free space
    chosenNode = replicator.chooseReplicaToDelete(
        null, null, (short)1, first, second);
    assertEquals(chosenNode, dataNodes[5]);
  }

  /**
   * Test replica placement policy in case of boundary topology.
   * Rack 2 has only 1 node group &amp; can't be placed with two replicas
   * The 1st replica will be placed on writer.
   * The 2nd replica should be placed on a different rack
   * The 3rd replica should be placed on the same rack with writer, but on a
   * different node group.
   */
  @Test
  public void testChooseTargetsOnBoundaryTopology() throws Exception {
    // Swap the default topology for the boundary one.
    for(int i=0; i<NUM_OF_DATANODES; i++) {
      cluster.remove(dataNodes[i]);
    }

    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      cluster.add(dataNodesInBoundaryCase[i]);
    }
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      // NOTE(review): this repeatedly re-marks dataNodes[0] (from the removed
      // default topology) as out of space inside the loop — presumably to keep
      // it disqualified; confirm whether a single call outside the loop (or a
      // different index) was intended.
      dataNodes[0].updateHeartbeat(
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);

      dataNodesInBoundaryCase[i].updateHeartbeat(
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
    }

    DatanodeDescriptor[] targets;
    targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 0);

    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 1);

    targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 2);
    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));

    targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 3);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
  }

  /**
   * Test re-replication policy in boundary case.
   * Rack 2 has only one node group &amp; the node in this node group is chosen
   * Rack 1 has two nodegroups &amp; one of them is chosen.
   * Replica policy should choose the node from node group of Rack1 but not the
   * same nodegroup with chosen nodes.
   */
  @Test
  public void testRereplicateOnBoundaryTopology() throws Exception {
    // Relies on the boundary topology installed by the previous test; only
    // refresh the heartbeats here.
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      dataNodesInBoundaryCase[i].updateHeartbeat(
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
    }
    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
    chosenNodes.add(dataNodesInBoundaryCase[0]);
    chosenNodes.add(dataNodesInBoundaryCase[5]);
    DatanodeDescriptor[] targets;
    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
        chosenNodes, BLOCK_SIZE);
    assertFalse(cluster.isOnSameNodeGroup(targets[0],
        dataNodesInBoundaryCase[0]));
    assertFalse(cluster.isOnSameNodeGroup(targets[0],
        dataNodesInBoundaryCase[5]));
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
  }

  /**
   * Test replica placement policy in case of targets more than number of
   * NodeGroups.
   * The 12-nodes cluster only has 6 NodeGroups, but in some cases, like:
   * placing submitted job file, there is requirement to choose more (10)
   * targets for placing replica. We should test it can return 6 targets.
   */
  @Test
  public void testChooseMoreTargetsThanNodeGroups() throws Exception {
    // Cleanup nodes in previous tests
    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
      DatanodeDescriptor node = dataNodesInBoundaryCase[i];
      if (cluster.contains(node)) {
        cluster.remove(node);
      }
    }

    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
      cluster.add(dataNodesInMoreTargetsCase[i]);
    }

    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
      dataNodesInMoreTargetsCase[i].updateHeartbeat(
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
    }

    DatanodeDescriptor[] targets;
    // Test normal case -- 3 replicas
    targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertEquals(targets.length, 3);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));

    // Test special case -- replica number over node groups.
    targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
    // Verify it only can find 6 targets for placing replicas.
    assertEquals(targets.length, 6);
  }

}
658