1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.zookeeper;
20 
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.regex.Matcher;
30 import java.util.regex.Pattern;
31 
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Abortable;
37 import org.apache.hadoop.hbase.AuthUtil;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
41 import org.apache.hadoop.hbase.classification.InterfaceAudience;
42 import org.apache.hadoop.hbase.security.Superusers;
43 import org.apache.hadoop.security.UserGroupInformation;
44 import org.apache.zookeeper.KeeperException;
45 import org.apache.zookeeper.WatchedEvent;
46 import org.apache.zookeeper.Watcher;
47 import org.apache.zookeeper.ZooDefs;
48 import org.apache.zookeeper.ZooDefs.Ids;
49 import org.apache.zookeeper.ZooDefs.Perms;
50 import org.apache.zookeeper.data.ACL;
51 import org.apache.zookeeper.data.Id;
52 import org.apache.zookeeper.data.Stat;
53 
54 /**
55  * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
56  * for each Master, RegionServer, and client process.
57  *
58  * <p>This is the only class that implements {@link Watcher}.  Other internal
59  * classes which need to be notified of ZooKeeper events must register with
60  * the local instance of this watcher via {@link #registerListener}.
61  *
62  * <p>This class also holds and manages the connection to ZooKeeper.  Code to
63  * deal with connection related events and exceptions are handled here.
64  */
65 @InterfaceAudience.Private
66 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
67   private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
68 
69   // Identifier for this watcher (for logging only).  It is made of the prefix
70   // passed on construction and the zookeeper sessionid.
71   private String prefix;
72   private String identifier;
73 
74   // zookeeper quorum
75   private String quorum;
76 
77   // zookeeper connection
78   private RecoverableZooKeeper recoverableZooKeeper;
79 
80   // abortable in case of zk failure
81   protected Abortable abortable;
82   // Used if abortable is null
83   private boolean aborted = false;
84 
85   // listeners to be notified
86   private final List<ZooKeeperListener> listeners =
87     new CopyOnWriteArrayList<ZooKeeperListener>();
88 
89   // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL
90   // negotiation to complete
91   public CountDownLatch saslLatch = new CountDownLatch(1);
92 
93   // node names
94 
95   // base znode for this cluster
96   public String baseZNode;
97   //znodes containing the locations of the servers hosting the meta replicas
98   private Map<Integer,String> metaReplicaZnodes = new HashMap<Integer, String>();
99   // znode containing ephemeral nodes of the regionservers
100   public String rsZNode;
101   // znode containing ephemeral nodes of the draining regionservers
102   public String drainingZNode;
103   // znode of currently active master
104   private String masterAddressZNode;
105   // znode of this master in backup master directory, if not the active master
106   public String backupMasterAddressesZNode;
107   // znode containing the current cluster state
108   public String clusterStateZNode;
109   // znode used for region transitioning and assignment
110   public String assignmentZNode;
111   // znode used for table disabling/enabling
112   public String tableZNode;
113   // znode containing the unique cluster ID
114   public String clusterIdZNode;
115   // znode used for log splitting work assignment
116   public String splitLogZNode;
117   // znode containing the state of the load balancer
118   public String balancerZNode;
119   // znode containing the state of region normalizer
120   private String regionNormalizerZNode;
121   // znode containing the lock for the tables
122   public String tableLockZNode;
123   // znode containing the state of recovering regions
124   public String recoveringRegionsZNode;
125   // znode containing namespace descriptors
126   public static String namespaceZNode = "namespace";
127 
128   // Certain ZooKeeper nodes need to be world-readable
129   public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
130     new ArrayList<ACL>() { {
131       add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
132       add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
133     }};
134 
135   public final static String META_ZNODE_PREFIX = "meta-region-server";
136 
137   private final Configuration conf;
138 
139   private final Exception constructorCaller;
140 
141   /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */
142   private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
143 
144   /**
145    * Instantiate a ZooKeeper connection and watcher.
146    * @param identifier string that is passed to RecoverableZookeeper to be used as
147    * identifier for this instance. Use null for default.
148    * @throws IOException
149    * @throws ZooKeeperConnectionException
150    */
ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)151   public ZooKeeperWatcher(Configuration conf, String identifier,
152       Abortable abortable) throws ZooKeeperConnectionException, IOException {
153     this(conf, identifier, abortable, false);
154   }
155 
156   /**
157    * Instantiate a ZooKeeper connection and watcher.
158    * @param conf
159    * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
160    *          this instance. Use null for default.
161    * @param abortable Can be null if there is on error there is no host to abort: e.g. client
162    *          context.
163    * @param canCreateBaseZNode
164    * @throws IOException
165    * @throws ZooKeeperConnectionException
166    */
ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode)167   public ZooKeeperWatcher(Configuration conf, String identifier,
168       Abortable abortable, boolean canCreateBaseZNode)
169   throws IOException, ZooKeeperConnectionException {
170     this.conf = conf;
171     // Capture a stack trace now.  Will print it out later if problem so we can
172     // distingush amongst the myriad ZKWs.
173     try {
174       throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
175     } catch (Exception e) {
176       this.constructorCaller = e;
177     }
178     this.quorum = ZKConfig.getZKQuorumServersString(conf);
179     this.prefix = identifier;
180     // Identifier will get the sessionid appended later below down when we
181     // handle the syncconnect event.
182     this.identifier = identifier + "0x0";
183     this.abortable = abortable;
184     setNodeNames(conf);
185     this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
186     if (canCreateBaseZNode) {
187       createBaseZNodes();
188     }
189   }
190 
createBaseZNodes()191   private void createBaseZNodes() throws ZooKeeperConnectionException {
192     try {
193       // Create all the necessary "directories" of znodes
194       ZKUtil.createWithParents(this, baseZNode);
195       if (conf.getBoolean("hbase.assignment.usezk", true)) {
196         ZKUtil.createAndFailSilent(this, assignmentZNode);
197       }
198       ZKUtil.createAndFailSilent(this, rsZNode);
199       ZKUtil.createAndFailSilent(this, drainingZNode);
200       ZKUtil.createAndFailSilent(this, tableZNode);
201       ZKUtil.createAndFailSilent(this, splitLogZNode);
202       ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
203       ZKUtil.createAndFailSilent(this, tableLockZNode);
204       ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
205     } catch (KeeperException e) {
206       throw new ZooKeeperConnectionException(
207           prefix("Unexpected KeeperException creating base node"), e);
208     }
209   }
210 
211   /** Returns whether the znode is supposed to be readable by the client
212    * and DOES NOT contain sensitive information (world readable).*/
isClientReadable(String node)213   public boolean isClientReadable(String node) {
214     // Developer notice: These znodes are world readable. DO NOT add more znodes here UNLESS
215     // all clients need to access this data to work. Using zk for sharing data to clients (other
216     // than service lookup case is not a recommended design pattern.
217     return
218         node.equals(baseZNode) ||
219         isAnyMetaReplicaZnode(node) ||
220         node.equals(getMasterAddressZNode()) ||
221         node.equals(clusterIdZNode)||
222         node.equals(rsZNode) ||
223         // /hbase/table and /hbase/table/foo is allowed, /hbase/table-lock is not
224         node.equals(tableZNode) ||
225         node.startsWith(tableZNode + "/");
226   }
227 
228   /**
229    * On master start, we check the znode ACLs under the root directory and set the ACLs properly
230    * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed
231    * so that the existing znodes created with open permissions are now changed with restrictive
232    * perms.
233    */
checkAndSetZNodeAcls()234   public void checkAndSetZNodeAcls() {
235     if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
236       LOG.info("not a secure deployment, proceeding");
237       return;
238     }
239 
240     // Check the base znodes permission first. Only do the recursion if base znode's perms are not
241     // correct.
242     try {
243       List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat());
244 
245       if (!isBaseZnodeAclSetup(actualAcls)) {
246         LOG.info("setting znode ACLs");
247         setZnodeAclsRecursive(baseZNode);
248       }
249     } catch(KeeperException.NoNodeException nne) {
250       return;
251     } catch(InterruptedException ie) {
252       interruptedException(ie);
253     } catch (IOException|KeeperException e) {
254       LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
255     }
256   }
257 
258   /**
259    * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs
260    * will be set last in case the master fails in between.
261    * @param znode
262    */
setZnodeAclsRecursive(String znode)263   private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
264     List<String> children = recoverableZooKeeper.getChildren(znode, false);
265 
266     for (String child : children) {
267       setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child));
268     }
269     List<ACL> acls = ZKUtil.createACL(this, znode, true);
270     LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
271     recoverableZooKeeper.setAcl(znode, acls, -1);
272   }
273 
274   /**
275    * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup.
276    * @param acls acls from zookeeper
277    * @return whether ACLs are set for the base znode
278    * @throws IOException
279    */
isBaseZnodeAclSetup(List<ACL> acls)280   private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
281     if (LOG.isDebugEnabled()) {
282       LOG.debug("Checking znode ACLs");
283     }
284     String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
285     // Check whether ACL set for all superusers
286     if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
287       return false;
288     }
289 
290     // this assumes that current authenticated user is the same as zookeeper client user
291     // configured via JAAS
292     String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
293 
294     if (acls.isEmpty()) {
295       if (LOG.isDebugEnabled()) {
296         LOG.debug("ACL is empty");
297       }
298       return false;
299     }
300 
301     for (ACL acl : acls) {
302       int perms = acl.getPerms();
303       Id id = acl.getId();
304       // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser
305       // and one for the hbase user
306       if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
307         if (perms != Perms.READ) {
308           if (LOG.isDebugEnabled()) {
309             LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
310               id, perms, Perms.READ));
311           }
312           return false;
313         }
314       } else if (superUsers != null && isSuperUserId(superUsers, id)) {
315         if (perms != Perms.ALL) {
316           if (LOG.isDebugEnabled()) {
317             LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
318               id, perms, Perms.ALL));
319           }
320           return false;
321         }
322       } else if ("sasl".equals(id.getScheme())) {
323         String name = id.getId();
324         // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname
325         Matcher match = NAME_PATTERN.matcher(name);
326         if (match.matches()) {
327           name = match.group(1);
328         }
329         if (name.equals(hbaseUser)) {
330           if (perms != Perms.ALL) {
331             if (LOG.isDebugEnabled()) {
332               LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x",
333                 id, perms, Perms.ALL));
334             }
335             return false;
336           }
337         } else {
338           if (LOG.isDebugEnabled()) {
339             LOG.debug("Unexpected shortname in SASL ACL: " + id);
340           }
341           return false;
342         }
343       } else {
344         if (LOG.isDebugEnabled()) {
345           LOG.debug("unexpected ACL id '" + id + "'");
346         }
347         return false;
348       }
349     }
350     return true;
351   }
352 
353   /*
354    * Validate whether ACL set for all superusers.
355    */
checkACLForSuperUsers(String[] superUsers, List<ACL> acls)356   private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
357     for (String user : superUsers) {
358       boolean hasAccess = false;
359       // TODO: Validate super group members also when ZK supports setting node ACL for groups.
360       if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
361         for (ACL acl : acls) {
362           if (user.equals(acl.getId().getId())) {
363             if (acl.getPerms() == Perms.ALL) {
364               hasAccess = true;
365             } else {
366               if (LOG.isDebugEnabled()) {
367                 LOG.debug(String.format(
368                   "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x",
369                   acl.getId().getId(), acl.getPerms(), Perms.ALL));
370               }
371             }
372             break;
373           }
374         }
375         if (!hasAccess) {
376           return false;
377         }
378       }
379     }
380     return true;
381   }
382 
383   /*
384    * Validate whether ACL ID is superuser.
385    */
isSuperUserId(String[] superUsers, Id id)386   public static boolean isSuperUserId(String[] superUsers, Id id) {
387     for (String user : superUsers) {
388       // TODO: Validate super group members also when ZK supports setting node ACL for groups.
389       if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
390         return true;
391       }
392     }
393     return false;
394   }
395 
396   @Override
toString()397   public String toString() {
398     return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
399   }
400 
401   /**
402    * Adds this instance's identifier as a prefix to the passed <code>str</code>
403    * @param str String to amend.
404    * @return A new string with this instance's identifier as prefix: e.g.
405    * if passed 'hello world', the returned string could be
406    */
prefix(final String str)407   public String prefix(final String str) {
408     return this.toString() + " " + str;
409   }
410 
411   /**
412    * Set the local variable node names using the specified configuration.
413    */
setNodeNames(Configuration conf)414   private void setNodeNames(Configuration conf) {
415     baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
416         HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
417     metaReplicaZnodes.put(0, ZKUtil.joinZNode(baseZNode,
418            conf.get("zookeeper.znode.metaserver", "meta-region-server")));
419     int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
420             HConstants.DEFAULT_META_REPLICA_NUM);
421     for (int i = 1; i < numMetaReplicas; i++) {
422       String str = ZKUtil.joinZNode(baseZNode,
423         conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + i);
424       metaReplicaZnodes.put(i, str);
425     }
426     rsZNode = ZKUtil.joinZNode(baseZNode,
427         conf.get("zookeeper.znode.rs", "rs"));
428     drainingZNode = ZKUtil.joinZNode(baseZNode,
429         conf.get("zookeeper.znode.draining.rs", "draining"));
430     masterAddressZNode = ZKUtil.joinZNode(baseZNode,
431         conf.get("zookeeper.znode.master", "master"));
432     backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
433         conf.get("zookeeper.znode.backup.masters", "backup-masters"));
434     clusterStateZNode = ZKUtil.joinZNode(baseZNode,
435         conf.get("zookeeper.znode.state", "running"));
436     assignmentZNode = ZKUtil.joinZNode(baseZNode,
437         conf.get("zookeeper.znode.unassigned", "region-in-transition"));
438     tableZNode = ZKUtil.joinZNode(baseZNode,
439         conf.get("zookeeper.znode.tableEnableDisable", "table"));
440     clusterIdZNode = ZKUtil.joinZNode(baseZNode,
441         conf.get("zookeeper.znode.clusterId", "hbaseid"));
442     splitLogZNode = ZKUtil.joinZNode(baseZNode,
443         conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
444     balancerZNode = ZKUtil.joinZNode(baseZNode,
445         conf.get("zookeeper.znode.balancer", "balancer"));
446     regionNormalizerZNode = ZKUtil.joinZNode(baseZNode,
447       conf.get("zookeeper.znode.regionNormalizer", "normalizer"));
448     tableLockZNode = ZKUtil.joinZNode(baseZNode,
449         conf.get("zookeeper.znode.tableLock", "table-lock"));
450     recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
451         conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
452     namespaceZNode = ZKUtil.joinZNode(baseZNode,
453         conf.get("zookeeper.znode.namespace", "namespace"));
454   }
455 
456   /**
457    * Is the znode of any meta replica
458    * @param node
459    * @return true or false
460    */
isAnyMetaReplicaZnode(String node)461   public boolean isAnyMetaReplicaZnode(String node) {
462     if (metaReplicaZnodes.values().contains(node)) {
463       return true;
464     }
465     return false;
466   }
467 
468   /**
469    * Is it the default meta replica's znode
470    * @param node
471    * @return true or false
472    */
isDefaultMetaReplicaZnode(String node)473   public boolean isDefaultMetaReplicaZnode(String node) {
474     if (getZNodeForReplica(HRegionInfo.DEFAULT_REPLICA_ID).equals(node)) {
475       return true;
476     }
477     return false;
478   }
479 
480   /**
481    * Get the znodes corresponding to the meta replicas from ZK
482    * @return list of znodes
483    * @throws KeeperException
484    */
getMetaReplicaNodes()485   public List<String> getMetaReplicaNodes() throws KeeperException {
486     List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, baseZNode);
487     List<String> metaReplicaNodes = new ArrayList<String>(2);
488     String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
489     for (String child : childrenOfBaseNode) {
490       if (child.startsWith(pattern)) metaReplicaNodes.add(child);
491     }
492     return metaReplicaNodes;
493   }
494 
495   /**
496    * Get the znode string corresponding to a replicaId
497    * @param replicaId
498    * @return znode
499    */
getZNodeForReplica(int replicaId)500   public String getZNodeForReplica(int replicaId) {
501     String str = metaReplicaZnodes.get(replicaId);
502     // return a newly created path but don't update the cache of paths
503     // This is mostly needed for tests that attempt to create meta replicas
504     // from outside the master
505     if (str == null) {
506       str = ZKUtil.joinZNode(baseZNode,
507           conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + replicaId);
508     }
509     return str;
510   }
511 
512   /**
513    * Parse the meta replicaId from the passed znode
514    * @param znode
515    * @return replicaId
516    */
getMetaReplicaIdFromZnode(String znode)517   public int getMetaReplicaIdFromZnode(String znode) {
518     String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
519     if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID;
520     // the non-default replicas are of the pattern meta-region-server-<replicaId>
521     String nonDefaultPattern = pattern + "-";
522     return Integer.parseInt(znode.substring(nonDefaultPattern.length()));
523   }
524 
525   /**
526    * Register the specified listener to receive ZooKeeper events.
527    * @param listener
528    */
registerListener(ZooKeeperListener listener)529   public void registerListener(ZooKeeperListener listener) {
530     listeners.add(listener);
531   }
532 
533   /**
534    * Register the specified listener to receive ZooKeeper events and add it as
535    * the first in the list of current listeners.
536    * @param listener
537    */
registerListenerFirst(ZooKeeperListener listener)538   public void registerListenerFirst(ZooKeeperListener listener) {
539     listeners.add(0, listener);
540   }
541 
unregisterListener(ZooKeeperListener listener)542   public void unregisterListener(ZooKeeperListener listener) {
543     listeners.remove(listener);
544   }
545 
546   /**
547    * Clean all existing listeners
548    */
unregisterAllListeners()549   public void unregisterAllListeners() {
550     listeners.clear();
551   }
552 
553   /**
554    * Get a copy of current registered listeners
555    */
getListeners()556   public List<ZooKeeperListener> getListeners() {
557     return new ArrayList<ZooKeeperListener>(listeners);
558   }
559 
560   /**
561    * @return The number of currently registered listeners
562    */
getNumberOfListeners()563   public int getNumberOfListeners() {
564     return listeners.size();
565   }
566 
567   /**
568    * Get the connection to ZooKeeper.
569    * @return connection reference to zookeeper
570    */
getRecoverableZooKeeper()571   public RecoverableZooKeeper getRecoverableZooKeeper() {
572     return recoverableZooKeeper;
573   }
574 
reconnectAfterExpiration()575   public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
576     recoverableZooKeeper.reconnectAfterExpiration();
577   }
578 
579   /**
580    * Get the quorum address of this instance.
581    * @return quorum string of this zookeeper connection instance
582    */
getQuorum()583   public String getQuorum() {
584     return quorum;
585   }
586 
587   /**
588    * @return the base znode of this zookeeper connection instance.
589    */
getBaseZNode()590   public String getBaseZNode() {
591     return baseZNode;
592   }
593 
594   /**
595    * Method called from ZooKeeper for events and connection status.
596    * <p>
597    * Valid events are passed along to listeners.  Connection status changes
598    * are dealt with locally.
599    */
600   @Override
process(WatchedEvent event)601   public void process(WatchedEvent event) {
602     LOG.debug(prefix("Received ZooKeeper Event, " +
603         "type=" + event.getType() + ", " +
604         "state=" + event.getState() + ", " +
605         "path=" + event.getPath()));
606 
607     switch(event.getType()) {
608 
609       // If event type is NONE, this is a connection status change
610       case None: {
611         connectionEvent(event);
612         break;
613       }
614 
615       // Otherwise pass along to the listeners
616 
617       case NodeCreated: {
618         for(ZooKeeperListener listener : listeners) {
619           listener.nodeCreated(event.getPath());
620         }
621         break;
622       }
623 
624       case NodeDeleted: {
625         for(ZooKeeperListener listener : listeners) {
626           listener.nodeDeleted(event.getPath());
627         }
628         break;
629       }
630 
631       case NodeDataChanged: {
632         for(ZooKeeperListener listener : listeners) {
633           listener.nodeDataChanged(event.getPath());
634         }
635         break;
636       }
637 
638       case NodeChildrenChanged: {
639         for(ZooKeeperListener listener : listeners) {
640           listener.nodeChildrenChanged(event.getPath());
641         }
642         break;
643       }
644     }
645   }
646 
647   // Connection management
648 
649   /**
650    * Called when there is a connection-related event via the Watcher callback.
651    * <p>
652    * If Disconnected or Expired, this should shutdown the cluster. But, since
653    * we send a KeeperException.SessionExpiredException along with the abort
654    * call, it's possible for the Abortable to catch it and try to create a new
655    * session with ZooKeeper. This is what the client does in HCM.
656    * <p>
657    * @param event
658    */
connectionEvent(WatchedEvent event)659   private void connectionEvent(WatchedEvent event) {
660     switch(event.getState()) {
661       case SyncConnected:
662         // Now, this callback can be invoked before the this.zookeeper is set.
663         // Wait a little while.
664         long finished = System.currentTimeMillis() +
665           this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
666         while (System.currentTimeMillis() < finished) {
667           try {
668             Thread.sleep(1);
669           } catch (InterruptedException e) {
670             LOG.warn("Interrupted while sleeping");
671             throw new RuntimeException("Interrupted while waiting for" +
672                 " recoverableZooKeeper is set");
673           }
674           if (this.recoverableZooKeeper != null) break;
675         }
676 
677         if (this.recoverableZooKeeper == null) {
678           LOG.error("ZK is null on connection event -- see stack trace " +
679             "for the stack trace when constructor was called on this zkw",
680             this.constructorCaller);
681           throw new NullPointerException("ZK is null");
682         }
683         this.identifier = this.prefix + "-0x" +
684           Long.toHexString(this.recoverableZooKeeper.getSessionId());
685         // Update our identifier.  Otherwise ignore.
686         LOG.debug(this.identifier + " connected");
687         break;
688 
689       // Abort the server if Disconnected or Expired
690       case Disconnected:
691         LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
692         break;
693 
694       case Expired:
695         String msg = prefix(this.identifier + " received expired from " +
696           "ZooKeeper, aborting");
697         // TODO: One thought is to add call to ZooKeeperListener so say,
698         // ZooKeeperNodeTracker can zero out its data values.
699         if (this.abortable != null) {
700           this.abortable.abort(msg, new KeeperException.SessionExpiredException());
701         }
702         break;
703 
704       case ConnectedReadOnly:
705       case SaslAuthenticated:
706       case AuthFailed:
707         break;
708 
709       default:
710         throw new IllegalStateException("Received event is not valid: " + event.getState());
711     }
712   }
713 
714   /**
715    * Forces a synchronization of this ZooKeeper client connection.
716    * <p>
717    * Executing this method before running other methods will ensure that the
718    * subsequent operations are up-to-date and consistent as of the time that
719    * the sync is complete.
720    * <p>
721    * This is used for compareAndSwap type operations where we need to read the
722    * data of an existing node and delete or transition that node, utilizing the
723    * previously read version and data.  We want to ensure that the version read
724    * is up-to-date from when we begin the operation.
725    */
sync(String path)726   public void sync(String path) throws KeeperException {
727     this.recoverableZooKeeper.sync(path, null, null);
728   }
729 
730   /**
731    * Handles KeeperExceptions in client calls.
732    * <p>
733    * This may be temporary but for now this gives one place to deal with these.
734    * <p>
735    * TODO: Currently this method rethrows the exception to let the caller handle
736    * <p>
737    * @param ke
738    * @throws KeeperException
739    */
keeperException(KeeperException ke)740   public void keeperException(KeeperException ke)
741   throws KeeperException {
742     LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
743     throw ke;
744   }
745 
746   /**
747    * Handles InterruptedExceptions in client calls.
748    * <p>
749    * This may be temporary but for now this gives one place to deal with these.
750    * <p>
751    * TODO: Currently, this method does nothing.
752    *       Is this ever expected to happen?  Do we abort or can we let it run?
753    *       Maybe this should be logged as WARN?  It shouldn't happen?
754    * <p>
755    * @param ie
756    */
interruptedException(InterruptedException ie)757   public void interruptedException(InterruptedException ie) {
758     LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
759     // At least preserver interrupt.
760     Thread.currentThread().interrupt();
761     // no-op
762   }
763 
764   /**
765    * Close the connection to ZooKeeper.
766    *
767    */
768   @Override
close()769   public void close() {
770     try {
771       if (recoverableZooKeeper != null) {
772         recoverableZooKeeper.close();
773       }
774     } catch (InterruptedException e) {
775       Thread.currentThread().interrupt();
776     }
777   }
778 
getConfiguration()779   public Configuration getConfiguration() {
780     return conf;
781   }
782 
783   @Override
abort(String why, Throwable e)784   public void abort(String why, Throwable e) {
785     if (this.abortable != null) this.abortable.abort(why, e);
786     else this.aborted = true;
787   }
788 
789   @Override
isAborted()790   public boolean isAborted() {
791     return this.abortable == null? this.aborted: this.abortable.isAborted();
792   }
793 
794   /**
795    * @return Path to the currently active master.
796    */
getMasterAddressZNode()797   public String getMasterAddressZNode() {
798     return this.masterAddressZNode;
799   }
800 
801   /**
802    * @return ZooKeeper znode for region normalizer state
803    */
getRegionNormalizerZNode()804   public String getRegionNormalizerZNode() {
805     return regionNormalizerZNode;
806   }
807 }
808