/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.LinkedHashMultimap;

/**
 * Manages and performs region assignment.
 * <p>
 * Monitors ZooKeeper for events related to regions in transition.
 * <p>
 * Handles existing regions in transition during master failover.
 */
@InterfaceAudience.Private
public class AssignmentManager extends ZooKeeperListener {
  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);

  public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(
    HConstants.HBCK_CODE_NAME, -1, -1L);

  static final String ALREADY_IN_TRANSITION_WAITTIME
    = "hbase.assignment.already.intransition.waittime";
  static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute

  protected final MasterServices server;

  private ServerManager serverManager;

  private boolean shouldAssignRegionsWithFavoredNodes;

  private LoadBalancer balancer;

  private final MetricsAssignmentManager metricsAssignmentManager;

  private final TableLockManager tableLockManager;

  private AtomicInteger numRegionsOpened = new AtomicInteger(0);

  final private KeyLocker<String> locker = new KeyLocker<String>();

  Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());

  /**
   * Map of regions to reopen after the schema of a table is changed.
   * Key - encoded region name, value - HRegionInfo
   */
  private final Map <String, HRegionInfo> regionsToReopen;

  /*
   * Maximum number of times we recurse an assignment/unassignment.
   * See below in {@link #assign()} and {@link #unassign()}.
   */
  private final int maximumAttempts;

  /**
   * Map from the encoded name of the region to be created to the pair of
   * regions being merged.
   */
  private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
    = new HashMap<String, PairOfSameType<HRegionInfo>>();

  private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
    = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();

  /**
   * The time the assignment manager sleeps before retrying an hbase:meta
   * assignment that failed because no region plan was available, or because
   * the region plan was bad.
   */
  private final long sleepTimeBeforeRetryingMetaAssignment;

  /** Plans for region movement. Key is the encoded version of a region name */
  // TODO: When do plans get cleaned out? Ever? In server open and in server
  // shutdown processing -- St.Ack
  // All access to this Map must be synchronized.
  final NavigableMap<String, RegionPlan> regionPlans =
    new TreeMap<String, RegionPlan>();

  private final TableStateManager tableStateManager;

  private final ExecutorService executorService;

  // For unit tests, keep track of calls to ClosedRegionHandler
  private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;

  // For unit tests, keep track of calls to OpenedRegionHandler
  private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;

  // Thread pool executor service for timeout monitor
  private java.util.concurrent.ExecutorService threadPoolExecutorService;

  // A bunch of ZK event workers. Each is a single thread executor service
  private final java.util.concurrent.ExecutorService zkEventWorkers;

  private List<EventType> ignoreStatesRSOffline = Arrays.asList(
    EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);

  private final RegionStates regionStates;
  // The threshold to use bulk assigning. Use bulk assignment
  // only if we are assigning at least this many regions to at least this
  // many servers. If we are assigning fewer regions to fewer servers,
  // bulk assigning may not be as efficient.
  private final int bulkAssignThresholdRegions;
  private final int bulkAssignThresholdServers;
  private final int bulkPerRegionOpenTimeGuesstimate;

  // Should bulk assignment wait till all regions are assigned,
  // or until it is timed out? This is useful to measure bulk assignment
  // performance, but not needed in most use cases.
  private final boolean bulkAssignWaitTillAllAssigned;

  /**
   * Indicator that AssignmentManager has recovered the region states so
   * that ServerShutdownHandler can be fully enabled and re-assign regions
   * of dead servers. So when re-assignment happens, AssignmentManager
   * has proper region states.
   *
   * Protected to ease testing.
   */
  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);

  /**
   * A map to track how many times in a row a region has failed to open,
   * so that we don't try to open a region forever if the failure is
   * unrecoverable. We don't put this information in region states
   * because we don't expect this to happen frequently; we don't
   * want to copy this information over during each state transition either.
   */
  private final ConcurrentHashMap<String, AtomicInteger>
    failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();

  // A flag to indicate if we are using ZK for region assignment
  private final boolean useZKForAssignment;

  // In case we are not using ZK for region assignment, region states
  // are persisted in meta with a state store
  private final RegionStateStore regionStateStore;

  /**
   * For testing only! Set to true to skip handling of split.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_SPLIT_HANDLING = false;

  /** Listeners that are called on assignment events. */
  private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();

  private RegionStateListener regionStateListener;

  public enum ServerHostRegion {
    NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
  }
  /**
   * Constructs a new assignment manager.
   *
   * @param server instance of HMaster this AM is running inside
   * @param serverManager serverManager for associated HMaster
   * @param balancer implementation of {@link LoadBalancer}
   * @param service Executor service
   * @param metricsMaster metrics manager
   * @param tableLockManager TableLock manager
   * @throws KeeperException
   * @throws IOException
   */
  public AssignmentManager(MasterServices server, ServerManager serverManager,
      final LoadBalancer balancer,
      final ExecutorService service, MetricsMaster metricsMaster,
      final TableLockManager tableLockManager) throws KeeperException,
        IOException, CoordinatedStateException {
    super(server.getZooKeeper());
    this.server = server;
    this.serverManager = serverManager;
    this.executorService = service;
    this.regionStateStore = new RegionStateStore(server);
    this.regionsToReopen = Collections.synchronizedMap
      (new HashMap<String, HRegionInfo> ());
    Configuration conf = server.getConfiguration();
    // Only read favored nodes if using the favored nodes load balancer.
    this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
      HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
        FavoredNodeLoadBalancer.class);
    try {
      if (server.getCoordinatedStateManager() != null) {
        this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
      } else {
        this.tableStateManager = null;
      }
    } catch (InterruptedException e) {
      throw new InterruptedIOException();
    }
    // This is the max attempts, not retries, so it should be at least 1.
    this.maximumAttempts = Math.max(1,
      this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
    this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
      "hbase.meta.assignment.retry.sleeptime", 1000L);
    this.balancer = balancer;
    int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
    this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
      maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
    this.regionStates = new RegionStates(
      server, tableStateManager, serverManager, regionStateStore);

    this.bulkAssignWaitTillAllAssigned =
      conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
    this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
    this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
    this.bulkPerRegionOpenTimeGuesstimate =
      conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);

    int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
    ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
    zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
      TimeUnit.SECONDS, threadFactory);
    this.tableLockManager = tableLockManager;

    this.metricsAssignmentManager = new MetricsAssignmentManager();
    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
  }
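  // The knobs read above can be tuned in the master's configuration before
  // startup. A minimal sketch (the values are illustrative, not
  // recommendations):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.assignment.maximum.attempts", 5);        // max attempts, not retries
  //   conf.setInt("hbase.bulk.assignment.threshold.regions", 20); // bulk-assign cut-over
  //   conf.setInt("hbase.bulk.assignment.threshold.servers", 5);
  //   conf.setBoolean("hbase.bulk.assignment.waittillallassigned", true);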
  /**
   * Add the listener to the notification list.
   * @param listener The AssignmentListener to register
   */
  public void registerListener(final AssignmentListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener The AssignmentListener to unregister
   */
  public boolean unregisterListener(final AssignmentListener listener) {
    return this.listeners.remove(listener);
  }
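  // A minimal registration sketch, assuming an AssignmentListener that
  // implements the regionOpened/regionClosed callbacks this class notifies
  // (the logging bodies are illustrative only):
  //
  //   am.registerListener(new AssignmentListener() {
  //     @Override
  //     public void regionOpened(HRegionInfo hri, ServerName sn) {
  //       LOG.info(hri.getRegionNameAsString() + " opened on " + sn);
  //     }
  //     @Override
  //     public void regionClosed(HRegionInfo hri) {
  //       LOG.info(hri.getRegionNameAsString() + " closed");
  //     }
  //   });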
  /**
   * @return Instance of ZKTableStateManager.
   */
  public TableStateManager getTableStateManager() {
    // These are 'expensive' to make, involving a trip to the zk ensemble,
    // so allow sharing.
    return this.tableStateManager;
  }

  /**
   * This SHOULD not be public. It is public now
   * because of some unit tests.
   *
   * TODO: make it package private and keep RegionStates in the master package
   */
  public RegionStates getRegionStates() {
    return regionStates;
  }

  /**
   * Used in some tests to mock up region state in meta
   */
  @VisibleForTesting
  RegionStateStore getRegionStateStore() {
    return regionStateStore;
  }

  public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
    return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
  }

  /**
   * Add a regionPlan for the specified region.
   * @param encodedName
   * @param plan
   */
  public void addPlan(String encodedName, RegionPlan plan) {
    synchronized (regionPlans) {
      regionPlans.put(encodedName, plan);
    }
  }

  /**
   * Add a map of region plans.
   */
  public void addPlans(Map<String, RegionPlan> plans) {
    synchronized (regionPlans) {
      regionPlans.putAll(plans);
    }
  }

  /**
   * Set the list of regions that will be reopened
   * because of an update in table schema
   *
   * @param regions
   *          list of regions that should be tracked for reopen
   */
  public void setRegionsToReopen(List <HRegionInfo> regions) {
    for (HRegionInfo hri : regions) {
      regionsToReopen.put(hri.getEncodedName(), hri);
    }
  }

  /**
   * Used by the client to identify if all regions have had their schema updated
   *
   * @param tableName
   * @return Pair indicating the status of the alter command
   * @throws IOException
   */
  public Pair<Integer, Integer> getReopenStatus(TableName tableName)
      throws IOException {
    List<HRegionInfo> hris;
    if (TableName.META_TABLE_NAME.equals(tableName)) {
      hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
    } else {
      hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
        server.getConnection(), tableName, true);
    }

    Integer pending = 0;
    for (HRegionInfo hri : hris) {
      String name = hri.getEncodedName();
      // no lock, concurrent access ok: sequential consistency respected.
      if (regionsToReopen.containsKey(name)
          || regionStates.isRegionInTransition(name)) {
        pending++;
      }
    }
    return new Pair<Integer, Integer>(pending, hris.size());
  }

  /**
   * Used by ServerShutdownHandler to make sure AssignmentManager has completed
   * the failover cleanup before re-assigning regions of dead servers. So that
   * when re-assignment happens, AssignmentManager has proper region states.
   */
  public boolean isFailoverCleanupDone() {
    return failoverCleanupDone.get();
  }

  /**
   * To avoid racing with AM, external entities may need to lock a region,
   * for example, when SSH checks what regions to skip re-assigning.
   */
  public Lock acquireRegionLock(final String encodedName) {
    return locker.acquireLock(encodedName);
  }
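  // Usage sketch: callers must release the lock in a finally block so a
  // failure cannot leave the region permanently locked:
  //
  //   Lock lock = am.acquireRegionLock(encodedName);
  //   try {
  //     // inspect or mutate assignment state for this region only
  //   } finally {
  //     lock.unlock();
  //   }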
  /**
   * Failover cleanup is now completed. Notify the server manager to
   * process any queued-up dead servers.
   */
  void failoverCleanupDone() {
    failoverCleanupDone.set(true);
    serverManager.processQueuedDeadServers();
  }

  /**
   * Called on startup.
   * Figures out whether this is a fresh cluster start or whether we are
   * joining an extant running cluster.
   * @throws IOException
   * @throws KeeperException
   * @throws InterruptedException
   * @throws CoordinatedStateException
   */
  void joinCluster() throws IOException,
      KeeperException, InterruptedException, CoordinatedStateException {
    long startTime = System.currentTimeMillis();
    // Concurrency note: In the below the accesses on regionsInTransition are
    // outside of a synchronization block where usually all accesses to RIT are
    // synchronized. The presumption is that in this case it is safe since this
    // method is being played by a single thread on startup.

    // TODO: Regions that have a null location and are not in regionsInTransitions
    // need to be handled.

    // Scan hbase:meta to build list of existing regions, servers, and assignment.
    // Returns servers that have not checked in (assumed dead) and to which some
    // regions were assigned (according to meta).
    Set<ServerName> deadServers = rebuildUserRegions();

    // This method will assign all user regions if a clean server startup, or
    // it will reconstruct master state and cleanup any leftovers from the
    // previous master process.
    boolean failover = processDeadServersAndRegionsInTransition(deadServers);

    if (!useZKForAssignment) {
      // No longer using ZK for assignment, remove the ZNode
      ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
    }
    recoverTableInDisablingState();
    recoverTableInEnablingState();
    LOG.info("Joined the cluster in " + (System.currentTimeMillis()
      - startTime) + "ms, failover=" + failover);
  }

  /**
   * Processes all regions that are in transition in zookeeper and also
   * processes the list of dead servers.
   * Used by a master joining a cluster. If we figure this is a clean cluster
   * startup, will assign all user regions.
   * @param deadServers Set of servers that are offline (probably legitimately)
   *          but were carrying regions according to a scan of hbase:meta. Can be null.
   * @throws KeeperException
   * @throws IOException
   * @throws InterruptedException
   */
  boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
      throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
    List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);

    if (useZKForAssignment && nodes == null) {
      String errorMessage = "Failed to get the children from ZK";
      server.abort(errorMessage, new IOException(errorMessage));
      return true; // Doesn't matter in this case
    }

    boolean failover = !serverManager.getDeadServers().isEmpty();
    if (failover) {
      // This may not be a failover actually, especially if meta is on this master.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
      }
    } else {
      // If any one region except meta is assigned, it's a failover.
      Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
      for (Map.Entry<HRegionInfo, ServerName> en:
          regionStates.getRegionAssignments().entrySet()) {
        HRegionInfo hri = en.getKey();
        if (!hri.isMetaTable()
            && onlineServers.contains(en.getValue())) {
          LOG.debug("Found " + hri + " out on cluster");
          failover = true;
          break;
        }
      }
      if (!failover && nodes != null) {
        // If any one region except meta is in transition, it's a failover.
        for (String encodedName: nodes) {
          RegionState regionState = regionStates.getRegionState(encodedName);
          if (regionState != null && !regionState.getRegion().isMetaRegion()) {
            LOG.debug("Found " + regionState + " in RITs");
            failover = true;
            break;
          }
        }
      }
    }
    if (!failover && !useZKForAssignment) {
      // If any region except meta is in transition on a live server, it's a failover.
      Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
      if (!regionsInTransition.isEmpty()) {
        Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
        for (RegionState regionState: regionsInTransition.values()) {
          ServerName serverName = regionState.getServerName();
          if (!regionState.getRegion().isMetaRegion()
              && serverName != null && onlineServers.contains(serverName)) {
            LOG.debug("Found " + regionState + " in RITs");
            failover = true;
            break;
          }
        }
      }
    }
    if (!failover) {
      // If we get here, we have a full cluster restart. It is a failover only
      // if some WALs are not split yet. Meta WALs, if any, should have been
      // split already. We can walk through those queued dead servers; if they
      // don't have any WALs, this restart should be considered a clean one.
      Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
      if (!queuedDeadServers.isEmpty()) {
        Configuration conf = server.getConfiguration();
        Path rootdir = FSUtils.getRootDir(conf);
        FileSystem fs = rootdir.getFileSystem(conf);
        for (ServerName serverName: queuedDeadServers) {
          // In the case of a clean exit, the shutdown handler would have presplit any WALs and
          // removed empty directories.
          Path logDir = new Path(rootdir,
            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
          Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
          if (fs.exists(logDir) || fs.exists(splitDir)) {
            LOG.debug("Found queued dead server " + serverName);
            failover = true;
            break;
          }
        }
        if (!failover) {
          // We figured that it's not a failover, so no need to
          // work on these re-queued dead servers any more.
          LOG.info("AM figured that it's not a failover and cleaned up "
            + queuedDeadServers.size() + " queued dead servers");
          serverManager.removeRequeuedDeadServers();
        }
      }
    }

    Set<TableName> disabledOrDisablingOrEnabling = null;
    Map<HRegionInfo, ServerName> allRegions = null;

    if (!failover) {
      disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
        ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
        ZooKeeperProtos.Table.State.ENABLING);

      // Clean re/start, mark all user regions closed before reassignment
      allRegions = regionStates.closeAllUserRegions(
        disabledOrDisablingOrEnabling);
    }

    // Now region states are restored
    regionStateStore.start();

    // If we found user regions out on cluster, it's a failover.
    if (failover) {
      LOG.info("Found regions out on cluster or in RIT; presuming failover");
      // Process list of dead servers and regions in RIT.
      // See HBASE-4580 for more information.
      processDeadServersAndRecoverLostRegions(deadServers);
    }

    if (!failover && useZKForAssignment) {
      // Cleanup any existing ZK nodes and start watching
      ZKAssign.deleteAllNodes(watcher);
      ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
        this.watcher.assignmentZNode);
    }

    // Now we can safely claim failover cleanup completed and enable
    // ServerShutdownHandler for further processing. The nodes (below)
    // in transition, if any, are for regions not related to those
    // dead servers at all, and can be done in parallel to SSH.
    failoverCleanupDone();
    if (!failover) {
      // Fresh cluster startup.
      LOG.info("Clean cluster startup. Assigning user regions");
      assignAllUserRegions(allRegions);
    }
    // Unassign replicas of the split parents and the merged regions.
    // The daughter replicas are opened in assignAllUserRegions if they were
    // not already opened.
    for (HRegionInfo h : replicasToClose) {
      unassign(h);
    }
    replicasToClose.clear();
    return failover;
  }
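  // Reading aid (not normative): the method above declares a failover when
  // any of the following holds, checked in order:
  //   1. the dead-server list is non-empty;
  //   2. some user (non-meta) region is assigned to an online server;
  //   3. some user region is in transition (a ZK node, or an in-memory RIT
  //      on a live server when running ZK-less assignment);
  //   4. a re-queued dead server still has a WAL or WAL-splitting directory.
  // Only when all four checks come up empty is the restart treated as a
  // clean startup and every user region bulk-assigned from scratch.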
  /**
   * If region is up in zk in transition, then do fixup and block and wait until
   * the region is assigned and out of transition. Used on startup for
   * catalog regions.
   * @param hri Region to look for.
   * @return True if we processed a region in transition else false if region
   *         was not up in zk in transition.
   * @throws InterruptedException
   * @throws KeeperException
   * @throws IOException
   */
  boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
      throws InterruptedException, KeeperException, IOException {
    String encodedRegionName = hri.getEncodedName();
    if (!processRegionInTransition(encodedRegionName, hri)) {
      return false; // The region is not in transition
    }
    LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
    while (!this.server.isStopped() &&
        this.regionStates.isRegionInTransition(encodedRegionName)) {
      RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
      if (state == null || !serverManager.isServerOnline(state.getServerName())) {
        // The region is not in transition, or not in transition on an online
        // server. It doesn't help to block here any more. The caller needs
        // to verify the region is actually assigned.
        break;
      }
      this.regionStates.waitForUpdate(100);
    }
    return true;
  }

  /**
   * Process failover of new master for region <code>encodedRegionName</code>
   * up in zookeeper.
   * @param encodedRegionName Region to process failover for.
   * @param regionInfo If null we'll go get it from meta table.
   * @return True if we processed <code>regionInfo</code> as a RIT.
   * @throws KeeperException
   * @throws IOException
   */
  boolean processRegionInTransition(final String encodedRegionName,
      final HRegionInfo regionInfo) throws KeeperException, IOException {
    // We need a lock here to ensure that we will not put the same region twice.
    // It has no reason to be a lock shared with the other operations.
    // We can do the lock on the region only, instead of a global lock: what we want to ensure
    // is that we don't have two threads working on the same region.
    Lock lock = locker.acquireLock(encodedRegionName);
    try {
      Stat stat = new Stat();
      byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
      if (data == null) return false;
      RegionTransition rt;
      try {
        rt = RegionTransition.parseFrom(data);
      } catch (DeserializationException e) {
        LOG.warn("Failed to parse znode data", e);
        return false;
      }
      HRegionInfo hri = regionInfo;
      if (hri == null) {
        // The region info is not passed in. We will try to find the region
        // from the region states map/meta based on the encoded region name. But
        // we may not be able to find it. This is valid for online merge, in
        // which the region may not have been created if the merge is not
        // completed. Therefore, it is not in meta at master recovery time.
        hri = regionStates.getRegionInfo(rt.getRegionName());
        EventType et = rt.getEventType();
        if (hri == null && et != EventType.RS_ZK_REGION_MERGING
            && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
          LOG.warn("Couldn't find the region in recovering " + rt);
          return false;
        }
      }

      // TODO: This code is tied to ZK anyway, so for now leaving it as is,
      // will refactor when whole region assignment will be abstracted from ZK
      BaseCoordinatedStateManager cp =
        (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
      OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();

      ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
        new ZkOpenRegionCoordination.ZkOpenRegionDetails();
      zkOrd.setVersion(stat.getVersion());
      zkOrd.setServerName(cp.getServer().getServerName());

      return processRegionsInTransition(
        rt, hri, openRegionCoordination, zkOrd);
    } finally {
      lock.unlock();
    }
  }

  /**
   * This is invoked only when (1) the master assigns meta, or (2) zk assignment
   * nodes are processed during failover-mode startup.
   * The lock is acquired by the caller. It returns true if the region
   * is in transition for sure, false otherwise.
   *
   * It should be private but it is used by some tests too.
   */
  boolean processRegionsInTransition(
      final RegionTransition rt, final HRegionInfo regionInfo,
      OpenRegionCoordination coordination,
      final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
    EventType et = rt.getEventType();
    // Get ServerName. Could not be null.
    final ServerName sn = rt.getServerName();
    final byte[] regionName = rt.getRegionName();
    final String encodedName = HRegionInfo.encodeRegionName(regionName);
    final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
    LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);

    if (regionStates.isRegionInTransition(encodedName)
        && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
      LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
        + et + ", does nothing since the region is already in transition "
        + regionStates.getRegionTransitionState(encodedName));
      // Just return
      return true;
    }
    if (!serverManager.isServerOnline(sn)) {
      // It was transitioning on a dead server, so it's closed now.
      // Force to OFFLINE and put it in transition, but do not assign it
      // since log splitting for the dead server is not done yet.
      LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
        " was on deadserver; forcing offline");
      if (regionStates.isRegionOnline(regionInfo)) {
        // Meta could still show the region is assigned to the previous
        // server. If that server is online, when we reload the meta, the
        // region is put back to online; we need to offline it.
        regionStates.regionOffline(regionInfo);
        sendRegionClosedNotification(regionInfo);
      }
      // Put it back in transition so that SSH can re-assign it
      regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);

      if (regionInfo.isMetaRegion()) {
        // If it's the meta region, reset the meta location,
        // so that the master knows the right meta region server.
        MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
      } else {
        // No matter whether the previous server is online or offline,
        // we need to reset the last region server of the region.
        regionStates.setLastRegionServerOfRegion(sn, encodedName);
        // Make sure we know the server is dead.
        if (!serverManager.isServerDead(sn)) {
          serverManager.expireServer(sn);
        }
      }
      return false;
    }
    switch (et) {
      case M_ZK_REGION_CLOSING:
        // Insert into RIT & resend the query to the region server: maybe the previous master
        // died before sending the query the first time.
        final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
        this.executorService.submit(
          new EventHandler(server, EventType.M_MASTER_RECOVERY) {
            @Override
            public void process() throws IOException {
              ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
              try {
                final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
                  .getVersion();
                unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
                if (regionStates.isRegionOffline(regionInfo)) {
                  assign(regionInfo, true);
                }
              } finally {
                lock.unlock();
              }
            }
          });
        break;

      case RS_ZK_REGION_CLOSED:
      case RS_ZK_REGION_FAILED_OPEN:
        // Region is closed, insert into RIT and handle it
        regionStates.setLastRegionServerOfRegion(sn, encodedName);
        regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
        if (!replicasToClose.contains(regionInfo)) {
          invokeAssign(regionInfo);
        } else {
          offlineDisabledRegion(regionInfo);
        }
        break;

      case M_ZK_REGION_OFFLINE:
        // Insert in RIT and resend to the regionserver
        regionStates.updateRegionState(rt, State.PENDING_OPEN);
        final RegionState rsOffline = regionStates.getRegionState(regionInfo);
        this.executorService.submit(
          new EventHandler(server, EventType.M_MASTER_RECOVERY) {
            @Override
            public void process() throws IOException {
              ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
              try {
                RegionPlan plan = new RegionPlan(regionInfo, null, sn);
                addPlan(encodedName, plan);
                assign(rsOffline, false, false);
              } finally {
                lock.unlock();
              }
            }
          });
        break;

      case RS_ZK_REGION_OPENING:
        regionStates.updateRegionState(rt, State.OPENING);
        break;

      case RS_ZK_REGION_OPENED:
        // Region is opened, insert into RIT and handle it.
        // This could be done asynchronously; we would then need to acquire the
        // lock in the handler.
        regionStates.updateRegionState(rt, State.OPEN);
        new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
        break;
      case RS_ZK_REQUEST_REGION_SPLIT:
      case RS_ZK_REGION_SPLITTING:
      case RS_ZK_REGION_SPLIT:
        // Splitting region should be online. We could have skipped it during
        // user region rebuilding since we may consider the split is completed.
        // Put it in SPLITTING state to avoid complications.
        regionStates.regionOnline(regionInfo, sn);
        regionStates.updateRegionState(rt, State.SPLITTING);
        if (!handleRegionSplitting(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteSplittingNode(encodedName, sn);
        }
        break;
      case RS_ZK_REQUEST_REGION_MERGE:
      case RS_ZK_REGION_MERGING:
      case RS_ZK_REGION_MERGED:
        if (!handleRegionMerging(
            rt, encodedName, prettyPrintedRegionName, sn)) {
          deleteMergingNode(encodedName, sn);
        }
        break;
      default:
        throw new IllegalStateException("Received region in state:" + et + " is not valid.");
    }
    LOG.info("Processed region " + prettyPrintedRegionName + " in state "
      + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
      + "server: " + sn);
    return true;
  }

  /**
   * When a region is closed, it should be removed from the regionsToReopen map.
   * @param hri HRegionInfo of the region which was closed
   */
  public void removeClosedRegion(HRegionInfo hri) {
    if (regionsToReopen.remove(hri.getEncodedName()) != null) {
      LOG.debug("Removed region from reopening regions because it was closed");
    }
  }

  /**
   * Handles various states an unassigned node can be in.
   * <p>
   * Method is called when a state change is suspected for an unassigned node.
   * <p>
   * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
   * yet).
   * @param rt region transition
   * @param coordination coordination for opening region
   * @param ord details about opening region
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
    justification="Needs work; says access to ConcurrentHashMaps not ATOMIC!!!")
  void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
      OpenRegionCoordination.OpenRegionDetails ord) {
    if (rt == null) {
      LOG.warn("Unexpected NULL input for RegionTransition rt");
      return;
    }
    final ServerName sn = rt.getServerName();
    // Check if this is a special HBCK transition
    if (sn.equals(HBCK_CODE_SERVERNAME)) {
      handleHBCK(rt);
      return;
    }
    final long createTime = rt.getCreateTime();
    final byte[] regionName = rt.getRegionName();
    String encodedName = HRegionInfo.encodeRegionName(regionName);
    String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
    // Verify this is a known server
    if (!serverManager.isServerOnline(sn)
        && !ignoreStatesRSOffline.contains(rt.getEventType())) {
      LOG.warn("Attempted to handle region transition for server but " +
        "it is not online: " + prettyPrintedRegionName + ", " + rt);
      return;
    }

    RegionState regionState =
      regionStates.getRegionState(encodedName);
    long startTime = System.currentTimeMillis();
    if (LOG.isDebugEnabled()) {
      boolean lateEvent = createTime < (startTime - 15000);
      LOG.debug("Handling " + rt.getEventType() +
        ", server=" + sn + ", region=" +
        (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
        (lateEvent ? ", which is more than 15 seconds late" : "") +
        ", current_state=" + regionState);
    }
    // We don't do anything for this event,
    // so separate it out, no need to lock/unlock anything
    if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
      return;
    }

    // We need a lock on the region as we could update it
    Lock lock = locker.acquireLock(encodedName);
    try {
      RegionState latestState =
        regionStates.getRegionState(encodedName);
      if ((regionState == null && latestState != null)
          || (regionState != null && latestState == null)
          || (regionState != null && latestState != null
            && latestState.getState() != regionState.getState())) {
        LOG.warn("Region state changed from " + regionState + " to "
          + latestState + ", while acquiring lock");
      }
      long waitedTime = System.currentTimeMillis() - startTime;
      if (waitedTime > 5000) {
        LOG.warn("Took " + waitedTime + "ms to acquire the lock");
      }
      regionState = latestState;
      switch (rt.getEventType()) {
        case RS_ZK_REQUEST_REGION_SPLIT:
        case RS_ZK_REGION_SPLITTING:
        case RS_ZK_REGION_SPLIT:
          if (!handleRegionSplitting(
              rt, encodedName, prettyPrintedRegionName, sn)) {
            deleteSplittingNode(encodedName, sn);
          }
          break;

        case RS_ZK_REQUEST_REGION_MERGE:
        case RS_ZK_REGION_MERGING:
        case RS_ZK_REGION_MERGED:
          // Merged region is a new region, we can't find it in the region states now.
          // However, the two merging regions are not new. They should be in state for merging.
          if (!handleRegionMerging(
              rt, encodedName, prettyPrintedRegionName, sn)) {
            deleteMergingNode(encodedName, sn);
          }
          break;

        case M_ZK_REGION_CLOSING:
          // Should see CLOSING after we have asked it to CLOSE or additional
          // times after already being in state of CLOSING
          if (regionState == null
              || !regionState.isPendingCloseOrClosingOnServer(sn)) {
            LOG.warn("Received CLOSING for " + prettyPrintedRegionName
              + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
              + regionStates.getRegionState(encodedName));
            return;
          }
          // Transition to CLOSING (or update stamp if already CLOSING)
          regionStates.updateRegionState(rt, State.CLOSING);
          break;

        case RS_ZK_REGION_CLOSED:
          // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
          if (regionState == null
              || !regionState.isPendingCloseOrClosingOnServer(sn)) {
            LOG.warn("Received CLOSED for " + prettyPrintedRegionName
              + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
              + regionStates.getRegionState(encodedName));
            return;
          }
          // Handle CLOSED by assigning elsewhere or stopping if a disable
          // If we got here all is good. Need to update RegionState -- else
          // what follows will fail because not in expected state.
          new ClosedRegionHandler(server, this, regionState.getRegion()).process();
          updateClosedRegionHandlerTracker(regionState.getRegion());
          break;

        case RS_ZK_REGION_FAILED_OPEN:
          if (regionState == null
              || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
            LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
              + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
              + regionStates.getRegionState(encodedName));
            return;
          }
          AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
          if (failedOpenCount == null) {
            failedOpenCount = new AtomicInteger();
            // No need to use putIfAbsent, or extra synchronization since
            // this whole handleRegion block is locked on the encoded region
            // name, and failedOpenTracker is updated only in this block
            // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
            failedOpenTracker.put(encodedName, failedOpenCount);
          }
          if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
            // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
            regionStates.updateRegionState(rt, State.FAILED_OPEN);
            // Remove the tracking info to save memory, and also reset
            // the count for the next open initiative
            failedOpenTracker.remove(encodedName);
          } else {
            // Handle this the same as if it were opened and then closed.
            regionState = regionStates.updateRegionState(rt, State.CLOSED);
            if (regionState != null) {
              // When there is more than one region server, a new RS is selected as the
              // destination and the region plan is updated accordingly. (HBASE-5546)
              try {
                getRegionPlan(regionState.getRegion(), sn, true);
                new ClosedRegionHandler(server, this, regionState.getRegion()).process();
              } catch (HBaseIOException e) {
                LOG.warn("Failed to get region plan", e);
              }
            }
          }
          break;

        case RS_ZK_REGION_OPENING:
          // Should see OPENING after we have asked it to OPEN or additional
          // times after already being in state of OPENING
          if (regionState == null
              || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
            LOG.warn("Received OPENING for " + prettyPrintedRegionName
              + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
              + regionStates.getRegionState(encodedName));
            return;
          }
          // Transition to OPENING (or update stamp if already OPENING)
          regionStates.updateRegionState(rt, State.OPENING);
          break;

        case RS_ZK_REGION_OPENED:
          // Should see OPENED after OPENING but possible after PENDING_OPEN.
          if (regionState == null
              || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
            LOG.warn("Received OPENED for " + prettyPrintedRegionName
              + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
              + regionStates.getRegionState(encodedName));

            if (regionState != null) {
              // Close it without updating the internal region states,
              // so as not to create double assignments in unlucky scenarios
              // mentioned in OpenRegionHandler#process
              unassign(regionState.getRegion(), null, -1, null, false, sn);
            }
            return;
          }
          // Handle OPENED by removing from transition and deleting the zk node
          regionState =
            regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt, regionState, sn);
          if (regionState != null) {
            failedOpenTracker.remove(encodedName); // reset the count, if any
            new OpenedRegionHandler(
              server, this, regionState.getRegion(), coordination, ord).process();
            updateOpenedRegionHandlerTracker(regionState.getRegion());
          }
          break;

        default:
          throw new IllegalStateException("Received event is not valid.");
      }
    } finally {
      lock.unlock();
    }
  }

  // For unit tests only
  boolean wasClosedHandlerCalled(HRegionInfo hri) {
    AtomicBoolean b = closedRegionHandlerCalled.get(hri);
    // compareAndSet to be sure that unit tests don't see stale values. This
    // means we will return true exactly once unless the handler code resets
    // the value to true again.
    return b == null ? false : b.compareAndSet(true, false);
  }

  // For unit tests only
  boolean wasOpenedHandlerCalled(HRegionInfo hri) {
    AtomicBoolean b = openedRegionHandlerCalled.get(hri);
    // compareAndSet to be sure that unit tests don't see stale values. This
    // means we will return true exactly once unless the handler code resets
    // the value to true again.
    return b == null ? false : b.compareAndSet(true, false);
  }

  // For unit tests only
  void initializeHandlerTrackers() {
    closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
    openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
  }

  void updateClosedRegionHandlerTracker(HRegionInfo hri) {
    if (closedRegionHandlerCalled != null) { // only true for unit tests
      closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
    }
  }

  void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
    if (openedRegionHandlerCalled != null) { // only true for unit tests
      openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
    }
  }
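  // Unit-test sketch for the trackers above (hypothetical test body; the
  // compareAndSet semantics mean each wasXxxHandlerCalled() call returns
  // true at most once per handler invocation):
  //
  //   am.initializeHandlerTrackers();
  //   // ... drive a region through an open transition ...
  //   assertTrue(am.wasOpenedHandlerCalled(hri));
  //   assertFalse(am.wasOpenedHandlerCalled(hri)); // already consumed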
  // TODO: processFavoredNodes might throw an exception, for e.g., if the
  // meta could not be contacted/updated. We need to decide how seriously to
  // treat this problem. Should we fail the current assignment? We should be
  // able to recover from this problem eventually (if the meta couldn't be
  // updated, things should work normally and eventually get fixed up).
  void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
    if (!shouldAssignRegionsWithFavoredNodes) return;
    // The AM gets the favored nodes info for each region and updates the meta
    // table with that info
    Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
      new HashMap<HRegionInfo, List<ServerName>>();
    for (HRegionInfo region : regions) {
      regionToFavoredNodes.put(region,
        ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
    }
    FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
      this.server.getConnection());
  }

  /**
   * Handle a ZK unassigned node transition triggered by the HBCK repair tool.
   * <p>
   * This is handled in a separate code path because it breaks the normal rules.
   * @param rt
   */
  @SuppressWarnings("deprecation")
  private void handleHBCK(RegionTransition rt) {
    String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
    LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
      ", server=" + rt.getServerName() + ", region=" +
      HRegionInfo.prettyPrint(encodedName));
    RegionState regionState = regionStates.getRegionTransitionState(encodedName);
    switch (rt.getEventType()) {
      case M_ZK_REGION_OFFLINE:
        HRegionInfo regionInfo;
        if (regionState != null) {
          regionInfo = regionState.getRegion();
        } else {
          try {
            byte [] name = rt.getRegionName();
            Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
              this.server.getConnection(), name);
            regionInfo = p.getFirst();
          } catch (IOException e) {
            LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
            return;
          }
        }
        LOG.info("HBCK repair is triggering assignment of region=" +
          regionInfo.getRegionNameAsString());
        // trigger assign; node is already in OFFLINE so we don't need to update ZK
        assign(regionInfo, false);
        break;

      default:
        LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
        break;
    }
  }

  // ZooKeeper events

  /**
   * New unassigned node has been created.
   *
   * <p>This happens when an RS begins the OPENING or CLOSING of a region by
   * creating an unassigned node.
   *
   * <p>When this happens we must:
   * <ol>
   *   <li>Watch the node for further events</li>
   *   <li>Read and handle the state in the node</li>
   * </ol>
   */
  @Override
  public void nodeCreated(String path) {
    handleAssignmentEvent(path);
  }

  /**
   * Existing unassigned node has had data changed.
   *
   * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
   * OPENING/OPENED and CLOSING/CLOSED.
   *
   * <p>When this happens we must:
   * <ol>
   *   <li>Watch the node for further events</li>
   *   <li>Read and handle the state in the node</li>
   * </ol>
   */
  @Override
  public void nodeDataChanged(String path) {
    handleAssignmentEvent(path);
  }


  // We don't want to have two events on the same region managed simultaneously.
  // For this reason, we need to wait if an event on the same region is currently in progress.
  // So we track the region names of the events in progress, and we keep a waiting list.
  private final Set<String> regionsInProgress = new HashSet<String>();
  // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
  // this as we want the events to be managed in the same order as we received them.
  private final LinkedHashMultimap <String, RegionRunnable>
    zkEventWorkerWaitingList = LinkedHashMultimap.create();

  /**
   * A runnable that works on a single region.
   */
  private interface RegionRunnable extends Runnable {
    /**
     * @return - the name of the region it works on.
     */
    String getRegionName();
  }

  /**
   * Submit a task, ensuring that there is only one task at a time working on
   * a given region. Order is respected.
   */
  protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {

    synchronized (regionsInProgress) {
      // If there is already a task in progress for this region, we add the
      // new one to the waiting list and return.
      if (regionsInProgress.contains(regRunnable.getRegionName())) {
        synchronized (zkEventWorkerWaitingList){
          zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
        }
        return;
      }

      // No event in progress on this region => we can submit a new task immediately.
      regionsInProgress.add(regRunnable.getRegionName());
      zkEventWorkers.submit(new Runnable() {
        @Override
        public void run() {
          try {
            regRunnable.run();
          } finally {
            // Now that we have finished, let's see if there is an event for the same region in
            // the waiting list. If that's the case, we can now submit it to the pool.
            synchronized (regionsInProgress) {
              regionsInProgress.remove(regRunnable.getRegionName());
              synchronized (zkEventWorkerWaitingList) {
                java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
                  regRunnable.getRegionName());
                if (!waiting.isEmpty()) {
                  // We want the first object only. The only way to get it is through an iterator.
                  RegionRunnable toSubmit = waiting.iterator().next();
                  zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
                  zkEventWorkersSubmit(toSubmit);
                }
              }
            }
          }
        }
      });
    }
  }
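  // Ordering sketch: two submissions for the same region never run
  // concurrently; the later one parks in zkEventWorkerWaitingList until the
  // earlier one completes (taskA/taskB/taskC are hypothetical RegionRunnables):
  //
  //   zkEventWorkersSubmit(taskA); // region R -> runs now
  //   zkEventWorkersSubmit(taskB); // region R -> queued behind taskA
  //   zkEventWorkersSubmit(taskC); // region S -> may run concurrently with taskA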
  @Override
  public void nodeDeleted(final String path) {
    if (path.startsWith(watcher.assignmentZNode)) {
      final String regionName = ZKAssign.getRegionName(watcher, path);
      zkEventWorkersSubmit(new RegionRunnable() {
        @Override
        public String getRegionName() {
          return regionName;
        }

        @Override
        public void run() {
          Lock lock = locker.acquireLock(regionName);
          try {
            RegionState rs = regionStates.getRegionTransitionState(regionName);
            if (rs == null) {
              rs = regionStates.getRegionState(regionName);
              if (rs == null || !rs.isMergingNew()) {
                // MergingNew is an offline state
                return;
              }
            }

            HRegionInfo regionInfo = rs.getRegion();
            String regionNameStr = regionInfo.getRegionNameAsString();
            LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);

            boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
              ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);

            ServerName serverName = rs.getServerName();
            if (serverManager.isServerOnline(serverName)) {
              if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
                synchronized (regionStates) {
                  regionOnline(regionInfo, serverName);
                  if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
                    // Check if the daughter regions are still there. If they are
                    // present, offline them, as this is the case of a rollback.
                    HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
                    HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
                    if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
                      LOG.warn("Split daughter region not in transition " + hri_a);
                    }
                    if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
                      LOG.warn("Split daughter region not in transition " + hri_b);
                    }
                    regionOffline(hri_a);
                    regionOffline(hri_b);
                    splitRegions.remove(regionInfo);
                  }
                  if (disabled) {
                    // If the server is offline, there is no harm in unassigning again
                    LOG.info("Opened " + regionNameStr
                      + " but this table is disabled, triggering close of region");
                    unassign(regionInfo);
                  }
                }
              } else if (rs.isMergingNew()) {
                synchronized (regionStates) {
                  String p = regionInfo.getEncodedName();
                  PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
                  if (regions != null) {
                    onlineMergingRegion(disabled, regions.getFirst(), serverName);
                    onlineMergingRegion(disabled, regions.getSecond(), serverName);
                  }
                }
              }
            }
          } finally {
            lock.unlock();
          }
        }

        private void onlineMergingRegion(boolean disabled,
            final HRegionInfo hri, final ServerName serverName) {
          RegionState regionState = regionStates.getRegionState(hri);
          if (regionState != null && regionState.isMerging()
              && regionState.isOnServer(serverName)) {
            regionOnline(regionState.getRegion(), serverName);
            if (disabled) {
              unassign(hri);
            }
          }
        }
      });
    }
  }
  /**
   * New unassigned node has been created.
   *
   * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
   * region by creating a znode.
   *
   * <p>When this happens we must:
   * <ol>
   * <li>Watch the node for further children changed events</li>
   * <li>Watch all new children for changed events</li>
   * </ol>
   */
  @Override
  public void nodeChildrenChanged(String path) {
    if (path.equals(watcher.assignmentZNode)) {
      zkEventWorkers.submit(new Runnable() {
        @Override
        public void run() {
          try {
            // Just make sure we see the changes for the new znodes
            List<String> children =
                ZKUtil.listChildrenAndWatchForNewChildren(
                    watcher, watcher.assignmentZNode);
            if (children != null) {
              Stat stat = new Stat();
              for (String child : children) {
                // if region is in transition, we already have a watch
                // on it, so no need to watch it again. As far as we know,
                // this is needed to watch splitting nodes only.
                if (!regionStates.isRegionInTransition(child)) {
                  ZKAssign.getDataAndWatch(watcher, child, stat);
                }
              }
            }
          } catch (KeeperException e) {
            server.abort("Unexpected ZK exception reading unassigned children", e);
          }
        }
      });
    }
  }

  /**
   * Marks the region as online. Removes it from regions in transition and
   * updates the in-memory assignment information.
   * <p>
   * Used when a region has been successfully opened on a region server.
   * @param regionInfo region that has been opened
   * @param sn server the region was opened on
   */
  void regionOnline(HRegionInfo regionInfo, ServerName sn) {
    regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
  }

  void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
    numRegionsOpened.incrementAndGet();
    regionStates.regionOnline(regionInfo, sn, openSeqNum);

    // Remove plan if one.
    clearRegionPlan(regionInfo);
    balancer.regionOnline(regionInfo, sn);

    // Tell our listeners that a region was opened
    sendRegionOpenedNotification(regionInfo, sn);
  }
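  // Note on the two regionOnline() overloads above: the two-argument form passes
  // HConstants.NO_SEQNUM as a sentinel, for callers (typically on the ZK event path, like
  // nodeDeleted() above) that have no open sequence id for the region; regionStates then
  // records the region online with the sequence id left unknown.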
  /**
   * Pass the assignment event to a worker for processing.
   * Each worker is a single thread executor service. The reason
   * for just one thread is to make sure all events for a given
   * region are processed in order.
   *
   * @param path znode path of the assignment event
   */
  private void handleAssignmentEvent(final String path) {
    if (path.startsWith(watcher.assignmentZNode)) {
      final String regionName = ZKAssign.getRegionName(watcher, path);

      zkEventWorkersSubmit(new RegionRunnable() {
        @Override
        public String getRegionName() {
          return regionName;
        }

        @Override
        public void run() {
          try {
            Stat stat = new Stat();
            byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
            if (data == null) return;

            RegionTransition rt = RegionTransition.parseFrom(data);

            // TODO: This code is tied to ZK anyway, so for now leaving it as is,
            // will refactor when whole region assignment will be abstracted from ZK
            BaseCoordinatedStateManager csm =
                (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
            OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();

            ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
                new ZkOpenRegionCoordination.ZkOpenRegionDetails();
            zkOrd.setVersion(stat.getVersion());
            zkOrd.setServerName(csm.getServer().getServerName());

            handleRegion(rt, openRegionCoordination, zkOrd);
          } catch (KeeperException e) {
            server.abort("Unexpected ZK exception reading unassigned node data", e);
          } catch (DeserializationException e) {
            server.abort("Unexpected exception deserializing node data", e);
          }
        }
      });
    }
  }

  /**
   * Marks the region as offline. Removes it from regions in transition and
   * removes in-memory assignment information.
   * <p>
   * Used when a region has been closed and should remain closed.
   * @param regionInfo region to be marked offline
   */
  public void regionOffline(final HRegionInfo regionInfo) {
    regionOffline(regionInfo, null);
  }

  public void offlineDisabledRegion(HRegionInfo regionInfo) {
    if (useZKForAssignment) {
      // Disabling so should not be reassigned, just delete the CLOSED node
      LOG.debug("Table being disabled so deleting ZK node and removing from " +
          "regions in transition, skipping assignment of region " +
          regionInfo.getRegionNameAsString());
      String encodedName = regionInfo.getEncodedName();
      deleteNodeInStates(encodedName, "closed", null,
          EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
    }
    replicasToClose.remove(regionInfo);
    regionOffline(regionInfo);
  }

  // Assignment methods
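  // A minimal caller-side sketch for the single-region entry point below; assignMeta() further
  // down uses exactly this shape. Note that setOfflineInZK only takes effect when
  // useZKForAssignment is on (see the setOfflineInZK && useZKForAssignment conjunction below).
  //
  //   HRegionInfo hri = ...; // region to (re)assign
  //   assign(hri, true);     // force the znode to OFFLINE first, then plan and send OPEN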
  /**
   * Assigns the specified region.
   * <p>
   * If a RegionPlan is available with a valid destination then it will be used
   * to determine what server region is assigned to. If no RegionPlan is
   * available, region will be assigned to a random available server.
   * <p>
   * Updates the RegionState and sends the OPEN RPC.
   * <p>
   * This will only succeed if the region is in transition and in a CLOSED or
   * OFFLINE state or not in transition (in-memory not zk), and of course, the
   * chosen server is up and running (It may have just crashed!). If the
   * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
   *
   * @param region region to be assigned
   * @param setOfflineInZK whether ZK node should be created/transitioned to an
   *   OFFLINE state before assigning the region
   */
  public void assign(HRegionInfo region, boolean setOfflineInZK) {
    assign(region, setOfflineInZK, false);
  }

  /**
   * Use care with forceNewPlan. It could cause double assignment.
   */
  public void assign(HRegionInfo region,
      boolean setOfflineInZK, boolean forceNewPlan) {
    if (isDisabledorDisablingRegionInRIT(region)) {
      return;
    }
    String encodedName = region.getEncodedName();
    Lock lock = locker.acquireLock(encodedName);
    try {
      RegionState state = forceRegionStateToOffline(region, forceNewPlan);
      if (state != null) {
        if (regionStates.wasRegionOnDeadServer(encodedName)) {
          LOG.info("Skip assigning " + region.getRegionNameAsString()
              + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
              + " is dead but not processed yet");
          return;
        }
        assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Bulk assign regions to <code>destination</code>.
   * @param destination server to assign the regions to
   * @param regions Regions to assign.
   * @return true if successful
   */
  boolean assign(final ServerName destination, final List<HRegionInfo> regions)
      throws InterruptedException {
    long startTime = EnvironmentEdgeManager.currentTime();
    try {
      int regionCount = regions.size();
      if (regionCount == 0) {
        return true;
      }
      LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
      Set<String> encodedNames = new HashSet<String>(regionCount);
      for (HRegionInfo region : regions) {
        encodedNames.add(region.getEncodedName());
      }

      List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
      Map<String, Lock> locks = locker.acquireLocks(encodedNames);
      try {
        AtomicInteger counter = new AtomicInteger(0);
        Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
        OfflineCallback cb = new OfflineCallback(
            watcher, destination, counter, offlineNodesVersions);
        Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
        List<RegionState> states = new ArrayList<RegionState>(regions.size());
        for (HRegionInfo region : regions) {
          String encodedName = region.getEncodedName();
          if (!isDisabledorDisablingRegionInRIT(region)) {
            RegionState state = forceRegionStateToOffline(region, false);
            boolean onDeadServer = false;
            if (state != null) {
              if (regionStates.wasRegionOnDeadServer(encodedName)) {
                LOG.info("Skip assigning " + region.getRegionNameAsString()
                    + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
                    + " is dead but not processed yet");
                onDeadServer = true;
              } else if (!useZKForAssignment
                  || asyncSetOfflineInZooKeeper(state, cb, destination)) {
                RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
                plans.put(encodedName, plan);
                states.add(state);
                continue;
              }
            }
            // Reassign if the region wasn't on a dead server
            if (!onDeadServer) {
              LOG.info("Failed to force region state to offline or "
                  + "failed to set it offline in ZK, will reassign later: " + region);
              failedToOpenRegions.add(region); // assign individually later
            }
          }
          // Release the lock, this region is excluded from bulk assign because
          // we can't update its state, or set its znode to offline.
          Lock lock = locks.remove(encodedName);
          lock.unlock();
        }

        if (useZKForAssignment) {
          // Wait until all unassigned nodes have been put up and watchers set.
          int total = states.size();
          for (int oldCounter = 0; !server.isStopped();) {
            int count = counter.get();
            if (oldCounter != count) {
              LOG.debug(destination.toString() + " unassigned znodes=" + count +
                  " of total=" + total + "; oldCounter=" + oldCounter);
              oldCounter = count;
            }
            if (count >= total) break;
            Thread.sleep(5);
          }
        }

        if (server.isStopped()) {
          return false;
        }

        // Add region plans, so we can updateTimers when one region is opened so
        // that unnecessary timeout on RIT is reduced.
        this.addPlans(plans);

        List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
            new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
        for (RegionState state: states) {
          HRegionInfo region = state.getRegion();
          String encodedRegionName = region.getEncodedName();
          Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
          if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
            LOG.warn("Failed to offline in zookeeper: " + region);
            failedToOpenRegions.add(region); // assign individually later
            Lock lock = locks.remove(encodedRegionName);
            lock.unlock();
          } else {
            regionStates.updateRegionState(
                region, State.PENDING_OPEN, destination);
            List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
            if (this.shouldAssignRegionsWithFavoredNodes) {
              favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
            }
            regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
                region, nodeVersion, favoredNodes));
          }
        }

        // Move on to open regions.
        try {
          // Send OPEN RPC. If it fails on a IOE or RemoteException,
          // regions will be assigned individually.
          long maxWaitTime = System.currentTimeMillis() +
              this.server.getConfiguration().
                  getLong("hbase.regionserver.rpc.startup.waittime", 60000);
          for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
            try {
              // regionOpenInfos is empty if all regions are in failedToOpenRegions list
              if (regionOpenInfos.isEmpty()) {
                break;
              }
              List<RegionOpeningState> regionOpeningStateList = serverManager
                  .sendRegionOpen(destination, regionOpenInfos);
              if (regionOpeningStateList == null) {
                // Failed getting RPC connection to this server
                return false;
              }
              for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
                RegionOpeningState openingState = regionOpeningStateList.get(k);
                if (openingState != RegionOpeningState.OPENED) {
                  HRegionInfo region = regionOpenInfos.get(k).getFirst();
                  if (openingState == RegionOpeningState.ALREADY_OPENED) {
                    processAlreadyOpenedRegion(region, destination);
                  } else if (openingState == RegionOpeningState.FAILED_OPENING) {
                    // Failed opening this region, reassign it later
                    failedToOpenRegions.add(region);
                  } else {
                    LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
                        + openingState + " in assigning region " + region);
                  }
                }
              }
              break;
            } catch (IOException e) {
              if (e instanceof RemoteException) {
                e = ((RemoteException)e).unwrapRemoteException();
              }
              if (e instanceof RegionServerStoppedException) {
                LOG.warn("The region server was shut down, ", e);
                // No need to retry, the region server is a goner.
                return false;
              } else if (e instanceof ServerNotRunningYetException) {
                long now = System.currentTimeMillis();
                if (now < maxWaitTime) {
                  LOG.debug("Server is not yet up; waiting up to " +
                      (maxWaitTime - now) + "ms", e);
                  Thread.sleep(100);
                  i--; // reset the try count
                  continue;
                }
              } else if (e instanceof java.net.SocketTimeoutException
                  && this.serverManager.isServerOnline(destination)) {
                // In case socket is timed out and the region server is still online,
                // the openRegion RPC could have been accepted by the server and
                // just the response didn't go through. So we will retry to
                // open the region on the same server.
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Bulk assigner openRegion() to " + destination
                      + " has timed out, but the regions might"
                      + " already be opened on it.", e);
                }
                // wait and reset the re-try count, server might be just busy.
                Thread.sleep(100);
                i--;
                continue;
              }
              throw e;
            }
          }
        } catch (IOException e) {
          // Can be a socket timeout, EOF, NoRouteToHost, etc
          LOG.info("Unable to communicate with " + destination
              + " in order to assign regions, ", e);
          return false;
        }
      } finally {
        for (Lock lock : locks.values()) {
          lock.unlock();
        }
      }

      if (!failedToOpenRegions.isEmpty()) {
        for (HRegionInfo region : failedToOpenRegions) {
          if (!regionStates.isRegionOnline(region)) {
            invokeAssign(region);
          }
        }
      }

      // wait for assignment completion
      ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
      for (HRegionInfo region: regions) {
        if (!region.getTable().isSystemTable()) {
          userRegionSet.add(region);
        }
      }
      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
          System.currentTimeMillis())) {
        LOG.debug("some user regions are still in transition: " + userRegionSet);
      }
      LOG.debug("Bulk assigning done for " + destination);
      return true;
    } finally {
      metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
    }
  }
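  // The bulk path above, in short: take a per-region lock, set each znode OFFLINE
  // asynchronously (OfflineCallback counts completions), send one batched OPEN RPC via
  // serverManager.sendRegionOpen(destination, regionOpenInfos), then fall back to
  // single-region invokeAssign() for anything that failed to open.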
  /**
   * Send CLOSE RPC if the server is online, otherwise, offline the region.
   *
   * The RPC will be sent only to the region server found in the region state
   * if it is passed in; otherwise, to the src server specified. If region
   * state is not specified, we don't update region state at all, instead
   * we just send the RPC call. This is useful for some cleanup without
   * messing around the region states (see handleRegion, on region opened
   * on an unexpected server scenario, for an example)
   */
  private void unassign(final HRegionInfo region,
      final RegionState state, final int versionOfClosingNode,
      final ServerName dest, final boolean transitionInZK,
      final ServerName src) {
    ServerName server = src;
    if (state != null) {
      server = state.getServerName();
    }
    long maxWaitTime = -1;
    for (int i = 1; i <= this.maximumAttempts; i++) {
      if (this.server.isStopped() || this.server.isAborted()) {
        LOG.debug("Server stopped/aborted; skipping unassign of " + region);
        return;
      }
      // ClosedRegionHandler can remove the server from this.regions
      if (!serverManager.isServerOnline(server)) {
        LOG.debug("Offline " + region.getRegionNameAsString()
            + ", no need to unassign since it's on a dead server: " + server);
        if (transitionInZK) {
          // delete the node. if no node exists need not bother.
          deleteClosingOrClosedNode(region, server);
        }
        if (state != null) {
          regionOffline(region);
        }
        return;
      }
      try {
        // Send CLOSE RPC
        if (serverManager.sendRegionClose(server, region,
            versionOfClosingNode, dest, transitionInZK)) {
          LOG.debug("Sent CLOSE to " + server + " for region " +
              region.getRegionNameAsString());
          if (useZKForAssignment && !transitionInZK && state != null) {
            // Retry to make sure the region is
            // closed so as to avoid double assignment.
            unassign(region, state, versionOfClosingNode,
                dest, transitionInZK, src);
          }
          return;
        }
        // This never happens. Currently regionserver close always returns true.
        // TODO: this can now happen (0.96) if there is an exception in a coprocessor
        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
            region.getRegionNameAsString());
      } catch (Throwable t) {
        long sleepTime = 0;
        Configuration conf = this.server.getConfiguration();
        if (t instanceof RemoteException) {
          t = ((RemoteException)t).unwrapRemoteException();
        }
        boolean logRetries = true;
        if (t instanceof RegionServerAbortedException
            || t instanceof RegionServerStoppedException
            || t instanceof ServerNotRunningYetException) {
          // RS is aborting or stopping, we cannot offline the region since the region may need
          // to do WAL recovery. Until we see the RS expiration, we should retry.
          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
              RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);

        } else if (t instanceof NotServingRegionException) {
          LOG.debug("Offline " + region.getRegionNameAsString()
              + ", it's not any more on " + server, t);
          if (transitionInZK) {
            deleteClosingOrClosedNode(region, server);
          }
          if (state != null) {
            regionOffline(region);
          }
          return;
        } else if ((t instanceof FailedServerException) || (state != null &&
            t instanceof RegionAlreadyInTransitionException)) {
          if (t instanceof FailedServerException) {
            sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
                RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
          } else {
            // RS is already processing this region, only need to update the timestamp
            LOG.debug("Updating the timestamp for " + state);
            state.updateTimestampToNow();
            if (maxWaitTime < 0) {
              maxWaitTime =
                  EnvironmentEdgeManager.currentTime()
                      + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
                          DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
            }
            long now = EnvironmentEdgeManager.currentTime();
            if (now < maxWaitTime) {
              LOG.debug("Region is already in transition; "
                  + "waiting up to " + (maxWaitTime - now) + "ms", t);
              sleepTime = 100;
              i--; // reset the try count
              logRetries = false;
            }
          }
        }

        try {
          if (sleepTime > 0) {
            Thread.sleep(sleepTime);
          }
        } catch (InterruptedException ie) {
          LOG.warn("Failed to unassign "
              + region.getRegionNameAsString() + " since interrupted", ie);
          Thread.currentThread().interrupt();
          if (state != null) {
            regionStates.updateRegionState(region, State.FAILED_CLOSE);
          }
          return;
        }

        if (logRetries) {
          LOG.info("Server " + server + " returned " + t + " for "
              + region.getRegionNameAsString() + ", try=" + i
              + " of " + this.maximumAttempts, t);
          // Presume retry or server will expire.
        }
      }
    }
    // Run out of attempts
    if (state != null) {
      regionStates.updateRegionState(region, State.FAILED_CLOSE);
    }
  }

  /**
   * Set region to OFFLINE unless it is opening and forceNewPlan is false.
   */
  private RegionState forceRegionStateToOffline(
      final HRegionInfo region, final boolean forceNewPlan) {
    RegionState state = regionStates.getRegionState(region);
    if (state == null) {
      LOG.warn("Assigning but not in region states: " + region);
      state = regionStates.createRegionState(region);
    }

    ServerName sn = state.getServerName();
    if (forceNewPlan && LOG.isDebugEnabled()) {
      LOG.debug("Force region state offline " + state);
    }

    switch (state.getState()) {
    case OPEN:
    case OPENING:
    case PENDING_OPEN:
    case CLOSING:
    case PENDING_CLOSE:
      if (!forceNewPlan) {
        LOG.debug("Skip assigning " +
            region + ", it is already " + state);
        return null;
      }
      // fall through: with forceNewPlan, attempt an unassign just like the failed states below
    case FAILED_CLOSE:
    case FAILED_OPEN:
      unassign(region, state, -1, null, false, null);
      state = regionStates.getRegionState(region);
      if (state.isFailedClose()) {
        // If we can't close the region, we can't re-assign
        // it so as to avoid possible double assignment/data loss.
        LOG.info("Skip assigning " +
            region + ", we couldn't close it: " + state);
        return null;
      }
      // fall through
    case OFFLINE:
      // This region could have been open on this server
      // for a while. If the server is dead and not processed
      // yet, we can move on only if the meta shows the
      // region is not on this server actually, or on a server
      // not dead, or dead and processed already.
      // In case not using ZK, we don't need this check because
      // we have the latest info in memory, and the caller
      // will do another round of checking anyway.
      if (useZKForAssignment
          && regionStates.isServerDeadAndNotProcessed(sn)
          && wasRegionOnDeadServerByMeta(region, sn)) {
        if (!regionStates.isRegionInTransition(region)) {
          LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
          regionStates.updateRegionState(region, State.OFFLINE);
        }
        LOG.info("Skip assigning " + region.getRegionNameAsString()
            + ", it is on a dead but not processed yet server: " + sn);
        return null;
      }
      // fall through
    case CLOSED:
      break;
    default:
      LOG.error("Trying to assign region " + region
          + ", which is " + state);
      return null;
    }
    return state;
  }

  @SuppressWarnings("deprecation")
  protected boolean wasRegionOnDeadServerByMeta(
      final HRegionInfo region, final ServerName sn) {
    try {
      if (region.isMetaRegion()) {
        ServerName server = this.server.getMetaTableLocator().
            getMetaRegionLocation(this.server.getZooKeeper());
        return regionStates.isServerDeadAndNotProcessed(server);
      }
      while (!server.isStopped()) {
        try {
          this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
          Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
              region.getRegionName());
          if (r == null || r.isEmpty()) return false;
          // Local shadows the server field: the last known hosting server as read from meta
          ServerName server = HRegionInfo.getServerName(r);
          return regionStates.isServerDeadAndNotProcessed(server);
        } catch (IOException ioe) {
          LOG.info("Received exception accessing hbase:meta during force assign "
              + region.getRegionNameAsString() + ", retrying", ioe);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.info("Interrupted accessing hbase:meta", e);
    }
    // Call is interrupted or server is stopped.
    return regionStates.isServerDeadAndNotProcessed(sn);
  }
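  // Condensed view of forceRegionStateToOffline() above -- it answers "may this region be
  // (re)assigned, and from which state?":
  //   OPEN/OPENING/PENDING_OPEN/CLOSING/PENDING_CLOSE: only with forceNewPlan, after an
  //     unassign attempt (shared with FAILED_CLOSE/FAILED_OPEN via the fall-through)
  //   OFFLINE: extra dead-server check against meta when ZK-based assignment is in use
  //   CLOSED: fine as-is
  //   anything else: refused, null is returned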
  /**
   * Caller must hold lock on the passed <code>state</code> object.
   * @param state region state to assign from
   * @param setOfflineInZK whether to force the ZK node to OFFLINE first
   * @param forceNewPlan whether to ignore any cached region plan
   */
  private void assign(RegionState state,
      boolean setOfflineInZK, final boolean forceNewPlan) {
    long startTime = EnvironmentEdgeManager.currentTime();
    try {
      Configuration conf = server.getConfiguration();
      RegionState currentState = state;
      int versionOfOfflineNode = -1;
      RegionPlan plan = null;
      long maxWaitTime = -1;
      HRegionInfo region = state.getRegion();
      RegionOpeningState regionOpenState;
      Throwable previousException = null;
      for (int i = 1; i <= maximumAttempts; i++) {
        if (server.isStopped() || server.isAborted()) {
          LOG.info("Skip assigning " + region.getRegionNameAsString()
              + ", the server is stopped/aborted");
          return;
        }

        if (plan == null) { // Get a server for the region at first
          try {
            plan = getRegionPlan(region, forceNewPlan);
          } catch (HBaseIOException e) {
            LOG.warn("Failed to get region plan", e);
          }
        }

        if (plan == null) {
          LOG.warn("Unable to determine a plan to assign " + region);

          // For meta region, we have to keep retrying until succeeding
          if (region.isMetaRegion()) {
            if (i == maximumAttempts) {
              i = 0; // re-set attempt count to 0 for at least 1 retry

              LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
                  " after maximumAttempts (" + this.maximumAttempts +
                  "). Reset attempts count and continue retrying.");
            }
            waitForRetryingMetaAssignment();
            continue;
          }

          regionStates.updateRegionState(region, State.FAILED_OPEN);
          return;
        }
        if (setOfflineInZK && versionOfOfflineNode == -1) {
          LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
          // get the version of the znode after setting it to OFFLINE.
          // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
          versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
          if (versionOfOfflineNode != -1) {
            if (isDisabledorDisablingRegionInRIT(region)) {
              return;
            }
            // In case of assignment from EnableTableHandler, table state is ENABLING. Anyhow,
            // EnableTableHandler will set ENABLED after assigning all the table regions. If we
            // try to set to ENABLED directly then client API may think table is enabled.
            // When we have a case such as all the regions are added directly into hbase:meta
            // and we call assignRegion, then we need to make the table ENABLED. Hence in such
            // a case the table will not be in ENABLING or ENABLED state.
            TableName tableName = region.getTable();
            if (!tableStateManager.isTableState(tableName,
                ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
              LOG.debug("Setting table " + tableName + " to ENABLED state.");
              setEnabledTable(tableName);
            }
          }
        }
        if (setOfflineInZK && versionOfOfflineNode == -1) {
          LOG.info("Unable to set offline in ZooKeeper to assign " + region);
          // Setting offline in ZK must have failed due to ZK racing or some
          // exception which may make the server abort. If it is ZK racing,
          // we should retry since we already reset the region state;
          // existing (re)assignment will fail anyway.
          if (!server.isAborted()) {
            continue;
          }
        }
        LOG.info("Assigning " + region.getRegionNameAsString() +
            " to " + plan.getDestination().toString());
        // Transition RegionState to PENDING_OPEN
        currentState = regionStates.updateRegionState(region,
            State.PENDING_OPEN, plan.getDestination());

        boolean needNewPlan;
        final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
            " to " + plan.getDestination();
        try {
          List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
          if (this.shouldAssignRegionsWithFavoredNodes) {
            favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
          }
          regionOpenState = serverManager.sendRegionOpen(
              plan.getDestination(), region, versionOfOfflineNode, favoredNodes);

          if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
            // Failed opening this region, looping again on a new server.
            needNewPlan = true;
            LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
                "trying to assign elsewhere instead; " +
                "try=" + i + " of " + this.maximumAttempts);
          } else {
            // we're done
            if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
              processAlreadyOpenedRegion(region, plan.getDestination());
            }
            return;
          }

        } catch (Throwable t) {
          if (t instanceof RemoteException) {
            t = ((RemoteException) t).unwrapRemoteException();
          }
          previousException = t;

          // Should we wait a little before retrying? If the server is starting it's yes.
          // If the region is already in transition, it's yes as well: we want to be sure that
          // the region will get opened but we don't want a double assignment.
          boolean hold = (t instanceof RegionAlreadyInTransitionException ||
              t instanceof ServerNotRunningYetException);

          // In case socket is timed out and the region server is still online,
          // the openRegion RPC could have been accepted by the server and
          // just the response didn't go through. So we will retry to
          // open the region on the same server to avoid possible
          // double assignment.
          boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
              && this.serverManager.isServerOnline(plan.getDestination()));

          if (hold) {
            LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
                "try=" + i + " of " + this.maximumAttempts, t);

            if (maxWaitTime < 0) {
              if (t instanceof RegionAlreadyInTransitionException) {
                maxWaitTime = EnvironmentEdgeManager.currentTime()
                    + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
                        DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
              } else {
                maxWaitTime = EnvironmentEdgeManager.currentTime()
                    + this.server.getConfiguration().getLong(
                        "hbase.regionserver.rpc.startup.waittime", 60000);
              }
            }
            try {
              needNewPlan = false;
              long now = EnvironmentEdgeManager.currentTime();
              if (now < maxWaitTime) {
                LOG.debug("Server is not yet up or region is already in transition; "
                    + "waiting up to " + (maxWaitTime - now) + "ms", t);
                Thread.sleep(100);
                i--; // reset the try count
              } else if (!(t instanceof RegionAlreadyInTransitionException)) {
                LOG.debug("Server is not up for a while; try a new one", t);
                needNewPlan = true;
              }
            } catch (InterruptedException ie) {
              LOG.warn("Failed to assign "
                  + region.getRegionNameAsString() + " since interrupted", ie);
              regionStates.updateRegionState(region, State.FAILED_OPEN);
              Thread.currentThread().interrupt();
              return;
            }
          } else if (retry) {
            needNewPlan = false;
            i--; // we want to retry as many times as needed as long as the RS is not dead.
            LOG.warn(assignMsg + ", trying to assign to the same region server due to ", t);
          } else {
            needNewPlan = true;
            LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
                " try=" + i + " of " + this.maximumAttempts, t);
          }
        }

        if (i == this.maximumAttempts) {
          // For meta region, we have to keep retrying until succeeding
          if (region.isMetaRegion()) {
            i = 0; // re-set attempt count to 0 for at least 1 retry
            LOG.warn(assignMsg +
                ", trying to assign a hbase:meta region reached maximumAttempts (" +
                this.maximumAttempts + "). Reset attempt counts and continue retrying.");
            waitForRetryingMetaAssignment();
          } else {
            // Don't reset the region state or get a new plan any more.
            // This is the last try.
            continue;
          }
        }

        // If region opened on destination of present plan, reassigning to a new
        // RS may cause double assignments. In case of RegionAlreadyInTransitionException
        // reassign to the same RS.
        if (needNewPlan) {
          // Force a new plan and reassign. Will return null if no servers.
          // The new plan could be the same as the existing plan since we don't
          // exclude the server of the original plan, which should not be
          // excluded since it could be the only server up now.
          RegionPlan newPlan = null;
          try {
            newPlan = getRegionPlan(region, true);
          } catch (HBaseIOException e) {
            LOG.warn("Failed to get region plan", e);
          }
          if (newPlan == null) {
            regionStates.updateRegionState(region, State.FAILED_OPEN);
            LOG.warn("Unable to find a viable location to assign region " +
                region.getRegionNameAsString());
            return;
          }

          if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
            // Clean out the plan we failed to execute and one that doesn't look like it'll
            // succeed anyway; we need a new plan!
            // Transition back to OFFLINE
            LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
                + newPlan.getDestination() + " server.");
            currentState = regionStates.updateRegionState(region, State.OFFLINE);
            versionOfOfflineNode = -1;
            if (useZKForAssignment) {
              setOfflineInZK = true;
            }
            plan = newPlan;
          } else if (plan.getDestination().equals(newPlan.getDestination()) &&
              previousException instanceof FailedServerException) {
            try {
              LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
                  " to the same failed server.");
              Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
                  RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
            } catch (InterruptedException ie) {
              LOG.warn("Failed to assign "
                  + region.getRegionNameAsString() + " since interrupted", ie);
              regionStates.updateRegionState(region, State.FAILED_OPEN);
              Thread.currentThread().interrupt();
              return;
            }
          }
        }
      }
      // Run out of attempts
      regionStates.updateRegionState(region, State.FAILED_OPEN);
    } finally {
      metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
    }
  }
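  // Retry policy of assign(RegionState, ...) above, condensed: up to maximumAttempts tries.
  // RegionAlreadyInTransitionException and ServerNotRunningYetException hold on to the same
  // plan without consuming an attempt until a wait budget runs out; a socket timeout against a
  // still-online server also retries the same plan, to avoid double assignment; everything else
  // forces a new plan. For hbase:meta the attempt counter is reset instead of giving up.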
", e); 2336 } 2337 2338 deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE); 2339 } 2340 2341 regionStates.regionOnline(region, sn); 2342 } 2343 isDisabledorDisablingRegionInRIT(final HRegionInfo region)2344 private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) { 2345 if (this.tableStateManager.isTableState(region.getTable(), 2346 ZooKeeperProtos.Table.State.DISABLED, 2347 ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) { 2348 LOG.info("Table " + region.getTable() + " is disabled or disabling;" 2349 + " skipping assign of " + region.getRegionNameAsString()); 2350 offlineDisabledRegion(region); 2351 return true; 2352 } 2353 return false; 2354 } 2355 2356 /** 2357 * Set region as OFFLINED up in zookeeper 2358 * 2359 * @param state 2360 * @return the version of the offline node if setting of the OFFLINE node was 2361 * successful, -1 otherwise. 2362 */ setOfflineInZooKeeper(final RegionState state, final ServerName destination)2363 private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) { 2364 if (!state.isClosed() && !state.isOffline()) { 2365 String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE."; 2366 this.server.abort(msg, new IllegalStateException(msg)); 2367 return -1; 2368 } 2369 regionStates.updateRegionState(state.getRegion(), State.OFFLINE); 2370 int versionOfOfflineNode; 2371 try { 2372 // get the version after setting the znode to OFFLINE 2373 versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher, 2374 state.getRegion(), destination); 2375 if (versionOfOfflineNode == -1) { 2376 LOG.warn("Attempted to create/force node into OFFLINE state before " 2377 + "completing assignment but failed to do so for " + state); 2378 return -1; 2379 } 2380 } catch (KeeperException e) { 2381 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e); 2382 return -1; 2383 } 2384 return versionOfOfflineNode; 2385 } 2386 2387 /** 2388 * @param region the region to assign 2389 * @return Plan for passed <code>region</code> (If none currently, it creates one or 2390 * if no servers to assign, it returns null). 2391 */ getRegionPlan(final HRegionInfo region, final boolean forceNewPlan)2392 private RegionPlan getRegionPlan(final HRegionInfo region, 2393 final boolean forceNewPlan) throws HBaseIOException { 2394 return getRegionPlan(region, null, forceNewPlan); 2395 } 2396 2397 /** 2398 * @param region the region to assign 2399 * @param serverToExclude Server to exclude (we know its bad). Pass null if 2400 * all servers are thought to be assignable. 2401 * @param forceNewPlan If true, then if an existing plan exists, a new plan 2402 * will be generated. 2403 * @return Plan for passed <code>region</code> (If none currently, it creates one or 2404 * if no servers to assign, it returns null). 
  /**
   * @param region the region to assign
   * @param serverToExclude Server to exclude (we know it's bad). Pass null if
   *   all servers are thought to be assignable.
   * @param forceNewPlan If true, then if an existing plan exists, a new plan
   *   will be generated.
   * @return Plan for passed <code>region</code> (If none currently, it creates one or
   *   if no servers to assign, it returns null).
   */
  private RegionPlan getRegionPlan(final HRegionInfo region,
      final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
    // Pick up existing plan or make a new one
    final String encodedName = region.getEncodedName();
    final List<ServerName> destServers =
        serverManager.createDestinationServersList(serverToExclude);

    if (destServers.isEmpty()) {
      LOG.warn("Can't move " + encodedName +
          ", there is no destination server available.");
      return null;
    }

    RegionPlan randomPlan = null;
    boolean newPlan = false;
    RegionPlan existingPlan;

    synchronized (this.regionPlans) {
      existingPlan = this.regionPlans.get(encodedName);

      if (existingPlan != null && existingPlan.getDestination() != null) {
        LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
            + " destination server is " + existingPlan.getDestination() +
            " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
      }

      if (forceNewPlan
          || existingPlan == null
          || existingPlan.getDestination() == null
          || !destServers.contains(existingPlan.getDestination())) {
        newPlan = true;
      }
    }

    if (newPlan) {
      ServerName destination = balancer.randomAssignment(region, destServers);
      if (destination == null) {
        LOG.warn("Can't find a destination for " + encodedName);
        return null;
      }
      synchronized (this.regionPlans) {
        randomPlan = new RegionPlan(region, null, destination);
        if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
          List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
          regions.add(region);
          try {
            processFavoredNodes(regions);
          } catch (IOException ie) {
            LOG.warn("Ignoring exception in processFavoredNodes " + ie);
          }
        }
        this.regionPlans.put(encodedName, randomPlan);
      }
      LOG.debug("No previous transition plan found (or ignoring an existing plan) for "
          + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
          + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
          + ") available servers, forceNewPlan=" + forceNewPlan);
      return randomPlan;
    }
    LOG.debug("Using pre-existing plan for " +
        region.getRegionNameAsString() + "; plan=" + existingPlan);
    return existingPlan;
  }

  /**
   * Wait for some time before retrying meta table region assignment
   */
  private void waitForRetryingMetaAssignment() {
    try {
      Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
    } catch (InterruptedException e) {
      LOG.error("Got interrupted while waiting for hbase:meta assignment", e);
      Thread.currentThread().interrupt();
    }
  }
  /**
   * Unassigns the specified region.
   * <p>
   * Updates the RegionState and sends the CLOSE RPC unless region is being
   * split by regionserver; then the unassign fails (silently) because we
   * presume the region being unassigned no longer exists (it's been split out
   * of existence). TODO: What to do if split fails and is rolled back and
   * parent is revivified?
   * <p>
   * If a RegionPlan is already set, it will remain.
   *
   * @param region region to be unassigned
   */
  public void unassign(HRegionInfo region) {
    unassign(region, false);
  }

  /**
   * Unassigns the specified region.
   * <p>
   * Updates the RegionState and sends the CLOSE RPC unless region is being
   * split by regionserver; then the unassign fails (silently) because we
   * presume the region being unassigned no longer exists (it's been split out
   * of existence). TODO: What to do if split fails and is rolled back and
   * parent is revivified?
   * <p>
   * If a RegionPlan is already set, it will remain.
   *
   * @param region region to be unassigned
   * @param force if region should be closed even if already closing
   */
  public void unassign(HRegionInfo region, boolean force, ServerName dest) {
    // TODO: Method needs refactoring. Ugly buried returns throughout. Beware!
    LOG.debug("Starting unassign of " + region.getRegionNameAsString()
        + " (offlining), current state: " + regionStates.getRegionState(region));

    String encodedName = region.getEncodedName();
    // Grab the state of this region and synchronize on it
    int versionOfClosingNode = -1;
    // We need a lock here as we're going to do a put later and we don't want multiple states
    // creation
    ReentrantLock lock = locker.acquireLock(encodedName);
    RegionState state = regionStates.getRegionTransitionState(encodedName);
    boolean reassign = true;
    try {
      if (state == null) {
        // Region is not in transition.
        // We can unassign it only if it's not SPLIT/MERGED.
        state = regionStates.getRegionState(encodedName);
        if (state != null && state.isUnassignable()) {
          LOG.info("Attempting to unassign " + state + ", ignored");
          // Offline region will be reassigned below
          return;
        }
        // Create the znode in CLOSING state
        try {
          if (state == null || state.getServerName() == null) {
            // We don't know where the region is, offline it.
            // No need to send CLOSE RPC
            LOG.warn("Attempting to unassign a region not in RegionStates "
                + region.getRegionNameAsString() + ", offlined");
            regionOffline(region);
            return;
          }
          if (useZKForAssignment) {
            versionOfClosingNode = ZKAssign.createNodeClosing(
                watcher, region, state.getServerName());
            if (versionOfClosingNode == -1) {
              LOG.info("Attempting to unassign " +
                  region.getRegionNameAsString() + " but ZK closing node "
                  + "can't be created.");
              reassign = false; // not unassigned at all
              return;
            }
          }
        } catch (KeeperException e) {
          if (e instanceof NodeExistsException) {
            // Handle race between master initiated close and regionserver
            // orchestrated splitting. See if existing node is in a
            // SPLITTING or SPLIT state. If so, the regionserver started
            // an op on node before we could get our CLOSING in. Deal.
            NodeExistsException nee = (NodeExistsException)e;
            String path = nee.getPath();
            try {
              if (isSplitOrSplittingOrMergedOrMerging(path)) {
                LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
                    "skipping unassign because region no longer exists -- it's split or merged");
                reassign = false; // no need to reassign for split/merged region
                return;
              }
            } catch (KeeperException.NoNodeException ke) {
              LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
                  "; presuming split and that the region to unassign, " +
                  encodedName + ", no longer exists -- confirm", ke);
              return;
            } catch (KeeperException ke) {
              LOG.error("Unexpected zk state", ke);
            } catch (DeserializationException de) {
              LOG.error("Failed parse", de);
            }
          }
          // If we get here, we don't understand what's going on -- abort.
          server.abort("Unexpected ZK exception creating node CLOSING", e);
          reassign = false; // heading out already
          return;
        }
        state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
      } else if (state.isFailedOpen()) {
        // The region is not open yet
        regionOffline(region);
        return;
      } else if (force && state.isPendingCloseOrClosing()) {
        LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
            " which is already " + state.getState() +
            " but forcing to send a CLOSE RPC again ");
        if (state.isFailedClose()) {
          state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
        }
        state.updateTimestampToNow();
      } else {
        LOG.debug("Attempting to unassign " +
            region.getRegionNameAsString() + " but it is " +
            "already in transition (" + state.getState() + ", force=" + force + ")");
        return;
      }

      unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
    } finally {
      lock.unlock();

      // Region is expected to be reassigned afterwards
      if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
        assign(region, true);
      }
    }
  }

  public void unassign(HRegionInfo region, boolean force) {
    unassign(region, force, null);
  }

  /**
   * @param region regioninfo of znode to be deleted.
   */
  public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
    String encodedName = region.getEncodedName();
    deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
        EventType.RS_ZK_REGION_CLOSED);
  }
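  // Illustrative sketch only: how the SPLIT/MERGE check below is used during an unassign race.
  // The path here is built the same way processAlreadyOpenedRegion() builds it; in the real
  // caller above it comes from the NodeExistsException instead.
  //
  //   String path = ZKAssign.getNodeName(watcher, region.getEncodedName());
  //   if (isSplitOrSplittingOrMergedOrMerging(path)) {
  //     // the RS is splitting/merging this region; leave it alone and skip the unassign
  //   }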
  /**
   * @param path znode path to check
   * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
   * @throws KeeperException Can happen if the znode went away in the meantime.
   * @throws DeserializationException if the znode payload can't be parsed
   */
  private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
      throws KeeperException, DeserializationException {
    boolean result = false;
    // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
    // cleaned up before we can get data from it.
    byte [] data = ZKAssign.getData(watcher, path);
    if (data == null) {
      LOG.info("Node " + path + " is gone");
      return false;
    }
    RegionTransition rt = RegionTransition.parseFrom(data);
    switch (rt.getEventType()) {
    case RS_ZK_REQUEST_REGION_SPLIT:
    case RS_ZK_REGION_SPLIT:
    case RS_ZK_REGION_SPLITTING:
    case RS_ZK_REQUEST_REGION_MERGE:
    case RS_ZK_REGION_MERGED:
    case RS_ZK_REGION_MERGING:
      result = true;
      break;
    default:
      LOG.info("Node " + path + " is in " + rt.getEventType());
      break;
    }
    return result;
  }

  /**
   * Used by unit tests. Return the number of regions opened so far in the life
   * of the master. Increases by one every time the master opens a region.
   * @return the counter value of the number of regions opened so far
   */
  public int getNumRegionsOpened() {
    return numRegionsOpened.get();
  }

  /**
   * Waits until the specified region has completed assignment.
   * <p>
   * If the region is already assigned, returns immediately. Otherwise, method
   * blocks until the region is assigned.
   * @param regionInfo region to wait on assignment for
   * @return true if the region is assigned, false otherwise.
   * @throws InterruptedException if the wait is interrupted
   */
  public boolean waitForAssignment(HRegionInfo regionInfo)
      throws InterruptedException {
    ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
    regionSet.add(regionInfo);
    return waitForAssignment(regionSet, true, Long.MAX_VALUE);
  }

  /**
   * Waits until the specified regions have completed assignment, or the deadline is reached.
   */
  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
      final boolean waitTillAllAssigned, final int reassigningRegions,
      final long minEndTime) throws InterruptedException {
    long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
    return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
  }
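  // Worked example for the deadline above, assuming bulkPerRegionOpenTimeGuesstimate defaults
  // to 10000 ms (its value is configuration-driven): reassigning 100 regions gives
  // deadline = minEndTime + 10000 * 101, i.e. roughly 17 extra minutes of patience.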
  /**
   * Waits until the specified regions have completed assignment, or the deadline is reached.
   * @param regionSet set of regions to wait on. The set is modified and the assigned regions
   *   removed.
   * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
   * @param deadline the timestamp after which the wait is aborted
   * @return true if all the regions are assigned, false otherwise.
   * @throws InterruptedException if the wait is interrupted
   */
  protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
      final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
    // We're not synchronizing on regionsInTransition now because we don't use any iterator.
    while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
      int failedOpenCount = 0;
      Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
      while (regionInfoIterator.hasNext()) {
        HRegionInfo hri = regionInfoIterator.next();
        if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
            State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
          regionInfoIterator.remove();
        } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
          failedOpenCount++;
        }
      }
      if (!waitTillAllAssigned) {
        // No need to wait, let assignment go on asynchronously
        break;
      }
      if (!regionSet.isEmpty()) {
        if (failedOpenCount == regionSet.size()) {
          // all the regions we are waiting for had an error on open.
          break;
        }
        regionStates.waitForUpdate(100);
      }
    }
    return regionSet.isEmpty();
  }

  /**
   * Assigns the hbase:meta region or a replica.
   * <p>
   * Assumes that hbase:meta is currently closed and is not being actively served by
   * any RegionServer.
   * <p>
   * Forcibly unsets the current meta region location in ZooKeeper and assigns
   * hbase:meta to a random RegionServer.
   * @param hri region info of the hbase:meta region or replica to assign
   * @throws KeeperException on unexpected ZooKeeper failure
   */
  public void assignMeta(HRegionInfo hri) throws KeeperException {
    this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
    assign(hri, true);
  }

  /**
   * Assigns specified regions retaining assignments, if any.
   * <p>
   * This is a synchronous call and will return once every region has been
   * assigned. If anything fails, an exception is thrown.
   * @throws InterruptedException
   * @throws IOException
   */
  public void assign(Map<HRegionInfo, ServerName> regions)
      throws IOException, InterruptedException {
    if (regions == null || regions.isEmpty()) {
      return;
    }
    List<ServerName> servers = serverManager.createDestinationServersList();
    if (servers == null || servers.isEmpty()) {
      throw new IOException("Found no destination server to assign region(s)");
    }

    // Reuse existing assignment info
    Map<ServerName, List<HRegionInfo>> bulkPlan =
        balancer.retainAssignment(regions, servers);
    if (bulkPlan == null) {
      throw new IOException("Unable to determine a plan to assign region(s)");
    }

    assign(regions.size(), servers.size(),
        "retainAssignment=true", bulkPlan);
  }
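  // The two public bulk entry points side by side: assign(Map<HRegionInfo, ServerName>) above
  // retains the previous owner via balancer.retainAssignment(), while assign(List<HRegionInfo>)
  // below spreads regions via balancer.roundRobinAssignment(). Both funnel into the private
  // assign(int, int, String, Map) dispatcher.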
  /**
   * Assigns specified regions round robin, if any.
   * <p>
   * This is a synchronous call and will return once every region has been
   * assigned. If anything fails, an exception is thrown.
   * @throws InterruptedException
   * @throws IOException
   */
  public void assign(List<HRegionInfo> regions)
      throws IOException, InterruptedException {
    if (regions == null || regions.isEmpty()) {
      return;
    }

    List<ServerName> servers = serverManager.createDestinationServersList();
    if (servers == null || servers.isEmpty()) {
      throw new IOException("Found no destination server to assign region(s)");
    }

    // Generate a round-robin bulk assignment plan
    Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
    if (bulkPlan == null) {
      throw new IOException("Unable to determine a plan to assign region(s)");
    }

    processFavoredNodes(regions);
    assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
  }

  private void assign(int regions, int totalServers,
      String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
          throws InterruptedException, IOException {

    int servers = bulkPlan.size();
    if (servers == 1 || (regions < bulkAssignThresholdRegions
        && servers < bulkAssignThresholdServers)) {

      // Not using bulk assignment. This could be more efficient in a small
      // cluster, especially a mini cluster for testing, so that tests won't time out
      if (LOG.isTraceEnabled()) {
        LOG.trace("Not using bulk assignment since we are assigning only " + regions +
            " region(s) to " + servers + " server(s)");
      }

      // invoke assignment (async)
      ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
      for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
        if (!assign(plan.getKey(), plan.getValue())) {
          for (HRegionInfo region: plan.getValue()) {
            if (!regionStates.isRegionOnline(region)) {
              invokeAssign(region);
              if (!region.getTable().isSystemTable()) {
                userRegionSet.add(region);
              }
            }
          }
        }
      }

      // wait for assignment completion
      if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
          System.currentTimeMillis())) {
        LOG.debug("some user regions are still in transition: " + userRegionSet);
      }
    } else {
      LOG.info("Bulk assigning " + regions + " region(s) across "
          + totalServers + " server(s), " + message);

      // Use fixed count thread pool assigning.
      BulkAssigner ba = new GeneralBulkAssigner(
          this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
      ba.bulkAssign();
      LOG.info("Bulk assigning done");
    }
  }
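  // Dispatch rule in assign(int, int, String, Map) above: a single destination server, or a
  // batch under both bulkAssignThresholdRegions and bulkAssignThresholdServers, is assigned
  // directly (cheaper for small and mini clusters); otherwise the plan fans out through a
  // GeneralBulkAssigner backed by a fixed-size thread pool.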
2875 getBoolean("hbase.master.startup.retainassign", true); 2876 2877 Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet(); 2878 if (retainAssignment) { 2879 assign(allRegions); 2880 } else { 2881 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan); 2882 assign(regions); 2883 } 2884 2885 for (HRegionInfo hri : regionsFromMetaScan) { 2886 TableName tableName = hri.getTable(); 2887 if (!tableStateManager.isTableState(tableName, 2888 ZooKeeperProtos.Table.State.ENABLED)) { 2889 setEnabledTable(tableName); 2890 } 2891 } 2892 // assign all the replicas that were not recorded in the meta 2893 assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server)); 2894 } 2895 2896 /** 2897 * Get the list of replica regions that are not yet recorded in meta. We might 2898 * not have recorded the locations of the replicas because the replicas may not 2899 * have been online yet, the master restarted in the middle of assigning, 2900 * ZK was erased, etc. 2901 * @param regionsRecordedInMeta the list of regions we know are recorded in meta, 2902 * either as a default, or as the location of a replica 2903 * @param master 2904 * @return list of replica regions 2905 * @throws IOException 2906 */ replicaRegionsNotRecordedInMeta( Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)2907 public static List<HRegionInfo> replicaRegionsNotRecordedInMeta( 2908 Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException { 2909 List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>(); 2910 for (HRegionInfo hri : regionsRecordedInMeta) { 2911 TableName table = hri.getTable(); 2912 HTableDescriptor htd = master.getTableDescriptors().get(table); 2913 // Look at the HTD for the replica count; that's the source of truth 2914 int desiredRegionReplication = htd.getRegionReplication(); 2915 for (int i = 0; i < desiredRegionReplication; i++) { 2916 HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i); 2917 if (regionsRecordedInMeta.contains(replica)) continue; 2918 regionsNotRecordedInMeta.add(replica); 2919 } 2920 } 2921 return regionsNotRecordedInMeta; 2922 } 2923 2924 /** 2925 * Wait until no regions are in transition. 2926 * @param timeout How long to wait. 2927 * @return True if no regions are in transition. 2928 * @throws InterruptedException 2929 */ waitUntilNoRegionsInTransition(final long timeout)2930 boolean waitUntilNoRegionsInTransition(final long timeout) 2931 throws InterruptedException { 2932 // Blocks until there are no regions in transition. It is possible that 2933 // there are regions in transition immediately after this returns, but 2934 // this guarantees that, if it returns without an exception, there was a 2935 // period of time with no regions in transition from the point-of-view 2936 // of the in-memory 2937 // state of the Master. 2938 final long endTime = System.currentTimeMillis() + timeout; 2939 2940 while (!this.server.isStopped() && regionStates.isRegionsInTransition() 2941 && endTime > System.currentTimeMillis()) { 2942 regionStates.waitForUpdate(100); 2943 } 2944 2945 return !regionStates.isRegionsInTransition(); 2946 } 2947 2948 /** 2949 * Rebuild the list of user regions and assignment information. 2950 * Updates regionStates with findings as we go through the list of regions.
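 * <p>
 * A hedged sketch of the failover sequence this supports (method names as in this
 * class; the exact call site in the master's startup path is assumed):
 * <pre>{@code
 * Set<ServerName> offlineServers = rebuildUserRegions();    // seed regionStates from hbase:meta
 * processDeadServersAndRecoverLostRegions(offlineServers);  // expire dead hosts, replay RIT
 * }</pre>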
2951 * @return set of servers not online that hosted some regions according to a scan of hbase:meta 2952 * @throws IOException 2953 */ rebuildUserRegions()2954 Set<ServerName> rebuildUserRegions() throws 2955 IOException, KeeperException, CoordinatedStateException { 2956 Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates( 2957 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING); 2958 2959 Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates( 2960 ZooKeeperProtos.Table.State.DISABLED, 2961 ZooKeeperProtos.Table.State.DISABLING, 2962 ZooKeeperProtos.Table.State.ENABLING); 2963 2964 // Region assignment from META 2965 List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection()); 2966 // Get any new but slow-to-check-in region servers that joined the cluster 2967 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet(); 2968 // Set of offline servers to be returned 2969 Set<ServerName> offlineServers = new HashSet<ServerName>(); 2970 // Iterate regions in META 2971 for (Result result : results) { 2972 if (result == null) { 2973 if (LOG.isDebugEnabled()) LOG.debug("null result from meta - ignoring but this is strange."); 2974 continue; 2975 } 2976 // Keep track of replicas to close. These were the replicas of the originally 2977 // unmerged regions. The master might have closed them already, but might not 2978 // have managed to, e.g. because it crashed. 2979 PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result); 2980 if (p.getFirst() != null && p.getSecond() != null) { 2981 int numReplicas = server.getTableDescriptors().get(p.getFirst(). 2982 getTable()).getRegionReplication(); 2983 for (HRegionInfo merge : p) { 2984 for (int i = 1; i < numReplicas; i++) { 2985 replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i)); 2986 } 2987 } 2988 } 2989 RegionLocations rl = MetaTableAccessor.getRegionLocations(result); 2990 if (rl == null) continue; 2991 HRegionLocation[] locations = rl.getRegionLocations(); 2992 if (locations == null) continue; 2993 for (HRegionLocation hrl : locations) { 2994 if (hrl == null) continue; 2995 HRegionInfo regionInfo = hrl.getRegionInfo(); 2996 if (regionInfo == null) continue; 2997 int replicaId = regionInfo.getReplicaId(); 2998 State state = RegionStateStore.getRegionState(result, replicaId); 2999 // Keep track of replicas to close. These were the replicas of the split parents 3000 // from the previous life of the master. The master should have closed them 3001 // already, but may not have managed to, e.g. because it crashed. 3002 if (replicaId == 0 && state.equals(State.SPLIT)) { 3003 for (HRegionLocation h : locations) { 3004 replicasToClose.add(h.getRegionInfo()); 3005 } 3006 } 3007 ServerName lastHost = hrl.getServerName(); 3008 ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId); 3009 if (tableStateManager.isTableState(regionInfo.getTable(), 3010 ZooKeeperProtos.Table.State.DISABLED)) { 3011 // Force the region to forget its hosts for disabled/disabling tables.
3012 // see HBASE-13326 3013 lastHost = null; 3014 regionLocation = null; 3015 } 3016 regionStates.createRegionState(regionInfo, state, regionLocation, lastHost); 3017 if (!regionStates.isRegionInState(regionInfo, State.OPEN)) { 3018 // Region is not open (either offline or in transition), skip 3019 continue; 3020 } 3021 TableName tableName = regionInfo.getTable(); 3022 if (!onlineServers.contains(regionLocation)) { 3023 // Region is located on a server that isn't online 3024 offlineServers.add(regionLocation); 3025 if (useZKForAssignment) { 3026 regionStates.regionOffline(regionInfo); 3027 } 3028 } else if (!disabledOrEnablingTables.contains(tableName)) { 3029 // Region is being served and on an active server 3030 // add only if region not in disabled or enabling table 3031 regionStates.regionOnline(regionInfo, regionLocation); 3032 balancer.regionOnline(regionInfo, regionLocation); 3033 } else if (useZKForAssignment) { 3034 regionStates.regionOffline(regionInfo); 3035 } 3036 // need to enable the table if not disabled or disabling or enabling 3037 // this will be used in rolling restarts 3038 if (!disabledOrDisablingOrEnabling.contains(tableName) 3039 && !getTableStateManager().isTableState(tableName, 3040 ZooKeeperProtos.Table.State.ENABLED)) { 3041 setEnabledTable(tableName); 3042 } 3043 } 3044 } 3045 return offlineServers; 3046 } 3047 3048 /** 3049 * Recover the tables that were not fully moved to DISABLED state. These 3050 * tables are in DISABLING state when the master restarted/switched. 3051 * 3052 * @throws KeeperException 3053 * @throws TableNotFoundException 3054 * @throws IOException 3055 */ recoverTableInDisablingState()3056 private void recoverTableInDisablingState() 3057 throws KeeperException, IOException, CoordinatedStateException { 3058 Set<TableName> disablingTables = 3059 tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING); 3060 if (disablingTables.size() != 0) { 3061 for (TableName tableName : disablingTables) { 3062 // Recover by calling DisableTableHandler 3063 LOG.info("The table " + tableName 3064 + " is in DISABLING state. Hence recovering by moving the table" 3065 + " to DISABLED state."); 3066 new DisableTableHandler(this.server, tableName, 3067 this, tableLockManager, true).prepare().process(); 3068 } 3069 } 3070 } 3071 3072 /** 3073 * Recover the tables that are not fully moved to ENABLED state. These tables 3074 * are in ENABLING state when the master restarted/switched 3075 * 3076 * @throws KeeperException 3077 * @throws org.apache.hadoop.hbase.TableNotFoundException 3078 * @throws IOException 3079 */ recoverTableInEnablingState()3080 private void recoverTableInEnablingState() 3081 throws KeeperException, IOException, CoordinatedStateException { 3082 Set<TableName> enablingTables = tableStateManager. 3083 getTablesInStates(ZooKeeperProtos.Table.State.ENABLING); 3084 if (enablingTables.size() != 0) { 3085 for (TableName tableName : enablingTables) { 3086 // Recover by calling EnableTableHandler 3087 LOG.info("The table " + tableName 3088 + " is in ENABLING state. 
Hence recovering by moving the table" 3089 + " to ENABLED state."); 3090 // Enable the table synchronously during master startup; 3091 // no need to invoke the coprocessor 3092 EnableTableHandler eth = new EnableTableHandler(this.server, tableName, 3093 this, tableLockManager, true); 3094 try { 3095 eth.prepare(); 3096 } catch (TableNotFoundException e) { 3097 LOG.warn("Table " + tableName + " not found in hbase:meta to recover."); 3098 continue; 3099 } 3100 eth.process(); 3101 } 3102 } 3103 } 3104 3105 /** 3106 * Processes the list of dead servers from the hbase:meta scan, plus regions in RIT. 3107 * This is used on failover to recover regions that belonged to RegionServers which 3108 * failed while there was no active master, or which are offline for whatever reason, 3109 * as well as regions that were in transition. 3110 * 3111 * @param deadServers 3112 * The list of dead servers which failed while there was no active master. Can be null. 3113 * @throws IOException 3114 * @throws KeeperException 3115 */ processDeadServersAndRecoverLostRegions(Set<ServerName> deadServers)3116 private void processDeadServersAndRecoverLostRegions(Set<ServerName> deadServers) 3117 throws IOException, KeeperException { 3118 if (deadServers != null && !deadServers.isEmpty()) { 3119 for (ServerName serverName: deadServers) { 3120 if (!serverManager.isServerDead(serverName)) { 3121 serverManager.expireServer(serverName); // Let SSH do region re-assign 3122 } 3123 } 3124 } 3125 3126 List<String> nodes = useZKForAssignment ? 3127 ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode) 3128 : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode); 3129 if (nodes != null && !nodes.isEmpty()) { 3130 for (String encodedRegionName : nodes) { 3131 processRegionInTransition(encodedRegionName, null); 3132 } 3133 } else if (!useZKForAssignment) { 3134 processRegionInTransitionZkLess(); 3135 } 3136 } 3137 processRegionInTransitionZkLess()3138 void processRegionInTransitionZkLess() { 3139 // We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions 3140 // in case the RPC call was not sent out before the master was shut down, 3141 // since we update the state before we send the RPC call. We can't update 3142 // the state after the RPC call; otherwise, we wouldn't know what happened 3143 // to the region if the master died right after the RPC call went out. 3144 Map<String, RegionState> rits = regionStates.getRegionsInTransition(); 3145 for (RegionState regionState : rits.values()) { 3146 LOG.info("Processing " + regionState); 3147 ServerName serverName = regionState.getServerName(); 3148 // Server could be null in case of FAILED_OPEN when the master cannot find a region plan. 3149 // In that case, try assigning it here. 3150 if (serverName != null 3151 && !serverManager.getOnlineServers().containsKey(serverName)) { 3152 LOG.info("Server " + serverName + " isn't online. 
SSH will handle this"); 3153 continue; 3154 } 3155 HRegionInfo regionInfo = regionState.getRegion(); 3156 State state = regionState.getState(); 3157 3158 switch (state) { 3159 case CLOSED: 3160 invokeAssign(regionInfo); 3161 break; 3162 case PENDING_OPEN: 3163 retrySendRegionOpen(regionState); 3164 break; 3165 case PENDING_CLOSE: 3166 retrySendRegionClose(regionState); 3167 break; 3168 case FAILED_CLOSE: 3169 case FAILED_OPEN: 3170 invokeUnAssign(regionInfo); 3171 break; 3172 default: 3173 // No process for other states 3174 } 3175 } 3176 } 3177 3178 /** 3179 * At master failover, for pending_open region, make sure 3180 * sendRegionOpen RPC call is sent to the target regionserver 3181 */ retrySendRegionOpen(final RegionState regionState)3182 private void retrySendRegionOpen(final RegionState regionState) { 3183 this.executorService.submit( 3184 new EventHandler(server, EventType.M_MASTER_RECOVERY) { 3185 @Override 3186 public void process() throws IOException { 3187 HRegionInfo hri = regionState.getRegion(); 3188 ServerName serverName = regionState.getServerName(); 3189 ReentrantLock lock = locker.acquireLock(hri.getEncodedName()); 3190 try { 3191 for (int i = 1; i <= maximumAttempts; i++) { 3192 if (!serverManager.isServerOnline(serverName) 3193 || server.isStopped() || server.isAborted()) { 3194 return; // No need any more 3195 } 3196 try { 3197 if (!regionState.equals(regionStates.getRegionState(hri))) { 3198 return; // Region is not in the expected state any more 3199 } 3200 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST; 3201 if (shouldAssignRegionsWithFavoredNodes) { 3202 favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri); 3203 } 3204 RegionOpeningState regionOpenState = serverManager.sendRegionOpen( 3205 serverName, hri, -1, favoredNodes); 3206 3207 if (regionOpenState == RegionOpeningState.FAILED_OPENING) { 3208 // Failed opening this region, this means the target server didn't get 3209 // the original region open RPC, so re-assign it with a new plan 3210 LOG.debug("Got failed_opening in retry sendRegionOpen for " 3211 + regionState + ", re-assign it"); 3212 invokeAssign(hri, true); 3213 } 3214 return; // Done. 3215 } catch (Throwable t) { 3216 if (t instanceof RemoteException) { 3217 t = ((RemoteException) t).unwrapRemoteException(); 3218 } 3219 // In case SocketTimeoutException/FailedServerException, retry 3220 if (t instanceof java.net.SocketTimeoutException 3221 || t instanceof FailedServerException) { 3222 Threads.sleep(100); 3223 continue; 3224 } 3225 // For other exceptions, re-assign it 3226 LOG.debug("Got exception in retry sendRegionOpen for " 3227 + regionState + ", re-assign it", t); 3228 invokeAssign(hri); 3229 return; // Done. 
3230 } 3231 } 3232 } finally { 3233 lock.unlock(); 3234 } 3235 } 3236 }); 3237 } 3238 3239 /** 3240 * At master failover, for a pending_close region, make sure the 3241 * sendRegionClose RPC call is sent to the target regionserver 3242 */ retrySendRegionClose(final RegionState regionState)3243 private void retrySendRegionClose(final RegionState regionState) { 3244 this.executorService.submit( 3245 new EventHandler(server, EventType.M_MASTER_RECOVERY) { 3246 @Override 3247 public void process() throws IOException { 3248 HRegionInfo hri = regionState.getRegion(); 3249 ServerName serverName = regionState.getServerName(); 3250 ReentrantLock lock = locker.acquireLock(hri.getEncodedName()); 3251 try { 3252 for (int i = 1; i <= maximumAttempts; i++) { 3253 if (!serverManager.isServerOnline(serverName) 3254 || server.isStopped() || server.isAborted()) { 3255 return; // Not needed any more 3256 } 3257 try { 3258 if (!regionState.equals(regionStates.getRegionState(hri))) { 3259 return; // Region is not in the expected state any more 3260 } 3261 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) { 3262 // This means the region is still on the target server 3263 LOG.debug("Got false in retry sendRegionClose for " 3264 + regionState + ", re-close it"); 3265 invokeUnAssign(hri); 3266 } 3267 return; // Done. 3268 } catch (Throwable t) { 3269 if (t instanceof RemoteException) { 3270 t = ((RemoteException) t).unwrapRemoteException(); 3271 } 3272 // In case of SocketTimeoutException/FailedServerException, retry 3273 if (t instanceof java.net.SocketTimeoutException 3274 || t instanceof FailedServerException) { 3275 Threads.sleep(100); 3276 continue; 3277 } 3278 if (!(t instanceof NotServingRegionException 3279 || t instanceof RegionAlreadyInTransitionException)) { 3280 // NotServingRegionException/RegionAlreadyInTransitionException 3281 // means the target server got the original region close request. 3282 // For other exceptions, re-close it 3283 LOG.debug("Got exception in retry sendRegionClose for " 3284 + regionState + ", re-close it", t); 3285 invokeUnAssign(hri); 3286 } 3287 return; // Done. 3288 } 3289 } 3290 } finally { 3291 lock.unlock(); 3292 } 3293 } 3294 }); 3295 } 3296 3297 /** 3298 * Sets the regions-in-transition metrics. 3299 * This takes an iterator over the RegionInTransition map (a CLSM) and is not synchronized. 3300 * The iterator is not fail-fast, which may lead to stale reads; but that's better than 3301 * creating a copy of the map for metrics computation, as this method is invoked 3302 * at a frequent interval. 3303 */ updateRegionsInTransitionMetrics()3304 public void updateRegionsInTransitionMetrics() { 3305 long currentTime = System.currentTimeMillis(); 3306 int totalRITs = 0; 3307 int totalRITsOverThreshold = 0; 3308 long oldestRITTime = 0; 3309 int ritThreshold = this.server.getConfiguration().
3310 getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000); 3311 for (RegionState state: regionStates.getRegionsInTransition().values()) { 3312 totalRITs++; 3313 long ritTime = currentTime - state.getStamp(); 3314 if (ritTime > ritThreshold) { // more than the threshold 3315 totalRITsOverThreshold++; 3316 } 3317 if (oldestRITTime < ritTime) { 3318 oldestRITTime = ritTime; 3319 } 3320 } 3321 if (this.metricsAssignmentManager != null) { 3322 this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime); 3323 this.metricsAssignmentManager.updateRITCount(totalRITs); 3324 this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold); 3325 } 3326 } 3327 3328 /** 3329 * @param region Region whose plan we are to clear. 3330 */ clearRegionPlan(final HRegionInfo region)3331 void clearRegionPlan(final HRegionInfo region) { 3332 synchronized (this.regionPlans) { 3333 this.regionPlans.remove(region.getEncodedName()); 3334 } 3335 } 3336 3337 /** 3338 * Wait on region to clear regions-in-transition. 3339 * @param hri Region to wait on. 3340 * @throws IOException 3341 */ waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)3342 public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri) 3343 throws IOException, InterruptedException { 3344 waitOnRegionToClearRegionsInTransition(hri, -1L); 3345 } 3346 3347 /** 3348 * Wait on region to clear regions-in-transition or time out 3349 * @param hri 3350 * @param timeOut Milliseconds to wait for current region to be out of transition state. 3351 * @return True when a region clears regions-in-transition before timeout otherwise false 3352 * @throws InterruptedException 3353 */ waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)3354 public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut) 3355 throws InterruptedException { 3356 if (!regionStates.isRegionInTransition(hri)) return true; 3357 long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime() 3358 + timeOut; 3359 // There is already a timeout monitor on regions in transition so I 3360 // should not have to have one here too? 
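// A hypothetical caller sketch (not code from this file) -- e.g. an admin move path:
//   if (!waitOnRegionToClearRegionsInTransition(hri, 60000)) {
//     // still in transition after 60s; surface a timeout rather than block forever
//   }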
3361 LOG.info("Waiting for " + hri.getEncodedName() + 3362 " to leave regions-in-transition, timeOut=" + timeOut + " ms."); 3363 while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) { 3364 regionStates.waitForUpdate(100); 3365 if (EnvironmentEdgeManager.currentTime() > end) { 3366 LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned."); 3367 return false; 3368 } 3369 } 3370 if (this.server.isStopped()) { 3371 LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set"); 3372 return false; 3373 } 3374 return true; 3375 } 3376 invokeAssign(HRegionInfo regionInfo)3377 void invokeAssign(HRegionInfo regionInfo) { 3378 invokeAssign(regionInfo, true); 3379 } 3380 invokeAssign(HRegionInfo regionInfo, boolean newPlan)3381 void invokeAssign(HRegionInfo regionInfo, boolean newPlan) { 3382 threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan)); 3383 } 3384 invokeUnAssign(HRegionInfo regionInfo)3385 void invokeUnAssign(HRegionInfo regionInfo) { 3386 threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo)); 3387 } 3388 isCarryingMeta(ServerName serverName)3389 public ServerHostRegion isCarryingMeta(ServerName serverName) { 3390 return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO); 3391 } 3392 isCarryingMetaReplica(ServerName serverName, int replicaId)3393 public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) { 3394 return isCarryingRegion(serverName, 3395 RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId)); 3396 } 3397 isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri)3398 public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) { 3399 return isCarryingRegion(serverName, metaHri); 3400 } 3401 3402 /** 3403 * Check if the shutdown server carries the specified region. 3404 * We store region locations in a bunch of places, and those values aren't always 3405 * consistent, since there is a delay of notification. 3406 * The location from the ZooKeeper unassigned node has the most recent data, 3407 * but the node could be deleted after the region is opened by the AM. 3408 * The AM's info could be stale if OpenedRegionHandler 3409 * processing hasn't finished by the time the server shutdown occurs. 3410 * @return whether the serverName currently hosts the region 3411 */ isCarryingRegion(ServerName serverName, HRegionInfo hri)3412 private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) { 3413 RegionTransition rt = null; 3414 try { 3415 byte [] data = ZKAssign.getData(watcher, hri.getEncodedName()); 3416 // This call can legitimately return null 3417 rt = data == null? null: RegionTransition.parseFrom(data); 3418 } catch (KeeperException e) { 3419 server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e); 3420 } catch (DeserializationException e) { 3421 server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e); 3422 } 3423 3424 ServerName addressFromZK = rt != null? rt.getServerName(): null; 3425 if (addressFromZK != null) { 3426 // If we get something from ZK, we will use that data 3427 boolean matchZK = addressFromZK.equals(serverName); 3428 LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK + 3429 " current=" + serverName + ", matches=" + matchZK); 3430 return matchZK ? 
ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION; 3431 } 3432 3433 ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri); 3434 if (LOG.isDebugEnabled()) { 3435 LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() + 3436 " is on server=" + (addressFromAM != null ? addressFromAM : "null") + 3437 " server being checked: " + serverName); 3438 } 3439 if (addressFromAM != null) { 3440 return addressFromAM.equals(serverName) ? 3441 ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION; 3442 } 3443 3444 if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) { 3445 // For the Meta region (default replica), we can do one more check on MetaTableLocator 3446 final ServerName serverNameInZK = 3447 server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper()); 3448 if (LOG.isDebugEnabled()) { 3449 LOG.debug("Based on MetaTableLocator, the META region is on server=" + 3450 (serverNameInZK == null ? "null" : serverNameInZK) + 3451 " server being checked: " + serverName); 3452 } 3453 if (serverNameInZK != null) { 3454 return serverNameInZK.equals(serverName) ? 3455 ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION; 3456 } 3457 } 3458 3459 // Checked everywhere, if reaching here, we are unsure whether the server is carrying region. 3460 return ServerHostRegion.UNKNOWN; 3461 } 3462 3463 /** 3464 * Clean out crashed server removing any assignments. 3465 * @param sn Server that went down. 3466 * @return list of regions in transition on this server 3467 */ cleanOutCrashedServerReferences(final ServerName sn)3468 public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) { 3469 // Clean out any existing assignment plans for this server 3470 synchronized (this.regionPlans) { 3471 for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator(); 3472 i.hasNext();) { 3473 Map.Entry<String, RegionPlan> e = i.next(); 3474 ServerName otherSn = e.getValue().getDestination(); 3475 // The name will be null if the region is planned for a random assign. 
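        // (Plans with a null destination are left in place; only plans explicitly
        // targeting the crashed server sn are removed, which forces a fresh plan
        // when the region is reassigned.)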
3476 if (otherSn != null && otherSn.equals(sn)) { 3477 // Use iterator's remove else we'll get CME 3478 i.remove(); 3479 } 3480 } 3481 } 3482 List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn); 3483 for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) { 3484 HRegionInfo hri = it.next(); 3485 String encodedName = hri.getEncodedName(); 3486 3487 // We need a lock on the region as we could update it 3488 Lock lock = locker.acquireLock(encodedName); 3489 try { 3490 RegionState regionState = regionStates.getRegionTransitionState(encodedName); 3491 if (regionState == null 3492 || (regionState.getServerName() != null && !regionState.isOnServer(sn)) 3493 || !(regionState.isFailedClose() || regionState.isOffline() 3494 || regionState.isPendingOpenOrOpening())) { 3495 LOG.info("Skip " + regionState + " since it is not opening/failed_close" 3496 + " on the dead server any more: " + sn); 3497 it.remove(); 3498 } else { 3499 try { 3500 // Delete the ZNode if exists 3501 ZKAssign.deleteNodeFailSilent(watcher, hri); 3502 } catch (KeeperException ke) { 3503 server.abort("Unexpected ZK exception deleting node " + hri, ke); 3504 } 3505 if (tableStateManager.isTableState(hri.getTable(), 3506 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 3507 regionStates.regionOffline(hri); 3508 it.remove(); 3509 continue; 3510 } 3511 // Mark the region offline and assign it again by SSH 3512 regionStates.updateRegionState(hri, State.OFFLINE); 3513 } 3514 } finally { 3515 lock.unlock(); 3516 } 3517 } 3518 return regions; 3519 } 3520 3521 /** 3522 * @param plan Plan to execute. 3523 */ balance(final RegionPlan plan)3524 public void balance(final RegionPlan plan) { 3525 3526 HRegionInfo hri = plan.getRegionInfo(); 3527 TableName tableName = hri.getTable(); 3528 if (tableStateManager.isTableState(tableName, 3529 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 3530 LOG.info("Ignored moving region of disabling/disabled table " 3531 + tableName); 3532 return; 3533 } 3534 3535 // Move the region only if it's assigned 3536 String encodedName = hri.getEncodedName(); 3537 ReentrantLock lock = locker.acquireLock(encodedName); 3538 try { 3539 if (!regionStates.isRegionOnline(hri)) { 3540 RegionState state = regionStates.getRegionState(encodedName); 3541 LOG.info("Ignored moving region not assigned: " + hri + ", " 3542 + (state == null ? "not in region states" : state)); 3543 return; 3544 } 3545 synchronized (this.regionPlans) { 3546 this.regionPlans.put(plan.getRegionName(), plan); 3547 } 3548 unassign(hri, false, plan.getDestination()); 3549 } finally { 3550 lock.unlock(); 3551 } 3552 } 3553 stop()3554 public void stop() { 3555 shutdown(); // Stop executor service, etc 3556 } 3557 3558 /** 3559 * Shutdown the threadpool executor service 3560 */ shutdown()3561 public void shutdown() { 3562 // It's an immediate shutdown, so we're clearing the remaining tasks. 
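    // (Only queued, not-yet-started ZK event work is discarded here; tasks already
    // running are interrupted by the shutdownNow() calls below.)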
3563 synchronized (zkEventWorkerWaitingList){ 3564 zkEventWorkerWaitingList.clear(); 3565 } 3566 3567 // Shutdown the threadpool executor service 3568 threadPoolExecutorService.shutdownNow(); 3569 zkEventWorkers.shutdownNow(); 3570 regionStateStore.stop(); 3571 } 3572 setEnabledTable(TableName tableName)3573 protected void setEnabledTable(TableName tableName) { 3574 try { 3575 this.tableStateManager.setTableState(tableName, 3576 ZooKeeperProtos.Table.State.ENABLED); 3577 } catch (CoordinatedStateException e) { 3578 // here we can abort as it is the start up flow 3579 String errorMsg = "Unable to ensure that the table " + tableName 3580 + " will be" + " enabled because of a ZooKeeper issue"; 3581 LOG.error(errorMsg); 3582 this.server.abort(errorMsg, e); 3583 } 3584 } 3585 3586 /** 3587 * Set region as OFFLINED up in zookeeper asynchronously. 3588 * @param state 3589 * @return True if we succeeded, false otherwise (State was incorrect or failed 3590 * updating zk). 3591 */ asyncSetOfflineInZooKeeper(final RegionState state, final AsyncCallback.StringCallback cb, final ServerName destination)3592 private boolean asyncSetOfflineInZooKeeper(final RegionState state, 3593 final AsyncCallback.StringCallback cb, final ServerName destination) { 3594 if (!state.isClosed() && !state.isOffline()) { 3595 this.server.abort("Unexpected state trying to OFFLINE; " + state, 3596 new IllegalStateException()); 3597 return false; 3598 } 3599 regionStates.updateRegionState(state.getRegion(), State.OFFLINE); 3600 try { 3601 ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(), 3602 destination, cb, state); 3603 } catch (KeeperException e) { 3604 if (e instanceof NodeExistsException) { 3605 LOG.warn("Node for " + state.getRegion() + " already exists"); 3606 } else { 3607 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e); 3608 } 3609 return false; 3610 } 3611 return true; 3612 } 3613 deleteNodeInStates(String encodedName, String desc, ServerName sn, EventType... types)3614 private boolean deleteNodeInStates(String encodedName, 3615 String desc, ServerName sn, EventType... types) { 3616 try { 3617 for (EventType et: types) { 3618 if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) { 3619 return true; 3620 } 3621 } 3622 LOG.info("Failed to delete the " + desc + " node for " 3623 + encodedName + ". The node type may not match"); 3624 } catch (NoNodeException e) { 3625 if (LOG.isDebugEnabled()) { 3626 LOG.debug("The " + desc + " node for " + encodedName + " already deleted"); 3627 } 3628 } catch (KeeperException ke) { 3629 server.abort("Unexpected ZK exception deleting " + desc 3630 + " node for the region " + encodedName, ke); 3631 } 3632 return false; 3633 } 3634 deleteMergingNode(String encodedName, ServerName sn)3635 private void deleteMergingNode(String encodedName, ServerName sn) { 3636 deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING, 3637 EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED); 3638 } 3639 deleteSplittingNode(String encodedName, ServerName sn)3640 private void deleteSplittingNode(String encodedName, ServerName sn) { 3641 deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING, 3642 EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT); 3643 } 3644 3645 @edu.umd.cs.findbugs.annotations.SuppressWarnings( 3646 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", 3647 justification="Modification of Maps not ATOMIC!!!! 
FIX!!!") onRegionFailedOpen( final HRegionInfo hri, final ServerName sn)3648 private void onRegionFailedOpen( 3649 final HRegionInfo hri, final ServerName sn) { 3650 String encodedName = hri.getEncodedName(); 3651 // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!! 3652 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName); 3653 if (failedOpenCount == null) { 3654 failedOpenCount = new AtomicInteger(); 3655 // No need to use putIfAbsent, or extra synchronization since 3656 // this whole handleRegion block is locked on the encoded region 3657 // name, and failedOpenTracker is updated only in this block 3658 // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION 3659 failedOpenTracker.put(encodedName, failedOpenCount); 3660 } 3661 if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) { 3662 // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION 3663 regionStates.updateRegionState(hri, State.FAILED_OPEN); 3664 // remove the tracking info to save memory, also reset 3665 // the count for next open initiative 3666 failedOpenTracker.remove(encodedName); 3667 } else { 3668 if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) { 3669 // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts 3670 // so that we are aware of potential problem if it persists for a long time. 3671 LOG.warn("Failed to open the hbase:meta region " + 3672 hri.getRegionNameAsString() + " after" + 3673 failedOpenCount.get() + " retries. Continue retrying."); 3674 } 3675 3676 // Handle this the same as if it were opened and then closed. 3677 RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED); 3678 if (regionState != null) { 3679 // When there are more than one region server a new RS is selected as the 3680 // destination and the same is updated in the region plan. (HBASE-5546) 3681 if (getTableStateManager().isTableState(hri.getTable(), 3682 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || 3683 replicasToClose.contains(hri)) { 3684 offlineDisabledRegion(hri); 3685 return; 3686 } 3687 // ZK Node is in CLOSED state, assign it. 
3688 regionStates.updateRegionState(hri, RegionState.State.CLOSED); 3689 // This below has to do w/ online enable/disable of a table 3690 removeClosedRegion(hri); 3691 try { 3692 getRegionPlan(hri, sn, true); 3693 } catch (HBaseIOException e) { 3694 LOG.warn("Failed to get region plan", e); 3695 } 3696 invokeAssign(hri, false); 3697 } 3698 } 3699 } 3700 onRegionOpen(final HRegionInfo hri, final ServerName sn, long openSeqNum)3701 private void onRegionOpen(final HRegionInfo hri, final ServerName sn, long openSeqNum) { 3702 regionOnline(hri, sn, openSeqNum); 3703 if (useZKForAssignment) { 3704 try { 3705 // Delete the ZNode if exists 3706 ZKAssign.deleteNodeFailSilent(watcher, hri); 3707 } catch (KeeperException ke) { 3708 server.abort("Unexpected ZK exception deleting node " + hri, ke); 3709 } 3710 } 3711 3712 // reset the count, if any 3713 failedOpenTracker.remove(hri.getEncodedName()); 3714 if (getTableStateManager().isTableState(hri.getTable(), 3715 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 3716 invokeUnAssign(hri); 3717 } 3718 } 3719 onRegionClosed(final HRegionInfo hri)3720 private void onRegionClosed(final HRegionInfo hri) { 3721 if (getTableStateManager().isTableState(hri.getTable(), 3722 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) || 3723 replicasToClose.contains(hri)) { 3724 offlineDisabledRegion(hri); 3725 return; 3726 } 3727 regionStates.updateRegionState(hri, RegionState.State.CLOSED); 3728 sendRegionClosedNotification(hri); 3729 // This below has to do w/ online enable/disable of a table 3730 removeClosedRegion(hri); 3731 invokeAssign(hri, false); 3732 } 3733 checkInStateForSplit(ServerName sn, final HRegionInfo p, final HRegionInfo a, final HRegionInfo b)3734 private String checkInStateForSplit(ServerName sn, 3735 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { 3736 final RegionState rs_p = regionStates.getRegionState(p); 3737 RegionState rs_a = regionStates.getRegionState(a); 3738 RegionState rs_b = regionStates.getRegionState(b); 3739 if (!(rs_p.isOpenOrSplittingOnServer(sn) 3740 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn)) 3741 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) { 3742 return "Not in state good for split"; 3743 } 3744 return ""; 3745 } 3746 onRegionSplitReverted(ServerName sn, final HRegionInfo p, final HRegionInfo a, final HRegionInfo b)3747 private String onRegionSplitReverted(ServerName sn, 3748 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { 3749 String s = checkInStateForSplit(sn, p, a, b); 3750 if (!org.apache.commons.lang.StringUtils.isEmpty(s)) { 3751 return s; 3752 } 3753 regionOnline(p, sn); 3754 regionOffline(a); 3755 regionOffline(b); 3756 3757 if (getTableStateManager().isTableState(p.getTable(), 3758 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 3759 invokeUnAssign(p); 3760 } 3761 return null; 3762 } 3763 onRegionSplit(ServerName sn, TransitionCode code, final HRegionInfo p, final HRegionInfo a, final HRegionInfo b)3764 private String onRegionSplit(ServerName sn, TransitionCode code, 3765 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { 3766 String s = checkInStateForSplit(sn, p, a, b); 3767 if (!org.apache.commons.lang.StringUtils.isEmpty(s)) { 3768 return s; 3769 } 3770 3771 regionStates.updateRegionState(a, State.SPLITTING_NEW, sn); 3772 regionStates.updateRegionState(b, State.SPLITTING_NEW, sn); 3773 regionStates.updateRegionState(p, State.SPLITTING); 3774 3775 if 
(code == TransitionCode.SPLIT) { 3776 if (TEST_SKIP_SPLIT_HANDLING) { 3777 return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set"; 3778 } 3779 regionOffline(p, State.SPLIT); 3780 regionOnline(a, sn, 1); 3781 regionOnline(b, sn, 1); 3782 3783 // User could disable the table before master knows the new region. 3784 if (getTableStateManager().isTableState(p.getTable(), 3785 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 3786 invokeUnAssign(a); 3787 invokeUnAssign(b); 3788 } else { 3789 Callable<Object> splitReplicasCallable = new Callable<Object>() { 3790 @Override 3791 public Object call() { 3792 doSplittingOfReplicas(p, a, b); 3793 return null; 3794 } 3795 }; 3796 threadPoolExecutorService.submit(splitReplicasCallable); 3797 } 3798 } else if (code == TransitionCode.SPLIT_PONR) { 3799 try { 3800 regionStates.splitRegion(p, a, b, sn); 3801 } catch (IOException ioe) { 3802 LOG.info("Failed to record split region " + p.getShortNameToLog()); 3803 return "Failed to record the splitting in meta"; 3804 } 3805 } 3806 return null; 3807 } 3808 onRegionMerge(ServerName sn, TransitionCode code, final HRegionInfo p, final HRegionInfo a, final HRegionInfo b)3809 private String onRegionMerge(ServerName sn, TransitionCode code, 3810 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) { 3811 RegionState rs_p = regionStates.getRegionState(p); 3812 RegionState rs_a = regionStates.getRegionState(a); 3813 RegionState rs_b = regionStates.getRegionState(b); 3814 if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn) 3815 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) { 3816 return "Not in state good for merge"; 3817 } 3818 3819 regionStates.updateRegionState(a, State.MERGING); 3820 regionStates.updateRegionState(b, State.MERGING); 3821 regionStates.updateRegionState(p, State.MERGING_NEW, sn); 3822 3823 String encodedName = p.getEncodedName(); 3824 if (code == TransitionCode.READY_TO_MERGE) { 3825 mergingRegions.put(encodedName, 3826 new PairOfSameType<HRegionInfo>(a, b)); 3827 } else if (code == TransitionCode.MERGED) { 3828 mergingRegions.remove(encodedName); 3829 regionOffline(a, State.MERGED); 3830 regionOffline(b, State.MERGED); 3831 regionOnline(p, sn, 1); 3832 3833 // User could disable the table before master knows the new region. 3834 if (getTableStateManager().isTableState(p.getTable(), 3835 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 3836 invokeUnAssign(p); 3837 } else { 3838 Callable<Object> mergeReplicasCallable = new Callable<Object>() { 3839 @Override 3840 public Object call() { 3841 doMergingOfReplicas(p, a, b); 3842 return null; 3843 } 3844 }; 3845 threadPoolExecutorService.submit(mergeReplicasCallable); 3846 } 3847 } else if (code == TransitionCode.MERGE_PONR) { 3848 try { 3849 regionStates.mergeRegions(p, a, b, sn); 3850 } catch (IOException ioe) { 3851 LOG.info("Failed to record merged region " + p.getShortNameToLog()); 3852 return "Failed to record the merging in meta"; 3853 } 3854 } else { 3855 mergingRegions.remove(encodedName); 3856 regionOnline(a, sn); 3857 regionOnline(b, sn); 3858 regionOffline(p); 3859 3860 if (getTableStateManager().isTableState(p.getTable(), 3861 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 3862 invokeUnAssign(a); 3863 invokeUnAssign(b); 3864 } 3865 } 3866 return null; 3867 } 3868 3869 /** 3870 * A helper to handle region merging transition event. 3871 * It transitions merging regions to MERGING state. 
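 * <p>
 * The znode payload is expected to carry three delimited {@code HRegionInfo}s in
 * this order (matching the parse and assert below):
 * <pre>{@code
 * List<HRegionInfo> l = HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);
 * HRegionInfo p     = l.get(0);  // merged region to be created
 * HRegionInfo hri_a = l.get(1);  // first region being merged
 * HRegionInfo hri_b = l.get(2);  // second region being merged
 * }</pre>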
3872 */ handleRegionMerging(final RegionTransition rt, final String encodedName, final String prettyPrintedRegionName, final ServerName sn)3873 private boolean handleRegionMerging(final RegionTransition rt, final String encodedName, 3874 final String prettyPrintedRegionName, final ServerName sn) { 3875 if (!serverManager.isServerOnline(sn)) { 3876 LOG.warn("Dropped merging! ServerName=" + sn + " unknown."); 3877 return false; 3878 } 3879 byte [] payloadOfMerging = rt.getPayload(); 3880 List<HRegionInfo> mergingRegions; 3881 try { 3882 mergingRegions = HRegionInfo.parseDelimitedFrom( 3883 payloadOfMerging, 0, payloadOfMerging.length); 3884 } catch (IOException e) { 3885 LOG.error("Dropped merging! Failed reading " + rt.getEventType() 3886 + " payload for " + prettyPrintedRegionName); 3887 return false; 3888 } 3889 assert mergingRegions.size() == 3; 3890 HRegionInfo p = mergingRegions.get(0); 3891 HRegionInfo hri_a = mergingRegions.get(1); 3892 HRegionInfo hri_b = mergingRegions.get(2); 3893 3894 RegionState rs_p = regionStates.getRegionState(p); 3895 RegionState rs_a = regionStates.getRegionState(hri_a); 3896 RegionState rs_b = regionStates.getRegionState(hri_b); 3897 3898 if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn)) 3899 && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn)) 3900 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) { 3901 LOG.warn("Dropped merging! Not in state good for MERGING; rs_p=" 3902 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b); 3903 return false; 3904 } 3905 3906 EventType et = rt.getEventType(); 3907 if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) { 3908 try { 3909 RegionMergeCoordination.RegionMergeDetails std = 3910 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 3911 .getRegionMergeCoordination().getDefaultDetails(); 3912 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 3913 .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std); 3914 if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) { 3915 byte[] data = ZKAssign.getData(watcher, encodedName); 3916 EventType currentType = null; 3917 if (data != null) { 3918 RegionTransition newRt = RegionTransition.parseFrom(data); 3919 currentType = newRt.getEventType(); 3920 } 3921 if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED 3922 && currentType != EventType.RS_ZK_REGION_MERGING)) { 3923 LOG.warn("Failed to transition pending_merge node " 3924 + encodedName + " to merging, it's now " + currentType); 3925 return false; 3926 } 3927 } 3928 } catch (Exception e) { 3929 LOG.warn("Failed to transition pending_merge node " 3930 + encodedName + " to merging", e); 3931 return false; 3932 } 3933 } 3934 3935 synchronized (regionStates) { 3936 regionStates.updateRegionState(hri_a, State.MERGING); 3937 regionStates.updateRegionState(hri_b, State.MERGING); 3938 regionStates.updateRegionState(p, State.MERGING_NEW, sn); 3939 3940 if (et != EventType.RS_ZK_REGION_MERGED) { 3941 this.mergingRegions.put(encodedName, 3942 new PairOfSameType<HRegionInfo>(hri_a, hri_b)); 3943 } else { 3944 this.mergingRegions.remove(encodedName); 3945 regionOffline(hri_a, State.MERGED); 3946 regionOffline(hri_b, State.MERGED); 3947 regionOnline(p, sn); 3948 } 3949 } 3950 3951 if (et == EventType.RS_ZK_REGION_MERGED) { 3952 doMergingOfReplicas(p, hri_a, hri_b); 3953 LOG.debug("Handling MERGED event for " + encodedName + "; deleting node"); 3954 // Remove region from ZK 3955 try { 3956 boolean successful = false; 3957 
while (!successful) { 3958 // It's possible that the RS tickles in between the reading of the 3959 // znode and the deleting, so it's safe to retry. 3960 successful = ZKAssign.deleteNode(watcher, encodedName, 3961 EventType.RS_ZK_REGION_MERGED, sn); 3962 } 3963 } catch (KeeperException e) { 3964 if (e instanceof NoNodeException) { 3965 String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName); 3966 LOG.debug("The znode " + znodePath + " does not exist. May be deleted already."); 3967 } else { 3968 server.abort("Error deleting MERGED node " + encodedName, e); 3969 } 3970 } 3971 LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString() 3972 + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b=" 3973 + hri_b.getRegionNameAsString() + ", on " + sn); 3974 3975 // User could disable the table before master knows the new region. 3976 if (tableStateManager.isTableState(p.getTable(), 3977 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 3978 unassign(p); 3979 } 3980 } 3981 return true; 3982 } 3983 3984 /** 3985 * A helper to handle region splitting transition event. 3986 */ handleRegionSplitting(final RegionTransition rt, final String encodedName, final String prettyPrintedRegionName, final ServerName sn)3987 private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName, 3988 final String prettyPrintedRegionName, final ServerName sn) { 3989 if (!serverManager.isServerOnline(sn)) { 3990 LOG.warn("Dropped splitting! ServerName=" + sn + " unknown."); 3991 return false; 3992 } 3993 byte [] payloadOfSplitting = rt.getPayload(); 3994 List<HRegionInfo> splittingRegions; 3995 try { 3996 splittingRegions = HRegionInfo.parseDelimitedFrom( 3997 payloadOfSplitting, 0, payloadOfSplitting.length); 3998 } catch (IOException e) { 3999 LOG.error("Dropped splitting! Failed reading " + rt.getEventType() 4000 + " payload for " + prettyPrintedRegionName); 4001 return false; 4002 } 4003 assert splittingRegions.size() == 2; 4004 HRegionInfo hri_a = splittingRegions.get(0); 4005 HRegionInfo hri_b = splittingRegions.get(1); 4006 4007 RegionState rs_p = regionStates.getRegionState(encodedName); 4008 RegionState rs_a = regionStates.getRegionState(hri_a); 4009 RegionState rs_b = regionStates.getRegionState(hri_b); 4010 4011 if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn)) 4012 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn)) 4013 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) { 4014 LOG.warn("Dropped splitting! 
Not in state good for SPLITTING; rs_p=" 4015 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b); 4016 return false; 4017 } 4018 4019 if (rs_p == null) { 4020 // Splitting region should be online 4021 rs_p = regionStates.updateRegionState(rt, State.OPEN); 4022 if (rs_p == null) { 4023 LOG.warn("Received splitting for region " + prettyPrintedRegionName 4024 + " from server " + sn + " but it doesn't exist anymore," 4025 + " probably already processed its split"); 4026 return false; 4027 } 4028 regionStates.regionOnline(rs_p.getRegion(), sn); 4029 } 4030 4031 HRegionInfo p = rs_p.getRegion(); 4032 EventType et = rt.getEventType(); 4033 if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) { 4034 try { 4035 SplitTransactionDetails std = 4036 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 4037 .getSplitTransactionCoordination().getDefaultDetails(); 4038 if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) 4039 .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) { 4040 byte[] data = ZKAssign.getData(watcher, encodedName); 4041 EventType currentType = null; 4042 if (data != null) { 4043 RegionTransition newRt = RegionTransition.parseFrom(data); 4044 currentType = newRt.getEventType(); 4045 } 4046 if (currentType == null 4047 || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) { 4048 LOG.warn("Failed to transition pending_split node " + encodedName 4049 + " to splitting, it's now " + currentType); 4050 return false; 4051 } 4052 } 4053 } catch (Exception e) { 4054 LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e); 4055 return false; 4056 } 4057 } 4058 4059 synchronized (regionStates) { 4060 splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b)); 4061 regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn); 4062 regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn); 4063 regionStates.updateRegionState(rt, State.SPLITTING); 4064 4065 // The below is for testing ONLY! We can't do fault injection easily, so 4066 // resort to this kinda uglyness -- St.Ack 02/25/2011. 4067 if (TEST_SKIP_SPLIT_HANDLING) { 4068 LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set"); 4069 return true; // return true so that the splitting node stays 4070 } 4071 4072 if (et == EventType.RS_ZK_REGION_SPLIT) { 4073 regionOffline(p, State.SPLIT); 4074 regionOnline(hri_a, sn); 4075 regionOnline(hri_b, sn); 4076 splitRegions.remove(p); 4077 } 4078 } 4079 4080 if (et == EventType.RS_ZK_REGION_SPLIT) { 4081 // split replicas 4082 doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b); 4083 LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node"); 4084 // Remove region from ZK 4085 try { 4086 boolean successful = false; 4087 while (!successful) { 4088 // It's possible that the RS tickles in between the reading of the 4089 // znode and the deleting, so it's safe to retry. 4090 successful = ZKAssign.deleteNode(watcher, encodedName, 4091 EventType.RS_ZK_REGION_SPLIT, sn); 4092 } 4093 } catch (KeeperException e) { 4094 if (e instanceof NoNodeException) { 4095 String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName); 4096 LOG.debug("The znode " + znodePath + " does not exist. 
May be deleted already."); 4097 } else { 4098 server.abort("Error deleting SPLIT node " + encodedName, e); 4099 } 4100 } 4101 LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString() 4102 + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b=" 4103 + hri_b.getRegionNameAsString() + ", on " + sn); 4104 4105 // User could disable the table before master knows the new region. 4106 if (tableStateManager.isTableState(p.getTable(), 4107 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) { 4108 unassign(hri_a); 4109 unassign(hri_b); 4110 } 4111 } 4112 return true; 4113 } 4114 doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a, final HRegionInfo hri_b)4115 private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a, 4116 final HRegionInfo hri_b) { 4117 // Close replicas for the original unmerged regions. create/assign new replicas 4118 // for the merged parent. 4119 List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>(); 4120 unmergedRegions.add(hri_a); 4121 unmergedRegions.add(hri_b); 4122 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions); 4123 Collection<List<HRegionInfo>> c = map.values(); 4124 for (List<HRegionInfo> l : c) { 4125 for (HRegionInfo h : l) { 4126 if (!RegionReplicaUtil.isDefaultReplica(h)) { 4127 LOG.debug("Unassigning un-merged replica " + h); 4128 unassign(h); 4129 } 4130 } 4131 } 4132 int numReplicas = 1; 4133 try { 4134 numReplicas = server.getTableDescriptors().get(mergedHri.getTable()). 4135 getRegionReplication(); 4136 } catch (IOException e) { 4137 LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() + 4138 " due to " + e.getMessage() + ". The assignment of replicas for the merged region " + 4139 "will not be done"); 4140 } 4141 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(); 4142 for (int i = 1; i < numReplicas; i++) { 4143 regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i)); 4144 } 4145 try { 4146 assign(regions); 4147 } catch (IOException ioe) { 4148 LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " + 4149 ioe.getMessage()); 4150 } catch (InterruptedException ie) { 4151 LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " + 4152 ie.getMessage()); 4153 } 4154 } 4155 doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a, final HRegionInfo hri_b)4156 private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a, 4157 final HRegionInfo hri_b) { 4158 // create new regions for the replica, and assign them to match with the 4159 // current replica assignments. If replica1 of parent is assigned to RS1, 4160 // the replica1s of daughters will be on the same machine 4161 int numReplicas = 1; 4162 try { 4163 numReplicas = server.getTableDescriptors().get(parentHri.getTable()). 4164 getRegionReplication(); 4165 } catch (IOException e) { 4166 LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() + 4167 " due to " + e.getMessage() + ". 
The assignment of daughter replicas " + 4168 "will not be done"); 4169 } 4170 // unassign the old replicas 4171 List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>(); 4172 parentRegion.add(parentHri); 4173 Map<ServerName, List<HRegionInfo>> currentAssign = 4174 regionStates.getRegionAssignments(parentRegion); 4175 Collection<List<HRegionInfo>> c = currentAssign.values(); 4176 for (List<HRegionInfo> l : c) { 4177 for (HRegionInfo h : l) { 4178 if (!RegionReplicaUtil.isDefaultReplica(h)) { 4179 LOG.debug("Unassigning parent's replica " + h); 4180 unassign(h); 4181 } 4182 } 4183 } 4184 // assign daughter replicas 4185 Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>(); 4186 for (int i = 1; i < numReplicas; i++) { 4187 prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map); 4188 prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map); 4189 } 4190 try { 4191 assign(map); 4192 } catch (IOException e) { 4193 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)"); 4194 } catch (InterruptedException e) { 4195 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)"); 4196 } 4197 } 4198 prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri, int replicaId, Map<HRegionInfo, ServerName> map)4199 private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri, 4200 int replicaId, Map<HRegionInfo, ServerName> map) { 4201 HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId); 4202 HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri, 4203 replicaId); 4204 LOG.debug("Created replica region for daughter " + daughterReplica); 4205 ServerName sn; 4206 if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) { 4207 map.put(daughterReplica, sn); 4208 } else { 4209 List<ServerName> servers = serverManager.getOnlineServersList(); 4210 sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size())); 4211 map.put(daughterReplica, sn); 4212 } 4213 } 4214 getReplicasToClose()4215 public Set<HRegionInfo> getReplicasToClose() { 4216 return replicasToClose; 4217 } 4218 4219 /** 4220 * Marks a region offline. The new state will be the specified one if it is not null; 4221 * if the specified state is null, the new state is Offline. 4222 * The specified state can only be Split, Merged, Offline, or null. 4223 */ regionOffline(final HRegionInfo regionInfo, final State state)4224 private void regionOffline(final HRegionInfo regionInfo, final State state) { 4225 regionStates.regionOffline(regionInfo, state); 4226 removeClosedRegion(regionInfo); 4227 // remove the region plan as well, just in case. 
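    // (clearRegionPlan below synchronizes on regionPlans internally, so this
    // cleanup is safe to run from any caller thread.)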
4228 clearRegionPlan(regionInfo); 4229 balancer.regionOffline(regionInfo); 4230 4231 // Tell our listeners that a region was closed 4232 sendRegionClosedNotification(regionInfo); 4233 // also note that all the replicas of the primary should be closed 4234 if (state != null && state.equals(State.SPLIT)) { 4235 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1); 4236 c.add(regionInfo); 4237 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c); 4238 Collection<List<HRegionInfo>> allReplicas = map.values(); 4239 for (List<HRegionInfo> list : allReplicas) { 4240 replicasToClose.addAll(list); 4241 } 4242 } 4243 else if (state != null && state.equals(State.MERGED)) { 4244 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1); 4245 c.add(regionInfo); 4246 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c); 4247 Collection<List<HRegionInfo>> allReplicas = map.values(); 4248 for (List<HRegionInfo> list : allReplicas) { 4249 replicasToClose.addAll(list); 4250 } 4251 } 4252 } 4253 sendRegionOpenedNotification(final HRegionInfo regionInfo, final ServerName serverName)4254 private void sendRegionOpenedNotification(final HRegionInfo regionInfo, 4255 final ServerName serverName) { 4256 if (!this.listeners.isEmpty()) { 4257 for (AssignmentListener listener : this.listeners) { 4258 listener.regionOpened(regionInfo, serverName); 4259 } 4260 } 4261 } 4262 sendRegionClosedNotification(final HRegionInfo regionInfo)4263 private void sendRegionClosedNotification(final HRegionInfo regionInfo) { 4264 if (!this.listeners.isEmpty()) { 4265 for (AssignmentListener listener : this.listeners) { 4266 listener.regionClosed(regionInfo); 4267 } 4268 } 4269 } 4270 4271 /** 4272 * Try to update some region states. If the state machine prevents 4273 * such an update, an error message is returned to explain the reason. 4274 * 4275 * It's expected that each transition carries just one 4276 * region for opening/closing, or 3 regions for splitting/merging. 4277 * These regions should be on the server that requested the change. 4278 * 4279 * Region state machine. Only these transitions 4280 * are expected to be triggered by a region server. 4281 * 4282 * On the state transition: 4283 * (1) Open/Close should be initiated by the master 4284 * (a) Master sets the region to pending_open/pending_close 4285 * in memory and hbase:meta after sending the request 4286 * to the region server 4287 * (b) Region server reports back to the master 4288 * after open/close is done (either success/failure) 4289 * (c) If the region server has a problem reporting the status 4290 * to the master, it must be because the master is down or there is some 4291 * temporary network issue. Otherwise, the region server should 4292 * abort since it must be a bug. If the master is not accessible, 4293 * the region server should keep trying until the server is 4294 * stopped or till the status is reported to the (new) master 4295 * (d) If the region server dies in the middle of opening/closing 4296 * a region, SSH picks it up and finishes it 4297 * (e) If the master dies in the middle, the new master recovers 4298 * the state during initialization from hbase:meta. The region server 4299 * can report any transition that has not been reported to 4300 * the previous active master yet 4301 * (2) Split/merge is initiated by region servers 4302 * (a) To split a region, a region server sends a request 4303 * to the master to try to set a region to splitting, together with 4304 * two daughters (to be created) to splitting new. 
  /**
   * Try to update some region states. If the state machine prevents
   * such an update, an error message is returned to explain the reason.
   *
   * Each transition is expected to involve just one region for
   * opening/closing, and three regions for splitting/merging. These
   * regions should be on the server that requested the change.
   *
   * Region state machine. Only the transitions below are expected
   * to be triggered by a region server.
   *
   * On the state transition:
   * (1) Open/Close should be initiated by the master
   *     (a) The master sets the region to pending_open/pending_close
   *         in memory and hbase:meta after sending the request
   *         to the region server
   *     (b) The region server reports back to the master
   *         after the open/close is done (either success or failure)
   *     (c) If the region server has trouble reporting the status
   *         to the master, it must be because the master is down or
   *         there is a temporary network issue. Otherwise, the region
   *         server should abort, since it must be a bug. If the master
   *         is not accessible, the region server should keep trying
   *         until the server is stopped or the status is reported to
   *         the (new) master
   *     (d) If the region server dies in the middle of opening/closing
   *         a region, the server shutdown handler (SSH) picks it up
   *         and finishes it
   *     (e) If the master dies in the middle, the new master recovers
   *         the state from hbase:meta during initialization. A region
   *         server can report any transition that has not yet been
   *         reported to the previous active master
   * (2) Split/merge is initiated by region servers
   *     (a) To split a region, a region server sends a request to the
   *         master to try to set the region to splitting, together with
   *         the two daughters (to be created) set to splitting_new. If
   *         approved by the master, the split can then move ahead
   *     (b) To merge two regions, a region server sends a request to the
   *         master to try to set the new merged region (to be created) to
   *         merging_new, together with the two regions (to be merged) set
   *         to merging. If the master approves, the merge can then move ahead
   *     (c) Once the split/merge is done, the region server reports the
   *         status (success or failure) back to the master
   *     (d) Other scenarios should be handled similarly as for
   *         region open/close
   */
  protected String onRegionTransition(final ServerName serverName,
      final RegionStateTransition transition) {
    TransitionCode code = transition.getTransitionCode();
    HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
    RegionState current = regionStates.getRegionState(hri);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Got transition " + code + " for "
        + (current != null ? current.toString() : hri.getShortNameToLog())
        + " from " + serverName);
    }
    String errorMsg = null;
    switch (code) {
    case OPENED:
      if (current != null && current.isOpened() && current.isOnServer(serverName)) {
        LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState()
          + " on " + serverName);
        break;
      }
      // fall through: a new OPENED report shares the validation below
    case FAILED_OPEN:
      if (current == null
          || !current.isPendingOpenOrOpeningOnServer(serverName)) {
        errorMsg = hri.getShortNameToLog()
          + " is not pending open on " + serverName;
      } else if (code == TransitionCode.FAILED_OPEN) {
        onRegionFailedOpen(hri, serverName);
      } else {
        long openSeqNum = HConstants.NO_SEQNUM;
        if (transition.hasOpenSeqNum()) {
          openSeqNum = transition.getOpenSeqNum();
        }
        if (openSeqNum < 0) {
          errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
        } else {
          onRegionOpen(hri, serverName, openSeqNum);
        }
      }
      break;

    case CLOSED:
      if (current == null
          || !current.isPendingCloseOrClosingOnServer(serverName)) {
        errorMsg = hri.getShortNameToLog()
          + " is not pending close on " + serverName;
      } else {
        onRegionClosed(hri);
      }
      break;

    case READY_TO_SPLIT:
      try {
        regionStateListener.onRegionSplit(hri);
      } catch (IOException exp) {
        errorMsg = StringUtils.stringifyException(exp);
      }
      break;
    case SPLIT_PONR:
    case SPLIT:
      errorMsg =
        onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
          HRegionInfo.convert(transition.getRegionInfo(2)));
      break;

    case SPLIT_REVERTED:
      errorMsg =
        onRegionSplitReverted(serverName, hri,
          HRegionInfo.convert(transition.getRegionInfo(1)),
          HRegionInfo.convert(transition.getRegionInfo(2)));
      if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
        try {
          regionStateListener.onRegionSplitReverted(hri);
        } catch (IOException exp) {
          LOG.warn(StringUtils.stringifyException(exp));
        }
      }
      break;
    case READY_TO_MERGE:
    case MERGE_PONR:
    case MERGED:
    case MERGE_REVERTED:
      errorMsg = onRegionMerge(serverName, code, hri,
        HRegionInfo.convert(transition.getRegionInfo(1)),
        HRegionInfo.convert(transition.getRegionInfo(2)));
      if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
        try {
          regionStateListener.onRegionMerged(hri);
        } catch (IOException exp) {
          errorMsg = StringUtils.stringifyException(exp);
        }
      }
      break;

    default:
      errorMsg = "Unexpected transition code " + code;
    }
    if (errorMsg != null) {
      LOG.error("Failed to transition region from " + current + " to "
        + code + " by " + serverName + ": " + errorMsg);
    }
    return errorMsg;
  }
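  /*
   * Sketch of the report shape handled above (hypothetical caller-side code): a single
   * region accompanies OPENED/CLOSED/FAILED_OPEN, while split/merge codes carry three
   * regions (parent plus the two daughters, or the merged region plus the two merging
   * regions). An open seq num is expected with a successful OPENED report:
   *
   *   RegionStateTransition report = RegionStateTransition.newBuilder()
   *       .setTransitionCode(TransitionCode.OPENED)
   *       .addRegionInfo(HRegionInfo.convert(hri))
   *       .setOpenSeqNum(openSeqNum)
   *       .build();
   *   String error = onRegionTransition(serverName, report);
   *   // a null return means the transition was accepted
   */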
  /**
   * @return Instance of the load balancer
   */
  public LoadBalancer getBalancer() {
    return this.balancer;
  }

  public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(
      Collection<HRegionInfo> infos) {
    return getRegionStates().getRegionAssignments(infos);
  }

  void setRegionStateListener(RegionStateListener listener) {
    this.regionStateListener = listener;
  }
}