1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.balancer;
19 
20 import static org.junit.Assert.assertEquals;
21 
22 import java.io.IOException;
23 import java.net.URI;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.TimeoutException;
29 
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hdfs.DFSConfigKeys;
36 import org.apache.hadoop.hdfs.DFSUtil;
37 import org.apache.hadoop.hdfs.HdfsConfiguration;
38 import org.apache.hadoop.hdfs.MiniDFSCluster;
39 import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
40 import org.apache.hadoop.hdfs.NameNodeProxies;
41 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
42 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
43 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
44 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
45 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
46 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
47 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
48 import org.apache.hadoop.net.NetworkTopology;
49 import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
50 import org.junit.Assert;
51 import org.junit.Test;
52 
53 /**
54  * This class tests if a balancer schedules tasks correctly.
55  */
56 public class TestBalancerWithNodeGroup {
57   private static final Log LOG = LogFactory.getLog(
58   "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
59 
60   final private static long CAPACITY = 5000L;
61   final private static String RACK0 = "/rack0";
62   final private static String RACK1 = "/rack1";
63   final private static String NODEGROUP0 = "/nodegroup0";
64   final private static String NODEGROUP1 = "/nodegroup1";
65   final private static String NODEGROUP2 = "/nodegroup2";
66   final static private String fileName = "/tmp.txt";
67   final static private Path filePath = new Path(fileName);
68   MiniDFSClusterWithNodeGroup cluster;
69 
70   ClientProtocol client;
71 
72   static final long TIMEOUT = 40000L; //msec
73   static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
74   static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
75   static final int DEFAULT_BLOCK_SIZE = 100;
76 
77   static {
TestBalancer.initTestSetup()78     TestBalancer.initTestSetup();
79   }
80 
createConf()81   static Configuration createConf() {
82     Configuration conf = new HdfsConfiguration();
83     TestBalancer.initConf(conf);
84     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
85     conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
86         NetworkTopologyWithNodeGroup.class.getName());
87     conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
88         BlockPlacementPolicyWithNodeGroup.class.getName());
89     return conf;
90   }
91 
92   /**
93    * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE,
94    * summed over all nodes.  Times out after TIMEOUT msec.
95    * @param expectedUsedSpace
96    * @param expectedTotalSpace
97    * @throws IOException - if getStats() fails
98    * @throws TimeoutException
99    */
waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)100   private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
101   throws IOException, TimeoutException {
102     long timeout = TIMEOUT;
103     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
104              : System.currentTimeMillis() + timeout;
105 
106     while (true) {
107       long[] status = client.getStats();
108       double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace)
109           / expectedTotalSpace;
110       double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace)
111           / expectedUsedSpace;
112       if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE
113           && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
114         break; //done
115 
116       if (System.currentTimeMillis() > failtime) {
117         throw new TimeoutException("Cluster failed to reached expected values of "
118             + "totalSpace (current: " + status[0]
119             + ", expected: " + expectedTotalSpace
120             + "), or usedSpace (current: " + status[1]
121             + ", expected: " + expectedUsedSpace
122             + "), in more than " + timeout + " msec.");
123       }
124       try {
125         Thread.sleep(100L);
126       } catch(InterruptedException ignored) {
127       }
128     }
129   }
130 
131   /**
132    * Wait until balanced: each datanode gives utilization within
133    * BALANCE_ALLOWED_VARIANCE of average
134    * @throws IOException
135    * @throws TimeoutException
136    */
waitForBalancer(long totalUsedSpace, long totalCapacity)137   private void waitForBalancer(long totalUsedSpace, long totalCapacity)
138   throws IOException, TimeoutException {
139     long timeout = TIMEOUT;
140     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
141         : System.currentTimeMillis() + timeout;
142     final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
143     boolean balanced;
144     do {
145       DatanodeInfo[] datanodeReport =
146           client.getDatanodeReport(DatanodeReportType.ALL);
147       assertEquals(datanodeReport.length, cluster.getDataNodes().size());
148       balanced = true;
149       for (DatanodeInfo datanode : datanodeReport) {
150         double nodeUtilization = ((double)datanode.getDfsUsed())
151             / datanode.getCapacity();
152         if (Math.abs(avgUtilization - nodeUtilization) >
153             BALANCE_ALLOWED_VARIANCE) {
154           balanced = false;
155           if (System.currentTimeMillis() > failtime) {
156             throw new TimeoutException(
157                 "Rebalancing expected avg utilization to become "
158                 + avgUtilization + ", but on datanode " + datanode
159                 + " it remains at " + nodeUtilization
160                 + " after more than " + TIMEOUT + " msec.");
161           }
162           try {
163             Thread.sleep(100);
164           } catch (InterruptedException ignored) {
165           }
166           break;
167         }
168       }
169     } while (!balanced);
170   }
171 
runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity)172   private void runBalancer(Configuration conf,
173       long totalUsedSpace, long totalCapacity) throws Exception {
174     waitForHeartBeat(totalUsedSpace, totalCapacity);
175 
176     // start rebalancing
177     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
178     final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
179     assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
180 
181     waitForHeartBeat(totalUsedSpace, totalCapacity);
182     LOG.info("Rebalancing with default factor.");
183     waitForBalancer(totalUsedSpace, totalCapacity);
184   }
185 
runBalancerCanFinish(Configuration conf, long totalUsedSpace, long totalCapacity)186   private void runBalancerCanFinish(Configuration conf,
187       long totalUsedSpace, long totalCapacity) throws Exception {
188     waitForHeartBeat(totalUsedSpace, totalCapacity);
189 
190     // start rebalancing
191     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
192     final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
193     Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
194         (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
195     waitForHeartBeat(totalUsedSpace, totalCapacity);
196     LOG.info("Rebalancing with default factor.");
197   }
198 
getBlocksOnRack(List<LocatedBlock> blks, String rack)199   private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) {
200     Set<ExtendedBlock> ret = new HashSet<ExtendedBlock>();
201     for (LocatedBlock blk : blks) {
202       for (DatanodeInfo di : blk.getLocations()) {
203         if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) {
204           ret.add(blk.getBlock());
205           break;
206         }
207       }
208     }
209     return ret;
210   }
211 
212   /**
213    * Create a cluster with even distribution, and a new empty node is added to
214    * the cluster, then test rack locality for balancer policy.
215    */
216   @Test(timeout=60000)
testBalancerWithRackLocality()217   public void testBalancerWithRackLocality() throws Exception {
218     Configuration conf = createConf();
219     long[] capacities = new long[]{CAPACITY, CAPACITY};
220     String[] racks = new String[]{RACK0, RACK1};
221     String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
222 
223     int numOfDatanodes = capacities.length;
224     assertEquals(numOfDatanodes, racks.length);
225     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
226                                 .numDataNodes(capacities.length)
227                                 .racks(racks)
228                                 .simulatedCapacities(capacities);
229     MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
230     cluster = new MiniDFSClusterWithNodeGroup(builder);
231     try {
232       cluster.waitActive();
233       client = NameNodeProxies.createProxy(conf,
234           cluster.getFileSystem(0).getUri(),
235           ClientProtocol.class).getProxy();
236 
237       long totalCapacity = TestBalancer.sum(capacities);
238 
239       // fill up the cluster to be 30% full
240       long totalUsedSpace = totalCapacity * 3 / 10;
241       long length = totalUsedSpace / numOfDatanodes;
242       TestBalancer.createFile(cluster, filePath, length,
243           (short) numOfDatanodes, 0);
244 
245       LocatedBlocks lbs = client.getBlockLocations(filePath.toUri().getPath(), 0,
246           length);
247       Set<ExtendedBlock> before = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
248 
249       long newCapacity = CAPACITY;
250       String newRack = RACK1;
251       String newNodeGroup = NODEGROUP2;
252       // start up an empty node with the same capacity and on the same rack
253       cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
254           new long[] {newCapacity}, new String[]{newNodeGroup});
255 
256       totalCapacity += newCapacity;
257 
258       // run balancer and validate results
259       runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
260 
261       lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length);
262       Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
263       assertEquals(before, after);
264 
265     } finally {
266       cluster.shutdown();
267     }
268   }
269 
270   /**
271    * Create a cluster with even distribution, and a new empty node is added to
272    * the cluster, then test node-group locality for balancer policy.
273    */
274   @Test(timeout=60000)
testBalancerWithNodeGroup()275   public void testBalancerWithNodeGroup() throws Exception {
276     Configuration conf = createConf();
277     long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
278     String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
279     String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
280 
281     int numOfDatanodes = capacities.length;
282     assertEquals(numOfDatanodes, racks.length);
283     assertEquals(numOfDatanodes, nodeGroups.length);
284     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
285                                 .numDataNodes(capacities.length)
286                                 .racks(racks)
287                                 .simulatedCapacities(capacities);
288     MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
289     cluster = new MiniDFSClusterWithNodeGroup(builder);
290     try {
291       cluster.waitActive();
292       client = NameNodeProxies.createProxy(conf,
293           cluster.getFileSystem(0).getUri(),
294           ClientProtocol.class).getProxy();
295 
296       long totalCapacity = TestBalancer.sum(capacities);
297       // fill up the cluster to be 20% full
298       long totalUsedSpace = totalCapacity * 2 / 10;
299       TestBalancer.createFile(cluster, filePath, totalUsedSpace / (numOfDatanodes/2),
300           (short) (numOfDatanodes/2), 0);
301 
302       long newCapacity = CAPACITY;
303       String newRack = RACK1;
304       String newNodeGroup = NODEGROUP2;
305       // start up an empty node with the same capacity and on NODEGROUP2
306       cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
307           new long[] {newCapacity}, new String[]{newNodeGroup});
308 
309       totalCapacity += newCapacity;
310 
311       // run balancer and validate results
312       runBalancer(conf, totalUsedSpace, totalCapacity);
313 
314     } finally {
315       cluster.shutdown();
316     }
317   }
318 
319   /**
320    * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2)
321    * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster
322    * to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according
323    * to replica placement policy with NodeGroup. As a result, n2 and n3 will be
324    * filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3
325    * to n0 or n1 as balancer policy with node group. Thus, we expect the balancer
326    * to end in 5 iterations without move block process.
327    */
328   @Test(timeout=60000)
testBalancerEndInNoMoveProgress()329   public void testBalancerEndInNoMoveProgress() throws Exception {
330     Configuration conf = createConf();
331     long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
332     String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
333     String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
334 
335     int numOfDatanodes = capacities.length;
336     assertEquals(numOfDatanodes, racks.length);
337     assertEquals(numOfDatanodes, nodeGroups.length);
338     MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
339                                 .numDataNodes(capacities.length)
340                                 .racks(racks)
341                                 .simulatedCapacities(capacities);
342     MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
343     cluster = new MiniDFSClusterWithNodeGroup(builder);
344     try {
345       cluster.waitActive();
346       client = NameNodeProxies.createProxy(conf,
347           cluster.getFileSystem(0).getUri(),
348           ClientProtocol.class).getProxy();
349 
350       long totalCapacity = TestBalancer.sum(capacities);
351       // fill up the cluster to be 60% full
352       long totalUsedSpace = totalCapacity * 6 / 10;
353       TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3,
354           (short) (3), 0);
355 
356       // run balancer which can finish in 5 iterations with no block movement.
357       runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
358 
359     } finally {
360       cluster.shutdown();
361     }
362   }
363 }
364