1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper.server;
20 
21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
22 import java.io.ByteArrayOutputStream;
23 import java.io.File;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.PrintWriter;
27 import java.nio.ByteBuffer;
28 import java.util.ArrayDeque;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Deque;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.Random;
37 import java.util.Set;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.function.BiConsumer;
41 import javax.security.sasl.SaslException;
42 import org.apache.jute.BinaryInputArchive;
43 import org.apache.jute.BinaryOutputArchive;
44 import org.apache.jute.Record;
45 import org.apache.zookeeper.Environment;
46 import org.apache.zookeeper.KeeperException;
47 import org.apache.zookeeper.KeeperException.Code;
48 import org.apache.zookeeper.KeeperException.SessionExpiredException;
49 import org.apache.zookeeper.Quotas;
50 import org.apache.zookeeper.StatsTrack;
51 import org.apache.zookeeper.Version;
52 import org.apache.zookeeper.ZooDefs;
53 import org.apache.zookeeper.ZooDefs.OpCode;
54 import org.apache.zookeeper.ZookeeperBanner;
55 import org.apache.zookeeper.common.StringUtils;
56 import org.apache.zookeeper.common.Time;
57 import org.apache.zookeeper.data.ACL;
58 import org.apache.zookeeper.data.Id;
59 import org.apache.zookeeper.data.StatPersisted;
60 import org.apache.zookeeper.jmx.MBeanRegistry;
61 import org.apache.zookeeper.metrics.MetricsContext;
62 import org.apache.zookeeper.proto.AuthPacket;
63 import org.apache.zookeeper.proto.ConnectRequest;
64 import org.apache.zookeeper.proto.ConnectResponse;
65 import org.apache.zookeeper.proto.CreateRequest;
66 import org.apache.zookeeper.proto.DeleteRequest;
67 import org.apache.zookeeper.proto.GetSASLRequest;
68 import org.apache.zookeeper.proto.ReplyHeader;
69 import org.apache.zookeeper.proto.RequestHeader;
70 import org.apache.zookeeper.proto.SetACLRequest;
71 import org.apache.zookeeper.proto.SetDataRequest;
72 import org.apache.zookeeper.proto.SetSASLResponse;
73 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
74 import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
75 import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
76 import org.apache.zookeeper.server.SessionTracker.Session;
77 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
78 import org.apache.zookeeper.server.auth.ProviderRegistry;
79 import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
80 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
81 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
82 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
83 import org.apache.zookeeper.server.util.JvmPauseMonitor;
84 import org.apache.zookeeper.server.util.OSMXBean;
85 import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
86 import org.apache.zookeeper.txn.CreateSessionTxn;
87 import org.apache.zookeeper.txn.TxnDigest;
88 import org.apache.zookeeper.txn.TxnHeader;
89 import org.apache.zookeeper.util.ServiceUtils;
90 import org.slf4j.Logger;
91 import org.slf4j.LoggerFactory;
92 
93 /**
94  * This class implements a simple standalone ZooKeeperServer. It sets up the
95  * following chain of RequestProcessors to process requests:
96  * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
97  */
98 public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
99 
    /** Logger shared by this class; assigned in the static initializer. */
    protected static final Logger LOG;
    /** {@link RateLogger} built on top of {@link #LOG}; assigned in the static initializer. */
    private static final RateLogger RATE_LOGGER;

    /** Property key {@code zookeeper.globalOutstandingLimit}. */
    public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";

    /** System property read at class load to set {@link #enableEagerACLCheck}. */
    public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
    /** System property read at class load to set {@link #skipACL}; only the value "yes" enables it. */
    public static final String SKIP_ACL = "zookeeper.skipACL";
    /** System property read at class load to set {@link #enforceQuota}. */
    public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";

    // When enabled, will check ACL constraints appertained to the requests first,
    // before sending the requests to the quorum.
    static final boolean enableEagerACLCheck;

    /** True when ACL checks are to be skipped (see {@link #SKIP_ACL}). */
    static final boolean skipACL;

    /** Value parsed from {@link #ENFORCE_QUOTA}; defaults to false when the property is unset. */
    public static final boolean enforceQuota;

    /** Property key {@code zookeeper.superUser}. */
    public static final String SASL_SUPER_USER = "zookeeper.superUser";

    /** Property key {@code zookeeper.allowSaslFailedClients}. */
    public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
    /** System property read at class load to set {@code digestEnabled}; defaults to "true". */
    public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
    private static boolean digestEnabled;

    // Add an enable/disable option for now; we should remove this one when
    // this feature is confirmed to be stable.
    public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
    // Re-read from CLOSE_SESSION_TXN_ENABLED (default "true") in the static initializer.
    private static boolean closeSessionTxnEnabled = true;
127 
    // Class-load initialization: creates the loggers, prints the banner and the
    // environment, then reads the feature flags from system properties and logs them.
    static {
        LOG = LoggerFactory.getLogger(ZooKeeperServer.class);

        RATE_LOGGER = new RateLogger(LOG);

        ZookeeperBanner.printBanner(LOG);

        Environment.logEnv("Server environment:", LOG);

        // Boolean.getBoolean: true only if the property exists and equals "true" (case-insensitive).
        enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK);
        LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck);

        // Unlike the other flags, this one is enabled by the literal value "yes", not "true".
        skipACL = System.getProperty(SKIP_ACL, "no").equals("yes");
        if (skipACL) {
            LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL);
        }

        // Off unless explicitly set to "true"; only logged when enabled.
        enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false"));
        if (enforceQuota) {
            LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota);
        }

        // On unless explicitly set to something other than "true".
        digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
        LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);

        // On unless explicitly set to something other than "true".
        closeSessionTxnEnabled = Boolean.parseBoolean(
                System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
        LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
    }
157 
isCloseSessionTxnEnabled()158     public static boolean isCloseSessionTxnEnabled() {
159         return closeSessionTxnEnabled;
160     }
161 
setCloseSessionTxnEnabled(boolean enabled)162     public static void setCloseSessionTxnEnabled(boolean enabled) {
163         ZooKeeperServer.closeSessionTxnEnabled = enabled;
164         LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED,
165                 ZooKeeperServer.closeSessionTxnEnabled);
166     }
167 
    // JMX beans created by registerJMX(); each is reset to null if its registration fails.
    protected ZooKeeperServerBean jmxServerBean;
    protected DataTreeBean jmxDataTreeBean;

    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
    // Read once at class load from the "zookeeper.throttled_op_wait_time" system property.
    protected static volatile int throttledOpWaitTime =
        Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
    /** value of -1 indicates unset, use default */
    protected int minSessionTimeout = -1;
    /** value of -1 indicates unset, use default */
    protected int maxSessionTimeout = -1;
    /** Socket listen backlog. Value of -1 indicates unset */
    protected int listenBacklog = -1;
    // Created lazily by startupWithServerState() via createSessionTracker() when still null.
    protected SessionTracker sessionTracker;
    private FileTxnSnapLog txnLogFactory = null;
    private ZKDatabase zkDb;
    // Both caches are only created by the eight-argument constructor.
    private ResponseCache readResponseCache;
    private ResponseCache getChildrenResponseCache;
    // Backing counter for getZxid(), getNextZxid() and setZxid().
    private final AtomicLong hzxid = new AtomicLong(0);
    public static final Exception ok = new Exception("No prob");
    // Head of the request processor chain; set by setupRequestProcessors().
    protected RequestProcessor firstProcessor;
    protected JvmPauseMonitor jvmPauseMonitor;
    protected volatile State state = State.INITIAL;
    private boolean isResponseCachingEnabled = true;
    /* contains the configuration file content read at startup */
    protected String initialConfig;
    protected boolean reconfigEnabled;
    private final RequestPathMetricsCollector requestPathMetricsCollector;

    // Refreshed from sessionTracker.isLocalSessionsEnabled() during startup.
    private boolean localSessionEnabled = false;
    /**
     * Lifecycle states of the server. A new instance starts in {@link #INITIAL};
     * {@code startup()} and {@code startServing()} move it to {@link #RUNNING}.
     */
    protected enum State {
        INITIAL,
        RUNNING,
        SHUTDOWN,
        ERROR
    }
205 
    /**
     * This is the secret that we use to generate passwords. For the moment,
     * it's more of a checksum that's used in reconnection, which carries no
     * security weight, and is treated internally as if it carries no
     * security weight.
     */
    private static final long superSecret = 0XB3415C00L;

    private final AtomicInteger requestsInProcess = new AtomicInteger(0);
    final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
    // this data structure must be accessed under the outstandingChanges lock
    final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();

    protected ServerCnxnFactory serverCnxnFactory;
    protected ServerCnxnFactory secureServerCnxnFactory;

    private final ServerStats serverStats;
    private final ZooKeeperServerListener listener;
    private ZooKeeperServerShutdownHandler zkShutdownHandler;
    // Server id handed to the SessionTrackerImpl built by createSessionTracker().
    private volatile int createSessionTrackerServerId = 1;

    private static final String FLUSH_DELAY = "zookeeper.flushDelay";
    private static volatile long flushDelay;
    // NOTE(review): the constant is named ..._POLL_SIZE but the property key and the
    // field it feeds are both "...PollTime"; the name looks like a leftover.
    private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
    private static volatile long maxWriteQueuePollTime;
    private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
    private static volatile int maxBatchSize;

    /**
     * Starting size of read and write ByteArrayOutputStream buffers. Default is
     * {@link #DEFAULT_STARTING_BUFFER_SIZE} (1024) bytes; values below 32 are
     * rejected at class load.
     * Flag not used for small transfers like connectResponses.
     */
    public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes";
    public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
    public static final int intBufferStartingSizeBytes;

    // Property keys used by the eight-argument constructor to size the two response caches.
    public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize";
    public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize";
244 
245     static {
246         long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
247         setFlushDelay(configuredFlushDelay);
Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3)248         setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3));
Integer.getInteger(MAX_BATCH_SIZE, 1000)249         setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000));
250 
251         intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE);
252 
253         if (intBufferStartingSizeBytes < 32) {
254             String msg = "Buffer starting size must be greater than 0."
255                          + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" ";
256             LOG.error(msg);
257             throw new IllegalArgumentException(msg);
258         }
259 
260         LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes);
261     }
262 
    // Connection throttling
    private BlueThrottle connThrottle = new BlueThrottle();

    // Created and started by startRequestThrottler().
    @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification =
        "Internally the throttler has a BlockingQueue so "
        + "once the throttler is created and started, it is thread-safe")
    private RequestThrottler requestThrottler;
    /** Property key {@code zookeeper.snapCount}. */
    public static final String SNAP_COUNT = "zookeeper.snapCount";

    /**
     * This setting sets a limit on the total number of large requests that
     * can be inflight and is designed to prevent ZooKeeper from accepting
     * too many large requests such that the JVM runs out of usable heap and
     * ultimately crashes.
     *
     * The limit is enforced by the {@link #checkRequestSize(int, boolean)}
     * method which is called by the connection layer ({@link NIOServerCnxn},
     * {@link NettyServerCnxn}) before allocating a byte buffer and pulling
     * data off the TCP socket. The limit is then checked again by the
     * ZooKeeper server in {@link #processPacket(ServerCnxn, ByteBuffer)} which
     * also atomically updates {@link #currentLargeRequestBytes}. The request is
     * then marked as a large request, with the request size stored in the Request
     * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
     *
     * When a request is completed or dropped, the relevant code path calls the
     * {@link #requestFinished(Request)} method which performs the decrement if
     * needed.
     */
    private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;

    /**
     * The size threshold after which a request is considered a large request
     * and is checked against the large request byte limit.
     */
    private volatile int largeRequestThreshold = -1;

    /** Running total of bytes held by in-flight large requests. */
    private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);

    private AuthenticationHelper authHelper;
302 
removeCnxn(ServerCnxn cnxn)303     void removeCnxn(ServerCnxn cnxn) {
304         zkDb.removeCnxn(cnxn);
305     }
306 
307     /**
308      * Creates a ZooKeeperServer instance. Nothing is setup, use the setX
309      * methods to prepare the instance (eg datadir, datalogdir, ticktime,
310      * builder, etc...)
311      *
312      */
ZooKeeperServer()313     public ZooKeeperServer() {
314         listener = new ZooKeeperServerListenerImpl(this);
315         serverStats = new ServerStats(this);
316         this.requestPathMetricsCollector = new RequestPathMetricsCollector();
317         this.authHelper = new AuthenticationHelper();
318     }
319 
320     /**
321      * Keeping this constructor for backward compatibility
322      */
ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig)323     public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
324         this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
325     }
326 
327     /**
328      *  * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
329      * actually start listening for clients until run() is invoked.
330      *
331      */
ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled)332     public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
333         serverStats = new ServerStats(this);
334         this.txnLogFactory = txnLogFactory;
335         this.txnLogFactory.setServerStats(this.serverStats);
336         this.zkDb = zkDb;
337         this.tickTime = tickTime;
338         setMinSessionTimeout(minSessionTimeout);
339         setMaxSessionTimeout(maxSessionTimeout);
340         this.listenBacklog = clientPortListenBacklog;
341         this.reconfigEnabled = reconfigEnabled;
342 
343         listener = new ZooKeeperServerListenerImpl(this);
344 
345         readResponseCache = new ResponseCache(Integer.getInteger(
346             GET_DATA_RESPONSE_CACHE_SIZE,
347             ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData");
348 
349         getChildrenResponseCache = new ResponseCache(Integer.getInteger(
350             GET_CHILDREN_RESPONSE_CACHE_SIZE,
351             ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren");
352 
353         this.initialConfig = initialConfig;
354 
355         this.requestPathMetricsCollector = new RequestPathMetricsCollector();
356 
357         this.initLargeRequestThrottlingSettings();
358 
359         this.authHelper = new AuthenticationHelper();
360 
361         LOG.info(
362             "Created server with"
363                 + " tickTime {}"
364                 + " minSessionTimeout {}"
365                 + " maxSessionTimeout {}"
366                 + " clientPortListenBacklog {}"
367                 + " datadir {}"
368                 + " snapdir {}",
369             tickTime,
370             getMinSessionTimeout(),
371             getMaxSessionTimeout(),
372             getClientPortListenBacklog(),
373             txnLogFactory.getDataDir(),
374             txnLogFactory.getSnapDir());
375     }
376 
getInitialConfig()377     public String getInitialConfig() {
378         return initialConfig;
379     }
380 
381     /**
382      * Adds JvmPauseMonitor and calls
383      * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String)}
384      *
385      */
ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig)386     public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
387         this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
388         this.jvmPauseMonitor = jvmPauseMonitor;
389         if (jvmPauseMonitor != null) {
390             LOG.info("Added JvmPauseMonitor to server");
391         }
392     }
393 
394     /**
395      * creates a zookeeperserver instance.
396      * @param txnLogFactory the file transaction snapshot logging class
397      * @param tickTime the ticktime for the server
398      * @throws IOException
399      */
ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig)400     public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) {
401         this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled());
402     }
403 
serverStats()404     public ServerStats serverStats() {
405         return serverStats;
406     }
407 
getRequestPathMetricsCollector()408     public RequestPathMetricsCollector getRequestPathMetricsCollector() {
409         return requestPathMetricsCollector;
410     }
411 
connThrottle()412     public BlueThrottle connThrottle() {
413         return connThrottle;
414     }
415 
    /**
     * Writes this server's configuration to the given writer as one
     * {@code key=value} pair per line.
     *
     * @param pwriter the writer that receives the configuration dump
     */
    public void dumpConf(PrintWriter pwriter) {
        pwriter.print("clientPort=");
        pwriter.println(getClientPort());
        pwriter.print("secureClientPort=");
        pwriter.println(getSecureClientPort());
        // "dataDir" is reported as the snapshot directory of the snap log.
        pwriter.print("dataDir=");
        pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath());
        pwriter.print("dataDirSize=");
        pwriter.println(getDataDirSize());
        // "dataLogDir" is reported as the snap log's getDataDir().
        pwriter.print("dataLogDir=");
        pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath());
        pwriter.print("dataLogSize=");
        pwriter.println(getLogDirSize());
        pwriter.print("tickTime=");
        pwriter.println(getTickTime());
        pwriter.print("maxClientCnxns=");
        pwriter.println(getMaxClientCnxnsPerHost());
        pwriter.print("minSessionTimeout=");
        pwriter.println(getMinSessionTimeout());
        pwriter.print("maxSessionTimeout=");
        pwriter.println(getMaxSessionTimeout());
        pwriter.print("clientPortListenBacklog=");
        pwriter.println(getClientPortListenBacklog());

        pwriter.print("serverId=");
        pwriter.println(getServerId());
    }
443 
    /**
     * Builds a snapshot of this server's configuration.
     *
     * @return a new {@link ZooKeeperServerConf} populated from the current settings
     */
    public ZooKeeperServerConf getConf() {
        return new ZooKeeperServerConf(
            getClientPort(),
            // snapshot directory first, then the snap log's getDataDir()
            zkDb.snapLog.getSnapDir().getAbsolutePath(),
            zkDb.snapLog.getDataDir().getAbsolutePath(),
            getTickTime(),
            getMaxClientCnxnsPerHost(),
            getMinSessionTimeout(),
            getMaxSessionTimeout(),
            getServerId(),
            getClientPortListenBacklog());
    }
456 
457     /**
458      * This constructor is for backward compatibility with the existing unit
459      * test code.
460      * It defaults to FileLogProvider persistence provider.
461      */
ZooKeeperServer(File snapDir, File logDir, int tickTime)462     public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
463         this(new FileTxnSnapLog(snapDir, logDir), tickTime, "");
464     }
465 
466     /**
467      * Default constructor, relies on the config for its argument values
468      *
469      * @throws IOException
470      */
ZooKeeperServer(FileTxnSnapLog txnLogFactory)471     public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
472         this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled());
473     }
474 
475     /**
476      * get the zookeeper database for this server
477      * @return the zookeeper database for this server
478      */
getZKDatabase()479     public ZKDatabase getZKDatabase() {
480         return this.zkDb;
481     }
482 
483     /**
484      * set the zkdatabase for this zookeeper server
485      * @param zkDb
486      */
setZKDatabase(ZKDatabase zkDb)487     public void setZKDatabase(ZKDatabase zkDb) {
488         this.zkDb = zkDb;
489     }
490 
    /**
     * Restores sessions and data: sets the zxid from the database (loading the
     * database first if it has not been initialized), kills every session that
     * has no recorded timeout, and finally takes a snapshot.
     *
     * @throws IOException declared for the database load
     * @throws InterruptedException declared for callers
     */
    public void loadData() throws IOException, InterruptedException {
        /*
         * When a new leader starts executing Leader#lead, it
         * invokes this method. The database, however, has been
         * initialized before running leader election so that
         * the server could pick its zxid for its initial vote.
         * It does it by invoking QuorumPeer#getLastLoggedZxid.
         * Consequently, we don't need to initialize it once more
         * and avoid the penalty of loading it a second time. Not
         * reloading it is particularly important for applications
         * that host a large database.
         *
         * The following if block checks whether the database has
         * been initialized or not. Note that this method is
         * invoked by at least one other method:
         * ZooKeeperServer#startdata.
         *
         * See ZOOKEEPER-1642 for more detail.
         */
        if (zkDb.isInitialized()) {
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        } else {
            setZxid(zkDb.loadDataBase());
        }

        // Clean up dead sessions: a session known to the database but absent
        // from the session/timeout map is killed at the last processed zxid.
        zkDb.getSessions().stream()
                        .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null)
                        .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid()));

        // Make a clean snapshot
        takeSnapshot();
    }
527 
takeSnapshot()528     public void takeSnapshot() {
529         takeSnapshot(false);
530     }
531 
takeSnapshot(boolean syncSnap)532     public void takeSnapshot(boolean syncSnap) {
533         long start = Time.currentElapsedTime();
534         try {
535             txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
536         } catch (IOException e) {
537             LOG.error("Severe unrecoverable error, exiting", e);
538             // This is a severe error that we cannot recover from,
539             // so we need to exit
540             ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
541         }
542         long elapsed = Time.currentElapsedTime() - start;
543         LOG.info("Snapshot taken in {} ms", elapsed);
544         ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
545     }
546 
shouldForceWriteInitialSnapshotAfterLeaderElection()547     public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
548         return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
549     }
550 
551     @Override
getDataDirSize()552     public long getDataDirSize() {
553         if (zkDb == null) {
554             return 0L;
555         }
556         File path = zkDb.snapLog.getDataDir();
557         return getDirSize(path);
558     }
559 
560     @Override
getLogDirSize()561     public long getLogDirSize() {
562         if (zkDb == null) {
563             return 0L;
564         }
565         File path = zkDb.snapLog.getSnapDir();
566         return getDirSize(path);
567     }
568 
getDirSize(File file)569     private long getDirSize(File file) {
570         long size = 0L;
571         if (file.isDirectory()) {
572             File[] files = file.listFiles();
573             if (files != null) {
574                 for (File f : files) {
575                     size += getDirSize(f);
576                 }
577             }
578         } else {
579             size = file.length();
580         }
581         return size;
582     }
583 
getZxid()584     public long getZxid() {
585         return hzxid.get();
586     }
587 
getSessionTracker()588     public SessionTracker getSessionTracker() {
589         return sessionTracker;
590     }
591 
getNextZxid()592     long getNextZxid() {
593         return hzxid.incrementAndGet();
594     }
595 
setZxid(long zxid)596     public void setZxid(long zxid) {
597         hzxid.set(zxid);
598     }
599 
close(long sessionId)600     private void close(long sessionId) {
601         Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
602         submitRequest(si);
603     }
604 
closeSession(long sessionId)605     public void closeSession(long sessionId) {
606         LOG.info("Closing session 0x{}", Long.toHexString(sessionId));
607 
608         // we do not want to wait for a session close. send it as soon as we
609         // detect it!
610         close(sessionId);
611     }
612 
killSession(long sessionId, long zxid)613     protected void killSession(long sessionId, long zxid) {
614         zkDb.killSession(sessionId, zxid);
615         if (LOG.isTraceEnabled()) {
616             ZooTrace.logTraceMessage(
617                 LOG,
618                 ZooTrace.SESSION_TRACE_MASK,
619                 "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId));
620         }
621         if (sessionTracker != null) {
622             sessionTracker.removeSession(sessionId);
623         }
624     }
625 
expire(Session session)626     public void expire(Session session) {
627         long sessionId = session.getSessionId();
628         LOG.info(
629             "Expiring session 0x{}, timeout of {}ms exceeded",
630             Long.toHexString(sessionId),
631             session.getTimeout());
632         close(sessionId);
633     }
634 
635     public static class MissingSessionException extends IOException {
636 
637         private static final long serialVersionUID = 7467414635467261007L;
638 
MissingSessionException(String msg)639         public MissingSessionException(String msg) {
640             super(msg);
641         }
642 
643     }
644 
touch(ServerCnxn cnxn)645     void touch(ServerCnxn cnxn) throws MissingSessionException {
646         if (cnxn == null) {
647             return;
648         }
649         long id = cnxn.getSessionId();
650         int to = cnxn.getSessionTimeout();
651         if (!sessionTracker.touchSession(id, to)) {
652             throw new MissingSessionException("No session with sessionid 0x"
653                                               + Long.toHexString(id)
654                                               + " exists, probably expired and removed");
655         }
656     }
657 
    /**
     * Registers the server bean and, beneath it, the data tree bean with JMX.
     * Failures are logged as warnings and leave the affected bean field null;
     * they are never propagated to the caller.
     */
    protected void registerJMX() {
        // register with JMX
        try {
            jmxServerBean = new ZooKeeperServerBean(this);
            MBeanRegistry.getInstance().register(jmxServerBean, null);

            // The data tree bean is registered with the server bean as its parent;
            // a failure here does not undo the server bean registration.
            try {
                jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
                MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
                jmxDataTreeBean = null;
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxServerBean = null;
        }
    }
676 
startdata()677     public void startdata() throws IOException, InterruptedException {
678         //check to see if zkDb is not null
679         if (zkDb == null) {
680             zkDb = new ZKDatabase(this.txnLogFactory);
681         }
682         if (!zkDb.isInitialized()) {
683             loadData();
684         }
685     }
686 
startup()687     public synchronized void startup() {
688         startupWithServerState(State.RUNNING);
689     }
690 
startupWithoutServing()691     public synchronized void startupWithoutServing() {
692         startupWithServerState(State.INITIAL);
693     }
694 
startServing()695     public synchronized void startServing() {
696         setState(State.RUNNING);
697         notifyAll();
698     }
699 
    /**
     * Common start-up sequence shared by {@link #startup()} and
     * {@link #startupWithoutServing()}. Both callers are synchronized, which
     * the final {@code notifyAll()} relies on.
     *
     * @param state the state to set once the components have been started
     */
    private void startupWithServerState(State state) {
        // Reuse an existing tracker if one was already created.
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        startRequestThrottler();

        registerJMX();

        startJvmPauseMonitor();

        registerMetrics();

        setState(state);

        requestPathMetricsCollector.start();

        localSessionEnabled = sessionTracker.isLocalSessionsEnabled();

        // Wake up any thread waiting on this server's monitor.
        notifyAll();
    }
723 
startJvmPauseMonitor()724     protected void startJvmPauseMonitor() {
725         if (this.jvmPauseMonitor != null) {
726             this.jvmPauseMonitor.serviceStart();
727         }
728     }
729 
startRequestThrottler()730     protected void startRequestThrottler() {
731         requestThrottler = new RequestThrottler(this);
732         requestThrottler.start();
733 
734     }
735 
setupRequestProcessors()736     protected void setupRequestProcessors() {
737         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
738         RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
739         ((SyncRequestProcessor) syncProcessor).start();
740         firstProcessor = new PrepRequestProcessor(this, syncProcessor);
741         ((PrepRequestProcessor) firstProcessor).start();
742     }
743 
getZooKeeperServerListener()744     public ZooKeeperServerListener getZooKeeperServerListener() {
745         return listener;
746     }
747 
748     /**
749      * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to
750      * {@link #startup()} being called
751      *
752      * @param newId ID to use
753      */
setCreateSessionTrackerServerId(int newId)754     public void setCreateSessionTrackerServerId(int newId) {
755         createSessionTrackerServerId = newId;
756     }
757 
createSessionTracker()758     protected void createSessionTracker() {
759         sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener());
760     }
761 
startSessionTracker()762     protected void startSessionTracker() {
763         ((SessionTrackerImpl) sessionTracker).start();
764     }
765 
766     /**
767      * Sets the state of ZooKeeper server. After changing the state, it notifies
768      * the server state change to a registered shutdown handler, if any.
769      * <p>
770      * The following are the server state transitions:
771      * <ul><li>During startup the server will be in the INITIAL state.</li>
772      * <li>After successfully starting, the server sets the state to RUNNING.
773      * </li>
774      * <li>The server transitions to the ERROR state if it hits an internal
775      * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource
776      * error events, e.g., SyncRequestProcessor not being able to write a txn to
777      * disk.</li>
778      * <li>During shutdown the server sets the state to SHUTDOWN, which
779      * corresponds to the server not running.</li></ul>
780      *
781      * @param state new server state.
782      */
setState(State state)783     protected void setState(State state) {
784         this.state = state;
785         // Notify server state changes to the registered shutdown handler, if any.
786         if (zkShutdownHandler != null) {
787             zkShutdownHandler.handle(state);
788         } else {
789             LOG.debug(
790                 "ZKShutdownHandler is not registered, so ZooKeeper server"
791                     + " won't take any action on ERROR or SHUTDOWN server state changes");
792         }
793     }
794 
795     /**
796      * This can be used while shutting down the server to see whether the server
797      * is already shutdown or not.
798      *
799      * @return true if the server is running or server hits an error, false
800      *         otherwise.
801      */
canShutdown()802     protected boolean canShutdown() {
803         return state == State.RUNNING || state == State.ERROR;
804     }
805 
806     /**
807      * @return true if the server is running, false otherwise.
808      */
isRunning()809     public boolean isRunning() {
810         return state == State.RUNNING;
811     }
812 
shutdown()813     public void shutdown() {
814         shutdown(false);
815     }
816 
817     /**
818      * Shut down the server instance
819      * @param fullyShutDown true if another server using the same database will not replace this one in the same process
820      */
shutdown(boolean fullyShutDown)821     public synchronized void shutdown(boolean fullyShutDown) {
822         if (!canShutdown()) {
823             if (fullyShutDown && zkDb != null) {
824                 zkDb.clear();
825             }
826             LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
827             return;
828         }
829         LOG.info("shutting down");
830 
831         // new RuntimeException("Calling shutdown").printStackTrace();
832         setState(State.SHUTDOWN);
833 
834         // unregister all metrics that are keeping a strong reference to this object
835         // subclasses will do their specific clean up
836         unregisterMetrics();
837 
838         if (requestThrottler != null) {
839             requestThrottler.shutdown();
840         }
841 
842         // Since sessionTracker and syncThreads poll we just have to
843         // set running to false and they will detect it during the poll
844         // interval.
845         if (sessionTracker != null) {
846             sessionTracker.shutdown();
847         }
848         if (firstProcessor != null) {
849             firstProcessor.shutdown();
850         }
851         if (jvmPauseMonitor != null) {
852             jvmPauseMonitor.serviceStop();
853         }
854 
855         if (zkDb != null) {
856             if (fullyShutDown) {
857                 zkDb.clear();
858             } else {
859                 // else there is no need to clear the database
860                 //  * When a new quorum is established we can still apply the diff
861                 //    on top of the same zkDb data
862                 //  * If we fetch a new snapshot from leader, the zkDb will be
863                 //    cleared anyway before loading the snapshot
864                 try {
865                     //This will fast forward the database to the latest recorded transactions
866                     zkDb.fastForwardDataBase();
867                 } catch (IOException e) {
868                     LOG.error("Error updating DB", e);
869                     zkDb.clear();
870                 }
871             }
872         }
873 
874         requestPathMetricsCollector.shutdown();
875         unregisterJMX();
876     }
877 
unregisterJMX()878     protected void unregisterJMX() {
879         // unregister from JMX
880         try {
881             if (jmxDataTreeBean != null) {
882                 MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
883             }
884         } catch (Exception e) {
885             LOG.warn("Failed to unregister with JMX", e);
886         }
887         try {
888             if (jmxServerBean != null) {
889                 MBeanRegistry.getInstance().unregister(jmxServerBean);
890             }
891         } catch (Exception e) {
892             LOG.warn("Failed to unregister with JMX", e);
893         }
894         jmxServerBean = null;
895         jmxDataTreeBean = null;
896     }
897 
incInProcess()898     public void incInProcess() {
899         requestsInProcess.incrementAndGet();
900     }
901 
decInProcess()902     public void decInProcess() {
903         requestsInProcess.decrementAndGet();
904         if (requestThrottler != null) {
905             requestThrottler.throttleWake();
906         }
907     }
908 
getInProcess()909     public int getInProcess() {
910         return requestsInProcess.get();
911     }
912 
getInflight()913     public int getInflight() {
914         return requestThrottleInflight();
915     }
916 
requestThrottleInflight()917     private int requestThrottleInflight() {
918         if (requestThrottler != null) {
919             return requestThrottler.getInflight();
920         }
921         return 0;
922     }
923 
924     static class PrecalculatedDigest {
925         final long nodeDigest;
926         final long treeDigest;
927 
PrecalculatedDigest(long nodeDigest, long treeDigest)928         PrecalculatedDigest(long nodeDigest, long treeDigest) {
929             this.nodeDigest = nodeDigest;
930             this.treeDigest = treeDigest;
931         }
932     }
933 
934 
935     /**
936      * This structure is used to facilitate information sharing between PrepRP
937      * and FinalRP.
938      */
939     static class ChangeRecord {
940         PrecalculatedDigest precalculatedDigest;
941         byte[] data;
942 
ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl)943         ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) {
944             this.zxid = zxid;
945             this.path = path;
946             this.stat = stat;
947             this.childCount = childCount;
948             this.acl = acl;
949         }
950 
951         long zxid;
952 
953         String path;
954 
955         StatPersisted stat; /* Make sure to create a new object when changing */
956 
957         int childCount;
958 
959         List<ACL> acl; /* Make sure to create a new object when changing */
960 
duplicate(long zxid)961         ChangeRecord duplicate(long zxid) {
962             StatPersisted stat = new StatPersisted();
963             if (this.stat != null) {
964                 DataTree.copyStatPersisted(this.stat, stat);
965             }
966             ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount,
967                     acl == null ? new ArrayList<>() : new ArrayList<>(acl));
968             changeRecord.precalculatedDigest = precalculatedDigest;
969             changeRecord.data = data;
970             return changeRecord;
971         }
972 
973     }
974 
generatePasswd(long id)975     byte[] generatePasswd(long id) {
976         Random r = new Random(id ^ superSecret);
977         byte[] p = new byte[16];
978         r.nextBytes(p);
979         return p;
980     }
981 
checkPasswd(long sessionId, byte[] passwd)982     protected boolean checkPasswd(long sessionId, byte[] passwd) {
983         return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
984     }
985 
createSession(ServerCnxn cnxn, byte[] passwd, int timeout)986     long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
987         if (passwd == null) {
988             // Possible since it's just deserialized from a packet on the wire.
989             passwd = new byte[0];
990         }
991         long sessionId = sessionTracker.createSession(timeout);
992         Random r = new Random(sessionId ^ superSecret);
993         r.nextBytes(passwd);
994         ByteBuffer to = ByteBuffer.allocate(4);
995         to.putInt(timeout);
996         cnxn.setSessionId(sessionId);
997         Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
998         submitRequest(si);
999         return sessionId;
1000     }
1001 
1002     /**
1003      * set the owner of this session as owner
1004      * @param id the session id
1005      * @param owner the owner of the session
1006      * @throws SessionExpiredException
1007      */
setOwner(long id, Object owner)1008     public void setOwner(long id, Object owner) throws SessionExpiredException {
1009         sessionTracker.setOwner(id, owner);
1010     }
1011 
revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout)1012     protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
1013         boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
1014         if (LOG.isTraceEnabled()) {
1015             ZooTrace.logTraceMessage(
1016                 LOG,
1017                 ZooTrace.SESSION_TRACE_MASK,
1018                 "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc);
1019         }
1020         finishSessionInit(cnxn, rc);
1021     }
1022 
reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout)1023     public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
1024         if (checkPasswd(sessionId, passwd)) {
1025             revalidateSession(cnxn, sessionId, sessionTimeout);
1026         } else {
1027             LOG.warn(
1028                 "Incorrect password from {} for session 0x{}",
1029                 cnxn.getRemoteSocketAddress(),
1030                 Long.toHexString(sessionId));
1031             finishSessionInit(cnxn, false);
1032         }
1033     }
1034 
finishSessionInit(ServerCnxn cnxn, boolean valid)1035     public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
1036         // register with JMX
1037         try {
1038             if (valid) {
1039                 if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
1040                     serverCnxnFactory.registerConnection(cnxn);
1041                 } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
1042                     secureServerCnxnFactory.registerConnection(cnxn);
1043                 }
1044             }
1045         } catch (Exception e) {
1046             LOG.warn("Failed to register with JMX", e);
1047         }
1048 
1049         try {
1050             ConnectResponse rsp = new ConnectResponse(
1051                 0,
1052                 valid ? cnxn.getSessionTimeout() : 0,
1053                 valid ? cnxn.getSessionId() : 0, // send 0 if session is no
1054                 // longer valid
1055                 valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
1056             ByteArrayOutputStream baos = new ByteArrayOutputStream();
1057             BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
1058             bos.writeInt(-1, "len");
1059             rsp.serialize(bos, "connect");
1060             if (!cnxn.isOldClient) {
1061                 bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
1062             }
1063             baos.close();
1064             ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
1065             bb.putInt(bb.remaining() - 4).rewind();
1066             cnxn.sendBuffer(bb);
1067 
1068             if (valid) {
1069                 LOG.debug(
1070                     "Established session 0x{} with negotiated timeout {} for client {}",
1071                     Long.toHexString(cnxn.getSessionId()),
1072                     cnxn.getSessionTimeout(),
1073                     cnxn.getRemoteSocketAddress());
1074                 cnxn.enableRecv();
1075             } else {
1076 
1077                 LOG.info(
1078                     "Invalid session 0x{} for client {}, probably expired",
1079                     Long.toHexString(cnxn.getSessionId()),
1080                     cnxn.getRemoteSocketAddress());
1081                 cnxn.sendBuffer(ServerCnxnFactory.closeConn);
1082             }
1083 
1084         } catch (Exception e) {
1085             LOG.warn("Exception while establishing session, closing", e);
1086             cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
1087         }
1088     }
1089 
closeSession(ServerCnxn cnxn, RequestHeader requestHeader)1090     public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
1091         closeSession(cnxn.getSessionId());
1092     }
1093 
getServerId()1094     public long getServerId() {
1095         return 0;
1096     }
1097 
1098     /**
1099      * If the underlying Zookeeper server support local session, this method
1100      * will set a isLocalSession to true if a request is associated with
1101      * a local session.
1102      *
1103      * @param si
1104      */
setLocalSessionFlag(Request si)1105     protected void setLocalSessionFlag(Request si) {
1106     }
1107 
submitRequest(Request si)1108     public void submitRequest(Request si) {
1109         enqueueRequest(si);
1110     }
1111 
enqueueRequest(Request si)1112     public void enqueueRequest(Request si) {
1113         if (requestThrottler == null) {
1114             synchronized (this) {
1115                 try {
1116                     // Since all requests are passed to the request
1117                     // processor it should wait for setting up the request
1118                     // processor chain. The state will be updated to RUNNING
1119                     // after the setup.
1120                     while (state == State.INITIAL) {
1121                         wait(1000);
1122                     }
1123                 } catch (InterruptedException e) {
1124                     LOG.warn("Unexpected interruption", e);
1125                 }
1126                 if (requestThrottler == null) {
1127                     throw new RuntimeException("Not started");
1128                 }
1129             }
1130         }
1131         requestThrottler.submitRequest(si);
1132     }
1133 
submitRequestNow(Request si)1134     public void submitRequestNow(Request si) {
1135         if (firstProcessor == null) {
1136             synchronized (this) {
1137                 try {
1138                     // Since all requests are passed to the request
1139                     // processor it should wait for setting up the request
1140                     // processor chain. The state will be updated to RUNNING
1141                     // after the setup.
1142                     while (state == State.INITIAL) {
1143                         wait(1000);
1144                     }
1145                 } catch (InterruptedException e) {
1146                     LOG.warn("Unexpected interruption", e);
1147                 }
1148                 if (firstProcessor == null || state != State.RUNNING) {
1149                     throw new RuntimeException("Not started");
1150                 }
1151             }
1152         }
1153         try {
1154             touch(si.cnxn);
1155             boolean validpacket = Request.isValid(si.type);
1156             if (validpacket) {
1157                 setLocalSessionFlag(si);
1158                 firstProcessor.processRequest(si);
1159                 if (si.cnxn != null) {
1160                     incInProcess();
1161                 }
1162             } else {
1163                 LOG.warn("Received packet at server of unknown type {}", si.type);
1164                 // Update request accounting/throttling limits
1165                 requestFinished(si);
1166                 new UnimplementedRequestProcessor().processRequest(si);
1167             }
1168         } catch (MissingSessionException e) {
1169             LOG.debug("Dropping request.", e);
1170             // Update request accounting/throttling limits
1171             requestFinished(si);
1172         } catch (RequestProcessorException e) {
1173             LOG.error("Unable to process request", e);
1174             // Update request accounting/throttling limits
1175             requestFinished(si);
1176         }
1177     }
1178 
getSnapCount()1179     public static int getSnapCount() {
1180         String sc = System.getProperty(SNAP_COUNT);
1181         try {
1182             int snapCount = Integer.parseInt(sc);
1183 
1184             // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor
1185             if (snapCount < 2) {
1186                 LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2");
1187                 snapCount = 2;
1188             }
1189             return snapCount;
1190         } catch (Exception e) {
1191             return 100000;
1192         }
1193     }
1194 
getGlobalOutstandingLimit()1195     public int getGlobalOutstandingLimit() {
1196         String sc = System.getProperty(GLOBAL_OUTSTANDING_LIMIT);
1197         int limit;
1198         try {
1199             limit = Integer.parseInt(sc);
1200         } catch (Exception e) {
1201             limit = 1000;
1202         }
1203         return limit;
1204     }
1205 
getSnapSizeInBytes()1206     public static long getSnapSizeInBytes() {
1207         long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default
1208         if (size <= 0) {
1209             LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size);
1210         }
1211         return size * 1024; // Convert to bytes
1212     }
1213 
setServerCnxnFactory(ServerCnxnFactory factory)1214     public void setServerCnxnFactory(ServerCnxnFactory factory) {
1215         serverCnxnFactory = factory;
1216     }
1217 
getServerCnxnFactory()1218     public ServerCnxnFactory getServerCnxnFactory() {
1219         return serverCnxnFactory;
1220     }
1221 
getSecureServerCnxnFactory()1222     public ServerCnxnFactory getSecureServerCnxnFactory() {
1223         return secureServerCnxnFactory;
1224     }
1225 
setSecureServerCnxnFactory(ServerCnxnFactory factory)1226     public void setSecureServerCnxnFactory(ServerCnxnFactory factory) {
1227         secureServerCnxnFactory = factory;
1228     }
1229 
1230     /**
1231      * return the last proceesed id from the
1232      * datatree
1233      */
getLastProcessedZxid()1234     public long getLastProcessedZxid() {
1235         return zkDb.getDataTreeLastProcessedZxid();
1236     }
1237 
1238     /**
1239      * return the outstanding requests
1240      * in the queue, which havent been
1241      * processed yet
1242      */
getOutstandingRequests()1243     public long getOutstandingRequests() {
1244         return getInProcess();
1245     }
1246 
1247     /**
1248      * return the total number of client connections that are alive
1249      * to this server
1250      */
getNumAliveConnections()1251     public int getNumAliveConnections() {
1252         int numAliveConnections = 0;
1253 
1254         if (serverCnxnFactory != null) {
1255             numAliveConnections += serverCnxnFactory.getNumAliveConnections();
1256         }
1257 
1258         if (secureServerCnxnFactory != null) {
1259             numAliveConnections += secureServerCnxnFactory.getNumAliveConnections();
1260         }
1261 
1262         return numAliveConnections;
1263     }
1264 
1265     /**
1266      * trunccate the log to get in sync with others
1267      * if in a quorum
1268      * @param zxid the zxid that it needs to get in sync
1269      * with others
1270      * @throws IOException
1271      */
truncateLog(long zxid)1272     public void truncateLog(long zxid) throws IOException {
1273         this.zkDb.truncateLog(zxid);
1274     }
1275 
getTickTime()1276     public int getTickTime() {
1277         return tickTime;
1278     }
1279 
setTickTime(int tickTime)1280     public void setTickTime(int tickTime) {
1281         LOG.info("tickTime set to {}", tickTime);
1282         this.tickTime = tickTime;
1283     }
1284 
getThrottledOpWaitTime()1285     public static int getThrottledOpWaitTime() {
1286         return throttledOpWaitTime;
1287     }
1288 
setThrottledOpWaitTime(int time)1289     public static void setThrottledOpWaitTime(int time) {
1290         LOG.info("throttledOpWaitTime set to {}", time);
1291         throttledOpWaitTime = time;
1292     }
1293 
getMinSessionTimeout()1294     public int getMinSessionTimeout() {
1295         return minSessionTimeout;
1296     }
1297 
setMinSessionTimeout(int min)1298     public void setMinSessionTimeout(int min) {
1299         this.minSessionTimeout = min == -1 ? tickTime * 2 : min;
1300         LOG.info("minSessionTimeout set to {}", this.minSessionTimeout);
1301     }
1302 
getMaxSessionTimeout()1303     public int getMaxSessionTimeout() {
1304         return maxSessionTimeout;
1305     }
1306 
setMaxSessionTimeout(int max)1307     public void setMaxSessionTimeout(int max) {
1308         this.maxSessionTimeout = max == -1 ? tickTime * 20 : max;
1309         LOG.info("maxSessionTimeout set to {}", this.maxSessionTimeout);
1310     }
1311 
getClientPortListenBacklog()1312     public int getClientPortListenBacklog() {
1313         return listenBacklog;
1314     }
1315 
setClientPortListenBacklog(int backlog)1316     public void setClientPortListenBacklog(int backlog) {
1317         this.listenBacklog = backlog;
1318         LOG.info("clientPortListenBacklog set to {}", backlog);
1319     }
1320 
getClientPort()1321     public int getClientPort() {
1322         return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1;
1323     }
1324 
getSecureClientPort()1325     public int getSecureClientPort() {
1326         return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1;
1327     }
1328 
1329     /** Maximum number of connections allowed from particular host (ip) */
getMaxClientCnxnsPerHost()1330     public int getMaxClientCnxnsPerHost() {
1331         if (serverCnxnFactory != null) {
1332             return serverCnxnFactory.getMaxClientCnxnsPerHost();
1333         }
1334         if (secureServerCnxnFactory != null) {
1335             return secureServerCnxnFactory.getMaxClientCnxnsPerHost();
1336         }
1337         return -1;
1338     }
1339 
setTxnLogFactory(FileTxnSnapLog txnLog)1340     public void setTxnLogFactory(FileTxnSnapLog txnLog) {
1341         this.txnLogFactory = txnLog;
1342     }
1343 
getTxnLogFactory()1344     public FileTxnSnapLog getTxnLogFactory() {
1345         return this.txnLogFactory;
1346     }
1347 
1348     /**
1349      * Returns the elapsed sync of time of transaction log in milliseconds.
1350      */
getTxnLogElapsedSyncTime()1351     public long getTxnLogElapsedSyncTime() {
1352         return txnLogFactory.getTxnLogElapsedSyncTime();
1353     }
1354 
getState()1355     public String getState() {
1356         return "standalone";
1357     }
1358 
dumpEphemerals(PrintWriter pwriter)1359     public void dumpEphemerals(PrintWriter pwriter) {
1360         zkDb.dumpEphemerals(pwriter);
1361     }
1362 
getEphemerals()1363     public Map<Long, Set<String>> getEphemerals() {
1364         return zkDb.getEphemerals();
1365     }
1366 
getConnectionDropChance()1367     public double getConnectionDropChance() {
1368         return connThrottle.getDropChance();
1369     }
1370 
1371     @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)1372     public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
1373         throws IOException, ClientCnxnLimitException {
1374 
1375         BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
1376         ConnectRequest connReq = new ConnectRequest();
1377         connReq.deserialize(bia, "connect");
1378         LOG.debug(
1379             "Session establishment request from client {} client's lastZxid is 0x{}",
1380             cnxn.getRemoteSocketAddress(),
1381             Long.toHexString(connReq.getLastZxidSeen()));
1382 
1383         long sessionId = connReq.getSessionId();
1384         int tokensNeeded = 1;
1385         if (connThrottle.isConnectionWeightEnabled()) {
1386             if (sessionId == 0) {
1387                 if (localSessionEnabled) {
1388                     tokensNeeded = connThrottle.getRequiredTokensForLocal();
1389                 } else {
1390                     tokensNeeded = connThrottle.getRequiredTokensForGlobal();
1391                 }
1392             } else {
1393                 tokensNeeded = connThrottle.getRequiredTokensForRenew();
1394             }
1395         }
1396 
1397         if (!connThrottle.checkLimit(tokensNeeded)) {
1398             throw new ClientCnxnLimitException();
1399         }
1400         ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
1401 
1402         ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
1403 
1404         boolean readOnly = false;
1405         try {
1406             readOnly = bia.readBool("readOnly");
1407             cnxn.isOldClient = false;
1408         } catch (IOException e) {
1409             // this is ok -- just a packet from an old client which
1410             // doesn't contain readOnly field
1411             LOG.warn(
1412                 "Connection request from old client {}; will be dropped if server is in r-o mode",
1413                 cnxn.getRemoteSocketAddress());
1414         }
1415         if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
1416             String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
1417             LOG.info(msg);
1418             throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
1419         }
1420         if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
1421             String msg = "Refusing session request for client "
1422                          + cnxn.getRemoteSocketAddress()
1423                          + " as it has seen zxid 0x"
1424                          + Long.toHexString(connReq.getLastZxidSeen())
1425                          + " our last zxid is 0x"
1426                          + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
1427                          + " client must try another server";
1428 
1429             LOG.info(msg);
1430             throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
1431         }
1432         int sessionTimeout = connReq.getTimeOut();
1433         byte[] passwd = connReq.getPasswd();
1434         int minSessionTimeout = getMinSessionTimeout();
1435         if (sessionTimeout < minSessionTimeout) {
1436             sessionTimeout = minSessionTimeout;
1437         }
1438         int maxSessionTimeout = getMaxSessionTimeout();
1439         if (sessionTimeout > maxSessionTimeout) {
1440             sessionTimeout = maxSessionTimeout;
1441         }
1442         cnxn.setSessionTimeout(sessionTimeout);
1443         // We don't want to receive any packets until we are sure that the
1444         // session is setup
1445         cnxn.disableRecv();
1446         if (sessionId == 0) {
1447             long id = createSession(cnxn, passwd, sessionTimeout);
1448             LOG.debug(
1449                 "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
1450                 Long.toHexString(id),
1451                 Long.toHexString(connReq.getLastZxidSeen()),
1452                 connReq.getTimeOut(),
1453                 cnxn.getRemoteSocketAddress());
1454         } else {
1455             validateSession(cnxn, sessionId);
1456             LOG.debug(
1457                 "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
1458                 Long.toHexString(sessionId),
1459                 Long.toHexString(connReq.getLastZxidSeen()),
1460                 connReq.getTimeOut(),
1461                 cnxn.getRemoteSocketAddress());
1462             if (serverCnxnFactory != null) {
1463                 serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
1464             }
1465             if (secureServerCnxnFactory != null) {
1466                 secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
1467             }
1468             cnxn.setSessionId(sessionId);
1469             reopenSession(cnxn, sessionId, passwd, sessionTimeout);
1470             ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
1471 
1472         }
1473     }
1474 
1475     /**
1476      * Validate if a particular session can be reestablished.
1477      *
1478      * @param cnxn
1479      * @param sessionId
1480      */
validateSession(ServerCnxn cnxn, long sessionId)1481     protected void validateSession(ServerCnxn cnxn, long sessionId)
1482             throws IOException {
1483         // do nothing
1484     }
1485 
shouldThrottle(long outStandingCount)1486     public boolean shouldThrottle(long outStandingCount) {
1487         int globalOutstandingLimit = getGlobalOutstandingLimit();
1488         if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) {
1489             return outStandingCount > 0;
1490         }
1491         return false;
1492     }
1493 
getFlushDelay()1494     long getFlushDelay() {
1495         return flushDelay;
1496     }
1497 
setFlushDelay(long delay)1498     static void setFlushDelay(long delay) {
1499         LOG.info("{}={}", FLUSH_DELAY, delay);
1500         flushDelay = delay;
1501     }
1502 
getMaxWriteQueuePollTime()1503     long getMaxWriteQueuePollTime() {
1504         return maxWriteQueuePollTime;
1505     }
1506 
setMaxWriteQueuePollTime(long maxTime)1507     static void setMaxWriteQueuePollTime(long maxTime) {
1508         LOG.info("{}={}", MAX_WRITE_QUEUE_POLL_SIZE, maxTime);
1509         maxWriteQueuePollTime = maxTime;
1510     }
1511 
getMaxBatchSize()1512     int getMaxBatchSize() {
1513         return maxBatchSize;
1514     }
1515 
setMaxBatchSize(int size)1516     static void setMaxBatchSize(int size) {
1517         LOG.info("{}={}", MAX_BATCH_SIZE, size);
1518         maxBatchSize = size;
1519     }
1520 
initLargeRequestThrottlingSettings()1521     private void initLargeRequestThrottlingSettings() {
1522         setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes));
1523         setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1));
1524     }
1525 
getLargeRequestMaxBytes()1526     public int getLargeRequestMaxBytes() {
1527         return largeRequestMaxBytes;
1528     }
1529 
setLargeRequestMaxBytes(int bytes)1530     public void setLargeRequestMaxBytes(int bytes) {
1531         if (bytes <= 0) {
1532             LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes);
1533             LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes);
1534         } else {
1535             largeRequestMaxBytes = bytes;
1536             LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes);
1537         }
1538     }
1539 
getLargeRequestThreshold()1540     public int getLargeRequestThreshold() {
1541         return largeRequestThreshold;
1542     }
1543 
setLargeRequestThreshold(int threshold)1544     public void setLargeRequestThreshold(int threshold) {
1545         if (threshold == 0 || threshold < -1) {
1546             LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold);
1547             largeRequestThreshold = -1;
1548         } else {
1549             largeRequestThreshold = threshold;
1550             LOG.info("The large request threshold is set to {}", largeRequestThreshold);
1551         }
1552     }
1553 
getLargeRequestBytes()1554     public int getLargeRequestBytes() {
1555         return currentLargeRequestBytes.get();
1556     }
1557 
isLargeRequest(int length)1558     private boolean isLargeRequest(int length) {
1559         // The large request limit is disabled when threshold is -1
1560         if (largeRequestThreshold == -1) {
1561             return false;
1562         }
1563         return length > largeRequestThreshold;
1564     }
1565 
checkRequestSizeWhenReceivingMessage(int length)1566     public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException {
1567         if (!isLargeRequest(length)) {
1568             return true;
1569         }
1570         if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) {
1571             return true;
1572         } else {
1573             ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
1574             throw new IOException("Rejecting large request");
1575         }
1576 
1577     }
1578 
checkRequestSizeWhenMessageReceived(int length)1579     private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException {
1580         if (!isLargeRequest(length)) {
1581             return true;
1582         }
1583 
1584         int bytes = currentLargeRequestBytes.addAndGet(length);
1585         if (bytes > largeRequestMaxBytes) {
1586             currentLargeRequestBytes.addAndGet(-length);
1587             ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
1588             throw new IOException("Rejecting large request");
1589         }
1590         return true;
1591     }
1592 
requestFinished(Request request)1593     public void requestFinished(Request request) {
1594         int largeRequestLength = request.getLargeRequestSize();
1595         if (largeRequestLength != -1) {
1596             currentLargeRequestBytes.addAndGet(-largeRequestLength);
1597         }
1598     }
1599 
    /**
     * Deserializes the request header of an incoming packet and dispatches on
     * the request type.
     *
     * <ul>
     *   <li>{@code OpCode.auth}: the packet is handled here using the
     *       authentication provider registered for the packet's scheme. On
     *       success an OK reply is sent; otherwise an AUTHFAILED reply is sent,
     *       the close-connection buffer is queued and receiving is disabled.</li>
     *   <li>{@code OpCode.sasl}: delegated to {@code processSasl}.</li>
     *   <li>anything else: after {@code authHelper.enforceAuthentication}
     *       passes, the payload is wrapped in a {@link Request} and handed to
     *       {@code submitRequest}; large requests first reserve their bytes in
     *       the large-request budget.</li>
     * </ul>
     *
     * @param cnxn the connection the packet arrived on
     * @param incomingBuffer the raw packet, positioned at the request header
     * @throws IOException if deserialization fails, or if a large request is
     *         rejected by {@code checkRequestSizeWhenMessageReceived}
     */
    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");

        // Need to increase the outstanding request count first, otherwise
        // there might be a race condition that it enabled recv after
        // processing request and then disabled when check throttling.
        //
        // Be aware that we're actually checking the global outstanding
        // request before this request.
        //
        // It's fine if the IOException thrown before we decrease the count
        // in cnxn, since it will close the cnxn anyway.
        cnxn.incrOutstandingAndCheckThrottle(h);

        // Through the magic of byte buffers, txn will now be pointing
        // to the start of the txn: slice() yields a buffer that begins at the
        // current position (presumably just past the header read above).
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) {
            LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            String scheme = authPacket.getScheme();
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            // Default to failure; only a provider returning OK changes this.
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if (ap != null) {
                try {
                    // handleAuthentication may close the connection, to allow the client to choose
                    // a different server to connect to.
                    authReturn = ap.handleAuthentication(
                        new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                        authPacket.getAuth());
                } catch (RuntimeException e) {
                    // A misbehaving provider is treated as an authentication failure.
                    LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                    authReturn = KeeperException.Code.AUTHFAILED;
                }
            }
            if (authReturn == KeeperException.Code.OK) {
                LOG.info("Session 0x{}: auth success for scheme {} and address {}",
                        Long.toHexString(cnxn.getSessionId()), scheme,
                        cnxn.getRemoteSocketAddress());
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, null, null);
            } else {
                if (ap == null) {
                    LOG.warn(
                        "No authentication provider for scheme: {} has {}",
                        scheme,
                        ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: {}", scheme);
                }
                // send a response...
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
                cnxn.sendResponse(rh, null, null);
                // ... and close connection
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                cnxn.disableRecv();
            }
            return;
        } else if (h.getType() == OpCode.sasl) {
            processSasl(incomingBuffer, cnxn, h);
        } else {
            if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
                // Authentication enforcement is failed
                // Already sent response to user about failure and closed the session, lets return
                return;
            } else {
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
                // The limit of the sliced buffer is used as the request length.
                int length = incomingBuffer.limit();
                if (isLargeRequest(length)) {
                    // checkRequestSize will throw IOException if request is rejected
                    checkRequestSizeWhenMessageReceived(length);
                    // Remember the reserved size so requestFinished can release it.
                    si.setLargeRequestSize(length);
                }
                si.setOwner(ServerCnxn.me);
                submitRequest(si);
            }
        }
    }
1684 
isSaslSuperUser(String id)1685     private static boolean isSaslSuperUser(String id) {
1686         if (id == null || id.isEmpty()) {
1687             return false;
1688         }
1689 
1690         Properties properties = System.getProperties();
1691         int prefixLen = SASL_SUPER_USER.length();
1692 
1693         for (String k : properties.stringPropertyNames()) {
1694             if (k.startsWith(SASL_SUPER_USER)
1695                 && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) {
1696                 String value = properties.getProperty(k);
1697 
1698                 if (value != null && value.equals(id)) {
1699                     return true;
1700                 }
1701             }
1702         }
1703 
1704         return false;
1705     }
1706 
shouldAllowSaslFailedClientsConnect()1707     private static boolean shouldAllowSaslFailedClientsConnect() {
1708         return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
1709     }
1710 
processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader)1711     private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
1712         LOG.debug("Responding to client SASL token.");
1713         GetSASLRequest clientTokenRecord = new GetSASLRequest();
1714         ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
1715         byte[] clientToken = clientTokenRecord.getToken();
1716         LOG.debug("Size of client SASL token: {}", clientToken.length);
1717         byte[] responseToken = null;
1718         try {
1719             ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer;
1720             try {
1721                 // note that clientToken might be empty (clientToken.length == 0):
1722                 // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
1723                 // SASL negotiation process.
1724                 responseToken = saslServer.evaluateResponse(clientToken);
1725                 if (saslServer.isComplete()) {
1726                     String authorizationID = saslServer.getAuthorizationID();
1727                     LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}",
1728                             Long.toHexString(cnxn.getSessionId()), authorizationID);
1729                     cnxn.addAuthInfo(new Id("sasl", authorizationID));
1730 
1731                     if (isSaslSuperUser(authorizationID)) {
1732                         cnxn.addAuthInfo(new Id("super", ""));
1733                         LOG.info(
1734                             "Session 0x{}: Authenticated Id '{}' as super user",
1735                             Long.toHexString(cnxn.getSessionId()),
1736                             authorizationID);
1737                     }
1738                 }
1739             } catch (SaslException e) {
1740                 LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e);
1741                 if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) {
1742                     LOG.warn("Maintaining client connection despite SASL authentication failure.");
1743                 } else {
1744                     int error;
1745                     if (authHelper.isSaslAuthRequired()) {
1746                         LOG.warn(
1747                             "Closing client connection due to server requires client SASL authenticaiton,"
1748                                 + "but client SASL authentication has failed, or client is not configured with SASL "
1749                                 + "authentication.");
1750                         error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
1751                     } else {
1752                         LOG.warn("Closing client connection due to SASL authentication failure.");
1753                         error = Code.AUTHFAILED.intValue();
1754                     }
1755 
1756                     ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
1757                     cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
1758                     cnxn.sendCloseSession();
1759                     cnxn.disableRecv();
1760                     return;
1761                 }
1762             }
1763         } catch (NullPointerException e) {
1764             LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
1765         }
1766         if (responseToken != null) {
1767             LOG.debug("Size of server SASL response: {}", responseToken.length);
1768         }
1769 
1770         ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
1771         Record record = new SetSASLResponse(responseToken);
1772         cnxn.sendResponse(replyHeader, record, "response");
1773     }
1774 
1775     // entry point for quorum/Learner.java
processTxn(TxnHeader hdr, Record txn)1776     public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
1777         processTxnForSessionEvents(null, hdr, txn);
1778         return processTxnInDB(hdr, txn, null);
1779     }
1780 
1781     // entry point for FinalRequestProcessor.java
processTxn(Request request)1782     public ProcessTxnResult processTxn(Request request) {
1783         TxnHeader hdr = request.getHdr();
1784         processTxnForSessionEvents(request, hdr, request.getTxn());
1785 
1786         final boolean writeRequest = (hdr != null);
1787         final boolean quorumRequest = request.isQuorum();
1788 
1789         // return fast w/o synchronization when we get a read
1790         if (!writeRequest && !quorumRequest) {
1791             return new ProcessTxnResult();
1792         }
1793         synchronized (outstandingChanges) {
1794             ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
1795 
1796             // request.hdr is set for write requests, which are the only ones
1797             // that add to outstandingChanges.
1798             if (writeRequest) {
1799                 long zxid = hdr.getZxid();
1800                 while (!outstandingChanges.isEmpty()
1801                         && outstandingChanges.peek().zxid <= zxid) {
1802                     ChangeRecord cr = outstandingChanges.remove();
1803                     ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
1804                     if (cr.zxid < zxid) {
1805                         LOG.warn(
1806                             "Zxid outstanding 0x{} is less than current 0x{}",
1807                             Long.toHexString(cr.zxid),
1808                             Long.toHexString(zxid));
1809                     }
1810                     if (outstandingChangesForPath.get(cr.path) == cr) {
1811                         outstandingChangesForPath.remove(cr.path);
1812                     }
1813                 }
1814             }
1815 
1816             // do not add non quorum packets to the queue.
1817             if (quorumRequest) {
1818                 getZKDatabase().addCommittedProposal(request);
1819             }
1820             return rc;
1821         }
1822     }
1823 
processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn)1824     private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
1825         int opCode = (request == null) ? hdr.getType() : request.type;
1826         long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
1827 
1828         if (opCode == OpCode.createSession) {
1829             if (hdr != null && txn instanceof CreateSessionTxn) {
1830                 CreateSessionTxn cst = (CreateSessionTxn) txn;
1831                 sessionTracker.commitSession(sessionId, cst.getTimeOut());
1832             } else if (request == null || !request.isLocalSession()) {
1833                 LOG.warn("*****>>>>> Got {} {}",  txn.getClass(), txn.toString());
1834             }
1835         } else if (opCode == OpCode.closeSession) {
1836             sessionTracker.removeSession(sessionId);
1837         }
1838     }
1839 
processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest)1840     private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
1841         if (hdr == null) {
1842             return new ProcessTxnResult();
1843         } else {
1844             return getZKDatabase().processTxn(hdr, txn, digest);
1845         }
1846     }
1847 
getSessionExpiryMap()1848     public Map<Long, Set<Long>> getSessionExpiryMap() {
1849         return sessionTracker.getSessionExpiryMap();
1850     }
1851 
1852     /**
1853      * This method is used to register the ZooKeeperServerShutdownHandler to get
1854      * server's error or shutdown state change notifications.
1855      * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for
1856      * every server state changes {@link #setState(State)}.
1857      *
1858      * @param zkShutdownHandler shutdown handler
1859      */
registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler)1860     void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) {
1861         this.zkShutdownHandler = zkShutdownHandler;
1862     }
1863 
isResponseCachingEnabled()1864     public boolean isResponseCachingEnabled() {
1865         return isResponseCachingEnabled;
1866     }
1867 
setResponseCachingEnabled(boolean isEnabled)1868     public void setResponseCachingEnabled(boolean isEnabled) {
1869         isResponseCachingEnabled = isEnabled;
1870     }
1871 
getReadResponseCache()1872     public ResponseCache getReadResponseCache() {
1873         return isResponseCachingEnabled ? readResponseCache : null;
1874     }
1875 
getGetChildrenResponseCache()1876     public ResponseCache getGetChildrenResponseCache() {
1877         return isResponseCachingEnabled ? getChildrenResponseCache : null;
1878     }
1879 
registerMetrics()1880     protected void registerMetrics() {
1881         MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
1882 
1883         final ZKDatabase zkdb = this.getZKDatabase();
1884         final ServerStats stats = this.serverStats();
1885 
1886         rootContext.registerGauge("avg_latency", stats::getAvgLatency);
1887 
1888         rootContext.registerGauge("max_latency", stats::getMaxLatency);
1889         rootContext.registerGauge("min_latency", stats::getMinLatency);
1890 
1891         rootContext.registerGauge("packets_received", stats::getPacketsReceived);
1892         rootContext.registerGauge("packets_sent", stats::getPacketsSent);
1893         rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections);
1894 
1895         rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests);
1896         rootContext.registerGauge("uptime", stats::getUptime);
1897 
1898         rootContext.registerGauge("znode_count", zkdb::getNodeCount);
1899 
1900         rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount);
1901         rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount);
1902 
1903         rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize);
1904 
1905         rootContext.registerGauge("global_sessions", zkdb::getSessionCount);
1906         rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount);
1907 
1908         OSMXBean osMbean = new OSMXBean();
1909         rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount);
1910         rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount);
1911         rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance);
1912 
1913         rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize);
1914         rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize);
1915         rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize);
1916 
1917         rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum);
1918         rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount);
1919         rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount);
1920         rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount);
1921     }
1922 
unregisterMetrics()1923     protected void unregisterMetrics() {
1924 
1925         MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
1926 
1927         rootContext.unregisterGauge("avg_latency");
1928 
1929         rootContext.unregisterGauge("max_latency");
1930         rootContext.unregisterGauge("min_latency");
1931 
1932         rootContext.unregisterGauge("packets_received");
1933         rootContext.unregisterGauge("packets_sent");
1934         rootContext.unregisterGauge("num_alive_connections");
1935 
1936         rootContext.unregisterGauge("outstanding_requests");
1937         rootContext.unregisterGauge("uptime");
1938 
1939         rootContext.unregisterGauge("znode_count");
1940 
1941         rootContext.unregisterGauge("watch_count");
1942         rootContext.unregisterGauge("ephemerals_count");
1943         rootContext.unregisterGauge("approximate_data_size");
1944 
1945         rootContext.unregisterGauge("global_sessions");
1946         rootContext.unregisterGauge("local_sessions");
1947 
1948         rootContext.unregisterGauge("open_file_descriptor_count");
1949         rootContext.unregisterGauge("max_file_descriptor_count");
1950         rootContext.unregisterGauge("connection_drop_probability");
1951 
1952         rootContext.unregisterGauge("last_client_response_size");
1953         rootContext.unregisterGauge("max_client_response_size");
1954         rootContext.unregisterGauge("min_client_response_size");
1955 
1956         rootContext.unregisterGauge("auth_failed_count");
1957         rootContext.unregisterGauge("non_mtls_remote_conn_count");
1958         rootContext.unregisterGauge("non_mtls_local_conn_count");
1959 
1960 
1961     }
1962 
1963     /**
1964      * Hook into admin server, useful to expose additional data
1965      * that do not represent metrics.
1966      *
1967      * @param response a sink which collects the data.
1968      */
dumpMonitorValues(BiConsumer<String, Object> response)1969     public void dumpMonitorValues(BiConsumer<String, Object> response) {
1970         ServerStats stats = serverStats();
1971         response.accept("version", Version.getFullVersion());
1972         response.accept("server_state", stats.getServerState());
1973     }
1974 
1975     /**
1976      * Grant or deny authorization to an operation on a node as a function of:
1977      * @param cnxn :    the server connection
1978      * @param acl :     set of ACLs for the node
1979      * @param perm :    the permission that the client is requesting
1980      * @param ids :     the credentials supplied by the client
1981      * @param path :    the ZNode path
1982      * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null.
1983      */
checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls)1984     public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException {
1985         if (skipACL) {
1986             return;
1987         }
1988 
1989         LOG.debug("Permission requested: {} ", perm);
1990         LOG.debug("ACLs for node: {}", acl);
1991         LOG.debug("Client credentials: {}", ids);
1992 
1993         if (acl == null || acl.size() == 0) {
1994             return;
1995         }
1996         for (Id authId : ids) {
1997             if (authId.getScheme().equals("super")) {
1998                 return;
1999             }
2000         }
2001         for (ACL a : acl) {
2002             Id id = a.getId();
2003             if ((a.getPerms() & perm) != 0) {
2004                 if (id.getScheme().equals("world") && id.getId().equals("anyone")) {
2005                     return;
2006                 }
2007                 ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme());
2008                 if (ap != null) {
2009                     for (Id authId : ids) {
2010                         if (authId.getScheme().equals(id.getScheme())
2011                             && ap.matches(
2012                                 new ServerAuthenticationProvider.ServerObjs(this, cnxn),
2013                                 new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) {
2014                             return;
2015                         }
2016                     }
2017                 }
2018             }
2019         }
2020         throw new KeeperException.NoAuthException();
2021     }
2022 
2023     /**
2024      * check a path whether exceeded the quota.
2025      *
2026      * @param path
2027      *            the path of the node, used for the quota prefix check
2028      * @param lastData
2029      *            the current node data, {@code null} for none
2030      * @param data
2031      *            the data to be set, or {@code null} for none
2032      * @param type
2033      *            currently, create and setData need to check quota
2034      */
checkQuota(String path, byte[] lastData, byte[] data, int type)2035     public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException {
2036         if (!enforceQuota) {
2037             return;
2038         }
2039         long dataBytes = (data == null) ? 0 : data.length;
2040         ZKDatabase zkDatabase = getZKDatabase();
2041         String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path);
2042         if (StringUtils.isEmpty(lastPrefix)) {
2043             return;
2044         }
2045 
2046         switch (type) {
2047             case OpCode.create:
2048                 checkQuota(lastPrefix, dataBytes, 1);
2049                 break;
2050             case OpCode.setData:
2051                 checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0);
2052                 break;
2053              default:
2054                  throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type);
2055         }
2056     }
2057 
2058     /**
2059      * check a path whether exceeded the quota.
2060      *
2061      * @param lastPrefix
2062                   the path of the node which has a quota.
2063      * @param bytesDiff
2064      *            the diff to be added to number of bytes
2065      * @param countDiff
2066      *            the diff to be added to the count
2067      */
checkQuota(String lastPrefix, long bytesDiff, long countDiff)2068     private void checkQuota(String lastPrefix, long bytesDiff, long countDiff)
2069             throws KeeperException.QuotaExceededException {
2070         LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff);
2071 
2072         // now check the quota we set
2073         String limitNode = Quotas.limitPath(lastPrefix);
2074         DataNode node = getZKDatabase().getNode(limitNode);
2075         StatsTrack limitStats;
2076         if (node == null) {
2077             // should not happen
2078             LOG.error("Missing limit node for quota {}", limitNode);
2079             return;
2080         }
2081         synchronized (node) {
2082             limitStats = new StatsTrack(node.data);
2083         }
2084         //check the quota
2085         boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1);
2086         boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1);
2087 
2088         if (!checkCountQuota && !checkByteQuota) {
2089             return;
2090         }
2091 
2092         //check the statPath quota
2093         String statNode = Quotas.statPath(lastPrefix);
2094         node = getZKDatabase().getNode(statNode);
2095 
2096         StatsTrack currentStats;
2097         if (node == null) {
2098             // should not happen
2099             LOG.error("Missing node for stat {}", statNode);
2100             return;
2101         }
2102         synchronized (node) {
2103             currentStats = new StatsTrack(node.data);
2104         }
2105 
2106         //check the Count Quota
2107         if (checkCountQuota) {
2108             long newCount = currentStats.getCount() + countDiff;
2109             boolean isCountHardLimit = limitStats.getCountHardLimit() > -1 ? true : false;
2110             long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount();
2111 
2112             if (newCount > countLimit) {
2113                 String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]";
2114                 RATE_LOGGER.rateLimitLog(msg);
2115                 if (isCountHardLimit) {
2116                     throw new KeeperException.QuotaExceededException(lastPrefix);
2117                 }
2118             }
2119         }
2120 
2121         //check the Byte Quota
2122         if (checkByteQuota) {
2123             long newBytes = currentStats.getBytes() + bytesDiff;
2124             boolean isByteHardLimit = limitStats.getByteHardLimit() > -1 ? true : false;
2125             long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes();
2126             if (newBytes > byteLimit) {
2127                 String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]";
2128                 RATE_LOGGER.rateLimitLog(msg);
2129                 if (isByteHardLimit) {
2130                     throw new KeeperException.QuotaExceededException(lastPrefix);
2131                 }
2132             }
2133         }
2134     }
2135 
isDigestEnabled()2136     public static boolean isDigestEnabled() {
2137         return digestEnabled;
2138     }
2139 
setDigestEnabled(boolean digestEnabled)2140     public static void setDigestEnabled(boolean digestEnabled) {
2141         LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
2142         ZooKeeperServer.digestEnabled = digestEnabled;
2143     }
2144 
2145     /**
2146      * Trim a path to get the immediate predecessor.
2147      *
2148      * @param path
2149      * @return
2150      * @throws KeeperException.BadArgumentsException
2151      */
parentPath(String path)2152     private String parentPath(String path) throws KeeperException.BadArgumentsException {
2153         int lastSlash = path.lastIndexOf('/');
2154         if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) {
2155             throw new KeeperException.BadArgumentsException(path);
2156         }
2157         return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
2158     }
2159 
effectiveACLPath(Request request)2160     private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException {
2161         boolean mustCheckACL = false;
2162         String path = null;
2163         List<ACL> acl = null;
2164 
2165         switch (request.type) {
2166         case OpCode.create:
2167         case OpCode.create2: {
2168             CreateRequest req = new CreateRequest();
2169             if (buffer2Record(request.request, req)) {
2170                 mustCheckACL = true;
2171                 acl = req.getAcl();
2172                 path = parentPath(req.getPath());
2173             }
2174             break;
2175         }
2176         case OpCode.delete: {
2177             DeleteRequest req = new DeleteRequest();
2178             if (buffer2Record(request.request, req)) {
2179                 path = parentPath(req.getPath());
2180             }
2181             break;
2182         }
2183         case OpCode.setData: {
2184             SetDataRequest req = new SetDataRequest();
2185             if (buffer2Record(request.request, req)) {
2186                 path = req.getPath();
2187             }
2188             break;
2189         }
2190         case OpCode.setACL: {
2191             SetACLRequest req = new SetACLRequest();
2192             if (buffer2Record(request.request, req)) {
2193                 mustCheckACL = true;
2194                 acl = req.getAcl();
2195                 path = req.getPath();
2196             }
2197             break;
2198         }
2199         }
2200 
2201         if (mustCheckACL) {
2202             /* we ignore the extrapolated ACL returned by fixupACL because
2203              * we only care about it being well-formed (and if it isn't, an
2204              * exception will be raised).
2205              */
2206             PrepRequestProcessor.fixupACL(path, request.authInfo, acl);
2207         }
2208 
2209         return path;
2210     }
2211 
effectiveACLPerms(Request request)2212     private int effectiveACLPerms(Request request) {
2213         switch (request.type) {
2214         case OpCode.create:
2215         case OpCode.create2:
2216             return ZooDefs.Perms.CREATE;
2217         case OpCode.delete:
2218             return ZooDefs.Perms.DELETE;
2219         case OpCode.setData:
2220             return ZooDefs.Perms.WRITE;
2221         case OpCode.setACL:
2222             return ZooDefs.Perms.ADMIN;
2223         default:
2224             return ZooDefs.Perms.ALL;
2225         }
2226     }
2227 
2228     /**
2229      * Check Write Requests for Potential Access Restrictions
2230      * <p/>
2231      * Before a request is being proposed to the quorum, lets check it
2232      * against local ACLs. Non-write requests (read, session, etc.)
2233      * are passed along. Invalid requests are sent a response.
2234      * <p/>
2235      * While we are at it, if the request will set an ACL: make sure it's
2236      * a valid one.
2237      *
2238      * @param request
2239      * @return true if request is permitted, false if not.
2240      * @throws java.io.IOException
2241      */
authWriteRequest(Request request)2242     public boolean authWriteRequest(Request request) {
2243         int err;
2244         String pathToCheck;
2245 
2246         if (!enableEagerACLCheck) {
2247             return true;
2248         }
2249 
2250         err = KeeperException.Code.OK.intValue();
2251 
2252         try {
2253             pathToCheck = effectiveACLPath(request);
2254             if (pathToCheck != null) {
2255                 checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null);
2256             }
2257         } catch (KeeperException.NoAuthException e) {
2258             LOG.debug("Request failed ACL check", e);
2259             err = e.code().intValue();
2260         } catch (KeeperException.InvalidACLException e) {
2261             LOG.debug("Request has an invalid ACL check", e);
2262             err = e.code().intValue();
2263         } catch (KeeperException.NoNodeException e) {
2264             LOG.debug("ACL check against non-existent node: {}", e.getMessage());
2265         } catch (KeeperException.BadArgumentsException e) {
2266             LOG.debug("ACL check against illegal node path: {}", e.getMessage());
2267         } catch (Throwable t) {
2268             LOG.error("Uncaught exception in authWriteRequest with: ", t);
2269             throw t;
2270         } finally {
2271             if (err != KeeperException.Code.OK.intValue()) {
2272                 /*  This request has a bad ACL, so we are dismissing it early. */
2273                 decInProcess();
2274                 ReplyHeader rh = new ReplyHeader(request.cxid, 0, err);
2275                 try {
2276                     request.cnxn.sendResponse(rh, null, null);
2277                 } catch (IOException e) {
2278                     LOG.error("IOException : {}", e);
2279                 }
2280             }
2281         }
2282 
2283         return err == KeeperException.Code.OK.intValue();
2284     }
2285 
buffer2Record(ByteBuffer request, Record record)2286     private boolean buffer2Record(ByteBuffer request, Record record) {
2287         boolean rv = false;
2288         try {
2289             ByteBufferInputStream.byteBuffer2Record(request, record);
2290             request.rewind();
2291             rv = true;
2292         } catch (IOException ex) {
2293         }
2294 
2295         return rv;
2296     }
2297 
getOutstandingHandshakeNum()2298     public int getOutstandingHandshakeNum() {
2299         if (serverCnxnFactory instanceof NettyServerCnxnFactory) {
2300             return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum();
2301         } else {
2302             return 0;
2303         }
2304     }
2305 
isReconfigEnabled()2306     public boolean isReconfigEnabled() {
2307         return this.reconfigEnabled;
2308     }
2309 
getZkShutdownHandler()2310     public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
2311         return zkShutdownHandler;
2312     }
2313 
2314 }
2315