1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.master.balancer; 19 20 import java.util.ArrayDeque; 21 import java.util.Arrays; 22 import java.util.Collection; 23 import java.util.Deque; 24 import java.util.HashMap; 25 import java.util.LinkedList; 26 import java.util.List; 27 import java.util.Map; 28 import java.util.Map.Entry; 29 import java.util.Random; 30 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.hadoop.hbase.classification.InterfaceAudience; 34 import org.apache.hadoop.conf.Configuration; 35 import org.apache.hadoop.hbase.ClusterStatus; 36 import org.apache.hadoop.hbase.HBaseInterfaceAudience; 37 import org.apache.hadoop.hbase.HRegionInfo; 38 import org.apache.hadoop.hbase.RegionLoad; 39 import org.apache.hadoop.hbase.ServerLoad; 40 import org.apache.hadoop.hbase.ServerName; 41 import org.apache.hadoop.hbase.master.MasterServices; 42 import org.apache.hadoop.hbase.master.RegionPlan; 43 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; 44 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; 45 import 
org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction; 46 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; 47 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; 48 import org.apache.hadoop.hbase.util.Bytes; 49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 50 51 /** 52 * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will 53 * randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the 54 * new cluster state becomes the plan. It includes costs functions to compute the cost of:</p> 55 * <ul> 56 * <li>Region Load</li> 57 * <li>Table Load</li> 58 * <li>Data Locality</li> 59 * <li>Memstore Sizes</li> 60 * <li>Storefile Sizes</li> 61 * </ul> 62 * 63 * 64 * <p>Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost 65 * best solution, and 1 is the highest possible cost and the worst solution. The computed costs are 66 * scaled by their respective multipliers:</p> 67 * 68 * <ul> 69 * <li>hbase.master.balancer.stochastic.regionLoadCost</li> 70 * <li>hbase.master.balancer.stochastic.moveCost</li> 71 * <li>hbase.master.balancer.stochastic.tableLoadCost</li> 72 * <li>hbase.master.balancer.stochastic.localityCost</li> 73 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li> 74 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li> 75 * </ul> 76 * 77 * <p>In addition to the above configurations, the balancer can be tuned by the following 78 * configuration values:</p> 79 * <ul> 80 * <li>hbase.master.balancer.stochastic.maxMoveRegions which 81 * controls what the max number of regions that can be moved in a single invocation of this 82 * balancer.</li> 83 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of 84 * regions is multiplied to try and get the number of times the balancer will 85 * mutate all servers.</li> 86 * 
<li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that 87 * the balancer will try and mutate all the servers. The balancer will use the minimum of this 88 * value and the above computation.</li> 89 * </ul> 90 * 91 * <p>This balancer is best used with hbase.master.loadbalance.bytable set to false 92 * so that the balancer gets the full picture of all loads on the cluster.</p> 93 */ 94 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 95 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", 96 justification="Complaint is about costFunctions not being synchronized; not end of the world") 97 public class StochasticLoadBalancer extends BaseLoadBalancer { 98 99 protected static final String STEPS_PER_REGION_KEY = 100 "hbase.master.balancer.stochastic.stepsPerRegion"; 101 protected static final String MAX_STEPS_KEY = 102 "hbase.master.balancer.stochastic.maxSteps"; 103 protected static final String MAX_RUNNING_TIME_KEY = 104 "hbase.master.balancer.stochastic.maxRunningTime"; 105 protected static final String KEEP_REGION_LOADS = 106 "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; 107 108 private static final Random RANDOM = new Random(System.currentTimeMillis()); 109 private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); 110 111 Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>(); 112 113 // values are defaults 114 private int maxSteps = 1000000; 115 private int stepsPerRegion = 800; 116 private long maxRunningTime = 30 * 1000 * 1; // 30 seconds. 
117 private int numRegionLoadsToRemember = 15; 118 119 private CandidateGenerator[] candidateGenerators; 120 private CostFromRegionLoadFunction[] regionLoadFunctions; 121 122 private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC 123 124 // Keep locality based picker and cost function to alert them 125 // when new services are offered 126 private LocalityBasedCandidateGenerator localityCandidateGenerator; 127 private LocalityCostFunction localityCost; 128 private RegionReplicaHostCostFunction regionReplicaHostCostFunction; 129 private RegionReplicaRackCostFunction regionReplicaRackCostFunction; 130 131 @Override onConfigurationChange(Configuration conf)132 public void onConfigurationChange(Configuration conf) { 133 setConf(conf); 134 } 135 136 @Override setConf(Configuration conf)137 public synchronized void setConf(Configuration conf) { 138 super.setConf(conf); 139 LOG.info("loading config"); 140 141 maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); 142 143 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); 144 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime); 145 146 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); 147 148 if (localityCandidateGenerator == null) { 149 localityCandidateGenerator = new LocalityBasedCandidateGenerator(services); 150 } 151 localityCost = new LocalityCostFunction(conf, services); 152 153 if (candidateGenerators == null) { 154 candidateGenerators = new CandidateGenerator[] { 155 new RandomCandidateGenerator(), 156 new LoadCandidateGenerator(), 157 localityCandidateGenerator, 158 new RegionReplicaRackCandidateGenerator(), 159 }; 160 } 161 162 regionLoadFunctions = new CostFromRegionLoadFunction[] { 163 new ReadRequestCostFunction(conf), 164 new WriteRequestCostFunction(conf), 165 new MemstoreSizeCostFunction(conf), 166 new StoreFileCostFunction(conf) 167 }; 168 169 regionReplicaHostCostFunction = new 
RegionReplicaHostCostFunction(conf); 170 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 171 172 costFunctions = new CostFunction[]{ 173 new RegionCountSkewCostFunction(conf), 174 new PrimaryRegionCountSkewCostFunction(conf), 175 new MoveCostFunction(conf), 176 localityCost, 177 new TableSkewCostFunction(conf), 178 regionReplicaHostCostFunction, 179 regionReplicaRackCostFunction, 180 regionLoadFunctions[0], 181 regionLoadFunctions[1], 182 regionLoadFunctions[2], 183 regionLoadFunctions[3], 184 }; 185 } 186 187 @Override setSlop(Configuration conf)188 protected void setSlop(Configuration conf) { 189 this.slop = conf.getFloat("hbase.regions.slop", 0.001F); 190 } 191 192 @Override setClusterStatus(ClusterStatus st)193 public synchronized void setClusterStatus(ClusterStatus st) { 194 super.setClusterStatus(st); 195 updateRegionLoad(); 196 for(CostFromRegionLoadFunction cost : regionLoadFunctions) { 197 cost.setClusterStatus(st); 198 } 199 } 200 201 @Override setMasterServices(MasterServices masterServices)202 public synchronized void setMasterServices(MasterServices masterServices) { 203 super.setMasterServices(masterServices); 204 this.localityCost.setServices(masterServices); 205 this.localityCandidateGenerator.setServices(masterServices); 206 207 } 208 209 @Override areSomeRegionReplicasColocated(Cluster c)210 protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) { 211 regionReplicaHostCostFunction.init(c); 212 if (regionReplicaHostCostFunction.cost() > 0) return true; 213 regionReplicaRackCostFunction.init(c); 214 if (regionReplicaRackCostFunction.cost() > 0) return true; 215 return false; 216 } 217 218 /** 219 * Given the cluster state this will try and approach an optimal balance. This 220 * should always approach the optimal state given enough steps. 
221 */ 222 @Override balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)223 public synchronized List<RegionPlan> balanceCluster(Map<ServerName, 224 List<HRegionInfo>> clusterState) { 225 List<RegionPlan> plans = balanceMasterRegions(clusterState); 226 if (plans != null || clusterState == null || clusterState.size() <= 1) { 227 return plans; 228 } 229 if (masterServerName != null && clusterState.containsKey(masterServerName)) { 230 if (clusterState.size() <= 2) { 231 return null; 232 } 233 clusterState = new HashMap<ServerName, List<HRegionInfo>>(clusterState); 234 clusterState.remove(masterServerName); 235 } 236 237 // On clusters with lots of HFileLinks or lots of reference files, 238 // instantiating the storefile infos can be quite expensive. 239 // Allow turning this feature off if the locality cost is not going to 240 // be used in any computations. 241 RegionLocationFinder finder = null; 242 if (this.localityCost != null && this.localityCost.getMultiplier() > 0) { 243 finder = this.regionFinder; 244 } 245 246 //The clusterState that is given to this method contains the state 247 //of all the regions in the table(s) (that's true today) 248 // Keep track of servers to iterate through them. 249 Cluster cluster = new Cluster(clusterState, loads, finder, rackManager); 250 if (!needsBalance(cluster)) { 251 return null; 252 } 253 254 long startTime = EnvironmentEdgeManager.currentTime(); 255 256 initCosts(cluster); 257 258 double currentCost = computeCost(cluster, Double.MAX_VALUE); 259 260 double initCost = currentCost; 261 double newCost = currentCost; 262 263 long computedMaxSteps = Math.min(this.maxSteps, 264 ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); 265 // Perform a stochastic walk to see if we can get a good fit. 
266 long step; 267 268 for (step = 0; step < computedMaxSteps; step++) { 269 int generatorIdx = RANDOM.nextInt(candidateGenerators.length); 270 CandidateGenerator p = candidateGenerators[generatorIdx]; 271 Cluster.Action action = p.generate(cluster); 272 273 if (action.type == Type.NULL) { 274 continue; 275 } 276 277 cluster.doAction(action); 278 updateCostsWithAction(cluster, action); 279 280 newCost = computeCost(cluster, currentCost); 281 282 // Should this be kept? 283 if (newCost < currentCost) { 284 currentCost = newCost; 285 } else { 286 // Put things back the way they were before. 287 // TODO: undo by remembering old values 288 Action undoAction = action.undoAction(); 289 cluster.doAction(undoAction); 290 updateCostsWithAction(cluster, undoAction); 291 } 292 293 if (EnvironmentEdgeManager.currentTime() - startTime > 294 maxRunningTime) { 295 break; 296 } 297 } 298 long endTime = EnvironmentEdgeManager.currentTime(); 299 300 metricsBalancer.balanceCluster(endTime - startTime); 301 302 if (initCost > currentCost) { 303 plans = createRegionPlans(cluster); 304 if (LOG.isDebugEnabled()) { 305 LOG.debug("Finished computing new load balance plan. Computation took " 306 + (endTime - startTime) + "ms to try " + step 307 + " different iterations. Found a solution that moves " 308 + plans.size() + " regions; Going from a computed cost of " 309 + initCost + " to a new cost of " + currentCost); 310 } 311 return plans; 312 } 313 if (LOG.isDebugEnabled()) { 314 LOG.debug("Could not find a better load balance plan. Tried " 315 + step + " different configurations in " + (endTime - startTime) 316 + "ms, and did not find anything with a computed cost less than " + initCost); 317 } 318 return null; 319 } 320 321 /** 322 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 323 * state. 324 * 325 * @param cluster The state of the cluster 326 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 
327 */ createRegionPlans(Cluster cluster)328 private List<RegionPlan> createRegionPlans(Cluster cluster) { 329 List<RegionPlan> plans = new LinkedList<RegionPlan>(); 330 for (int regionIndex = 0; 331 regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) { 332 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 333 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 334 335 if (initialServerIndex != newServerIndex) { 336 HRegionInfo region = cluster.regions[regionIndex]; 337 ServerName initialServer = cluster.servers[initialServerIndex]; 338 ServerName newServer = cluster.servers[newServerIndex]; 339 340 if (LOG.isTraceEnabled()) { 341 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 342 + initialServer.getHostname() + " to " + newServer.getHostname()); 343 } 344 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 345 plans.add(rp); 346 } 347 } 348 return plans; 349 } 350 351 /** 352 * Store the current region loads. 353 */ updateRegionLoad()354 private synchronized void updateRegionLoad() { 355 // We create a new hashmap so that regions that are no longer there are removed. 356 // However we temporarily need the old loads so we can use them to keep the rolling average. 
357 Map<String, Deque<RegionLoad>> oldLoads = loads; 358 loads = new HashMap<String, Deque<RegionLoad>>(); 359 360 for (ServerName sn : clusterStatus.getServers()) { 361 ServerLoad sl = clusterStatus.getLoad(sn); 362 if (sl == null) { 363 continue; 364 } 365 for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) { 366 Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey())); 367 if (rLoads == null) { 368 // There was nothing there 369 rLoads = new ArrayDeque<RegionLoad>(); 370 } else if (rLoads.size() >= numRegionLoadsToRemember) { 371 rLoads.remove(); 372 } 373 rLoads.add(entry.getValue()); 374 loads.put(Bytes.toString(entry.getKey()), rLoads); 375 376 } 377 } 378 379 for(CostFromRegionLoadFunction cost : regionLoadFunctions) { 380 cost.setLoads(loads); 381 } 382 } 383 initCosts(Cluster cluster)384 protected void initCosts(Cluster cluster) { 385 for (CostFunction c:costFunctions) { 386 c.init(cluster); 387 } 388 } 389 updateCostsWithAction(Cluster cluster, Action action)390 protected void updateCostsWithAction(Cluster cluster, Action action) { 391 for (CostFunction c : costFunctions) { 392 c.postAction(action); 393 } 394 } 395 396 /** 397 * This is the main cost function. It will compute a cost associated with a proposed cluster 398 * state. All different costs will be combined with their multipliers to produce a double cost. 399 * 400 * @param cluster The state of the cluster 401 * @param previousCost the previous cost. This is used as an early out. 402 * @return a double of a cost associated with the proposed cluster state. This cost is an 403 * aggregate of all individual cost functions. 
404 */ computeCost(Cluster cluster, double previousCost)405 protected double computeCost(Cluster cluster, double previousCost) { 406 double total = 0; 407 408 for (CostFunction c:costFunctions) { 409 if (c.getMultiplier() <= 0) { 410 continue; 411 } 412 413 total += c.getMultiplier() * c.cost(); 414 415 if (total > previousCost) { 416 return total; 417 } 418 } 419 return total; 420 } 421 422 /** Generates a candidate action to be applied to the cluster for cost function search */ 423 abstract static class CandidateGenerator { generate(Cluster cluster)424 abstract Cluster.Action generate(Cluster cluster); 425 426 /** 427 * From a list of regions pick a random one. Null can be returned which 428 * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move 429 * rather than swap. 430 * 431 * @param cluster The state of the cluster 432 * @param server index of the server 433 * @param chanceOfNoSwap Chance that this will decide to try a move rather 434 * than a swap. 435 * @return a random {@link HRegionInfo} or null if an asymmetrical move is 436 * suggested. 437 */ pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap)438 protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) { 439 // Check to see if this is just a move. 440 if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) { 441 // signal a move only. 
442 return -1; 443 } 444 int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length); 445 return cluster.regionsPerServer[server][rand]; 446 447 } pickRandomServer(Cluster cluster)448 protected int pickRandomServer(Cluster cluster) { 449 if (cluster.numServers < 1) { 450 return -1; 451 } 452 453 return RANDOM.nextInt(cluster.numServers); 454 } 455 pickRandomRack(Cluster cluster)456 protected int pickRandomRack(Cluster cluster) { 457 if (cluster.numRacks < 1) { 458 return -1; 459 } 460 461 return RANDOM.nextInt(cluster.numRacks); 462 } 463 pickOtherRandomServer(Cluster cluster, int serverIndex)464 protected int pickOtherRandomServer(Cluster cluster, int serverIndex) { 465 if (cluster.numServers < 2) { 466 return -1; 467 } 468 while (true) { 469 int otherServerIndex = pickRandomServer(cluster); 470 if (otherServerIndex != serverIndex) { 471 return otherServerIndex; 472 } 473 } 474 } 475 pickOtherRandomRack(Cluster cluster, int rackIndex)476 protected int pickOtherRandomRack(Cluster cluster, int rackIndex) { 477 if (cluster.numRacks < 2) { 478 return -1; 479 } 480 while (true) { 481 int otherRackIndex = pickRandomRack(cluster); 482 if (otherRackIndex != rackIndex) { 483 return otherRackIndex; 484 } 485 } 486 } 487 pickRandomRegions(Cluster cluster, int thisServer, int otherServer)488 protected Cluster.Action pickRandomRegions(Cluster cluster, 489 int thisServer, 490 int otherServer) { 491 if (thisServer < 0 || otherServer < 0) { 492 return Cluster.NullAction; 493 } 494 495 // Decide who is most likely to need another region 496 int thisRegionCount = cluster.getNumRegions(thisServer); 497 int otherRegionCount = cluster.getNumRegions(otherServer); 498 499 // Assign the chance based upon the above 500 double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5; 501 double otherChance = (thisRegionCount <= otherRegionCount) ? 
0 : 0.5; 502 503 int thisRegion = pickRandomRegion(cluster, thisServer, thisChance); 504 int otherRegion = pickRandomRegion(cluster, otherServer, otherChance); 505 506 return getAction(thisServer, thisRegion, otherServer, otherRegion); 507 } 508 getAction(int fromServer, int fromRegion, int toServer, int toRegion)509 protected Cluster.Action getAction (int fromServer, int fromRegion, 510 int toServer, int toRegion) { 511 if (fromServer < 0 || toServer < 0) { 512 return Cluster.NullAction; 513 } 514 if (fromRegion > 0 && toRegion > 0) { 515 return new Cluster.SwapRegionsAction(fromServer, fromRegion, 516 toServer, toRegion); 517 } else if (fromRegion > 0) { 518 return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer); 519 } else if (toRegion > 0) { 520 return new Cluster.MoveRegionAction(toRegion, toServer, fromServer); 521 } else { 522 return Cluster.NullAction; 523 } 524 } 525 } 526 527 static class RandomCandidateGenerator extends CandidateGenerator { 528 529 @Override generate(Cluster cluster)530 Cluster.Action generate(Cluster cluster) { 531 532 int thisServer = pickRandomServer(cluster); 533 534 // Pick the other server 535 int otherServer = pickOtherRandomServer(cluster, thisServer); 536 537 return pickRandomRegions(cluster, thisServer, otherServer); 538 } 539 } 540 541 static class LoadCandidateGenerator extends CandidateGenerator { 542 543 @Override generate(Cluster cluster)544 Cluster.Action generate(Cluster cluster) { 545 cluster.sortServersByRegionCount(); 546 int thisServer = pickMostLoadedServer(cluster, -1); 547 int otherServer = pickLeastLoadedServer(cluster, thisServer); 548 549 return pickRandomRegions(cluster, thisServer, otherServer); 550 } 551 pickLeastLoadedServer(final Cluster cluster, int thisServer)552 private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { 553 Integer[] servers = cluster.serverIndicesSortedByRegionCount; 554 555 int index = 0; 556 while (servers[index] == null || servers[index] == thisServer) 
{ 557 index++; 558 if (index == servers.length) { 559 return -1; 560 } 561 } 562 return servers[index]; 563 } 564 pickMostLoadedServer(final Cluster cluster, int thisServer)565 private int pickMostLoadedServer(final Cluster cluster, int thisServer) { 566 Integer[] servers = cluster.serverIndicesSortedByRegionCount; 567 568 int index = servers.length - 1; 569 while (servers[index] == null || servers[index] == thisServer) { 570 index--; 571 if (index < 0) { 572 return -1; 573 } 574 } 575 return servers[index]; 576 } 577 } 578 579 static class LocalityBasedCandidateGenerator extends CandidateGenerator { 580 581 private MasterServices masterServices; 582 LocalityBasedCandidateGenerator(MasterServices masterServices)583 LocalityBasedCandidateGenerator(MasterServices masterServices) { 584 this.masterServices = masterServices; 585 } 586 587 @Override generate(Cluster cluster)588 Cluster.Action generate(Cluster cluster) { 589 if (this.masterServices == null) { 590 int thisServer = pickRandomServer(cluster); 591 // Pick the other server 592 int otherServer = pickOtherRandomServer(cluster, thisServer); 593 return pickRandomRegions(cluster, thisServer, otherServer); 594 } 595 596 cluster.calculateRegionServerLocalities(); 597 // Pick server with lowest locality 598 int thisServer = pickLowestLocalityServer(cluster); 599 int thisRegion; 600 if (thisServer == -1) { 601 LOG.warn("Could not pick lowest locality region server"); 602 return Cluster.NullAction; 603 } else { 604 // Pick lowest locality region on this server 605 thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer); 606 } 607 608 if (thisRegion == -1) { 609 return Cluster.NullAction; 610 } 611 612 // Pick the least loaded server with good locality for the region 613 int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion); 614 615 if (otherServer == -1) { 616 return Cluster.NullAction; 617 } 618 619 // Let the candidate region be moved to its highest locality server. 
620 int otherRegion = -1; 621 622 return getAction(thisServer, thisRegion, otherServer, otherRegion); 623 } 624 pickLowestLocalityServer(Cluster cluster)625 private int pickLowestLocalityServer(Cluster cluster) { 626 return cluster.getLowestLocalityRegionServer(); 627 } 628 pickLowestLocalityRegionOnServer(Cluster cluster, int server)629 private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) { 630 return cluster.getLowestLocalityRegionOnServer(server); 631 } 632 setServices(MasterServices services)633 void setServices(MasterServices services) { 634 this.masterServices = services; 635 } 636 } 637 638 /** 639 * Generates candidates which moves the replicas out of the region server for 640 * co-hosted region replicas 641 */ 642 static class RegionReplicaCandidateGenerator extends CandidateGenerator { 643 644 RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator(); 645 646 /** 647 * Randomly select one regionIndex out of all region replicas co-hosted in the same group 648 * (a group is a server, host or rack) 649 * @param primariesOfRegionsPerGroup either Cluster.primariesOfRegionsPerServer, 650 * primariesOfRegionsPerHost or primariesOfRegionsPerRack 651 * @param regionsPerGroup either Cluster.regionsPerServer, regionsPerHost or regionsPerRack 652 * @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex 653 * @return a regionIndex for the selected primary or -1 if there is no co-locating 654 */ selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup , int[] regionIndexToPrimaryIndex)655 int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup 656 , int[] regionIndexToPrimaryIndex) { 657 int currentPrimary = -1; 658 int currentPrimaryIndex = -1; 659 int selectedPrimaryIndex = -1; 660 double currentLargestRandom = -1; 661 // primariesOfRegionsPerGroup is a sorted array. 
Since it contains the primary region 662 // ids for the regions hosted in server, a consecutive repetition means that replicas 663 // are co-hosted 664 for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) { 665 int primary = j < primariesOfRegionsPerGroup.length 666 ? primariesOfRegionsPerGroup[j] : -1; 667 if (primary != currentPrimary) { // check for whether we see a new primary 668 int numReplicas = j - currentPrimaryIndex; 669 if (numReplicas > 1) { // means consecutive primaries, indicating co-location 670 // decide to select this primary region id or not 671 double currentRandom = RANDOM.nextDouble(); 672 // we don't know how many region replicas are co-hosted, we will randomly select one 673 // using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html) 674 if (currentRandom > currentLargestRandom) { 675 selectedPrimaryIndex = currentPrimary; 676 currentLargestRandom = currentRandom; 677 } 678 } 679 currentPrimary = primary; 680 currentPrimaryIndex = j; 681 } 682 } 683 684 // we have found the primary id for the region to move. Now find the actual regionIndex 685 // with the given primary, prefer to move the secondary region. 
686 for (int j = 0; j < regionsPerGroup.length; j++) { 687 int regionIndex = regionsPerGroup[j]; 688 if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) { 689 // always move the secondary, not the primary 690 if (selectedPrimaryIndex != regionIndex) { 691 return regionIndex; 692 } 693 } 694 } 695 return -1; 696 } 697 698 @Override generate(Cluster cluster)699 Cluster.Action generate(Cluster cluster) { 700 int serverIndex = pickRandomServer(cluster); 701 if (cluster.numServers <= 1 || serverIndex == -1) { 702 return Cluster.NullAction; 703 } 704 705 int regionIndex = selectCoHostedRegionPerGroup( 706 cluster.primariesOfRegionsPerServer[serverIndex], 707 cluster.regionsPerServer[serverIndex], 708 cluster.regionIndexToPrimaryIndex); 709 710 // if there are no pairs of region replicas co-hosted, default to random generator 711 if (regionIndex == -1) { 712 // default to randompicker 713 return randomGenerator.generate(cluster); 714 } 715 716 int toServerIndex = pickOtherRandomServer(cluster, serverIndex); 717 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); 718 return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex); 719 } 720 } 721 722 /** 723 * Generates candidates which moves the replicas out of the rack for 724 * co-hosted region replicas in the same rack 725 */ 726 static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator { 727 @Override generate(Cluster cluster)728 Cluster.Action generate(Cluster cluster) { 729 int rackIndex = pickRandomRack(cluster); 730 if (cluster.numRacks <= 1 || rackIndex == -1) { 731 return super.generate(cluster); 732 } 733 734 int regionIndex = selectCoHostedRegionPerGroup( 735 cluster.primariesOfRegionsPerRack[rackIndex], 736 cluster.regionsPerRack[rackIndex], 737 cluster.regionIndexToPrimaryIndex); 738 739 // if there are no pairs of region replicas co-hosted, default to random generator 740 if (regionIndex == -1) { 741 // default to randompicker 742 
return randomGenerator.generate(cluster); 743 } 744 745 int serverIndex = cluster.regionIndexToServerIndex[regionIndex]; 746 int toRackIndex = pickOtherRandomRack(cluster, rackIndex); 747 748 int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length); 749 int toServerIndex = cluster.serversPerRack[toRackIndex][rand]; 750 int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f); 751 return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex); 752 } 753 } 754 755 /** 756 * Base class of StochasticLoadBalancer's Cost Functions. 757 */ 758 abstract static class CostFunction { 759 760 private float multiplier = 0; 761 762 protected Cluster cluster; 763 CostFunction(Configuration c)764 CostFunction(Configuration c) { 765 766 } 767 getMultiplier()768 float getMultiplier() { 769 return multiplier; 770 } 771 setMultiplier(float m)772 void setMultiplier(float m) { 773 this.multiplier = m; 774 } 775 776 /** Called once per LB invocation to give the cost function 777 * to initialize it's state, and perform any costly calculation. 778 */ init(Cluster cluster)779 void init(Cluster cluster) { 780 this.cluster = cluster; 781 } 782 783 /** Called once per cluster Action to give the cost function 784 * an opportunity to update it's state. postAction() is always 785 * called at least once before cost() is called with the cluster 786 * that this action is performed on. 
*/ postAction(Action action)787 void postAction(Action action) { 788 switch (action.type) { 789 case NULL: break; 790 case ASSIGN_REGION: 791 AssignRegionAction ar = (AssignRegionAction) action; 792 regionMoved(ar.region, -1, ar.server); 793 break; 794 case MOVE_REGION: 795 MoveRegionAction mra = (MoveRegionAction) action; 796 regionMoved(mra.region, mra.fromServer, mra.toServer); 797 break; 798 case SWAP_REGIONS: 799 SwapRegionsAction a = (SwapRegionsAction) action; 800 regionMoved(a.fromRegion, a.fromServer, a.toServer); 801 regionMoved(a.toRegion, a.toServer, a.fromServer); 802 break; 803 default: 804 throw new RuntimeException("Uknown action:" + action.type); 805 } 806 } 807 regionMoved(int region, int oldServer, int newServer)808 protected void regionMoved(int region, int oldServer, int newServer) { 809 } 810 cost()811 abstract double cost(); 812 813 /** 814 * Function to compute a scaled cost using {@link DescriptiveStatistics}. It 815 * assumes that this is a zero sum set of costs. It assumes that the worst case 816 * possible is all of the elements in one region server and the rest having 0. 817 * 818 * @param stats the costs 819 * @return a scaled set of costs. 820 */ costFromArray(double[] stats)821 protected double costFromArray(double[] stats) { 822 double totalCost = 0; 823 double total = getSum(stats); 824 825 double count = stats.length; 826 double mean = total/count; 827 828 // Compute max as if all region servers had 0 and one had the sum of all costs. This must be 829 // a zero sum cost for this to make sense. 830 double max = ((count - 1) * mean) + (total - mean); 831 832 // It's possible that there aren't enough regions to go around 833 double min; 834 if (count > total) { 835 min = ((count - total) * mean) + ((1 - mean) * total); 836 } else { 837 // Some will have 1 more than everything else. 
838 int numHigh = (int) (total - (Math.floor(mean) * count)); 839 int numLow = (int) (count - numHigh); 840 841 min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean))); 842 843 } 844 min = Math.max(0, min); 845 for (int i=0; i<stats.length; i++) { 846 double n = stats[i]; 847 double diff = Math.abs(mean - n); 848 totalCost += diff; 849 } 850 851 double scaled = scale(min, max, totalCost); 852 return scaled; 853 } 854 getSum(double[] stats)855 private double getSum(double[] stats) { 856 double total = 0; 857 for(double s:stats) { 858 total += s; 859 } 860 return total; 861 } 862 863 /** 864 * Scale the value between 0 and 1. 865 * 866 * @param min Min value 867 * @param max The Max value 868 * @param value The value to be scaled. 869 * @return The scaled value. 870 */ scale(double min, double max, double value)871 protected double scale(double min, double max, double value) { 872 if (max <= min || value <= min) { 873 return 0; 874 } 875 if ((max - min) == 0) return 0; 876 877 return Math.max(0d, Math.min(1d, (value - min) / (max - min))); 878 } 879 } 880 881 /** 882 * Given the starting state of the regions and a potential ending state 883 * compute cost based upon the number of regions that have moved. 
884 */ 885 static class MoveCostFunction extends CostFunction { 886 private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost"; 887 private static final String MAX_MOVES_PERCENT_KEY = 888 "hbase.master.balancer.stochastic.maxMovePercent"; 889 private static final float DEFAULT_MOVE_COST = 100; 890 private static final int DEFAULT_MAX_MOVES = 600; 891 private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f; 892 893 private final float maxMovesPercent; 894 MoveCostFunction(Configuration conf)895 MoveCostFunction(Configuration conf) { 896 super(conf); 897 898 // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure 899 // that large benefits are need to overcome the cost of a move. 900 this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST)); 901 // What percent of the number of regions a single run of the balancer can move. 902 maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT); 903 } 904 905 @Override cost()906 double cost() { 907 // Try and size the max number of Moves, but always be prepared to move some. 908 int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), 909 DEFAULT_MAX_MOVES); 910 911 double moveCost = cluster.numMovedRegions; 912 913 // Don't let this single balance move more than the max moves. 914 // This allows better scaling to accurately represent the actual cost of a move. 915 if (moveCost > maxMoves) { 916 return 1000000; // return a number much greater than any of the other cost 917 } 918 919 return scale(0, cluster.numRegions, moveCost); 920 } 921 } 922 923 /** 924 * Compute the cost of a potential cluster state from skew in number of 925 * regions on a cluster. 
926 */ 927 static class RegionCountSkewCostFunction extends CostFunction { 928 private static final String REGION_COUNT_SKEW_COST_KEY = 929 "hbase.master.balancer.stochastic.regionCountCost"; 930 private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500; 931 932 private double[] stats = null; 933 RegionCountSkewCostFunction(Configuration conf)934 RegionCountSkewCostFunction(Configuration conf) { 935 super(conf); 936 // Load multiplier should be the greatest as it is the most general way to balance data. 937 this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); 938 } 939 940 @Override cost()941 double cost() { 942 if (stats == null || stats.length != cluster.numServers) { 943 stats = new double[cluster.numServers]; 944 } 945 946 for (int i =0; i < cluster.numServers; i++) { 947 stats[i] = cluster.regionsPerServer[i].length; 948 } 949 950 return costFromArray(stats); 951 } 952 } 953 954 /** 955 * Compute the cost of a potential cluster state from skew in number of 956 * primary regions on a cluster. 957 */ 958 static class PrimaryRegionCountSkewCostFunction extends CostFunction { 959 private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY = 960 "hbase.master.balancer.stochastic.primaryRegionCountCost"; 961 private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500; 962 963 private double[] stats = null; 964 PrimaryRegionCountSkewCostFunction(Configuration conf)965 PrimaryRegionCountSkewCostFunction(Configuration conf) { 966 super(conf); 967 // Load multiplier should be the greatest as primary regions serve majority of reads/writes. 
968 this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, 969 DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST)); 970 } 971 972 @Override cost()973 double cost() { 974 if (!cluster.hasRegionReplicas) { 975 return 0; 976 } 977 if (stats == null || stats.length != cluster.numServers) { 978 stats = new double[cluster.numServers]; 979 } 980 981 for (int i =0; i < cluster.numServers; i++) { 982 stats[i] = 0; 983 for (int regionIdx : cluster.regionsPerServer[i]) { 984 if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) { 985 stats[i] ++; 986 } 987 } 988 } 989 990 return costFromArray(stats); 991 } 992 } 993 994 /** 995 * Compute the cost of a potential cluster configuration based upon how evenly 996 * distributed tables are. 997 */ 998 static class TableSkewCostFunction extends CostFunction { 999 1000 private static final String TABLE_SKEW_COST_KEY = 1001 "hbase.master.balancer.stochastic.tableSkewCost"; 1002 private static final float DEFAULT_TABLE_SKEW_COST = 35; 1003 TableSkewCostFunction(Configuration conf)1004 TableSkewCostFunction(Configuration conf) { 1005 super(conf); 1006 this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST)); 1007 } 1008 1009 @Override cost()1010 double cost() { 1011 double max = cluster.numRegions; 1012 double min = ((double) cluster.numRegions) / cluster.numServers; 1013 double value = 0; 1014 1015 for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) { 1016 value += cluster.numMaxRegionsPerTable[i]; 1017 } 1018 1019 return scale(min, max, value); 1020 } 1021 } 1022 1023 /** 1024 * Compute a cost of a potential cluster configuration based upon where 1025 * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located. 
1026 */ 1027 static class LocalityCostFunction extends CostFunction { 1028 1029 private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost"; 1030 private static final float DEFAULT_LOCALITY_COST = 25; 1031 1032 private MasterServices services; 1033 LocalityCostFunction(Configuration conf, MasterServices srv)1034 LocalityCostFunction(Configuration conf, MasterServices srv) { 1035 super(conf); 1036 this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST)); 1037 this.services = srv; 1038 } 1039 setServices(MasterServices srvc)1040 void setServices(MasterServices srvc) { 1041 this.services = srvc; 1042 } 1043 1044 @Override cost()1045 double cost() { 1046 double max = 0; 1047 double cost = 0; 1048 1049 // If there's no master so there's no way anything else works. 1050 if (this.services == null) { 1051 return cost; 1052 } 1053 1054 for (int i = 0; i < cluster.regionLocations.length; i++) { 1055 max += 1; 1056 int serverIndex = cluster.regionIndexToServerIndex[i]; 1057 int[] regionLocations = cluster.regionLocations[i]; 1058 1059 // If we can't find where the data is getTopBlock returns null. 1060 // so count that as being the best possible. 1061 if (regionLocations == null) { 1062 continue; 1063 } 1064 1065 int index = -1; 1066 for (int j = 0; j < regionLocations.length; j++) { 1067 if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) { 1068 index = j; 1069 break; 1070 } 1071 } 1072 1073 if (index < 0) { 1074 cost += 1; 1075 } else { 1076 cost += (1 - cluster.getLocalityOfRegion(i, index)); 1077 } 1078 } 1079 return scale(0, max, cost); 1080 } 1081 } 1082 1083 /** 1084 * Base class the allows writing costs functions from rolling average of some 1085 * number from RegionLoad. 
1086 */ 1087 abstract static class CostFromRegionLoadFunction extends CostFunction { 1088 1089 private ClusterStatus clusterStatus = null; 1090 private Map<String, Deque<RegionLoad>> loads = null; 1091 private double[] stats = null; CostFromRegionLoadFunction(Configuration conf)1092 CostFromRegionLoadFunction(Configuration conf) { 1093 super(conf); 1094 } 1095 setClusterStatus(ClusterStatus status)1096 void setClusterStatus(ClusterStatus status) { 1097 this.clusterStatus = status; 1098 } 1099 setLoads(Map<String, Deque<RegionLoad>> l)1100 void setLoads(Map<String, Deque<RegionLoad>> l) { 1101 this.loads = l; 1102 } 1103 1104 @Override cost()1105 double cost() { 1106 if (clusterStatus == null || loads == null) { 1107 return 0; 1108 } 1109 1110 if (stats == null || stats.length != cluster.numServers) { 1111 stats = new double[cluster.numServers]; 1112 } 1113 1114 for (int i =0; i < stats.length; i++) { 1115 //Cost this server has from RegionLoad 1116 long cost = 0; 1117 1118 // for every region on this server get the rl 1119 for(int regionIndex:cluster.regionsPerServer[i]) { 1120 Collection<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex]; 1121 1122 // Now if we found a region load get the type of cost that was requested. 1123 if (regionLoadList != null) { 1124 cost += getRegionLoadCost(regionLoadList); 1125 } 1126 } 1127 1128 // Add the total cost to the stats. 1129 stats[i] = cost; 1130 } 1131 1132 // Now return the scaled cost from data held in the stats object. 
1133 return costFromArray(stats); 1134 } 1135 getRegionLoadCost(Collection<RegionLoad> regionLoadList)1136 protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) { 1137 double cost = 0; 1138 1139 for (RegionLoad rl : regionLoadList) { 1140 double toAdd = getCostFromRl(rl); 1141 1142 if (cost == 0) { 1143 cost = toAdd; 1144 } else { 1145 cost = (.5 * cost) + (.5 * toAdd); 1146 } 1147 } 1148 1149 return cost; 1150 } 1151 getCostFromRl(RegionLoad rl)1152 protected abstract double getCostFromRl(RegionLoad rl); 1153 } 1154 1155 /** 1156 * Compute the cost of total number of read requests The more unbalanced the higher the 1157 * computed cost will be. This uses a rolling average of regionload. 1158 */ 1159 1160 static class ReadRequestCostFunction extends CostFromRegionLoadFunction { 1161 1162 private static final String READ_REQUEST_COST_KEY = 1163 "hbase.master.balancer.stochastic.readRequestCost"; 1164 private static final float DEFAULT_READ_REQUEST_COST = 5; 1165 ReadRequestCostFunction(Configuration conf)1166 ReadRequestCostFunction(Configuration conf) { 1167 super(conf); 1168 this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST)); 1169 } 1170 1171 1172 @Override getCostFromRl(RegionLoad rl)1173 protected double getCostFromRl(RegionLoad rl) { 1174 return rl.getReadRequestsCount(); 1175 } 1176 } 1177 1178 /** 1179 * Compute the cost of total number of write requests. The more unbalanced the higher the 1180 * computed cost will be. This uses a rolling average of regionload. 
1181 */ 1182 static class WriteRequestCostFunction extends CostFromRegionLoadFunction { 1183 1184 private static final String WRITE_REQUEST_COST_KEY = 1185 "hbase.master.balancer.stochastic.writeRequestCost"; 1186 private static final float DEFAULT_WRITE_REQUEST_COST = 5; 1187 WriteRequestCostFunction(Configuration conf)1188 WriteRequestCostFunction(Configuration conf) { 1189 super(conf); 1190 this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST)); 1191 } 1192 1193 @Override getCostFromRl(RegionLoad rl)1194 protected double getCostFromRl(RegionLoad rl) { 1195 return rl.getWriteRequestsCount(); 1196 } 1197 } 1198 1199 /** 1200 * A cost function for region replicas. We give a very high cost to hosting 1201 * replicas of the same region in the same host. We do not prevent the case 1202 * though, since if numReplicas > numRegionServers, we still want to keep the 1203 * replica open. 1204 */ 1205 static class RegionReplicaHostCostFunction extends CostFunction { 1206 private static final String REGION_REPLICA_HOST_COST_KEY = 1207 "hbase.master.balancer.stochastic.regionReplicaHostCostKey"; 1208 private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000; 1209 1210 long maxCost = 0; 1211 long[] costsPerGroup; // group is either server, host or rack 1212 int[][] primariesOfRegionsPerGroup; 1213 RegionReplicaHostCostFunction(Configuration conf)1214 public RegionReplicaHostCostFunction(Configuration conf) { 1215 super(conf); 1216 this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY, 1217 DEFAULT_REGION_REPLICA_HOST_COST_KEY)); 1218 } 1219 1220 @Override init(Cluster cluster)1221 void init(Cluster cluster) { 1222 super.init(cluster); 1223 // max cost is the case where every region replica is hosted together regardless of host 1224 maxCost = cluster.numHosts > 1 ? 
getMaxCost(cluster) : 0; 1225 costsPerGroup = new long[cluster.numHosts]; 1226 primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based 1227 ? cluster.primariesOfRegionsPerHost 1228 : cluster.primariesOfRegionsPerServer; 1229 for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) { 1230 costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]); 1231 } 1232 } 1233 getMaxCost(Cluster cluster)1234 long getMaxCost(Cluster cluster) { 1235 if (!cluster.hasRegionReplicas) { 1236 return 0; // short circuit 1237 } 1238 // max cost is the case where every region replica is hosted together regardless of host 1239 int[] primariesOfRegions = new int[cluster.numRegions]; 1240 System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0, 1241 cluster.regions.length); 1242 1243 Arrays.sort(primariesOfRegions); 1244 1245 // compute numReplicas from the sorted array 1246 return costPerGroup(primariesOfRegions); 1247 } 1248 1249 @Override cost()1250 double cost() { 1251 if (maxCost <= 0) { 1252 return 0; 1253 } 1254 1255 long totalCost = 0; 1256 for (int i = 0 ; i < costsPerGroup.length; i++) { 1257 totalCost += costsPerGroup[i]; 1258 } 1259 return scale(0, maxCost, totalCost); 1260 } 1261 1262 /** 1263 * For each primary region, it computes the total number of replicas in the array (numReplicas) 1264 * and returns a sum of numReplicas-1 squared. For example, if the server hosts 1265 * regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it 1266 * returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1). 1267 * @param primariesOfRegions a sorted array of primary regions ids for the regions hosted 1268 * @return a sum of numReplicas-1 squared for each primary region in the group. 
1269 */ costPerGroup(int[] primariesOfRegions)1270 protected long costPerGroup(int[] primariesOfRegions) { 1271 long cost = 0; 1272 int currentPrimary = -1; 1273 int currentPrimaryIndex = -1; 1274 // primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions 1275 // sharing the same primary will have consecutive numbers in the array. 1276 for (int j = 0 ; j <= primariesOfRegions.length; j++) { 1277 int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1; 1278 if (primary != currentPrimary) { // we see a new primary 1279 int numReplicas = j - currentPrimaryIndex; 1280 // square the cost 1281 if (numReplicas > 1) { // means consecutive primaries, indicating co-location 1282 cost += (numReplicas - 1) * (numReplicas - 1); 1283 } 1284 currentPrimary = primary; 1285 currentPrimaryIndex = j; 1286 } 1287 } 1288 1289 return cost; 1290 } 1291 1292 @Override regionMoved(int region, int oldServer, int newServer)1293 protected void regionMoved(int region, int oldServer, int newServer) { 1294 if (maxCost <= 0) { 1295 return; // no need to compute 1296 } 1297 if (cluster.multiServersPerHost) { 1298 int oldHost = cluster.serverIndexToHostIndex[oldServer]; 1299 int newHost = cluster.serverIndexToHostIndex[newServer]; 1300 if (newHost != oldHost) { 1301 costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]); 1302 costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]); 1303 } 1304 } else { 1305 costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]); 1306 costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]); 1307 } 1308 } 1309 } 1310 1311 /** 1312 * A cost function for region replicas for the rack distribution. We give a relatively high 1313 * cost to hosting replicas of the same region in the same rack. We do not prevent the case 1314 * though. 
1315 */ 1316 static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction { 1317 private static final String REGION_REPLICA_RACK_COST_KEY = 1318 "hbase.master.balancer.stochastic.regionReplicaRackCostKey"; 1319 private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000; 1320 1321 public RegionReplicaRackCostFunction(Configuration conf) { 1322 super(conf); 1323 this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY)); 1324 } 1325 1326 @Override 1327 void init(Cluster cluster) { 1328 this.cluster = cluster; 1329 if (cluster.numRacks <= 1) { 1330 maxCost = 0; 1331 return; // disabled for 1 rack 1332 } 1333 // max cost is the case where every region replica is hosted together regardless of rack 1334 maxCost = getMaxCost(cluster); 1335 costsPerGroup = new long[cluster.numRacks]; 1336 for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) { 1337 costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]); 1338 } 1339 } 1340 1341 @Override 1342 protected void regionMoved(int region, int oldServer, int newServer) { 1343 if (maxCost <= 0) { 1344 return; // no need to compute 1345 } 1346 int oldRack = cluster.serverIndexToRackIndex[oldServer]; 1347 int newRack = cluster.serverIndexToRackIndex[newServer]; 1348 if (newRack != oldRack) { 1349 costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]); 1350 costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]); 1351 } 1352 } 1353 } 1354 1355 /** 1356 * Compute the cost of total memstore size. The more unbalanced the higher the 1357 * computed cost will be. This uses a rolling average of regionload. 
*/
  static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {

    private static final String MEMSTORE_SIZE_COST_KEY =
        "hbase.master.balancer.stochastic.memstoreSizeCost";
    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;

    /**
     * @param conf configuration supplying the memstore-size multiplier
     */
    MemstoreSizeCostFunction(Configuration conf) {
      super(conf);
      final float multiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST);
      this.setMultiplier(multiplier);
    }

    /**
     * @return the memstore size in MB reported by {@code rl}
     */
    @Override
    protected double getCostFromRl(RegionLoad rl) {
      final double memstoreSizeMb = rl.getMemStoreSizeMB();
      return memstoreSizeMb;
    }
  }

  /**
   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
   * computed cost will be.  This uses a rolling average of regionload.
   */
  static class StoreFileCostFunction extends CostFromRegionLoadFunction {

    private static final String STOREFILE_SIZE_COST_KEY =
        "hbase.master.balancer.stochastic.storefileSizeCost";
    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;

    /**
     * @param conf configuration supplying the storefile-size multiplier
     */
    StoreFileCostFunction(Configuration conf) {
      super(conf);
      final float multiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST);
      this.setMultiplier(multiplier);
    }

    /**
     * @return the storefile size in MB reported by {@code rl}
     */
    @Override
    protected double getCostFromRl(RegionLoad rl) {
      final double storefileSizeMb = rl.getStorefileSizeMB();
      return storefileSizeMb;
    }
  }
}