1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.zookeeper.server; 20 21 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 22 import java.io.ByteArrayOutputStream; 23 import java.io.File; 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.PrintWriter; 27 import java.nio.ByteBuffer; 28 import java.util.ArrayDeque; 29 import java.util.ArrayList; 30 import java.util.Arrays; 31 import java.util.Deque; 32 import java.util.HashMap; 33 import java.util.List; 34 import java.util.Map; 35 import java.util.Properties; 36 import java.util.Random; 37 import java.util.Set; 38 import java.util.concurrent.atomic.AtomicInteger; 39 import java.util.concurrent.atomic.AtomicLong; 40 import java.util.function.BiConsumer; 41 import javax.security.sasl.SaslException; 42 import org.apache.jute.BinaryInputArchive; 43 import org.apache.jute.BinaryOutputArchive; 44 import org.apache.jute.Record; 45 import org.apache.zookeeper.Environment; 46 import org.apache.zookeeper.KeeperException; 47 import org.apache.zookeeper.KeeperException.Code; 48 import org.apache.zookeeper.KeeperException.SessionExpiredException; 49 import org.apache.zookeeper.Quotas; 50 import org.apache.zookeeper.StatsTrack; 51 import org.apache.zookeeper.Version; 52 import org.apache.zookeeper.ZooDefs; 53 import org.apache.zookeeper.ZooDefs.OpCode; 54 import org.apache.zookeeper.ZookeeperBanner; 55 import org.apache.zookeeper.common.StringUtils; 56 import org.apache.zookeeper.common.Time; 57 import org.apache.zookeeper.data.ACL; 58 import org.apache.zookeeper.data.Id; 59 import org.apache.zookeeper.data.StatPersisted; 60 import org.apache.zookeeper.jmx.MBeanRegistry; 61 import org.apache.zookeeper.metrics.MetricsContext; 62 import org.apache.zookeeper.proto.AuthPacket; 63 import org.apache.zookeeper.proto.ConnectRequest; 64 import org.apache.zookeeper.proto.ConnectResponse; 65 import org.apache.zookeeper.proto.CreateRequest; 66 import org.apache.zookeeper.proto.DeleteRequest; 67 import org.apache.zookeeper.proto.GetSASLRequest; 68 import org.apache.zookeeper.proto.ReplyHeader; 69 import org.apache.zookeeper.proto.RequestHeader; 70 import org.apache.zookeeper.proto.SetACLRequest; 71 import org.apache.zookeeper.proto.SetDataRequest; 72 import org.apache.zookeeper.proto.SetSASLResponse; 73 import org.apache.zookeeper.server.DataTree.ProcessTxnResult; 74 import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException; 75 import org.apache.zookeeper.server.ServerCnxn.CloseRequestException; 76 import org.apache.zookeeper.server.SessionTracker.Session; 77 import org.apache.zookeeper.server.SessionTracker.SessionExpirer; 78 import org.apache.zookeeper.server.auth.ProviderRegistry; 79 import org.apache.zookeeper.server.auth.ServerAuthenticationProvider; 80 import org.apache.zookeeper.server.persistence.FileTxnSnapLog; 81 import org.apache.zookeeper.server.quorum.QuorumPeerConfig; 82 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; 83 import org.apache.zookeeper.server.util.JvmPauseMonitor; 84 import org.apache.zookeeper.server.util.OSMXBean; 85 import org.apache.zookeeper.server.util.RequestPathMetricsCollector; 86 import org.apache.zookeeper.txn.CreateSessionTxn; 87 import org.apache.zookeeper.txn.TxnDigest; 88 import org.apache.zookeeper.txn.TxnHeader; 89 import org.apache.zookeeper.util.ServiceUtils; 90 import org.slf4j.Logger; 91 import org.slf4j.LoggerFactory; 92 93 /** 94 * This class implements a simple standalone ZooKeeperServer. It sets up the 95 * following chain of RequestProcessors to process requests: 96 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor 97 */ 98 public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { 99 100 protected static final Logger LOG; 101 private static final RateLogger RATE_LOGGER; 102 103 public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit"; 104 105 public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck"; 106 public static final String SKIP_ACL = "zookeeper.skipACL"; 107 public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota"; 108 109 // When enabled, will check ACL constraints appertained to the requests first, 110 // before sending the requests to the quorum. 111 static final boolean enableEagerACLCheck; 112 113 static final boolean skipACL; 114 115 public static final boolean enforceQuota; 116 117 public static final String SASL_SUPER_USER = "zookeeper.superUser"; 118 119 public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients"; 120 public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; 121 private static boolean digestEnabled; 122 123 // Add a enable/disable option for now, we should remove this one when 124 // this feature is confirmed to be stable 125 public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; 126 private static boolean closeSessionTxnEnabled = true; 127 128 static { 129 LOG = LoggerFactory.getLogger(ZooKeeperServer.class); 130 131 RATE_LOGGER = new RateLogger(LOG); 132 133 ZookeeperBanner.printBanner(LOG); 134 135 Environment.logEnv("Server environment:", LOG); 136 137 enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK); 138 LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck); 139 140 skipACL = System.getProperty(SKIP_ACL, "no").equals("yes"); 141 if (skipACL) { 142 LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL); 143 } 144 145 enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false")); 146 if (enforceQuota) { 147 LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota); 148 } 149 150 digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); 151 LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); 152 153 closeSessionTxnEnabled = Boolean.parseBoolean( 154 System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); 155 LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); 156 } 157 isCloseSessionTxnEnabled()158 public static boolean isCloseSessionTxnEnabled() { 159 return closeSessionTxnEnabled; 160 } 161 setCloseSessionTxnEnabled(boolean enabled)162 public static void setCloseSessionTxnEnabled(boolean enabled) { 163 ZooKeeperServer.closeSessionTxnEnabled = enabled; 164 LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, 165 ZooKeeperServer.closeSessionTxnEnabled); 166 } 167 168 protected ZooKeeperServerBean jmxServerBean; 169 protected DataTreeBean jmxDataTreeBean; 170 171 public static final int DEFAULT_TICK_TIME = 3000; 172 protected int tickTime = DEFAULT_TICK_TIME; 173 public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled 174 protected static volatile int throttledOpWaitTime = 175 Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME); 176 /** value of -1 indicates unset, use default */ 177 protected int minSessionTimeout = -1; 178 /** value of -1 indicates unset, use default */ 179 protected int maxSessionTimeout = -1; 180 /** Socket listen backlog. Value of -1 indicates unset */ 181 protected int listenBacklog = -1; 182 protected SessionTracker sessionTracker; 183 private FileTxnSnapLog txnLogFactory = null; 184 private ZKDatabase zkDb; 185 private ResponseCache readResponseCache; 186 private ResponseCache getChildrenResponseCache; 187 private final AtomicLong hzxid = new AtomicLong(0); 188 public static final Exception ok = new Exception("No prob"); 189 protected RequestProcessor firstProcessor; 190 protected JvmPauseMonitor jvmPauseMonitor; 191 protected volatile State state = State.INITIAL; 192 private boolean isResponseCachingEnabled = true; 193 /* contains the configuration file content read at startup */ 194 protected String initialConfig; 195 protected boolean reconfigEnabled; 196 private final RequestPathMetricsCollector requestPathMetricsCollector; 197 198 private boolean localSessionEnabled = false; 199 protected enum State { 200 INITIAL, 201 RUNNING, 202 SHUTDOWN, 203 ERROR 204 } 205 206 /** 207 * This is the secret that we use to generate passwords. For the moment, 208 * it's more of a checksum that's used in reconnection, which carries no 209 * security weight, and is treated internally as if it carries no 210 * security weight. 211 */ 212 private static final long superSecret = 0XB3415C00L; 213 214 private final AtomicInteger requestsInProcess = new AtomicInteger(0); 215 final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>(); 216 // this data structure must be accessed under the outstandingChanges lock 217 final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>(); 218 219 protected ServerCnxnFactory serverCnxnFactory; 220 protected ServerCnxnFactory secureServerCnxnFactory; 221 222 private final ServerStats serverStats; 223 private final ZooKeeperServerListener listener; 224 private ZooKeeperServerShutdownHandler zkShutdownHandler; 225 private volatile int createSessionTrackerServerId = 1; 226 227 private static final String FLUSH_DELAY = "zookeeper.flushDelay"; 228 private static volatile long flushDelay; 229 private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime"; 230 private static volatile long maxWriteQueuePollTime; 231 private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize"; 232 private static volatile int maxBatchSize; 233 234 /** 235 * Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes. 236 * Flag not used for small transfers like connectResponses. 237 */ 238 public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes"; 239 public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024; 240 public static final int intBufferStartingSizeBytes; 241 242 public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize"; 243 public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize"; 244 245 static { 246 long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0); 247 setFlushDelay(configuredFlushDelay); Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3)248 setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3)); Integer.getInteger(MAX_BATCH_SIZE, 1000)249 setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000)); 250 251 intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE); 252 253 if (intBufferStartingSizeBytes < 32) { 254 String msg = "Buffer starting size must be greater than 0." 255 + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" "; 256 LOG.error(msg); 257 throw new IllegalArgumentException(msg); 258 } 259 260 LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes); 261 } 262 263 // Connection throttling 264 private BlueThrottle connThrottle = new BlueThrottle(); 265 266 @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = 267 "Internally the throttler has a BlockingQueue so " 268 + "once the throttler is created and started, it is thread-safe") 269 private RequestThrottler requestThrottler; 270 public static final String SNAP_COUNT = "zookeeper.snapCount"; 271 272 /** 273 * This setting sets a limit on the total number of large requests that 274 * can be inflight and is designed to prevent ZooKeeper from accepting 275 * too many large requests such that the JVM runs out of usable heap and 276 * ultimately crashes. 277 * 278 * The limit is enforced by the {@link checkRequestSize(int, boolean)} 279 * method which is called by the connection layer ({@link NIOServerCnxn}, 280 * {@link NettyServerCnxn}) before allocating a byte buffer and pulling 281 * data off the TCP socket. The limit is then checked again by the 282 * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which 283 * also atomically updates {@link currentLargeRequestBytes}. The request is 284 * then marked as a large request, with the request size stored in the Request 285 * object so that it can later be decremented from {@link currentLargeRequestsBytes}. 286 * 287 * When a request is completed or dropped, the relevant code path calls the 288 * {@link requestFinished(Request)} method which performs the decrement if 289 * needed. 290 */ 291 private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; 292 293 /** 294 * The size threshold after which a request is considered a large request 295 * and is checked against the large request byte limit. 296 */ 297 private volatile int largeRequestThreshold = -1; 298 299 private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); 300 301 private AuthenticationHelper authHelper; 302 removeCnxn(ServerCnxn cnxn)303 void removeCnxn(ServerCnxn cnxn) { 304 zkDb.removeCnxn(cnxn); 305 } 306 307 /** 308 * Creates a ZooKeeperServer instance. Nothing is setup, use the setX 309 * methods to prepare the instance (eg datadir, datalogdir, ticktime, 310 * builder, etc...) 311 * 312 */ ZooKeeperServer()313 public ZooKeeperServer() { 314 listener = new ZooKeeperServerListenerImpl(this); 315 serverStats = new ServerStats(this); 316 this.requestPathMetricsCollector = new RequestPathMetricsCollector(); 317 this.authHelper = new AuthenticationHelper(); 318 } 319 320 /** 321 * Keeping this constructor for backward compatibility 322 */ ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig)323 public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { 324 this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled()); 325 } 326 327 /** 328 * * Creates a ZooKeeperServer instance. It sets everything up, but doesn't 329 * actually start listening for clients until run() is invoked. 330 * 331 */ ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled)332 public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) { 333 serverStats = new ServerStats(this); 334 this.txnLogFactory = txnLogFactory; 335 this.txnLogFactory.setServerStats(this.serverStats); 336 this.zkDb = zkDb; 337 this.tickTime = tickTime; 338 setMinSessionTimeout(minSessionTimeout); 339 setMaxSessionTimeout(maxSessionTimeout); 340 this.listenBacklog = clientPortListenBacklog; 341 this.reconfigEnabled = reconfigEnabled; 342 343 listener = new ZooKeeperServerListenerImpl(this); 344 345 readResponseCache = new ResponseCache(Integer.getInteger( 346 GET_DATA_RESPONSE_CACHE_SIZE, 347 ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData"); 348 349 getChildrenResponseCache = new ResponseCache(Integer.getInteger( 350 GET_CHILDREN_RESPONSE_CACHE_SIZE, 351 ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren"); 352 353 this.initialConfig = initialConfig; 354 355 this.requestPathMetricsCollector = new RequestPathMetricsCollector(); 356 357 this.initLargeRequestThrottlingSettings(); 358 359 this.authHelper = new AuthenticationHelper(); 360 361 LOG.info( 362 "Created server with" 363 + " tickTime {}" 364 + " minSessionTimeout {}" 365 + " maxSessionTimeout {}" 366 + " clientPortListenBacklog {}" 367 + " datadir {}" 368 + " snapdir {}", 369 tickTime, 370 getMinSessionTimeout(), 371 getMaxSessionTimeout(), 372 getClientPortListenBacklog(), 373 txnLogFactory.getDataDir(), 374 txnLogFactory.getSnapDir()); 375 } 376 getInitialConfig()377 public String getInitialConfig() { 378 return initialConfig; 379 } 380 381 /** 382 * Adds JvmPauseMonitor and calls 383 * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String)} 384 * 385 */ ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig)386 public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { 387 this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled()); 388 this.jvmPauseMonitor = jvmPauseMonitor; 389 if (jvmPauseMonitor != null) { 390 LOG.info("Added JvmPauseMonitor to server"); 391 } 392 } 393 394 /** 395 * creates a zookeeperserver instance. 396 * @param txnLogFactory the file transaction snapshot logging class 397 * @param tickTime the ticktime for the server 398 * @throws IOException 399 */ ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig)400 public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) { 401 this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled()); 402 } 403 serverStats()404 public ServerStats serverStats() { 405 return serverStats; 406 } 407 getRequestPathMetricsCollector()408 public RequestPathMetricsCollector getRequestPathMetricsCollector() { 409 return requestPathMetricsCollector; 410 } 411 connThrottle()412 public BlueThrottle connThrottle() { 413 return connThrottle; 414 } 415 dumpConf(PrintWriter pwriter)416 public void dumpConf(PrintWriter pwriter) { 417 pwriter.print("clientPort="); 418 pwriter.println(getClientPort()); 419 pwriter.print("secureClientPort="); 420 pwriter.println(getSecureClientPort()); 421 pwriter.print("dataDir="); 422 pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath()); 423 pwriter.print("dataDirSize="); 424 pwriter.println(getDataDirSize()); 425 pwriter.print("dataLogDir="); 426 pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath()); 427 pwriter.print("dataLogSize="); 428 pwriter.println(getLogDirSize()); 429 pwriter.print("tickTime="); 430 pwriter.println(getTickTime()); 431 pwriter.print("maxClientCnxns="); 432 pwriter.println(getMaxClientCnxnsPerHost()); 433 pwriter.print("minSessionTimeout="); 434 pwriter.println(getMinSessionTimeout()); 435 pwriter.print("maxSessionTimeout="); 436 pwriter.println(getMaxSessionTimeout()); 437 pwriter.print("clientPortListenBacklog="); 438 pwriter.println(getClientPortListenBacklog()); 439 440 pwriter.print("serverId="); 441 pwriter.println(getServerId()); 442 } 443 getConf()444 public ZooKeeperServerConf getConf() { 445 return new ZooKeeperServerConf( 446 getClientPort(), 447 zkDb.snapLog.getSnapDir().getAbsolutePath(), 448 zkDb.snapLog.getDataDir().getAbsolutePath(), 449 getTickTime(), 450 getMaxClientCnxnsPerHost(), 451 getMinSessionTimeout(), 452 getMaxSessionTimeout(), 453 getServerId(), 454 getClientPortListenBacklog()); 455 } 456 457 /** 458 * This constructor is for backward compatibility with the existing unit 459 * test code. 460 * It defaults to FileLogProvider persistence provider. 461 */ ZooKeeperServer(File snapDir, File logDir, int tickTime)462 public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException { 463 this(new FileTxnSnapLog(snapDir, logDir), tickTime, ""); 464 } 465 466 /** 467 * Default constructor, relies on the config for its argument values 468 * 469 * @throws IOException 470 */ ZooKeeperServer(FileTxnSnapLog txnLogFactory)471 public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException { 472 this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled()); 473 } 474 475 /** 476 * get the zookeeper database for this server 477 * @return the zookeeper database for this server 478 */ getZKDatabase()479 public ZKDatabase getZKDatabase() { 480 return this.zkDb; 481 } 482 483 /** 484 * set the zkdatabase for this zookeeper server 485 * @param zkDb 486 */ setZKDatabase(ZKDatabase zkDb)487 public void setZKDatabase(ZKDatabase zkDb) { 488 this.zkDb = zkDb; 489 } 490 491 /** 492 * Restore sessions and data 493 */ loadData()494 public void loadData() throws IOException, InterruptedException { 495 /* 496 * When a new leader starts executing Leader#lead, it 497 * invokes this method. The database, however, has been 498 * initialized before running leader election so that 499 * the server could pick its zxid for its initial vote. 500 * It does it by invoking QuorumPeer#getLastLoggedZxid. 501 * Consequently, we don't need to initialize it once more 502 * and avoid the penalty of loading it a second time. Not 503 * reloading it is particularly important for applications 504 * that host a large database. 505 * 506 * The following if block checks whether the database has 507 * been initialized or not. Note that this method is 508 * invoked by at least one other method: 509 * ZooKeeperServer#startdata. 510 * 511 * See ZOOKEEPER-1642 for more detail. 512 */ 513 if (zkDb.isInitialized()) { 514 setZxid(zkDb.getDataTreeLastProcessedZxid()); 515 } else { 516 setZxid(zkDb.loadDataBase()); 517 } 518 519 // Clean up dead sessions 520 zkDb.getSessions().stream() 521 .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null) 522 .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid())); 523 524 // Make a clean snapshot 525 takeSnapshot(); 526 } 527 takeSnapshot()528 public void takeSnapshot() { 529 takeSnapshot(false); 530 } 531 takeSnapshot(boolean syncSnap)532 public void takeSnapshot(boolean syncSnap) { 533 long start = Time.currentElapsedTime(); 534 try { 535 txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); 536 } catch (IOException e) { 537 LOG.error("Severe unrecoverable error, exiting", e); 538 // This is a severe error that we cannot recover from, 539 // so we need to exit 540 ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); 541 } 542 long elapsed = Time.currentElapsedTime() - start; 543 LOG.info("Snapshot taken in {} ms", elapsed); 544 ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); 545 } 546 shouldForceWriteInitialSnapshotAfterLeaderElection()547 public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { 548 return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection(); 549 } 550 551 @Override getDataDirSize()552 public long getDataDirSize() { 553 if (zkDb == null) { 554 return 0L; 555 } 556 File path = zkDb.snapLog.getDataDir(); 557 return getDirSize(path); 558 } 559 560 @Override getLogDirSize()561 public long getLogDirSize() { 562 if (zkDb == null) { 563 return 0L; 564 } 565 File path = zkDb.snapLog.getSnapDir(); 566 return getDirSize(path); 567 } 568 getDirSize(File file)569 private long getDirSize(File file) { 570 long size = 0L; 571 if (file.isDirectory()) { 572 File[] files = file.listFiles(); 573 if (files != null) { 574 for (File f : files) { 575 size += getDirSize(f); 576 } 577 } 578 } else { 579 size = file.length(); 580 } 581 return size; 582 } 583 getZxid()584 public long getZxid() { 585 return hzxid.get(); 586 } 587 getSessionTracker()588 public SessionTracker getSessionTracker() { 589 return sessionTracker; 590 } 591 getNextZxid()592 long getNextZxid() { 593 return hzxid.incrementAndGet(); 594 } 595 setZxid(long zxid)596 public void setZxid(long zxid) { 597 hzxid.set(zxid); 598 } 599 close(long sessionId)600 private void close(long sessionId) { 601 Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null); 602 submitRequest(si); 603 } 604 closeSession(long sessionId)605 public void closeSession(long sessionId) { 606 LOG.info("Closing session 0x{}", Long.toHexString(sessionId)); 607 608 // we do not want to wait for a session close. send it as soon as we 609 // detect it! 610 close(sessionId); 611 } 612 killSession(long sessionId, long zxid)613 protected void killSession(long sessionId, long zxid) { 614 zkDb.killSession(sessionId, zxid); 615 if (LOG.isTraceEnabled()) { 616 ZooTrace.logTraceMessage( 617 LOG, 618 ZooTrace.SESSION_TRACE_MASK, 619 "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId)); 620 } 621 if (sessionTracker != null) { 622 sessionTracker.removeSession(sessionId); 623 } 624 } 625 expire(Session session)626 public void expire(Session session) { 627 long sessionId = session.getSessionId(); 628 LOG.info( 629 "Expiring session 0x{}, timeout of {}ms exceeded", 630 Long.toHexString(sessionId), 631 session.getTimeout()); 632 close(sessionId); 633 } 634 635 public static class MissingSessionException extends IOException { 636 637 private static final long serialVersionUID = 7467414635467261007L; 638 MissingSessionException(String msg)639 public MissingSessionException(String msg) { 640 super(msg); 641 } 642 643 } 644 touch(ServerCnxn cnxn)645 void touch(ServerCnxn cnxn) throws MissingSessionException { 646 if (cnxn == null) { 647 return; 648 } 649 long id = cnxn.getSessionId(); 650 int to = cnxn.getSessionTimeout(); 651 if (!sessionTracker.touchSession(id, to)) { 652 throw new MissingSessionException("No session with sessionid 0x" 653 + Long.toHexString(id) 654 + " exists, probably expired and removed"); 655 } 656 } 657 registerJMX()658 protected void registerJMX() { 659 // register with JMX 660 try { 661 jmxServerBean = new ZooKeeperServerBean(this); 662 MBeanRegistry.getInstance().register(jmxServerBean, null); 663 664 try { 665 jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); 666 MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); 667 } catch (Exception e) { 668 LOG.warn("Failed to register with JMX", e); 669 jmxDataTreeBean = null; 670 } 671 } catch (Exception e) { 672 LOG.warn("Failed to register with JMX", e); 673 jmxServerBean = null; 674 } 675 } 676 startdata()677 public void startdata() throws IOException, InterruptedException { 678 //check to see if zkDb is not null 679 if (zkDb == null) { 680 zkDb = new ZKDatabase(this.txnLogFactory); 681 } 682 if (!zkDb.isInitialized()) { 683 loadData(); 684 } 685 } 686 startup()687 public synchronized void startup() { 688 startupWithServerState(State.RUNNING); 689 } 690 startupWithoutServing()691 public synchronized void startupWithoutServing() { 692 startupWithServerState(State.INITIAL); 693 } 694 startServing()695 public synchronized void startServing() { 696 setState(State.RUNNING); 697 notifyAll(); 698 } 699 startupWithServerState(State state)700 private void startupWithServerState(State state) { 701 if (sessionTracker == null) { 702 createSessionTracker(); 703 } 704 startSessionTracker(); 705 setupRequestProcessors(); 706 707 startRequestThrottler(); 708 709 registerJMX(); 710 711 startJvmPauseMonitor(); 712 713 registerMetrics(); 714 715 setState(state); 716 717 requestPathMetricsCollector.start(); 718 719 localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); 720 721 notifyAll(); 722 } 723 startJvmPauseMonitor()724 protected void startJvmPauseMonitor() { 725 if (this.jvmPauseMonitor != null) { 726 this.jvmPauseMonitor.serviceStart(); 727 } 728 } 729 startRequestThrottler()730 protected void startRequestThrottler() { 731 requestThrottler = new RequestThrottler(this); 732 requestThrottler.start(); 733 734 } 735 setupRequestProcessors()736 protected void setupRequestProcessors() { 737 RequestProcessor finalProcessor = new FinalRequestProcessor(this); 738 RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); 739 ((SyncRequestProcessor) syncProcessor).start(); 740 firstProcessor = new PrepRequestProcessor(this, syncProcessor); 741 ((PrepRequestProcessor) firstProcessor).start(); 742 } 743 getZooKeeperServerListener()744 public ZooKeeperServerListener getZooKeeperServerListener() { 745 return listener; 746 } 747 748 /** 749 * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to 750 * {@link #startup()} being called 751 * 752 * @param newId ID to use 753 */ setCreateSessionTrackerServerId(int newId)754 public void setCreateSessionTrackerServerId(int newId) { 755 createSessionTrackerServerId = newId; 756 } 757 createSessionTracker()758 protected void createSessionTracker() { 759 sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener()); 760 } 761 startSessionTracker()762 protected void startSessionTracker() { 763 ((SessionTrackerImpl) sessionTracker).start(); 764 } 765 766 /** 767 * Sets the state of ZooKeeper server. After changing the state, it notifies 768 * the server state change to a registered shutdown handler, if any. 769 * <p> 770 * The following are the server state transitions: 771 * <ul><li>During startup the server will be in the INITIAL state.</li> 772 * <li>After successfully starting, the server sets the state to RUNNING. 773 * </li> 774 * <li>The server transitions to the ERROR state if it hits an internal 775 * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource 776 * error events, e.g., SyncRequestProcessor not being able to write a txn to 777 * disk.</li> 778 * <li>During shutdown the server sets the state to SHUTDOWN, which 779 * corresponds to the server not running.</li></ul> 780 * 781 * @param state new server state. 782 */ setState(State state)783 protected void setState(State state) { 784 this.state = state; 785 // Notify server state changes to the registered shutdown handler, if any. 786 if (zkShutdownHandler != null) { 787 zkShutdownHandler.handle(state); 788 } else { 789 LOG.debug( 790 "ZKShutdownHandler is not registered, so ZooKeeper server" 791 + " won't take any action on ERROR or SHUTDOWN server state changes"); 792 } 793 } 794 795 /** 796 * This can be used while shutting down the server to see whether the server 797 * is already shutdown or not. 798 * 799 * @return true if the server is running or server hits an error, false 800 * otherwise. 801 */ canShutdown()802 protected boolean canShutdown() { 803 return state == State.RUNNING || state == State.ERROR; 804 } 805 806 /** 807 * @return true if the server is running, false otherwise. 808 */ isRunning()809 public boolean isRunning() { 810 return state == State.RUNNING; 811 } 812 shutdown()813 public void shutdown() { 814 shutdown(false); 815 } 816 817 /** 818 * Shut down the server instance 819 * @param fullyShutDown true if another server using the same database will not replace this one in the same process 820 */ shutdown(boolean fullyShutDown)821 public synchronized void shutdown(boolean fullyShutDown) { 822 if (!canShutdown()) { 823 if (fullyShutDown && zkDb != null) { 824 zkDb.clear(); 825 } 826 LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); 827 return; 828 } 829 LOG.info("shutting down"); 830 831 // new RuntimeException("Calling shutdown").printStackTrace(); 832 setState(State.SHUTDOWN); 833 834 // unregister all metrics that are keeping a strong reference to this object 835 // subclasses will do their specific clean up 836 unregisterMetrics(); 837 838 if (requestThrottler != null) { 839 requestThrottler.shutdown(); 840 } 841 842 // Since sessionTracker and syncThreads poll we just have to 843 // set running to false and they will detect it during the poll 844 // interval. 845 if (sessionTracker != null) { 846 sessionTracker.shutdown(); 847 } 848 if (firstProcessor != null) { 849 firstProcessor.shutdown(); 850 } 851 if (jvmPauseMonitor != null) { 852 jvmPauseMonitor.serviceStop(); 853 } 854 855 if (zkDb != null) { 856 if (fullyShutDown) { 857 zkDb.clear(); 858 } else { 859 // else there is no need to clear the database 860 // * When a new quorum is established we can still apply the diff 861 // on top of the same zkDb data 862 // * If we fetch a new snapshot from leader, the zkDb will be 863 // cleared anyway before loading the snapshot 864 try { 865 //This will fast forward the database to the latest recorded transactions 866 zkDb.fastForwardDataBase(); 867 } catch (IOException e) { 868 LOG.error("Error updating DB", e); 869 zkDb.clear(); 870 } 871 } 872 } 873 874 requestPathMetricsCollector.shutdown(); 875 unregisterJMX(); 876 } 877 unregisterJMX()878 protected void unregisterJMX() { 879 // unregister from JMX 880 try { 881 if (jmxDataTreeBean != null) { 882 MBeanRegistry.getInstance().unregister(jmxDataTreeBean); 883 } 884 } catch (Exception e) { 885 LOG.warn("Failed to unregister with JMX", e); 886 } 887 try { 888 if (jmxServerBean != null) { 889 MBeanRegistry.getInstance().unregister(jmxServerBean); 890 } 891 } catch (Exception e) { 892 LOG.warn("Failed to unregister with JMX", e); 893 } 894 jmxServerBean = null; 895 jmxDataTreeBean = null; 896 } 897 incInProcess()898 public void incInProcess() { 899 requestsInProcess.incrementAndGet(); 900 } 901 decInProcess()902 public void decInProcess() { 903 requestsInProcess.decrementAndGet(); 904 if (requestThrottler != null) { 905 requestThrottler.throttleWake(); 906 } 907 } 908 getInProcess()909 public int getInProcess() { 910 return requestsInProcess.get(); 911 } 912 getInflight()913 public int getInflight() { 914 return requestThrottleInflight(); 915 } 916 requestThrottleInflight()917 private int requestThrottleInflight() { 918 if (requestThrottler != null) { 919 return requestThrottler.getInflight(); 920 } 921 return 0; 922 } 923 924 static class PrecalculatedDigest { 925 final long nodeDigest; 926 final long treeDigest; 927 PrecalculatedDigest(long nodeDigest, long treeDigest)928 PrecalculatedDigest(long nodeDigest, long treeDigest) { 929 this.nodeDigest = nodeDigest; 930 this.treeDigest = treeDigest; 931 } 932 } 933 934 935 /** 936 * This structure is used to facilitate information sharing between PrepRP 937 * and FinalRP. 938 */ 939 static class ChangeRecord { 940 PrecalculatedDigest precalculatedDigest; 941 byte[] data; 942 ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl)943 ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) { 944 this.zxid = zxid; 945 this.path = path; 946 this.stat = stat; 947 this.childCount = childCount; 948 this.acl = acl; 949 } 950 951 long zxid; 952 953 String path; 954 955 StatPersisted stat; /* Make sure to create a new object when changing */ 956 957 int childCount; 958 959 List<ACL> acl; /* Make sure to create a new object when changing */ 960 duplicate(long zxid)961 ChangeRecord duplicate(long zxid) { 962 StatPersisted stat = new StatPersisted(); 963 if (this.stat != null) { 964 DataTree.copyStatPersisted(this.stat, stat); 965 } 966 ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount, 967 acl == null ? new ArrayList<>() : new ArrayList<>(acl)); 968 changeRecord.precalculatedDigest = precalculatedDigest; 969 changeRecord.data = data; 970 return changeRecord; 971 } 972 973 } 974 generatePasswd(long id)975 byte[] generatePasswd(long id) { 976 Random r = new Random(id ^ superSecret); 977 byte[] p = new byte[16]; 978 r.nextBytes(p); 979 return p; 980 } 981 checkPasswd(long sessionId, byte[] passwd)982 protected boolean checkPasswd(long sessionId, byte[] passwd) { 983 return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId)); 984 } 985 createSession(ServerCnxn cnxn, byte[] passwd, int timeout)986 long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { 987 if (passwd == null) { 988 // Possible since it's just deserialized from a packet on the wire. 989 passwd = new byte[0]; 990 } 991 long sessionId = sessionTracker.createSession(timeout); 992 Random r = new Random(sessionId ^ superSecret); 993 r.nextBytes(passwd); 994 ByteBuffer to = ByteBuffer.allocate(4); 995 to.putInt(timeout); 996 cnxn.setSessionId(sessionId); 997 Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); 998 submitRequest(si); 999 return sessionId; 1000 } 1001 1002 /** 1003 * set the owner of this session as owner 1004 * @param id the session id 1005 * @param owner the owner of the session 1006 * @throws SessionExpiredException 1007 */ setOwner(long id, Object owner)1008 public void setOwner(long id, Object owner) throws SessionExpiredException { 1009 sessionTracker.setOwner(id, owner); 1010 } 1011 revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout)1012 protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { 1013 boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout); 1014 if (LOG.isTraceEnabled()) { 1015 ZooTrace.logTraceMessage( 1016 LOG, 1017 ZooTrace.SESSION_TRACE_MASK, 1018 "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc); 1019 } 1020 finishSessionInit(cnxn, rc); 1021 } 1022 reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout)1023 public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException { 1024 if (checkPasswd(sessionId, passwd)) { 1025 revalidateSession(cnxn, sessionId, sessionTimeout); 1026 } else { 1027 LOG.warn( 1028 "Incorrect password from {} for session 0x{}", 1029 cnxn.getRemoteSocketAddress(), 1030 Long.toHexString(sessionId)); 1031 finishSessionInit(cnxn, false); 1032 } 1033 } 1034 finishSessionInit(ServerCnxn cnxn, boolean valid)1035 public void finishSessionInit(ServerCnxn cnxn, boolean valid) { 1036 // register with JMX 1037 try { 1038 if (valid) { 1039 if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) { 1040 serverCnxnFactory.registerConnection(cnxn); 1041 } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) { 1042 secureServerCnxnFactory.registerConnection(cnxn); 1043 } 1044 } 1045 } catch (Exception e) { 1046 LOG.warn("Failed to register with JMX", e); 1047 } 1048 1049 try { 1050 ConnectResponse rsp = new ConnectResponse( 1051 0, 1052 valid ? cnxn.getSessionTimeout() : 0, 1053 valid ? cnxn.getSessionId() : 0, // send 0 if session is no 1054 // longer valid 1055 valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); 1056 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 1057 BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); 1058 bos.writeInt(-1, "len"); 1059 rsp.serialize(bos, "connect"); 1060 if (!cnxn.isOldClient) { 1061 bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly"); 1062 } 1063 baos.close(); 1064 ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); 1065 bb.putInt(bb.remaining() - 4).rewind(); 1066 cnxn.sendBuffer(bb); 1067 1068 if (valid) { 1069 LOG.debug( 1070 "Established session 0x{} with negotiated timeout {} for client {}", 1071 Long.toHexString(cnxn.getSessionId()), 1072 cnxn.getSessionTimeout(), 1073 cnxn.getRemoteSocketAddress()); 1074 cnxn.enableRecv(); 1075 } else { 1076 1077 LOG.info( 1078 "Invalid session 0x{} for client {}, probably expired", 1079 Long.toHexString(cnxn.getSessionId()), 1080 cnxn.getRemoteSocketAddress()); 1081 cnxn.sendBuffer(ServerCnxnFactory.closeConn); 1082 } 1083 1084 } catch (Exception e) { 1085 LOG.warn("Exception while establishing session, closing", e); 1086 cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT); 1087 } 1088 } 1089 closeSession(ServerCnxn cnxn, RequestHeader requestHeader)1090 public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) { 1091 closeSession(cnxn.getSessionId()); 1092 } 1093 getServerId()1094 public long getServerId() { 1095 return 0; 1096 } 1097 1098 /** 1099 * If the underlying Zookeeper server support local session, this method 1100 * will set a isLocalSession to true if a request is associated with 1101 * a local session. 1102 * 1103 * @param si 1104 */ setLocalSessionFlag(Request si)1105 protected void setLocalSessionFlag(Request si) { 1106 } 1107 submitRequest(Request si)1108 public void submitRequest(Request si) { 1109 enqueueRequest(si); 1110 } 1111 enqueueRequest(Request si)1112 public void enqueueRequest(Request si) { 1113 if (requestThrottler == null) { 1114 synchronized (this) { 1115 try { 1116 // Since all requests are passed to the request 1117 // processor it should wait for setting up the request 1118 // processor chain. The state will be updated to RUNNING 1119 // after the setup. 1120 while (state == State.INITIAL) { 1121 wait(1000); 1122 } 1123 } catch (InterruptedException e) { 1124 LOG.warn("Unexpected interruption", e); 1125 } 1126 if (requestThrottler == null) { 1127 throw new RuntimeException("Not started"); 1128 } 1129 } 1130 } 1131 requestThrottler.submitRequest(si); 1132 } 1133 submitRequestNow(Request si)1134 public void submitRequestNow(Request si) { 1135 if (firstProcessor == null) { 1136 synchronized (this) { 1137 try { 1138 // Since all requests are passed to the request 1139 // processor it should wait for setting up the request 1140 // processor chain. The state will be updated to RUNNING 1141 // after the setup. 1142 while (state == State.INITIAL) { 1143 wait(1000); 1144 } 1145 } catch (InterruptedException e) { 1146 LOG.warn("Unexpected interruption", e); 1147 } 1148 if (firstProcessor == null || state != State.RUNNING) { 1149 throw new RuntimeException("Not started"); 1150 } 1151 } 1152 } 1153 try { 1154 touch(si.cnxn); 1155 boolean validpacket = Request.isValid(si.type); 1156 if (validpacket) { 1157 setLocalSessionFlag(si); 1158 firstProcessor.processRequest(si); 1159 if (si.cnxn != null) { 1160 incInProcess(); 1161 } 1162 } else { 1163 LOG.warn("Received packet at server of unknown type {}", si.type); 1164 // Update request accounting/throttling limits 1165 requestFinished(si); 1166 new UnimplementedRequestProcessor().processRequest(si); 1167 } 1168 } catch (MissingSessionException e) { 1169 LOG.debug("Dropping request.", e); 1170 // Update request accounting/throttling limits 1171 requestFinished(si); 1172 } catch (RequestProcessorException e) { 1173 LOG.error("Unable to process request", e); 1174 // Update request accounting/throttling limits 1175 requestFinished(si); 1176 } 1177 } 1178 getSnapCount()1179 public static int getSnapCount() { 1180 String sc = System.getProperty(SNAP_COUNT); 1181 try { 1182 int snapCount = Integer.parseInt(sc); 1183 1184 // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor 1185 if (snapCount < 2) { 1186 LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2"); 1187 snapCount = 2; 1188 } 1189 return snapCount; 1190 } catch (Exception e) { 1191 return 100000; 1192 } 1193 } 1194 getGlobalOutstandingLimit()1195 public int getGlobalOutstandingLimit() { 1196 String sc = System.getProperty(GLOBAL_OUTSTANDING_LIMIT); 1197 int limit; 1198 try { 1199 limit = Integer.parseInt(sc); 1200 } catch (Exception e) { 1201 limit = 1000; 1202 } 1203 return limit; 1204 } 1205 getSnapSizeInBytes()1206 public static long getSnapSizeInBytes() { 1207 long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default 1208 if (size <= 0) { 1209 LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size); 1210 } 1211 return size * 1024; // Convert to bytes 1212 } 1213 setServerCnxnFactory(ServerCnxnFactory factory)1214 public void setServerCnxnFactory(ServerCnxnFactory factory) { 1215 serverCnxnFactory = factory; 1216 } 1217 getServerCnxnFactory()1218 public ServerCnxnFactory getServerCnxnFactory() { 1219 return serverCnxnFactory; 1220 } 1221 getSecureServerCnxnFactory()1222 public ServerCnxnFactory getSecureServerCnxnFactory() { 1223 return secureServerCnxnFactory; 1224 } 1225 setSecureServerCnxnFactory(ServerCnxnFactory factory)1226 public void setSecureServerCnxnFactory(ServerCnxnFactory factory) { 1227 secureServerCnxnFactory = factory; 1228 } 1229 1230 /** 1231 * return the last proceesed id from the 1232 * datatree 1233 */ getLastProcessedZxid()1234 public long getLastProcessedZxid() { 1235 return zkDb.getDataTreeLastProcessedZxid(); 1236 } 1237 1238 /** 1239 * return the outstanding requests 1240 * in the queue, which havent been 1241 * processed yet 1242 */ getOutstandingRequests()1243 public long getOutstandingRequests() { 1244 return getInProcess(); 1245 } 1246 1247 /** 1248 * return the total number of client connections that are alive 1249 * to this server 1250 */ getNumAliveConnections()1251 public int getNumAliveConnections() { 1252 int numAliveConnections = 0; 1253 1254 if (serverCnxnFactory != null) { 1255 numAliveConnections += serverCnxnFactory.getNumAliveConnections(); 1256 } 1257 1258 if (secureServerCnxnFactory != null) { 1259 numAliveConnections += secureServerCnxnFactory.getNumAliveConnections(); 1260 } 1261 1262 return numAliveConnections; 1263 } 1264 1265 /** 1266 * trunccate the log to get in sync with others 1267 * if in a quorum 1268 * @param zxid the zxid that it needs to get in sync 1269 * with others 1270 * @throws IOException 1271 */ truncateLog(long zxid)1272 public void truncateLog(long zxid) throws IOException { 1273 this.zkDb.truncateLog(zxid); 1274 } 1275 getTickTime()1276 public int getTickTime() { 1277 return tickTime; 1278 } 1279 setTickTime(int tickTime)1280 public void setTickTime(int tickTime) { 1281 LOG.info("tickTime set to {}", tickTime); 1282 this.tickTime = tickTime; 1283 } 1284 getThrottledOpWaitTime()1285 public static int getThrottledOpWaitTime() { 1286 return throttledOpWaitTime; 1287 } 1288 setThrottledOpWaitTime(int time)1289 public static void setThrottledOpWaitTime(int time) { 1290 LOG.info("throttledOpWaitTime set to {}", time); 1291 throttledOpWaitTime = time; 1292 } 1293 getMinSessionTimeout()1294 public int getMinSessionTimeout() { 1295 return minSessionTimeout; 1296 } 1297 setMinSessionTimeout(int min)1298 public void setMinSessionTimeout(int min) { 1299 this.minSessionTimeout = min == -1 ? tickTime * 2 : min; 1300 LOG.info("minSessionTimeout set to {}", this.minSessionTimeout); 1301 } 1302 getMaxSessionTimeout()1303 public int getMaxSessionTimeout() { 1304 return maxSessionTimeout; 1305 } 1306 setMaxSessionTimeout(int max)1307 public void setMaxSessionTimeout(int max) { 1308 this.maxSessionTimeout = max == -1 ? tickTime * 20 : max; 1309 LOG.info("maxSessionTimeout set to {}", this.maxSessionTimeout); 1310 } 1311 getClientPortListenBacklog()1312 public int getClientPortListenBacklog() { 1313 return listenBacklog; 1314 } 1315 setClientPortListenBacklog(int backlog)1316 public void setClientPortListenBacklog(int backlog) { 1317 this.listenBacklog = backlog; 1318 LOG.info("clientPortListenBacklog set to {}", backlog); 1319 } 1320 getClientPort()1321 public int getClientPort() { 1322 return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1; 1323 } 1324 getSecureClientPort()1325 public int getSecureClientPort() { 1326 return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1; 1327 } 1328 1329 /** Maximum number of connections allowed from particular host (ip) */ getMaxClientCnxnsPerHost()1330 public int getMaxClientCnxnsPerHost() { 1331 if (serverCnxnFactory != null) { 1332 return serverCnxnFactory.getMaxClientCnxnsPerHost(); 1333 } 1334 if (secureServerCnxnFactory != null) { 1335 return secureServerCnxnFactory.getMaxClientCnxnsPerHost(); 1336 } 1337 return -1; 1338 } 1339 setTxnLogFactory(FileTxnSnapLog txnLog)1340 public void setTxnLogFactory(FileTxnSnapLog txnLog) { 1341 this.txnLogFactory = txnLog; 1342 } 1343 getTxnLogFactory()1344 public FileTxnSnapLog getTxnLogFactory() { 1345 return this.txnLogFactory; 1346 } 1347 1348 /** 1349 * Returns the elapsed sync of time of transaction log in milliseconds. 1350 */ getTxnLogElapsedSyncTime()1351 public long getTxnLogElapsedSyncTime() { 1352 return txnLogFactory.getTxnLogElapsedSyncTime(); 1353 } 1354 getState()1355 public String getState() { 1356 return "standalone"; 1357 } 1358 dumpEphemerals(PrintWriter pwriter)1359 public void dumpEphemerals(PrintWriter pwriter) { 1360 zkDb.dumpEphemerals(pwriter); 1361 } 1362 getEphemerals()1363 public Map<Long, Set<String>> getEphemerals() { 1364 return zkDb.getEphemerals(); 1365 } 1366 getConnectionDropChance()1367 public double getConnectionDropChance() { 1368 return connThrottle.getDropChance(); 1369 } 1370 1371 @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup") processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)1372 public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) 1373 throws IOException, ClientCnxnLimitException { 1374 1375 BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); 1376 ConnectRequest connReq = new ConnectRequest(); 1377 connReq.deserialize(bia, "connect"); 1378 LOG.debug( 1379 "Session establishment request from client {} client's lastZxid is 0x{}", 1380 cnxn.getRemoteSocketAddress(), 1381 Long.toHexString(connReq.getLastZxidSeen())); 1382 1383 long sessionId = connReq.getSessionId(); 1384 int tokensNeeded = 1; 1385 if (connThrottle.isConnectionWeightEnabled()) { 1386 if (sessionId == 0) { 1387 if (localSessionEnabled) { 1388 tokensNeeded = connThrottle.getRequiredTokensForLocal(); 1389 } else { 1390 tokensNeeded = connThrottle.getRequiredTokensForGlobal(); 1391 } 1392 } else { 1393 tokensNeeded = connThrottle.getRequiredTokensForRenew(); 1394 } 1395 } 1396 1397 if (!connThrottle.checkLimit(tokensNeeded)) { 1398 throw new ClientCnxnLimitException(); 1399 } 1400 ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); 1401 1402 ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); 1403 1404 boolean readOnly = false; 1405 try { 1406 readOnly = bia.readBool("readOnly"); 1407 cnxn.isOldClient = false; 1408 } catch (IOException e) { 1409 // this is ok -- just a packet from an old client which 1410 // doesn't contain readOnly field 1411 LOG.warn( 1412 "Connection request from old client {}; will be dropped if server is in r-o mode", 1413 cnxn.getRemoteSocketAddress()); 1414 } 1415 if (!readOnly && this instanceof ReadOnlyZooKeeperServer) { 1416 String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); 1417 LOG.info(msg); 1418 throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); 1419 } 1420 if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { 1421 String msg = "Refusing session request for client " 1422 + cnxn.getRemoteSocketAddress() 1423 + " as it has seen zxid 0x" 1424 + Long.toHexString(connReq.getLastZxidSeen()) 1425 + " our last zxid is 0x" 1426 + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) 1427 + " client must try another server"; 1428 1429 LOG.info(msg); 1430 throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); 1431 } 1432 int sessionTimeout = connReq.getTimeOut(); 1433 byte[] passwd = connReq.getPasswd(); 1434 int minSessionTimeout = getMinSessionTimeout(); 1435 if (sessionTimeout < minSessionTimeout) { 1436 sessionTimeout = minSessionTimeout; 1437 } 1438 int maxSessionTimeout = getMaxSessionTimeout(); 1439 if (sessionTimeout > maxSessionTimeout) { 1440 sessionTimeout = maxSessionTimeout; 1441 } 1442 cnxn.setSessionTimeout(sessionTimeout); 1443 // We don't want to receive any packets until we are sure that the 1444 // session is setup 1445 cnxn.disableRecv(); 1446 if (sessionId == 0) { 1447 long id = createSession(cnxn, passwd, sessionTimeout); 1448 LOG.debug( 1449 "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", 1450 Long.toHexString(id), 1451 Long.toHexString(connReq.getLastZxidSeen()), 1452 connReq.getTimeOut(), 1453 cnxn.getRemoteSocketAddress()); 1454 } else { 1455 validateSession(cnxn, sessionId); 1456 LOG.debug( 1457 "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", 1458 Long.toHexString(sessionId), 1459 Long.toHexString(connReq.getLastZxidSeen()), 1460 connReq.getTimeOut(), 1461 cnxn.getRemoteSocketAddress()); 1462 if (serverCnxnFactory != null) { 1463 serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); 1464 } 1465 if (secureServerCnxnFactory != null) { 1466 secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); 1467 } 1468 cnxn.setSessionId(sessionId); 1469 reopenSession(cnxn, sessionId, passwd, sessionTimeout); 1470 ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1); 1471 1472 } 1473 } 1474 1475 /** 1476 * Validate if a particular session can be reestablished. 1477 * 1478 * @param cnxn 1479 * @param sessionId 1480 */ validateSession(ServerCnxn cnxn, long sessionId)1481 protected void validateSession(ServerCnxn cnxn, long sessionId) 1482 throws IOException { 1483 // do nothing 1484 } 1485 shouldThrottle(long outStandingCount)1486 public boolean shouldThrottle(long outStandingCount) { 1487 int globalOutstandingLimit = getGlobalOutstandingLimit(); 1488 if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) { 1489 return outStandingCount > 0; 1490 } 1491 return false; 1492 } 1493 getFlushDelay()1494 long getFlushDelay() { 1495 return flushDelay; 1496 } 1497 setFlushDelay(long delay)1498 static void setFlushDelay(long delay) { 1499 LOG.info("{}={}", FLUSH_DELAY, delay); 1500 flushDelay = delay; 1501 } 1502 getMaxWriteQueuePollTime()1503 long getMaxWriteQueuePollTime() { 1504 return maxWriteQueuePollTime; 1505 } 1506 setMaxWriteQueuePollTime(long maxTime)1507 static void setMaxWriteQueuePollTime(long maxTime) { 1508 LOG.info("{}={}", MAX_WRITE_QUEUE_POLL_SIZE, maxTime); 1509 maxWriteQueuePollTime = maxTime; 1510 } 1511 getMaxBatchSize()1512 int getMaxBatchSize() { 1513 return maxBatchSize; 1514 } 1515 setMaxBatchSize(int size)1516 static void setMaxBatchSize(int size) { 1517 LOG.info("{}={}", MAX_BATCH_SIZE, size); 1518 maxBatchSize = size; 1519 } 1520 initLargeRequestThrottlingSettings()1521 private void initLargeRequestThrottlingSettings() { 1522 setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes)); 1523 setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1)); 1524 } 1525 getLargeRequestMaxBytes()1526 public int getLargeRequestMaxBytes() { 1527 return largeRequestMaxBytes; 1528 } 1529 setLargeRequestMaxBytes(int bytes)1530 public void setLargeRequestMaxBytes(int bytes) { 1531 if (bytes <= 0) { 1532 LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes); 1533 LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes); 1534 } else { 1535 largeRequestMaxBytes = bytes; 1536 LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes); 1537 } 1538 } 1539 getLargeRequestThreshold()1540 public int getLargeRequestThreshold() { 1541 return largeRequestThreshold; 1542 } 1543 setLargeRequestThreshold(int threshold)1544 public void setLargeRequestThreshold(int threshold) { 1545 if (threshold == 0 || threshold < -1) { 1546 LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold); 1547 largeRequestThreshold = -1; 1548 } else { 1549 largeRequestThreshold = threshold; 1550 LOG.info("The large request threshold is set to {}", largeRequestThreshold); 1551 } 1552 } 1553 getLargeRequestBytes()1554 public int getLargeRequestBytes() { 1555 return currentLargeRequestBytes.get(); 1556 } 1557 isLargeRequest(int length)1558 private boolean isLargeRequest(int length) { 1559 // The large request limit is disabled when threshold is -1 1560 if (largeRequestThreshold == -1) { 1561 return false; 1562 } 1563 return length > largeRequestThreshold; 1564 } 1565 checkRequestSizeWhenReceivingMessage(int length)1566 public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException { 1567 if (!isLargeRequest(length)) { 1568 return true; 1569 } 1570 if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) { 1571 return true; 1572 } else { 1573 ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); 1574 throw new IOException("Rejecting large request"); 1575 } 1576 1577 } 1578 checkRequestSizeWhenMessageReceived(int length)1579 private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException { 1580 if (!isLargeRequest(length)) { 1581 return true; 1582 } 1583 1584 int bytes = currentLargeRequestBytes.addAndGet(length); 1585 if (bytes > largeRequestMaxBytes) { 1586 currentLargeRequestBytes.addAndGet(-length); 1587 ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); 1588 throw new IOException("Rejecting large request"); 1589 } 1590 return true; 1591 } 1592 requestFinished(Request request)1593 public void requestFinished(Request request) { 1594 int largeRequestLength = request.getLargeRequestSize(); 1595 if (largeRequestLength != -1) { 1596 currentLargeRequestBytes.addAndGet(-largeRequestLength); 1597 } 1598 } 1599 processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer)1600 public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { 1601 // We have the request, now process and setup for next 1602 InputStream bais = new ByteBufferInputStream(incomingBuffer); 1603 BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); 1604 RequestHeader h = new RequestHeader(); 1605 h.deserialize(bia, "header"); 1606 1607 // Need to increase the outstanding request count first, otherwise 1608 // there might be a race condition that it enabled recv after 1609 // processing request and then disabled when check throttling. 1610 // 1611 // Be aware that we're actually checking the global outstanding 1612 // request before this request. 1613 // 1614 // It's fine if the IOException thrown before we decrease the count 1615 // in cnxn, since it will close the cnxn anyway. 1616 cnxn.incrOutstandingAndCheckThrottle(h); 1617 1618 // Through the magic of byte buffers, txn will not be 1619 // pointing 1620 // to the start of the txn 1621 incomingBuffer = incomingBuffer.slice(); 1622 if (h.getType() == OpCode.auth) { 1623 LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); 1624 AuthPacket authPacket = new AuthPacket(); 1625 ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); 1626 String scheme = authPacket.getScheme(); 1627 ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); 1628 Code authReturn = KeeperException.Code.AUTHFAILED; 1629 if (ap != null) { 1630 try { 1631 // handleAuthentication may close the connection, to allow the client to choose 1632 // a different server to connect to. 1633 authReturn = ap.handleAuthentication( 1634 new ServerAuthenticationProvider.ServerObjs(this, cnxn), 1635 authPacket.getAuth()); 1636 } catch (RuntimeException e) { 1637 LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e); 1638 authReturn = KeeperException.Code.AUTHFAILED; 1639 } 1640 } 1641 if (authReturn == KeeperException.Code.OK) { 1642 LOG.info("Session 0x{}: auth success for scheme {} and address {}", 1643 Long.toHexString(cnxn.getSessionId()), scheme, 1644 cnxn.getRemoteSocketAddress()); 1645 ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); 1646 cnxn.sendResponse(rh, null, null); 1647 } else { 1648 if (ap == null) { 1649 LOG.warn( 1650 "No authentication provider for scheme: {} has {}", 1651 scheme, 1652 ProviderRegistry.listProviders()); 1653 } else { 1654 LOG.warn("Authentication failed for scheme: {}", scheme); 1655 } 1656 // send a response... 1657 ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); 1658 cnxn.sendResponse(rh, null, null); 1659 // ... and close connection 1660 cnxn.sendBuffer(ServerCnxnFactory.closeConn); 1661 cnxn.disableRecv(); 1662 } 1663 return; 1664 } else if (h.getType() == OpCode.sasl) { 1665 processSasl(incomingBuffer, cnxn, h); 1666 } else { 1667 if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { 1668 // Authentication enforcement is failed 1669 // Already sent response to user about failure and closed the session, lets return 1670 return; 1671 } else { 1672 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); 1673 int length = incomingBuffer.limit(); 1674 if (isLargeRequest(length)) { 1675 // checkRequestSize will throw IOException if request is rejected 1676 checkRequestSizeWhenMessageReceived(length); 1677 si.setLargeRequestSize(length); 1678 } 1679 si.setOwner(ServerCnxn.me); 1680 submitRequest(si); 1681 } 1682 } 1683 } 1684 isSaslSuperUser(String id)1685 private static boolean isSaslSuperUser(String id) { 1686 if (id == null || id.isEmpty()) { 1687 return false; 1688 } 1689 1690 Properties properties = System.getProperties(); 1691 int prefixLen = SASL_SUPER_USER.length(); 1692 1693 for (String k : properties.stringPropertyNames()) { 1694 if (k.startsWith(SASL_SUPER_USER) 1695 && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) { 1696 String value = properties.getProperty(k); 1697 1698 if (value != null && value.equals(id)) { 1699 return true; 1700 } 1701 } 1702 } 1703 1704 return false; 1705 } 1706 shouldAllowSaslFailedClientsConnect()1707 private static boolean shouldAllowSaslFailedClientsConnect() { 1708 return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); 1709 } 1710 processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader)1711 private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { 1712 LOG.debug("Responding to client SASL token."); 1713 GetSASLRequest clientTokenRecord = new GetSASLRequest(); 1714 ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord); 1715 byte[] clientToken = clientTokenRecord.getToken(); 1716 LOG.debug("Size of client SASL token: {}", clientToken.length); 1717 byte[] responseToken = null; 1718 try { 1719 ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer; 1720 try { 1721 // note that clientToken might be empty (clientToken.length == 0): 1722 // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the 1723 // SASL negotiation process. 1724 responseToken = saslServer.evaluateResponse(clientToken); 1725 if (saslServer.isComplete()) { 1726 String authorizationID = saslServer.getAuthorizationID(); 1727 LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}", 1728 Long.toHexString(cnxn.getSessionId()), authorizationID); 1729 cnxn.addAuthInfo(new Id("sasl", authorizationID)); 1730 1731 if (isSaslSuperUser(authorizationID)) { 1732 cnxn.addAuthInfo(new Id("super", "")); 1733 LOG.info( 1734 "Session 0x{}: Authenticated Id '{}' as super user", 1735 Long.toHexString(cnxn.getSessionId()), 1736 authorizationID); 1737 } 1738 } 1739 } catch (SaslException e) { 1740 LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e); 1741 if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) { 1742 LOG.warn("Maintaining client connection despite SASL authentication failure."); 1743 } else { 1744 int error; 1745 if (authHelper.isSaslAuthRequired()) { 1746 LOG.warn( 1747 "Closing client connection due to server requires client SASL authenticaiton," 1748 + "but client SASL authentication has failed, or client is not configured with SASL " 1749 + "authentication."); 1750 error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue(); 1751 } else { 1752 LOG.warn("Closing client connection due to SASL authentication failure."); 1753 error = Code.AUTHFAILED.intValue(); 1754 } 1755 1756 ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error); 1757 cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response"); 1758 cnxn.sendCloseSession(); 1759 cnxn.disableRecv(); 1760 return; 1761 } 1762 } 1763 } catch (NullPointerException e) { 1764 LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly."); 1765 } 1766 if (responseToken != null) { 1767 LOG.debug("Size of server SASL response: {}", responseToken.length); 1768 } 1769 1770 ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue()); 1771 Record record = new SetSASLResponse(responseToken); 1772 cnxn.sendResponse(replyHeader, record, "response"); 1773 } 1774 1775 // entry point for quorum/Learner.java processTxn(TxnHeader hdr, Record txn)1776 public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { 1777 processTxnForSessionEvents(null, hdr, txn); 1778 return processTxnInDB(hdr, txn, null); 1779 } 1780 1781 // entry point for FinalRequestProcessor.java processTxn(Request request)1782 public ProcessTxnResult processTxn(Request request) { 1783 TxnHeader hdr = request.getHdr(); 1784 processTxnForSessionEvents(request, hdr, request.getTxn()); 1785 1786 final boolean writeRequest = (hdr != null); 1787 final boolean quorumRequest = request.isQuorum(); 1788 1789 // return fast w/o synchronization when we get a read 1790 if (!writeRequest && !quorumRequest) { 1791 return new ProcessTxnResult(); 1792 } 1793 synchronized (outstandingChanges) { 1794 ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); 1795 1796 // request.hdr is set for write requests, which are the only ones 1797 // that add to outstandingChanges. 1798 if (writeRequest) { 1799 long zxid = hdr.getZxid(); 1800 while (!outstandingChanges.isEmpty() 1801 && outstandingChanges.peek().zxid <= zxid) { 1802 ChangeRecord cr = outstandingChanges.remove(); 1803 ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1); 1804 if (cr.zxid < zxid) { 1805 LOG.warn( 1806 "Zxid outstanding 0x{} is less than current 0x{}", 1807 Long.toHexString(cr.zxid), 1808 Long.toHexString(zxid)); 1809 } 1810 if (outstandingChangesForPath.get(cr.path) == cr) { 1811 outstandingChangesForPath.remove(cr.path); 1812 } 1813 } 1814 } 1815 1816 // do not add non quorum packets to the queue. 1817 if (quorumRequest) { 1818 getZKDatabase().addCommittedProposal(request); 1819 } 1820 return rc; 1821 } 1822 } 1823 processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn)1824 private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { 1825 int opCode = (request == null) ? hdr.getType() : request.type; 1826 long sessionId = (request == null) ? hdr.getClientId() : request.sessionId; 1827 1828 if (opCode == OpCode.createSession) { 1829 if (hdr != null && txn instanceof CreateSessionTxn) { 1830 CreateSessionTxn cst = (CreateSessionTxn) txn; 1831 sessionTracker.commitSession(sessionId, cst.getTimeOut()); 1832 } else if (request == null || !request.isLocalSession()) { 1833 LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString()); 1834 } 1835 } else if (opCode == OpCode.closeSession) { 1836 sessionTracker.removeSession(sessionId); 1837 } 1838 } 1839 processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest)1840 private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) { 1841 if (hdr == null) { 1842 return new ProcessTxnResult(); 1843 } else { 1844 return getZKDatabase().processTxn(hdr, txn, digest); 1845 } 1846 } 1847 getSessionExpiryMap()1848 public Map<Long, Set<Long>> getSessionExpiryMap() { 1849 return sessionTracker.getSessionExpiryMap(); 1850 } 1851 1852 /** 1853 * This method is used to register the ZooKeeperServerShutdownHandler to get 1854 * server's error or shutdown state change notifications. 1855 * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for 1856 * every server state changes {@link #setState(State)}. 1857 * 1858 * @param zkShutdownHandler shutdown handler 1859 */ registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler)1860 void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) { 1861 this.zkShutdownHandler = zkShutdownHandler; 1862 } 1863 isResponseCachingEnabled()1864 public boolean isResponseCachingEnabled() { 1865 return isResponseCachingEnabled; 1866 } 1867 setResponseCachingEnabled(boolean isEnabled)1868 public void setResponseCachingEnabled(boolean isEnabled) { 1869 isResponseCachingEnabled = isEnabled; 1870 } 1871 getReadResponseCache()1872 public ResponseCache getReadResponseCache() { 1873 return isResponseCachingEnabled ? readResponseCache : null; 1874 } 1875 getGetChildrenResponseCache()1876 public ResponseCache getGetChildrenResponseCache() { 1877 return isResponseCachingEnabled ? getChildrenResponseCache : null; 1878 } 1879 registerMetrics()1880 protected void registerMetrics() { 1881 MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); 1882 1883 final ZKDatabase zkdb = this.getZKDatabase(); 1884 final ServerStats stats = this.serverStats(); 1885 1886 rootContext.registerGauge("avg_latency", stats::getAvgLatency); 1887 1888 rootContext.registerGauge("max_latency", stats::getMaxLatency); 1889 rootContext.registerGauge("min_latency", stats::getMinLatency); 1890 1891 rootContext.registerGauge("packets_received", stats::getPacketsReceived); 1892 rootContext.registerGauge("packets_sent", stats::getPacketsSent); 1893 rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections); 1894 1895 rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests); 1896 rootContext.registerGauge("uptime", stats::getUptime); 1897 1898 rootContext.registerGauge("znode_count", zkdb::getNodeCount); 1899 1900 rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount); 1901 rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount); 1902 1903 rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize); 1904 1905 rootContext.registerGauge("global_sessions", zkdb::getSessionCount); 1906 rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount); 1907 1908 OSMXBean osMbean = new OSMXBean(); 1909 rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount); 1910 rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount); 1911 rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance); 1912 1913 rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize); 1914 rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize); 1915 rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize); 1916 1917 rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum); 1918 rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount); 1919 rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount); 1920 rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount); 1921 } 1922 unregisterMetrics()1923 protected void unregisterMetrics() { 1924 1925 MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); 1926 1927 rootContext.unregisterGauge("avg_latency"); 1928 1929 rootContext.unregisterGauge("max_latency"); 1930 rootContext.unregisterGauge("min_latency"); 1931 1932 rootContext.unregisterGauge("packets_received"); 1933 rootContext.unregisterGauge("packets_sent"); 1934 rootContext.unregisterGauge("num_alive_connections"); 1935 1936 rootContext.unregisterGauge("outstanding_requests"); 1937 rootContext.unregisterGauge("uptime"); 1938 1939 rootContext.unregisterGauge("znode_count"); 1940 1941 rootContext.unregisterGauge("watch_count"); 1942 rootContext.unregisterGauge("ephemerals_count"); 1943 rootContext.unregisterGauge("approximate_data_size"); 1944 1945 rootContext.unregisterGauge("global_sessions"); 1946 rootContext.unregisterGauge("local_sessions"); 1947 1948 rootContext.unregisterGauge("open_file_descriptor_count"); 1949 rootContext.unregisterGauge("max_file_descriptor_count"); 1950 rootContext.unregisterGauge("connection_drop_probability"); 1951 1952 rootContext.unregisterGauge("last_client_response_size"); 1953 rootContext.unregisterGauge("max_client_response_size"); 1954 rootContext.unregisterGauge("min_client_response_size"); 1955 1956 rootContext.unregisterGauge("auth_failed_count"); 1957 rootContext.unregisterGauge("non_mtls_remote_conn_count"); 1958 rootContext.unregisterGauge("non_mtls_local_conn_count"); 1959 1960 1961 } 1962 1963 /** 1964 * Hook into admin server, useful to expose additional data 1965 * that do not represent metrics. 1966 * 1967 * @param response a sink which collects the data. 1968 */ dumpMonitorValues(BiConsumer<String, Object> response)1969 public void dumpMonitorValues(BiConsumer<String, Object> response) { 1970 ServerStats stats = serverStats(); 1971 response.accept("version", Version.getFullVersion()); 1972 response.accept("server_state", stats.getServerState()); 1973 } 1974 1975 /** 1976 * Grant or deny authorization to an operation on a node as a function of: 1977 * @param cnxn : the server connection 1978 * @param acl : set of ACLs for the node 1979 * @param perm : the permission that the client is requesting 1980 * @param ids : the credentials supplied by the client 1981 * @param path : the ZNode path 1982 * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null. 1983 */ checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls)1984 public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException { 1985 if (skipACL) { 1986 return; 1987 } 1988 1989 LOG.debug("Permission requested: {} ", perm); 1990 LOG.debug("ACLs for node: {}", acl); 1991 LOG.debug("Client credentials: {}", ids); 1992 1993 if (acl == null || acl.size() == 0) { 1994 return; 1995 } 1996 for (Id authId : ids) { 1997 if (authId.getScheme().equals("super")) { 1998 return; 1999 } 2000 } 2001 for (ACL a : acl) { 2002 Id id = a.getId(); 2003 if ((a.getPerms() & perm) != 0) { 2004 if (id.getScheme().equals("world") && id.getId().equals("anyone")) { 2005 return; 2006 } 2007 ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme()); 2008 if (ap != null) { 2009 for (Id authId : ids) { 2010 if (authId.getScheme().equals(id.getScheme()) 2011 && ap.matches( 2012 new ServerAuthenticationProvider.ServerObjs(this, cnxn), 2013 new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) { 2014 return; 2015 } 2016 } 2017 } 2018 } 2019 } 2020 throw new KeeperException.NoAuthException(); 2021 } 2022 2023 /** 2024 * check a path whether exceeded the quota. 2025 * 2026 * @param path 2027 * the path of the node, used for the quota prefix check 2028 * @param lastData 2029 * the current node data, {@code null} for none 2030 * @param data 2031 * the data to be set, or {@code null} for none 2032 * @param type 2033 * currently, create and setData need to check quota 2034 */ checkQuota(String path, byte[] lastData, byte[] data, int type)2035 public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException { 2036 if (!enforceQuota) { 2037 return; 2038 } 2039 long dataBytes = (data == null) ? 0 : data.length; 2040 ZKDatabase zkDatabase = getZKDatabase(); 2041 String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path); 2042 if (StringUtils.isEmpty(lastPrefix)) { 2043 return; 2044 } 2045 2046 switch (type) { 2047 case OpCode.create: 2048 checkQuota(lastPrefix, dataBytes, 1); 2049 break; 2050 case OpCode.setData: 2051 checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0); 2052 break; 2053 default: 2054 throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type); 2055 } 2056 } 2057 2058 /** 2059 * check a path whether exceeded the quota. 2060 * 2061 * @param lastPrefix 2062 the path of the node which has a quota. 2063 * @param bytesDiff 2064 * the diff to be added to number of bytes 2065 * @param countDiff 2066 * the diff to be added to the count 2067 */ checkQuota(String lastPrefix, long bytesDiff, long countDiff)2068 private void checkQuota(String lastPrefix, long bytesDiff, long countDiff) 2069 throws KeeperException.QuotaExceededException { 2070 LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff); 2071 2072 // now check the quota we set 2073 String limitNode = Quotas.limitPath(lastPrefix); 2074 DataNode node = getZKDatabase().getNode(limitNode); 2075 StatsTrack limitStats; 2076 if (node == null) { 2077 // should not happen 2078 LOG.error("Missing limit node for quota {}", limitNode); 2079 return; 2080 } 2081 synchronized (node) { 2082 limitStats = new StatsTrack(node.data); 2083 } 2084 //check the quota 2085 boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1); 2086 boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1); 2087 2088 if (!checkCountQuota && !checkByteQuota) { 2089 return; 2090 } 2091 2092 //check the statPath quota 2093 String statNode = Quotas.statPath(lastPrefix); 2094 node = getZKDatabase().getNode(statNode); 2095 2096 StatsTrack currentStats; 2097 if (node == null) { 2098 // should not happen 2099 LOG.error("Missing node for stat {}", statNode); 2100 return; 2101 } 2102 synchronized (node) { 2103 currentStats = new StatsTrack(node.data); 2104 } 2105 2106 //check the Count Quota 2107 if (checkCountQuota) { 2108 long newCount = currentStats.getCount() + countDiff; 2109 boolean isCountHardLimit = limitStats.getCountHardLimit() > -1 ? true : false; 2110 long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount(); 2111 2112 if (newCount > countLimit) { 2113 String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]"; 2114 RATE_LOGGER.rateLimitLog(msg); 2115 if (isCountHardLimit) { 2116 throw new KeeperException.QuotaExceededException(lastPrefix); 2117 } 2118 } 2119 } 2120 2121 //check the Byte Quota 2122 if (checkByteQuota) { 2123 long newBytes = currentStats.getBytes() + bytesDiff; 2124 boolean isByteHardLimit = limitStats.getByteHardLimit() > -1 ? true : false; 2125 long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes(); 2126 if (newBytes > byteLimit) { 2127 String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]"; 2128 RATE_LOGGER.rateLimitLog(msg); 2129 if (isByteHardLimit) { 2130 throw new KeeperException.QuotaExceededException(lastPrefix); 2131 } 2132 } 2133 } 2134 } 2135 isDigestEnabled()2136 public static boolean isDigestEnabled() { 2137 return digestEnabled; 2138 } 2139 setDigestEnabled(boolean digestEnabled)2140 public static void setDigestEnabled(boolean digestEnabled) { 2141 LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); 2142 ZooKeeperServer.digestEnabled = digestEnabled; 2143 } 2144 2145 /** 2146 * Trim a path to get the immediate predecessor. 2147 * 2148 * @param path 2149 * @return 2150 * @throws KeeperException.BadArgumentsException 2151 */ parentPath(String path)2152 private String parentPath(String path) throws KeeperException.BadArgumentsException { 2153 int lastSlash = path.lastIndexOf('/'); 2154 if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) { 2155 throw new KeeperException.BadArgumentsException(path); 2156 } 2157 return lastSlash == 0 ? "/" : path.substring(0, lastSlash); 2158 } 2159 effectiveACLPath(Request request)2160 private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException { 2161 boolean mustCheckACL = false; 2162 String path = null; 2163 List<ACL> acl = null; 2164 2165 switch (request.type) { 2166 case OpCode.create: 2167 case OpCode.create2: { 2168 CreateRequest req = new CreateRequest(); 2169 if (buffer2Record(request.request, req)) { 2170 mustCheckACL = true; 2171 acl = req.getAcl(); 2172 path = parentPath(req.getPath()); 2173 } 2174 break; 2175 } 2176 case OpCode.delete: { 2177 DeleteRequest req = new DeleteRequest(); 2178 if (buffer2Record(request.request, req)) { 2179 path = parentPath(req.getPath()); 2180 } 2181 break; 2182 } 2183 case OpCode.setData: { 2184 SetDataRequest req = new SetDataRequest(); 2185 if (buffer2Record(request.request, req)) { 2186 path = req.getPath(); 2187 } 2188 break; 2189 } 2190 case OpCode.setACL: { 2191 SetACLRequest req = new SetACLRequest(); 2192 if (buffer2Record(request.request, req)) { 2193 mustCheckACL = true; 2194 acl = req.getAcl(); 2195 path = req.getPath(); 2196 } 2197 break; 2198 } 2199 } 2200 2201 if (mustCheckACL) { 2202 /* we ignore the extrapolated ACL returned by fixupACL because 2203 * we only care about it being well-formed (and if it isn't, an 2204 * exception will be raised). 2205 */ 2206 PrepRequestProcessor.fixupACL(path, request.authInfo, acl); 2207 } 2208 2209 return path; 2210 } 2211 effectiveACLPerms(Request request)2212 private int effectiveACLPerms(Request request) { 2213 switch (request.type) { 2214 case OpCode.create: 2215 case OpCode.create2: 2216 return ZooDefs.Perms.CREATE; 2217 case OpCode.delete: 2218 return ZooDefs.Perms.DELETE; 2219 case OpCode.setData: 2220 return ZooDefs.Perms.WRITE; 2221 case OpCode.setACL: 2222 return ZooDefs.Perms.ADMIN; 2223 default: 2224 return ZooDefs.Perms.ALL; 2225 } 2226 } 2227 2228 /** 2229 * Check Write Requests for Potential Access Restrictions 2230 * <p/> 2231 * Before a request is being proposed to the quorum, lets check it 2232 * against local ACLs. Non-write requests (read, session, etc.) 2233 * are passed along. Invalid requests are sent a response. 2234 * <p/> 2235 * While we are at it, if the request will set an ACL: make sure it's 2236 * a valid one. 2237 * 2238 * @param request 2239 * @return true if request is permitted, false if not. 2240 * @throws java.io.IOException 2241 */ authWriteRequest(Request request)2242 public boolean authWriteRequest(Request request) { 2243 int err; 2244 String pathToCheck; 2245 2246 if (!enableEagerACLCheck) { 2247 return true; 2248 } 2249 2250 err = KeeperException.Code.OK.intValue(); 2251 2252 try { 2253 pathToCheck = effectiveACLPath(request); 2254 if (pathToCheck != null) { 2255 checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null); 2256 } 2257 } catch (KeeperException.NoAuthException e) { 2258 LOG.debug("Request failed ACL check", e); 2259 err = e.code().intValue(); 2260 } catch (KeeperException.InvalidACLException e) { 2261 LOG.debug("Request has an invalid ACL check", e); 2262 err = e.code().intValue(); 2263 } catch (KeeperException.NoNodeException e) { 2264 LOG.debug("ACL check against non-existent node: {}", e.getMessage()); 2265 } catch (KeeperException.BadArgumentsException e) { 2266 LOG.debug("ACL check against illegal node path: {}", e.getMessage()); 2267 } catch (Throwable t) { 2268 LOG.error("Uncaught exception in authWriteRequest with: ", t); 2269 throw t; 2270 } finally { 2271 if (err != KeeperException.Code.OK.intValue()) { 2272 /* This request has a bad ACL, so we are dismissing it early. */ 2273 decInProcess(); 2274 ReplyHeader rh = new ReplyHeader(request.cxid, 0, err); 2275 try { 2276 request.cnxn.sendResponse(rh, null, null); 2277 } catch (IOException e) { 2278 LOG.error("IOException : {}", e); 2279 } 2280 } 2281 } 2282 2283 return err == KeeperException.Code.OK.intValue(); 2284 } 2285 buffer2Record(ByteBuffer request, Record record)2286 private boolean buffer2Record(ByteBuffer request, Record record) { 2287 boolean rv = false; 2288 try { 2289 ByteBufferInputStream.byteBuffer2Record(request, record); 2290 request.rewind(); 2291 rv = true; 2292 } catch (IOException ex) { 2293 } 2294 2295 return rv; 2296 } 2297 getOutstandingHandshakeNum()2298 public int getOutstandingHandshakeNum() { 2299 if (serverCnxnFactory instanceof NettyServerCnxnFactory) { 2300 return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum(); 2301 } else { 2302 return 0; 2303 } 2304 } 2305 isReconfigEnabled()2306 public boolean isReconfigEnabled() { 2307 return this.reconfigEnabled; 2308 } 2309 getZkShutdownHandler()2310 public ZooKeeperServerShutdownHandler getZkShutdownHandler() { 2311 return zkShutdownHandler; 2312 } 2313 2314 } 2315