1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.master;
20 
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableMap;
33 import java.util.Random;
34 import java.util.Set;
35 import java.util.TreeMap;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.CopyOnWriteArrayList;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.locks.Lock;
44 import java.util.concurrent.locks.ReentrantLock;
45 
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.CoordinatedStateException;
52 import org.apache.hadoop.hbase.HBaseIOException;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.HRegionLocation;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.MetaTableAccessor;
58 import org.apache.hadoop.hbase.NotServingRegionException;
59 import org.apache.hadoop.hbase.RegionLocations;
60 import org.apache.hadoop.hbase.RegionStateListener;
61 import org.apache.hadoop.hbase.RegionTransition;
62 import org.apache.hadoop.hbase.ServerName;
63 import org.apache.hadoop.hbase.TableName;
64 import org.apache.hadoop.hbase.TableNotFoundException;
65 import org.apache.hadoop.hbase.TableStateManager;
66 import org.apache.hadoop.hbase.classification.InterfaceAudience;
67 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68 import org.apache.hadoop.hbase.client.Result;
69 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71 import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73 import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74 import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75 import org.apache.hadoop.hbase.exceptions.DeserializationException;
76 import org.apache.hadoop.hbase.executor.EventHandler;
77 import org.apache.hadoop.hbase.executor.EventType;
78 import org.apache.hadoop.hbase.executor.ExecutorService;
79 import org.apache.hadoop.hbase.ipc.FailedServerException;
80 import org.apache.hadoop.hbase.ipc.RpcClient;
81 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82 import org.apache.hadoop.hbase.master.RegionState.State;
83 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
93 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
94 import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
95 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
96 import org.apache.hadoop.hbase.util.ConfigUtil;
97 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
98 import org.apache.hadoop.hbase.util.FSUtils;
99 import org.apache.hadoop.hbase.util.KeyLocker;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.PairOfSameType;
102 import org.apache.hadoop.hbase.util.Threads;
103 import org.apache.hadoop.hbase.util.Triple;
104 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
105 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
106 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
107 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
109 import org.apache.hadoop.ipc.RemoteException;
110 import org.apache.hadoop.util.StringUtils;
111 import org.apache.zookeeper.AsyncCallback;
112 import org.apache.zookeeper.KeeperException;
113 import org.apache.zookeeper.KeeperException.NoNodeException;
114 import org.apache.zookeeper.KeeperException.NodeExistsException;
115 import org.apache.zookeeper.data.Stat;
116 
117 import com.google.common.annotations.VisibleForTesting;
118 import com.google.common.collect.LinkedHashMultimap;
119 
120 /**
121  * Manages and performs region assignment.
122  * <p>
123  * Monitors ZooKeeper for events related to regions in transition.
124  * <p>
125  * Handles existing regions in transition during master failover.
126  */
127 @InterfaceAudience.Private
128 public class AssignmentManager extends ZooKeeperListener {
129   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
130 
131   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
132       -1, -1L);
133 
134   static final String ALREADY_IN_TRANSITION_WAITTIME
135     = "hbase.assignment.already.intransition.waittime";
136   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
137 
138   protected final MasterServices server;
139 
140   private ServerManager serverManager;
141 
142   private boolean shouldAssignRegionsWithFavoredNodes;
143 
144   private LoadBalancer balancer;
145 
146   private final MetricsAssignmentManager metricsAssignmentManager;
147 
148   private final TableLockManager tableLockManager;
149 
150   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
151 
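  // Per-region lock, keyed by the encoded region name, used to serialize operations on a region.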
152   final private KeyLocker<String> locker = new KeyLocker<String>();
153 
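  // Replica regions of split parents and merged regions that still need to be closed;
  // they are unassigned at the end of processDeadServersAndRegionsInTransition().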
154   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
155 
156   /**
157    * Map of regions to reopen after the schema of a table is changed. Key -
158    * encoded region name, value - HRegionInfo
159    */
160   private final Map <String, HRegionInfo> regionsToReopen;
161 
162   /*
163    * Maximum times we recurse an assignment/unassignment.
164    * See below in {@link #assign()} and {@link #unassign()}.
165    */
166   private final int maximumAttempts;
167 
168   /**
169    * Map of the two merging regions, keyed by the region to be created from the merge.
170    */
171   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
172     = new HashMap<String, PairOfSameType<HRegionInfo>>();
173 
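  // Pairs of daughter regions from a split, keyed by the region being split
  // (analogous to mergingRegions above).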
174   private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
175     = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
176 
177   /**
178    * The sleep time for which the assignment will wait before retrying an hbase:meta assignment
179    * that failed due to the unavailability of a region plan or a bad region plan.
180    */
181   private final long sleepTimeBeforeRetryingMetaAssignment;
182 
183   /** Plans for region movement. Key is the encoded version of a region name*/
184   // TODO: When do plans get cleaned out?  Ever? In server open and in server
185   // shutdown processing -- St.Ack
186   // All access to this Map must be synchronized.
187   final NavigableMap<String, RegionPlan> regionPlans =
188     new TreeMap<String, RegionPlan>();
189 
190   private final TableStateManager tableStateManager;
191 
192   private final ExecutorService executorService;
193 
194   // For unit tests, keep track of calls to ClosedRegionHandler
195   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
196 
197   // For unit tests, keep track of calls to OpenedRegionHandler
198   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
199 
200   //Thread pool executor service for timeout monitor
201   private java.util.concurrent.ExecutorService threadPoolExecutorService;
202 
203   // A bunch of ZK events workers. Each is a single thread executor service
204   private final java.util.concurrent.ExecutorService zkEventWorkers;
205 
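  // Region transition events that are still handled even when the source region server
  // is no longer online (see handleRegion()).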
206   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
207       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
208 
209   private final RegionStates regionStates;
210 
211   // The threshold to use bulk assigning. Bulk assignment is used
212   // only if we are assigning at least this many regions to at least this
213   // many servers. If assigning fewer regions to fewer servers,
214   // bulk assigning may not be as efficient.
215   private final int bulkAssignThresholdRegions;
216   private final int bulkAssignThresholdServers;
217   private final int bulkPerRegionOpenTimeGuesstimate;
218 
219   // Should bulk assignment wait till all regions are assigned,
220   // or until it times out?  This is useful to measure bulk assignment
221   // performance, but not needed in most use cases.
222   private final boolean bulkAssignWaitTillAllAssigned;
223 
224   /**
225    * Indicator that AssignmentManager has recovered the region states so
226    * that ServerShutdownHandler can be fully enabled and re-assign regions
227    * of dead servers, so that when re-assignment happens, AssignmentManager
228    * has proper region states.
229    *
230    * Protected to ease testing.
231    */
232   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
233 
234   /**
235    * A map to track the number of times in a row a region has failed to open,
236    * so that we don't try to open a region forever if the failure is
237    * unrecoverable.  We don't put this information in region states
238    * because we don't expect this to happen frequently; we don't
239    * want to copy this information over during each state transition either.
240    */
241   private final ConcurrentHashMap<String, AtomicInteger>
242     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
243 
244   // A flag to indicate if we are using ZK for region assignment
245   private final boolean useZKForAssignment;
246 
247   // In case not using ZK for region assignment, region states
248   // are persisted in meta with a state store
249   private final RegionStateStore regionStateStore;
250 
251   /**
252    * For testing only!  Set to true to skip handling of split.
253    */
254   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
255   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
256 
257   /** Listeners that are called on assignment events. */
258   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
259 
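  // Optional listener (if set) notified of region state changes; see RegionStateListener.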
260   private RegionStateListener regionStateListener;
261 
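  /** Tri-state answer for whether a given server is hosting a region; UNKNOWN if it cannot be determined. */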
262   public enum ServerHostRegion {
263     NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
264   }
265 
266   /**
267    * Constructs a new assignment manager.
268    *
269    * @param server instance of HMaster this AM running inside
270    * @param serverManager serverManager for associated HMaster
271    * @param balancer implementation of {@link LoadBalancer}
272    * @param service Executor service
273    * @param metricsMaster metrics manager
274    * @param tableLockManager TableLock manager
275    * @throws KeeperException
276    * @throws IOException
277    */
278   public AssignmentManager(MasterServices server, ServerManager serverManager,
279       final LoadBalancer balancer,
280       final ExecutorService service, MetricsMaster metricsMaster,
281       final TableLockManager tableLockManager) throws KeeperException,
282         IOException, CoordinatedStateException {
283     super(server.getZooKeeper());
284     this.server = server;
285     this.serverManager = serverManager;
286     this.executorService = service;
287     this.regionStateStore = new RegionStateStore(server);
288     this.regionsToReopen = Collections.synchronizedMap
289                            (new HashMap<String, HRegionInfo> ());
290     Configuration conf = server.getConfiguration();
291     // Only read favored nodes if using the favored nodes load balancer.
292     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
293            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
294            FavoredNodeLoadBalancer.class);
295     try {
296       if (server.getCoordinatedStateManager() != null) {
297         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
298       } else {
299         this.tableStateManager = null;
300       }
301     } catch (InterruptedException e) {
302       throw new InterruptedIOException();
303     }
304     // This is the max attempts, not retries, so it should be at least 1.
305     this.maximumAttempts = Math.max(1,
306       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
307     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
308         "hbase.meta.assignment.retry.sleeptime", 1000L);
309     this.balancer = balancer;
310     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
311     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
312       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
313     this.regionStates = new RegionStates(
314       server, tableStateManager, serverManager, regionStateStore);
315 
316     this.bulkAssignWaitTillAllAssigned =
317       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
318     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
319     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
320     this.bulkPerRegionOpenTimeGuesstimate =
321       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
322 
323     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
324     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
325     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
326             TimeUnit.SECONDS, threadFactory);
327     this.tableLockManager = tableLockManager;
328 
329     this.metricsAssignmentManager = new MetricsAssignmentManager();
330     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
331   }
332 
333   /**
334    * Add the listener to the notification list.
335    * @param listener The AssignmentListener to register
336    */
337   public void registerListener(final AssignmentListener listener) {
338     this.listeners.add(listener);
339   }
340 
341   /**
342    * Remove the listener from the notification list.
343    * @param listener The AssignmentListener to unregister
344    */
345   public boolean unregisterListener(final AssignmentListener listener) {
346     return this.listeners.remove(listener);
347   }
348 
349   /**
350    * @return Instance of ZKTableStateManager.
351    */
352   public TableStateManager getTableStateManager() {
353     // These are 'expensive' to make, involving a trip to the zk ensemble, so allow
354     // sharing.
355     return this.tableStateManager;
356   }
357 
358   /**
359    * This SHOULD not be public. It is public now
360    * because of some unit tests.
361    *
362    * TODO: make it package private and keep RegionStates in the master package
363    */
364   public RegionStates getRegionStates() {
365     return regionStates;
366   }
367 
368   /**
369    * Used in some tests to mock up region state in meta
370    */
371   @VisibleForTesting
372   RegionStateStore getRegionStateStore() {
373     return regionStateStore;
374   }
375 
376   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
377     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
378   }
379 
380   /**
381    * Add a regionPlan for the specified region.
382    * @param encodedName
383    * @param plan
384    */
385   public void addPlan(String encodedName, RegionPlan plan) {
386     synchronized (regionPlans) {
387       regionPlans.put(encodedName, plan);
388     }
389   }
390 
391   /**
392    * Add a map of region plans.
393    */
394   public void addPlans(Map<String, RegionPlan> plans) {
395     synchronized (regionPlans) {
396       regionPlans.putAll(plans);
397     }
398   }
399 
400   /**
401    * Set the list of regions that will be reopened
402    * because of an update in table schema
403    *
404    * @param regions
405    *          list of regions that should be tracked for reopen
406    */
407   public void setRegionsToReopen(List <HRegionInfo> regions) {
408     for(HRegionInfo hri : regions) {
409       regionsToReopen.put(hri.getEncodedName(), hri);
410     }
411   }
412 
413   /**
414    * Used by the client to identify if all regions have the schema updates
415    *
416    * @param tableName
417    * @return Pair indicating the status of the alter command
418    * @throws IOException
419    */
420   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
421       throws IOException {
422     List<HRegionInfo> hris;
423     if (TableName.META_TABLE_NAME.equals(tableName)) {
424       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
425     } else {
426       hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
427         server.getConnection(), tableName, true);
428     }
429 
430     Integer pending = 0;
431     for (HRegionInfo hri : hris) {
432       String name = hri.getEncodedName();
433       // no lock concurrent access ok: sequential consistency respected.
434       if (regionsToReopen.containsKey(name)
435           || regionStates.isRegionInTransition(name)) {
436         pending++;
437       }
438     }
439     return new Pair<Integer, Integer>(pending, hris.size());
440   }
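  // Illustrative use of the pair returned by getReopenStatus() (hypothetical caller):
  //   Pair<Integer, Integer> status = am.getReopenStatus(tableName);
  //   int pending = status.getFirst();   // regions still to be reopened or in transition
  //   int total   = status.getSecond();  // total regions of the table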
441 
442   /**
443    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
444    * the failover cleanup before re-assigning regions of dead servers. So that
445    * when re-assignment happens, AssignmentManager has proper region states.
446    */
447   public boolean isFailoverCleanupDone() {
448     return failoverCleanupDone.get();
449   }
450 
451   /**
452    * To avoid racing with AM, external entities may need to lock a region,
453    * for example, when SSH checks what regions to skip re-assigning.
454    */
455   public Lock acquireRegionLock(final String encodedName) {
456     return locker.acquireLock(encodedName);
457   }
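  // Illustrative locking pattern for acquireRegionLock(), matching how the locker is used
  // elsewhere in this class (hypothetical caller):
  //   Lock lock = am.acquireRegionLock(hri.getEncodedName());
  //   try {
  //     // work on the region while holding its lock
  //   } finally {
  //     lock.unlock();
  //   }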
458 
459   /**
460    * Now that failover cleanup is completed, notify the server manager to
461    * process queued up dead servers, if any.
462    */
463   void failoverCleanupDone() {
464     failoverCleanupDone.set(true);
465     serverManager.processQueuedDeadServers();
466   }
467 
468   /**
469    * Called on startup.
470    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
471    * @throws IOException
472    * @throws KeeperException
473    * @throws InterruptedException
474    * @throws CoordinatedStateException
475    */
476   void joinCluster() throws IOException,
477       KeeperException, InterruptedException, CoordinatedStateException {
478     long startTime = System.currentTimeMillis();
479     // Concurrency note: In the below the accesses on regionsInTransition are
480     // outside of a synchronization block where usually all accesses to RIT are
481     // synchronized.  The presumption is that in this case it is safe since this
482     // method is being played by a single thread on startup.
483 
484     // TODO: Regions that have a null location and are not in regionsInTransitions
485     // need to be handled.
486 
487     // Scan hbase:meta to build list of existing regions, servers, and assignment
488     // Returns servers who have not checked in (assumed dead) that some regions
489     // were assigned to (according to the meta)
490     Set<ServerName> deadServers = rebuildUserRegions();
491 
492     // This method will assign all user regions if a clean server startup or
493     // it will reconstruct master state and cleanup any leftovers from previous master process.
494     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
495 
496     if (!useZKForAssignment) {
497       // Not use ZK for assignment any more, remove the ZNode
498       ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
499     }
500     recoverTableInDisablingState();
501     recoverTableInEnablingState();
502     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
503       - startTime) + "ms, failover=" + failover);
504   }
505 
506   /**
507    * Process all regions that are in transition in zookeeper and also
508    * processes the list of dead servers.
509    * Used by a master joining a cluster.  If we figure this is a clean cluster
510    * startup, will assign all user regions.
511    * @param deadServers Set of servers that are offline (probably legitimately) and that were
512    * carrying regions according to a scan of hbase:meta. Can be null.
513    * @throws KeeperException
514    * @throws IOException
515    * @throws InterruptedException
516    */
517   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
518   throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
519     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
520 
521     if (useZKForAssignment && nodes == null) {
522       String errorMessage = "Failed to get the children from ZK";
523       server.abort(errorMessage, new IOException(errorMessage));
524       return true; // Doesn't matter in this case
525     }
526 
527     boolean failover = !serverManager.getDeadServers().isEmpty();
528     if (failover) {
529       // This may not be a failover actually, especially if meta is on this master.
530       if (LOG.isDebugEnabled()) {
531         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
532       }
533     } else {
534       // If any one region except meta is assigned, it's a failover.
535       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
536       for (Map.Entry<HRegionInfo, ServerName> en:
537           regionStates.getRegionAssignments().entrySet()) {
538         HRegionInfo hri = en.getKey();
539         if (!hri.isMetaTable()
540             && onlineServers.contains(en.getValue())) {
541           LOG.debug("Found " + hri + " out on cluster");
542           failover = true;
543           break;
544         }
545       }
546       if (!failover && nodes != null) {
547         // If any one region except meta is in transition, it's a failover.
548         for (String encodedName: nodes) {
549           RegionState regionState = regionStates.getRegionState(encodedName);
550           if (regionState != null && !regionState.getRegion().isMetaRegion()) {
551             LOG.debug("Found " + regionState + " in RITs");
552             failover = true;
553             break;
554           }
555         }
556       }
557     }
558     if (!failover && !useZKForAssignment) {
559       // If any region except meta is in transition on a live server, it's a failover.
560       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
561       if (!regionsInTransition.isEmpty()) {
562         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
563         for (RegionState regionState: regionsInTransition.values()) {
564           ServerName serverName = regionState.getServerName();
565           if (!regionState.getRegion().isMetaRegion()
566               && serverName != null && onlineServers.contains(serverName)) {
567             LOG.debug("Found " + regionState + " in RITs");
568             failover = true;
569             break;
570           }
571         }
572       }
573     }
574     if (!failover) {
575       // If we get here, we have a full cluster restart. It is a failover only
576       // if there are some WALs that are not split yet. For meta WALs, they should have
577       // been split already, if any. We can walk through those queued dead servers,
578       // if they don't have any WALs, this restart should be considered as a clean one
579       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
580       if (!queuedDeadServers.isEmpty()) {
581         Configuration conf = server.getConfiguration();
582         Path rootdir = FSUtils.getRootDir(conf);
583         FileSystem fs = rootdir.getFileSystem(conf);
584         for (ServerName serverName: queuedDeadServers) {
585           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
586           // removed empty directories.
587           Path logDir = new Path(rootdir,
588               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
589           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
590           if (fs.exists(logDir) || fs.exists(splitDir)) {
591             LOG.debug("Found queued dead server " + serverName);
592             failover = true;
593             break;
594           }
595         }
596         if (!failover) {
597           // We figured that it's not a failover, so no need to
598           // work on these re-queued dead servers any more.
599           LOG.info("AM figured that it's not a failover and cleaned up "
600             + queuedDeadServers.size() + " queued dead servers");
601           serverManager.removeRequeuedDeadServers();
602         }
603       }
604     }
605 
606     Set<TableName> disabledOrDisablingOrEnabling = null;
607     Map<HRegionInfo, ServerName> allRegions = null;
608 
609     if (!failover) {
610       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
611         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
612         ZooKeeperProtos.Table.State.ENABLING);
613 
614       // Clean re/start, mark all user regions closed before reassignment
615       allRegions = regionStates.closeAllUserRegions(
616         disabledOrDisablingOrEnabling);
617     }
618 
619     // Now region states are restored
620     regionStateStore.start();
621 
622     // If we found user regions out on the cluster, it's a failover.
623     if (failover) {
624       LOG.info("Found regions out on cluster or in RIT; presuming failover");
625       // Process list of dead servers and regions in RIT.
626       // See HBASE-4580 for more information.
627       processDeadServersAndRecoverLostRegions(deadServers);
628     }
629 
630     if (!failover && useZKForAssignment) {
631       // Cleanup any existing ZK nodes and start watching
632       ZKAssign.deleteAllNodes(watcher);
633       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
634         this.watcher.assignmentZNode);
635     }
636 
637     // Now we can safely claim failover cleanup completed and enable
638     // ServerShutdownHandler for further processing. The nodes (below)
639     // in transition, if any, are for regions not related to those
640     // dead servers at all, and can be done in parallel to SSH.
641     failoverCleanupDone();
642     if (!failover) {
643       // Fresh cluster startup.
644       LOG.info("Clean cluster startup. Assigning user regions");
645       assignAllUserRegions(allRegions);
646     }
647     // unassign replicas of the split parents and the merged regions
648     // the daughter replicas are opened in assignAllUserRegions if they were
649     // not already opened.
650     for (HRegionInfo h : replicasToClose) {
651       unassign(h);
652     }
653     replicasToClose.clear();
654     return failover;
655   }
656 
657   /**
658    * If region is up in zk in transition, then do fixup and block and wait until
659    * the region is assigned and out of transition.  Used on startup for
660    * catalog regions.
661    * @param hri Region to look for.
662    * @return True if we processed a region in transition else false if region
663    * was not up in zk in transition.
664    * @throws InterruptedException
665    * @throws KeeperException
666    * @throws IOException
667    */
668   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
669       throws InterruptedException, KeeperException, IOException {
670     String encodedRegionName = hri.getEncodedName();
671     if (!processRegionInTransition(encodedRegionName, hri)) {
672       return false; // The region is not in transition
673     }
674     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
675     while (!this.server.isStopped() &&
676         this.regionStates.isRegionInTransition(encodedRegionName)) {
677       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
678       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
679         // The region is not in transition, or not in transition on an online
680         // server. It doesn't help to block here any more. The caller needs to
681         // verify the region is actually assigned.
682         break;
683       }
684       this.regionStates.waitForUpdate(100);
685     }
686     return true;
687   }
688 
689   /**
690    * Process failover of new master for region <code>encodedRegionName</code>
691    * up in zookeeper.
692    * @param encodedRegionName Region to process failover for.
693    * @param regionInfo If null we'll go get it from meta table.
694    * @return True if we processed <code>regionInfo</code> as a RIT.
695    * @throws KeeperException
696    * @throws IOException
697    */
698   boolean processRegionInTransition(final String encodedRegionName,
699       final HRegionInfo regionInfo) throws KeeperException, IOException {
700     // We need a lock here to ensure that we will not put the same region twice
701     // It has no reason to be a lock shared with the other operations.
702     // We can do the lock on the region only, instead of a global lock: what we want to ensure
703     // is that we don't have two threads working on the same region.
704     Lock lock = locker.acquireLock(encodedRegionName);
705     try {
706       Stat stat = new Stat();
707       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
708       if (data == null) return false;
709       RegionTransition rt;
710       try {
711         rt = RegionTransition.parseFrom(data);
712       } catch (DeserializationException e) {
713         LOG.warn("Failed parse znode data", e);
714         return false;
715       }
716       HRegionInfo hri = regionInfo;
717       if (hri == null) {
718         // The region info is not passed in. We will try to find the region
719         // from the region states map/meta based on the encoded region name. But we
720         // may not be able to find it. This is valid for an online merge, where
721         // the region may not have been created yet if the merge is not completed,
722         // and therefore it is not in meta at master recovery time.
723         hri = regionStates.getRegionInfo(rt.getRegionName());
724         EventType et = rt.getEventType();
725         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
726             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
727           LOG.warn("Couldn't find the region in recovering " + rt);
728           return false;
729         }
730       }
731 
732       // TODO: This code is tied to ZK anyway, so for now leaving it as is,
733       // will refactor when whole region assignment will be abstracted from ZK
734       BaseCoordinatedStateManager cp =
735         (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
736       OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
737 
738       ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
739         new ZkOpenRegionCoordination.ZkOpenRegionDetails();
740       zkOrd.setVersion(stat.getVersion());
741       zkOrd.setServerName(cp.getServer().getServerName());
742 
743       return processRegionsInTransition(
744         rt, hri, openRegionCoordination, zkOrd);
745     } finally {
746       lock.unlock();
747     }
748   }
749 
750   /**
751    * This call is invoked only (1) when the master assigns meta, or
752    * (2) during failover-mode startup, while processing zk assignment nodes.
753    * The lock on the region is set in the caller. It returns true if the region
754    * is in transition for sure, false otherwise.
755    *
756    * It should be private, but it is used by some tests too.
757    */
758   boolean processRegionsInTransition(
759       final RegionTransition rt, final HRegionInfo regionInfo,
760       OpenRegionCoordination coordination,
761       final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
762     EventType et = rt.getEventType();
763     // Get ServerName.  Cannot be null.
764     final ServerName sn = rt.getServerName();
765     final byte[] regionName = rt.getRegionName();
766     final String encodedName = HRegionInfo.encodeRegionName(regionName);
767     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
768     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
769 
770     if (regionStates.isRegionInTransition(encodedName)
771         && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
772       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
773         + et + ", does nothing since the region is already in transition "
774         + regionStates.getRegionTransitionState(encodedName));
775       // Just return
776       return true;
777     }
778     if (!serverManager.isServerOnline(sn)) {
779       // It was transitioning on a dead server, so it's closed now.
780       // Force to OFFLINE and put it in transition, but not assign it
781       // since log splitting for the dead server is not done yet.
782       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
783         " was on deadserver; forcing offline");
784       if (regionStates.isRegionOnline(regionInfo)) {
785         // Meta could still show the region is assigned to the previous
786         // server. If that server is online, when we reload the meta, the
787         // region is put back online, so we need to offline it.
788         regionStates.regionOffline(regionInfo);
789         sendRegionClosedNotification(regionInfo);
790       }
791       // Put it back in transition so that SSH can re-assign it
792       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
793 
794       if (regionInfo.isMetaRegion()) {
795         // If it's meta region, reset the meta location.
796         // So that master knows the right meta region server.
797         MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
798       } else {
799         // Whether the previous server is online or offline,
800         // we need to reset the last region server of the region.
801         regionStates.setLastRegionServerOfRegion(sn, encodedName);
802         // Make sure we know the server is dead.
803         if (!serverManager.isServerDead(sn)) {
804           serverManager.expireServer(sn);
805         }
806       }
807       return false;
808     }
809     switch (et) {
810       case M_ZK_REGION_CLOSING:
811         // Insert into RIT & resend the query to the region server: maybe the previous master
812         // died before sending the query the first time.
813         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
814         this.executorService.submit(
815           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
816             @Override
817             public void process() throws IOException {
818               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
819               try {
820                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
821                   .getVersion();
822                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
823                 if (regionStates.isRegionOffline(regionInfo)) {
824                   assign(regionInfo, true);
825                 }
826               } finally {
827                 lock.unlock();
828               }
829             }
830           });
831         break;
832 
833       case RS_ZK_REGION_CLOSED:
834       case RS_ZK_REGION_FAILED_OPEN:
835         // Region is closed, insert into RIT and handle it
836         regionStates.setLastRegionServerOfRegion(sn, encodedName);
837         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
838         if (!replicasToClose.contains(regionInfo)) {
839           invokeAssign(regionInfo);
840         } else {
841           offlineDisabledRegion(regionInfo);
842         }
843         break;
844 
845       case M_ZK_REGION_OFFLINE:
846         // Insert in RIT and resend to the regionserver
847         regionStates.updateRegionState(rt, State.PENDING_OPEN);
848         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
849         this.executorService.submit(
850           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
851             @Override
852             public void process() throws IOException {
853               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
854               try {
855                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
856                 addPlan(encodedName, plan);
857                 assign(rsOffline, false, false);
858               } finally {
859                 lock.unlock();
860               }
861             }
862           });
863         break;
864 
865       case RS_ZK_REGION_OPENING:
866         regionStates.updateRegionState(rt, State.OPENING);
867         break;
868 
869       case RS_ZK_REGION_OPENED:
870         // Region is opened, insert into RIT and handle it
871         // This could be done asynchronously; we would then need to acquire the lock in the
872         //  handler.
873         regionStates.updateRegionState(rt, State.OPEN);
874         new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
875         break;
876       case RS_ZK_REQUEST_REGION_SPLIT:
877       case RS_ZK_REGION_SPLITTING:
878       case RS_ZK_REGION_SPLIT:
879         // Splitting region should be online. We could have skipped it during
880         // user region rebuilding since we may consider the split is completed.
881         // Put it in SPLITTING state to avoid complications.
882         regionStates.regionOnline(regionInfo, sn);
883         regionStates.updateRegionState(rt, State.SPLITTING);
884         if (!handleRegionSplitting(
885             rt, encodedName, prettyPrintedRegionName, sn)) {
886           deleteSplittingNode(encodedName, sn);
887         }
888         break;
889       case RS_ZK_REQUEST_REGION_MERGE:
890       case RS_ZK_REGION_MERGING:
891       case RS_ZK_REGION_MERGED:
892         if (!handleRegionMerging(
893             rt, encodedName, prettyPrintedRegionName, sn)) {
894           deleteMergingNode(encodedName, sn);
895         }
896         break;
897       default:
898         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
899     }
900     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
901       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
902       + "server: " + sn);
903     return true;
904   }
905 
906   /**
907    * When a region is closed, it should be removed from the regionsToReopen
908    * @param hri HRegionInfo of the region which was closed
909    */
910   public void removeClosedRegion(HRegionInfo hri) {
911     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
912       LOG.debug("Removed region from reopening regions because it was closed");
913     }
914   }
915 
916   /**
917    * Handles various states an unassigned node can be in.
918    * <p>
919    * Method is called when a state change is suspected for an unassigned node.
920    * <p>
921    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
922    * yet).
923    * @param rt region transition
924    * @param coordination coordination for opening region
925    * @param ord details about opening region
926    */
927   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
928       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
929       justification="Needs work; says access to ConcurrentHashMaps not ATOMIC!!!")
930   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
931                     OpenRegionCoordination.OpenRegionDetails ord) {
932     if (rt == null) {
933       LOG.warn("Unexpected NULL input for RegionTransition rt");
934       return;
935     }
936     final ServerName sn = rt.getServerName();
937     // Check if this is a special HBCK transition
938     if (sn.equals(HBCK_CODE_SERVERNAME)) {
939       handleHBCK(rt);
940       return;
941     }
942     final long createTime = rt.getCreateTime();
943     final byte[] regionName = rt.getRegionName();
944     String encodedName = HRegionInfo.encodeRegionName(regionName);
945     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
946     // Verify this is a known server
947     if (!serverManager.isServerOnline(sn)
948       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
949       LOG.warn("Attempted to handle region transition for server but " +
950         "it is not online: " + prettyPrintedRegionName + ", " + rt);
951       return;
952     }
953 
954     RegionState regionState =
955       regionStates.getRegionState(encodedName);
956     long startTime = System.currentTimeMillis();
957     if (LOG.isDebugEnabled()) {
958       boolean lateEvent = createTime < (startTime - 15000);
959       LOG.debug("Handling " + rt.getEventType() +
960         ", server=" + sn + ", region=" +
961         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
962         (lateEvent ? ", which is more than 15 seconds late" : "") +
963         ", current_state=" + regionState);
964     }
965     // We don't do anything for this event,
966     // so separate it out, no need to lock/unlock anything
967     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
968       return;
969     }
970 
971     // We need a lock on the region as we could update it
972     Lock lock = locker.acquireLock(encodedName);
973     try {
974       RegionState latestState =
975         regionStates.getRegionState(encodedName);
976       if ((regionState == null && latestState != null)
977           || (regionState != null && latestState == null)
978           || (regionState != null && latestState != null
979             && latestState.getState() != regionState.getState())) {
980         LOG.warn("Region state changed from " + regionState + " to "
981           + latestState + ", while acquiring lock");
982       }
983       long waitedTime = System.currentTimeMillis() - startTime;
984       if (waitedTime > 5000) {
985         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
986       }
987       regionState = latestState;
988       switch (rt.getEventType()) {
989       case RS_ZK_REQUEST_REGION_SPLIT:
990       case RS_ZK_REGION_SPLITTING:
991       case RS_ZK_REGION_SPLIT:
992         if (!handleRegionSplitting(
993             rt, encodedName, prettyPrintedRegionName, sn)) {
994           deleteSplittingNode(encodedName, sn);
995         }
996         break;
997 
998       case RS_ZK_REQUEST_REGION_MERGE:
999       case RS_ZK_REGION_MERGING:
1000       case RS_ZK_REGION_MERGED:
1001         // Merged region is a new region, we can't find it in the region states now.
1002         // However, the two merging regions are not new. They should be in state for merging.
1003         if (!handleRegionMerging(
1004             rt, encodedName, prettyPrintedRegionName, sn)) {
1005           deleteMergingNode(encodedName, sn);
1006         }
1007         break;
1008 
1009       case M_ZK_REGION_CLOSING:
1010         // Should see CLOSING after we have asked it to CLOSE or additional
1011         // times after already being in state of CLOSING
1012         if (regionState == null
1013             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1014           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
1015             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1016             + regionStates.getRegionState(encodedName));
1017           return;
1018         }
1019         // Transition to CLOSING (or update stamp if already CLOSING)
1020         regionStates.updateRegionState(rt, State.CLOSING);
1021         break;
1022 
1023       case RS_ZK_REGION_CLOSED:
1024         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
1025         if (regionState == null
1026             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1027           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1028             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1029             + regionStates.getRegionState(encodedName));
1030           return;
1031         }
1032         // Handle CLOSED by assigning elsewhere or stopping if a disable
1033         // If we got here all is good.  Need to update RegionState -- else
1034         // what follows will fail because not in expected state.
1035         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1036         updateClosedRegionHandlerTracker(regionState.getRegion());
1037         break;
1038 
1039         case RS_ZK_REGION_FAILED_OPEN:
1040           if (regionState == null
1041               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1042             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1043               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1044               + regionStates.getRegionState(encodedName));
1045             return;
1046           }
1047           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1048           if (failedOpenCount == null) {
1049             failedOpenCount = new AtomicInteger();
1050             // No need to use putIfAbsent, or extra synchronization since
1051             // this whole handleRegion block is locked on the encoded region
1052             // name, and failedOpenTracker is updated only in this block
1053             // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
1054             failedOpenTracker.put(encodedName, failedOpenCount);
1055           }
1056           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1057             // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
1058             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1059             // remove the tracking info to save memory, also reset
1060             // the count for next open initiative
1061             failedOpenTracker.remove(encodedName);
1062           } else {
1063             // Handle this the same as if it were opened and then closed.
1064             regionState = regionStates.updateRegionState(rt, State.CLOSED);
1065             if (regionState != null) {
1066               // When there is more than one region server, a new RS is selected as the
1067               // destination and the region plan is updated accordingly. (HBASE-5546)
1068               try {
1069                 getRegionPlan(regionState.getRegion(), sn, true);
1070                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1071               } catch (HBaseIOException e) {
1072                 LOG.warn("Failed to get region plan", e);
1073               }
1074             }
1075           }
1076           break;
1077 
1078         case RS_ZK_REGION_OPENING:
1079           // Should see OPENING after we have asked it to OPEN or additional
1080           // times after already being in state of OPENING
1081           if (regionState == null
1082               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1083             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1084               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1085               + regionStates.getRegionState(encodedName));
1086             return;
1087           }
1088           // Transition to OPENING (or update stamp if already OPENING)
1089           regionStates.updateRegionState(rt, State.OPENING);
1090           break;
1091 
1092         case RS_ZK_REGION_OPENED:
1093           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1094           if (regionState == null
1095               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1096             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1097               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1098               + regionStates.getRegionState(encodedName));
1099 
1100             if (regionState != null) {
1101               // Close it without updating the internal region states,
1102               // so as not to create double assignments in unlucky scenarios
1103               // mentioned in OpenRegionHandler#process
1104               unassign(regionState.getRegion(), null, -1, null, false, sn);
1105             }
1106             return;
1107           }
1108           // Handle OPENED by removing from transition and deleting the zk node
1109           regionState =
1110               regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1111           if (regionState != null) {
1112             failedOpenTracker.remove(encodedName); // reset the count, if any
1113             new OpenedRegionHandler(
1114               server, this, regionState.getRegion(), coordination, ord).process();
1115             updateOpenedRegionHandlerTracker(regionState.getRegion());
1116           }
1117           break;
1118 
1119         default:
1120           throw new IllegalStateException("Received event is not valid.");
1121       }
1122     } finally {
1123       lock.unlock();
1124     }
1125   }
1126 
1127   //For unit tests only
1128   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1129     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1130     // compareAndSet to be sure that unit tests don't see stale values. This means
1131     // we will return true exactly once, unless the handler code resets this
1132     // value to true again.
1133     return b == null ? false : b.compareAndSet(true, false);
1134   }
1135 
1136   //For unit tests only
1137   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1138     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1139     // compareAndSet to be sure that unit tests don't see stale values. This means
1140     // we will return true exactly once, unless the handler code resets this
1141     // value to true again.
1142     return b == null ? false : b.compareAndSet(true, false);
1143   }
1144 
1145   //For unit tests only
1146   void initializeHandlerTrackers() {
1147     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1148     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1149   }
1150 
1151   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1152     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1153       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1154     }
1155   }
1156 
1157   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1158     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1159       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1160     }
1161   }
1162 
1163   // TODO: processFavoredNodes might throw an exception, e.g., if the
1164   // meta could not be contacted/updated. We need to decide how seriously to treat
1165   // this problem. Should we fail the current assignment? We should be able
1166   // to recover from this problem eventually (if the meta couldn't be updated,
1167   // things should work normally and eventually get fixed up).
1168   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1169     if (!shouldAssignRegionsWithFavoredNodes) return;
1170     // The AM gets the favored nodes info for each region and updates the meta
1171     // table with that info
1172     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1173         new HashMap<HRegionInfo, List<ServerName>>();
1174     for (HRegionInfo region : regions) {
1175       regionToFavoredNodes.put(region,
1176           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1177     }
1178     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1179       this.server.getConnection());
1180   }
1181 
1182   /**
1183    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1184    * <p>
1185    * This is handled in a separate code path because it breaks the normal rules.
1186    * @param rt
1187    */
1188   @SuppressWarnings("deprecation")
1189   private void handleHBCK(RegionTransition rt) {
1190     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1191     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1192       ", server=" + rt.getServerName() + ", region=" +
1193       HRegionInfo.prettyPrint(encodedName));
1194     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1195     switch (rt.getEventType()) {
1196       case M_ZK_REGION_OFFLINE:
1197         HRegionInfo regionInfo;
1198         if (regionState != null) {
1199           regionInfo = regionState.getRegion();
1200         } else {
1201           try {
1202             byte [] name = rt.getRegionName();
1203             Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1204               this.server.getConnection(), name);
1205             regionInfo = p.getFirst();
1206           } catch (IOException e) {
1207             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1208             return;
1209           }
1210         }
1211         LOG.info("HBCK repair is triggering assignment of region=" +
1212             regionInfo.getRegionNameAsString());
1213         // trigger assign, node is already in OFFLINE so don't need to update ZK
1214         assign(regionInfo, false);
1215         break;
1216 
1217       default:
1218         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1219         break;
1220     }
1221 
1222   }
1223 
1224   // ZooKeeper events
1225 
1226   /**
1227    * New unassigned node has been created.
1228    *
1229    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1230    * creating an unassigned node.
1231    *
1232    * <p>When this happens we must:
1233    * <ol>
1234    *   <li>Watch the node for further events</li>
1235    *   <li>Read and handle the state in the node</li>
1236    * </ol>
1237    */
1238   @Override
1239   public void nodeCreated(String path) {
1240     handleAssignmentEvent(path);
1241   }
1242 
1243   /**
1244    * Existing unassigned node has had data changed.
1245    *
1246    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1247    * OPENING/OPENED and CLOSING/CLOSED.
1248    *
1249    * <p>When this happens we must:
1250    * <ol>
1251    *   <li>Watch the node for further events</li>
1252    *   <li>Read and handle the state in the node</li>
1253    * </ol>
1254    */
1255   @Override
1256   public void nodeDataChanged(String path) {
1257     handleAssignmentEvent(path);
1258   }
1259 
1260 
1261   // We  don't want to have two events on the same region managed simultaneously.
1262   // For this reason, we need to wait if an event on the same region is currently in progress.
1263   // So we track the region names of the events in progress, and we keep a waiting list.
1264   private final Set<String> regionsInProgress = new HashSet<String>();
1265   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1266   //  this as we want the events to be managed in the same order as we received them.
1267   private final LinkedHashMultimap <String, RegionRunnable>
1268       zkEventWorkerWaitingList = LinkedHashMultimap.create();
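  // Sketch of the intended flow when two events E1 and E2 arrive for the same region R
  // (see zkEventWorkersSubmit below):
  //   submit(E1) -> R added to regionsInProgress, E1 handed to the zkEventWorkers pool
  //   submit(E2) -> R already in progress, so E2 is parked in zkEventWorkerWaitingList
  //   E1 done    -> R removed from regionsInProgress, E2 dequeued and resubmitted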
1269 
1270   /**
1271    * A specific runnable that works only on a region.
1272    */
1273   private interface RegionRunnable extends Runnable{
1274     /**
1275      * @return - the name of the region it works on.
1276      */
1277     String getRegionName();
1278   }
1279 
1280   /**
1281    * Submit a task, ensuring that at most one task at a time is working on a given region.
1282    * Order is respected.
1283    */
1284   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1285 
1286     synchronized (regionsInProgress) {
1287       // If there is already a task for this region, we add it to the
1288       //  waiting list and return.
1289       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1290         synchronized (zkEventWorkerWaitingList){
1291           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1292         }
1293         return;
1294       }
1295 
1296       // No event in progress on this region => we can submit a new task immediately.
1297       regionsInProgress.add(regRunnable.getRegionName());
1298       zkEventWorkers.submit(new Runnable() {
1299         @Override
1300         public void run() {
1301           try {
1302             regRunnable.run();
1303           } finally {
1304             // now that we have finished, let's see if there is an event for the same region in the
1305             //  waiting list. If it's the case, we can now submit it to the pool.
1306             synchronized (regionsInProgress) {
1307               regionsInProgress.remove(regRunnable.getRegionName());
1308               synchronized (zkEventWorkerWaitingList) {
1309                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1310                     regRunnable.getRegionName());
1311                 if (!waiting.isEmpty()) {
1312                   // We want the first object only. The only way to get it is through an iterator.
1313                   RegionRunnable toSubmit = waiting.iterator().next();
1314                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1315                   zkEventWorkersSubmit(toSubmit);
1316                 }
1317               }
1318             }
1319           }
1320         }
1321       });
1322     }
1323   }
1324 
1325   @Override
1326   public void nodeDeleted(final String path) {
1327     if (path.startsWith(watcher.assignmentZNode)) {
1328       final String regionName = ZKAssign.getRegionName(watcher, path);
1329       zkEventWorkersSubmit(new RegionRunnable() {
1330         @Override
1331         public String getRegionName() {
1332           return regionName;
1333         }
1334 
1335         @Override
1336         public void run() {
1337           Lock lock = locker.acquireLock(regionName);
1338           try {
1339             RegionState rs = regionStates.getRegionTransitionState(regionName);
1340             if (rs == null) {
1341               rs = regionStates.getRegionState(regionName);
1342               if (rs == null || !rs.isMergingNew()) {
1343                 // MergingNew is an offline state
1344                 return;
1345               }
1346             }
1347 
1348             HRegionInfo regionInfo = rs.getRegion();
1349             String regionNameStr = regionInfo.getRegionNameAsString();
1350             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1351 
1352             boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1353                 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1354 
1355             ServerName serverName = rs.getServerName();
1356             if (serverManager.isServerOnline(serverName)) {
1357               if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1358                 synchronized (regionStates) {
1359                   regionOnline(regionInfo, serverName);
1360                   if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1361                     // Check if the daughter regions are still there; if they are present,
1362                     // offline them, as this is the case of a rollback.
1363                     HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1364                     HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1365                     if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1366                       LOG.warn("Split daughter region not in transition " + hri_a);
1367                     }
1368                     if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1369                       LOG.warn("Split daughter region not in transition " + hri_b);
1370                     }
1371                     regionOffline(hri_a);
1372                     regionOffline(hri_b);
1373                     splitRegions.remove(regionInfo);
1374                   }
1375                   if (disabled) {
1376                     // if the server is offline, it does no harm to unassign again
1377                     LOG.info("Opened " + regionNameStr
1378                         + " but this table is disabled, triggering close of region");
1379                     unassign(regionInfo);
1380                   }
1381                 }
1382               } else if (rs.isMergingNew()) {
1383                 synchronized (regionStates) {
1384                   String p = regionInfo.getEncodedName();
1385                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1386                   if (regions != null) {
1387                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1388                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1389                   }
1390                 }
1391               }
1392             }
1393           } finally {
1394             lock.unlock();
1395           }
1396         }
1397 
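        // Bring a merging parent region back online on the given server; if the table is
        // disabled, immediately queue it for unassign again.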
1398         private void onlineMergingRegion(boolean disabled,
1399             final HRegionInfo hri, final ServerName serverName) {
1400           RegionState regionState = regionStates.getRegionState(hri);
1401           if (regionState != null && regionState.isMerging()
1402               && regionState.isOnServer(serverName)) {
1403             regionOnline(regionState.getRegion(), serverName);
1404             if (disabled) {
1405               unassign(hri);
1406             }
1407           }
1408         }
1409       });
1410     }
1411   }
1412 
1413   /**
1414    * New unassigned node has been created.
1415    *
1416    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1417    * region by creating a znode.
1418    *
1419    * <p>When this happens we must:
1420    * <ol>
1421    *   <li>Watch the node for further children changed events</li>
1422    *   <li>Watch all new children for changed events</li>
1423    * </ol>
1424    */
1425   @Override
1426   public void nodeChildrenChanged(String path) {
1427     if (path.equals(watcher.assignmentZNode)) {
1428       zkEventWorkers.submit(new Runnable() {
1429         @Override
1430         public void run() {
1431           try {
1432             // Just make sure we see the changes for the new znodes
1433             List<String> children =
1434               ZKUtil.listChildrenAndWatchForNewChildren(
1435                 watcher, watcher.assignmentZNode);
1436             if (children != null) {
1437               Stat stat = new Stat();
1438               for (String child : children) {
1439                 // if region is in transition, we already have a watch
1440                 // on it, so no need to watch it again. As far as I know for now,
1441                 // this is needed to watch splitting nodes only.
1442                 if (!regionStates.isRegionInTransition(child)) {
1443                   ZKAssign.getDataAndWatch(watcher, child, stat);
1444                 }
1445               }
1446             }
1447           } catch (KeeperException e) {
1448             server.abort("Unexpected ZK exception reading unassigned children", e);
1449           }
1450         }
1451       });
1452     }
1453   }
1454 
1455 
1456   /**
1457    * Marks the region as online.  Removes it from regions in transition and
1458    * updates the in-memory assignment information.
1459    * <p>
1460    * Used when a region has been successfully opened on a region server.
1461    * @param regionInfo
1462    * @param sn
1463    */
1464   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1465     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1466   }
1467 
1468   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1469     numRegionsOpened.incrementAndGet();
1470     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1471 
1472     // Remove plan if one.
1473     clearRegionPlan(regionInfo);
1474     balancer.regionOnline(regionInfo, sn);
1475 
1476     // Tell our listeners that a region was opened
1477     sendRegionOpenedNotification(regionInfo, sn);
1478   }
1479 
1480   /**
1481    * Pass the assignment event to a worker for processing.
1482    * Each worker is a single thread executor service.  The reason
1483    * for just one thread is to make sure all events for a given
1484    * region are processed in order.
1485    *
1486    * @param path
1487    */
1488   private void handleAssignmentEvent(final String path) {
1489     if (path.startsWith(watcher.assignmentZNode)) {
1490       final String regionName = ZKAssign.getRegionName(watcher, path);
1491 
1492       zkEventWorkersSubmit(new RegionRunnable() {
1493         @Override
1494         public String getRegionName() {
1495           return regionName;
1496         }
1497 
1498         @Override
1499         public void run() {
1500           try {
1501             Stat stat = new Stat();
1502             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1503             if (data == null) return;
1504 
1505             RegionTransition rt = RegionTransition.parseFrom(data);
1506 
1507             // TODO: This code is tied to ZK anyway, so for now leaving it as is,
1508             // will refactor when whole region assignment will be abstracted from ZK
1509             BaseCoordinatedStateManager csm =
1510               (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1511             OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1512 
1513             ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1514               new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1515             zkOrd.setVersion(stat.getVersion());
1516             zkOrd.setServerName(csm.getServer().getServerName());
1517 
1518             handleRegion(rt, openRegionCoordination, zkOrd);
1519           } catch (KeeperException e) {
1520             server.abort("Unexpected ZK exception reading unassigned node data", e);
1521           } catch (DeserializationException e) {
1522             server.abort("Unexpected exception deserializing node data", e);
1523           }
1524         }
1525       });
1526     }
1527   }
1528 
1529   /**
1530    * Marks the region as offline.  Removes it from regions in transition and
1531    * removes in-memory assignment information.
1532    * <p>
1533    * Used when a region has been closed and should remain closed.
1534    * @param regionInfo
1535    */
1536   public void regionOffline(final HRegionInfo regionInfo) {
1537     regionOffline(regionInfo, null);
1538   }
1539 
1540   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1541     if (useZKForAssignment) {
1542       // Disabling so should not be reassigned, just delete the CLOSED node
1543       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1544         "regions in transition, skipping assignment of region " +
1545           regionInfo.getRegionNameAsString());
1546       String encodedName = regionInfo.getEncodedName();
1547       deleteNodeInStates(encodedName, "closed", null,
1548         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1549     }
1550     replicasToClose.remove(regionInfo);
1551     regionOffline(regionInfo);
1552   }
1553 
1554   // Assignment methods
1555 
1556   /**
1557    * Assigns the specified region.
1558    * <p>
1559    * If a RegionPlan is available with a valid destination then it will be used
1560    * to determine what server region is assigned to.  If no RegionPlan is
1561    * available, region will be assigned to a random available server.
1562    * <p>
1563    * Updates the RegionState and sends the OPEN RPC.
1564    * <p>
1565    * This will only succeed if the region is in transition and in a CLOSED or
1566    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1567    * chosen server is up and running (It may have just crashed!).  If the
1568    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1569    *
1570    * @param region region to be assigned
1571    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1572    *                       OFFLINE state before assigning the region
1573    */
1574   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1575     assign(region, setOfflineInZK, false);
1576   }
1577 
1578   /**
1579    * Use care with forceNewPlan. It could cause double assignment.
1580    */
1581   public void assign(HRegionInfo region,
1582       boolean setOfflineInZK, boolean forceNewPlan) {
1583     if (isDisabledorDisablingRegionInRIT(region)) {
1584       return;
1585     }
1586     String encodedName = region.getEncodedName();
1587     Lock lock = locker.acquireLock(encodedName);
1588     try {
1589       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1590       if (state != null) {
1591         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1592           LOG.info("Skip assigning " + region.getRegionNameAsString()
1593             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1594             + " is dead but not processed yet");
1595           return;
1596         }
1597         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1598       }
1599     } finally {
1600       lock.unlock();
1601     }
1602   }
1603 
1604   /**
1605    * Bulk assign regions to <code>destination</code>.
1606    * @param destination
1607    * @param regions Regions to assign.
1608    * @return true if successful
1609    */
1610   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1611     throws InterruptedException {
1612     long startTime = EnvironmentEdgeManager.currentTime();
1613     try {
1614       int regionCount = regions.size();
1615       if (regionCount == 0) {
1616         return true;
1617       }
1618       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1619       Set<String> encodedNames = new HashSet<String>(regionCount);
1620       for (HRegionInfo region : regions) {
1621         encodedNames.add(region.getEncodedName());
1622       }
1623 
1624       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1625       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1626       try {
1627         AtomicInteger counter = new AtomicInteger(0);
1628         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1629         OfflineCallback cb = new OfflineCallback(
1630           watcher, destination, counter, offlineNodesVersions);
1631         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1632         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1633         for (HRegionInfo region : regions) {
1634           String encodedName = region.getEncodedName();
1635           if (!isDisabledorDisablingRegionInRIT(region)) {
1636             RegionState state = forceRegionStateToOffline(region, false);
1637             boolean onDeadServer = false;
1638             if (state != null) {
1639               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1640                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1641                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1642                   + " is dead but not processed yet");
1643                 onDeadServer = true;
1644               } else if (!useZKForAssignment
1645                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1646                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1647                 plans.put(encodedName, plan);
1648                 states.add(state);
1649                 continue;
1650               }
1651             }
1652             // Reassign if the region wasn't on a dead server
1653             if (!onDeadServer) {
1654               LOG.info("failed to force region state to offline or "
1655                 + "failed to set it offline in ZK, will reassign later: " + region);
1656               failedToOpenRegions.add(region); // assign individually later
1657             }
1658           }
1659           // Release the lock, this region is excluded from bulk assign because
1660           // we can't update its state, or set its znode to offline.
1661           Lock lock = locks.remove(encodedName);
1662           lock.unlock();
1663         }
1664 
1665         if (useZKForAssignment) {
1666           // Wait until all unassigned nodes have been put up and watchers set.
1667           int total = states.size();
1668           for (int oldCounter = 0; !server.isStopped();) {
1669             int count = counter.get();
1670             if (oldCounter != count) {
1671               LOG.debug(destination.toString() + " unassigned znodes=" + count +
1672                 " of total=" + total + "; oldCounter=" + oldCounter);
1673               oldCounter = count;
1674             }
1675             if (count >= total) break;
1676             Thread.sleep(5);
1677           }
1678         }
1679 
1680         if (server.isStopped()) {
1681           return false;
1682         }
1683 
1684         // Add region plans, so we can updateTimers when one region is opened so
1685         // that unnecessary timeout on RIT is reduced.
1686         this.addPlans(plans);
1687 
1688         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1689           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1690         for (RegionState state: states) {
1691           HRegionInfo region = state.getRegion();
1692           String encodedRegionName = region.getEncodedName();
1693           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1694           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1695             LOG.warn("failed to offline in zookeeper: " + region);
1696             failedToOpenRegions.add(region); // assign individually later
1697             Lock lock = locks.remove(encodedRegionName);
1698             lock.unlock();
1699           } else {
1700             regionStates.updateRegionState(
1701               region, State.PENDING_OPEN, destination);
1702             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1703             if (this.shouldAssignRegionsWithFavoredNodes) {
1704               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1705             }
1706             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1707               region, nodeVersion, favoredNodes));
1708           }
1709         }
1710 
1711         // Move on to open regions.
1712         try {
1713           // Send OPEN RPC. If it fails on a IOE or RemoteException,
1714           // regions will be assigned individually.
1715           long maxWaitTime = System.currentTimeMillis() +
1716             this.server.getConfiguration().
1717               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1718           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1719             try {
1720               // regionOpenInfos is empty if all regions are in failedToOpenRegions list
1721               if (regionOpenInfos.isEmpty()) {
1722                 break;
1723               }
1724               List<RegionOpeningState> regionOpeningStateList = serverManager
1725                 .sendRegionOpen(destination, regionOpenInfos);
1726               if (regionOpeningStateList == null) {
1727                 // Failed getting RPC connection to this server
1728                 return false;
1729               }
1730               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1731                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1732                 if (openingState != RegionOpeningState.OPENED) {
1733                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1734                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1735                     processAlreadyOpenedRegion(region, destination);
1736                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1737                     // Failed opening this region, reassign it later
1738                     failedToOpenRegions.add(region);
1739                   } else {
1740                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1741                       + openingState + " in assigning region " + region);
1742                   }
1743                 }
1744               }
1745               break;
1746             } catch (IOException e) {
1747               if (e instanceof RemoteException) {
1748                 e = ((RemoteException)e).unwrapRemoteException();
1749               }
1750               if (e instanceof RegionServerStoppedException) {
1751                 LOG.warn("The region server was shut down, ", e);
1752                 // No need to retry, the region server is a goner.
1753                 return false;
1754               } else if (e instanceof ServerNotRunningYetException) {
1755                 long now = System.currentTimeMillis();
1756                 if (now < maxWaitTime) {
1757                   LOG.debug("Server is not yet up; waiting up to " +
1758                     (maxWaitTime - now) + "ms", e);
1759                   Thread.sleep(100);
1760                   i--; // reset the try count
1761                   continue;
1762                 }
1763               } else if (e instanceof java.net.SocketTimeoutException
1764                   && this.serverManager.isServerOnline(destination)) {
1765                 // In case socket is timed out and the region server is still online,
1766                 // the openRegion RPC could have been accepted by the server and
1767                 // just the response didn't go through.  So we will retry to
1768                 // open the region on the same server.
1769                 if (LOG.isDebugEnabled()) {
1770                   LOG.debug("Bulk assigner openRegion() to " + destination
1771                     + " has timed out, but the regions might"
1772                     + " already be opened on it.", e);
1773                 }
1774                 // wait and reset the re-try count, server might be just busy.
1775                 Thread.sleep(100);
1776                 i--;
1777                 continue;
1778               }
1779               throw e;
1780             }
1781           }
1782         } catch (IOException e) {
1783           // Can be a socket timeout, EOF, NoRouteToHost, etc
1784           LOG.info("Unable to communicate with " + destination
1785             + " in order to assign regions, ", e);
1786           return false;
1787         }
1788       } finally {
1789         for (Lock lock : locks.values()) {
1790           lock.unlock();
1791         }
1792       }
1793 
1794       if (!failedToOpenRegions.isEmpty()) {
1795         for (HRegionInfo region : failedToOpenRegions) {
1796           if (!regionStates.isRegionOnline(region)) {
1797             invokeAssign(region);
1798           }
1799         }
1800       }
1801 
1802       // wait for assignment completion
1803       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
1804       for (HRegionInfo region: regions) {
1805         if (!region.getTable().isSystemTable()) {
1806           userRegionSet.add(region);
1807         }
1808       }
1809       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1810             System.currentTimeMillis())) {
1811         LOG.debug("some user regions are still in transition: " + userRegionSet);
1812       }
1813       LOG.debug("Bulk assigning done for " + destination);
1814       return true;
1815     } finally {
1816       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
1817     }
1818   }
1819 
1820   /**
1821    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1822    *
1823    * The RPC will be sent only to the region server found in the region state
1824    * if it is passed in; otherwise, to the src server specified. If the region
1825    * state is not specified, we don't update the region state at all; instead
1826    * we just send the RPC call. This is useful for some cleanup without
1827    * messing around with the region states (see handleRegion, on the
1828    * region-opened-on-an-unexpected-server scenario, for an example).
1829    */
1830   private void unassign(final HRegionInfo region,
1831       final RegionState state, final int versionOfClosingNode,
1832       final ServerName dest, final boolean transitionInZK,
1833       final ServerName src) {
1834     ServerName server = src;
1835     if (state != null) {
1836       server = state.getServerName();
1837     }
1838     long maxWaitTime = -1;
1839     for (int i = 1; i <= this.maximumAttempts; i++) {
1840       if (this.server.isStopped() || this.server.isAborted()) {
1841         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1842         return;
1843       }
1844       // ClosedRegionHandler can remove the server from this.regions
1845       if (!serverManager.isServerOnline(server)) {
1846         LOG.debug("Offline " + region.getRegionNameAsString()
1847           + ", no need to unassign since it's on a dead server: " + server);
1848         if (transitionInZK) {
1849           // delete the node. if no node exists need not bother.
1850           deleteClosingOrClosedNode(region, server);
1851         }
1852         if (state != null) {
1853           regionOffline(region);
1854         }
1855         return;
1856       }
1857       try {
1858         // Send CLOSE RPC
1859         if (serverManager.sendRegionClose(server, region,
1860           versionOfClosingNode, dest, transitionInZK)) {
1861           LOG.debug("Sent CLOSE to " + server + " for region " +
1862             region.getRegionNameAsString());
1863           if (useZKForAssignment && !transitionInZK && state != null) {
1864             // Retry to make sure the region is
1865             // closed so as to avoid double assignment.
1866             unassign(region, state, versionOfClosingNode,
1867               dest, transitionInZK, src);
1868           }
1869           return;
1870         }
1871         // This never happens. Currently regionserver close always returns true.
1872         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
1873         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1874           region.getRegionNameAsString());
1875       } catch (Throwable t) {
1876         long sleepTime = 0;
1877         Configuration conf = this.server.getConfiguration();
1878         if (t instanceof RemoteException) {
1879           t = ((RemoteException)t).unwrapRemoteException();
1880         }
1881         boolean logRetries = true;
1882         if (t instanceof RegionServerAbortedException
1883             || t instanceof RegionServerStoppedException
1884             || t instanceof ServerNotRunningYetException) {
1885           // RS is aborting or stopping, we cannot offline the region since the region may need
1886           // to do WAL recovery. Until we see the RS expiration, we should retry.
1887           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1888             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1889 
1890         } else if (t instanceof NotServingRegionException) {
1891           LOG.debug("Offline " + region.getRegionNameAsString()
1892             + ", it's not any more on " + server, t);
1893           if (transitionInZK) {
1894             deleteClosingOrClosedNode(region, server);
1895           }
1896           if (state != null) {
1897             regionOffline(region);
1898           }
1899           return;
1900         } else if ((t instanceof FailedServerException) || (state != null &&
1901             t instanceof RegionAlreadyInTransitionException)) {
1902           if(t instanceof FailedServerException) {
1903             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1904                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1905           } else {
1906             // RS is already processing this region, only need to update the timestamp
1907             LOG.debug("update " + state + " the timestamp.");
1908             state.updateTimestampToNow();
1909             if (maxWaitTime < 0) {
1910               maxWaitTime =
1911                   EnvironmentEdgeManager.currentTime()
1912                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1913                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1914             }
1915             long now = EnvironmentEdgeManager.currentTime();
1916             if (now < maxWaitTime) {
1917               LOG.debug("Region is already in transition; "
1918                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1919               sleepTime = 100;
1920               i--; // reset the try count
1921               logRetries = false;
1922             }
1923           }
1924         }
1925 
1926         try {
1927           if (sleepTime > 0) {
1928             Thread.sleep(sleepTime);
1929           }
1930         } catch (InterruptedException ie) {
1931           LOG.warn("Failed to unassign "
1932             + region.getRegionNameAsString() + " since interrupted", ie);
1933           Thread.currentThread().interrupt();
1934           if (state != null) {
1935             regionStates.updateRegionState(region, State.FAILED_CLOSE);
1936           }
1937           return;
1938         }
1939 
1940         if (logRetries) {
1941           LOG.info("Server " + server + " returned " + t + " for "
1942             + region.getRegionNameAsString() + ", try=" + i
1943             + " of " + this.maximumAttempts, t);
1944           // Presume retry or server will expire.
1945         }
1946       }
1947     }
1948     // Run out of attempts
1949     if (state != null) {
1950       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1951     }
1952   }
1953 
1954   /**
1955    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1956    */
1957   private RegionState forceRegionStateToOffline(
1958       final HRegionInfo region, final boolean forceNewPlan) {
1959     RegionState state = regionStates.getRegionState(region);
1960     if (state == null) {
1961       LOG.warn("Assigning but not in region states: " + region);
1962       state = regionStates.createRegionState(region);
1963     }
1964 
1965     ServerName sn = state.getServerName();
1966     if (forceNewPlan && LOG.isDebugEnabled()) {
1967       LOG.debug("Force region state offline " + state);
1968     }
1969 
1970     switch (state.getState()) {
1971     case OPEN:
1972     case OPENING:
1973     case PENDING_OPEN:
1974     case CLOSING:
1975     case PENDING_CLOSE:
1976       if (!forceNewPlan) {
1977         LOG.debug("Skip assigning " +
1978           region + ", it is already " + state);
1979         return null;
1980       }
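      // Intentional fall-through when forceNewPlan is set: try to unassign the region
      // below before reassigning it.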
1981     case FAILED_CLOSE:
1982     case FAILED_OPEN:
1983       unassign(region, state, -1, null, false, null);
1984       state = regionStates.getRegionState(region);
1985       if (state.isFailedClose()) {
1986         // If we can't close the region, we can't re-assign
1987         // it so as to avoid possible double assignment/data loss.
1988         LOG.info("Skip assigning " +
1989           region + ", we couldn't close it: " + state);
1990         return null;
1991       }
1992     case OFFLINE:
1993       // This region could have been open on this server
1994       // for a while. If the server is dead and not processed
1995       // yet, we can move on only if the meta shows the
1996       // region is not on this server actually, or on a server
1997       // not dead, or dead and processed already.
1998       // In case not using ZK, we don't need this check because
1999       // we have the latest info in memory, and the caller
2000       // will do another round checking any way.
2001       if (useZKForAssignment
2002           && regionStates.isServerDeadAndNotProcessed(sn)
2003           && wasRegionOnDeadServerByMeta(region, sn)) {
2004         if (!regionStates.isRegionInTransition(region)) {
2005           LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
2006           regionStates.updateRegionState(region, State.OFFLINE);
2007         }
2008         LOG.info("Skip assigning " + region.getRegionNameAsString()
2009             + ", it is on a dead but not processed yet server: " + sn);
2010         return null;
2011       }
2012     case CLOSED:
2013       break;
2014     default:
2015       LOG.error("Trying to assign region " + region
2016         + ", which is " + state);
2017       return null;
2018     }
2019     return state;
2020   }
2021 
2022   @SuppressWarnings("deprecation")
2023   protected boolean wasRegionOnDeadServerByMeta(
2024       final HRegionInfo region, final ServerName sn) {
2025     try {
2026       if (region.isMetaRegion()) {
2027         ServerName server = this.server.getMetaTableLocator().
2028           getMetaRegionLocation(this.server.getZooKeeper());
2029         return regionStates.isServerDeadAndNotProcessed(server);
2030       }
2031       while (!server.isStopped()) {
2032         try {
2033           this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2034           Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2035             region.getRegionName());
2036           if (r == null || r.isEmpty()) return false;
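          // Note: the local below shadows the enclosing 'server' field for the rest of this
          // block; it holds the region's hosting server as recorded in hbase:meta.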
2037           ServerName server = HRegionInfo.getServerName(r);
2038           return regionStates.isServerDeadAndNotProcessed(server);
2039         } catch (IOException ioe) {
2040           LOG.info("Received exception accessing hbase:meta during force assign "
2041             + region.getRegionNameAsString() + ", retrying", ioe);
2042         }
2043       }
2044     } catch (InterruptedException e) {
2045       Thread.currentThread().interrupt();
2046       LOG.info("Interrupted accessing hbase:meta", e);
2047     }
2048     // Call is interrupted or server is stopped.
2049     return regionStates.isServerDeadAndNotProcessed(sn);
2050   }
2051 
2052   /**
2053    * Caller must hold lock on the passed <code>state</code> object.
2054    * @param state
2055    * @param setOfflineInZK
2056    * @param forceNewPlan
2057    */
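  // Overall flow: pick (or reuse) a RegionPlan, optionally force the znode to OFFLINE, send
  // the OPEN RPC, and on failure either retry the same server (startup, region already in
  // transition, socket timeout) or ask the balancer for a new plan, up to maximumAttempts;
  // hbase:meta regions keep retrying indefinitely.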
2058   private void assign(RegionState state,
2059       boolean setOfflineInZK, final boolean forceNewPlan) {
2060     long startTime = EnvironmentEdgeManager.currentTime();
2061     try {
2062       Configuration conf = server.getConfiguration();
2063       RegionState currentState = state;
2064       int versionOfOfflineNode = -1;
2065       RegionPlan plan = null;
2066       long maxWaitTime = -1;
2067       HRegionInfo region = state.getRegion();
2068       RegionOpeningState regionOpenState;
2069       Throwable previousException = null;
2070       for (int i = 1; i <= maximumAttempts; i++) {
2071         if (server.isStopped() || server.isAborted()) {
2072           LOG.info("Skip assigning " + region.getRegionNameAsString()
2073             + ", the server is stopped/aborted");
2074           return;
2075         }
2076 
2077         if (plan == null) { // Get a server for the region at first
2078           try {
2079             plan = getRegionPlan(region, forceNewPlan);
2080           } catch (HBaseIOException e) {
2081             LOG.warn("Failed to get region plan", e);
2082           }
2083         }
2084 
2085         if (plan == null) {
2086           LOG.warn("Unable to determine a plan to assign " + region);
2087 
2088           // For meta region, we have to keep retrying until succeeding
2089           if (region.isMetaRegion()) {
2090             if (i == maximumAttempts) {
2091               i = 0; // re-set attempt count to 0 for at least 1 retry
2092 
2093               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2094                 " after maximumAttempts (" + this.maximumAttempts +
2095                 "). Reset attempts count and continue retrying.");
2096             }
2097             waitForRetryingMetaAssignment();
2098             continue;
2099           }
2100 
2101           regionStates.updateRegionState(region, State.FAILED_OPEN);
2102           return;
2103         }
2104         if (setOfflineInZK && versionOfOfflineNode == -1) {
2105           LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2106           // get the version of the znode after setting it to OFFLINE.
2107           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2108           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2109           if (versionOfOfflineNode != -1) {
2110             if (isDisabledorDisablingRegionInRIT(region)) {
2111               return;
2112             }
2113             // In case of assignment from EnableTableHandler, the table state is ENABLING. Anyhow,
2114             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
2115             // try to set it to ENABLED directly, the client API may think the table is enabled.
2116             // When all the regions are added directly into hbase:meta and we call assignRegion,
2117             // we need to make the table ENABLED. Hence in such a case the table will not be in
2118             // ENABLING or ENABLED state.
2119             TableName tableName = region.getTable();
2120             if (!tableStateManager.isTableState(tableName,
2121               ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2122               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2123               setEnabledTable(tableName);
2124             }
2125           }
2126         }
2127         if (setOfflineInZK && versionOfOfflineNode == -1) {
2128           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2129           // Setting offline in ZK must have been failed due to ZK racing or some
2130           // exception which may make the server to abort. If it is ZK racing,
2131           // we should retry since we already reset the region state,
2132           // existing (re)assignment will fail anyway.
2133           if (!server.isAborted()) {
2134             continue;
2135           }
2136         }
2137         LOG.info("Assigning " + region.getRegionNameAsString() +
2138             " to " + plan.getDestination().toString());
2139         // Transition RegionState to PENDING_OPEN
2140         currentState = regionStates.updateRegionState(region,
2141           State.PENDING_OPEN, plan.getDestination());
2142 
2143         boolean needNewPlan;
2144         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2145             " to " + plan.getDestination();
2146         try {
2147           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2148           if (this.shouldAssignRegionsWithFavoredNodes) {
2149             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2150           }
2151           regionOpenState = serverManager.sendRegionOpen(
2152               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2153 
2154           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2155             // Failed opening this region, looping again on a new server.
2156             needNewPlan = true;
2157             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2158                 " trying to assign elsewhere instead; " +
2159                 "try=" + i + " of " + this.maximumAttempts);
2160           } else {
2161             // we're done
2162             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2163               processAlreadyOpenedRegion(region, plan.getDestination());
2164             }
2165             return;
2166           }
2167 
2168         } catch (Throwable t) {
2169           if (t instanceof RemoteException) {
2170             t = ((RemoteException) t).unwrapRemoteException();
2171           }
2172           previousException = t;
2173 
2174           // Should we wait a little before retrying? If the server is starting, yes.
2175           // If the region is already in transition, it's yes as well: we want to be sure that
2176           //  the region will get opened but we don't want a double assignment.
2177           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2178               t instanceof ServerNotRunningYetException);
2179 
2180           // In case socket is timed out and the region server is still online,
2181           // the openRegion RPC could have been accepted by the server and
2182           // just the response didn't go through.  So we will retry to
2183           // open the region on the same server to avoid possible
2184           // double assignment.
2185           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2186               && this.serverManager.isServerOnline(plan.getDestination()));
2187 
2188 
2189           if (hold) {
2190             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2191               "try=" + i + " of " + this.maximumAttempts, t);
2192 
2193             if (maxWaitTime < 0) {
2194               if (t instanceof RegionAlreadyInTransitionException) {
2195                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2196                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2197                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2198               } else {
2199                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2200                   + this.server.getConfiguration().getLong(
2201                     "hbase.regionserver.rpc.startup.waittime", 60000);
2202               }
2203             }
2204             try {
2205               needNewPlan = false;
2206               long now = EnvironmentEdgeManager.currentTime();
2207               if (now < maxWaitTime) {
2208                 LOG.debug("Server is not yet up or region is already in transition; "
2209                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2210                 Thread.sleep(100);
2211                 i--; // reset the try count
2212               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2213                 LOG.debug("Server is not up for a while; try a new one", t);
2214                 needNewPlan = true;
2215               }
2216             } catch (InterruptedException ie) {
2217               LOG.warn("Failed to assign "
2218                   + region.getRegionNameAsString() + " since interrupted", ie);
2219               regionStates.updateRegionState(region, State.FAILED_OPEN);
2220               Thread.currentThread().interrupt();
2221               return;
2222             }
2223           } else if (retry) {
2224             needNewPlan = false;
2225             i--; // we want to retry as many times as needed as long as the RS is not dead.
2226             LOG.warn(assignMsg + ", trying to assign to the same region server due ", t);
2227           } else {
2228             needNewPlan = true;
2229             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2230                 " try=" + i + " of " + this.maximumAttempts, t);
2231           }
2232         }
2233 
2234         if (i == this.maximumAttempts) {
2235           // For meta region, we have to keep retrying until succeeding
2236           if (region.isMetaRegion()) {
2237             i = 0; // re-set attempt count to 0 for at least 1 retry
2238             LOG.warn(assignMsg +
2239                 ", trying to assign a hbase:meta region; reached maximumAttempts (" +
2240                 this.maximumAttempts + "). Reset attempt count and continue retrying.");
2241             waitForRetryingMetaAssignment();
2242           }
2243           else {
2244             // Don't reset the region state or get a new plan any more.
2245             // This is the last try.
2246             continue;
2247           }
2248         }
2249 
2250         // If the region opened on the destination of the present plan, reassigning to a new
2251         // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
2252         // we reassign to the same RS.
2253         if (needNewPlan) {
2254           // Force a new plan and reassign. Will return null if no servers.
2255           // The new plan could be the same as the existing plan since we don't
2256           // exclude the server of the original plan, which should not be
2257           // excluded since it could be the only server up now.
2258           RegionPlan newPlan = null;
2259           try {
2260             newPlan = getRegionPlan(region, true);
2261           } catch (HBaseIOException e) {
2262             LOG.warn("Failed to get region plan", e);
2263           }
2264           if (newPlan == null) {
2265             regionStates.updateRegionState(region, State.FAILED_OPEN);
2266             LOG.warn("Unable to find a viable location to assign region " +
2267                 region.getRegionNameAsString());
2268             return;
2269           }
2270 
2271           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2272             // Clean out plan we failed execute and one that doesn't look like it'll
2273             // succeed anyways; we need a new plan!
2274             // Transition back to OFFLINE
2275             LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2276                 + newPlan.getDestination() + " server.");
2277             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2278             versionOfOfflineNode = -1;
2279             if (useZKForAssignment) {
2280               setOfflineInZK = true;
2281             }
2282             plan = newPlan;
2283           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2284               previousException instanceof FailedServerException) {
2285             try {
2286               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2287                 " to the same failed server.");
2288               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2289                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2290             } catch (InterruptedException ie) {
2291               LOG.warn("Failed to assign "
2292                   + region.getRegionNameAsString() + " since interrupted", ie);
2293               regionStates.updateRegionState(region, State.FAILED_OPEN);
2294               Thread.currentThread().interrupt();
2295               return;
2296             }
2297           }
2298         }
2299       }
2300       // Run out of attempts
2301       regionStates.updateRegionState(region, State.FAILED_OPEN);
2302     } finally {
2303       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2304     }
2305   }
2306 
2307   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2308     // Remove region from in-memory transition and unassigned node from ZK
2309     // While trying to enable the table the regions of the table were
2310     // already enabled.
2311     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2312       + " to " + sn);
2313     String encodedName = region.getEncodedName();
2314 
2315     // If using ZK for assignment, the region-already-opened event should not be handled
2316     // here; leave it to the ZK event. See HBASE-14407.
2317     if (useZKForAssignment) {
2318       String node = ZKAssign.getNodeName(watcher, encodedName);
2319       Stat stat = new Stat();
2320       try {
2321         byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
2322         if (existingBytes != null) {
2323           RegionTransition rt = RegionTransition.parseFrom(existingBytes);
2324           EventType et = rt.getEventType();
2325           if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
2326             LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2327               + " and node in "+et+" state");
2328             return;
2329           }
2330         }
2331       } catch (KeeperException ke) {
2332         LOG.warn("Unexpected ZK exception getData " + node
2333           + " node for the region " + encodedName, ke);
2334       } catch (DeserializationException e) {
2335         LOG.warn("Get RegionTransition from zk deserialization failed! ", e);
2336       }
2337 
2338       deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2339     }
2340 
2341     regionStates.regionOnline(region, sn);
2342   }
2343 
2344   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2345     if (this.tableStateManager.isTableState(region.getTable(),
2346         ZooKeeperProtos.Table.State.DISABLED,
2347         ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2348       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2349         + " skipping assign of " + region.getRegionNameAsString());
2350       offlineDisabledRegion(region);
2351       return true;
2352     }
2353     return false;
2354   }
2355 
2356   /**
2357    * Set region as OFFLINED up in zookeeper
2358    *
2359    * @param state
2360    * @return the version of the offline node if setting of the OFFLINE node was
2361    *         successful, -1 otherwise.
2362    */
2363   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2364     if (!state.isClosed() && !state.isOffline()) {
2365       String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2366       this.server.abort(msg, new IllegalStateException(msg));
2367       return -1;
2368     }
2369     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2370     int versionOfOfflineNode;
2371     try {
2372       // get the version after setting the znode to OFFLINE
2373       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2374         state.getRegion(), destination);
2375       if (versionOfOfflineNode == -1) {
2376         LOG.warn("Attempted to create/force node into OFFLINE state before "
2377             + "completing assignment but failed to do so for " + state);
2378         return -1;
2379       }
2380     } catch (KeeperException e) {
2381       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2382       return -1;
2383     }
2384     return versionOfOfflineNode;
2385   }
2386 
2387   /**
2388    * @param region the region to assign
2389    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2390    * if no servers to assign, it returns null).
2391    */
2392   private RegionPlan getRegionPlan(final HRegionInfo region,
2393       final boolean forceNewPlan)  throws HBaseIOException {
2394     return getRegionPlan(region, null, forceNewPlan);
2395   }
2396 
2397   /**
2398    * @param region the region to assign
2399    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
2400    * all servers are thought to be assignable.
2401    * @param forceNewPlan If true, a new plan will be generated even if an existing
2402    * plan exists.
2403    * @return Plan for the passed <code>region</code>; creates one if none exists,
2404    * or returns null if there are no servers to assign to.
2405    */
2406   private RegionPlan getRegionPlan(final HRegionInfo region,
2407       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2408     // Pickup existing plan or make a new one
2409     final String encodedName = region.getEncodedName();
2410     final List<ServerName> destServers =
2411       serverManager.createDestinationServersList(serverToExclude);
2412 
2413     if (destServers.isEmpty()){
2414       LOG.warn("Can't move " + encodedName +
2415         ", there is no destination server available.");
2416       return null;
2417     }
2418 
2419     RegionPlan randomPlan = null;
2420     boolean newPlan = false;
2421     RegionPlan existingPlan;
2422 
2423     synchronized (this.regionPlans) {
2424       existingPlan = this.regionPlans.get(encodedName);
2425 
2426       if (existingPlan != null && existingPlan.getDestination() != null) {
2427         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2428           + " destination server is " + existingPlan.getDestination() +
2429             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2430       }
2431 
2432       if (forceNewPlan
2433           || existingPlan == null
2434           || existingPlan.getDestination() == null
2435           || !destServers.contains(existingPlan.getDestination())) {
2436         newPlan = true;
2437       }
2438     }
2439 
2440     if (newPlan) {
2441       ServerName destination = balancer.randomAssignment(region, destServers);
2442       if (destination == null) {
2443         LOG.warn("Can't find a destination for " + encodedName);
2444         return null;
2445       }
2446       synchronized (this.regionPlans) {
2447         randomPlan = new RegionPlan(region, null, destination);
2448         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2449           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2450           regions.add(region);
2451           try {
2452             processFavoredNodes(regions);
2453           } catch (IOException ie) {
2454             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2455           }
2456         }
2457         this.regionPlans.put(encodedName, randomPlan);
2458       }
2459       LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
2460           + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
2461           + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
2462           + ") available servers, forceNewPlan=" + forceNewPlan);
2463       return randomPlan;
2464     }
2465     LOG.debug("Using pre-existing plan for " +
2466       region.getRegionNameAsString() + "; plan=" + existingPlan);
2467     return existingPlan;
2468   }
2469 
2470   /**
2471    * Wait for some time before retrying meta table region assignment
2472    */
2473   private void waitForRetryingMetaAssignment() {
2474     try {
2475       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2476     } catch (InterruptedException e) {
2477       LOG.error("Got exception while waiting for hbase:meta assignment");
2478       Thread.currentThread().interrupt();
2479     }
2480   }
2481 
2482   /**
2483    * Unassigns the specified region.
2484    * <p>
2485    * Updates the RegionState and sends the CLOSE RPC unless region is being
2486    * split by regionserver; then the unassign fails (silently) because we
2487    * presume the region being unassigned no longer exists (it's been split out
2488    * of existence). TODO: What to do if split fails and is rolled back and
2489    * parent is revivified?
2490    * <p>
2491    * If a RegionPlan is already set, it will remain.
2492    *
2493    * @param region the region to be unassigned
2494    */
2495   public void unassign(HRegionInfo region) {
2496     unassign(region, false);
2497   }
2498 
2499 
2500   /**
2501    * Unassigns the specified region.
2502    * <p>
2503    * Updates the RegionState and sends the CLOSE RPC unless region is being
2504    * split by regionserver; then the unassign fails (silently) because we
2505    * presume the region being unassigned no longer exists (it's been split out
2506    * of existence). TODO: What to do if split fails and is rolled back and
2507    * parent is revivified?
2508    * <p>
2509    * If a RegionPlan is already set, it will remain.
2510    *
2511    * @param region the region to be unassigned
2512    * @param force if region should be closed even if already closing
2513    */
2514   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2515     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2516     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2517       + " (offlining), current state: " + regionStates.getRegionState(region));
2518 
2519     String encodedName = region.getEncodedName();
2520     // Grab the state of this region and synchronize on it
2521     int versionOfClosingNode = -1;
2522     // We need a lock here as we're going to do a put later and we don't want
2523     // multiple states to be created concurrently
2524     ReentrantLock lock = locker.acquireLock(encodedName);
2525     RegionState state = regionStates.getRegionTransitionState(encodedName);
2526     boolean reassign = true;
2527     try {
2528       if (state == null) {
2529         // Region is not in transition.
2530         // We can unassign it only if it's not SPLIT/MERGED.
2531         state = regionStates.getRegionState(encodedName);
2532         if (state != null && state.isUnassignable()) {
2533           LOG.info("Attempting to unassign " + state + ", ignored");
2534           // Offline region will be reassigned below
2535           return;
2536         }
2537         // Create the znode in CLOSING state
2538         try {
2539           if (state == null || state.getServerName() == null) {
2540             // We don't know where the region is, offline it.
2541             // No need to send CLOSE RPC
2542             LOG.warn("Attempting to unassign a region not in RegionStates "
2543               + region.getRegionNameAsString() + ", offlined");
2544             regionOffline(region);
2545             return;
2546           }
2547           if (useZKForAssignment) {
2548             versionOfClosingNode = ZKAssign.createNodeClosing(
2549               watcher, region, state.getServerName());
2550             if (versionOfClosingNode == -1) {
2551               LOG.info("Attempting to unassign " +
2552                 region.getRegionNameAsString() + " but ZK closing node "
2553                 + "can't be created.");
2554               reassign = false; // not unassigned at all
2555               return;
2556             }
2557           }
2558         } catch (KeeperException e) {
2559           if (e instanceof NodeExistsException) {
2560             // Handle race between master initiated close and regionserver
2561             // orchestrated splitting. See if existing node is in a
2562             // SPLITTING or SPLIT state.  If so, the regionserver started
2563             // an op on node before we could get our CLOSING in.  Deal.
2564             NodeExistsException nee = (NodeExistsException)e;
2565             String path = nee.getPath();
2566             try {
2567               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2568                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2569                   "skipping unassign because region no longer exists -- its split or merge");
2570                 reassign = false; // no need to reassign for split/merged region
2571                 return;
2572               }
2573             } catch (KeeperException.NoNodeException ke) {
2574               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2575                 "; presuming split and that the region to unassign, " +
2576                 encodedName + ", no longer exists -- confirm", ke);
2577               return;
2578             } catch (KeeperException ke) {
2579               LOG.error("Unexpected zk state", ke);
2580             } catch (DeserializationException de) {
2581               LOG.error("Failed parse", de);
2582             }
2583           }
2584           // If we get here, we don't understand what's going on -- abort.
2585           server.abort("Unexpected ZK exception creating node CLOSING", e);
2586           reassign = false; // heading out already
2587           return;
2588         }
2589         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2590       } else if (state.isFailedOpen()) {
2591         // The region is not open yet
2592         regionOffline(region);
2593         return;
2594       } else if (force && state.isPendingCloseOrClosing()) {
2595         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2596           " which is already " + state.getState()  +
2597           " but forcing to send a CLOSE RPC again ");
2598         if (state.isFailedClose()) {
2599           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2600         }
2601         state.updateTimestampToNow();
2602       } else {
2603         LOG.debug("Attempting to unassign " +
2604           region.getRegionNameAsString() + " but it is " +
2605           "already in transition (" + state.getState() + ", force=" + force + ")");
2606         return;
2607       }
2608 
2609       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2610     } finally {
2611       lock.unlock();
2612 
2613       // Region is expected to be reassigned afterwards
2614       if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
2615         assign(region, true);
2616       }
2617     }
2618   }
2619 
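       /**
        * Unassigns the specified region without a preferred destination server.
        * @param region the region to be unassigned
        * @param force if region should be closed even if already closing
        */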
2620   public void unassign(HRegionInfo region, boolean force) {
2621      unassign(region, force, null);
2622   }
2623 
2624   /**
2625    * @param region region whose closing or closed znode should be deleted.
2626    */
2627   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2628     String encodedName = region.getEncodedName();
2629     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2630       EventType.RS_ZK_REGION_CLOSED);
2631   }
2632 
2633   /**
2634    * @param path znode path to check
2635    * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
2636    * @throws KeeperException Can happen if the znode went away in the meantime.
2637    * @throws DeserializationException
2638    */
2639   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2640       throws KeeperException, DeserializationException {
2641     boolean result = false;
2642     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2643     // cleaned up before we can get data from it.
2644     byte [] data = ZKAssign.getData(watcher, path);
2645     if (data == null) {
2646       LOG.info("Node " + path + " is gone");
2647       return false;
2648     }
2649     RegionTransition rt = RegionTransition.parseFrom(data);
2650     switch (rt.getEventType()) {
2651     case RS_ZK_REQUEST_REGION_SPLIT:
2652     case RS_ZK_REGION_SPLIT:
2653     case RS_ZK_REGION_SPLITTING:
2654     case RS_ZK_REQUEST_REGION_MERGE:
2655     case RS_ZK_REGION_MERGED:
2656     case RS_ZK_REGION_MERGING:
2657       result = true;
2658       break;
2659     default:
2660       LOG.info("Node " + path + " is in " + rt.getEventType());
2661       break;
2662     }
2663     return result;
2664   }
2665 
2666   /**
2667    * Used by unit tests. Return the number of regions opened so far in the life
2668    * of the master. Increases by one every time the master opens a region.
2669    * @return the counter value of the number of regions opened so far
2670    */
2671   public int getNumRegionsOpened() {
2672     return numRegionsOpened.get();
2673   }
2674 
2675   /**
2676    * Waits until the specified region has completed assignment.
2677    * <p>
2678    * If the region is already assigned, returns immediately.  Otherwise, method
2679    * blocks until the region is assigned.
2680    * @param regionInfo region to wait on assignment for
2681    * @return true if the region is assigned, false otherwise.
2682    * @throws InterruptedException
2683    */
2684   public boolean waitForAssignment(HRegionInfo regionInfo)
2685       throws InterruptedException {
2686     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2687     regionSet.add(regionInfo);
2688     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2689   }
2690 
2691   /**
2692    * Waits until the specified regions have completed assignment, or the deadline is reached.
2693    */
2694   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2695       final boolean waitTillAllAssigned, final int reassigningRegions,
2696       final long minEndTime) throws InterruptedException {
2697     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2698     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2699   }
2700 
2701   /**
2702    * Waits until the specified regions have completed assignment, or the deadline is reached.
2703    * @param regionSet set of regions to wait on. The set is modified; assigned regions are removed
2704    * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
2705    * @param deadline the timestamp after which the wait is aborted
2706    * @return true if all the regions are assigned, false otherwise.
2707    * @throws InterruptedException
2708    */
2709   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2710       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2711     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
2712     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2713       int failedOpenCount = 0;
2714       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2715       while (regionInfoIterator.hasNext()) {
2716         HRegionInfo hri = regionInfoIterator.next();
2717         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2718             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2719           regionInfoIterator.remove();
2720         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2721           failedOpenCount++;
2722         }
2723       }
2724       if (!waitTillAllAssigned) {
2725         // No need to wait, let assignment go on asynchronously
2726         break;
2727       }
2728       if (!regionSet.isEmpty()) {
2729         if (failedOpenCount == regionSet.size()) {
2730           // all the regions we are waiting on had an error on open.
2731           break;
2732         }
2733         regionStates.waitForUpdate(100);
2734       }
2735     }
2736     return regionSet.isEmpty();
2737   }
2738 
2739   /**
2740    * Assigns the hbase:meta region or a replica.
2741    * <p>
2742    * Assumes that hbase:meta is currently closed and is not being actively served by
2743    * any RegionServer.
2744    * <p>
2745    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2746    * hbase:meta to a random RegionServer.
2747    * @param hri the hbase:meta region (or a replica of it) to assign
2748    * @throws KeeperException
2749    */
2750   public void assignMeta(HRegionInfo hri) throws KeeperException {
2751     this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
2752     assign(hri, true);
2753   }
2754 
2755   /**
2756    * Assigns the specified regions, retaining existing assignments where possible.
2757    * <p>
2758    * This is a synchronous call and will return once every region has been
2759    * assigned.  If anything fails, an exception is thrown.
2760    * @throws InterruptedException
2761    * @throws IOException
2762    */
2763   public void assign(Map<HRegionInfo, ServerName> regions)
2764         throws IOException, InterruptedException {
2765     if (regions == null || regions.isEmpty()) {
2766       return;
2767     }
2768     List<ServerName> servers = serverManager.createDestinationServersList();
2769     if (servers == null || servers.isEmpty()) {
2770       throw new IOException("Found no destination server to assign region(s)");
2771     }
2772 
2773     // Reuse existing assignment info
2774     Map<ServerName, List<HRegionInfo>> bulkPlan =
2775       balancer.retainAssignment(regions, servers);
2776     if (bulkPlan == null) {
2777       throw new IOException("Unable to determine a plan to assign region(s)");
2778     }
2779 
2780     assign(regions.size(), servers.size(),
2781       "retainAssignment=true", bulkPlan);
2782   }
2783 
2784   /**
2785    * Assigns the specified regions round-robin, if any exist.
2786    * <p>
2787    * This is a synchronous call and will return once every region has been
2788    * assigned.  If anything fails, an exception is thrown.
2789    * @throws InterruptedException
2790    * @throws IOException
2791    */
2792   public void assign(List<HRegionInfo> regions)
2793         throws IOException, InterruptedException {
2794     if (regions == null || regions.isEmpty()) {
2795       return;
2796     }
2797 
2798     List<ServerName> servers = serverManager.createDestinationServersList();
2799     if (servers == null || servers.isEmpty()) {
2800       throw new IOException("Found no destination server to assign region(s)");
2801     }
2802 
2803     // Generate a round-robin bulk assignment plan
2804     Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
2805     if (bulkPlan == null) {
2806       throw new IOException("Unable to determine a plan to assign region(s)");
2807     }
2808 
2809     processFavoredNodes(regions);
2810     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
2811   }
2812 
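       /**
        * Assigns regions according to the given bulk plan. For a small plan
        * (below bulkAssignThresholdRegions/bulkAssignThresholdServers, or a single
        * server) regions are assigned individually on the executor and waited on;
        * otherwise a GeneralBulkAssigner is used.
        */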
2813   private void assign(int regions, int totalServers,
2814       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2815           throws InterruptedException, IOException {
2816 
2817     int servers = bulkPlan.size();
2818     if (servers == 1 || (regions < bulkAssignThresholdRegions
2819         && servers < bulkAssignThresholdServers)) {
2820 
2821       // Don't use bulk assignment.  This could be more efficient in a small
2822       // cluster, especially a mini cluster for testing, so that tests won't time out
2823       if (LOG.isTraceEnabled()) {
2824         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2825           " region(s) to " + servers + " server(s)");
2826       }
2827 
2828       // invoke assignment (async)
2829       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
2830       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2831         if (!assign(plan.getKey(), plan.getValue())) {
2832           for (HRegionInfo region: plan.getValue()) {
2833             if (!regionStates.isRegionOnline(region)) {
2834               invokeAssign(region);
2835               if (!region.getTable().isSystemTable()) {
2836                 userRegionSet.add(region);
2837               }
2838             }
2839           }
2840         }
2841       }
2842 
2843       // wait for assignment completion
2844       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
2845             System.currentTimeMillis())) {
2846         LOG.debug("some user regions are still in transition: " + userRegionSet);
2847       }
2848     } else {
2849       LOG.info("Bulk assigning " + regions + " region(s) across "
2850         + totalServers + " server(s), " + message);
2851 
2852       // Use fixed count thread pool assigning.
2853       BulkAssigner ba = new GeneralBulkAssigner(
2854         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2855       ba.bulkAssign();
2856       LOG.info("Bulk assigning done");
2857     }
2858   }
2859 
2860   /**
2861    * Assigns all user regions, if any exist.  Used during cluster startup.
2862    * <p>
2863    * This is a synchronous call and will return once every region has been
2864    * assigned.  If anything fails, an exception is thrown and the cluster
2865    * should be shut down.
2866    * @throws InterruptedException
2867    * @throws IOException
2868    */
2869   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2870       throws IOException, InterruptedException {
2871     if (allRegions == null || allRegions.isEmpty()) return;
2872 
2873     // Determine what type of assignment to do on startup
2874     boolean retainAssignment = server.getConfiguration().
2875       getBoolean("hbase.master.startup.retainassign", true);
2876 
2877     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2878     if (retainAssignment) {
2879       assign(allRegions);
2880     } else {
2881       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2882       assign(regions);
2883     }
2884 
2885     for (HRegionInfo hri : regionsFromMetaScan) {
2886       TableName tableName = hri.getTable();
2887       if (!tableStateManager.isTableState(tableName,
2888           ZooKeeperProtos.Table.State.ENABLED)) {
2889         setEnabledTable(tableName);
2890       }
2891     }
2892     // assign all the replicas that were not recorded in the meta
2893     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
2894   }
2895 
2896   /**
2897    * Get a list of replica regions that are not recorded in meta yet.
2898    * We might not have recorded the locations for the replicas since the replicas
2899    * may not have been online yet, the master restarted in the middle of assigning,
2900    * ZK was erased, etc.
2901    * @param regionsRecordedInMeta the list of regions we know are recorded in meta,
2902    * either as a default, or as the location of a replica
2903    * @param master the master services, used to look up the table descriptors
2904    * @return list of replica regions
2905    * @throws IOException
2906    */
2907   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2908       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2909     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2910     for (HRegionInfo hri : regionsRecordedInMeta) {
2911       TableName table = hri.getTable();
2912       HTableDescriptor htd = master.getTableDescriptors().get(table);
2913       // look at the HTD for the replica count. That's the source of truth
2914       int desiredRegionReplication = htd.getRegionReplication();
2915       for (int i = 0; i < desiredRegionReplication; i++) {
2916         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2917         if (regionsRecordedInMeta.contains(replica)) continue;
2918         regionsNotRecordedInMeta.add(replica);
2919       }
2920     }
2921     return regionsNotRecordedInMeta;
2922   }
2923 
2924   /**
2925    * Wait until no regions in transition.
2926    * @param timeout How long to wait.
2927    * @return True if no regions are in transition.
2928    * @throws InterruptedException
2929    */
2930   boolean waitUntilNoRegionsInTransition(final long timeout)
2931       throws InterruptedException {
2932     // Blocks until there are no regions in transition. It is possible that
2933     // there are regions in transition immediately after this returns, but it
2934     // guarantees that if it returns without an exception, there was a period
2935     // of time with no regions in transition from the point-of-view of the
2936     // in-memory state of the Master.
2938     final long endTime = System.currentTimeMillis() + timeout;
2939 
2940     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2941         && endTime > System.currentTimeMillis()) {
2942       regionStates.waitForUpdate(100);
2943     }
2944 
2945     return !regionStates.isRegionsInTransition();
2946   }
2947 
2948   /**
2949    * Rebuild the list of user regions and assignment information.
2950    * Updates regionstates with findings as we go through list of regions.
2951    * @return set of servers not online that hosted some regions according to a scan of hbase:meta
2952    * @throws IOException
2953    */
2954   Set<ServerName> rebuildUserRegions() throws
2955       IOException, KeeperException, CoordinatedStateException {
2956     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2957       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2958 
2959     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2960       ZooKeeperProtos.Table.State.DISABLED,
2961       ZooKeeperProtos.Table.State.DISABLING,
2962       ZooKeeperProtos.Table.State.ENABLING);
2963 
2964     // Region assignment from META
2965     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
2966     // Get any new but slow to checkin region server that joined the cluster
2967     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2968     // Set of offline servers to be returned
2969     Set<ServerName> offlineServers = new HashSet<ServerName>();
2970     // Iterate regions in META
2971     for (Result result : results) {
2972       if (result == null) {
2973         LOG.debug("null result from meta - ignoring but this is strange.");
2974         continue;
2975       }
2976       // Keep track of replicas to close. These were the replicas of the originally
2977       // unmerged regions. The master might have closed them before, but it may not
2978       // have, e.g. because it crashed.
2979       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
2980       if (p.getFirst() != null && p.getSecond() != null) {
2981         int numReplicas = server.getTableDescriptors().get(p.getFirst().
2982             getTable()).getRegionReplication();
2983         for (HRegionInfo merge : p) {
2984           for (int i = 1; i < numReplicas; i++) {
2985             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
2986           }
2987         }
2988       }
2989       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
2990       if (rl == null) continue;
2991       HRegionLocation[] locations = rl.getRegionLocations();
2992       if (locations == null) continue;
2993       for (HRegionLocation hrl : locations) {
2994         if (hrl == null) continue;
2995         HRegionInfo regionInfo = hrl.getRegionInfo();
2996         if (regionInfo == null) continue;
2997         int replicaId = regionInfo.getReplicaId();
2998         State state = RegionStateStore.getRegionState(result, replicaId);
2999         // Keep track of replicas to close. These were the replicas of the split parents
3000         // from the previous life of the master. The master should have closed them
3001         // before, but it may not have, e.g. because it crashed.
3002         if (replicaId == 0 && state.equals(State.SPLIT)) {
3003           for (HRegionLocation h : locations) {
3004             replicasToClose.add(h.getRegionInfo());
3005           }
3006         }
3007         ServerName lastHost = hrl.getServerName();
3008         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3009         if (tableStateManager.isTableState(regionInfo.getTable(),
3010              ZooKeeperProtos.Table.State.DISABLED)) {
3011           // force region to forget its hosts for disabled/disabling tables.
3012           // see HBASE-13326
3013           lastHost = null;
3014           regionLocation = null;
3015         }
3016         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3017         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3018           // Region is not open (either offline or in transition), skip
3019           continue;
3020         }
3021         TableName tableName = regionInfo.getTable();
3022         if (!onlineServers.contains(regionLocation)) {
3023           // Region is located on a server that isn't online
3024           offlineServers.add(regionLocation);
3025           if (useZKForAssignment) {
3026             regionStates.regionOffline(regionInfo);
3027           }
3028         } else if (!disabledOrEnablingTables.contains(tableName)) {
3029           // Region is being served and on an active server
3030           // add only if region not in disabled or enabling table
3031           regionStates.regionOnline(regionInfo, regionLocation);
3032           balancer.regionOnline(regionInfo, regionLocation);
3033         } else if (useZKForAssignment) {
3034           regionStates.regionOffline(regionInfo);
3035         }
3036         // need to enable the table if not disabled or disabling or enabling
3037         // this will be used in rolling restarts
3038         if (!disabledOrDisablingOrEnabling.contains(tableName)
3039           && !getTableStateManager().isTableState(tableName,
3040             ZooKeeperProtos.Table.State.ENABLED)) {
3041           setEnabledTable(tableName);
3042         }
3043       }
3044     }
3045     return offlineServers;
3046   }
3047 
3048   /**
3049    * Recover the tables that were not fully moved to DISABLED state. These
3050    * tables were in DISABLING state when the master restarted/switched.
3051    *
3052    * @throws KeeperException
3053    * @throws TableNotFoundException
3054    * @throws IOException
3055    */
3056   private void recoverTableInDisablingState()
3057       throws KeeperException, IOException, CoordinatedStateException {
3058     Set<TableName> disablingTables =
3059       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3060     if (disablingTables.size() != 0) {
3061       for (TableName tableName : disablingTables) {
3062         // Recover by calling DisableTableHandler
3063         LOG.info("The table " + tableName
3064             + " is in DISABLING state.  Hence recovering by moving the table"
3065             + " to DISABLED state.");
3066         new DisableTableHandler(this.server, tableName,
3067             this, tableLockManager, true).prepare().process();
3068       }
3069     }
3070   }
3071 
3072   /**
3073    * Recover the tables that were not fully moved to ENABLED state. These tables
3074    * were in ENABLING state when the master restarted/switched.
3075    *
3076    * @throws KeeperException
3077    * @throws org.apache.hadoop.hbase.TableNotFoundException
3078    * @throws IOException
3079    */
3080   private void recoverTableInEnablingState()
3081       throws KeeperException, IOException, CoordinatedStateException {
3082     Set<TableName> enablingTables = tableStateManager.
3083       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3084     if (enablingTables.size() != 0) {
3085       for (TableName tableName : enablingTables) {
3086         // Recover by calling EnableTableHandler
3087         LOG.info("The table " + tableName
3088             + " is in ENABLING state.  Hence recovering by moving the table"
3089             + " to ENABLED state.");
3090         // Enable the table synchronously during master startup;
3091         // there is no need to invoke the coprocessor
3092         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3093           this, tableLockManager, true);
3094         try {
3095           eth.prepare();
3096         } catch (TableNotFoundException e) {
3097           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3098           continue;
3099         }
3100         eth.process();
3101       }
3102     }
3103   }
3104 
3105   /**
3106    * Processes the list of dead servers from the hbase:meta scan, and the regions in RIT.
3107    * This is used on failover to recover regions that belonged to RegionServers which
3108    * failed while there was no active master or which are offline for whatever reason,
3109    * as well as regions that were in RIT.
3110    *
3111    * @param deadServers
3112    *          The list of dead servers which failed while there was no active master. Can be null.
3113    * @throws IOException
3114    * @throws KeeperException
3115    */
3116   private void processDeadServersAndRecoverLostRegions(Set<ServerName> deadServers)
3117   throws IOException, KeeperException {
3118     if (deadServers != null && !deadServers.isEmpty()) {
3119       for (ServerName serverName: deadServers) {
3120         if (!serverManager.isServerDead(serverName)) {
3121           serverManager.expireServer(serverName); // Let SSH do region re-assign
3122         }
3123       }
3124     }
3125 
3126     List<String> nodes = useZKForAssignment ?
3127       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3128       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3129     if (nodes != null && !nodes.isEmpty()) {
3130       for (String encodedRegionName : nodes) {
3131         processRegionInTransition(encodedRegionName, null);
3132       }
3133     } else if (!useZKForAssignment) {
3134       processRegionInTransitionZkLess();
3135     }
3136   }
3137 
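       /**
        * Re-drives regions that are in transition when ZK-less assignment is in use,
        * typically after a master failover.
        */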
3138   void processRegionInTransitionZkLess() {
3139     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
3140     // in case the RPC call is not sent out yet before the master was shut down
3141     // since we update the state before we send the RPC call. We can't update
3142     // the state after the RPC call. Otherwise, we don't know what's happened
3143     // to the region if the master dies right after the RPC call is out.
3144     Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3145     for (RegionState regionState : rits.values()) {
3146       LOG.info("Processing " + regionState);
3147       ServerName serverName = regionState.getServerName();
3148       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
3149       // case, try assigning it here.
3150       if (serverName != null
3151           && !serverManager.getOnlineServers().containsKey(serverName)) {
3152         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3153         continue;
3154       }
3155       HRegionInfo regionInfo = regionState.getRegion();
3156       State state = regionState.getState();
3157 
3158       switch (state) {
3159       case CLOSED:
3160         invokeAssign(regionInfo);
3161         break;
3162       case PENDING_OPEN:
3163         retrySendRegionOpen(regionState);
3164         break;
3165       case PENDING_CLOSE:
3166         retrySendRegionClose(regionState);
3167         break;
3168       case FAILED_CLOSE:
3169       case FAILED_OPEN:
3170         invokeUnAssign(regionInfo);
3171         break;
3172       default:
3173         // No process for other states
3174       }
3175     }
3176   }
3177 
3178   /**
3179    * At master failover, for a pending_open region, make sure the
3180    * sendRegionOpen RPC call is sent to the target regionserver.
3181    */
3182   private void retrySendRegionOpen(final RegionState regionState) {
3183     this.executorService.submit(
3184       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3185         @Override
3186         public void process() throws IOException {
3187           HRegionInfo hri = regionState.getRegion();
3188           ServerName serverName = regionState.getServerName();
3189           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3190           try {
3191             for (int i = 1; i <= maximumAttempts; i++) {
3192               if (!serverManager.isServerOnline(serverName)
3193                   || server.isStopped() || server.isAborted()) {
3194                 return; // No need any more
3195               }
3196               try {
3197                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3198                   return; // Region is not in the expected state any more
3199                 }
3200                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
3201                 if (shouldAssignRegionsWithFavoredNodes) {
3202                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
3203                 }
3204                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
3205                   serverName, hri, -1, favoredNodes);
3206 
3207                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
3208                   // Failed opening this region, this means the target server didn't get
3209                   // the original region open RPC, so re-assign it with a new plan
3210                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
3211                     + regionState + ", re-assign it");
3212                   invokeAssign(hri, true);
3213                 }
3214                 return; // Done.
3215               } catch (Throwable t) {
3216                 if (t instanceof RemoteException) {
3217                   t = ((RemoteException) t).unwrapRemoteException();
3218                 }
3219                 // In case SocketTimeoutException/FailedServerException, retry
3220                 if (t instanceof java.net.SocketTimeoutException
3221                     || t instanceof FailedServerException) {
3222                   Threads.sleep(100);
3223                   continue;
3224                 }
3225                 // For other exceptions, re-assign it
3226                 LOG.debug("Got exception in retry sendRegionOpen for "
3227                   + regionState + ", re-assign it", t);
3228                 invokeAssign(hri);
3229                 return; // Done.
3230               }
3231             }
3232           } finally {
3233             lock.unlock();
3234           }
3235         }
3236       });
3237   }
3238 
3239   /**
3240    * At master failover, for a pending_close region, make sure the
3241    * sendRegionClose RPC call is sent to the target regionserver.
3242    */
3243   private void retrySendRegionClose(final RegionState regionState) {
3244     this.executorService.submit(
3245       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3246         @Override
3247         public void process() throws IOException {
3248           HRegionInfo hri = regionState.getRegion();
3249           ServerName serverName = regionState.getServerName();
3250           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3251           try {
3252             for (int i = 1; i <= maximumAttempts; i++) {
3253               if (!serverManager.isServerOnline(serverName)
3254                   || server.isStopped() || server.isAborted()) {
3255                 return; // No need any more
3256               }
3257               try {
3258                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3259                   return; // Region is not in the expected state any more
3260                 }
3261                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3262                   // This means the region is still on the target server
3263                   LOG.debug("Got false in retry sendRegionClose for "
3264                     + regionState + ", re-close it");
3265                   invokeUnAssign(hri);
3266                 }
3267                 return; // Done.
3268               } catch (Throwable t) {
3269                 if (t instanceof RemoteException) {
3270                   t = ((RemoteException) t).unwrapRemoteException();
3271                 }
3272                 // In case SocketTimeoutException/FailedServerException, retry
3273                 if (t instanceof java.net.SocketTimeoutException
3274                     || t instanceof FailedServerException) {
3275                   Threads.sleep(100);
3276                   continue;
3277                 }
3278                 if (!(t instanceof NotServingRegionException
3279                     || t instanceof RegionAlreadyInTransitionException)) {
3280                   // NotServingRegionException/RegionAlreadyInTransitionException
3281                   // means the target server got the original region close request.
3282                   // For other exceptions, re-close it
3283                   LOG.debug("Got exception in retry sendRegionClose for "
3284                     + regionState + ", re-close it", t);
3285                   invokeUnAssign(hri);
3286                 }
3287                 return; // Done.
3288               }
3289             }
3290           } finally {
3291             lock.unlock();
3292           }
3293         }
3294       });
3295   }
3296 
3297   /**
3298    * Set regions-in-transition metrics.
3299    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3300    * The iterator is not fail-fast, which may lead to stale reads; but that's better than
3301    * creating a copy of the map for metrics computation, as this method will be invoked
3302    * at a frequent interval.
3303    */
3304   public void updateRegionsInTransitionMetrics() {
3305     long currentTime = System.currentTimeMillis();
3306     int totalRITs = 0;
3307     int totalRITsOverThreshold = 0;
3308     long oldestRITTime = 0;
3309     int ritThreshold = this.server.getConfiguration().
3310       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3311     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3312       totalRITs++;
3313       long ritTime = currentTime - state.getStamp();
3314       if (ritTime > ritThreshold) { // more than the threshold
3315         totalRITsOverThreshold++;
3316       }
3317       if (oldestRITTime < ritTime) {
3318         oldestRITTime = ritTime;
3319       }
3320     }
3321     if (this.metricsAssignmentManager != null) {
3322       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3323       this.metricsAssignmentManager.updateRITCount(totalRITs);
3324       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3325     }
3326   }
3327 
3328   /**
3329    * @param region Region whose plan we are to clear.
3330    */
3331   void clearRegionPlan(final HRegionInfo region) {
3332     synchronized (this.regionPlans) {
3333       this.regionPlans.remove(region.getEncodedName());
3334     }
3335   }
3336 
3337   /**
3338    * Wait on region to clear regions-in-transition.
3339    * @param hri Region to wait on.
3340    * @throws IOException
3341    */
3342   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3343       throws IOException, InterruptedException {
3344     waitOnRegionToClearRegionsInTransition(hri, -1L);
3345   }
3346 
3347   /**
3348    * Wait on region to clear regions-in-transition or time out
3349    * @param hri Region to wait on.
3350    * @param timeOut Milliseconds to wait for the region to be out of transition state.
3351    * @return True when a region clears regions-in-transition before timeout, otherwise false.
3352    * @throws InterruptedException
3353    */
3354   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3355       throws InterruptedException {
3356     if (!regionStates.isRegionInTransition(hri)) return true;
3357     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3358         + timeOut;
3359     // There is already a timeout monitor on regions in transition so I
3360     // should not have to have one here too?
3361     LOG.info("Waiting for " + hri.getEncodedName() +
3362         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3363     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3364       regionStates.waitForUpdate(100);
3365       if (EnvironmentEdgeManager.currentTime() > end) {
3366         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3367         return false;
3368       }
3369     }
3370     if (this.server.isStopped()) {
3371       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3372       return false;
3373     }
3374     return true;
3375   }
3376 
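       /**
        * Queues an asynchronous assign of the region on the executor (newPlan=true).
        */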
3377   void invokeAssign(HRegionInfo regionInfo) {
3378     invokeAssign(regionInfo, true);
3379   }
3380 
3381   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3382     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3383   }
3384 
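       /**
        * Queues an asynchronous unassign of the region on the executor.
        */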
3385   void invokeUnAssign(HRegionInfo regionInfo) {
3386     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3387   }
3388 
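       /**
        * @return whether the given server is carrying the default replica of hbase:meta
        */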
3389   public ServerHostRegion isCarryingMeta(ServerName serverName) {
3390     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3391   }
3392 
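       /**
        * @return whether the given server is carrying the hbase:meta replica with the given id
        */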
3393   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3394     return isCarryingRegion(serverName,
3395         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3396   }
3397 
3398   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
3399     return isCarryingRegion(serverName, metaHri);
3400   }
3401 
3402   /**
3403    * Check if the shut-down server carries the specified region.
3404    * We have a bunch of places that store region location, and
3405    * those values aren't always consistent: there is a delay of notification.
3406    * The location from the zookeeper unassigned node has the most recent data,
3407    * but the node could be deleted after the region is opened by the AM.
3408    * The AM's info could be old if OpenedRegionHandler
3409    * processing hasn't finished yet when the server shutdown occurs.
3410    * @return whether the serverName currently hosts the region
3411    */
3412   private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3413     RegionTransition rt = null;
3414     try {
3415       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3416       // This call can legitimately return null
3417       rt = data == null? null: RegionTransition.parseFrom(data);
3418     } catch (KeeperException e) {
3419       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3420     } catch (DeserializationException e) {
3421       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3422     }
3423 
3424     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3425     if (addressFromZK != null) {
3426       // if we get something from ZK, we will use the data
3427       boolean matchZK = addressFromZK.equals(serverName);
3428       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3429         " current=" + serverName + ", matches=" + matchZK);
3430       return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3431     }
3432 
3433     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3434     if (LOG.isDebugEnabled()) {
3435       LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3436         " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3437         " server being checked: " + serverName);
3438     }
3439     if (addressFromAM != null) {
3440       return addressFromAM.equals(serverName) ?
3441           ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3442     }
3443 
3444     if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
3445       // For the Meta region (default replica), we can do one more check on MetaTableLocator
3446       final ServerName serverNameInZK =
3447           server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
3448       if (LOG.isDebugEnabled()) {
3449         LOG.debug("Based on MetaTableLocator, the META region is on server=" +
3450           (serverNameInZK == null ? "null" : serverNameInZK) +
3451           " server being checked: " + serverName);
3452       }
3453       if (serverNameInZK != null) {
3454         return serverNameInZK.equals(serverName) ?
3455             ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3456       }
3457     }
3458 
3459     // Checked everywhere, if reaching here, we are unsure whether the server is carrying region.
3460     return ServerHostRegion.UNKNOWN;
3461   }
3462 
3463   /**
3464    * Clean out crashed server removing any assignments.
3465    * @param sn Server that went down.
3466    * @return list of regions in transition on this server
3467    */
3468   public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
3469     // Clean out any existing assignment plans for this server
3470     synchronized (this.regionPlans) {
3471       for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
3472           i.hasNext();) {
3473         Map.Entry<String, RegionPlan> e = i.next();
3474         ServerName otherSn = e.getValue().getDestination();
3475         // The name will be null if the region is planned for a random assign.
3476         if (otherSn != null && otherSn.equals(sn)) {
3477           // Use iterator's remove else we'll get CME
3478           i.remove();
3479         }
3480       }
3481     }
3482     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3483     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3484       HRegionInfo hri = it.next();
3485       String encodedName = hri.getEncodedName();
3486 
3487       // We need a lock on the region as we could update it
3488       Lock lock = locker.acquireLock(encodedName);
3489       try {
3490         RegionState regionState = regionStates.getRegionTransitionState(encodedName);
3491         if (regionState == null
3492             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3493             || !(regionState.isFailedClose() || regionState.isOffline()
3494               || regionState.isPendingOpenOrOpening())) {
3495           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3496             + " on the dead server any more: " + sn);
3497           it.remove();
3498         } else {
3499           try {
3500             // Delete the ZNode if exists
3501             ZKAssign.deleteNodeFailSilent(watcher, hri);
3502           } catch (KeeperException ke) {
3503             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3504           }
3505           if (tableStateManager.isTableState(hri.getTable(),
3506               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3507             regionStates.regionOffline(hri);
3508             it.remove();
3509             continue;
3510           }
3511           // Mark the region offline and assign it again by SSH
3512           regionStates.updateRegionState(hri, State.OFFLINE);
3513         }
3514       } finally {
3515         lock.unlock();
3516       }
3517     }
3518     return regions;
3519   }
3520 
3521   /**
3522    * @param plan Plan to execute.
3523    */
3524   public void balance(final RegionPlan plan) {
3525 
3526     HRegionInfo hri = plan.getRegionInfo();
3527     TableName tableName = hri.getTable();
3528     if (tableStateManager.isTableState(tableName,
3529       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3530       LOG.info("Ignored moving region of disabling/disabled table "
3531         + tableName);
3532       return;
3533     }
3534 
3535     // Move the region only if it's assigned
3536     String encodedName = hri.getEncodedName();
3537     ReentrantLock lock = locker.acquireLock(encodedName);
3538     try {
3539       if (!regionStates.isRegionOnline(hri)) {
3540         RegionState state = regionStates.getRegionState(encodedName);
3541         LOG.info("Ignored moving region not assigned: " + hri + ", "
3542           + (state == null ? "not in region states" : state));
3543         return;
3544       }
3545       synchronized (this.regionPlans) {
3546         this.regionPlans.put(plan.getRegionName(), plan);
3547       }
3548       unassign(hri, false, plan.getDestination());
3549     } finally {
3550       lock.unlock();
3551     }
3552   }
3553 
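       /**
        * Stops the assignment manager; simply delegates to {@link #shutdown()}.
        */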
3554   public void stop() {
3555     shutdown(); // Stop executor service, etc
3556   }
3557 
3558   /**
3559    * Shut down the thread pool executor services, the ZK event workers, and the region state store.
3560    */
3561   public void shutdown() {
3562     // It's an immediate shutdown, so we're clearing the remaining tasks.
3563     synchronized (zkEventWorkerWaitingList){
3564       zkEventWorkerWaitingList.clear();
3565     }
3566 
3567     // Shutdown the threadpool executor service
3568     threadPoolExecutorService.shutdownNow();
3569     zkEventWorkers.shutdownNow();
3570     regionStateStore.stop();
3571   }
3572 
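       /**
        * Marks the table as ENABLED in the table state manager, aborting the master
        * if the state cannot be set (e.g. due to a ZooKeeper issue).
        */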
3573   protected void setEnabledTable(TableName tableName) {
3574     try {
3575       this.tableStateManager.setTableState(tableName,
3576         ZooKeeperProtos.Table.State.ENABLED);
3577     } catch (CoordinatedStateException e) {
3578       // here we can abort as it is the start up flow
3579       String errorMsg = "Unable to ensure that the table " + tableName
3580           + " will be" + " enabled because of a ZooKeeper issue";
3581       LOG.error(errorMsg);
3582       this.server.abort(errorMsg, e);
3583     }
3584   }
3585 
3586   /**
3587    * Set region as OFFLINE up in zookeeper asynchronously.
3588    * @param state current state of the region; expected to be CLOSED or OFFLINE
3589    * @return True if we succeeded, false otherwise (state was incorrect or we failed
3590    * updating zk).
3591    */
3592   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3593       final AsyncCallback.StringCallback cb, final ServerName destination) {
3594     if (!state.isClosed() && !state.isOffline()) {
3595       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3596         new IllegalStateException());
3597       return false;
3598     }
3599     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3600     try {
3601       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3602         destination, cb, state);
3603     } catch (KeeperException e) {
3604       if (e instanceof NodeExistsException) {
3605         LOG.warn("Node for " + state.getRegion() + " already exists");
3606       } else {
3607         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3608       }
3609       return false;
3610     }
3611     return true;
3612   }
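  // Sketch of the callback side (an assumption about the caller, which lives elsewhere in
  // this class): the RegionState passed as the context object above comes back through the
  // ZooKeeper StringCallback, roughly:
  //   new AsyncCallback.StringCallback() {
  //     public void processResult(int rc, String path, Object ctx, String name) {
  //       if (rc == KeeperException.Code.OK.intValue()) {
  //         // the OFFLINE znode was created for ((RegionState) ctx); proceed with the open
  //       }
  //     }
  //   };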
3613 
3614   private boolean deleteNodeInStates(String encodedName,
3615       String desc, ServerName sn, EventType... types) {
3616     try {
3617       for (EventType et: types) {
3618         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3619           return true;
3620         }
3621       }
3622       LOG.info("Failed to delete the " + desc + " node for "
3623         + encodedName + ". The node type may not match");
3624     } catch (NoNodeException e) {
3625       if (LOG.isDebugEnabled()) {
3626         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3627       }
3628     } catch (KeeperException ke) {
3629       server.abort("Unexpected ZK exception deleting " + desc
3630         + " node for the region " + encodedName, ke);
3631     }
3632     return false;
3633   }
3634 
3635   private void deleteMergingNode(String encodedName, ServerName sn) {
3636     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3637       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3638   }
3639 
3640   private void deleteSplittingNode(String encodedName, ServerName sn) {
3641     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3642       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3643   }
3644 
3645   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
3646       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
3647       justification="Modification of Maps not ATOMIC!!!! FIX!!!")
3648   private void onRegionFailedOpen(
3649       final HRegionInfo hri, final ServerName sn) {
3650     String encodedName = hri.getEncodedName();
3651     // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
3652     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3653     if (failedOpenCount == null) {
3654       failedOpenCount = new AtomicInteger();
3655       // No need to use putIfAbsent, or extra synchronization since
3656       // this whole handleRegion block is locked on the encoded region
3657       // name, and failedOpenTracker is updated only in this block
3658       // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
3659       failedOpenTracker.put(encodedName, failedOpenCount);
3660     }
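    // A lock-free alternative that would address the FindBugs warning above (a sketch,
    // assuming nothing depends on the plain get/put sequence; not applied here):
    //   AtomicInteger fresh = new AtomicInteger();
    //   AtomicInteger prior = failedOpenTracker.putIfAbsent(encodedName, fresh);
    //   AtomicInteger count = (prior == null) ? fresh : prior;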
3661     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3662       // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
3663       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3664       // remove the tracking info to save memory, also reset
3665       // the count for next open initiative
3666       failedOpenTracker.remove(encodedName);
3667     } else {
3668       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3669         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
3670         // so that we are aware of potential problem if it persists for a long time.
3671         LOG.warn("Failed to open the hbase:meta region " +
3672             hri.getRegionNameAsString() + " after " +
3673             failedOpenCount.get() + " retries. Continue retrying.");
3674       }
3675 
3676       // Handle this the same as if it were opened and then closed.
3677       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3678       if (regionState != null) {
3679         // When there is more than one region server, a new RS is selected as the
3680         // destination and the same is updated in the region plan. (HBASE-5546)
3681         if (getTableStateManager().isTableState(hri.getTable(),
3682             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3683             replicasToClose.contains(hri)) {
3684           offlineDisabledRegion(hri);
3685           return;
3686         }
3687         // ZK Node is in CLOSED state, assign it.
3688         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3689         // This below has to do w/ online enable/disable of a table
3690         removeClosedRegion(hri);
3691         try {
3692           getRegionPlan(hri, sn, true);
3693         } catch (HBaseIOException e) {
3694           LOG.warn("Failed to get region plan", e);
3695         }
3696         invokeAssign(hri, false);
3697       }
3698     }
3699   }
3700 
3701   private void onRegionOpen(final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3702     regionOnline(hri, sn, openSeqNum);
3703     if (useZKForAssignment) {
3704       try {
3705         // Delete the ZNode if exists
3706         ZKAssign.deleteNodeFailSilent(watcher, hri);
3707       } catch (KeeperException ke) {
3708         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3709       }
3710     }
3711 
3712     // reset the count, if any
3713     failedOpenTracker.remove(hri.getEncodedName());
3714     if (getTableStateManager().isTableState(hri.getTable(),
3715         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3716       invokeUnAssign(hri);
3717     }
3718   }
3719 
3720   private void onRegionClosed(final HRegionInfo hri) {
3721     if (getTableStateManager().isTableState(hri.getTable(),
3722         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3723         replicasToClose.contains(hri)) {
3724       offlineDisabledRegion(hri);
3725       return;
3726     }
3727     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3728     sendRegionClosedNotification(hri);
3729     // This below has to do w/ online enable/disable of a table
3730     removeClosedRegion(hri);
3731     invokeAssign(hri, false);
3732   }
3733 
3734   private String checkInStateForSplit(ServerName sn,
3735       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3736     final RegionState rs_p = regionStates.getRegionState(p);
3737     RegionState rs_a = regionStates.getRegionState(a);
3738     RegionState rs_b = regionStates.getRegionState(b);
3739     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3740         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3741         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3742       return "Not in state good for split";
3743     }
3744     return "";
3745   }
3746 
3747   private String onRegionSplitReverted(ServerName sn,
3748       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3749     String s = checkInStateForSplit(sn, p, a, b);
3750     if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3751       return s;
3752     }
3753     regionOnline(p, sn);
3754     regionOffline(a);
3755     regionOffline(b);
3756 
3757     if (getTableStateManager().isTableState(p.getTable(),
3758         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3759       invokeUnAssign(p);
3760     }
3761     return null;
3762   }
3763 
3764   private String onRegionSplit(ServerName sn, TransitionCode code,
3765       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3766     String s = checkInStateForSplit(sn, p, a, b);
3767     if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3768       return s;
3769     }
3770 
3771     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3772     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3773     regionStates.updateRegionState(p, State.SPLITTING);
3774 
3775     if (code == TransitionCode.SPLIT) {
3776       if (TEST_SKIP_SPLIT_HANDLING) {
3777         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3778       }
3779       regionOffline(p, State.SPLIT);
3780       regionOnline(a, sn, 1);
3781       regionOnline(b, sn, 1);
3782 
3783       // User could disable the table before master knows the new region.
3784       if (getTableStateManager().isTableState(p.getTable(),
3785           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3786         invokeUnAssign(a);
3787         invokeUnAssign(b);
3788       } else {
3789         Callable<Object> splitReplicasCallable = new Callable<Object>() {
3790           @Override
3791           public Object call() {
3792             doSplittingOfReplicas(p, a, b);
3793             return null;
3794           }
3795         };
3796         threadPoolExecutorService.submit(splitReplicasCallable);
3797       }
3798     } else if (code == TransitionCode.SPLIT_PONR) {
3799       try {
3800         regionStates.splitRegion(p, a, b, sn);
3801       } catch (IOException ioe) {
3802         LOG.info("Failed to record split region " + p.getShortNameToLog());
3803         return "Failed to record the splitting in meta";
3804       }
3805     }
3806     return null;
3807   }
3808 
3809   private String onRegionMerge(ServerName sn, TransitionCode code,
3810       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3811     RegionState rs_p = regionStates.getRegionState(p);
3812     RegionState rs_a = regionStates.getRegionState(a);
3813     RegionState rs_b = regionStates.getRegionState(b);
3814     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3815         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3816       return "Not in state good for merge";
3817     }
3818 
3819     regionStates.updateRegionState(a, State.MERGING);
3820     regionStates.updateRegionState(b, State.MERGING);
3821     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3822 
3823     String encodedName = p.getEncodedName();
3824     if (code == TransitionCode.READY_TO_MERGE) {
3825       mergingRegions.put(encodedName,
3826         new PairOfSameType<HRegionInfo>(a, b));
3827     } else if (code == TransitionCode.MERGED) {
3828       mergingRegions.remove(encodedName);
3829       regionOffline(a, State.MERGED);
3830       regionOffline(b, State.MERGED);
3831       regionOnline(p, sn, 1);
3832 
3833       // User could disable the table before master knows the new region.
3834       if (getTableStateManager().isTableState(p.getTable(),
3835           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3836         invokeUnAssign(p);
3837       } else {
3838         Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3839           @Override
3840           public Object call() {
3841             doMergingOfReplicas(p, a, b);
3842             return null;
3843           }
3844         };
3845         threadPoolExecutorService.submit(mergeReplicasCallable);
3846       }
3847     } else if (code == TransitionCode.MERGE_PONR) {
3848       try {
3849         regionStates.mergeRegions(p, a, b, sn);
3850       } catch (IOException ioe) {
3851         LOG.info("Failed to record merged region " + p.getShortNameToLog());
3852         return "Failed to record the merging in meta";
3853       }
3854     } else {
3855       mergingRegions.remove(encodedName);
3856       regionOnline(a, sn);
3857       regionOnline(b, sn);
3858       regionOffline(p);
3859 
3860       if (getTableStateManager().isTableState(p.getTable(),
3861           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3862         invokeUnAssign(a);
3863         invokeUnAssign(b);
3864       }
3865     }
3866     return null;
3867   }
3868 
3869   /**
3870    * A helper to handle region merging transition event.
3871    * It transitions merging regions to MERGING state.
3872    */
3873   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3874       final String prettyPrintedRegionName, final ServerName sn) {
3875     if (!serverManager.isServerOnline(sn)) {
3876       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3877       return false;
3878     }
3879     byte [] payloadOfMerging = rt.getPayload();
3880     List<HRegionInfo> mergingRegions;
3881     try {
3882       mergingRegions = HRegionInfo.parseDelimitedFrom(
3883         payloadOfMerging, 0, payloadOfMerging.length);
3884     } catch (IOException e) {
3885       LOG.error("Dropped merging! Failed reading "  + rt.getEventType()
3886         + " payload for " + prettyPrintedRegionName);
3887       return false;
3888     }
3889     assert mergingRegions.size() == 3;
3890     HRegionInfo p = mergingRegions.get(0);
3891     HRegionInfo hri_a = mergingRegions.get(1);
3892     HRegionInfo hri_b = mergingRegions.get(2);
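    // The payload is expected to have been written on the region-server side with something
    // like (a sketch; the exact call site is in the merge transaction code):
    //   byte[] payload = HRegionInfo.toDelimitedByteArray(p, hri_a, hri_b);
    // which is why parseDelimitedFrom() above yields exactly three HRegionInfos:
    // the merged parent followed by the two regions being merged.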
3893 
3894     RegionState rs_p = regionStates.getRegionState(p);
3895     RegionState rs_a = regionStates.getRegionState(hri_a);
3896     RegionState rs_b = regionStates.getRegionState(hri_b);
3897 
3898     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3899         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3900         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3901       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3902         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3903       return false;
3904     }
3905 
3906     EventType et = rt.getEventType();
3907     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3908       try {
3909         RegionMergeCoordination.RegionMergeDetails std =
3910             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3911                 .getRegionMergeCoordination().getDefaultDetails();
3912         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3913             .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3914         if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3915           byte[] data = ZKAssign.getData(watcher, encodedName);
3916           EventType currentType = null;
3917           if (data != null) {
3918             RegionTransition newRt = RegionTransition.parseFrom(data);
3919             currentType = newRt.getEventType();
3920           }
3921           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3922               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3923             LOG.warn("Failed to transition pending_merge node "
3924               + encodedName + " to merging, it's now " + currentType);
3925             return false;
3926           }
3927         }
3928       } catch (Exception e) {
3929         LOG.warn("Failed to transition pending_merge node "
3930           + encodedName + " to merging", e);
3931         return false;
3932       }
3933     }
3934 
3935     synchronized (regionStates) {
3936       regionStates.updateRegionState(hri_a, State.MERGING);
3937       regionStates.updateRegionState(hri_b, State.MERGING);
3938       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3939 
3940       if (et != EventType.RS_ZK_REGION_MERGED) {
3941         this.mergingRegions.put(encodedName,
3942           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3943       } else {
3944         this.mergingRegions.remove(encodedName);
3945         regionOffline(hri_a, State.MERGED);
3946         regionOffline(hri_b, State.MERGED);
3947         regionOnline(p, sn);
3948       }
3949     }
3950 
3951     if (et == EventType.RS_ZK_REGION_MERGED) {
3952       doMergingOfReplicas(p, hri_a, hri_b);
3953       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3954       // Remove region from ZK
3955       try {
3956         boolean successful = false;
3957         while (!successful) {
3958           // It's possible that the RS tickles in between the reading of the
3959           // znode and the deleting, so it's safe to retry.
3960           successful = ZKAssign.deleteNode(watcher, encodedName,
3961             EventType.RS_ZK_REGION_MERGED, sn);
3962         }
3963       } catch (KeeperException e) {
3964         if (e instanceof NoNodeException) {
3965           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3966           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3967         } else {
3968           server.abort("Error deleting MERGED node " + encodedName, e);
3969         }
3970       }
3971       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3972         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3973         + hri_b.getRegionNameAsString() + ", on " + sn);
3974 
3975       // User could disable the table before master knows the new region.
3976       if (tableStateManager.isTableState(p.getTable(),
3977           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3978         unassign(p);
3979       }
3980     }
3981     return true;
3982   }
3983 
3984   /**
3985    * A helper to handle region splitting transition event.
3986    */
3987   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3988       final String prettyPrintedRegionName, final ServerName sn) {
3989     if (!serverManager.isServerOnline(sn)) {
3990       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3991       return false;
3992     }
3993     byte [] payloadOfSplitting = rt.getPayload();
3994     List<HRegionInfo> splittingRegions;
3995     try {
3996       splittingRegions = HRegionInfo.parseDelimitedFrom(
3997         payloadOfSplitting, 0, payloadOfSplitting.length);
3998     } catch (IOException e) {
3999       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
4000         + " payload for " + prettyPrintedRegionName);
4001       return false;
4002     }
4003     assert splittingRegions.size() == 2;
4004     HRegionInfo hri_a = splittingRegions.get(0);
4005     HRegionInfo hri_b = splittingRegions.get(1);
4006 
4007     RegionState rs_p = regionStates.getRegionState(encodedName);
4008     RegionState rs_a = regionStates.getRegionState(hri_a);
4009     RegionState rs_b = regionStates.getRegionState(hri_b);
4010 
4011     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
4012         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
4013         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
4014       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
4015         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4016       return false;
4017     }
4018 
4019     if (rs_p == null) {
4020       // Splitting region should be online
4021       rs_p = regionStates.updateRegionState(rt, State.OPEN);
4022       if (rs_p == null) {
4023         LOG.warn("Received splitting for region " + prettyPrintedRegionName
4024           + " from server " + sn + " but it doesn't exist anymore,"
4025           + " probably already processed its split");
4026         return false;
4027       }
4028       regionStates.regionOnline(rs_p.getRegion(), sn);
4029     }
4030 
4031     HRegionInfo p = rs_p.getRegion();
4032     EventType et = rt.getEventType();
4033     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
4034       try {
4035         SplitTransactionDetails std =
4036             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4037                 .getSplitTransactionCoordination().getDefaultDetails();
4038         if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4039             .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
4040           byte[] data = ZKAssign.getData(watcher, encodedName);
4041           EventType currentType = null;
4042           if (data != null) {
4043             RegionTransition newRt = RegionTransition.parseFrom(data);
4044             currentType = newRt.getEventType();
4045           }
4046           if (currentType == null
4047               || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4048             LOG.warn("Failed to transition pending_split node " + encodedName
4049                 + " to splitting, it's now " + currentType);
4050             return false;
4051           }
4052         }
4053       } catch (Exception e) {
4054         LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
4055         return false;
4056       }
4057     }
4058 
4059     synchronized (regionStates) {
4060       splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4061       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4062       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4063       regionStates.updateRegionState(rt, State.SPLITTING);
4064 
4065       // The below is for testing ONLY!  We can't do fault injection easily, so
4066       // resort to this kinda ugliness -- St.Ack 02/25/2011.
4067       if (TEST_SKIP_SPLIT_HANDLING) {
4068         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4069         return true; // return true so that the splitting node stays
4070       }
4071 
4072       if (et == EventType.RS_ZK_REGION_SPLIT) {
4073         regionOffline(p, State.SPLIT);
4074         regionOnline(hri_a, sn);
4075         regionOnline(hri_b, sn);
4076         splitRegions.remove(p);
4077       }
4078     }
4079 
4080     if (et == EventType.RS_ZK_REGION_SPLIT) {
4081       // split replicas
4082       doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
4083       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4084       // Remove region from ZK
4085       try {
4086         boolean successful = false;
4087         while (!successful) {
4088           // It's possible that the RS tickles in between the reading of the
4089           // znode and the deleting, so it's safe to retry.
4090           successful = ZKAssign.deleteNode(watcher, encodedName,
4091             EventType.RS_ZK_REGION_SPLIT, sn);
4092         }
4093       } catch (KeeperException e) {
4094         if (e instanceof NoNodeException) {
4095           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4096           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
4097         } else {
4098           server.abort("Error deleting SPLIT node " + encodedName, e);
4099         }
4100       }
4101       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4102         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4103         + hri_b.getRegionNameAsString() + ", on " + sn);
4104 
4105       // User could disable the table before master knows the new region.
4106       if (tableStateManager.isTableState(p.getTable(),
4107           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4108         unassign(hri_a);
4109         unassign(hri_b);
4110       }
4111     }
4112     return true;
4113   }
4114 
4115   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4116       final HRegionInfo hri_b) {
4117     // Close replicas for the original unmerged regions. Create/assign new replicas
4118     // for the merged parent.
4119     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4120     unmergedRegions.add(hri_a);
4121     unmergedRegions.add(hri_b);
4122     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4123     Collection<List<HRegionInfo>> c = map.values();
4124     for (List<HRegionInfo> l : c) {
4125       for (HRegionInfo h : l) {
4126         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4127           LOG.debug("Unassigning un-merged replica " + h);
4128           unassign(h);
4129         }
4130       }
4131     }
4132     int numReplicas = 1;
4133     try {
4134       numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
4135           getRegionReplication();
4136     } catch (IOException e) {
4137       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4138           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4139           "will not be done");
4140     }
4141     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4142     for (int i = 1; i < numReplicas; i++) {
4143       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4144     }
4145     try {
4146       assign(regions);
4147     } catch (IOException ioe) {
4148       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4149                 ioe.getMessage());
4150     } catch (InterruptedException ie) {
4151       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4152                 ie.getMessage());
4153     }
4154   }
4155 
4156   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4157       final HRegionInfo hri_b) {
4158     // Create new regions for the replicas, and assign them to match the
4159     // current replica assignments. If replica 1 of the parent is assigned to RS1,
4160     // replica 1 of each daughter will be on the same machine.
4161     int numReplicas = 1;
4162     try {
4163       numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
4164           getRegionReplication();
4165     } catch (IOException e) {
4166       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4167           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4168           "will not be done");
4169     }
4170     // unassign the old replicas
4171     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4172     parentRegion.add(parentHri);
4173     Map<ServerName, List<HRegionInfo>> currentAssign =
4174         regionStates.getRegionAssignments(parentRegion);
4175     Collection<List<HRegionInfo>> c = currentAssign.values();
4176     for (List<HRegionInfo> l : c) {
4177       for (HRegionInfo h : l) {
4178         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4179           LOG.debug("Unassigning parent's replica " + h);
4180           unassign(h);
4181         }
4182       }
4183     }
4184     // assign daughter replicas
4185     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4186     for (int i = 1; i < numReplicas; i++) {
4187       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4188       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4189     }
4190     try {
4191       assign(map);
4192     } catch (IOException e) {
4193       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4194     } catch (InterruptedException e) {
4195       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4196     }
4197   }
4198 
4199   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4200       int replicaId, Map<HRegionInfo, ServerName> map) {
4201     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4202     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4203         replicaId);
4204     LOG.debug("Created replica region for daughter " + daughterReplica);
4205     ServerName sn;
4206     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4207       map.put(daughterReplica, sn);
4208     } else {
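      // No live location known for the parent replica; fall back to a random online server.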
4209       List<ServerName> servers = serverManager.getOnlineServersList();
4210       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4211       map.put(daughterReplica, sn);
4212     }
4213   }
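  // Worked example (a sketch, assuming the table has region replication 3): for each
  // daughter, replica ids 1 and 2 are materialized with
  //   HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(hri_a, 1);
  // and placed on the server hosting the matching parent replica, or on a random online
  // server when that location is unknown.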
4214 
4215   public Set<HRegionInfo> getReplicasToClose() {
4216     return replicasToClose;
4217   }
4218 
4219   /**
4220    * A region is offline.  The new state should be the specified one,
4221    * if not null.  If the specified state is null, the new state is Offline.
4222    * The specified state can be Split/Merged/Offline/null only.
4223    */
4224   private void regionOffline(final HRegionInfo regionInfo, final State state) {
4225     regionStates.regionOffline(regionInfo, state);
4226     removeClosedRegion(regionInfo);
4227     // remove the region plan as well just in case.
4228     clearRegionPlan(regionInfo);
4229     balancer.regionOffline(regionInfo);
4230 
4231     // Tell our listeners that a region was closed
4232     sendRegionClosedNotification(regionInfo);
4233     // also note that all the replicas of the primary should be closed
4234     if (state != null && state.equals(State.SPLIT)) {
4235       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4236       c.add(regionInfo);
4237       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4238       Collection<List<HRegionInfo>> allReplicas = map.values();
4239       for (List<HRegionInfo> list : allReplicas) {
4240         replicasToClose.addAll(list);
4241       }
4242     }
4243     else if (state != null && state.equals(State.MERGED)) {
4244       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4245       c.add(regionInfo);
4246       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4247       Collection<List<HRegionInfo>> allReplicas = map.values();
4248       for (List<HRegionInfo> list : allReplicas) {
4249         replicasToClose.addAll(list);
4250       }
4251     }
4252   }
4253 
4254   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4255       final ServerName serverName) {
4256     if (!this.listeners.isEmpty()) {
4257       for (AssignmentListener listener : this.listeners) {
4258         listener.regionOpened(regionInfo, serverName);
4259       }
4260     }
4261   }
4262 
4263   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4264     if (!this.listeners.isEmpty()) {
4265       for (AssignmentListener listener : this.listeners) {
4266         listener.regionClosed(regionInfo);
4267       }
4268     }
4269   }
4270 
4271   /**
4272    * Try to update some region states. If the state machine prevents
4273    * such update, an error message is returned to explain the reason.
4274    *
4275    * It's expected that in each transition there should be just one
4276    * region for opening/closing, and 3 regions for splitting/merging.
4277    * These regions should be on the server that requested the change.
4278    *
4279    * Region state machine. Only these transitions
4280    * are expected to be triggered by a region server.
4281    *
4282    * On the state transition:
4283    *  (1) Open/Close should be initiated by master
4284    *      (a) Master sets the region to pending_open/pending_close
4285    *        in memory and hbase:meta after sending the request
4286    *        to the region server
4287    *      (b) Region server reports back to the master
4288    *        after open/close is done (either success/failure)
4289    *      (c) If the region server has a problem reporting the status
4290    *        to master, it must be because the master is down or some
4291    *        temporary network issue. Otherwise, the region server should
4292    *        abort since it must be a bug. If the master is not accessible,
4293    *        the region server should keep trying until the server is
4294    *        stopped or till the status is reported to the (new) master
4295    *      (d) If region server dies in the middle of opening/closing
4296    *        a region, SSH picks it up and finishes it
4297    *      (e) If master dies in the middle, the new master recovers
4298    *        the state during initialization from hbase:meta. Region server
4299    *        can report any transition that has not been reported to
4300    *        the previous active master yet
4301    *  (2) Split/merge is initiated by region servers
4302    *      (a) To split a region, a region server sends a request
4303    *        to master to try to set a region to splitting, together with
4304    *        two daughters (to be created) to splitting new. If approved
4305    *        by the master, the splitting can then move ahead
4306    *      (b) To merge two regions, a region server sends a request to
4307    *        master to try to set the new merged region (to be created) to
4308    *        merging_new, together with two regions (to be merged) to merging.
4309    *        If it is ok with the master, the merge can then move ahead
4310    *      (c) Once the splitting/merging is done, the region server
4311    *        reports the status back to the master either success/failure.
4312    *      (d) Other scenarios should be handled similarly as for
4313    *        region open/close
4314    */
4315   protected String onRegionTransition(final ServerName serverName,
4316       final RegionStateTransition transition) {
4317     TransitionCode code = transition.getTransitionCode();
4318     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4319     RegionState current = regionStates.getRegionState(hri);
4320     if (LOG.isDebugEnabled()) {
4321       LOG.debug("Got transition " + code + " for "
4322         + (current != null ? current.toString() : hri.getShortNameToLog())
4323         + " from " + serverName);
4324     }
4325     String errorMsg = null;
4326     switch (code) {
4327     case OPENED:
4328       if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4329         LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4330             + serverName);
4331         break;
4332       }
4333     case FAILED_OPEN:
4334       if (current == null
4335           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4336         errorMsg = hri.getShortNameToLog()
4337           + " is not pending open on " + serverName;
4338       } else if (code == TransitionCode.FAILED_OPEN) {
4339         onRegionFailedOpen(hri, serverName);
4340       } else {
4341         long openSeqNum = HConstants.NO_SEQNUM;
4342         if (transition.hasOpenSeqNum()) {
4343           openSeqNum = transition.getOpenSeqNum();
4344         }
4345         if (openSeqNum < 0) {
4346           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4347         } else {
4348           onRegionOpen(hri, serverName, openSeqNum);
4349         }
4350       }
4351       break;
4352 
4353     case CLOSED:
4354       if (current == null
4355           || !current.isPendingCloseOrClosingOnServer(serverName)) {
4356         errorMsg = hri.getShortNameToLog()
4357           + " is not pending close on " + serverName;
4358       } else {
4359         onRegionClosed(hri);
4360       }
4361       break;
4362 
4363     case READY_TO_SPLIT:
4364       try {
4365         regionStateListener.onRegionSplit(hri);
4366       } catch (IOException exp) {
4367         errorMsg = StringUtils.stringifyException(exp);
4368       }
4369       break;
4370     case SPLIT_PONR:
4371     case SPLIT:
4372       errorMsg =
4373           onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4374             HRegionInfo.convert(transition.getRegionInfo(2)));
4375       break;
4376 
4377     case SPLIT_REVERTED:
4378       errorMsg =
4379           onRegionSplitReverted(serverName, hri,
4380             HRegionInfo.convert(transition.getRegionInfo(1)),
4381             HRegionInfo.convert(transition.getRegionInfo(2)));
4382       if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4383         try {
4384           regionStateListener.onRegionSplitReverted(hri);
4385         } catch (IOException exp) {
4386           LOG.warn(StringUtils.stringifyException(exp));
4387         }
4388       }
4389       break;
4390     case READY_TO_MERGE:
4391     case MERGE_PONR:
4392     case MERGED:
4393     case MERGE_REVERTED:
4394       errorMsg = onRegionMerge(serverName, code, hri,
4395         HRegionInfo.convert(transition.getRegionInfo(1)),
4396         HRegionInfo.convert(transition.getRegionInfo(2)));
4397       if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4398         try {
4399           regionStateListener.onRegionMerged(hri);
4400         } catch (IOException exp) {
4401           errorMsg = StringUtils.stringifyException(exp);
4402         }
4403       }
4404       break;
4405 
4406     default:
4407       errorMsg = "Unexpected transition code " + code;
4408     }
4409     if (errorMsg != null) {
4410       LOG.error("Failed to transition region from " + current + " to"
4411         + code + " by " + serverName + ": " + errorMsg);
4412     }
4413     return errorMsg;
4414   }
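  // Illustrative sketch (an assumption; the real report is built on the region-server side
  // and arrives through the master's RPC layer) of a transition that is routed to
  // onRegionTransition() above:
  //   RegionStateTransition.Builder transition = RegionStateTransition.newBuilder()
  //       .setTransitionCode(TransitionCode.OPENED)
  //       .setOpenSeqNum(openSeqNum)
  //       .addRegionInfo(HRegionInfo.convert(hri));
  //   String error = onRegionTransition(serverName, transition.build());
  //   // A null return means the transition was accepted.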
4415 
4416   /**
4417    * @return Instance of load balancer
4418    */
4419   public LoadBalancer getBalancer() {
4420     return this.balancer;
4421   }
4422 
4423   public Map<ServerName, List<HRegionInfo>>
4424     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4425     return getRegionStates().getRegionAssignments(infos);
4426   }
4427 
4428   void setRegionStateListener(RegionStateListener listener) {
4429     this.regionStateListener = listener;
4430   }
4431 }
4432