1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.balancer;
19 
20 import static org.junit.Assert.assertEquals;
21 
22 import java.io.IOException;
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.Random;
26 import java.util.concurrent.TimeoutException;
27 
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.hdfs.DFSClient;
34 import org.apache.hadoop.hdfs.DFSTestUtil;
35 import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
36 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
37 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
38 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
39 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
40 import org.apache.hadoop.net.NetworkTopology;
41 import org.junit.Assert;
42 import org.junit.Test;
43 
/**
 * This class tests whether the balancer schedules block moves correctly on a
 * cluster whose network topology includes a node-group layer.
 */
47 public class TestBalancerWithNodeGroup {
48 
49   final private static long CAPACITY = 500L;
50   final private static String RACK0 = "/rack0";
51   final private static String RACK1 = "/rack1";
52   final private static String NODEGROUP0 = "/nodegroup0";
53   final private static String NODEGROUP1 = "/nodegroup1";
54   final private static String NODEGROUP2 = "/nodegroup2";
55   final static private String fileName = "/tmp.txt";
56   final static private Path filePath = new Path(fileName);
57   private static final Log LOG = LogFactory.getLog(
58       "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
59   MiniDFSClusterWithNodeGroup cluster;
60 
61   ClientProtocol client;
62   private Balancer balancer;
63 
64   static final long TIMEOUT = 20000L; //msec
65   static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
66   static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
67   static final int DEFAULT_BLOCK_SIZE = 5;
68   private static final Random r = new Random();
69 
70   static {
71     Balancer.setBlockMoveWaitTime(1000L) ;
72   }
73 
initConf(Configuration conf)74   private void initConf(Configuration conf) {
75     conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
76     conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
77     conf.setLong("dfs.heartbeat.interval", 1L);
78     conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
79     conf.setLong("dfs.balancer.movedWinWidth", 2000L);
80     conf.set("net.topology.impl", "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
81     conf.set("dfs.block.replicator.classname",
82         "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
83   }
84 
85   /* create a file with a length of <code>fileLen</code> */
createFile(long fileLen, short replicationFactor)86   private void createFile(long fileLen, short replicationFactor)
87       throws IOException {
88     FileSystem fs = cluster.getFileSystem();
89     DFSTestUtil.createFile(fs, filePath, fileLen,
90         replicationFactor, r.nextLong());
91     DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
92   }
93 
94   // create and initiate conf for balancer
createConf()95   private Configuration createConf() {
96     Configuration conf = new Configuration();
97     initConf(conf);
98     return conf;
99   }
100 
101   /**
102    * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE,
103    * summed over all nodes.  Times out after TIMEOUT msec.
104    * @param expectedUsedSpace
105    * @param expectedTotalSpace
106    * @throws IOException - if getStats() fails
107    * @throws TimeoutException
108    */
waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)109   private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
110   throws IOException, TimeoutException {
111     long timeout = TIMEOUT;
112     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
113              : System.currentTimeMillis() + timeout;
114 
115     while (true) {
116       long[] status = client.getStats();
117       double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace)
118           / expectedTotalSpace;
119       double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace)
120           / expectedUsedSpace;
121       if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE
122           && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
123         break; //done
124 
125       if (System.currentTimeMillis() > failtime) {
126         throw new TimeoutException("Cluster failed to reached expected values of "
127             + "totalSpace (current: " + status[0]
128             + ", expected: " + expectedTotalSpace
129             + "), or usedSpace (current: " + status[1]
130             + ", expected: " + expectedUsedSpace
131             + "), in more than " + timeout + " msec.");
132       }
133       try {
134         Thread.sleep(100L);
135       } catch(InterruptedException ignored) {
136       }
137     }
138   }
139 
140   /**
141    * Wait until balanced: each datanode gives utilization within
142    * BALANCE_ALLOWED_VARIANCE of average
143    * @throws IOException
144    * @throws TimeoutException
145    */
waitForBalancer(long totalUsedSpace, long totalCapacity)146   private void waitForBalancer(long totalUsedSpace, long totalCapacity)
147       throws IOException, TimeoutException {
148     long timeout = TIMEOUT;
149     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
150         : System.currentTimeMillis() + timeout;
151     final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
152     boolean balanced;
153     do {
154       DatanodeInfo[] datanodeReport =
155           client.getDatanodeReport(DatanodeReportType.ALL);
156       assertEquals(datanodeReport.length, cluster.getDataNodes().size());
157       balanced = true;
158       for (DatanodeInfo datanode : datanodeReport) {
159         double nodeUtilization = ((double)datanode.getDfsUsed())
160             / datanode.getCapacity();
161         if (Math.abs(avgUtilization - nodeUtilization) >
162             BALANCE_ALLOWED_VARIANCE) {
163           balanced = false;
164           if (System.currentTimeMillis() > failtime) {
165             throw new TimeoutException(
166                 "Rebalancing expected avg utilization to become "
167                 + avgUtilization + ", but on datanode " + datanode
168                 + " it remains at " + nodeUtilization
169                 + " after more than " + TIMEOUT + " msec.");
170           }
171           try {
172             Thread.sleep(100);
173           } catch (InterruptedException ignored) {
174           }
175           break;
176         }
177       }
178     } while (!balanced);
179   }
180 
181   /** Start balancer and check if the cluster is balanced after the run */
runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity)182   private void runBalancer(Configuration conf,
183       long totalUsedSpace, long totalCapacity) throws Exception {
184     waitForHeartBeat(totalUsedSpace, totalCapacity);
185 
186     // start rebalancing
187     balancer = new Balancer(conf);
188 
189     final int r = balancer.run(new String[0]);
190 
191     assertEquals(Balancer.SUCCESS, r);
192 
193     waitForHeartBeat(totalUsedSpace, totalCapacity);
194     LOG.info("Rebalancing with default factor.");
195     waitForBalancer(totalUsedSpace, totalCapacity);
196   }
197 
runBalancerCanFinish(Configuration conf, long totalUsedSpace, long totalCapacity)198   private void runBalancerCanFinish(Configuration conf,
199       long totalUsedSpace, long totalCapacity) throws Exception {
200     waitForHeartBeat(totalUsedSpace, totalCapacity);
201 
202     balancer = new Balancer(conf);
203     final int r = balancer.run(new String[0]);
204     Assert.assertTrue(r == Balancer.SUCCESS ||
205         (r == Balancer.NO_MOVE_PROGRESS));
206     waitForHeartBeat(totalUsedSpace, totalCapacity);
207     LOG.info("Rebalancing ends successful.");
208   }
209 
210   /**
211    * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
212    * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster
213    * to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according
214    * to replica placement policy with NodeGroup. As a result, n2 and n3 will be
215    * filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3
216    * to n0 or n1 as balancer policy with node group. Thus, we expect the balancer
217    * to end in 5 iterations without move block process.
218    */
219   @Test(timeout=60000)
testBalancerEndInNoMoveProgress()220   public void testBalancerEndInNoMoveProgress() throws Exception {
221     Configuration conf = createConf();
222     long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
223     String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
224     String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
225 
226     int numOfDatanodes = capacities.length;
227     assertEquals(numOfDatanodes, racks.length);
228     assertEquals(numOfDatanodes, nodeGroups.length);
229     MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
230     cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length,
231         true, true, null, racks, capacities);
232     try {
233       cluster.waitActive();
234       client = DFSClient.createNamenode(conf);
235 
236       long totalCapacity = 0L;
237       for(long capacity : capacities) {
238         totalCapacity += capacity;
239       }
240 
241       // fill up the cluster to be 60% full
242       long totalUsedSpace = totalCapacity * 6 / 10;
243 
244       createFile(totalUsedSpace / 3, (short) 3);
245 
246       // run balancer which can finish in 5 iterations with no block movement.
247       runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
248 
249     } finally {
250       cluster.shutdown();
251     }
252   }
253 
254   /**
255    * Create a cluster with even distribution, and a new empty node is added to
256    * the cluster, then test rack locality for balancer policy.
257    */
258   @Test(timeout=60000)
testBalancerWithRackLocality()259   public void testBalancerWithRackLocality() throws Exception {
260     Configuration conf = createConf();
261     long[] capacities = new long[]{CAPACITY, CAPACITY};
262     String[] racks = new String[]{RACK0, RACK1};
263     String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
264 
265     int numOfDatanodes = capacities.length;
266     assertEquals(numOfDatanodes, racks.length);
267     MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
268 
269     MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
270     cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length,
271         true, true, null, racks, capacities);
272     try {
273       cluster.waitActive();
274 
275       client = DFSClient.createNamenode(conf);
276 
277       long totalCapacity = 0L;
278       for(long capacity : capacities) {
279         totalCapacity += capacity;
280       }
281 
282       // fill up the cluster to be 30% full
283       long totalUsedSpace = totalCapacity * 3 / 10;
284       createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
285 
286       long newCapacity = CAPACITY;
287       String newRack = RACK1;
288       String newNodeGroup = NODEGROUP2;
289 
290       // start up an empty node with the same capacity and on the same rack
291       cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
292           new String[]{newNodeGroup}, new long[] {newCapacity});
293 
294       totalCapacity += newCapacity;
295 
296       // run balancer and validate results
297       runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
298 
299       DatanodeInfo[] datanodeReport =
300               client.getDatanodeReport(DatanodeReportType.ALL);
301 
302       Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>();
303       for (DatanodeInfo datanode: datanodeReport) {
304         String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
305         int usedCapacity = (int) datanode.getDfsUsed();
306 
307         if (rackToUsedCapacity.get(rack) != null) {
308           rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack));
309         } else {
310           rackToUsedCapacity.put(rack, usedCapacity);
311         }
312       }
313       assertEquals(rackToUsedCapacity.size(), 2);
314       assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
315 
316     } finally {
317       cluster.shutdown();
318     }
319   }
320 
321   /** Create a cluster with even distribution, and a new empty node is added to
322    *  the cluster, then test rack locality for balancer policy.
323    **/
324   @Test(timeout=60000)
testBalancerWithNodeGroup()325   public void testBalancerWithNodeGroup() throws Exception {
326     Configuration conf = createConf();
327     long[] capacities = new long[]{CAPACITY, CAPACITY};
328     String[] racks = new String[]{RACK0, RACK1};
329     String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
330 
331     int numOfDatanodes = capacities.length;
332     assertEquals(numOfDatanodes, racks.length);
333     MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
334     cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length,
335         true, true, null, racks, capacities);
336     try {
337       cluster.waitActive();
338       client = DFSClient.createNamenode(conf);
339 
340       long totalCapacity = 0L;
341       for(long capacity : capacities) {
342         totalCapacity += capacity;
343       }
344 
345       // fill up the cluster to be 30% full
346       long totalUsedSpace = totalCapacity*3/10;
347 
348       createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
349 
350       long newCapacity = CAPACITY;
351       String newRack = RACK1;
352       String newNodeGroup = NODEGROUP2;
353       // start up an empty node with the same capacity and on the same rack
354       cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
355           new String[]{newNodeGroup}, new long[] {newCapacity});
356 
357       totalCapacity += newCapacity;
358 
359       // run balancer and validate results
360       runBalancer(conf, totalUsedSpace, totalCapacity);
361 
362     } finally {
363       cluster.shutdown();
364     }
365   }
366 
367 }