1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.zookeeper; 20 21 import java.io.Closeable; 22 import java.io.IOException; 23 import java.util.ArrayList; 24 import java.util.HashMap; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.concurrent.CopyOnWriteArrayList; 28 import java.util.concurrent.CountDownLatch; 29 import java.util.regex.Matcher; 30 import java.util.regex.Pattern; 31 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 import org.apache.hadoop.hbase.classification.InterfaceAudience; 35 import org.apache.hadoop.conf.Configuration; 36 import org.apache.hadoop.hbase.Abortable; 37 import org.apache.hadoop.hbase.AuthUtil; 38 import org.apache.hadoop.hbase.HConstants; 39 import org.apache.hadoop.hbase.HRegionInfo; 40 import org.apache.hadoop.hbase.ZooKeeperConnectionException; 41 import org.apache.hadoop.hbase.classification.InterfaceAudience; 42 import org.apache.hadoop.hbase.security.Superusers; 43 import org.apache.hadoop.security.UserGroupInformation; 44 import org.apache.zookeeper.KeeperException; 45 import org.apache.zookeeper.WatchedEvent; 46 import org.apache.zookeeper.Watcher; 47 import org.apache.zookeeper.ZooDefs; 48 import org.apache.zookeeper.ZooDefs.Ids; 49 import org.apache.zookeeper.ZooDefs.Perms; 50 import org.apache.zookeeper.data.ACL; 51 import org.apache.zookeeper.data.Id; 52 import org.apache.zookeeper.data.Stat; 53 54 /** 55 * Acts as the single ZooKeeper Watcher. One instance of this is instantiated 56 * for each Master, RegionServer, and client process. 57 * 58 * <p>This is the only class that implements {@link Watcher}. Other internal 59 * classes which need to be notified of ZooKeeper events must register with 60 * the local instance of this watcher via {@link #registerListener}. 61 * 62 * <p>This class also holds and manages the connection to ZooKeeper. Code to 63 * deal with connection related events and exceptions are handled here. 64 */ 65 @InterfaceAudience.Private 66 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { 67 private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class); 68 69 // Identifier for this watcher (for logging only). It is made of the prefix 70 // passed on construction and the zookeeper sessionid. 71 private String prefix; 72 private String identifier; 73 74 // zookeeper quorum 75 private String quorum; 76 77 // zookeeper connection 78 private RecoverableZooKeeper recoverableZooKeeper; 79 80 // abortable in case of zk failure 81 protected Abortable abortable; 82 // Used if abortable is null 83 private boolean aborted = false; 84 85 // listeners to be notified 86 private final List<ZooKeeperListener> listeners = 87 new CopyOnWriteArrayList<ZooKeeperListener>(); 88 89 // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL 90 // negotiation to complete 91 public CountDownLatch saslLatch = new CountDownLatch(1); 92 93 // node names 94 95 // base znode for this cluster 96 public String baseZNode; 97 //znodes containing the locations of the servers hosting the meta replicas 98 private Map<Integer,String> metaReplicaZnodes = new HashMap<Integer, String>(); 99 // znode containing ephemeral nodes of the regionservers 100 public String rsZNode; 101 // znode containing ephemeral nodes of the draining regionservers 102 public String drainingZNode; 103 // znode of currently active master 104 private String masterAddressZNode; 105 // znode of this master in backup master directory, if not the active master 106 public String backupMasterAddressesZNode; 107 // znode containing the current cluster state 108 public String clusterStateZNode; 109 // znode used for region transitioning and assignment 110 public String assignmentZNode; 111 // znode used for table disabling/enabling 112 public String tableZNode; 113 // znode containing the unique cluster ID 114 public String clusterIdZNode; 115 // znode used for log splitting work assignment 116 public String splitLogZNode; 117 // znode containing the state of the load balancer 118 public String balancerZNode; 119 // znode containing the state of region normalizer 120 private String regionNormalizerZNode; 121 // znode containing the lock for the tables 122 public String tableLockZNode; 123 // znode containing the state of recovering regions 124 public String recoveringRegionsZNode; 125 // znode containing namespace descriptors 126 public static String namespaceZNode = "namespace"; 127 128 // Certain ZooKeeper nodes need to be world-readable 129 public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE = 130 new ArrayList<ACL>() { { 131 add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE)); 132 add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS)); 133 }}; 134 135 public final static String META_ZNODE_PREFIX = "meta-region-server"; 136 137 private final Configuration conf; 138 139 private final Exception constructorCaller; 140 141 /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ 142 private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); 143 144 /** 145 * Instantiate a ZooKeeper connection and watcher. 146 * @param identifier string that is passed to RecoverableZookeeper to be used as 147 * identifier for this instance. Use null for default. 148 * @throws IOException 149 * @throws ZooKeeperConnectionException 150 */ ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)151 public ZooKeeperWatcher(Configuration conf, String identifier, 152 Abortable abortable) throws ZooKeeperConnectionException, IOException { 153 this(conf, identifier, abortable, false); 154 } 155 156 /** 157 * Instantiate a ZooKeeper connection and watcher. 158 * @param conf 159 * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for 160 * this instance. Use null for default. 161 * @param abortable Can be null if there is on error there is no host to abort: e.g. client 162 * context. 163 * @param canCreateBaseZNode 164 * @throws IOException 165 * @throws ZooKeeperConnectionException 166 */ ZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable, boolean canCreateBaseZNode)167 public ZooKeeperWatcher(Configuration conf, String identifier, 168 Abortable abortable, boolean canCreateBaseZNode) 169 throws IOException, ZooKeeperConnectionException { 170 this.conf = conf; 171 // Capture a stack trace now. Will print it out later if problem so we can 172 // distingush amongst the myriad ZKWs. 173 try { 174 throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING"); 175 } catch (Exception e) { 176 this.constructorCaller = e; 177 } 178 this.quorum = ZKConfig.getZKQuorumServersString(conf); 179 this.prefix = identifier; 180 // Identifier will get the sessionid appended later below down when we 181 // handle the syncconnect event. 182 this.identifier = identifier + "0x0"; 183 this.abortable = abortable; 184 setNodeNames(conf); 185 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier); 186 if (canCreateBaseZNode) { 187 createBaseZNodes(); 188 } 189 } 190 createBaseZNodes()191 private void createBaseZNodes() throws ZooKeeperConnectionException { 192 try { 193 // Create all the necessary "directories" of znodes 194 ZKUtil.createWithParents(this, baseZNode); 195 if (conf.getBoolean("hbase.assignment.usezk", true)) { 196 ZKUtil.createAndFailSilent(this, assignmentZNode); 197 } 198 ZKUtil.createAndFailSilent(this, rsZNode); 199 ZKUtil.createAndFailSilent(this, drainingZNode); 200 ZKUtil.createAndFailSilent(this, tableZNode); 201 ZKUtil.createAndFailSilent(this, splitLogZNode); 202 ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode); 203 ZKUtil.createAndFailSilent(this, tableLockZNode); 204 ZKUtil.createAndFailSilent(this, recoveringRegionsZNode); 205 } catch (KeeperException e) { 206 throw new ZooKeeperConnectionException( 207 prefix("Unexpected KeeperException creating base node"), e); 208 } 209 } 210 211 /** Returns whether the znode is supposed to be readable by the client 212 * and DOES NOT contain sensitive information (world readable).*/ isClientReadable(String node)213 public boolean isClientReadable(String node) { 214 // Developer notice: These znodes are world readable. DO NOT add more znodes here UNLESS 215 // all clients need to access this data to work. Using zk for sharing data to clients (other 216 // than service lookup case is not a recommended design pattern. 217 return 218 node.equals(baseZNode) || 219 isAnyMetaReplicaZnode(node) || 220 node.equals(getMasterAddressZNode()) || 221 node.equals(clusterIdZNode)|| 222 node.equals(rsZNode) || 223 // /hbase/table and /hbase/table/foo is allowed, /hbase/table-lock is not 224 node.equals(tableZNode) || 225 node.startsWith(tableZNode + "/"); 226 } 227 228 /** 229 * On master start, we check the znode ACLs under the root directory and set the ACLs properly 230 * if needed. If the cluster goes from an unsecure setup to a secure setup, this step is needed 231 * so that the existing znodes created with open permissions are now changed with restrictive 232 * perms. 233 */ checkAndSetZNodeAcls()234 public void checkAndSetZNodeAcls() { 235 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) { 236 LOG.info("not a secure deployment, proceeding"); 237 return; 238 } 239 240 // Check the base znodes permission first. Only do the recursion if base znode's perms are not 241 // correct. 242 try { 243 List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat()); 244 245 if (!isBaseZnodeAclSetup(actualAcls)) { 246 LOG.info("setting znode ACLs"); 247 setZnodeAclsRecursive(baseZNode); 248 } 249 } catch(KeeperException.NoNodeException nne) { 250 return; 251 } catch(InterruptedException ie) { 252 interruptedException(ie); 253 } catch (IOException|KeeperException e) { 254 LOG.warn("Received exception while checking and setting zookeeper ACLs", e); 255 } 256 } 257 258 /** 259 * Set the znode perms recursively. This will do post-order recursion, so that baseZnode ACLs 260 * will be set last in case the master fails in between. 261 * @param znode 262 */ setZnodeAclsRecursive(String znode)263 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException { 264 List<String> children = recoverableZooKeeper.getChildren(znode, false); 265 266 for (String child : children) { 267 setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child)); 268 } 269 List<ACL> acls = ZKUtil.createACL(this, znode, true); 270 LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls); 271 recoverableZooKeeper.setAcl(znode, acls, -1); 272 } 273 274 /** 275 * Checks whether the ACLs returned from the base znode (/hbase) is set for secure setup. 276 * @param acls acls from zookeeper 277 * @return whether ACLs are set for the base znode 278 * @throws IOException 279 */ isBaseZnodeAclSetup(List<ACL> acls)280 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException { 281 if (LOG.isDebugEnabled()) { 282 LOG.debug("Checking znode ACLs"); 283 } 284 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY); 285 // Check whether ACL set for all superusers 286 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) { 287 return false; 288 } 289 290 // this assumes that current authenticated user is the same as zookeeper client user 291 // configured via JAAS 292 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName(); 293 294 if (acls.isEmpty()) { 295 if (LOG.isDebugEnabled()) { 296 LOG.debug("ACL is empty"); 297 } 298 return false; 299 } 300 301 for (ACL acl : acls) { 302 int perms = acl.getPerms(); 303 Id id = acl.getId(); 304 // We should only set at most 3 possible ACLs for 3 Ids. One for everyone, one for superuser 305 // and one for the hbase user 306 if (Ids.ANYONE_ID_UNSAFE.equals(id)) { 307 if (perms != Perms.READ) { 308 if (LOG.isDebugEnabled()) { 309 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 310 id, perms, Perms.READ)); 311 } 312 return false; 313 } 314 } else if (superUsers != null && isSuperUserId(superUsers, id)) { 315 if (perms != Perms.ALL) { 316 if (LOG.isDebugEnabled()) { 317 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 318 id, perms, Perms.ALL)); 319 } 320 return false; 321 } 322 } else if ("sasl".equals(id.getScheme())) { 323 String name = id.getId(); 324 // If ZooKeeper recorded the Kerberos full name in the ACL, use only the shortname 325 Matcher match = NAME_PATTERN.matcher(name); 326 if (match.matches()) { 327 name = match.group(1); 328 } 329 if (name.equals(hbaseUser)) { 330 if (perms != Perms.ALL) { 331 if (LOG.isDebugEnabled()) { 332 LOG.debug(String.format("permissions for '%s' are not correct: have 0x%x, want 0x%x", 333 id, perms, Perms.ALL)); 334 } 335 return false; 336 } 337 } else { 338 if (LOG.isDebugEnabled()) { 339 LOG.debug("Unexpected shortname in SASL ACL: " + id); 340 } 341 return false; 342 } 343 } else { 344 if (LOG.isDebugEnabled()) { 345 LOG.debug("unexpected ACL id '" + id + "'"); 346 } 347 return false; 348 } 349 } 350 return true; 351 } 352 353 /* 354 * Validate whether ACL set for all superusers. 355 */ checkACLForSuperUsers(String[] superUsers, List<ACL> acls)356 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) { 357 for (String user : superUsers) { 358 boolean hasAccess = false; 359 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 360 if (!user.startsWith(AuthUtil.GROUP_PREFIX)) { 361 for (ACL acl : acls) { 362 if (user.equals(acl.getId().getId())) { 363 if (acl.getPerms() == Perms.ALL) { 364 hasAccess = true; 365 } else { 366 if (LOG.isDebugEnabled()) { 367 LOG.debug(String.format( 368 "superuser '%s' does not have correct permissions: have 0x%x, want 0x%x", 369 acl.getId().getId(), acl.getPerms(), Perms.ALL)); 370 } 371 } 372 break; 373 } 374 } 375 if (!hasAccess) { 376 return false; 377 } 378 } 379 } 380 return true; 381 } 382 383 /* 384 * Validate whether ACL ID is superuser. 385 */ isSuperUserId(String[] superUsers, Id id)386 public static boolean isSuperUserId(String[] superUsers, Id id) { 387 for (String user : superUsers) { 388 // TODO: Validate super group members also when ZK supports setting node ACL for groups. 389 if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) { 390 return true; 391 } 392 } 393 return false; 394 } 395 396 @Override toString()397 public String toString() { 398 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode; 399 } 400 401 /** 402 * Adds this instance's identifier as a prefix to the passed <code>str</code> 403 * @param str String to amend. 404 * @return A new string with this instance's identifier as prefix: e.g. 405 * if passed 'hello world', the returned string could be 406 */ prefix(final String str)407 public String prefix(final String str) { 408 return this.toString() + " " + str; 409 } 410 411 /** 412 * Set the local variable node names using the specified configuration. 413 */ setNodeNames(Configuration conf)414 private void setNodeNames(Configuration conf) { 415 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, 416 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 417 metaReplicaZnodes.put(0, ZKUtil.joinZNode(baseZNode, 418 conf.get("zookeeper.znode.metaserver", "meta-region-server"))); 419 int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM, 420 HConstants.DEFAULT_META_REPLICA_NUM); 421 for (int i = 1; i < numMetaReplicas; i++) { 422 String str = ZKUtil.joinZNode(baseZNode, 423 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + i); 424 metaReplicaZnodes.put(i, str); 425 } 426 rsZNode = ZKUtil.joinZNode(baseZNode, 427 conf.get("zookeeper.znode.rs", "rs")); 428 drainingZNode = ZKUtil.joinZNode(baseZNode, 429 conf.get("zookeeper.znode.draining.rs", "draining")); 430 masterAddressZNode = ZKUtil.joinZNode(baseZNode, 431 conf.get("zookeeper.znode.master", "master")); 432 backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode, 433 conf.get("zookeeper.znode.backup.masters", "backup-masters")); 434 clusterStateZNode = ZKUtil.joinZNode(baseZNode, 435 conf.get("zookeeper.znode.state", "running")); 436 assignmentZNode = ZKUtil.joinZNode(baseZNode, 437 conf.get("zookeeper.znode.unassigned", "region-in-transition")); 438 tableZNode = ZKUtil.joinZNode(baseZNode, 439 conf.get("zookeeper.znode.tableEnableDisable", "table")); 440 clusterIdZNode = ZKUtil.joinZNode(baseZNode, 441 conf.get("zookeeper.znode.clusterId", "hbaseid")); 442 splitLogZNode = ZKUtil.joinZNode(baseZNode, 443 conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); 444 balancerZNode = ZKUtil.joinZNode(baseZNode, 445 conf.get("zookeeper.znode.balancer", "balancer")); 446 regionNormalizerZNode = ZKUtil.joinZNode(baseZNode, 447 conf.get("zookeeper.znode.regionNormalizer", "normalizer")); 448 tableLockZNode = ZKUtil.joinZNode(baseZNode, 449 conf.get("zookeeper.znode.tableLock", "table-lock")); 450 recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode, 451 conf.get("zookeeper.znode.recovering.regions", "recovering-regions")); 452 namespaceZNode = ZKUtil.joinZNode(baseZNode, 453 conf.get("zookeeper.znode.namespace", "namespace")); 454 } 455 456 /** 457 * Is the znode of any meta replica 458 * @param node 459 * @return true or false 460 */ isAnyMetaReplicaZnode(String node)461 public boolean isAnyMetaReplicaZnode(String node) { 462 if (metaReplicaZnodes.values().contains(node)) { 463 return true; 464 } 465 return false; 466 } 467 468 /** 469 * Is it the default meta replica's znode 470 * @param node 471 * @return true or false 472 */ isDefaultMetaReplicaZnode(String node)473 public boolean isDefaultMetaReplicaZnode(String node) { 474 if (getZNodeForReplica(HRegionInfo.DEFAULT_REPLICA_ID).equals(node)) { 475 return true; 476 } 477 return false; 478 } 479 480 /** 481 * Get the znodes corresponding to the meta replicas from ZK 482 * @return list of znodes 483 * @throws KeeperException 484 */ getMetaReplicaNodes()485 public List<String> getMetaReplicaNodes() throws KeeperException { 486 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, baseZNode); 487 List<String> metaReplicaNodes = new ArrayList<String>(2); 488 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server"); 489 for (String child : childrenOfBaseNode) { 490 if (child.startsWith(pattern)) metaReplicaNodes.add(child); 491 } 492 return metaReplicaNodes; 493 } 494 495 /** 496 * Get the znode string corresponding to a replicaId 497 * @param replicaId 498 * @return znode 499 */ getZNodeForReplica(int replicaId)500 public String getZNodeForReplica(int replicaId) { 501 String str = metaReplicaZnodes.get(replicaId); 502 // return a newly created path but don't update the cache of paths 503 // This is mostly needed for tests that attempt to create meta replicas 504 // from outside the master 505 if (str == null) { 506 str = ZKUtil.joinZNode(baseZNode, 507 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + replicaId); 508 } 509 return str; 510 } 511 512 /** 513 * Parse the meta replicaId from the passed znode 514 * @param znode 515 * @return replicaId 516 */ getMetaReplicaIdFromZnode(String znode)517 public int getMetaReplicaIdFromZnode(String znode) { 518 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server"); 519 if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID; 520 // the non-default replicas are of the pattern meta-region-server-<replicaId> 521 String nonDefaultPattern = pattern + "-"; 522 return Integer.parseInt(znode.substring(nonDefaultPattern.length())); 523 } 524 525 /** 526 * Register the specified listener to receive ZooKeeper events. 527 * @param listener 528 */ registerListener(ZooKeeperListener listener)529 public void registerListener(ZooKeeperListener listener) { 530 listeners.add(listener); 531 } 532 533 /** 534 * Register the specified listener to receive ZooKeeper events and add it as 535 * the first in the list of current listeners. 536 * @param listener 537 */ registerListenerFirst(ZooKeeperListener listener)538 public void registerListenerFirst(ZooKeeperListener listener) { 539 listeners.add(0, listener); 540 } 541 unregisterListener(ZooKeeperListener listener)542 public void unregisterListener(ZooKeeperListener listener) { 543 listeners.remove(listener); 544 } 545 546 /** 547 * Clean all existing listeners 548 */ unregisterAllListeners()549 public void unregisterAllListeners() { 550 listeners.clear(); 551 } 552 553 /** 554 * Get a copy of current registered listeners 555 */ getListeners()556 public List<ZooKeeperListener> getListeners() { 557 return new ArrayList<ZooKeeperListener>(listeners); 558 } 559 560 /** 561 * @return The number of currently registered listeners 562 */ getNumberOfListeners()563 public int getNumberOfListeners() { 564 return listeners.size(); 565 } 566 567 /** 568 * Get the connection to ZooKeeper. 569 * @return connection reference to zookeeper 570 */ getRecoverableZooKeeper()571 public RecoverableZooKeeper getRecoverableZooKeeper() { 572 return recoverableZooKeeper; 573 } 574 reconnectAfterExpiration()575 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException { 576 recoverableZooKeeper.reconnectAfterExpiration(); 577 } 578 579 /** 580 * Get the quorum address of this instance. 581 * @return quorum string of this zookeeper connection instance 582 */ getQuorum()583 public String getQuorum() { 584 return quorum; 585 } 586 587 /** 588 * @return the base znode of this zookeeper connection instance. 589 */ getBaseZNode()590 public String getBaseZNode() { 591 return baseZNode; 592 } 593 594 /** 595 * Method called from ZooKeeper for events and connection status. 596 * <p> 597 * Valid events are passed along to listeners. Connection status changes 598 * are dealt with locally. 599 */ 600 @Override process(WatchedEvent event)601 public void process(WatchedEvent event) { 602 LOG.debug(prefix("Received ZooKeeper Event, " + 603 "type=" + event.getType() + ", " + 604 "state=" + event.getState() + ", " + 605 "path=" + event.getPath())); 606 607 switch(event.getType()) { 608 609 // If event type is NONE, this is a connection status change 610 case None: { 611 connectionEvent(event); 612 break; 613 } 614 615 // Otherwise pass along to the listeners 616 617 case NodeCreated: { 618 for(ZooKeeperListener listener : listeners) { 619 listener.nodeCreated(event.getPath()); 620 } 621 break; 622 } 623 624 case NodeDeleted: { 625 for(ZooKeeperListener listener : listeners) { 626 listener.nodeDeleted(event.getPath()); 627 } 628 break; 629 } 630 631 case NodeDataChanged: { 632 for(ZooKeeperListener listener : listeners) { 633 listener.nodeDataChanged(event.getPath()); 634 } 635 break; 636 } 637 638 case NodeChildrenChanged: { 639 for(ZooKeeperListener listener : listeners) { 640 listener.nodeChildrenChanged(event.getPath()); 641 } 642 break; 643 } 644 } 645 } 646 647 // Connection management 648 649 /** 650 * Called when there is a connection-related event via the Watcher callback. 651 * <p> 652 * If Disconnected or Expired, this should shutdown the cluster. But, since 653 * we send a KeeperException.SessionExpiredException along with the abort 654 * call, it's possible for the Abortable to catch it and try to create a new 655 * session with ZooKeeper. This is what the client does in HCM. 656 * <p> 657 * @param event 658 */ connectionEvent(WatchedEvent event)659 private void connectionEvent(WatchedEvent event) { 660 switch(event.getState()) { 661 case SyncConnected: 662 // Now, this callback can be invoked before the this.zookeeper is set. 663 // Wait a little while. 664 long finished = System.currentTimeMillis() + 665 this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000); 666 while (System.currentTimeMillis() < finished) { 667 try { 668 Thread.sleep(1); 669 } catch (InterruptedException e) { 670 LOG.warn("Interrupted while sleeping"); 671 throw new RuntimeException("Interrupted while waiting for" + 672 " recoverableZooKeeper is set"); 673 } 674 if (this.recoverableZooKeeper != null) break; 675 } 676 677 if (this.recoverableZooKeeper == null) { 678 LOG.error("ZK is null on connection event -- see stack trace " + 679 "for the stack trace when constructor was called on this zkw", 680 this.constructorCaller); 681 throw new NullPointerException("ZK is null"); 682 } 683 this.identifier = this.prefix + "-0x" + 684 Long.toHexString(this.recoverableZooKeeper.getSessionId()); 685 // Update our identifier. Otherwise ignore. 686 LOG.debug(this.identifier + " connected"); 687 break; 688 689 // Abort the server if Disconnected or Expired 690 case Disconnected: 691 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); 692 break; 693 694 case Expired: 695 String msg = prefix(this.identifier + " received expired from " + 696 "ZooKeeper, aborting"); 697 // TODO: One thought is to add call to ZooKeeperListener so say, 698 // ZooKeeperNodeTracker can zero out its data values. 699 if (this.abortable != null) { 700 this.abortable.abort(msg, new KeeperException.SessionExpiredException()); 701 } 702 break; 703 704 case ConnectedReadOnly: 705 case SaslAuthenticated: 706 case AuthFailed: 707 break; 708 709 default: 710 throw new IllegalStateException("Received event is not valid: " + event.getState()); 711 } 712 } 713 714 /** 715 * Forces a synchronization of this ZooKeeper client connection. 716 * <p> 717 * Executing this method before running other methods will ensure that the 718 * subsequent operations are up-to-date and consistent as of the time that 719 * the sync is complete. 720 * <p> 721 * This is used for compareAndSwap type operations where we need to read the 722 * data of an existing node and delete or transition that node, utilizing the 723 * previously read version and data. We want to ensure that the version read 724 * is up-to-date from when we begin the operation. 725 */ sync(String path)726 public void sync(String path) throws KeeperException { 727 this.recoverableZooKeeper.sync(path, null, null); 728 } 729 730 /** 731 * Handles KeeperExceptions in client calls. 732 * <p> 733 * This may be temporary but for now this gives one place to deal with these. 734 * <p> 735 * TODO: Currently this method rethrows the exception to let the caller handle 736 * <p> 737 * @param ke 738 * @throws KeeperException 739 */ keeperException(KeeperException ke)740 public void keeperException(KeeperException ke) 741 throws KeeperException { 742 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke); 743 throw ke; 744 } 745 746 /** 747 * Handles InterruptedExceptions in client calls. 748 * <p> 749 * This may be temporary but for now this gives one place to deal with these. 750 * <p> 751 * TODO: Currently, this method does nothing. 752 * Is this ever expected to happen? Do we abort or can we let it run? 753 * Maybe this should be logged as WARN? It shouldn't happen? 754 * <p> 755 * @param ie 756 */ interruptedException(InterruptedException ie)757 public void interruptedException(InterruptedException ie) { 758 LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie); 759 // At least preserver interrupt. 760 Thread.currentThread().interrupt(); 761 // no-op 762 } 763 764 /** 765 * Close the connection to ZooKeeper. 766 * 767 */ 768 @Override close()769 public void close() { 770 try { 771 if (recoverableZooKeeper != null) { 772 recoverableZooKeeper.close(); 773 } 774 } catch (InterruptedException e) { 775 Thread.currentThread().interrupt(); 776 } 777 } 778 getConfiguration()779 public Configuration getConfiguration() { 780 return conf; 781 } 782 783 @Override abort(String why, Throwable e)784 public void abort(String why, Throwable e) { 785 if (this.abortable != null) this.abortable.abort(why, e); 786 else this.aborted = true; 787 } 788 789 @Override isAborted()790 public boolean isAborted() { 791 return this.abortable == null? this.aborted: this.abortable.isAborted(); 792 } 793 794 /** 795 * @return Path to the currently active master. 796 */ getMasterAddressZNode()797 public String getMasterAddressZNode() { 798 return this.masterAddressZNode; 799 } 800 801 /** 802 * @return ZooKeeper znode for region normalizer state 803 */ getRegionNormalizerZNode()804 public String getRegionNormalizerZNode() { 805 return regionNormalizerZNode; 806 } 807 } 808