1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.zookeeper; 20 21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 22 import java.io.BufferedReader; 23 import java.io.ByteArrayOutputStream; 24 import java.io.IOException; 25 import java.io.InputStreamReader; 26 import java.net.ConnectException; 27 import java.net.InetSocketAddress; 28 import java.net.Socket; 29 import java.net.SocketAddress; 30 import java.nio.ByteBuffer; 31 import java.util.ArrayDeque; 32 import java.util.ArrayList; 33 import java.util.Collections; 34 import java.util.HashSet; 35 import java.util.Iterator; 36 import java.util.List; 37 import java.util.Map; 38 import java.util.Map.Entry; 39 import java.util.Queue; 40 import java.util.Set; 41 import java.util.concurrent.CopyOnWriteArraySet; 42 import java.util.concurrent.LinkedBlockingDeque; 43 import java.util.concurrent.LinkedBlockingQueue; 44 import java.util.concurrent.ThreadLocalRandom; 45 import javax.security.auth.login.LoginException; 46 import javax.security.sasl.SaslException; 47 import org.apache.jute.BinaryInputArchive; 48 import org.apache.jute.BinaryOutputArchive; 49 import org.apache.jute.Record; 50 import org.apache.zookeeper.AsyncCallback.ACLCallback; 51 import org.apache.zookeeper.AsyncCallback.AllChildrenNumberCallback; 52 import org.apache.zookeeper.AsyncCallback.Children2Callback; 53 import org.apache.zookeeper.AsyncCallback.ChildrenCallback; 54 import org.apache.zookeeper.AsyncCallback.Create2Callback; 55 import org.apache.zookeeper.AsyncCallback.DataCallback; 56 import org.apache.zookeeper.AsyncCallback.EphemeralsCallback; 57 import org.apache.zookeeper.AsyncCallback.MultiCallback; 58 import org.apache.zookeeper.AsyncCallback.StatCallback; 59 import org.apache.zookeeper.AsyncCallback.StringCallback; 60 import org.apache.zookeeper.AsyncCallback.VoidCallback; 61 import org.apache.zookeeper.KeeperException.Code; 62 import org.apache.zookeeper.OpResult.ErrorResult; 63 import org.apache.zookeeper.Watcher.Event; 64 import org.apache.zookeeper.Watcher.Event.EventType; 65 import org.apache.zookeeper.Watcher.Event.KeeperState; 66 import org.apache.zookeeper.ZooDefs.OpCode; 67 import org.apache.zookeeper.ZooKeeper.States; 68 import org.apache.zookeeper.ZooKeeper.WatchRegistration; 69 import org.apache.zookeeper.client.HostProvider; 70 import org.apache.zookeeper.client.ZKClientConfig; 71 import org.apache.zookeeper.client.ZooKeeperSaslClient; 72 import org.apache.zookeeper.common.Time; 73 import org.apache.zookeeper.proto.AuthPacket; 74 import org.apache.zookeeper.proto.ConnectRequest; 75 import org.apache.zookeeper.proto.Create2Response; 76 import org.apache.zookeeper.proto.CreateResponse; 77 import org.apache.zookeeper.proto.ExistsResponse; 78 import org.apache.zookeeper.proto.GetACLResponse; 79 import org.apache.zookeeper.proto.GetAllChildrenNumberResponse; 80 import org.apache.zookeeper.proto.GetChildren2Response; 81 import org.apache.zookeeper.proto.GetChildrenResponse; 82 import org.apache.zookeeper.proto.GetDataResponse; 83 import org.apache.zookeeper.proto.GetEphemeralsResponse; 84 import org.apache.zookeeper.proto.GetSASLRequest; 85 import org.apache.zookeeper.proto.ReplyHeader; 86 import org.apache.zookeeper.proto.RequestHeader; 87 import org.apache.zookeeper.proto.SetACLResponse; 88 import org.apache.zookeeper.proto.SetDataResponse; 89 import org.apache.zookeeper.proto.SetWatches; 90 import org.apache.zookeeper.proto.SetWatches2; 91 import org.apache.zookeeper.proto.WatcherEvent; 92 import org.apache.zookeeper.server.ByteBufferInputStream; 93 import org.apache.zookeeper.server.ZooKeeperThread; 94 import org.apache.zookeeper.server.ZooTrace; 95 import org.slf4j.Logger; 96 import org.slf4j.LoggerFactory; 97 import org.slf4j.MDC; 98 99 /** 100 * This class manages the socket i/o for the client. ClientCnxn maintains a list 101 * of available servers to connect to and "transparently" switches servers it is 102 * connected to as needed. 103 * 104 */ 105 @SuppressFBWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) 106 public class ClientCnxn { 107 108 private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class); 109 110 /* ZOOKEEPER-706: If a session has a large number of watches set then 111 * attempting to re-establish those watches after a connection loss may 112 * fail due to the SetWatches request exceeding the server's configured 113 * jute.maxBuffer value. To avoid this we instead split the watch 114 * re-establishement across multiple SetWatches calls. This constant 115 * controls the size of each call. It is set to 128kB to be conservative 116 * with respect to the server's 1MB default for jute.maxBuffer. 117 */ 118 private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024; 119 120 /* predefined xid's values recognized as special by the server */ 121 // -1 means notification(WATCHER_EVENT) 122 public static final int NOTIFICATION_XID = -1; 123 // -2 is the xid for pings 124 public static final int PING_XID = -2; 125 // -4 is the xid for AuthPacket 126 public static final int AUTHPACKET_XID = -4; 127 // -8 is the xid for setWatch 128 public static final int SET_WATCHES_XID = -8; 129 130 static class AuthData { 131 AuthData(String scheme, byte[] data)132 AuthData(String scheme, byte[] data) { 133 this.scheme = scheme; 134 this.data = data; 135 } 136 137 String scheme; 138 139 byte[] data; 140 141 } 142 143 private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>(); 144 145 /** 146 * These are the packets that have been sent and are waiting for a response. 147 */ 148 private final Queue<Packet> pendingQueue = new ArrayDeque<>(); 149 150 /** 151 * These are the packets that need to be sent. 152 */ 153 private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>(); 154 155 private int connectTimeout; 156 157 /** 158 * The timeout in ms the client negotiated with the server. This is the 159 * "real" timeout, not the timeout request by the client (which may have 160 * been increased/decreased by the server which applies bounds to this 161 * value. 162 */ 163 private volatile int negotiatedSessionTimeout; 164 165 private int readTimeout; 166 167 private final int sessionTimeout; 168 169 private final ZKWatchManager watchManager; 170 171 private long sessionId; 172 173 private byte[] sessionPasswd; 174 175 /** 176 * If true, the connection is allowed to go to r-o mode. This field's value 177 * is sent, besides other data, during session creation handshake. If the 178 * server on the other side of the wire is partitioned it'll accept 179 * read-only clients only. 180 */ 181 private boolean readOnly; 182 183 final String chrootPath; 184 185 final SendThread sendThread; 186 187 final EventThread eventThread; 188 189 /** 190 * Set to true when close is called. Latches the connection such that we 191 * don't attempt to re-connect to the server if in the middle of closing the 192 * connection (client sends session disconnect to server as part of close 193 * operation) 194 */ 195 private volatile boolean closing = false; 196 197 /** 198 * A set of ZooKeeper hosts this client could connect to. 199 */ 200 private final HostProvider hostProvider; 201 202 /** 203 * Is set to true when a connection to a r/w server is established for the 204 * first time; never changed afterwards. 205 * <p> 206 * Is used to handle situations when client without sessionId connects to a 207 * read-only server. Such client receives "fake" sessionId from read-only 208 * server, but this sessionId is invalid for other servers. So when such 209 * client finds a r/w server, it sends 0 instead of fake sessionId during 210 * connection handshake and establishes new, valid session. 211 * <p> 212 * If this field is false (which implies we haven't seen r/w server before) 213 * then non-zero sessionId is fake, otherwise it is valid. 214 */ 215 volatile boolean seenRwServerBefore = false; 216 217 public ZooKeeperSaslClient zooKeeperSaslClient; 218 219 private final ZKClientConfig clientConfig; 220 /** 221 * If any request's response in not received in configured requestTimeout 222 * then it is assumed that the response packet is lost. 223 */ 224 private long requestTimeout; 225 getWatcherManager()226 ZKWatchManager getWatcherManager() { 227 return watchManager; 228 } 229 getSessionId()230 public long getSessionId() { 231 return sessionId; 232 } 233 getSessionPasswd()234 public byte[] getSessionPasswd() { 235 return sessionPasswd; 236 } 237 getSessionTimeout()238 public int getSessionTimeout() { 239 return negotiatedSessionTimeout; 240 } 241 242 @Override toString()243 public String toString() { 244 StringBuilder sb = new StringBuilder(); 245 246 SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress(); 247 SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress(); 248 sb.append("sessionid:0x").append(Long.toHexString(getSessionId())) 249 .append(" local:").append(local) 250 .append(" remoteserver:").append(remote) 251 .append(" lastZxid:").append(lastZxid) 252 .append(" xid:").append(xid) 253 .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount()) 254 .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount()) 255 .append(" queuedpkts:").append(outgoingQueue.size()) 256 .append(" pendingresp:").append(pendingQueue.size()) 257 .append(" queuedevents:").append(eventThread.waitingEvents.size()); 258 259 return sb.toString(); 260 } 261 262 /** 263 * This class allows us to pass the headers and the relevant records around. 264 */ 265 static class Packet { 266 267 RequestHeader requestHeader; 268 269 ReplyHeader replyHeader; 270 271 Record request; 272 273 Record response; 274 275 ByteBuffer bb; 276 277 /** Client's view of the path (may differ due to chroot) **/ 278 String clientPath; 279 /** Servers's view of the path (may differ due to chroot) **/ 280 String serverPath; 281 282 boolean finished; 283 284 AsyncCallback cb; 285 286 Object ctx; 287 288 WatchRegistration watchRegistration; 289 290 public boolean readOnly; 291 292 WatchDeregistration watchDeregistration; 293 294 /** Convenience ctor */ Packet( RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration)295 Packet( 296 RequestHeader requestHeader, 297 ReplyHeader replyHeader, 298 Record request, 299 Record response, 300 WatchRegistration watchRegistration) { 301 this(requestHeader, replyHeader, request, response, watchRegistration, false); 302 } 303 Packet( RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration, boolean readOnly)304 Packet( 305 RequestHeader requestHeader, 306 ReplyHeader replyHeader, 307 Record request, 308 Record response, 309 WatchRegistration watchRegistration, 310 boolean readOnly) { 311 312 this.requestHeader = requestHeader; 313 this.replyHeader = replyHeader; 314 this.request = request; 315 this.response = response; 316 this.readOnly = readOnly; 317 this.watchRegistration = watchRegistration; 318 } 319 createBB()320 public void createBB() { 321 try { 322 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 323 BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); 324 boa.writeInt(-1, "len"); // We'll fill this in later 325 if (requestHeader != null) { 326 requestHeader.serialize(boa, "header"); 327 } 328 if (request instanceof ConnectRequest) { 329 request.serialize(boa, "connect"); 330 // append "am-I-allowed-to-be-readonly" flag 331 boa.writeBool(readOnly, "readOnly"); 332 } else if (request != null) { 333 request.serialize(boa, "request"); 334 } 335 baos.close(); 336 this.bb = ByteBuffer.wrap(baos.toByteArray()); 337 this.bb.putInt(this.bb.capacity() - 4); 338 this.bb.rewind(); 339 } catch (IOException e) { 340 LOG.warn("Unexpected exception", e); 341 } 342 } 343 344 @Override toString()345 public String toString() { 346 StringBuilder sb = new StringBuilder(); 347 348 sb.append("clientPath:" + clientPath); 349 sb.append(" serverPath:" + serverPath); 350 sb.append(" finished:" + finished); 351 352 sb.append(" header:: " + requestHeader); 353 sb.append(" replyHeader:: " + replyHeader); 354 sb.append(" request:: " + request); 355 sb.append(" response:: " + response); 356 357 // jute toString is horrible, remove unnecessary newlines 358 return sb.toString().replaceAll("\r*\n+", " "); 359 } 360 361 } 362 363 /** 364 * Creates a connection object. The actual network connect doesn't get 365 * established until needed. The start() instance method must be called 366 * subsequent to construction. 367 * 368 * @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 369 * @param hostProvider the list of ZooKeeper servers to connect to 370 * @param sessionTimeout the timeout for connections. 371 * @param clientConfig the client configuration. 372 * @param defaultWatcher default watcher for this connection 373 * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty) 374 * @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning 375 */ ClientCnxn( String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly )376 public ClientCnxn( 377 String chrootPath, 378 HostProvider hostProvider, 379 int sessionTimeout, 380 ZKClientConfig clientConfig, 381 Watcher defaultWatcher, 382 ClientCnxnSocket clientCnxnSocket, 383 boolean canBeReadOnly 384 ) throws IOException { 385 this( 386 chrootPath, 387 hostProvider, 388 sessionTimeout, 389 clientConfig, 390 defaultWatcher, 391 clientCnxnSocket, 392 0, 393 new byte[16], 394 canBeReadOnly); 395 } 396 397 /** 398 * Creates a connection object. The actual network connect doesn't get 399 * established until needed. The start() instance method must be called 400 * subsequent to construction. 401 * 402 * @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838 403 * @param hostProvider the list of ZooKeeper servers to connect to 404 * @param sessionTimeout the timeout for connections. 405 * @param clientConfig the client configuration. 406 * @param defaultWatcher default watcher for this connection 407 * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty) 408 * @param sessionId session id if re-establishing session 409 * @param sessionPasswd session passwd if re-establishing session 410 * @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning 411 * @throws IOException in cases of broken network 412 */ ClientCnxn( String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly )413 public ClientCnxn( 414 String chrootPath, 415 HostProvider hostProvider, 416 int sessionTimeout, 417 ZKClientConfig clientConfig, 418 Watcher defaultWatcher, 419 ClientCnxnSocket clientCnxnSocket, 420 long sessionId, 421 byte[] sessionPasswd, 422 boolean canBeReadOnly 423 ) throws IOException { 424 this.chrootPath = chrootPath; 425 this.hostProvider = hostProvider; 426 this.sessionTimeout = sessionTimeout; 427 this.clientConfig = clientConfig; 428 this.sessionId = sessionId; 429 this.sessionPasswd = sessionPasswd; 430 this.readOnly = canBeReadOnly; 431 432 this.watchManager = new ZKWatchManager( 433 clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET), 434 defaultWatcher); 435 436 this.connectTimeout = sessionTimeout / hostProvider.size(); 437 this.readTimeout = sessionTimeout * 2 / 3; 438 439 this.sendThread = new SendThread(clientCnxnSocket); 440 this.eventThread = new EventThread(); 441 initRequestTimeout(); 442 } 443 start()444 public void start() { 445 sendThread.start(); 446 eventThread.start(); 447 } 448 449 private Object eventOfDeath = new Object(); 450 451 private static class WatcherSetEventPair { 452 453 private final Set<Watcher> watchers; 454 private final WatchedEvent event; 455 WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event)456 public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) { 457 this.watchers = watchers; 458 this.event = event; 459 } 460 461 } 462 463 /** 464 * Guard against creating "-EventThread-EventThread-EventThread-..." thread 465 * names when ZooKeeper object is being created from within a watcher. 466 * See ZOOKEEPER-795 for details. 467 */ makeThreadName(String suffix)468 private static String makeThreadName(String suffix) { 469 String name = Thread.currentThread().getName().replaceAll("-EventThread", ""); 470 return name + suffix; 471 } 472 473 /** 474 * Tests that current thread is the main event loop. 475 * This method is useful only for tests inside ZooKeeper project 476 * it is not a public API intended for use by external applications. 477 * @return true if Thread.currentThread() is an EventThread. 478 */ isInEventThread()479 public static boolean isInEventThread() { 480 return Thread.currentThread() instanceof EventThread; 481 } 482 483 class EventThread extends ZooKeeperThread { 484 485 private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>(); 486 487 /** This is really the queued session state until the event 488 * thread actually processes the event and hands it to the watcher. 489 * But for all intents and purposes this is the state. 490 */ 491 private volatile KeeperState sessionState = KeeperState.Disconnected; 492 493 private volatile boolean wasKilled = false; 494 private volatile boolean isRunning = false; 495 EventThread()496 EventThread() { 497 super(makeThreadName("-EventThread")); 498 setDaemon(true); 499 } 500 queueEvent(WatchedEvent event)501 public void queueEvent(WatchedEvent event) { 502 queueEvent(event, null); 503 } 504 queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers)505 private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) { 506 if (event.getType() == EventType.None && sessionState == event.getState()) { 507 return; 508 } 509 sessionState = event.getState(); 510 final Set<Watcher> watchers; 511 if (materializedWatchers == null) { 512 // materialize the watchers based on the event 513 watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath()); 514 } else { 515 watchers = new HashSet<>(materializedWatchers); 516 } 517 WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event); 518 // queue the pair (watch set & event) for later processing 519 waitingEvents.add(pair); 520 } 521 queueCallback(AsyncCallback cb, int rc, String path, Object ctx)522 public void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) { 523 waitingEvents.add(new LocalCallback(cb, rc, path, ctx)); 524 } 525 526 @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER") queuePacket(Packet packet)527 public void queuePacket(Packet packet) { 528 if (wasKilled) { 529 synchronized (waitingEvents) { 530 if (isRunning) { 531 waitingEvents.add(packet); 532 } else { 533 processEvent(packet); 534 } 535 } 536 } else { 537 waitingEvents.add(packet); 538 } 539 } 540 queueEventOfDeath()541 public void queueEventOfDeath() { 542 waitingEvents.add(eventOfDeath); 543 } 544 545 @Override 546 @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER") run()547 public void run() { 548 try { 549 isRunning = true; 550 while (true) { 551 Object event = waitingEvents.take(); 552 if (event == eventOfDeath) { 553 wasKilled = true; 554 } else { 555 processEvent(event); 556 } 557 if (wasKilled) { 558 synchronized (waitingEvents) { 559 if (waitingEvents.isEmpty()) { 560 isRunning = false; 561 break; 562 } 563 } 564 } 565 } 566 } catch (InterruptedException e) { 567 LOG.error("Event thread exiting due to interruption", e); 568 } 569 570 LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId())); 571 } 572 processEvent(Object event)573 private void processEvent(Object event) { 574 try { 575 if (event instanceof WatcherSetEventPair) { 576 // each watcher will process the event 577 WatcherSetEventPair pair = (WatcherSetEventPair) event; 578 for (Watcher watcher : pair.watchers) { 579 try { 580 watcher.process(pair.event); 581 } catch (Throwable t) { 582 LOG.error("Error while calling watcher.", t); 583 } 584 } 585 } else if (event instanceof LocalCallback) { 586 LocalCallback lcb = (LocalCallback) event; 587 if (lcb.cb instanceof StatCallback) { 588 ((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null); 589 } else if (lcb.cb instanceof DataCallback) { 590 ((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null); 591 } else if (lcb.cb instanceof ACLCallback) { 592 ((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null); 593 } else if (lcb.cb instanceof ChildrenCallback) { 594 ((ChildrenCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null); 595 } else if (lcb.cb instanceof Children2Callback) { 596 ((Children2Callback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null); 597 } else if (lcb.cb instanceof StringCallback) { 598 ((StringCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null); 599 } else if (lcb.cb instanceof AsyncCallback.EphemeralsCallback) { 600 ((AsyncCallback.EphemeralsCallback) lcb.cb).processResult(lcb.rc, lcb.ctx, null); 601 } else if (lcb.cb instanceof AsyncCallback.AllChildrenNumberCallback) { 602 ((AsyncCallback.AllChildrenNumberCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, -1); 603 } else if (lcb.cb instanceof AsyncCallback.MultiCallback) { 604 ((AsyncCallback.MultiCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, Collections.emptyList()); 605 } else { 606 ((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx); 607 } 608 } else { 609 Packet p = (Packet) event; 610 int rc = 0; 611 String clientPath = p.clientPath; 612 if (p.replyHeader.getErr() != 0) { 613 rc = p.replyHeader.getErr(); 614 } 615 if (p.cb == null) { 616 LOG.warn("Somehow a null cb got to EventThread!"); 617 } else if (p.response instanceof ExistsResponse 618 || p.response instanceof SetDataResponse 619 || p.response instanceof SetACLResponse) { 620 StatCallback cb = (StatCallback) p.cb; 621 if (rc == Code.OK.intValue()) { 622 if (p.response instanceof ExistsResponse) { 623 cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response).getStat()); 624 } else if (p.response instanceof SetDataResponse) { 625 cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response).getStat()); 626 } else if (p.response instanceof SetACLResponse) { 627 cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response).getStat()); 628 } 629 } else { 630 cb.processResult(rc, clientPath, p.ctx, null); 631 } 632 } else if (p.response instanceof GetDataResponse) { 633 DataCallback cb = (DataCallback) p.cb; 634 GetDataResponse rsp = (GetDataResponse) p.response; 635 if (rc == Code.OK.intValue()) { 636 cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat()); 637 } else { 638 cb.processResult(rc, clientPath, p.ctx, null, null); 639 } 640 } else if (p.response instanceof GetACLResponse) { 641 ACLCallback cb = (ACLCallback) p.cb; 642 GetACLResponse rsp = (GetACLResponse) p.response; 643 if (rc == Code.OK.intValue()) { 644 cb.processResult(rc, clientPath, p.ctx, rsp.getAcl(), rsp.getStat()); 645 } else { 646 cb.processResult(rc, clientPath, p.ctx, null, null); 647 } 648 } else if (p.response instanceof GetChildrenResponse) { 649 ChildrenCallback cb = (ChildrenCallback) p.cb; 650 GetChildrenResponse rsp = (GetChildrenResponse) p.response; 651 if (rc == Code.OK.intValue()) { 652 cb.processResult(rc, clientPath, p.ctx, rsp.getChildren()); 653 } else { 654 cb.processResult(rc, clientPath, p.ctx, null); 655 } 656 } else if (p.response instanceof GetAllChildrenNumberResponse) { 657 AllChildrenNumberCallback cb = (AllChildrenNumberCallback) p.cb; 658 GetAllChildrenNumberResponse rsp = (GetAllChildrenNumberResponse) p.response; 659 if (rc == Code.OK.intValue()) { 660 cb.processResult(rc, clientPath, p.ctx, rsp.getTotalNumber()); 661 } else { 662 cb.processResult(rc, clientPath, p.ctx, -1); 663 } 664 } else if (p.response instanceof GetChildren2Response) { 665 Children2Callback cb = (Children2Callback) p.cb; 666 GetChildren2Response rsp = (GetChildren2Response) p.response; 667 if (rc == Code.OK.intValue()) { 668 cb.processResult(rc, clientPath, p.ctx, rsp.getChildren(), rsp.getStat()); 669 } else { 670 cb.processResult(rc, clientPath, p.ctx, null, null); 671 } 672 } else if (p.response instanceof CreateResponse) { 673 StringCallback cb = (StringCallback) p.cb; 674 CreateResponse rsp = (CreateResponse) p.response; 675 if (rc == Code.OK.intValue()) { 676 cb.processResult( 677 rc, 678 clientPath, 679 p.ctx, 680 (chrootPath == null 681 ? rsp.getPath() 682 : rsp.getPath().substring(chrootPath.length()))); 683 } else { 684 cb.processResult(rc, clientPath, p.ctx, null); 685 } 686 } else if (p.response instanceof Create2Response) { 687 Create2Callback cb = (Create2Callback) p.cb; 688 Create2Response rsp = (Create2Response) p.response; 689 if (rc == Code.OK.intValue()) { 690 cb.processResult( 691 rc, 692 clientPath, 693 p.ctx, 694 (chrootPath == null 695 ? rsp.getPath() 696 : rsp.getPath().substring(chrootPath.length())), 697 rsp.getStat()); 698 } else { 699 cb.processResult(rc, clientPath, p.ctx, null, null); 700 } 701 } else if (p.response instanceof MultiResponse) { 702 MultiCallback cb = (MultiCallback) p.cb; 703 MultiResponse rsp = (MultiResponse) p.response; 704 if (rc == Code.OK.intValue()) { 705 List<OpResult> results = rsp.getResultList(); 706 int newRc = rc; 707 for (OpResult result : results) { 708 if (result instanceof ErrorResult 709 && KeeperException.Code.OK.intValue() 710 != (newRc = ((ErrorResult) result).getErr())) { 711 break; 712 } 713 } 714 cb.processResult(newRc, clientPath, p.ctx, results); 715 } else { 716 cb.processResult(rc, clientPath, p.ctx, null); 717 } 718 } else if (p.response instanceof GetEphemeralsResponse) { 719 EphemeralsCallback cb = (EphemeralsCallback) p.cb; 720 GetEphemeralsResponse rsp = (GetEphemeralsResponse) p.response; 721 if (rc == Code.OK.intValue()) { 722 cb.processResult(rc, p.ctx, rsp.getEphemerals()); 723 } else { 724 cb.processResult(rc, p.ctx, null); 725 } 726 } else if (p.cb instanceof VoidCallback) { 727 VoidCallback cb = (VoidCallback) p.cb; 728 cb.processResult(rc, clientPath, p.ctx); 729 } 730 } 731 } catch (Throwable t) { 732 LOG.error("Unexpected throwable", t); 733 } 734 } 735 736 } 737 738 // @VisibleForTesting finishPacket(Packet p)739 protected void finishPacket(Packet p) { 740 int err = p.replyHeader.getErr(); 741 if (p.watchRegistration != null) { 742 p.watchRegistration.register(err); 743 } 744 // Add all the removed watch events to the event queue, so that the 745 // clients will be notified with 'Data/Child WatchRemoved' event type. 746 if (p.watchDeregistration != null) { 747 Map<EventType, Set<Watcher>> materializedWatchers = null; 748 try { 749 materializedWatchers = p.watchDeregistration.unregister(err); 750 for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) { 751 Set<Watcher> watchers = entry.getValue(); 752 if (watchers.size() > 0) { 753 queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey()); 754 // ignore connectionloss when removing from local 755 // session 756 p.replyHeader.setErr(Code.OK.intValue()); 757 } 758 } 759 } catch (KeeperException.NoWatcherException nwe) { 760 p.replyHeader.setErr(nwe.code().intValue()); 761 } catch (KeeperException ke) { 762 p.replyHeader.setErr(ke.code().intValue()); 763 } 764 } 765 766 if (p.cb == null) { 767 synchronized (p) { 768 p.finished = true; 769 p.notifyAll(); 770 } 771 } else { 772 p.finished = true; 773 eventThread.queuePacket(p); 774 } 775 } 776 queueEvent(String clientPath, int err, Set<Watcher> materializedWatchers, EventType eventType)777 void queueEvent(String clientPath, int err, Set<Watcher> materializedWatchers, EventType eventType) { 778 KeeperState sessionState = KeeperState.SyncConnected; 779 if (KeeperException.Code.SESSIONEXPIRED.intValue() == err 780 || KeeperException.Code.CONNECTIONLOSS.intValue() == err) { 781 sessionState = Event.KeeperState.Disconnected; 782 } 783 WatchedEvent event = new WatchedEvent(eventType, sessionState, clientPath); 784 eventThread.queueEvent(event, materializedWatchers); 785 } 786 queueCallback(AsyncCallback cb, int rc, String path, Object ctx)787 void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) { 788 eventThread.queueCallback(cb, rc, path, ctx); 789 } 790 791 // for test only onConnecting(InetSocketAddress addr)792 protected void onConnecting(InetSocketAddress addr) { 793 794 } 795 conLossPacket(Packet p)796 private void conLossPacket(Packet p) { 797 if (p.replyHeader == null) { 798 return; 799 } 800 switch (state) { 801 case AUTH_FAILED: 802 p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue()); 803 break; 804 case CLOSED: 805 p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue()); 806 break; 807 default: 808 p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue()); 809 } 810 finishPacket(p); 811 } 812 813 private volatile long lastZxid; 814 getLastZxid()815 public long getLastZxid() { 816 return lastZxid; 817 } 818 819 static class EndOfStreamException extends IOException { 820 821 private static final long serialVersionUID = -5438877188796231422L; 822 EndOfStreamException(String msg)823 public EndOfStreamException(String msg) { 824 super(msg); 825 } 826 827 @Override toString()828 public String toString() { 829 return "EndOfStreamException: " + getMessage(); 830 } 831 832 } 833 834 private static class SessionTimeoutException extends IOException { 835 836 private static final long serialVersionUID = 824482094072071178L; 837 SessionTimeoutException(String msg)838 public SessionTimeoutException(String msg) { 839 super(msg); 840 } 841 842 } 843 844 private static class SessionExpiredException extends IOException { 845 846 private static final long serialVersionUID = -1388816932076193249L; 847 SessionExpiredException(String msg)848 public SessionExpiredException(String msg) { 849 super(msg); 850 } 851 852 } 853 854 private static class RWServerFoundException extends IOException { 855 856 private static final long serialVersionUID = 90431199887158758L; 857 RWServerFoundException(String msg)858 public RWServerFoundException(String msg) { 859 super(msg); 860 } 861 862 } 863 864 /** 865 * This class services the outgoing request queue and generates the heart 866 * beats. It also spawns the ReadThread. 867 */ 868 class SendThread extends ZooKeeperThread { 869 870 private long lastPingSentNs; 871 private final ClientCnxnSocket clientCnxnSocket; 872 private boolean isFirstConnect = true; 873 readResponse(ByteBuffer incomingBuffer)874 void readResponse(ByteBuffer incomingBuffer) throws IOException { 875 ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); 876 BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); 877 ReplyHeader replyHdr = new ReplyHeader(); 878 879 replyHdr.deserialize(bbia, "header"); 880 switch (replyHdr.getXid()) { 881 case PING_XID: 882 LOG.debug("Got ping response for session id: 0x{} after {}ms.", 883 Long.toHexString(sessionId), 884 ((System.nanoTime() - lastPingSentNs) / 1000000)); 885 return; 886 case AUTHPACKET_XID: 887 LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId)); 888 if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { 889 changeZkState(States.AUTH_FAILED); 890 eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, 891 Watcher.Event.KeeperState.AuthFailed, null)); 892 eventThread.queueEventOfDeath(); 893 } 894 return; 895 case NOTIFICATION_XID: 896 LOG.debug("Got notification session id: 0x{}", 897 Long.toHexString(sessionId)); 898 WatcherEvent event = new WatcherEvent(); 899 event.deserialize(bbia, "response"); 900 901 // convert from a server path to a client path 902 if (chrootPath != null) { 903 String serverPath = event.getPath(); 904 if (serverPath.compareTo(chrootPath) == 0) { 905 event.setPath("/"); 906 } else if (serverPath.length() > chrootPath.length()) { 907 event.setPath(serverPath.substring(chrootPath.length())); 908 } else { 909 LOG.warn("Got server path {} which is too short for chroot path {}.", 910 event.getPath(), chrootPath); 911 } 912 } 913 914 WatchedEvent we = new WatchedEvent(event); 915 LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId)); 916 eventThread.queueEvent(we); 917 return; 918 default: 919 break; 920 } 921 922 // If SASL authentication is currently in progress, construct and 923 // send a response packet immediately, rather than queuing a 924 // response as with other packets. 925 if (tunnelAuthInProgress()) { 926 GetSASLRequest request = new GetSASLRequest(); 927 request.deserialize(bbia, "token"); 928 zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this); 929 return; 930 } 931 932 Packet packet; 933 synchronized (pendingQueue) { 934 if (pendingQueue.size() == 0) { 935 throw new IOException("Nothing in the queue, but got " + replyHdr.getXid()); 936 } 937 packet = pendingQueue.remove(); 938 } 939 /* 940 * Since requests are processed in order, we better get a response 941 * to the first request! 942 */ 943 try { 944 if (packet.requestHeader.getXid() != replyHdr.getXid()) { 945 packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue()); 946 throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() 947 + " with err " + replyHdr.getErr() 948 + " expected Xid " + packet.requestHeader.getXid() 949 + " for a packet with details: " + packet); 950 } 951 952 packet.replyHeader.setXid(replyHdr.getXid()); 953 packet.replyHeader.setErr(replyHdr.getErr()); 954 packet.replyHeader.setZxid(replyHdr.getZxid()); 955 if (replyHdr.getZxid() > 0) { 956 lastZxid = replyHdr.getZxid(); 957 } 958 if (packet.response != null && replyHdr.getErr() == 0) { 959 packet.response.deserialize(bbia, "response"); 960 } 961 962 LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet); 963 } finally { 964 finishPacket(packet); 965 } 966 } 967 SendThread(ClientCnxnSocket clientCnxnSocket)968 SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException { 969 super(makeThreadName("-SendThread()")); 970 changeZkState(States.CONNECTING); 971 this.clientCnxnSocket = clientCnxnSocket; 972 setDaemon(true); 973 } 974 975 // TODO: can not name this method getState since Thread.getState() 976 // already exists 977 // It would be cleaner to make class SendThread an implementation of 978 // Runnable 979 /** 980 * Used by ClientCnxnSocket 981 * 982 * @return 983 */ getZkState()984 synchronized ZooKeeper.States getZkState() { 985 return state; 986 } 987 changeZkState(ZooKeeper.States newState)988 synchronized void changeZkState(ZooKeeper.States newState) throws IOException { 989 if (!state.isAlive() && newState == States.CONNECTING) { 990 throw new IOException( 991 "Connection has already been closed and reconnection is not allowed"); 992 } 993 // It's safer to place state modification at the end. 994 state = newState; 995 } 996 getClientCnxnSocket()997 ClientCnxnSocket getClientCnxnSocket() { 998 return clientCnxnSocket; 999 } 1000 1001 /** 1002 * Setup session, previous watches, authentication. 1003 */ primeConnection()1004 void primeConnection() throws IOException { 1005 LOG.info( 1006 "Socket connection established, initiating session, client: {}, server: {}", 1007 clientCnxnSocket.getLocalSocketAddress(), 1008 clientCnxnSocket.getRemoteSocketAddress()); 1009 isFirstConnect = false; 1010 long sessId = (seenRwServerBefore) ? sessionId : 0; 1011 ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); 1012 // We add backwards since we are pushing into the front 1013 // Only send if there's a pending watch 1014 if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) { 1015 List<String> dataWatches = watchManager.getDataWatchList(); 1016 List<String> existWatches = watchManager.getExistWatchList(); 1017 List<String> childWatches = watchManager.getChildWatchList(); 1018 List<String> persistentWatches = watchManager.getPersistentWatchList(); 1019 List<String> persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList(); 1020 if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty() 1021 || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) { 1022 Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator(); 1023 Iterator<String> existWatchesIter = prependChroot(existWatches).iterator(); 1024 Iterator<String> childWatchesIter = prependChroot(childWatches).iterator(); 1025 Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator(); 1026 Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator(); 1027 long setWatchesLastZxid = lastZxid; 1028 1029 while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext() 1030 || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) { 1031 List<String> dataWatchesBatch = new ArrayList<String>(); 1032 List<String> existWatchesBatch = new ArrayList<String>(); 1033 List<String> childWatchesBatch = new ArrayList<String>(); 1034 List<String> persistentWatchesBatch = new ArrayList<String>(); 1035 List<String> persistentRecursiveWatchesBatch = new ArrayList<String>(); 1036 int batchLength = 0; 1037 1038 // Note, we may exceed our max length by a bit when we add the last 1039 // watch in the batch. This isn't ideal, but it makes the code simpler. 1040 while (batchLength < SET_WATCHES_MAX_LENGTH) { 1041 final String watch; 1042 if (dataWatchesIter.hasNext()) { 1043 watch = dataWatchesIter.next(); 1044 dataWatchesBatch.add(watch); 1045 } else if (existWatchesIter.hasNext()) { 1046 watch = existWatchesIter.next(); 1047 existWatchesBatch.add(watch); 1048 } else if (childWatchesIter.hasNext()) { 1049 watch = childWatchesIter.next(); 1050 childWatchesBatch.add(watch); 1051 } else if (persistentWatchesIter.hasNext()) { 1052 watch = persistentWatchesIter.next(); 1053 persistentWatchesBatch.add(watch); 1054 } else if (persistentRecursiveWatchesIter.hasNext()) { 1055 watch = persistentRecursiveWatchesIter.next(); 1056 persistentRecursiveWatchesBatch.add(watch); 1057 } else { 1058 break; 1059 } 1060 batchLength += watch.length(); 1061 } 1062 1063 Record record; 1064 int opcode; 1065 if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) { 1066 // maintain compatibility with older servers - if no persistent/recursive watchers 1067 // are used, use the old version of SetWatches 1068 record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch); 1069 opcode = OpCode.setWatches; 1070 } else { 1071 record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, 1072 childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch); 1073 opcode = OpCode.setWatches2; 1074 } 1075 RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode); 1076 Packet packet = new Packet(header, new ReplyHeader(), record, null, null); 1077 outgoingQueue.addFirst(packet); 1078 } 1079 } 1080 } 1081 1082 for (AuthData id : authInfo) { 1083 outgoingQueue.addFirst( 1084 new Packet( 1085 new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth), 1086 null, 1087 new AuthPacket(0, id.scheme, id.data), 1088 null, 1089 null)); 1090 } 1091 outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); 1092 clientCnxnSocket.connectionPrimed(); 1093 LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress()); 1094 } 1095 prependChroot(List<String> paths)1096 private List<String> prependChroot(List<String> paths) { 1097 if (chrootPath != null && !paths.isEmpty()) { 1098 for (int i = 0; i < paths.size(); ++i) { 1099 String clientPath = paths.get(i); 1100 String serverPath; 1101 // handle clientPath = "/" 1102 if (clientPath.length() == 1) { 1103 serverPath = chrootPath; 1104 } else { 1105 serverPath = chrootPath + clientPath; 1106 } 1107 paths.set(i, serverPath); 1108 } 1109 } 1110 return paths; 1111 } 1112 sendPing()1113 private void sendPing() { 1114 lastPingSentNs = System.nanoTime(); 1115 RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping); 1116 queuePacket(h, null, null, null, null, null, null, null, null); 1117 } 1118 1119 private InetSocketAddress rwServerAddress = null; 1120 1121 private static final int minPingRwTimeout = 100; 1122 1123 private static final int maxPingRwTimeout = 60000; 1124 1125 private int pingRwTimeout = minPingRwTimeout; 1126 1127 // Set to true if and only if constructor of ZooKeeperSaslClient 1128 // throws a LoginException: see startConnect() below. 1129 private boolean saslLoginFailed = false; 1130 startConnect(InetSocketAddress addr)1131 private void startConnect(InetSocketAddress addr) throws IOException { 1132 // initializing it for new connection 1133 saslLoginFailed = false; 1134 if (!isFirstConnect) { 1135 try { 1136 Thread.sleep(ThreadLocalRandom.current().nextLong(1000)); 1137 } catch (InterruptedException e) { 1138 LOG.warn("Unexpected exception", e); 1139 } 1140 } 1141 changeZkState(States.CONNECTING); 1142 1143 String hostPort = addr.getHostString() + ":" + addr.getPort(); 1144 MDC.put("myid", hostPort); 1145 setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")")); 1146 if (clientConfig.isSaslClientEnabled()) { 1147 try { 1148 if (zooKeeperSaslClient != null) { 1149 zooKeeperSaslClient.shutdown(); 1150 } 1151 zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig); 1152 } catch (LoginException e) { 1153 // An authentication error occurred when the SASL client tried to initialize: 1154 // for Kerberos this means that the client failed to authenticate with the KDC. 1155 // This is different from an authentication error that occurs during communication 1156 // with the Zookeeper server, which is handled below. 1157 LOG.warn( 1158 "SASL configuration failed. " 1159 + "Will continue connection to Zookeeper server without " 1160 + "SASL authentication, if Zookeeper server allows it.", e); 1161 eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); 1162 saslLoginFailed = true; 1163 } 1164 } 1165 logStartConnect(addr); 1166 1167 clientCnxnSocket.connect(addr); 1168 } 1169 logStartConnect(InetSocketAddress addr)1170 private void logStartConnect(InetSocketAddress addr) { 1171 LOG.info("Opening socket connection to server {}.", addr); 1172 if (zooKeeperSaslClient != null) { 1173 LOG.info("SASL config status: {}", zooKeeperSaslClient.getConfigStatus()); 1174 } 1175 } 1176 1177 @Override run()1178 public void run() { 1179 clientCnxnSocket.introduce(this, sessionId, outgoingQueue); 1180 clientCnxnSocket.updateNow(); 1181 clientCnxnSocket.updateLastSendAndHeard(); 1182 int to; 1183 long lastPingRwServer = Time.currentElapsedTime(); 1184 final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds 1185 InetSocketAddress serverAddress = null; 1186 while (state.isAlive()) { 1187 try { 1188 if (!clientCnxnSocket.isConnected()) { 1189 // don't re-establish connection if we are closing 1190 if (closing) { 1191 break; 1192 } 1193 if (rwServerAddress != null) { 1194 serverAddress = rwServerAddress; 1195 rwServerAddress = null; 1196 } else { 1197 serverAddress = hostProvider.next(1000); 1198 } 1199 onConnecting(serverAddress); 1200 startConnect(serverAddress); 1201 clientCnxnSocket.updateLastSendAndHeard(); 1202 } 1203 1204 if (state.isConnected()) { 1205 // determine whether we need to send an AuthFailed event. 1206 if (zooKeeperSaslClient != null) { 1207 boolean sendAuthEvent = false; 1208 if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { 1209 try { 1210 zooKeeperSaslClient.initialize(ClientCnxn.this); 1211 } catch (SaslException e) { 1212 LOG.error("SASL authentication with Zookeeper Quorum member failed.", e); 1213 changeZkState(States.AUTH_FAILED); 1214 sendAuthEvent = true; 1215 } 1216 } 1217 KeeperState authState = zooKeeperSaslClient.getKeeperState(); 1218 if (authState != null) { 1219 if (authState == KeeperState.AuthFailed) { 1220 // An authentication error occurred during authentication with the Zookeeper Server. 1221 changeZkState(States.AUTH_FAILED); 1222 sendAuthEvent = true; 1223 } else { 1224 if (authState == KeeperState.SaslAuthenticated) { 1225 sendAuthEvent = true; 1226 } 1227 } 1228 } 1229 1230 if (sendAuthEvent) { 1231 eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null)); 1232 if (state == States.AUTH_FAILED) { 1233 eventThread.queueEventOfDeath(); 1234 } 1235 } 1236 } 1237 to = readTimeout - clientCnxnSocket.getIdleRecv(); 1238 } else { 1239 to = connectTimeout - clientCnxnSocket.getIdleRecv(); 1240 } 1241 1242 if (to <= 0) { 1243 String warnInfo = String.format( 1244 "Client session timed out, have not heard from server in %dms for session id 0x%s", 1245 clientCnxnSocket.getIdleRecv(), 1246 Long.toHexString(sessionId)); 1247 LOG.warn(warnInfo); 1248 throw new SessionTimeoutException(warnInfo); 1249 } 1250 if (state.isConnected()) { 1251 //1000(1 second) is to prevent race condition missing to send the second ping 1252 //also make sure not to send too many pings when readTimeout is small 1253 int timeToNextPing = readTimeout / 2 1254 - clientCnxnSocket.getIdleSend() 1255 - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); 1256 //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL 1257 if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { 1258 sendPing(); 1259 clientCnxnSocket.updateLastSend(); 1260 } else { 1261 if (timeToNextPing < to) { 1262 to = timeToNextPing; 1263 } 1264 } 1265 } 1266 1267 // If we are in read-only mode, seek for read/write server 1268 if (state == States.CONNECTEDREADONLY) { 1269 long now = Time.currentElapsedTime(); 1270 int idlePingRwServer = (int) (now - lastPingRwServer); 1271 if (idlePingRwServer >= pingRwTimeout) { 1272 lastPingRwServer = now; 1273 idlePingRwServer = 0; 1274 pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout); 1275 pingRwServer(); 1276 } 1277 to = Math.min(to, pingRwTimeout - idlePingRwServer); 1278 } 1279 1280 clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); 1281 } catch (Throwable e) { 1282 if (closing) { 1283 // closing so this is expected 1284 LOG.warn( 1285 "An exception was thrown while closing send thread for session 0x{}.", 1286 Long.toHexString(getSessionId()), 1287 e); 1288 break; 1289 } else { 1290 LOG.warn( 1291 "Session 0x{} for sever {}, Closing socket connection. " 1292 + "Attempting reconnect except it is a SessionExpiredException.", 1293 Long.toHexString(getSessionId()), 1294 serverAddress, 1295 e); 1296 1297 // At this point, there might still be new packets appended to outgoingQueue. 1298 // they will be handled in next connection or cleared up if closed. 1299 cleanAndNotifyState(); 1300 } 1301 } 1302 } 1303 1304 synchronized (state) { 1305 // When it comes to this point, it guarantees that later queued 1306 // packet to outgoingQueue will be notified of death. 1307 cleanup(); 1308 } 1309 clientCnxnSocket.close(); 1310 if (state.isAlive()) { 1311 eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); 1312 } 1313 eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null)); 1314 ZooTrace.logTraceMessage( 1315 LOG, 1316 ZooTrace.getTextTraceLevel(), 1317 "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId())); 1318 } 1319 cleanAndNotifyState()1320 private void cleanAndNotifyState() { 1321 cleanup(); 1322 if (state.isAlive()) { 1323 eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); 1324 } 1325 clientCnxnSocket.updateNow(); 1326 clientCnxnSocket.updateLastSendAndHeard(); 1327 } 1328 pingRwServer()1329 private void pingRwServer() throws RWServerFoundException { 1330 String result = null; 1331 InetSocketAddress addr = hostProvider.next(0); 1332 1333 LOG.info("Checking server {} for being r/w. Timeout {}", addr, pingRwTimeout); 1334 1335 Socket sock = null; 1336 BufferedReader br = null; 1337 try { 1338 sock = new Socket(addr.getHostString(), addr.getPort()); 1339 sock.setSoLinger(false, -1); 1340 sock.setSoTimeout(1000); 1341 sock.setTcpNoDelay(true); 1342 sock.getOutputStream().write("isro".getBytes()); 1343 sock.getOutputStream().flush(); 1344 sock.shutdownOutput(); 1345 br = new BufferedReader(new InputStreamReader(sock.getInputStream())); 1346 result = br.readLine(); 1347 } catch (ConnectException e) { 1348 // ignore, this just means server is not up 1349 } catch (IOException e) { 1350 // some unexpected error, warn about it 1351 LOG.warn("Exception while seeking for r/w server.", e); 1352 } finally { 1353 if (sock != null) { 1354 try { 1355 sock.close(); 1356 } catch (IOException e) { 1357 LOG.warn("Unexpected exception", e); 1358 } 1359 } 1360 if (br != null) { 1361 try { 1362 br.close(); 1363 } catch (IOException e) { 1364 LOG.warn("Unexpected exception", e); 1365 } 1366 } 1367 } 1368 1369 if ("rw".equals(result)) { 1370 pingRwTimeout = minPingRwTimeout; 1371 // save the found address so that it's used during the next 1372 // connection attempt 1373 rwServerAddress = addr; 1374 throw new RWServerFoundException("Majority server found at " 1375 + addr.getHostString() + ":" + addr.getPort()); 1376 } 1377 } 1378 cleanup()1379 private void cleanup() { 1380 clientCnxnSocket.cleanup(); 1381 synchronized (pendingQueue) { 1382 for (Packet p : pendingQueue) { 1383 conLossPacket(p); 1384 } 1385 pendingQueue.clear(); 1386 } 1387 // We can't call outgoingQueue.clear() here because 1388 // between iterating and clear up there might be new 1389 // packets added in queuePacket(). 1390 Iterator<Packet> iter = outgoingQueue.iterator(); 1391 while (iter.hasNext()) { 1392 Packet p = iter.next(); 1393 conLossPacket(p); 1394 iter.remove(); 1395 } 1396 } 1397 1398 /** 1399 * Callback invoked by the ClientCnxnSocket once a connection has been 1400 * established. 1401 * 1402 * @param _negotiatedSessionTimeout 1403 * @param _sessionId 1404 * @param _sessionPasswd 1405 * @param isRO 1406 * @throws IOException 1407 */ onConnected( int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO)1408 void onConnected( 1409 int _negotiatedSessionTimeout, 1410 long _sessionId, 1411 byte[] _sessionPasswd, 1412 boolean isRO) throws IOException { 1413 negotiatedSessionTimeout = _negotiatedSessionTimeout; 1414 if (negotiatedSessionTimeout <= 0) { 1415 changeZkState(States.CLOSED); 1416 1417 eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); 1418 eventThread.queueEventOfDeath(); 1419 1420 String warnInfo = String.format( 1421 "Unable to reconnect to ZooKeeper service, session 0x%s has expired", 1422 Long.toHexString(sessionId)); 1423 LOG.warn(warnInfo); 1424 throw new SessionExpiredException(warnInfo); 1425 } 1426 1427 if (!readOnly && isRO) { 1428 LOG.error("Read/write client got connected to read-only server"); 1429 } 1430 1431 readTimeout = negotiatedSessionTimeout * 2 / 3; 1432 connectTimeout = negotiatedSessionTimeout / hostProvider.size(); 1433 hostProvider.onConnected(); 1434 sessionId = _sessionId; 1435 sessionPasswd = _sessionPasswd; 1436 changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED); 1437 seenRwServerBefore |= !isRO; 1438 LOG.info( 1439 "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}", 1440 clientCnxnSocket.getRemoteSocketAddress(), 1441 Long.toHexString(sessionId), 1442 negotiatedSessionTimeout, 1443 (isRO ? " (READ-ONLY mode)" : "")); 1444 KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; 1445 eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null)); 1446 } 1447 close()1448 void close() { 1449 try { 1450 changeZkState(States.CLOSED); 1451 } catch (IOException e) { 1452 LOG.warn("Connection close fails when migrates state from {} to CLOSED", 1453 getZkState()); 1454 } 1455 clientCnxnSocket.onClosing(); 1456 } 1457 testableCloseSocket()1458 void testableCloseSocket() throws IOException { 1459 clientCnxnSocket.testableCloseSocket(); 1460 } 1461 tunnelAuthInProgress()1462 public boolean tunnelAuthInProgress() { 1463 // 1. SASL client is disabled. 1464 if (!clientConfig.isSaslClientEnabled()) { 1465 return false; 1466 } 1467 1468 // 2. SASL login failed. 1469 if (saslLoginFailed) { 1470 return false; 1471 } 1472 1473 // 3. SendThread has not created the authenticating object yet, 1474 // therefore authentication is (at the earliest stage of being) in progress. 1475 if (zooKeeperSaslClient == null) { 1476 return true; 1477 } 1478 1479 // 4. authenticating object exists, so ask it for its progress. 1480 return zooKeeperSaslClient.clientTunneledAuthenticationInProgress(); 1481 } 1482 sendPacket(Packet p)1483 public void sendPacket(Packet p) throws IOException { 1484 clientCnxnSocket.sendPacket(p); 1485 } 1486 1487 } 1488 1489 /** 1490 * Shutdown the send/event threads. This method should not be called 1491 * directly - rather it should be called as part of close operation. This 1492 * method is primarily here to allow the tests to verify disconnection 1493 * behavior. 1494 */ disconnect()1495 public void disconnect() { 1496 LOG.debug("Disconnecting client for session: 0x{}", Long.toHexString(getSessionId())); 1497 1498 sendThread.close(); 1499 try { 1500 sendThread.join(); 1501 } catch (InterruptedException ex) { 1502 LOG.warn("Got interrupted while waiting for the sender thread to close", ex); 1503 } 1504 eventThread.queueEventOfDeath(); 1505 if (zooKeeperSaslClient != null) { 1506 zooKeeperSaslClient.shutdown(); 1507 } 1508 } 1509 1510 /** 1511 * Close the connection, which includes; send session disconnect to the 1512 * server, shutdown the send/event threads. 1513 * 1514 * @throws IOException 1515 */ close()1516 public void close() throws IOException { 1517 LOG.debug("Closing client for session: 0x{}", Long.toHexString(getSessionId())); 1518 1519 try { 1520 RequestHeader h = new RequestHeader(); 1521 h.setType(ZooDefs.OpCode.closeSession); 1522 1523 submitRequest(h, null, null, null); 1524 } catch (InterruptedException e) { 1525 // ignore, close the send/event threads 1526 } finally { 1527 disconnect(); 1528 } 1529 } 1530 1531 // @VisibleForTesting 1532 protected int xid = 1; 1533 1534 // @VisibleForTesting 1535 volatile States state = States.NOT_CONNECTED; 1536 1537 /* 1538 * getXid() is called externally by ClientCnxnNIO::doIO() when packets are sent from the outgoingQueue to 1539 * the server. Thus, getXid() must be public. 1540 */ getXid()1541 public synchronized int getXid() { 1542 // Avoid negative cxid values. In particular, cxid values of -4, -2, and -1 are special and 1543 // must not be used for requests -- see SendThread.readResponse. 1544 // Skip from MAX to 1. 1545 if (xid == Integer.MAX_VALUE) { 1546 xid = 1; 1547 } 1548 return xid++; 1549 } 1550 submitRequest( RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)1551 public ReplyHeader submitRequest( 1552 RequestHeader h, 1553 Record request, 1554 Record response, 1555 WatchRegistration watchRegistration) throws InterruptedException { 1556 return submitRequest(h, request, response, watchRegistration, null); 1557 } 1558 submitRequest( RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration)1559 public ReplyHeader submitRequest( 1560 RequestHeader h, 1561 Record request, 1562 Record response, 1563 WatchRegistration watchRegistration, 1564 WatchDeregistration watchDeregistration) throws InterruptedException { 1565 ReplyHeader r = new ReplyHeader(); 1566 Packet packet = queuePacket( 1567 h, 1568 r, 1569 request, 1570 response, 1571 null, 1572 null, 1573 null, 1574 null, 1575 watchRegistration, 1576 watchDeregistration); 1577 synchronized (packet) { 1578 if (requestTimeout > 0) { 1579 // Wait for request completion with timeout 1580 waitForPacketFinish(r, packet); 1581 } else { 1582 // Wait for request completion infinitely 1583 while (!packet.finished) { 1584 packet.wait(); 1585 } 1586 } 1587 } 1588 if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) { 1589 sendThread.cleanAndNotifyState(); 1590 } 1591 return r; 1592 } 1593 1594 /** 1595 * Wait for request completion with timeout. 1596 */ waitForPacketFinish(ReplyHeader r, Packet packet)1597 private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException { 1598 long waitStartTime = Time.currentElapsedTime(); 1599 while (!packet.finished) { 1600 packet.wait(requestTimeout); 1601 if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) { 1602 LOG.error("Timeout error occurred for the packet '{}'.", packet); 1603 r.setErr(Code.REQUESTTIMEOUT.intValue()); 1604 break; 1605 } 1606 } 1607 } 1608 saslCompleted()1609 public void saslCompleted() { 1610 sendThread.getClientCnxnSocket().saslCompleted(); 1611 } 1612 sendPacket(Record request, Record response, AsyncCallback cb, int opCode)1613 public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode) throws IOException { 1614 // Generate Xid now because it will be sent immediately, 1615 // by call to sendThread.sendPacket() below. 1616 int xid = getXid(); 1617 RequestHeader h = new RequestHeader(); 1618 h.setXid(xid); 1619 h.setType(opCode); 1620 1621 ReplyHeader r = new ReplyHeader(); 1622 r.setXid(xid); 1623 1624 Packet p = new Packet(h, r, request, response, null, false); 1625 p.cb = cb; 1626 sendThread.sendPacket(p); 1627 } 1628 queuePacket( RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration)1629 public Packet queuePacket( 1630 RequestHeader h, 1631 ReplyHeader r, 1632 Record request, 1633 Record response, 1634 AsyncCallback cb, 1635 String clientPath, 1636 String serverPath, 1637 Object ctx, 1638 WatchRegistration watchRegistration) { 1639 return queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, watchRegistration, null); 1640 } 1641 queuePacket( RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration)1642 public Packet queuePacket( 1643 RequestHeader h, 1644 ReplyHeader r, 1645 Record request, 1646 Record response, 1647 AsyncCallback cb, 1648 String clientPath, 1649 String serverPath, 1650 Object ctx, 1651 WatchRegistration watchRegistration, 1652 WatchDeregistration watchDeregistration) { 1653 Packet packet = null; 1654 1655 // Note that we do not generate the Xid for the packet yet. It is 1656 // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(), 1657 // where the packet is actually sent. 1658 packet = new Packet(h, r, request, response, watchRegistration); 1659 packet.cb = cb; 1660 packet.ctx = ctx; 1661 packet.clientPath = clientPath; 1662 packet.serverPath = serverPath; 1663 packet.watchDeregistration = watchDeregistration; 1664 // The synchronized block here is for two purpose: 1665 // 1. synchronize with the final cleanup() in SendThread.run() to avoid race 1666 // 2. synchronized against each packet. So if a closeSession packet is added, 1667 // later packet will be notified. 1668 synchronized (state) { 1669 if (!state.isAlive() || closing) { 1670 conLossPacket(packet); 1671 } else { 1672 // If the client is asking to close the session then 1673 // mark as closing 1674 if (h.getType() == OpCode.closeSession) { 1675 closing = true; 1676 } 1677 outgoingQueue.add(packet); 1678 } 1679 } 1680 sendThread.getClientCnxnSocket().packetAdded(); 1681 return packet; 1682 } 1683 addAuthInfo(String scheme, byte[] auth)1684 public void addAuthInfo(String scheme, byte[] auth) { 1685 if (!state.isAlive()) { 1686 return; 1687 } 1688 authInfo.add(new AuthData(scheme, auth)); 1689 queuePacket( 1690 new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth), 1691 null, 1692 new AuthPacket(0, scheme, auth), 1693 null, 1694 null, 1695 null, 1696 null, 1697 null, 1698 null); 1699 } 1700 getState()1701 States getState() { 1702 return state; 1703 } 1704 1705 private static class LocalCallback { 1706 1707 private final AsyncCallback cb; 1708 private final int rc; 1709 private final String path; 1710 private final Object ctx; 1711 LocalCallback(AsyncCallback cb, int rc, String path, Object ctx)1712 public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) { 1713 this.cb = cb; 1714 this.rc = rc; 1715 this.path = path; 1716 this.ctx = ctx; 1717 } 1718 1719 } 1720 initRequestTimeout()1721 private void initRequestTimeout() { 1722 try { 1723 requestTimeout = clientConfig.getLong( 1724 ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, 1725 ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT); 1726 LOG.info( 1727 "{} value is {}. feature enabled={}", 1728 ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, 1729 requestTimeout, 1730 requestTimeout > 0); 1731 } catch (NumberFormatException e) { 1732 LOG.error( 1733 "Configured value {} for property {} can not be parsed to long.", 1734 clientConfig.getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT), 1735 ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT); 1736 throw e; 1737 } 1738 } 1739 1740 } 1741