1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.impl.node; 9 10 import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_FEEDERS_CREATED; 11 import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_FEEDERS_SHUTDOWN; 12 import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_MAX_REPLICA_LAG; 13 import static com.sleepycat.je.rep.impl.node.FeederManagerStatDefinition.N_MAX_REPLICA_LAG_NAME; 14 15 import java.io.IOException; 16 import java.util.Collections; 17 import java.util.HashMap; 18 import java.util.HashSet; 19 import java.util.Map; 20 import java.util.Map.Entry; 21 import java.util.Set; 22 import java.util.concurrent.BlockingQueue; 23 import java.util.concurrent.LinkedBlockingQueue; 24 import java.util.concurrent.TimeUnit; 25 import java.util.concurrent.atomic.AtomicBoolean; 26 import java.util.logging.Logger; 27 28 import com.sleepycat.je.DatabaseException; 29 import com.sleepycat.je.EnvironmentFailureException; 30 import com.sleepycat.je.StatsConfig; 31 import com.sleepycat.je.rep.UnknownMasterException; 32 import com.sleepycat.je.rep.impl.RepNodeImpl; 33 import com.sleepycat.je.rep.net.DataChannel; 34 import com.sleepycat.je.rep.stream.MasterStatus.MasterSyncException; 35 import com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition; 36 import com.sleepycat.je.rep.utilint.IntRunningTotalStat; 37 import com.sleepycat.je.rep.utilint.LongMaxZeroStat; 38 import com.sleepycat.je.rep.utilint.RepUtils; 39 import com.sleepycat.je.rep.utilint.SizeAwaitMap; 40 import com.sleepycat.je.rep.utilint.SizeAwaitMap.Predicate; 41 import com.sleepycat.je.utilint.IntStat; 42 import com.sleepycat.je.utilint.LoggerUtils; 43 import com.sleepycat.je.utilint.StatGroup; 44 import com.sleepycat.je.utilint.StringStat; 45 import com.sleepycat.je.utilint.VLSN; 46 47 /** 48 * FeedManager is responsible for the creation and management of the Feeders 49 * used to respond to connections initiated by a Replica. runfeeders() is the 50 * central loop that listens for replica connections and manages the lifecycle 51 * of individual Feeders. It's re-entered each time the node becomes a Master 52 * and is exited when its status changes. 53 * 54 * There is a single instance of FeederManager that is created for a 55 * replication node. There are many instances of Feeders per FeederManager. 56 * Each Feeder instance represents an instance of a connection between the node 57 * serving as the feeder and the replica. 58 * 59 * Note that the FeederManager and the Replica currently reuse the Replication 60 * node's thread of control. When we implement r2r we will need to revisit the 61 * thread management to provide for concurrent operation of the FeederManger 62 * and the Replica. 63 */ 64 final public class FeederManager { 65 66 private final RepNode repNode; 67 68 /* 69 * The queue into which the ServiceDispatcher queues socket channels for 70 * new Feeder instances. 71 */ 72 private final BlockingQueue<DataChannel> channelQueue = 73 new LinkedBlockingQueue<DataChannel>(); 74 75 /* 76 * Feeders are stored in either nascentFeeders or activeFeeders, and not 77 * both. To avoid deadlock, if locking both collections, lock 78 * nascentFeeders first and then activeFeeders. 79 */ 80 81 /* 82 * Nascent feeders that are starting up and are not yet active. They have 83 * network connections but have not synched up or completed handshakes. 84 * They are moved into the feeder map, once they become active. 85 */ 86 private final Set<Feeder> nascentFeeders = 87 Collections.synchronizedSet(new HashSet<Feeder>()); 88 89 /* 90 * The collection of active feeders currently feeding replicas. The map is 91 * indexed by the Replica's node name. Access to this map must be 92 * synchronized, since it's updated concurrently by the Feeders that share 93 * it. 94 * 95 * A feeder is considered to be active after it has completed the handshake 96 * sequence with its associated Replica. 97 * 98 * Note that the SizeAwaitMap will only wait for feeders that are connected 99 * to electable replicas, since those are the only ones participating in 100 * durability decisions. 101 */ 102 private final SizeAwaitMap<String, Feeder> activeFeeders; 103 104 /* 105 * A test delay introduced in the feeder loop to simulate a loaded master. 106 * The feeder waits this amount of time after each message is sent. 107 */ 108 private int testDelayMs=0; 109 110 /* Set to true to force a shutdown of the FeederManager. */ 111 AtomicBoolean shutdown = new AtomicBoolean(false); 112 113 /* 114 * Non null if the replication node must be shutdown as well. This is 115 * typically the result of an unexpected exception in the feeder. 116 */ 117 private RuntimeException repNodeShutdownException; 118 private final Logger logger; 119 120 /* FeederManager statistics. */ 121 private final StatGroup stats; 122 private final IntStat nFeedersCreated; 123 private final IntStat nFeedersShutdown; 124 125 /* 126 * The maximum lag across all replicas. Atomic values or synchronization 127 * are not used for the shared statistic to minimize overheads and the 128 * resulting occasional inaccuracy in the statics is an acceptable 129 * tradeoff. 130 */ 131 private final LongMaxZeroStat nMaxReplicaLag; 132 private final StringStat nMaxReplicaLagName; 133 134 /* The frequency with which the Feeder checks for a master change */ 135 public static final int MASTER_CHANGE_CHECK_TIMEOUT = 1000; 136 137 /* Identifies the Feeder Service. */ 138 public static final String FEEDER_SERVICE = "Feeder"; 139 FeederManager(RepNode repNode)140 FeederManager(RepNode repNode) { 141 this.repNode = repNode; 142 activeFeeders = new SizeAwaitMap<String, Feeder>( 143 repNode.getRepImpl(), new MatchElectableFeeders()); 144 logger = LoggerUtils.getLogger(getClass()); 145 stats = new StatGroup(FeederManagerStatDefinition.GROUP_NAME, 146 FeederManagerStatDefinition.GROUP_DESC); 147 nFeedersCreated = new IntRunningTotalStat(stats, N_FEEDERS_CREATED); 148 nFeedersShutdown = new IntRunningTotalStat(stats, N_FEEDERS_SHUTDOWN); 149 nMaxReplicaLag = new LongMaxZeroStat(stats, N_MAX_REPLICA_LAG); 150 nMaxReplicaLagName = new StringStat(stats, N_MAX_REPLICA_LAG_NAME); 151 } 152 153 /** 154 * A SizeAwaitMap predicate that matches feeders connected to electable 155 * replicas. 156 */ 157 private class MatchElectableFeeders implements Predicate<Feeder> { 158 @Override match(final Feeder value)159 public boolean match(final Feeder value) { 160 161 /* The replica node might be null during unit testing */ 162 final RepNodeImpl replica = value.getReplicaNode(); 163 return (replica != null) && 164 repNode.getDurabilityQuorum().replicaAcksQualify(replica); 165 } 166 } 167 168 /** 169 * Returns the statistics associated with the FeederManager. 170 * 171 * @return the statistics 172 */ getFeederManagerStats(StatsConfig config)173 public StatGroup getFeederManagerStats(StatsConfig config) { 174 175 synchronized (stats) { 176 return stats.cloneGroup(config.getClear()); 177 } 178 } 179 180 /* Get the protocol stats for this FeederManager. */ getProtocolStats(StatsConfig config)181 public StatGroup getProtocolStats(StatsConfig config) { 182 /* Aggregate stats that have not yet been aggregated. */ 183 StatGroup protocolStats = 184 new StatGroup(BinaryProtocolStatDefinition.GROUP_NAME, 185 BinaryProtocolStatDefinition.GROUP_DESC); 186 synchronized (activeFeeders) { 187 for (Feeder feeder : activeFeeders.values()) { 188 protocolStats.addAll(feeder.getProtocolStats(config)); 189 } 190 } 191 192 return protocolStats; 193 } 194 195 /* Reset the feeders' stats of this FeederManager. */ resetStats()196 public void resetStats() { 197 synchronized (stats) { 198 stats.clear(); 199 } 200 synchronized (activeFeeders) { 201 for (Feeder feeder : activeFeeders.values()) { 202 feeder.resetStats(); 203 } 204 } 205 } 206 207 /** 208 * Accumulates statistics from a terminating feeder. 209 * @param stats 210 */ incStats(StatGroup feederStats)211 void incStats(StatGroup feederStats) { 212 synchronized (stats) { 213 stats.addAll(feederStats); 214 } 215 } 216 getTestDelayMs()217 public int getTestDelayMs() { 218 return testDelayMs; 219 } 220 setTestDelayMs(int testDelayMs)221 public void setTestDelayMs(int testDelayMs) { 222 this.testDelayMs = testDelayMs; 223 } 224 225 /** 226 * Returns the RepNode associated with the FeederManager 227 * @return 228 */ repNode()229 RepNode repNode() { 230 return repNode; 231 } 232 233 /** 234 * Returns the Feeder associated with the node, if such a feeder is 235 * currently active. 236 */ getFeeder(String nodeName)237 public Feeder getFeeder(String nodeName) { 238 return activeFeeders.get(nodeName); 239 } 240 241 /* 242 * For test use only. 243 */ putFeeder(String nodeName, Feeder feeder)244 public Feeder putFeeder(String nodeName, Feeder feeder) { 245 return activeFeeders.put(nodeName, feeder); 246 } 247 getnMaxReplicaLag()248 public LongMaxZeroStat getnMaxReplicaLag() { 249 return nMaxReplicaLag; 250 } 251 getnMaxReplicaLagName()252 public StringStat getnMaxReplicaLagName() { 253 return nMaxReplicaLagName; 254 } 255 setRepNodeShutdownException(RuntimeException rNSE)256 void setRepNodeShutdownException(RuntimeException rNSE) { 257 this.repNodeShutdownException = rNSE; 258 } 259 260 /** 261 * The numbers of Replicas currently "active" with this feeder. Active 262 * currently means they are connected. It does not make any guarantees 263 * about where they are in the replication stream. They may, for example, 264 * be too far behind to participate in timely acks. 265 * 266 * @return the active replica count 267 */ activeReplicaCount()268 public int activeReplicaCount() { 269 return activeFeeders.size(); 270 } 271 272 /** 273 * Returns the set of Replicas that are currently active with this feeder. 274 * A replica is active if it has completed the handshake sequence. 275 * 276 * @return the set of replica node names 277 */ activeReplicas()278 public Set<String> activeReplicas() { 279 synchronized (activeFeeders) { 280 281 /* 282 * Create a copy to avoid inadvertent concurrency conflicts, 283 * since the keySet is a view of the underlying map. 284 */ 285 return new HashSet<String>(activeFeeders.keySet()); 286 } 287 } 288 289 /** 290 * Returns the set of replicas that are currently active with this feeder 291 * and that supply acknowledgments. A replica is active if it has 292 * completed the handshake sequence. 293 * 294 * @return the set of replica node names 295 */ activeAckReplicas()296 public Set<String> activeAckReplicas() { 297 final Set<String> nodeNames = new HashSet<String>(); 298 synchronized (activeFeeders) { 299 for (final Entry<String, Feeder> entry : 300 activeFeeders.entrySet()) { 301 final Feeder feeder = entry.getValue(); 302 303 /* The replica node should be non-null for an active feeder */ 304 final RepNodeImpl replica = feeder.getReplicaNode(); 305 if (replica.getType().isElectable()) { 306 final String nodeName = entry.getKey(); 307 nodeNames.add(nodeName); 308 } 309 } 310 } 311 return nodeNames; 312 } 313 activeReplicasMap()314 public Map<String, Feeder> activeReplicasMap() { 315 synchronized (activeFeeders){ 316 return new HashMap<String, Feeder>(activeFeeders); 317 } 318 } 319 320 /** 321 * Transitions a Feeder to being active, so that it can be used in 322 * considerations relating to commit acknowledgments and decisions about 323 * choosing feeders related to system load. 324 * 325 * @param feeder the feeder being transitioned. 326 * 327 * @throws DuplicateReplicaException if the Feeder is already active 328 */ activateFeeder(Feeder feeder)329 void activateFeeder(Feeder feeder) { 330 synchronized (nascentFeeders) { 331 synchronized (activeFeeders) { 332 boolean removed = nascentFeeders.remove(feeder); 333 if (feeder.isShutdown()) { 334 return; 335 } 336 assert(removed); 337 String replicaName = feeder.getReplicaNameIdPair().getName(); 338 assert(!feeder.getReplicaNameIdPair().equals(NameIdPair.NULL)); 339 Feeder dup = activeFeeders.get(replicaName); 340 if ((dup != null) && !dup.isShutdown()) { 341 throw EnvironmentFailureException. 342 unexpectedState(repNode.getRepImpl(), 343 feeder.getReplicaNameIdPair() + 344 " is present in both nascent and " + 345 "active feeder sets"); 346 } 347 activeFeeders.put(replicaName, feeder); 348 349 MasterTransfer xfr = repNode.getActiveTransfer(); 350 if (xfr != null) { 351 xfr.addFeeder(feeder); 352 } 353 } 354 } 355 } 356 357 /** 358 * Remove the feeder from the sets used to track it. Invoked when a feeder 359 * is shutdown. 360 * 361 * @param feeder 362 */ removeFeeder(Feeder feeder)363 void removeFeeder(Feeder feeder) { 364 assert(feeder.isShutdown()); 365 final String replicaName = feeder.getReplicaNameIdPair().getName(); 366 synchronized (nascentFeeders) { 367 synchronized (activeFeeders) { 368 nascentFeeders.remove(feeder); 369 activeFeeders.remove(replicaName); 370 } 371 } 372 373 final RepNodeImpl node = feeder.getReplicaNode(); 374 if ((node != null) && node.getType().isSecondary()) { 375 repNode.removeSecondaryNode(node); 376 } 377 } 378 379 /** 380 * Clears and shuts down the runFeeders by inserting a special EOF marker 381 * value into the queue. 382 */ shutdownQueue()383 void shutdownQueue() { 384 if (!repNode.isShutdown()) { 385 throw EnvironmentFailureException.unexpectedState 386 ("Rep node is still active"); 387 } 388 channelQueue.clear(); 389 /* Add special entry so that the channelQueue.poll operation exits. */ 390 channelQueue.add(RepUtils.CHANNEL_EOF_MARKER); 391 } 392 393 /** 394 * The core feeder listener loop that is run either in a Master node, or in 395 * a Replica that is serving as a Feeder to other Replica nodes. The core 396 * loop accepts connections from Replicas as they come in and establishes a 397 * Feeder on that connection. 398 * 399 * The loop can be terminated for one of the following reasons: 400 * 401 * 1) A change in Masters. 402 * 403 * 2) A forced shutdown, via a thread interrupt. 404 * 405 * 3) A server socket level exception. 406 * 407 * The timeout on the accept is used to ensure that the check is done at 408 * least once per timeout period. 409 */ runFeeders()410 void runFeeders() 411 throws DatabaseException { 412 413 if (shutdown.get()) { 414 throw EnvironmentFailureException.unexpectedState 415 ("Feeder manager was shutdown"); 416 } 417 Exception feederShutdownException = null; 418 LoggerUtils.info(logger, repNode.getRepImpl(), 419 "Feeder manager accepting requests."); 420 421 /* This updater represents the masters's local cbvlsn, which the master 422 updates directly. */ 423 final LocalCBVLSNUpdater updater = new LocalCBVLSNUpdater( 424 repNode.getNameIdPair(), repNode.getNodeType(), repNode); 425 final LocalCBVLSNTracker tracker = repNode.getCBVLSNTracker(); 426 427 try { 428 /* 429 * Ensure that the Global CBVLSN is initialized for the master when 430 * it first comes up; it's subsequently maintained in the loop 431 * below. 432 */ 433 updater.updateForMaster(tracker); 434 435 repNode.getServiceDispatcher(). 436 register(FEEDER_SERVICE, channelQueue); 437 438 /* 439 * The Feeder is ready for business, indicate that the node is 440 * ready by counting down the latch and releasing any waiters. 441 */ 442 repNode.getReadyLatch().countDown(); 443 444 while (true) { 445 final DataChannel feederReplicaChannel = 446 channelQueue.poll(MASTER_CHANGE_CHECK_TIMEOUT, 447 TimeUnit.MILLISECONDS); 448 449 if (feederReplicaChannel == RepUtils.CHANNEL_EOF_MARKER) { 450 LoggerUtils.info(logger, repNode.getRepImpl(), 451 "Feeder manager soft shutdown."); 452 return; 453 } 454 455 repNode.getMasterStatus().assertSync(); 456 if (feederReplicaChannel == null) { 457 if (repNode.isShutdownOrInvalid()) { 458 /* Timeout and shutdown request */ 459 LoggerUtils.info(logger, repNode.getRepImpl(), 460 "Feeder manager forced shutdown."); 461 return; 462 } 463 464 /* 465 * Take this opportunity to update this node's CBVLSN The 466 * replicas are sending in their CBVLSNs through the 467 * heartbeat responses, but a master does not send any 468 * heartbeat responses, and needs a different path to 469 * update its local CBVLSN. 470 */ 471 updater.updateForMaster(tracker); 472 continue; 473 } 474 475 nFeedersCreated.increment(); 476 try { 477 Feeder feeder = new Feeder(this, feederReplicaChannel); 478 nascentFeeders.add(feeder); 479 feeder.startFeederThreads(); 480 } catch (IOException e) { 481 482 /* 483 * Indicates a feeder socket level exception. 484 */ 485 LoggerUtils.fine 486 (logger, repNode.getRepImpl(), 487 "Feeder I/O exception: " + e.getMessage()); 488 try { 489 feederReplicaChannel.close(); 490 } catch (IOException e1) { 491 LoggerUtils.fine 492 (logger, repNode.getRepImpl(), 493 "Exception during cleanup." + e.getMessage()); 494 } 495 continue; 496 } 497 } 498 } catch (MasterSyncException e) { 499 LoggerUtils.info(logger, repNode.getRepImpl(), 500 "Master change: " + e.getMessage()); 501 502 feederShutdownException = new UnknownMasterException("Node " + 503 repNode.getRepImpl().getName() + 504 " is not a master anymore"); 505 } catch (InterruptedException e) { 506 if (this.repNodeShutdownException != null) { 507 508 /* 509 * The interrupt was issued to propagate an exception from one 510 * of the Feeder threads. It's not a normal exit. 511 */ 512 LoggerUtils.warning(logger, repNode.getRepImpl(), 513 "Feeder manager unexpected interrupt"); 514 throw repNodeShutdownException; /* Terminate the rep node */ 515 } 516 if (repNode.isShutdown()) { 517 LoggerUtils.info(logger, repNode.getRepImpl(), 518 "Feeder manager interrupted for shutdown"); 519 return; 520 } 521 feederShutdownException = e; 522 LoggerUtils.warning(logger, repNode.getRepImpl(), 523 "Feeder manager unexpected interrupt"); 524 } finally { 525 repNode.resetReadyLatch(feederShutdownException); 526 repNode.getServiceDispatcher().cancel(FEEDER_SERVICE); 527 shutdownFeeders(feederShutdownException); 528 LoggerUtils.info(logger, repNode.getRepImpl(), 529 "Feeder manager exited. CurrentTxnEnd VLSN: " + 530 repNode.getCurrentTxnEndVLSN()); 531 } 532 } 533 534 /** 535 * Shuts down all the feeders managed by the FeederManager 536 * 537 * @param feederShutdownException the exception provoking the shutdown. 538 */ shutdownFeeders(Exception feederShutdownException)539 private void shutdownFeeders(Exception feederShutdownException) { 540 541 boolean changed = shutdown.compareAndSet(false, true); 542 if (!changed) { 543 return; 544 } 545 546 try { 547 /* Copy sets for safe iteration in the presence of deletes.*/ 548 final Set<Feeder> feederSet; 549 synchronized (nascentFeeders) { 550 synchronized (activeFeeders) { 551 feederSet = new HashSet<Feeder>(activeFeeders.values()); 552 feederSet.addAll(nascentFeeders); 553 } 554 } 555 556 for (Feeder feeder : feederSet) { 557 nFeedersShutdown.increment(); 558 feeder.shutdown(feederShutdownException); 559 } 560 } finally { 561 if (feederShutdownException == null) { 562 feederShutdownException = 563 new IllegalStateException("FeederManager shutdown"); 564 } 565 activeFeeders.clear(feederShutdownException); 566 nascentFeeders.clear(); 567 } 568 } 569 570 /** 571 * Shuts down a specific feeder. It's typically done in response to the 572 * removal of a member from the group. 573 */ shutdownFeeder(RepNodeImpl node)574 public void shutdownFeeder(RepNodeImpl node) { 575 Feeder feeder = activeFeeders.get(node.getName()); 576 if (feeder == null) { 577 return; 578 } 579 nFeedersShutdown.increment(); 580 feeder.shutdown(null); 581 } 582 583 /** 584 * Block until the required number of electable feeders/replica connections 585 * are established. Used for establishing durability quorums. Since this is 586 * counting feeder/replica connections, requiredReplicaCount does not 587 * include the master. 588 */ awaitFeederReplicaConnections( int requiredReplicaCount, long insufficientReplicasTimeout)589 public boolean awaitFeederReplicaConnections( 590 int requiredReplicaCount, long insufficientReplicasTimeout) 591 throws InterruptedException { 592 593 return activeFeeders.sizeAwait(requiredReplicaCount, 594 insufficientReplicasTimeout, 595 TimeUnit.MILLISECONDS); 596 } 597 598 /* 599 * For debugging help, and for expanded exception messages, dump feeder 600 * related state. If acksOnly is true, only include information about 601 * feeders for replicas that supply acknowledgments. 602 */ dumpState(final boolean acksOnly)603 public String dumpState(final boolean acksOnly) { 604 StringBuilder sb = new StringBuilder(); 605 synchronized (activeFeeders) { 606 Set<Map.Entry<String, Feeder>> feeds = activeFeeders.entrySet(); 607 if (feeds.size() == 0) { 608 sb.append("No feeders."); 609 } else { 610 sb.append("Current feeds:"); 611 for (Map.Entry<String, Feeder> feedEntry : feeds) { 612 final Feeder feeder = feedEntry.getValue(); 613 614 /* 615 * Ignore secondary nodes if only want nodes that provide 616 * acknowledgments 617 */ 618 if (acksOnly && 619 feeder.getReplicaNode().getType().isSecondary()) { 620 continue; 621 } 622 sb.append("\n ").append(feedEntry.getKey()).append(": "); 623 sb.append(feeder.dumpState()); 624 } 625 } 626 } 627 return sb.toString(); 628 } 629 630 /** 631 * Returns a count of the number of feeders whose replicas are counted in 632 * durability decisions and have acknowledged txn-end VLSNs >= the 633 * commitVLSN argument. 634 * 635 * @param commitVLSN the commitVLSN being checked 636 */ getNumCurrentAckFeeders(VLSN commitVLSN)637 public int getNumCurrentAckFeeders(VLSN commitVLSN) { 638 final DurabilityQuorum durabilityQuorum = 639 repNode.getDurabilityQuorum(); 640 int count = 0; 641 synchronized (activeFeeders) { 642 for (Feeder feeder : activeFeeders.values()) { 643 if ((commitVLSN.compareTo(feeder.getReplicaTxnEndVLSN()) <= 0) 644 && durabilityQuorum.replicaAcksQualify( 645 feeder.getReplicaNode())) { 646 count++; 647 } 648 } 649 return count; 650 } 651 } 652 } 653