1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper;
20 
21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
22 import java.io.BufferedReader;
23 import java.io.ByteArrayOutputStream;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.net.ConnectException;
27 import java.net.InetSocketAddress;
28 import java.net.Socket;
29 import java.net.SocketAddress;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayDeque;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.Queue;
40 import java.util.Set;
41 import java.util.concurrent.CopyOnWriteArraySet;
42 import java.util.concurrent.LinkedBlockingDeque;
43 import java.util.concurrent.LinkedBlockingQueue;
44 import java.util.concurrent.ThreadLocalRandom;
45 import javax.security.auth.login.LoginException;
46 import javax.security.sasl.SaslException;
47 import org.apache.jute.BinaryInputArchive;
48 import org.apache.jute.BinaryOutputArchive;
49 import org.apache.jute.Record;
50 import org.apache.zookeeper.AsyncCallback.ACLCallback;
51 import org.apache.zookeeper.AsyncCallback.AllChildrenNumberCallback;
52 import org.apache.zookeeper.AsyncCallback.Children2Callback;
53 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
54 import org.apache.zookeeper.AsyncCallback.Create2Callback;
55 import org.apache.zookeeper.AsyncCallback.DataCallback;
56 import org.apache.zookeeper.AsyncCallback.EphemeralsCallback;
57 import org.apache.zookeeper.AsyncCallback.MultiCallback;
58 import org.apache.zookeeper.AsyncCallback.StatCallback;
59 import org.apache.zookeeper.AsyncCallback.StringCallback;
60 import org.apache.zookeeper.AsyncCallback.VoidCallback;
61 import org.apache.zookeeper.KeeperException.Code;
62 import org.apache.zookeeper.OpResult.ErrorResult;
63 import org.apache.zookeeper.Watcher.Event;
64 import org.apache.zookeeper.Watcher.Event.EventType;
65 import org.apache.zookeeper.Watcher.Event.KeeperState;
66 import org.apache.zookeeper.ZooDefs.OpCode;
67 import org.apache.zookeeper.ZooKeeper.States;
68 import org.apache.zookeeper.ZooKeeper.WatchRegistration;
69 import org.apache.zookeeper.client.HostProvider;
70 import org.apache.zookeeper.client.ZKClientConfig;
71 import org.apache.zookeeper.client.ZooKeeperSaslClient;
72 import org.apache.zookeeper.common.Time;
73 import org.apache.zookeeper.proto.AuthPacket;
74 import org.apache.zookeeper.proto.ConnectRequest;
75 import org.apache.zookeeper.proto.Create2Response;
76 import org.apache.zookeeper.proto.CreateResponse;
77 import org.apache.zookeeper.proto.ExistsResponse;
78 import org.apache.zookeeper.proto.GetACLResponse;
79 import org.apache.zookeeper.proto.GetAllChildrenNumberResponse;
80 import org.apache.zookeeper.proto.GetChildren2Response;
81 import org.apache.zookeeper.proto.GetChildrenResponse;
82 import org.apache.zookeeper.proto.GetDataResponse;
83 import org.apache.zookeeper.proto.GetEphemeralsResponse;
84 import org.apache.zookeeper.proto.GetSASLRequest;
85 import org.apache.zookeeper.proto.ReplyHeader;
86 import org.apache.zookeeper.proto.RequestHeader;
87 import org.apache.zookeeper.proto.SetACLResponse;
88 import org.apache.zookeeper.proto.SetDataResponse;
89 import org.apache.zookeeper.proto.SetWatches;
90 import org.apache.zookeeper.proto.SetWatches2;
91 import org.apache.zookeeper.proto.WatcherEvent;
92 import org.apache.zookeeper.server.ByteBufferInputStream;
93 import org.apache.zookeeper.server.ZooKeeperThread;
94 import org.apache.zookeeper.server.ZooTrace;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
97 import org.slf4j.MDC;
98 
99 /**
100  * This class manages the socket i/o for the client. ClientCnxn maintains a list
101  * of available servers to connect to and "transparently" switches servers it is
102  * connected to as needed.
103  *
104  */
105 @SuppressFBWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
106 public class ClientCnxn {
107 
    /** Logger shared by this class and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);

    /* ZOOKEEPER-706: If a session has a large number of watches set then
     * attempting to re-establish those watches after a connection loss may
     * fail due to the SetWatches request exceeding the server's configured
     * jute.maxBuffer value. To avoid this we instead split the watch
     * re-establishment across multiple SetWatches calls. This constant
     * controls the size of each call. It is set to 128kB to be conservative
     * with respect to the server's 1MB default for jute.maxBuffer.
     */
    private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024;

    /* predefined xid values recognized as special by the server */
    // -1 means notification(WATCHER_EVENT)
    public static final int NOTIFICATION_XID = -1;
    // -2 is the xid for pings
    public static final int PING_XID = -2;
    // -4 is the xid for AuthPacket
    public static final int AUTHPACKET_XID = -4;
    // -8 is the xid for setWatch
    public static final int SET_WATCHES_XID = -8;
129 
130     static class AuthData {
131 
AuthData(String scheme, byte[] data)132         AuthData(String scheme, byte[] data) {
133             this.scheme = scheme;
134             this.data = data;
135         }
136 
137         String scheme;
138 
139         byte[] data;
140 
141     }
142 
    /**
     * Auth data entries held by this connection.
     * NOTE(review): where entries are added and consumed is outside this section — confirm against the rest of the file.
     */
    private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();

    /**
     * These are the packets that have been sent and are waiting for a response.
     */
    private final Queue<Packet> pendingQueue = new ArrayDeque<>();

    /**
     * These are the packets that need to be sent.
     */
    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();

    /** Initialised by the constructor to {@code sessionTimeout / hostProvider.size()}. */
    private int connectTimeout;

    /**
     * The timeout in ms the client negotiated with the server. This is the
     * "real" timeout, not the timeout requested by the client (which may have
     * been increased/decreased by the server, which applies bounds to this
     * value).
     */
    private volatile int negotiatedSessionTimeout;

    /** Initialised by the constructor to {@code sessionTimeout * 2 / 3}. */
    private int readTimeout;

    /** The session timeout value passed to the constructor. */
    private final int sessionTimeout;

    /** Watch manager built in the constructor from the client config and the default watcher. */
    private final ZKWatchManager watchManager;

    /** Session id passed to the constructor (0 when the convenience constructor is used). */
    private long sessionId;

    /** Session password passed to the constructor; the array reference is stored without copying. */
    private byte[] sessionPasswd;

    /**
     * If true, the connection is allowed to go to r-o mode. This field's value
     * is sent, besides other data, during session creation handshake. If the
     * server on the other side of the wire is partitioned it'll accept
     * read-only clients only.
     */
    private boolean readOnly;

    /** Chroot of this client; may be null, in which case paths returned by the server are used unchanged. */
    final String chrootPath;

    /** Created in the constructor and started by {@link #start()}. */
    final SendThread sendThread;

    /** Created in the constructor and started by {@link #start()}. */
    final EventThread eventThread;

    /**
     * Set to true when close is called. Latches the connection such that we
     * don't attempt to re-connect to the server if in the middle of closing the
     * connection (client sends session disconnect to server as part of close
     * operation)
     */
    private volatile boolean closing = false;

    /**
     * A set of ZooKeeper hosts this client could connect to.
     */
    private final HostProvider hostProvider;

    /**
     * Is set to true when a connection to a r/w server is established for the
     * first time; never changed afterwards.
     * <p>
     * Is used to handle situations when client without sessionId connects to a
     * read-only server. Such client receives "fake" sessionId from read-only
     * server, but this sessionId is invalid for other servers. So when such
     * client finds a r/w server, it sends 0 instead of fake sessionId during
     * connection handshake and establishes new, valid session.
     * <p>
     * If this field is false (which implies we haven't seen r/w server before)
     * then non-zero sessionId is fake, otherwise it is valid.
     */
    volatile boolean seenRwServerBefore = false;

    // NOTE(review): assigned outside this section; it is a public mutable field — confirm who writes it.
    public ZooKeeperSaslClient zooKeeperSaslClient;

    /** Client configuration passed to the constructor. */
    private final ZKClientConfig clientConfig;
    /**
     * If any request's response is not received in configured requestTimeout
     * then it is assumed that the response packet is lost.
     */
    private long requestTimeout;
225 
getWatcherManager()226     ZKWatchManager getWatcherManager() {
227         return watchManager;
228     }
229 
getSessionId()230     public long getSessionId() {
231         return sessionId;
232     }
233 
getSessionPasswd()234     public byte[] getSessionPasswd() {
235         return sessionPasswd;
236     }
237 
getSessionTimeout()238     public int getSessionTimeout() {
239         return negotiatedSessionTimeout;
240     }
241 
242     @Override
toString()243     public String toString() {
244         StringBuilder sb = new StringBuilder();
245 
246         SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
247         SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
248         sb.append("sessionid:0x").append(Long.toHexString(getSessionId()))
249           .append(" local:").append(local)
250           .append(" remoteserver:").append(remote)
251           .append(" lastZxid:").append(lastZxid)
252           .append(" xid:").append(xid)
253           .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
254           .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
255           .append(" queuedpkts:").append(outgoingQueue.size())
256           .append(" pendingresp:").append(pendingQueue.size())
257           .append(" queuedevents:").append(eventThread.waitingEvents.size());
258 
259         return sb.toString();
260     }
261 
262     /**
263      * This class allows us to pass the headers and the relevant records around.
264      */
265     static class Packet {
266 
267         RequestHeader requestHeader;
268 
269         ReplyHeader replyHeader;
270 
271         Record request;
272 
273         Record response;
274 
275         ByteBuffer bb;
276 
277         /** Client's view of the path (may differ due to chroot) **/
278         String clientPath;
279         /** Servers's view of the path (may differ due to chroot) **/
280         String serverPath;
281 
282         boolean finished;
283 
284         AsyncCallback cb;
285 
286         Object ctx;
287 
288         WatchRegistration watchRegistration;
289 
290         public boolean readOnly;
291 
292         WatchDeregistration watchDeregistration;
293 
294         /** Convenience ctor */
Packet( RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration)295         Packet(
296             RequestHeader requestHeader,
297             ReplyHeader replyHeader,
298             Record request,
299             Record response,
300             WatchRegistration watchRegistration) {
301             this(requestHeader, replyHeader, request, response, watchRegistration, false);
302         }
303 
Packet( RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration, boolean readOnly)304         Packet(
305             RequestHeader requestHeader,
306             ReplyHeader replyHeader,
307             Record request,
308             Record response,
309             WatchRegistration watchRegistration,
310             boolean readOnly) {
311 
312             this.requestHeader = requestHeader;
313             this.replyHeader = replyHeader;
314             this.request = request;
315             this.response = response;
316             this.readOnly = readOnly;
317             this.watchRegistration = watchRegistration;
318         }
319 
createBB()320         public void createBB() {
321             try {
322                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
323                 BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
324                 boa.writeInt(-1, "len"); // We'll fill this in later
325                 if (requestHeader != null) {
326                     requestHeader.serialize(boa, "header");
327                 }
328                 if (request instanceof ConnectRequest) {
329                     request.serialize(boa, "connect");
330                     // append "am-I-allowed-to-be-readonly" flag
331                     boa.writeBool(readOnly, "readOnly");
332                 } else if (request != null) {
333                     request.serialize(boa, "request");
334                 }
335                 baos.close();
336                 this.bb = ByteBuffer.wrap(baos.toByteArray());
337                 this.bb.putInt(this.bb.capacity() - 4);
338                 this.bb.rewind();
339             } catch (IOException e) {
340                 LOG.warn("Unexpected exception", e);
341             }
342         }
343 
344         @Override
toString()345         public String toString() {
346             StringBuilder sb = new StringBuilder();
347 
348             sb.append("clientPath:" + clientPath);
349             sb.append(" serverPath:" + serverPath);
350             sb.append(" finished:" + finished);
351 
352             sb.append(" header:: " + requestHeader);
353             sb.append(" replyHeader:: " + replyHeader);
354             sb.append(" request:: " + request);
355             sb.append(" response:: " + response);
356 
357             // jute toString is horrible, remove unnecessary newlines
358             return sb.toString().replaceAll("\r*\n+", " ");
359         }
360 
361     }
362 
363     /**
364      * Creates a connection object. The actual network connect doesn't get
365      * established until needed. The start() instance method must be called
366      * subsequent to construction.
367      *
368      * @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
369      * @param hostProvider the list of ZooKeeper servers to connect to
370      * @param sessionTimeout the timeout for connections.
371      * @param clientConfig the client configuration.
372      * @param defaultWatcher default watcher for this connection
373      * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
374      * @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
375      */
ClientCnxn( String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly )376     public ClientCnxn(
377         String chrootPath,
378         HostProvider hostProvider,
379         int sessionTimeout,
380         ZKClientConfig clientConfig,
381         Watcher defaultWatcher,
382         ClientCnxnSocket clientCnxnSocket,
383         boolean canBeReadOnly
384     ) throws IOException {
385         this(
386             chrootPath,
387             hostProvider,
388             sessionTimeout,
389             clientConfig,
390             defaultWatcher,
391             clientCnxnSocket,
392             0,
393             new byte[16],
394             canBeReadOnly);
395     }
396 
397     /**
398      * Creates a connection object. The actual network connect doesn't get
399      * established until needed. The start() instance method must be called
400      * subsequent to construction.
401      *
402      * @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
403      * @param hostProvider the list of ZooKeeper servers to connect to
404      * @param sessionTimeout the timeout for connections.
405      * @param clientConfig the client configuration.
406      * @param defaultWatcher default watcher for this connection
407      * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
408      * @param sessionId session id if re-establishing session
409      * @param sessionPasswd session passwd if re-establishing session
410      * @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
411      * @throws IOException in cases of broken network
412      */
ClientCnxn( String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly )413     public ClientCnxn(
414         String chrootPath,
415         HostProvider hostProvider,
416         int sessionTimeout,
417         ZKClientConfig clientConfig,
418         Watcher defaultWatcher,
419         ClientCnxnSocket clientCnxnSocket,
420         long sessionId,
421         byte[] sessionPasswd,
422         boolean canBeReadOnly
423     ) throws IOException {
424         this.chrootPath = chrootPath;
425         this.hostProvider = hostProvider;
426         this.sessionTimeout = sessionTimeout;
427         this.clientConfig = clientConfig;
428         this.sessionId = sessionId;
429         this.sessionPasswd = sessionPasswd;
430         this.readOnly = canBeReadOnly;
431 
432         this.watchManager = new ZKWatchManager(
433                 clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
434                 defaultWatcher);
435 
436         this.connectTimeout = sessionTimeout / hostProvider.size();
437         this.readTimeout = sessionTimeout * 2 / 3;
438 
439         this.sendThread = new SendThread(clientCnxnSocket);
440         this.eventThread = new EventThread();
441         initRequestTimeout();
442     }
443 
start()444     public void start() {
445         sendThread.start();
446         eventThread.start();
447     }
448 
    // Sentinel placed on the event queue by EventThread.queueEventOfDeath();
    // EventThread.run() recognises it by identity and begins shutting down.
    private Object eventOfDeath = new Object();
450 
451     private static class WatcherSetEventPair {
452 
453         private final Set<Watcher> watchers;
454         private final WatchedEvent event;
455 
WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event)456         public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
457             this.watchers = watchers;
458             this.event = event;
459         }
460 
461     }
462 
463     /**
464      * Guard against creating "-EventThread-EventThread-EventThread-..." thread
465      * names when ZooKeeper object is being created from within a watcher.
466      * See ZOOKEEPER-795 for details.
467      */
makeThreadName(String suffix)468     private static String makeThreadName(String suffix) {
469         String name = Thread.currentThread().getName().replaceAll("-EventThread", "");
470         return name + suffix;
471     }
472 
473     /**
474      * Tests that current thread is the main event loop.
475      * This method is useful only for tests inside ZooKeeper project
476      * it is not a public API intended for use by external applications.
477      * @return true if Thread.currentThread() is an EventThread.
478      */
isInEventThread()479     public static boolean isInEventThread() {
480         return Thread.currentThread() instanceof EventThread;
481     }
482 
483     class EventThread extends ZooKeeperThread {
484 
        /**
         * Work queue for this thread. It holds watcher/event pairs, local
         * callbacks, packets and the eventOfDeath sentinel, and also serves as
         * the monitor guarding the shutdown hand-off in queuePacket() and run().
         */
        private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();

        /** This is really the queued session state until the event
         * thread actually processes the event and hands it to the watcher.
         * But for all intents and purposes this is the state.
         */
        private volatile KeeperState sessionState = KeeperState.Disconnected;

        /** Set to true by run() once the eventOfDeath sentinel has been taken from the queue. */
        private volatile boolean wasKilled = false;
        /** True from the start of run() until it exits after being killed with an empty queue. */
        private volatile boolean isRunning = false;
495 
EventThread()496         EventThread() {
497             super(makeThreadName("-EventThread"));
498             setDaemon(true);
499         }
500 
queueEvent(WatchedEvent event)501         public void queueEvent(WatchedEvent event) {
502             queueEvent(event, null);
503         }
504 
queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers)505         private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
506             if (event.getType() == EventType.None && sessionState == event.getState()) {
507                 return;
508             }
509             sessionState = event.getState();
510             final Set<Watcher> watchers;
511             if (materializedWatchers == null) {
512                 // materialize the watchers based on the event
513                 watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
514             } else {
515                 watchers = new HashSet<>(materializedWatchers);
516             }
517             WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
518             // queue the pair (watch set & event) for later processing
519             waitingEvents.add(pair);
520         }
521 
queueCallback(AsyncCallback cb, int rc, String path, Object ctx)522         public void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
523             waitingEvents.add(new LocalCallback(cb, rc, path, ctx));
524         }
525 
526         @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
queuePacket(Packet packet)527         public void queuePacket(Packet packet) {
528             if (wasKilled) {
529                 synchronized (waitingEvents) {
530                     if (isRunning) {
531                         waitingEvents.add(packet);
532                     } else {
533                         processEvent(packet);
534                     }
535                 }
536             } else {
537                 waitingEvents.add(packet);
538             }
539         }
540 
queueEventOfDeath()541         public void queueEventOfDeath() {
542             waitingEvents.add(eventOfDeath);
543         }
544 
545         @Override
546         @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
run()547         public void run() {
548             try {
549                 isRunning = true;
550                 while (true) {
551                     Object event = waitingEvents.take();
552                     if (event == eventOfDeath) {
553                         wasKilled = true;
554                     } else {
555                         processEvent(event);
556                     }
557                     if (wasKilled) {
558                         synchronized (waitingEvents) {
559                             if (waitingEvents.isEmpty()) {
560                                 isRunning = false;
561                                 break;
562                             }
563                         }
564                     }
565                 }
566             } catch (InterruptedException e) {
567                 LOG.error("Event thread exiting due to interruption", e);
568             }
569 
570             LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
571         }
572 
processEvent(Object event)573         private void processEvent(Object event) {
574             try {
575                 if (event instanceof WatcherSetEventPair) {
576                     // each watcher will process the event
577                     WatcherSetEventPair pair = (WatcherSetEventPair) event;
578                     for (Watcher watcher : pair.watchers) {
579                         try {
580                             watcher.process(pair.event);
581                         } catch (Throwable t) {
582                             LOG.error("Error while calling watcher.", t);
583                         }
584                     }
585                 } else if (event instanceof LocalCallback) {
586                     LocalCallback lcb = (LocalCallback) event;
587                     if (lcb.cb instanceof StatCallback) {
588                         ((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
589                     } else if (lcb.cb instanceof DataCallback) {
590                         ((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
591                     } else if (lcb.cb instanceof ACLCallback) {
592                         ((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
593                     } else if (lcb.cb instanceof ChildrenCallback) {
594                         ((ChildrenCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
595                     } else if (lcb.cb instanceof Children2Callback) {
596                         ((Children2Callback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
597                     } else if (lcb.cb instanceof StringCallback) {
598                         ((StringCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
599                     } else if (lcb.cb instanceof AsyncCallback.EphemeralsCallback) {
600                         ((AsyncCallback.EphemeralsCallback) lcb.cb).processResult(lcb.rc, lcb.ctx, null);
601                     } else if (lcb.cb instanceof AsyncCallback.AllChildrenNumberCallback) {
602                         ((AsyncCallback.AllChildrenNumberCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, -1);
603                     } else if (lcb.cb instanceof AsyncCallback.MultiCallback) {
604                         ((AsyncCallback.MultiCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, Collections.emptyList());
605                     } else {
606                         ((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx);
607                     }
608                 } else {
609                     Packet p = (Packet) event;
610                     int rc = 0;
611                     String clientPath = p.clientPath;
612                     if (p.replyHeader.getErr() != 0) {
613                         rc = p.replyHeader.getErr();
614                     }
615                     if (p.cb == null) {
616                         LOG.warn("Somehow a null cb got to EventThread!");
617                     } else if (p.response instanceof ExistsResponse
618                                || p.response instanceof SetDataResponse
619                                || p.response instanceof SetACLResponse) {
620                         StatCallback cb = (StatCallback) p.cb;
621                         if (rc == Code.OK.intValue()) {
622                             if (p.response instanceof ExistsResponse) {
623                                 cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response).getStat());
624                             } else if (p.response instanceof SetDataResponse) {
625                                 cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response).getStat());
626                             } else if (p.response instanceof SetACLResponse) {
627                                 cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response).getStat());
628                             }
629                         } else {
630                             cb.processResult(rc, clientPath, p.ctx, null);
631                         }
632                     } else if (p.response instanceof GetDataResponse) {
633                         DataCallback cb = (DataCallback) p.cb;
634                         GetDataResponse rsp = (GetDataResponse) p.response;
635                         if (rc == Code.OK.intValue()) {
636                             cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
637                         } else {
638                             cb.processResult(rc, clientPath, p.ctx, null, null);
639                         }
640                     } else if (p.response instanceof GetACLResponse) {
641                         ACLCallback cb = (ACLCallback) p.cb;
642                         GetACLResponse rsp = (GetACLResponse) p.response;
643                         if (rc == Code.OK.intValue()) {
644                             cb.processResult(rc, clientPath, p.ctx, rsp.getAcl(), rsp.getStat());
645                         } else {
646                             cb.processResult(rc, clientPath, p.ctx, null, null);
647                         }
648                     } else if (p.response instanceof GetChildrenResponse) {
649                         ChildrenCallback cb = (ChildrenCallback) p.cb;
650                         GetChildrenResponse rsp = (GetChildrenResponse) p.response;
651                         if (rc == Code.OK.intValue()) {
652                             cb.processResult(rc, clientPath, p.ctx, rsp.getChildren());
653                         } else {
654                             cb.processResult(rc, clientPath, p.ctx, null);
655                         }
656                     } else if (p.response instanceof GetAllChildrenNumberResponse) {
657                         AllChildrenNumberCallback cb = (AllChildrenNumberCallback) p.cb;
658                         GetAllChildrenNumberResponse rsp = (GetAllChildrenNumberResponse) p.response;
659                         if (rc == Code.OK.intValue()) {
660                             cb.processResult(rc, clientPath, p.ctx, rsp.getTotalNumber());
661                         } else {
662                             cb.processResult(rc, clientPath, p.ctx, -1);
663                         }
664                     } else if (p.response instanceof GetChildren2Response) {
665                         Children2Callback cb = (Children2Callback) p.cb;
666                         GetChildren2Response rsp = (GetChildren2Response) p.response;
667                         if (rc == Code.OK.intValue()) {
668                             cb.processResult(rc, clientPath, p.ctx, rsp.getChildren(), rsp.getStat());
669                         } else {
670                             cb.processResult(rc, clientPath, p.ctx, null, null);
671                         }
672                     } else if (p.response instanceof CreateResponse) {
673                         StringCallback cb = (StringCallback) p.cb;
674                         CreateResponse rsp = (CreateResponse) p.response;
675                         if (rc == Code.OK.intValue()) {
676                             cb.processResult(
677                                 rc,
678                                 clientPath,
679                                 p.ctx,
680                                 (chrootPath == null
681                                     ? rsp.getPath()
682                                     : rsp.getPath().substring(chrootPath.length())));
683                         } else {
684                             cb.processResult(rc, clientPath, p.ctx, null);
685                         }
686                     } else if (p.response instanceof Create2Response) {
687                         Create2Callback cb = (Create2Callback) p.cb;
688                         Create2Response rsp = (Create2Response) p.response;
689                         if (rc == Code.OK.intValue()) {
690                             cb.processResult(
691                                     rc,
692                                     clientPath,
693                                     p.ctx,
694                                     (chrootPath == null
695                                             ? rsp.getPath()
696                                             : rsp.getPath().substring(chrootPath.length())),
697                                     rsp.getStat());
698                         } else {
699                             cb.processResult(rc, clientPath, p.ctx, null, null);
700                         }
701                     } else if (p.response instanceof MultiResponse) {
702                         MultiCallback cb = (MultiCallback) p.cb;
703                         MultiResponse rsp = (MultiResponse) p.response;
704                         if (rc == Code.OK.intValue()) {
705                             List<OpResult> results = rsp.getResultList();
706                             int newRc = rc;
707                             for (OpResult result : results) {
708                                 if (result instanceof ErrorResult
709                                     && KeeperException.Code.OK.intValue()
710                                        != (newRc = ((ErrorResult) result).getErr())) {
711                                     break;
712                                 }
713                             }
714                             cb.processResult(newRc, clientPath, p.ctx, results);
715                         } else {
716                             cb.processResult(rc, clientPath, p.ctx, null);
717                         }
718                     } else if (p.response instanceof GetEphemeralsResponse) {
719                         EphemeralsCallback cb = (EphemeralsCallback) p.cb;
720                         GetEphemeralsResponse rsp = (GetEphemeralsResponse) p.response;
721                         if (rc == Code.OK.intValue()) {
722                             cb.processResult(rc, p.ctx, rsp.getEphemerals());
723                         } else {
724                             cb.processResult(rc, p.ctx, null);
725                         }
726                     } else if (p.cb instanceof VoidCallback) {
727                         VoidCallback cb = (VoidCallback) p.cb;
728                         cb.processResult(rc, clientPath, p.ctx);
729                     }
730                 }
731             } catch (Throwable t) {
732                 LOG.error("Unexpected throwable", t);
733             }
734         }
735 
736     }
737 
738     // @VisibleForTesting
finishPacket(Packet p)739     protected void finishPacket(Packet p) {
740         int err = p.replyHeader.getErr();
741         if (p.watchRegistration != null) {
742             p.watchRegistration.register(err);
743         }
744         // Add all the removed watch events to the event queue, so that the
745         // clients will be notified with 'Data/Child WatchRemoved' event type.
746         if (p.watchDeregistration != null) {
747             Map<EventType, Set<Watcher>> materializedWatchers = null;
748             try {
749                 materializedWatchers = p.watchDeregistration.unregister(err);
750                 for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
751                     Set<Watcher> watchers = entry.getValue();
752                     if (watchers.size() > 0) {
753                         queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
754                         // ignore connectionloss when removing from local
755                         // session
756                         p.replyHeader.setErr(Code.OK.intValue());
757                     }
758                 }
759             } catch (KeeperException.NoWatcherException nwe) {
760                 p.replyHeader.setErr(nwe.code().intValue());
761             } catch (KeeperException ke) {
762                 p.replyHeader.setErr(ke.code().intValue());
763             }
764         }
765 
766         if (p.cb == null) {
767             synchronized (p) {
768                 p.finished = true;
769                 p.notifyAll();
770             }
771         } else {
772             p.finished = true;
773             eventThread.queuePacket(p);
774         }
775     }
776 
queueEvent(String clientPath, int err, Set<Watcher> materializedWatchers, EventType eventType)777     void queueEvent(String clientPath, int err, Set<Watcher> materializedWatchers, EventType eventType) {
778         KeeperState sessionState = KeeperState.SyncConnected;
779         if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
780             || KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
781             sessionState = Event.KeeperState.Disconnected;
782         }
783         WatchedEvent event = new WatchedEvent(eventType, sessionState, clientPath);
784         eventThread.queueEvent(event, materializedWatchers);
785     }
786 
    /**
     * Forwards an asynchronous callback and its result to the event thread's callback queue.
     *
     * @param cb   callback to queue
     * @param rc   result code to hand to the callback
     * @param path path to hand to the callback
     * @param ctx  caller-supplied context object to hand to the callback
     */
    void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
        eventThread.queueCallback(cb, rc, path, ctx);
    }
790 
791     // for test only
    /**
     * Hook called by the send thread with the server address it has just selected,
     * immediately before it starts connecting. This implementation does nothing.
     *
     * @param addr the server address about to be connected to
     */
    protected void onConnecting(InetSocketAddress addr) {

    }
795 
conLossPacket(Packet p)796     private void conLossPacket(Packet p) {
797         if (p.replyHeader == null) {
798             return;
799         }
800         switch (state) {
801         case AUTH_FAILED:
802             p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
803             break;
804         case CLOSED:
805             p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
806             break;
807         default:
808             p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
809         }
810         finishPacket(p);
811     }
812 
    // Most recent positive zxid seen in a reply header; it is sent to the server in
    // the connect request and in the watch-reset requests when a connection is primed.
    private volatile long lastZxid;

    /**
     * Returns the last zxid recorded from a server reply.
     *
     * @return the current value of {@code lastZxid}
     */
    public long getLastZxid() {
        return lastZxid;
    }
818 
819     static class EndOfStreamException extends IOException {
820 
821         private static final long serialVersionUID = -5438877188796231422L;
822 
EndOfStreamException(String msg)823         public EndOfStreamException(String msg) {
824             super(msg);
825         }
826 
827         @Override
toString()828         public String toString() {
829             return "EndOfStreamException: " + getMessage();
830         }
831 
832     }
833 
    /**
     * Thrown by the send thread's main loop when the computed wait time drops to
     * zero or below, i.e. nothing has been heard from the server within the
     * applicable timeout.
     */
    private static class SessionTimeoutException extends IOException {

        private static final long serialVersionUID = 824482094072071178L;

        /**
         * @param msg detail message describing the timeout
         */
        public SessionTimeoutException(String msg) {
            super(msg);
        }

    }
843 
    /**
     * I/O exception signalling an expired session.
     *
     * <p>NOTE(review): no throw site is visible in this part of the file; the
     * reconnect warning logged by the send thread names this type as the case in
     * which no reconnect is attempted — confirm against the throw site.
     */
    private static class SessionExpiredException extends IOException {

        private static final long serialVersionUID = -1388816932076193249L;

        /**
         * @param msg detail message describing the expiry
         */
        public SessionExpiredException(String msg) {
            super(msg);
        }

    }
853 
    /**
     * I/O exception declared by the send thread's read/write-server probe.
     *
     * <p>NOTE(review): presumably thrown when a read/write server is found while
     * the client is connected read-only, to force a reconnect — confirm against
     * the probe's implementation.
     */
    private static class RWServerFoundException extends IOException {

        private static final long serialVersionUID = 90431199887158758L;

        /**
         * @param msg detail message
         */
        public RWServerFoundException(String msg) {
            super(msg);
        }

    }
863 
864     /**
865      * This class services the outgoing request queue and generates the heart
866      * beats. It also spawns the ReadThread.
867      */
868     class SendThread extends ZooKeeperThread {
869 
        // System.nanoTime() reading taken when the last ping was queued; used only to
        // report the ping round-trip time in a debug log.
        private long lastPingSentNs;
        // Socket abstraction this thread drives; assigned once in the constructor.
        private final ClientCnxnSocket clientCnxnSocket;
        // True until the first connection is primed; while true, startConnect() skips
        // its randomized sleep.
        private boolean isFirstConnect = true;
873 
readResponse(ByteBuffer incomingBuffer)874         void readResponse(ByteBuffer incomingBuffer) throws IOException {
875             ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
876             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
877             ReplyHeader replyHdr = new ReplyHeader();
878 
879             replyHdr.deserialize(bbia, "header");
880             switch (replyHdr.getXid()) {
881             case PING_XID:
882                 LOG.debug("Got ping response for session id: 0x{} after {}ms.",
883                     Long.toHexString(sessionId),
884                     ((System.nanoTime() - lastPingSentNs) / 1000000));
885                 return;
886               case AUTHPACKET_XID:
887                 LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
888                 if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
889                     changeZkState(States.AUTH_FAILED);
890                     eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
891                         Watcher.Event.KeeperState.AuthFailed, null));
892                     eventThread.queueEventOfDeath();
893                 }
894               return;
895             case NOTIFICATION_XID:
896                 LOG.debug("Got notification session id: 0x{}",
897                     Long.toHexString(sessionId));
898                 WatcherEvent event = new WatcherEvent();
899                 event.deserialize(bbia, "response");
900 
901                 // convert from a server path to a client path
902                 if (chrootPath != null) {
903                     String serverPath = event.getPath();
904                     if (serverPath.compareTo(chrootPath) == 0) {
905                         event.setPath("/");
906                     } else if (serverPath.length() > chrootPath.length()) {
907                         event.setPath(serverPath.substring(chrootPath.length()));
908                      } else {
909                          LOG.warn("Got server path {} which is too short for chroot path {}.",
910                              event.getPath(), chrootPath);
911                      }
912                 }
913 
914                 WatchedEvent we = new WatchedEvent(event);
915                 LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
916                 eventThread.queueEvent(we);
917                 return;
918             default:
919                 break;
920             }
921 
922             // If SASL authentication is currently in progress, construct and
923             // send a response packet immediately, rather than queuing a
924             // response as with other packets.
925             if (tunnelAuthInProgress()) {
926                 GetSASLRequest request = new GetSASLRequest();
927                 request.deserialize(bbia, "token");
928                 zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
929                 return;
930             }
931 
932             Packet packet;
933             synchronized (pendingQueue) {
934                 if (pendingQueue.size() == 0) {
935                     throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
936                 }
937                 packet = pendingQueue.remove();
938             }
939             /*
940              * Since requests are processed in order, we better get a response
941              * to the first request!
942              */
943             try {
944                 if (packet.requestHeader.getXid() != replyHdr.getXid()) {
945                     packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
946                     throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
947                                           + " with err " + replyHdr.getErr()
948                                           + " expected Xid " + packet.requestHeader.getXid()
949                                           + " for a packet with details: " + packet);
950                 }
951 
952                 packet.replyHeader.setXid(replyHdr.getXid());
953                 packet.replyHeader.setErr(replyHdr.getErr());
954                 packet.replyHeader.setZxid(replyHdr.getZxid());
955                 if (replyHdr.getZxid() > 0) {
956                     lastZxid = replyHdr.getZxid();
957                 }
958                 if (packet.response != null && replyHdr.getErr() == 0) {
959                     packet.response.deserialize(bbia, "response");
960                 }
961 
962                 LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
963             } finally {
964                 finishPacket(packet);
965             }
966         }
967 
SendThread(ClientCnxnSocket clientCnxnSocket)968         SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
969             super(makeThreadName("-SendThread()"));
970             changeZkState(States.CONNECTING);
971             this.clientCnxnSocket = clientCnxnSocket;
972             setDaemon(true);
973         }
974 
975         // TODO: can not name this method getState since Thread.getState()
976         // already exists
977         // It would be cleaner to make class SendThread an implementation of
978         // Runnable
        /**
         * Used by ClientCnxnSocket.
         *
         * @return the current connection state of the enclosing client
         */
        synchronized ZooKeeper.States getZkState() {
            // Synchronized on this SendThread, the same monitor changeZkState() holds
            // while it validates and writes the state.
            return state;
        }
987 
changeZkState(ZooKeeper.States newState)988         synchronized void changeZkState(ZooKeeper.States newState) throws IOException {
989             if (!state.isAlive() && newState == States.CONNECTING) {
990                 throw new IOException(
991                         "Connection has already been closed and reconnection is not allowed");
992             }
993             // It's safer to place state modification at the end.
994             state = newState;
995         }
996 
        /**
         * Returns the socket abstraction passed to this thread's constructor.
         *
         * @return the {@code ClientCnxnSocket} driven by this thread
         */
        ClientCnxnSocket getClientCnxnSocket() {
            return clientCnxnSocket;
        }
1000 
1001         /**
1002          * Setup session, previous watches, authentication.
1003          */
primeConnection()1004         void primeConnection() throws IOException {
1005             LOG.info(
1006                 "Socket connection established, initiating session, client: {}, server: {}",
1007                 clientCnxnSocket.getLocalSocketAddress(),
1008                 clientCnxnSocket.getRemoteSocketAddress());
1009             isFirstConnect = false;
1010             long sessId = (seenRwServerBefore) ? sessionId : 0;
1011             ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
1012             // We add backwards since we are pushing into the front
1013             // Only send if there's a pending watch
1014             if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
1015                 List<String> dataWatches = watchManager.getDataWatchList();
1016                 List<String> existWatches = watchManager.getExistWatchList();
1017                 List<String> childWatches = watchManager.getChildWatchList();
1018                 List<String> persistentWatches = watchManager.getPersistentWatchList();
1019                 List<String> persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList();
1020                 if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
1021                         || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
1022                     Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
1023                     Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
1024                     Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
1025                     Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
1026                     Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
1027                     long setWatchesLastZxid = lastZxid;
1028 
1029                     while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
1030                             || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
1031                         List<String> dataWatchesBatch = new ArrayList<String>();
1032                         List<String> existWatchesBatch = new ArrayList<String>();
1033                         List<String> childWatchesBatch = new ArrayList<String>();
1034                         List<String> persistentWatchesBatch = new ArrayList<String>();
1035                         List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
1036                         int batchLength = 0;
1037 
1038                         // Note, we may exceed our max length by a bit when we add the last
1039                         // watch in the batch. This isn't ideal, but it makes the code simpler.
1040                         while (batchLength < SET_WATCHES_MAX_LENGTH) {
1041                             final String watch;
1042                             if (dataWatchesIter.hasNext()) {
1043                                 watch = dataWatchesIter.next();
1044                                 dataWatchesBatch.add(watch);
1045                             } else if (existWatchesIter.hasNext()) {
1046                                 watch = existWatchesIter.next();
1047                                 existWatchesBatch.add(watch);
1048                             } else if (childWatchesIter.hasNext()) {
1049                                 watch = childWatchesIter.next();
1050                                 childWatchesBatch.add(watch);
1051                             }  else if (persistentWatchesIter.hasNext()) {
1052                                 watch = persistentWatchesIter.next();
1053                                 persistentWatchesBatch.add(watch);
1054                             } else if (persistentRecursiveWatchesIter.hasNext()) {
1055                                 watch = persistentRecursiveWatchesIter.next();
1056                                 persistentRecursiveWatchesBatch.add(watch);
1057                             } else {
1058                                 break;
1059                             }
1060                             batchLength += watch.length();
1061                         }
1062 
1063                         Record record;
1064                         int opcode;
1065                         if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
1066                             // maintain compatibility with older servers - if no persistent/recursive watchers
1067                             // are used, use the old version of SetWatches
1068                             record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
1069                             opcode = OpCode.setWatches;
1070                         } else {
1071                             record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
1072                                     childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
1073                             opcode = OpCode.setWatches2;
1074                         }
1075                         RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
1076                         Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
1077                         outgoingQueue.addFirst(packet);
1078                     }
1079                 }
1080             }
1081 
1082             for (AuthData id : authInfo) {
1083                 outgoingQueue.addFirst(
1084                     new Packet(
1085                         new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
1086                         null,
1087                         new AuthPacket(0, id.scheme, id.data),
1088                         null,
1089                         null));
1090             }
1091             outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
1092             clientCnxnSocket.connectionPrimed();
1093             LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
1094         }
1095 
prependChroot(List<String> paths)1096         private List<String> prependChroot(List<String> paths) {
1097             if (chrootPath != null && !paths.isEmpty()) {
1098                 for (int i = 0; i < paths.size(); ++i) {
1099                     String clientPath = paths.get(i);
1100                     String serverPath;
1101                     // handle clientPath = "/"
1102                     if (clientPath.length() == 1) {
1103                         serverPath = chrootPath;
1104                     } else {
1105                         serverPath = chrootPath + clientPath;
1106                     }
1107                     paths.set(i, serverPath);
1108                 }
1109             }
1110             return paths;
1111         }
1112 
sendPing()1113         private void sendPing() {
1114             lastPingSentNs = System.nanoTime();
1115             RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
1116             queuePacket(h, null, null, null, null, null, null, null, null);
1117         }
1118 
        // When non-null, run() connects to this address next instead of asking the
        // host provider, and then clears it.
        private InetSocketAddress rwServerAddress = null;

        // Lower bound, and initial value, of the interval between read/write-server probes.
        // NOTE(review): compared against Time.currentElapsedTime() differences, so presumably milliseconds.
        private static final int minPingRwTimeout = 100;

        // Upper bound of the interval between read/write-server probes.
        private static final int maxPingRwTimeout = 60000;

        // Current probe interval; run() doubles it after each probe, capped at maxPingRwTimeout.
        private int pingRwTimeout = minPingRwTimeout;

        // Set to true if and only if constructor of ZooKeeperSaslClient
        // throws a LoginException: see startConnect() below.
        private boolean saslLoginFailed = false;
1130 
startConnect(InetSocketAddress addr)1131         private void startConnect(InetSocketAddress addr) throws IOException {
1132             // initializing it for new connection
1133             saslLoginFailed = false;
1134             if (!isFirstConnect) {
1135                 try {
1136                     Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
1137                 } catch (InterruptedException e) {
1138                     LOG.warn("Unexpected exception", e);
1139                 }
1140             }
1141             changeZkState(States.CONNECTING);
1142 
1143             String hostPort = addr.getHostString() + ":" + addr.getPort();
1144             MDC.put("myid", hostPort);
1145             setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
1146             if (clientConfig.isSaslClientEnabled()) {
1147                 try {
1148                     if (zooKeeperSaslClient != null) {
1149                         zooKeeperSaslClient.shutdown();
1150                     }
1151                     zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig);
1152                 } catch (LoginException e) {
1153                     // An authentication error occurred when the SASL client tried to initialize:
1154                     // for Kerberos this means that the client failed to authenticate with the KDC.
1155                     // This is different from an authentication error that occurs during communication
1156                     // with the Zookeeper server, which is handled below.
1157                     LOG.warn(
1158                         "SASL configuration failed. "
1159                             + "Will continue connection to Zookeeper server without "
1160                             + "SASL authentication, if Zookeeper server allows it.", e);
1161                     eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
1162                     saslLoginFailed = true;
1163                 }
1164             }
1165             logStartConnect(addr);
1166 
1167             clientCnxnSocket.connect(addr);
1168         }
1169 
logStartConnect(InetSocketAddress addr)1170         private void logStartConnect(InetSocketAddress addr) {
1171             LOG.info("Opening socket connection to server {}.", addr);
1172             if (zooKeeperSaslClient != null) {
1173                 LOG.info("SASL config status: {}", zooKeeperSaslClient.getConfigStatus());
1174             }
1175         }
1176 
1177         @Override
run()1178         public void run() {
1179             clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
1180             clientCnxnSocket.updateNow();
1181             clientCnxnSocket.updateLastSendAndHeard();
1182             int to;
1183             long lastPingRwServer = Time.currentElapsedTime();
1184             final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
1185             InetSocketAddress serverAddress = null;
1186             while (state.isAlive()) {
1187                 try {
1188                     if (!clientCnxnSocket.isConnected()) {
1189                         // don't re-establish connection if we are closing
1190                         if (closing) {
1191                             break;
1192                         }
1193                         if (rwServerAddress != null) {
1194                             serverAddress = rwServerAddress;
1195                             rwServerAddress = null;
1196                         } else {
1197                             serverAddress = hostProvider.next(1000);
1198                         }
1199                         onConnecting(serverAddress);
1200                         startConnect(serverAddress);
1201                         clientCnxnSocket.updateLastSendAndHeard();
1202                     }
1203 
1204                     if (state.isConnected()) {
1205                         // determine whether we need to send an AuthFailed event.
1206                         if (zooKeeperSaslClient != null) {
1207                             boolean sendAuthEvent = false;
1208                             if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
1209                                 try {
1210                                     zooKeeperSaslClient.initialize(ClientCnxn.this);
1211                                 } catch (SaslException e) {
1212                                     LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
1213                                     changeZkState(States.AUTH_FAILED);
1214                                     sendAuthEvent = true;
1215                                 }
1216                             }
1217                             KeeperState authState = zooKeeperSaslClient.getKeeperState();
1218                             if (authState != null) {
1219                                 if (authState == KeeperState.AuthFailed) {
1220                                     // An authentication error occurred during authentication with the Zookeeper Server.
1221                                     changeZkState(States.AUTH_FAILED);
1222                                     sendAuthEvent = true;
1223                                 } else {
1224                                     if (authState == KeeperState.SaslAuthenticated) {
1225                                         sendAuthEvent = true;
1226                                     }
1227                                 }
1228                             }
1229 
1230                             if (sendAuthEvent) {
1231                                 eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
1232                                 if (state == States.AUTH_FAILED) {
1233                                     eventThread.queueEventOfDeath();
1234                                 }
1235                             }
1236                         }
1237                         to = readTimeout - clientCnxnSocket.getIdleRecv();
1238                     } else {
1239                         to = connectTimeout - clientCnxnSocket.getIdleRecv();
1240                     }
1241 
1242                     if (to <= 0) {
1243                         String warnInfo = String.format(
1244                             "Client session timed out, have not heard from server in %dms for session id 0x%s",
1245                             clientCnxnSocket.getIdleRecv(),
1246                             Long.toHexString(sessionId));
1247                         LOG.warn(warnInfo);
1248                         throw new SessionTimeoutException(warnInfo);
1249                     }
1250                     if (state.isConnected()) {
1251                         //1000(1 second) is to prevent race condition missing to send the second ping
1252                         //also make sure not to send too many pings when readTimeout is small
1253                         int timeToNextPing = readTimeout / 2
1254                                              - clientCnxnSocket.getIdleSend()
1255                                              - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
1256                         //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
1257                         if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
1258                             sendPing();
1259                             clientCnxnSocket.updateLastSend();
1260                         } else {
1261                             if (timeToNextPing < to) {
1262                                 to = timeToNextPing;
1263                             }
1264                         }
1265                     }
1266 
1267                     // If we are in read-only mode, seek for read/write server
1268                     if (state == States.CONNECTEDREADONLY) {
1269                         long now = Time.currentElapsedTime();
1270                         int idlePingRwServer = (int) (now - lastPingRwServer);
1271                         if (idlePingRwServer >= pingRwTimeout) {
1272                             lastPingRwServer = now;
1273                             idlePingRwServer = 0;
1274                             pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
1275                             pingRwServer();
1276                         }
1277                         to = Math.min(to, pingRwTimeout - idlePingRwServer);
1278                     }
1279 
1280                     clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
1281                 } catch (Throwable e) {
1282                     if (closing) {
1283                         // closing so this is expected
1284                         LOG.warn(
1285                             "An exception was thrown while closing send thread for session 0x{}.",
1286                             Long.toHexString(getSessionId()),
1287                             e);
1288                         break;
1289                     } else {
1290                         LOG.warn(
1291                             "Session 0x{} for sever {}, Closing socket connection. "
1292                                 + "Attempting reconnect except it is a SessionExpiredException.",
1293                             Long.toHexString(getSessionId()),
1294                             serverAddress,
1295                             e);
1296 
1297                         // At this point, there might still be new packets appended to outgoingQueue.
1298                         // they will be handled in next connection or cleared up if closed.
1299                         cleanAndNotifyState();
1300                     }
1301                 }
1302             }
1303 
1304             synchronized (state) {
1305                 // When it comes to this point, it guarantees that later queued
1306                 // packet to outgoingQueue will be notified of death.
1307                 cleanup();
1308             }
1309             clientCnxnSocket.close();
1310             if (state.isAlive()) {
1311                 eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
1312             }
1313             eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
1314             ZooTrace.logTraceMessage(
1315                 LOG,
1316                 ZooTrace.getTextTraceLevel(),
1317                 "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
1318         }
1319 
cleanAndNotifyState()1320         private void cleanAndNotifyState() {
1321             cleanup();
1322             if (state.isAlive()) {
1323                 eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
1324             }
1325             clientCnxnSocket.updateNow();
1326             clientCnxnSocket.updateLastSendAndHeard();
1327         }
1328 
pingRwServer()1329         private void pingRwServer() throws RWServerFoundException {
1330             String result = null;
1331             InetSocketAddress addr = hostProvider.next(0);
1332 
1333             LOG.info("Checking server {} for being r/w. Timeout {}", addr, pingRwTimeout);
1334 
1335             Socket sock = null;
1336             BufferedReader br = null;
1337             try {
1338                 sock = new Socket(addr.getHostString(), addr.getPort());
1339                 sock.setSoLinger(false, -1);
1340                 sock.setSoTimeout(1000);
1341                 sock.setTcpNoDelay(true);
1342                 sock.getOutputStream().write("isro".getBytes());
1343                 sock.getOutputStream().flush();
1344                 sock.shutdownOutput();
1345                 br = new BufferedReader(new InputStreamReader(sock.getInputStream()));
1346                 result = br.readLine();
1347             } catch (ConnectException e) {
1348                 // ignore, this just means server is not up
1349             } catch (IOException e) {
1350                 // some unexpected error, warn about it
1351                 LOG.warn("Exception while seeking for r/w server.", e);
1352             } finally {
1353                 if (sock != null) {
1354                     try {
1355                         sock.close();
1356                     } catch (IOException e) {
1357                         LOG.warn("Unexpected exception", e);
1358                     }
1359                 }
1360                 if (br != null) {
1361                     try {
1362                         br.close();
1363                     } catch (IOException e) {
1364                         LOG.warn("Unexpected exception", e);
1365                     }
1366                 }
1367             }
1368 
1369             if ("rw".equals(result)) {
1370                 pingRwTimeout = minPingRwTimeout;
1371                 // save the found address so that it's used during the next
1372                 // connection attempt
1373                 rwServerAddress = addr;
1374                 throw new RWServerFoundException("Majority server found at "
1375                                                  + addr.getHostString() + ":" + addr.getPort());
1376             }
1377         }
1378 
cleanup()1379         private void cleanup() {
1380             clientCnxnSocket.cleanup();
1381             synchronized (pendingQueue) {
1382                 for (Packet p : pendingQueue) {
1383                     conLossPacket(p);
1384                 }
1385                 pendingQueue.clear();
1386             }
1387             // We can't call outgoingQueue.clear() here because
1388             // between iterating and clear up there might be new
1389             // packets added in queuePacket().
1390             Iterator<Packet> iter = outgoingQueue.iterator();
1391             while (iter.hasNext()) {
1392                 Packet p = iter.next();
1393                 conLossPacket(p);
1394                 iter.remove();
1395             }
1396         }
1397 
        /**
         * Callback invoked by the ClientCnxnSocket once a connection has been
         * established.
         *
         * <p>A non-positive negotiated timeout is treated as session expiry:
         * the state moves to {@code CLOSED}, an {@code Expired} event and the
         * event of death are queued, and a {@code SessionExpiredException} is
         * thrown. Otherwise the timeouts, session id and password are stored,
         * the state moves to {@code CONNECTED} or {@code CONNECTEDREADONLY},
         * and the matching connection event is queued.
         *
         * @param _negotiatedSessionTimeout session timeout reported for this
         *        connection; a value {@code <= 0} means the session has expired
         * @param _sessionId session id to store for this client
         * @param _sessionPasswd session password to store for this client
         * @param isRO {@code true} if the connection is to a read-only server
         * @throws IOException if a state change is rejected by
         *         {@code changeZkState}; a {@code SessionExpiredException} is
         *         thrown when the negotiated timeout is not positive
         */
        void onConnected(
            int _negotiatedSessionTimeout,
            long _sessionId,
            byte[] _sessionPasswd,
            boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                changeZkState(States.CLOSED);

                eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();

                // sessionId still holds the previously stored id here;
                // _sessionId is only stored further down on the success path.
                String warnInfo = String.format(
                    "Unable to reconnect to ZooKeeper service, session 0x%s has expired",
                    Long.toHexString(sessionId));
                LOG.warn(warnInfo);
                throw new SessionExpiredException(warnInfo);
            }

            // Only logged; the connection is still accepted below.
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }

            // Read timeout is two thirds of the session timeout; the connect
            // timeout splits the session timeout across the known hosts.
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
            // Sticky flag: once a read/write server has been seen it stays set.
            seenRwServerBefore |= !isRO;
            LOG.info(
                "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
                clientCnxnSocket.getRemoteSocketAddress(),
                Long.toHexString(sessionId),
                negotiatedSessionTimeout,
                (isRO ? " (READ-ONLY mode)" : ""));
            KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
        }
1447 
close()1448         void close() {
1449             try {
1450                 changeZkState(States.CLOSED);
1451             } catch (IOException e) {
1452                 LOG.warn("Connection close fails when migrates state from {} to CLOSED",
1453                         getZkState());
1454             }
1455             clientCnxnSocket.onClosing();
1456         }
1457 
        /**
         * Delegates to {@code clientCnxnSocket.testableCloseSocket()}.
         * Judging by the name this is a test hook; its exact effect is defined
         * by the socket implementation.
         *
         * @throws IOException propagated from the socket implementation
         */
        void testableCloseSocket() throws IOException {
            clientCnxnSocket.testableCloseSocket();
        }
1461 
tunnelAuthInProgress()1462         public boolean tunnelAuthInProgress() {
1463             // 1. SASL client is disabled.
1464             if (!clientConfig.isSaslClientEnabled()) {
1465                 return false;
1466             }
1467 
1468             // 2. SASL login failed.
1469             if (saslLoginFailed) {
1470                 return false;
1471             }
1472 
1473             // 3. SendThread has not created the authenticating object yet,
1474             // therefore authentication is (at the earliest stage of being) in progress.
1475             if (zooKeeperSaslClient == null) {
1476                 return true;
1477             }
1478 
1479             // 4. authenticating object exists, so ask it for its progress.
1480             return zooKeeperSaslClient.clientTunneledAuthenticationInProgress();
1481         }
1482 
        /**
         * Hands the packet straight to {@code clientCnxnSocket.sendPacket}.
         *
         * @param p the packet to send
         * @throws IOException propagated from the socket implementation
         */
        public void sendPacket(Packet p) throws IOException {
            clientCnxnSocket.sendPacket(p);
        }
1486 
1487     }
1488 
1489     /**
1490      * Shutdown the send/event threads. This method should not be called
1491      * directly - rather it should be called as part of close operation. This
1492      * method is primarily here to allow the tests to verify disconnection
1493      * behavior.
1494      */
disconnect()1495     public void disconnect() {
1496         LOG.debug("Disconnecting client for session: 0x{}", Long.toHexString(getSessionId()));
1497 
1498         sendThread.close();
1499         try {
1500             sendThread.join();
1501         } catch (InterruptedException ex) {
1502             LOG.warn("Got interrupted while waiting for the sender thread to close", ex);
1503         }
1504         eventThread.queueEventOfDeath();
1505         if (zooKeeperSaslClient != null) {
1506             zooKeeperSaslClient.shutdown();
1507         }
1508     }
1509 
1510     /**
1511      * Close the connection, which includes; send session disconnect to the
1512      * server, shutdown the send/event threads.
1513      *
1514      * @throws IOException
1515      */
close()1516     public void close() throws IOException {
1517         LOG.debug("Closing client for session: 0x{}", Long.toHexString(getSessionId()));
1518 
1519         try {
1520             RequestHeader h = new RequestHeader();
1521             h.setType(ZooDefs.OpCode.closeSession);
1522 
1523             submitRequest(h, null, null, null);
1524         } catch (InterruptedException e) {
1525             // ignore, close the send/event threads
1526         } finally {
1527             disconnect();
1528         }
1529     }
1530 
    // @VisibleForTesting
    // Next request id handed out by getXid(); starts at 1.
    protected int xid = 1;

    // @VisibleForTesting
    // Current client state, initially NOT_CONNECTED. Returned by getState()
    // and also used as the monitor object in queuePacket().
    // NOTE(review): locking on the current value of a reassignable field means
    // the monitor changes whenever the state changes - confirm this is intended.
    volatile States state = States.NOT_CONNECTED;
1536 
1537     /*
1538      * getXid() is called externally by ClientCnxnNIO::doIO() when packets are sent from the outgoingQueue to
1539      * the server. Thus, getXid() must be public.
1540      */
getXid()1541     public synchronized int getXid() {
1542         // Avoid negative cxid values.  In particular, cxid values of -4, -2, and -1 are special and
1543         // must not be used for requests -- see SendThread.readResponse.
1544         // Skip from MAX to 1.
1545         if (xid == Integer.MAX_VALUE) {
1546             xid = 1;
1547         }
1548         return xid++;
1549     }
1550 
    /**
     * Convenience overload of the five-argument {@code submitRequest} that
     * passes {@code null} as the watch deregistration.
     *
     * @param h request header
     * @param request request body
     * @param response record handed on for the response
     * @param watchRegistration watch registration handed on unchanged
     * @return the reply header returned by the five-argument overload
     * @throws InterruptedException propagated from the five-argument overload
     */
    public ReplyHeader submitRequest(
        RequestHeader h,
        Record request,
        Record response,
        WatchRegistration watchRegistration) throws InterruptedException {
        return submitRequest(h, request, response, watchRegistration, null);
    }
1558 
submitRequest( RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration)1559     public ReplyHeader submitRequest(
1560         RequestHeader h,
1561         Record request,
1562         Record response,
1563         WatchRegistration watchRegistration,
1564         WatchDeregistration watchDeregistration) throws InterruptedException {
1565         ReplyHeader r = new ReplyHeader();
1566         Packet packet = queuePacket(
1567             h,
1568             r,
1569             request,
1570             response,
1571             null,
1572             null,
1573             null,
1574             null,
1575             watchRegistration,
1576             watchDeregistration);
1577         synchronized (packet) {
1578             if (requestTimeout > 0) {
1579                 // Wait for request completion with timeout
1580                 waitForPacketFinish(r, packet);
1581             } else {
1582                 // Wait for request completion infinitely
1583                 while (!packet.finished) {
1584                     packet.wait();
1585                 }
1586             }
1587         }
1588         if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
1589             sendThread.cleanAndNotifyState();
1590         }
1591         return r;
1592     }
1593 
1594     /**
1595      * Wait for request completion with timeout.
1596      */
waitForPacketFinish(ReplyHeader r, Packet packet)1597     private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException {
1598         long waitStartTime = Time.currentElapsedTime();
1599         while (!packet.finished) {
1600             packet.wait(requestTimeout);
1601             if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) {
1602                 LOG.error("Timeout error occurred for the packet '{}'.", packet);
1603                 r.setErr(Code.REQUESTTIMEOUT.intValue());
1604                 break;
1605             }
1606         }
1607     }
1608 
    /**
     * Forwards the notification to the send thread's socket by calling
     * {@code saslCompleted()} on it.
     */
    public void saslCompleted() {
        sendThread.getClientCnxnSocket().saslCompleted();
    }
1612 
sendPacket(Record request, Record response, AsyncCallback cb, int opCode)1613     public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode) throws IOException {
1614         // Generate Xid now because it will be sent immediately,
1615         // by call to sendThread.sendPacket() below.
1616         int xid = getXid();
1617         RequestHeader h = new RequestHeader();
1618         h.setXid(xid);
1619         h.setType(opCode);
1620 
1621         ReplyHeader r = new ReplyHeader();
1622         r.setXid(xid);
1623 
1624         Packet p = new Packet(h, r, request, response, null, false);
1625         p.cb = cb;
1626         sendThread.sendPacket(p);
1627     }
1628 
    /**
     * Convenience overload of the ten-argument {@code queuePacket} that passes
     * {@code null} as the watch deregistration.
     *
     * @param h request header
     * @param r reply header
     * @param request request body
     * @param response record handed on for the response
     * @param cb callback handed on unchanged
     * @param clientPath client path handed on unchanged
     * @param serverPath server path handed on unchanged
     * @param ctx context object handed on unchanged
     * @param watchRegistration watch registration handed on unchanged
     * @return the packet returned by the ten-argument overload
     */
    public Packet queuePacket(
        RequestHeader h,
        ReplyHeader r,
        Record request,
        Record response,
        AsyncCallback cb,
        String clientPath,
        String serverPath,
        Object ctx,
        WatchRegistration watchRegistration) {
        return queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, watchRegistration, null);
    }
1641 
queuePacket( RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration)1642     public Packet queuePacket(
1643         RequestHeader h,
1644         ReplyHeader r,
1645         Record request,
1646         Record response,
1647         AsyncCallback cb,
1648         String clientPath,
1649         String serverPath,
1650         Object ctx,
1651         WatchRegistration watchRegistration,
1652         WatchDeregistration watchDeregistration) {
1653         Packet packet = null;
1654 
1655         // Note that we do not generate the Xid for the packet yet. It is
1656         // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
1657         // where the packet is actually sent.
1658         packet = new Packet(h, r, request, response, watchRegistration);
1659         packet.cb = cb;
1660         packet.ctx = ctx;
1661         packet.clientPath = clientPath;
1662         packet.serverPath = serverPath;
1663         packet.watchDeregistration = watchDeregistration;
1664         // The synchronized block here is for two purpose:
1665         // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
1666         // 2. synchronized against each packet. So if a closeSession packet is added,
1667         // later packet will be notified.
1668         synchronized (state) {
1669             if (!state.isAlive() || closing) {
1670                 conLossPacket(packet);
1671             } else {
1672                 // If the client is asking to close the session then
1673                 // mark as closing
1674                 if (h.getType() == OpCode.closeSession) {
1675                     closing = true;
1676                 }
1677                 outgoingQueue.add(packet);
1678             }
1679         }
1680         sendThread.getClientCnxnSocket().packetAdded();
1681         return packet;
1682     }
1683 
addAuthInfo(String scheme, byte[] auth)1684     public void addAuthInfo(String scheme, byte[] auth) {
1685         if (!state.isAlive()) {
1686             return;
1687         }
1688         authInfo.add(new AuthData(scheme, auth));
1689         queuePacket(
1690             new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
1691             null,
1692             new AuthPacket(0, scheme, auth),
1693             null,
1694             null,
1695             null,
1696             null,
1697             null,
1698             null);
1699     }
1700 
    /**
     * @return the current value of the {@code state} field
     */
    States getState() {
        return state;
    }
1704 
    /**
     * Immutable holder bundling an {@link AsyncCallback} with a result code,
     * a path and a caller-supplied context object. How instances are consumed
     * is not visible in this part of the file.
     */
    private static class LocalCallback {

        // Callback this record refers to.
        private final AsyncCallback cb;
        // Result code stored alongside the callback.
        private final int rc;
        // Path stored alongside the callback.
        private final String path;
        // Caller-supplied context object.
        private final Object ctx;

        /**
         * Stores the four values as given; no validation or copying is done.
         */
        public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) {
            this.cb = cb;
            this.rc = rc;
            this.path = path;
            this.ctx = ctx;
        }

    }
1720 
initRequestTimeout()1721     private void initRequestTimeout() {
1722         try {
1723             requestTimeout = clientConfig.getLong(
1724                 ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
1725                 ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT);
1726             LOG.info(
1727                 "{} value is {}. feature enabled={}",
1728                 ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
1729                 requestTimeout,
1730                 requestTimeout > 0);
1731         } catch (NumberFormatException e) {
1732             LOG.error(
1733                 "Configured value {} for property {} can not be parsed to long.",
1734                 clientConfig.getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT),
1735                 ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
1736             throw e;
1737         }
1738     }
1739 
1740 }
1741