1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.impl.node; 9 10 import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED; 11 import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER; 12 import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA; 13 import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN; 14 import static com.sleepycat.je.rep.impl.RepParams.DBTREE_CACHE_CLEAR_COUNT; 15 import static com.sleepycat.je.rep.impl.RepParams.ENV_CONSISTENCY_TIMEOUT; 16 import static com.sleepycat.je.rep.impl.RepParams.HEARTBEAT_INTERVAL; 17 import static com.sleepycat.je.rep.impl.RepParams.IGNORE_SECONDARY_NODE_ID; 18 import static com.sleepycat.je.rep.impl.RepParams.LOG_FLUSH_TASK_INTERVAL; 19 import static com.sleepycat.je.rep.impl.RepParams.NODE_TYPE; 20 import static com.sleepycat.je.rep.impl.RepParams.REPLAY_COST_PERCENT; 21 import static com.sleepycat.je.rep.impl.RepParams.REPLAY_FREE_DISK_PERCENT; 22 import static com.sleepycat.je.rep.impl.RepParams.RESET_REP_GROUP_RETAIN_UUID; 23 import static com.sleepycat.je.rep.impl.RepParams.RUN_LOG_FLUSH_TASK; 24 25 import java.io.IOException; 26 import java.net.InetAddress; 27 import java.net.InetSocketAddress; 28 import java.util.BitSet; 29 import java.util.Date; 30 import java.util.HashMap; 31 import java.util.HashSet; 32 import java.util.Map; 33 import java.util.Map.Entry; 34 import java.util.NavigableSet; 35 import java.util.Set; 36 import java.util.Timer; 37 import java.util.TreeSet; 38 import java.util.UUID; 39 import java.util.concurrent.TimeUnit; 40 import java.util.logging.Level; 41 import java.util.logging.Logger; 42 43 import com.sleepycat.je.CheckpointConfig; 44 import com.sleepycat.je.DatabaseException; 45 import com.sleepycat.je.EnvironmentFailureException; 46 import com.sleepycat.je.JEVersion; 47 import 
com.sleepycat.je.RecoveryProgress; 48 import com.sleepycat.je.ReplicaConsistencyPolicy; 49 import com.sleepycat.je.StatsConfig; 50 import com.sleepycat.je.cleaner.Cleaner; 51 import com.sleepycat.je.cleaner.FileSelector; 52 import com.sleepycat.je.cleaner.FileSummary; 53 import com.sleepycat.je.dbi.DbConfigManager; 54 import com.sleepycat.je.dbi.StartupTracker.Phase; 55 import com.sleepycat.je.log.FileManager; 56 import com.sleepycat.je.log.LogEntryType; 57 import com.sleepycat.je.log.LogManager; 58 import com.sleepycat.je.rep.AppStateMonitor; 59 import com.sleepycat.je.rep.GroupShutdownException; 60 import com.sleepycat.je.rep.InsufficientLogException; 61 import com.sleepycat.je.rep.MasterStateException; 62 import com.sleepycat.je.rep.MasterTransferFailureException; 63 import com.sleepycat.je.rep.MemberActiveException; 64 import com.sleepycat.je.rep.MemberNotFoundException; 65 import com.sleepycat.je.rep.NodeType; 66 import com.sleepycat.je.rep.QuorumPolicy; 67 import com.sleepycat.je.rep.RepInternal; 68 import com.sleepycat.je.rep.ReplicaConsistencyException; 69 import com.sleepycat.je.rep.ReplicaStateException; 70 import com.sleepycat.je.rep.ReplicatedEnvironment; 71 import com.sleepycat.je.rep.ReplicatedEnvironmentStats; 72 import com.sleepycat.je.rep.ReplicationNode; 73 import com.sleepycat.je.rep.RestartRequiredException; 74 import com.sleepycat.je.rep.UnknownMasterException; 75 import com.sleepycat.je.rep.arbitration.Arbiter; 76 import com.sleepycat.je.rep.elections.Elections; 77 import com.sleepycat.je.rep.elections.Proposer.Proposal; 78 import com.sleepycat.je.rep.elections.TimebasedProposalGenerator; 79 import com.sleepycat.je.rep.impl.BinaryNodeStateProtocol; 80 import com.sleepycat.je.rep.impl.BinaryNodeStateProtocol.BinaryNodeStateResponse; 81 import com.sleepycat.je.rep.impl.BinaryNodeStateService; 82 import com.sleepycat.je.rep.impl.GroupService; 83 import com.sleepycat.je.rep.impl.MinJEVersionUnsupportedException; 84 import 
com.sleepycat.je.rep.impl.NodeStateService; 85 import com.sleepycat.je.rep.impl.PointConsistencyPolicy; 86 import com.sleepycat.je.rep.impl.RepGroupDB; 87 import com.sleepycat.je.rep.impl.RepGroupImpl; 88 import com.sleepycat.je.rep.impl.RepGroupImpl.NodeConflictException; 89 import com.sleepycat.je.rep.impl.RepGroupProtocol; 90 import com.sleepycat.je.rep.impl.RepGroupProtocol.GroupResponse; 91 import com.sleepycat.je.rep.impl.RepImpl; 92 import com.sleepycat.je.rep.impl.RepNodeImpl; 93 import com.sleepycat.je.rep.impl.RepParams; 94 import com.sleepycat.je.rep.impl.TextProtocol.MessageExchange; 95 import com.sleepycat.je.rep.impl.TextProtocol.ResponseMessage; 96 import com.sleepycat.je.rep.monitor.LeaveGroupEvent.LeaveReason; 97 import com.sleepycat.je.rep.net.DataChannel; 98 import com.sleepycat.je.rep.net.DataChannelFactory.ConnectOptions; 99 import com.sleepycat.je.rep.stream.FeederTxns; 100 import com.sleepycat.je.rep.stream.MasterChangeListener; 101 import com.sleepycat.je.rep.stream.MasterStatus; 102 import com.sleepycat.je.rep.stream.MasterSuggestionGenerator; 103 import com.sleepycat.je.rep.util.ldiff.LDiffService; 104 import com.sleepycat.je.rep.utilint.RepUtils; 105 import com.sleepycat.je.rep.utilint.RepUtils.ExceptionAwareCountDownLatch; 106 import com.sleepycat.je.rep.utilint.ServiceDispatcher; 107 import com.sleepycat.je.rep.vlsn.VLSNIndex; 108 import com.sleepycat.je.utilint.FileStoreInfo; 109 import com.sleepycat.je.utilint.LoggerUtils; 110 import com.sleepycat.je.utilint.StoppableThread; 111 import com.sleepycat.je.utilint.TestHook; 112 import com.sleepycat.je.utilint.TestHookExecute; 113 import com.sleepycat.je.utilint.VLSN; 114 115 /** 116 * Represents a replication node. This class is the locus of operations that 117 * manage the state of the node, master, replica, etc. Once the state of a node 118 * has been established the thread of control passes over to the Replica or 119 * FeederManager instances. 
120 * 121 * Note that both Feeders and the Replica instance may be active in future when 122 * we support r2r replication, in addition to m2r replication. For now however, 123 * either the FeederManager is active, or the Replica is and the same common 124 * thread control can be shared between the two. 125 */ 126 public class RepNode extends StoppableThread { 127 128 /* 129 * The unique node name and internal id that identifies the node within 130 * the rep group. There is a canonical instance of this that's updated 131 * when the node joins the group. 132 */ 133 private final NameIdPair nameIdPair; 134 135 /* 136 * The socket address on which Replicas connect to me, were this node 137 * to become the master. 138 */ 139 private final InetSocketAddress mySocket; 140 141 /* The service dispatcher used by this replication node. */ 142 private final ServiceDispatcher serviceDispatcher; 143 144 /* The election instance for this node */ 145 private Elections elections; 146 147 /* The locus of operations when the node is a replica. */ 148 private final Replica replica; 149 150 /* Used when the node is a feeder. */ 151 private FeederManager feederManager; 152 153 /* 154 * The status of the Master. Note that this is the leading state as 155 * communicated to this node via the Listener. The node itself may not as 156 * yet have responded to this state change announced by the Listener. That 157 * is, nodeState, may reflect a different state until the transition to 158 * this state has been completed. 159 */ 160 private final MasterStatus masterStatus; 161 private final MasterChangeListener changeListener; 162 private final MasterSuggestionGenerator suggestionGenerator; 163 164 /* 165 * Represents the application visible state of this node. It may lag the 166 * state as described by masterStatus. 167 */ 168 private final NodeState nodeState; 169 170 private final RepImpl repImpl; 171 172 /* The encapsulated internal replication group database. 
*/ 173 final RepGroupDB repGroupDB; 174 175 /* 176 * The latch used to indicate that the node has a well defined state as a 177 * Master or Replica and has finished the node-specific initialization that 178 * will permit it to function immediately in that capacity. 179 * 180 * For a Master it means that it's ready to start accepting connections 181 * from Replicas. 182 * 183 * For a Replica, it means that it has established a connection with a 184 * Feeder, completed the handshake process that validates it as being a 185 * legitimate member of the group, established a sync point, and is ready 186 * to start replaying the replication stream. 187 */ 188 private volatile ExceptionAwareCountDownLatch readyLatch = null; 189 190 /* 191 * Latch used to freeze txn commit VLSN advancement during an election. 192 */ 193 private final CommitFreezeLatch vlsnFreezeLatch = new CommitFreezeLatch(); 194 195 /* 196 * Describes the nodes that form the group. This information is dynamic 197 * it's initialized at startup and subsequently as a result of changes 198 * made either directly to it, when the node is a master, or via the 199 * replication stream, when it is a Replica. 200 */ 201 volatile private RepGroupImpl group; 202 203 /* 204 * Determines the election policy to use when the node holds its very first 205 * elections 206 */ 207 private QuorumPolicy electionQuorumPolicy = QuorumPolicy.SIMPLE_MAJORITY; 208 209 /* 210 * Amount of times to sleep between retries when a new node tries to locate 211 * a master. 212 */ 213 private static final int MASTER_QUERY_INTERVAL = 10000; 214 215 /* Number of times to retry joining on a retryable exception. */ 216 private static final int JOIN_RETRIES = 10; 217 218 /* 219 * Encapsulates access to current time, to arrange for testing of clock 220 * skews. 
221 */ 222 private final Clock clock; 223 224 private com.sleepycat.je.rep.impl.networkRestore.FeederManager 225 logFeederManager; 226 private LDiffService ldiff; 227 private NodeStateService nodeStateService; 228 private BinaryNodeStateService binaryNodeStateService; 229 private GroupService groupService; 230 231 /* tracks the local CBVLSN for this node. */ 232 final LocalCBVLSNTracker cbvlsnTracker; 233 234 /* The currently in-progress Master Transfer operation, if any. */ 235 private MasterTransfer xfrInProgress; 236 237 /* calculates and manages the global, cached CBVLSN */ 238 final GlobalCBVLSN globalCBVLSN; 239 240 /* Determines how long to wait for a replica to catch up on a close. */ 241 private long replicaCloseCatchupMs = -1; 242 243 /* Manage and notify MonitorChangeEvents fired by this RepNode. */ 244 private MonitorEventManager monitorEventManager; 245 246 /* The user defined AppStateMonitor which gets the application state. */ 247 private AppStateMonitor appStateMonitor; 248 249 /* A timer used to track inactive socket channels used by the RepNode. */ 250 private final Timer timer; 251 private final ChannelTimeoutTask channelTimeoutTask; 252 private LogFlusher logFlusher; 253 254 final Logger logger; 255 256 /* Locus of election and durability quorum decisions */ 257 private final ElectionQuorum electionQuorum; 258 private final DurabilityQuorum durabilityQuorum; 259 260 private final Arbiter arbiter; 261 private final NodeType nodeType; 262 263 /** Manages the allocation of node IDs for secondary nodes. */ 264 private final SecondaryNodeIds secondaryNodeIds = 265 new SecondaryNodeIds(RepGroupImpl.MAX_SECONDARY_NODES); 266 267 /** 268 * Synchronize on this object when setting the minimum JE version or adding 269 * a secondary node, which could change the JE versions of the nodes to 270 * check when setting a new minimum. 
271 * 272 * @see #setMinJEVersion 273 * @see #addSecondaryNode 274 */ 275 private final Object minJEVersionLock = new Object(); 276 277 /** 278 * The relative cost of replay as compared with network restore, as 279 * specified by 280 * {@link com.sleepycat.je.rep.ReplicationConfig#REPLAY_COST_PERCENT}. 281 */ 282 private final int replayCostPercent; 283 284 /** 285 * The percentage of free disk space to maintain when choosing files to 286 * retain based on replay cost, as specified by {@link 287 * com.sleepycat.je.rep.ReplicationConfig#REPLAY_FREE_DISK_PERCENT}. 288 */ 289 private final int replayFreeDiskPercent; 290 291 /** 292 * The minimum VLSN that should be retained as requested by 293 * replayCostPercent and replayFreeDiskPercent, or NULL_VLSN if disabled. 294 */ 295 private volatile VLSN replayCostMinVLSN = VLSN.NULL_VLSN; 296 297 /* Used by tests only. */ 298 private int logVersion = LogEntryType.LOG_VERSION; 299 300 /* For unit testing */ 301 private Set<TestHook<Integer>> convertHooks; 302 RepNode(RepImpl repImpl, Replay replay, NodeState nodeState)303 public RepNode(RepImpl repImpl, 304 Replay replay, 305 NodeState nodeState) 306 throws IOException, DatabaseException { 307 308 super(repImpl, "RepNode " + repImpl.getNameIdPair()); 309 310 this.repImpl = repImpl; 311 readyLatch = new ExceptionAwareCountDownLatch(repImpl, 1); 312 nameIdPair = repImpl.getNameIdPair(); 313 logger = LoggerUtils.getLogger(getClass()); 314 315 this.mySocket = repImpl.getSocket(); 316 this.serviceDispatcher = 317 new ServiceDispatcher(mySocket, repImpl, 318 repImpl.getChannelFactory()); 319 serviceDispatcher.start(); 320 clock = new Clock(RepImpl.getClockSkewMs()); 321 this.repGroupDB = new RepGroupDB(repImpl); 322 323 masterStatus = new MasterStatus(nameIdPair); 324 replica = ReplicaFactory.create(this, replay); 325 326 feederManager = new FeederManager(this); 327 changeListener = new MasterChangeListener(this); 328 suggestionGenerator = new MasterSuggestionGenerator(this); 329 
330 this.nodeState = nodeState; 331 332 electionQuorum = new ElectionQuorum(repImpl); 333 durabilityQuorum = new DurabilityQuorum(repImpl); 334 335 utilityServicesStart(); 336 this.cbvlsnTracker = new LocalCBVLSNTracker(this); 337 this.globalCBVLSN = new GlobalCBVLSN(this); 338 this.monitorEventManager = new MonitorEventManager(this); 339 timer = new Timer(true); 340 channelTimeoutTask = new ChannelTimeoutTask(timer); 341 configLogFlusher(getConfigManager()); 342 343 arbiter = new Arbiter(repImpl); 344 nodeType = NodeType.valueOf(getConfigManager().get(NODE_TYPE)); 345 replayCostPercent = getConfigManager().getInt(REPLAY_COST_PERCENT); 346 replayFreeDiskPercent = getReplayFreeDiskPercentParameter(); 347 } 348 utilityServicesStart()349 private void utilityServicesStart() { 350 ldiff = new LDiffService(serviceDispatcher, repImpl); 351 logFeederManager = 352 new com.sleepycat.je.rep.impl.networkRestore.FeederManager 353 (serviceDispatcher, repImpl, nameIdPair); 354 355 /* Register the node state querying service. */ 356 nodeStateService = new NodeStateService(serviceDispatcher, this); 357 serviceDispatcher.register(nodeStateService); 358 359 binaryNodeStateService = 360 new BinaryNodeStateService(serviceDispatcher, this); 361 groupService = new GroupService(serviceDispatcher, this); 362 serviceDispatcher.register(groupService); 363 } 364 getReplayFreeDiskPercentParameter()365 private int getReplayFreeDiskPercentParameter() { 366 final int value = getConfigManager().getInt(REPLAY_FREE_DISK_PERCENT); 367 if (value == 0) { 368 return 0; 369 } 370 try { 371 FileStoreInfo.checkSupported(); 372 return value; 373 } catch (UnsupportedOperationException e) { 374 LoggerUtils.warning( 375 logger, repImpl, 376 "The " + REPLAY_FREE_DISK_PERCENT.getName() + 377 " parameter was specified, but is not supported: " + 378 e.getMessage()); 379 return 0; 380 } 381 } 382 383 /* Create a placeholder node, for test purposes only. 
*/ RepNode(NameIdPair nameIdPair)384 public RepNode(NameIdPair nameIdPair) { 385 this(nameIdPair, null); 386 } 387 RepNode()388 public RepNode() { 389 this(NameIdPair.NULL); 390 } 391 RepNode(NameIdPair nameIdPair, ServiceDispatcher serviceDispatcher)392 public RepNode(NameIdPair nameIdPair, 393 ServiceDispatcher serviceDispatcher) { 394 super("RepNode " + nameIdPair); 395 repImpl = null; 396 clock = new Clock(0); 397 398 this.nameIdPair = nameIdPair; 399 mySocket = null; 400 this.serviceDispatcher = serviceDispatcher; 401 402 this.repGroupDB = null; 403 404 masterStatus = new MasterStatus(NameIdPair.NULL); 405 replica = null; 406 feederManager = null; 407 changeListener = null; 408 suggestionGenerator = null; 409 nodeState = null; 410 cbvlsnTracker = null; 411 globalCBVLSN = null; 412 logger = null; 413 timer = null; 414 channelTimeoutTask = null; 415 electionQuorum = null; 416 durabilityQuorum = null; 417 arbiter = null; 418 nodeType = NodeType.ELECTABLE; 419 replayCostPercent = 0; 420 replayFreeDiskPercent = 0; 421 } 422 423 @Override getLogger()424 public Logger getLogger() { 425 return logger; 426 } 427 428 /** 429 * Returns the node type of this node. 430 */ getNodeType()431 public NodeType getNodeType() { 432 return nodeType; 433 } 434 435 /** 436 * Returns the timer associated with this RepNode 437 */ getTimer()438 public Timer getTimer() { 439 return timer; 440 } 441 getServiceDispatcher()442 public ServiceDispatcher getServiceDispatcher() { 443 return serviceDispatcher; 444 } 445 446 /** 447 * Returns the accumulated statistics for this node. The method 448 * encapsulates the statistics associated with its two principal components 449 * the FeederManager and the Replica. 
450 */ getStats(StatsConfig config)451 public ReplicatedEnvironmentStats getStats(StatsConfig config) { 452 return RepInternal.makeReplicatedEnvironmentStats(repImpl, config); 453 } 454 resetStats()455 public void resetStats() { 456 feederManager.resetStats(); 457 replica.resetStats(); 458 } 459 getReadyLatch()460 public ExceptionAwareCountDownLatch getReadyLatch() { 461 return readyLatch; 462 } 463 getVLSNFreezeLatch()464 public CommitFreezeLatch getVLSNFreezeLatch() { 465 return vlsnFreezeLatch; 466 } 467 resetReadyLatch(Exception exception)468 public void resetReadyLatch(Exception exception) { 469 ExceptionAwareCountDownLatch old = readyLatch; 470 readyLatch = new ExceptionAwareCountDownLatch(repImpl, 1); 471 if (old.getCount() != 0) { 472 /* releasing latch in some error situation. */ 473 old.releaseAwait(exception); 474 } 475 } 476 477 /* The methods below return the components of the rep node. */ feederManager()478 public FeederManager feederManager() { 479 return feederManager; 480 } 481 replica()482 public Replica replica() { 483 return replica; 484 } 485 getClock()486 public Clock getClock() { 487 return clock; 488 } 489 getReplica()490 public Replica getReplica() { 491 return replica; 492 } 493 getRepGroupDB()494 public RepGroupDB getRepGroupDB() { 495 return repGroupDB; 496 } 497 498 /** 499 * Retrieves the node's current snapshot image of the group definition. 500 * <p> 501 * There is a very brief period of time, during node start-up, where this 502 * can be <code>null</code>. But after that it should always return a 503 * valid object. 504 */ getGroup()505 public RepGroupImpl getGroup() { 506 return group; 507 } 508 509 /** 510 * Returns the UUID associated with the replicated environment. 
511 */ getUUID()512 public UUID getUUID() { 513 if (group == null) { 514 throw EnvironmentFailureException.unexpectedState 515 ("Group info is not available"); 516 } 517 return group.getUUID(); 518 } 519 520 /** 521 * Returns the nodeName associated with this replication node. 522 * 523 * @return the nodeName 524 */ getNodeName()525 public String getNodeName() { 526 return nameIdPair.getName(); 527 } 528 529 /** 530 * Returns the nodeId associated with this replication node. 531 * 532 * @return the nodeId 533 */ getNodeId()534 public int getNodeId() { 535 return nameIdPair.getId(); 536 } 537 getNameIdPair()538 public NameIdPair getNameIdPair() { 539 return nameIdPair; 540 } 541 getSocket()542 public InetSocketAddress getSocket() { 543 return mySocket; 544 } 545 getMasterStatus()546 public MasterStatus getMasterStatus() { 547 return masterStatus; 548 } 549 550 /** 551 * Returns a definitive answer to whether this node is currently the master 552 * by checking both its status as a master and whether the group agrees 553 * that it is the master. 554 * 555 * Such an authoritative answer is needed in a network partition situation 556 * to detect a master that may be isolated on the minority side of a 557 * network partition. 558 * 559 * @return true if the node is definitely the master. False if it's not or 560 * we cannot be sure. 561 */ isAuthoritativeMaster()562 public boolean isAuthoritativeMaster() { 563 return (electionQuorum.isAuthoritativeMaster(getMasterStatus(), 564 feederManager)); 565 } 566 getHeartbeatInterval()567 public int getHeartbeatInterval() { 568 return getConfigManager().getInt(HEARTBEAT_INTERVAL); 569 } 570 571 /* For unit testing only. 
*/ setVersion(int version)572 public void setVersion(int version) { 573 logVersion = version; 574 } 575 getLogVersion()576 public int getLogVersion() { 577 return logVersion; 578 } 579 getElectionPriority()580 public int getElectionPriority() { 581 final int priority = 582 getConfigManager().getInt(RepParams.NODE_PRIORITY); 583 final int defaultPriority = 584 Integer.parseInt(RepParams.NODE_PRIORITY.getDefault()); 585 return (getConfigManager().getBoolean(RepParams.DESIGNATED_PRIMARY) && 586 (priority == defaultPriority)) ? 587 defaultPriority + 1 : /* Raise its priority. */ 588 priority; /* Explicit priority, leave it intact. */ 589 } 590 591 /* 592 * Amount of time to wait for a thread to finish on a shutdown. It's 593 * a multiple of a heartbeat, since a thread typically polls for a 594 * shutdown once per heartbeat. 595 */ getThreadWaitInterval()596 public int getThreadWaitInterval() { 597 return getHeartbeatInterval()*4; 598 } 599 getDbTreeCacheClearingOpCount()600 int getDbTreeCacheClearingOpCount() { 601 return getConfigManager().getInt(DBTREE_CACHE_CLEAR_COUNT); 602 } 603 getRepImpl()604 public RepImpl getRepImpl() { 605 return repImpl; 606 } 607 getLogManager()608 public LogManager getLogManager() { 609 return repImpl.getLogManager(); 610 } 611 getConfigManager()612 DbConfigManager getConfigManager() { 613 return repImpl.getConfigManager(); 614 } 615 getVLSNIndex()616 public VLSNIndex getVLSNIndex() { 617 return repImpl.getVLSNIndex(); 618 } 619 getFeederTxns()620 public FeederTxns getFeederTxns() { 621 return repImpl.getFeederTxns(); 622 } 623 getElections()624 public Elections getElections() { 625 return elections; 626 } 627 getSuggestionGenerator()628 public MasterSuggestionGenerator getSuggestionGenerator() { 629 return suggestionGenerator; 630 } 631 632 /* Used by unit tests only. 
*/ getElectionPolicy()633 public QuorumPolicy getElectionPolicy() { 634 return electionQuorumPolicy; 635 } 636 637 /** 638 * Returns an array of nodes suitable for feeding log files for a network 639 * restore. 640 * 641 * @return an array of feeder nodes 642 */ getLogProviders()643 public RepNodeImpl[] getLogProviders() { 644 final Set<RepNodeImpl> nodes = getGroup().getDataMembers(); 645 return nodes.toArray(new RepNodeImpl[nodes.size()]); 646 } 647 648 /* Used by unit tests only. */ getLogFlusher()649 public LogFlusher getLogFlusher() { 650 return logFlusher; 651 } 652 653 /* Configure the log flusher according to the configuration changes. */ configLogFlusher(DbConfigManager configMgr)654 public void configLogFlusher(DbConfigManager configMgr) { 655 boolean enableTask = configMgr.getBoolean(RUN_LOG_FLUSH_TASK); 656 int flushInterval = configMgr.getDuration(LOG_FLUSH_TASK_INTERVAL); 657 658 /* Cancel the log flushing the task if we want to. */ 659 if (!enableTask) { 660 if (logFlusher != null) { 661 logFlusher.cancelTask(); 662 } 663 664 return; 665 } 666 667 /* Create LogFlusher if it's null and we do want to start the task. */ 668 if (logFlusher == null) { 669 logFlusher = new LogFlusher(this, timer); 670 } 671 672 /* Configure the flushing task. */ 673 logFlusher.configFlushTask(flushInterval); 674 } 675 getChannelTimeoutTask()676 public ChannelTimeoutTask getChannelTimeoutTask() { 677 return channelTimeoutTask; 678 } 679 isMaster()680 public boolean isMaster() { 681 return masterStatus.isNodeMaster(); 682 } 683 getMonitorEventManager()684 public MonitorEventManager getMonitorEventManager() { 685 return monitorEventManager; 686 } 687 688 /** 689 * Register an AppStateMonitor with this RepNode. 
690 */ registerAppStateMonitor(AppStateMonitor stateMonitor)691 public void registerAppStateMonitor(AppStateMonitor stateMonitor) { 692 this.appStateMonitor = stateMonitor; 693 } 694 695 /** 696 * Return the application state that defined in user specified 697 * AppStateMonitor. 698 */ getAppState()699 public byte[] getAppState() { 700 701 /* 702 * If the AppStateMonitor is not defined, or there is currently no 703 * returned application state, return null. 704 */ 705 if (appStateMonitor == null || appStateMonitor.getAppState() == null) { 706 return null; 707 } 708 709 /* Application state shouldn't be a zero length byte array. */ 710 if (appStateMonitor.getAppState().length == 0) { 711 throw new IllegalStateException 712 ("Application state should be a byte array larger than 0."); 713 } 714 715 return appStateMonitor.getAppState(); 716 } 717 718 /* Get the current master name if it exists. */ getMasterName()719 public String getMasterName() { 720 if (masterStatus.getGroupMasterNameId().getId() == 721 NameIdPair.NULL_NODE_ID) { 722 return null; 723 } 724 725 return masterStatus.getGroupMasterNameId().getName(); 726 } 727 728 /** 729 * Returns the latest VLSN associated with a replicated commit. Note that 730 * since the lastTxnEndVLSN is computed outside the write log latch, via 731 * EnvironmentImpl.registerVLSN(LogItem) it's possible for it to be behind 732 * on an instantaneous basis, but it will eventually catch up when the 733 * updates quiesce. 734 */ getCurrentTxnEndVLSN()735 public VLSN getCurrentTxnEndVLSN() { 736 return repImpl.getLastTxnEnd(); 737 } 738 739 /* 740 * Testing API used to force this node as a master. The mastership is 741 * communicated upon election completion via the Listener. It's the 742 * responsibility of the caller to ensure that only one node is forced 743 * at a time via this API. 744 * 745 * @param force true to force this node as the master, false reverts back 746 * to use of normal (non-preemptive) elections. 
747 */ forceMaster(boolean force)748 public void forceMaster(boolean force) 749 throws InterruptedException, DatabaseException { 750 751 suggestionGenerator.forceMaster(force); 752 /* Initiate elections to make the changed proposal heard. */ 753 refreshCachedGroup(); 754 elections.initiateElection(group, electionQuorumPolicy); 755 } 756 757 /** 758 * Starts up the thread in which the node does its processing as a master 759 * or replica. It then waits for the newly started thread to transition it 760 * out of the DETACHED state, and returns upon completion of this 761 * transition. 762 * 763 * @throws DatabaseException 764 */ startup(QuorumPolicy initialElectionPolicy)765 private void startup(QuorumPolicy initialElectionPolicy) 766 throws DatabaseException { 767 768 if (isAlive()) { 769 return; 770 } 771 772 if (nodeState.getRepEnvState().isDetached()) { 773 nodeState.changeAndNotify(UNKNOWN, NameIdPair.NULL); 774 } 775 elections = new Elections(this, 776 changeListener, 777 suggestionGenerator); 778 779 repImpl.getStartupTracker().start(Phase.FIND_MASTER); 780 try { 781 782 if (repImpl.getConfigManager(). 783 getBoolean(RepParams.RESET_REP_GROUP)) { 784 /* Invoked by DbResetRepGroup utility */ 785 reinitSelfElect(); 786 } else { 787 findMaster(); 788 } 789 this.electionQuorumPolicy = initialElectionPolicy; 790 791 /* Electable members should participate in elections */ 792 if (electionQuorum.nodeTypeParticipates(nodeType)) { 793 elections.participate(); 794 } 795 } finally { 796 repImpl.getStartupTracker().stop(Phase.FIND_MASTER); 797 } 798 799 start(); 800 } 801 802 /** 803 * This method must be invoked when a RepNode is first initialized and 804 * subsequently every time there is a change to the replication group. 805 * <p> 806 * The Master should invoke this method each time a member is added or 807 * removed, and a replica should invoke it each time it detects the commit 808 * of a transaction that modifies the membership database. 
809 * <p> 810 * In addition, it must be invoked after a syncup operation, since it may 811 * revert changes made to the membership table. 812 * 813 * @throws DatabaseException 814 */ refreshCachedGroup()815 public RepGroupImpl refreshCachedGroup() 816 throws DatabaseException { 817 818 group = repGroupDB.getGroup(); 819 elections.updateRepGroup(group); 820 if (nameIdPair.hasNullId()) { 821 RepNodeImpl n = group.getMember(nameIdPair.getName()); 822 if (n != null) { 823 824 /* 825 * Don't update the node ID for a secondary node if 826 * IGNORE_SECONDARY_NODE_ID is true. In that case, we are 827 * trying to convert a previously electable node to a secondary 828 * node, so the information about the electable node ID in the 829 * local copy of the rep group DB should be ignored. 830 */ 831 if (!nodeType.isSecondary() || 832 !getConfigManager().getBoolean(IGNORE_SECONDARY_NODE_ID)) { 833 /* May not be sufficiently current in the rep stream. */ 834 nameIdPair.update(n.getNameIdPair()); 835 } 836 } 837 } 838 return group; 839 } 840 841 /** 842 * Removes a node so that it's no longer a member of the group. 843 * 844 * Note that names referring to removed nodes cannot be reused. 845 * 846 * @param nodeName identifies the node to be removed 847 * 848 * @throws MemberNotFoundException if the node denoted by 849 * <code>memberName</code> is not a member of the replication group. 850 * 851 * @throws MasterStateException if the member being removed is currently 852 * the Master 853 * 854 * @see <a href="https://sleepycat.oracle.com/trac/wiki/DynamicGroupMembership#DeletingMembers">Member Deletion</a> 855 */ removeMember(String nodeName)856 public void removeMember(String nodeName) { 857 removeMember(nodeName, false); 858 } 859 860 /** 861 * Remove or delete a node from the group. If deleting a node, the node 862 * must not be active. 863 * 864 * <p>Note that names referring to removed nodes cannot be reused, but 865 * names for deleted nodes can be. 
866 * 867 * @param nodeName identifies the node to be removed or deleted 868 * 869 * @param delete whether to delete the node rather than just remove it 870 * 871 * @throws MemberActiveException if {@code delete} is {@code true} and 872 * the node is currently active 873 * 874 * @throws MemberNotFoundException if the node denoted by 875 * <code>memberName</code> is not a member of the replication group. 876 * 877 * @throws MasterStateException if the member being removed or deleted is 878 * currently the Master 879 */ removeMember(String nodeName, boolean delete)880 public void removeMember(String nodeName, boolean delete) { 881 checkValidity( 882 nodeName, delete ? "Deleting member" : "Removing member"); 883 884 if (delete && feederManager.activeReplicas().contains(nodeName)) { 885 throw new MemberActiveException( 886 "Attempt to delete an active node: " + nodeName); 887 } 888 889 /* 890 * First remove it from the cached group, effectively setting new 891 * durability requirements, for the ensuing group db updates. 892 */ 893 RepNodeImpl node = group.removeMember(nodeName, delete); 894 895 /* 896 * Shutdown any feeder that may be active with the replica. Unless 897 * deleting, any subsequent attempts by the replica to rejoin the group 898 * will result in a failure. 899 */ 900 feederManager.shutdownFeeder(node); 901 repGroupDB.removeMember(node, delete); 902 } 903 904 /** 905 * Update the network address of a node. 906 * 907 * Note that an alive node's address can't be updated, we'll throw an 908 * ReplicaStateException for this case. 
909 * 910 * @param nodeName identifies the node to be updated 911 * @param newHostName the new host name of this node 912 * @param newPort the new port of this node 913 */ updateAddress(String nodeName, String newHostName, int newPort)914 public void updateAddress(String nodeName, 915 String newHostName, 916 int newPort) { 917 final RepNodeImpl node = 918 checkValidity(nodeName, "Updating node's address"); 919 920 /* Check whether the node is still alive. */ 921 if (feederManager.getFeeder(nodeName) != null) { 922 throw new ReplicaStateException 923 ("Can't update the network address for a live node."); 924 } 925 926 /* Update the node information in the group database. */ 927 node.setHostName(newHostName); 928 node.setPort(newPort); 929 node.setQuorumAck(false); 930 repGroupDB.updateMember(node, true); 931 } 932 933 /** 934 * Transfer the master role to one of the specified replicas. 935 * <p> 936 * We delegate most of the real work to an instance of the {@link 937 * MasterTransfer} class. Here, after some simple initial validity 938 * checking, we're concerned with coordinating the potential for multiple 939 * overlapping Master Transfer operation attempts. The possible outcomes 940 * are: 941 * <ol> 942 * <li>complete success ({@code done == true}) 943 * <ul> 944 * <li> 945 * don't unblock txns here; that'll happen automatically as part of the 946 * usual handling when the environment transitions from master->replica 947 * state. 
948 * <li> 949 * don't clear xfrInProgress, because we don't want to allow another 950 * attempt to supersede 951 * </ul> 952 * <li>timeout before establishing a winner (no superseder) 953 * <ul> 954 * <li>unblock txns 955 * <li>clear xfrInProgress 956 * </ul> 957 * <li>superseded (see {@link #setUpTransfer}) 958 * <ul> 959 * <li>abort existing op (if permitted), unblock txns before unleashing the 960 * new one 961 * <li>replace xfrInProgress 962 * </ul> 963 * <li>env is closed (or invalidated because of an error) during the 964 * operation 965 * <ul> 966 * <li>release the block 967 * <li>leave xfrInProgress as is. 968 * </ul> 969 * </ol> 970 * 971 * @param replicas candidate targets for new master role 972 * @param timeout time limit, in msec 973 * @param force whether to replace any existing, in-progress 974 * transfer operation 975 */ transferMaster(Set<String> replicas, long timeout, boolean force)976 public String transferMaster(Set<String> replicas, 977 long timeout, 978 boolean force) { 979 if (replicas == null || replicas.isEmpty()) { 980 throw new IllegalArgumentException 981 ("Parameter 'replicas' cannot be null or empty"); 982 } 983 if (!nodeState.getRepEnvState().isMaster()) { 984 throw new IllegalStateException("Not currently master"); 985 } 986 if (replicas.contains(getNodeName())) { 987 988 /* 989 * The local node is on the list of candidate new masters, and 990 * we're already master: the operation is trivially satisfied. 
991 */ 992 return getNodeName(); 993 } 994 for (String rep : replicas) { 995 RepNodeImpl node = group.getNode(rep); 996 if (node == null || node.isRemoved()) { 997 throw new IllegalArgumentException 998 ("Node '" + rep + 999 "' is not currently an active member of the group"); 1000 } else if (!node.getType().isElectable()) { 1001 throw new IllegalArgumentException 1002 ("Node '" + rep + 1003 "' must have node type ELECTABLE, but had type " + 1004 node.getType()); 1005 } 1006 } 1007 1008 MasterTransfer xfr = setUpTransfer(replicas, timeout, force); 1009 boolean done = false; 1010 try { 1011 String winner = xfr.transfer(); 1012 done = true; 1013 return winner; 1014 } finally { 1015 synchronized (this) { 1016 if (xfrInProgress == xfr && !done) { 1017 xfrInProgress = null; 1018 } 1019 } 1020 } 1021 } 1022 1023 /** 1024 * Sets up a Master Transfer operation, ensuring that only one operation 1025 * can be in progress at a time. 1026 */ setUpTransfer(Set<String> replicas, long timeout, boolean force)1027 synchronized private MasterTransfer setUpTransfer(Set<String> replicas, 1028 long timeout, 1029 boolean force) { 1030 boolean reject = false; // initial guess, refine below if nec. 1031 if (xfrInProgress != null) { 1032 reject = true; // next best guess, refine below again if nec. 1033 1034 /* 1035 * If the new operation is "forcing", see if we can abort the 1036 * existing one. 
1037 */ 1038 if (force && 1039 xfrInProgress.abort 1040 (new MasterTransferFailureException("superseded"))) { 1041 reject = false; 1042 1043 repImpl.unblockTxnCompletion(); 1044 } 1045 } 1046 if (reject) { 1047 throw new MasterTransferFailureException 1048 ("another Master Transfer (started at " + 1049 new Date(xfrInProgress.getStartTime()) + 1050 ") is already in progress"); 1051 } 1052 xfrInProgress = new MasterTransfer(replicas, timeout, this); 1053 return xfrInProgress; 1054 } 1055 getActiveTransfer()1056 public MasterTransfer getActiveTransfer() { 1057 return xfrInProgress; 1058 } 1059 1060 /** 1061 * Called by the RepNode when a transition to replica status has completely 1062 * finished. 1063 */ clearActiveTransfer()1064 public synchronized void clearActiveTransfer() { 1065 xfrInProgress = null; 1066 } 1067 1068 /** 1069 * Performs some basic validity checking, common code for some 1070 * Group Membership operations. 1071 * 1072 * @param nodeName name of a replica node on which an operation is 1073 * to be performed 1074 * @param actionName textual description of the operation (for 1075 * exception message) 1076 * @return the named node 1077 */ checkValidity(String nodeName, String actionName)1078 private RepNodeImpl checkValidity(String nodeName, String actionName) 1079 throws MemberNotFoundException { 1080 1081 if (!nodeState.getRepEnvState().isMaster()) { 1082 throw EnvironmentFailureException.unexpectedState 1083 ("Not currently a master. 
" + actionName + " must be " + 1084 "invoked on the node that's currently the master."); 1085 } 1086 1087 final RepNodeImpl node = group.getNode(nodeName); 1088 if (node == null) { 1089 throw new MemberNotFoundException("Node:" + nodeName + 1090 "is not a member of the group:" + 1091 group.getName()); 1092 } 1093 1094 if (node.isRemoved() && node.isQuorumAck()) { 1095 throw new MemberNotFoundException("Node:" + nodeName + 1096 "is not currently a member of " + 1097 "the group:" + group.getName() + 1098 " It had been removed."); 1099 } 1100 1101 /* Check if the node is the master itself. */ 1102 if (nodeName.equals(getNodeName())) { 1103 throw new MasterStateException(getRepImpl(). 1104 getStateChangeEvent()); 1105 } 1106 1107 return node; 1108 } 1109 1110 /** 1111 * Updates the cached group info for the node, avoiding a database read. 1112 * 1113 * @param updateNameIdPair the node whose localCBVLSN must be updated. 1114 * @param barrierState the new node syncup state 1115 */ updateGroupInfo(NameIdPair updateNameIdPair, RepGroupImpl.BarrierState barrierState)1116 public void updateGroupInfo(NameIdPair updateNameIdPair, 1117 RepGroupImpl.BarrierState barrierState) { 1118 1119 RepNodeImpl node = group.getMember(updateNameIdPair.getName()); 1120 if (node == null) { 1121 /* A subsequent refresh will get it, along with the new node. */ 1122 return; 1123 } 1124 1125 LoggerUtils.fine(logger, repImpl, 1126 "LocalCBVLSN for " + updateNameIdPair + 1127 " updated to " + barrierState + 1128 " from " + node.getBarrierState().getLastCBVLSN()); 1129 node.setBarrierState(barrierState); 1130 globalCBVLSN.recalculate(group); 1131 } 1132 1133 /** 1134 * Recalculate the Global CBVLSN, provoked by Replay, to ensure that the 1135 * replica's global CBVLSN is up to date. 
1136 */ recalculateGlobalCBVLSN()1137 void recalculateGlobalCBVLSN() { 1138 globalCBVLSN.recalculate(group); 1139 } 1140 getCBVLSNTracker()1141 LocalCBVLSNTracker getCBVLSNTracker() { 1142 return cbvlsnTracker; 1143 } 1144 freezeLocalCBVLSN()1145 public void freezeLocalCBVLSN() { 1146 cbvlsnTracker.incrementFreezeCounter(); 1147 } 1148 unfreezeLocalCBVLSN()1149 public void unfreezeLocalCBVLSN() { 1150 cbvlsnTracker.decrementFreezeCounter(); 1151 } 1152 1153 /** 1154 * Finds a master node. 1155 * 1156 * @throws DatabaseException 1157 */ findMaster()1158 private void findMaster() 1159 throws DatabaseException { 1160 1161 refreshCachedGroup(); 1162 elections.startLearner(); 1163 LoggerUtils.info(logger, repImpl, "Current group size: " + 1164 group.getElectableGroupSize()); 1165 final RepNodeImpl thisNode = group.getNode(nameIdPair.getName()); 1166 if ((thisNode == null) && 1167 1168 /* 1169 * Secondary nodes are not stored in the group DB, so they will not 1170 * be found even though they are not new. Use group UUID to 1171 * distinguish -- it is only unknown if the node is new. 1172 */ 1173 (nodeType.isElectable() || group.hasUnknownUUID())) { 1174 1175 /* A new node */ 1176 LoggerUtils.info(logger, repImpl, "New node " + nameIdPair + 1177 " unknown to rep group"); 1178 Set<InetSocketAddress> helperSockets = repImpl.getHelperSockets(); 1179 1180 /* 1181 * Not present in the replication group. Use the helper, to get 1182 * to a master and enter the group. 1183 */ 1184 if ((group.getElectableGroupSize() == 0) && 1185 (helperSockets.size() == 1) && 1186 nodeType.isElectable() && 1187 serviceDispatcher.getSocketAddress(). 1188 equals(helperSockets.iterator().next())) { 1189 /* A startup situation, should this node become master. 
*/ 1190 selfElect(); 1191 elections.updateRepGroup(group); 1192 return; 1193 } 1194 try { 1195 queryGroupForMembership(); 1196 } catch (InterruptedException e) { 1197 throw EnvironmentFailureException.unexpectedException(e); 1198 } 1199 } else if ((thisNode != null) && thisNode.isRemoved()) { 1200 throw EnvironmentFailureException.unexpectedState 1201 ("Node: " + nameIdPair.getName() + 1202 " was previously deleted."); 1203 } else { 1204 1205 /* An existing node */ 1206 LoggerUtils.info(logger, repImpl, 1207 "Existing node " + nameIdPair.getName() + 1208 " querying for a current master."); 1209 1210 /* 1211 * The group has other members, see if they know of a master, 1212 * along with any helpers that were also supplied. 1213 */ 1214 Set<InetSocketAddress> helperSockets = repImpl.getHelperSockets(); 1215 helperSockets.addAll(group.getAllHelperSockets()); 1216 elections.getLearner().queryForMaster(helperSockets); 1217 } 1218 } 1219 1220 /** 1221 * This method enforces the requirement that all addresses within a 1222 * replication group, must be loopback addresses or they must all be 1223 * non-local ip addresses. Mixing them means that the node with a loopback 1224 * address cannot be contacted by a different node. 1225 * 1226 * @param helperSockets the helper nodes used by this node when contacting 1227 * the master. 1228 */ checkLoopbackAddresses(Set<InetSocketAddress> helperSockets)1229 private void checkLoopbackAddresses(Set<InetSocketAddress> helperSockets) { 1230 1231 final InetAddress myAddress = mySocket.getAddress(); 1232 final boolean isLoopback= myAddress.isLoopbackAddress(); 1233 1234 for (InetSocketAddress socketAddress : helperSockets) { 1235 final InetAddress nodeAddress = socketAddress.getAddress(); 1236 1237 if (nodeAddress.isLoopbackAddress() == isLoopback) { 1238 continue; 1239 } 1240 String message = mySocket + 1241 " the address associated with this node, " + 1242 (isLoopback? "is " : "is not ") + "a loopback address." 
+ 1243 " It conflicts with an existing use, by a different node " + 1244 " of the address:" + 1245 socketAddress + 1246 (!isLoopback ? " which is a loopback address." : 1247 " which is not a loopback address.") + 1248 " Such mixing of addresses within a group is not allowed, " + 1249 "since the nodes will not be able to communicate with " + 1250 "each other."; 1251 throw new IllegalArgumentException(message); 1252 } 1253 } 1254 1255 /** 1256 * Communicates with existing nodes in the group in order to figure out how 1257 * to start up, in the case where the local node does not appear to be in 1258 * the (local copy of the) GroupDB, typically because the node is starting 1259 * up with an empty env directory. It could be that this is a new node 1260 * (never before been part of the group). Or it could be a pre-existing 1261 * group member that has lost its env dir contents and wants to be restored 1262 * via a Network Restore operation. 1263 * <p> 1264 * We first try to find a currently running master node. (An authoritative 1265 * master can easily handle either of the above-mentioned situations.) If 1266 * we can't find a master, we look for other running nodes that may know of 1267 * us (by asking them for their Group information). 1268 * <p> 1269 * We query the designated helpers and all known learners. The helpers are 1270 * the ones that were identified via the node's configuration, while the 1271 * learners are the ones currently in the member database. We use both in 1272 * order to cast the widest possible net. 1273 * <p> 1274 * Returns normally when the master is found. 
1275 * 1276 * @throws InterruptedException if the current thread is interrupted, 1277 * typically due to a shutdown 1278 * @throws InsufficientLogException if the environment requires a network 1279 * restore 1280 * @see #findRestoreSuppliers 1281 */ queryGroupForMembership()1282 private void queryGroupForMembership() 1283 throws InterruptedException { 1284 1285 Set<InetSocketAddress> helperSockets = repImpl.getHelperSockets(); 1286 1287 checkLoopbackAddresses(helperSockets); 1288 1289 /* 1290 * Not in the rep group. Use the designated helpers and other members 1291 * of the group to help us figure out how to get started. 1292 */ 1293 final Set<InetSocketAddress> helpers = 1294 new HashSet<InetSocketAddress>(helperSockets); 1295 helpers.addAll(group.getAllHelperSockets()); 1296 if (helpers.isEmpty()) { 1297 throw EnvironmentFailureException.unexpectedState 1298 ("Need a helper to add a new node into the group"); 1299 } 1300 1301 NameIdPair groupMasterNameId; 1302 while (true) { 1303 elections.getLearner().queryForMaster(helpers); 1304 groupMasterNameId = masterStatus.getGroupMasterNameId(); 1305 if (!groupMasterNameId.hasNullId()) { 1306 /* A new, or pre-query, group master. */ 1307 if (nameIdPair.hasNullId() && 1308 groupMasterNameId.getName().equals(nameIdPair.getName())) { 1309 /* 1310 * Residual obsolete information in replicas, ignore it. 1311 * Can't be master if we don't know our own id, but some 1312 * other node does! This state means that the node was a 1313 * master in the recent past, but has had its environment 1314 * deleted since that time. 1315 */ 1316 try { 1317 Thread.sleep(MASTER_QUERY_INTERVAL); 1318 } catch (InterruptedException e) { 1319 throw EnvironmentFailureException.unexpectedException(e); 1320 } 1321 continue; 1322 } 1323 1324 if (checkGroupMasterIsAlive(groupMasterNameId)) { 1325 /* Use the current group master if it's alive. 
*/ 1326 break; 1327 } 1328 } 1329 1330 /* 1331 * If there's no master, or the last known master cannot be 1332 * reached, see if anyone thinks we're actually already in the 1333 * group, and could supply us with a Network Restore. (Remember, 1334 * we're here only if we didn't find ourselves in the local 1335 * GroupDB. So we could be in a group restore from backup 1336 * situation.) 1337 */ 1338 findRestoreSuppliers(helpers); 1339 1340 Thread.sleep(MASTER_QUERY_INTERVAL); 1341 } 1342 LoggerUtils.info(logger, repImpl, "New node " + nameIdPair.getName() + 1343 " located master: " + groupMasterNameId); 1344 } 1345 1346 /** 1347 * Check that the master found by querying other group nodes is indeed 1348 * alive and that we are not dealing with obsolete cached information. 1349 * 1350 * @return true if the master node could be contacted and was truly alive 1351 * 1352 * TODO: handle protocol version mismatch here and in DbPing, also 1353 * consolidate code so that a single copy is shared. 1354 */ checkGroupMasterIsAlive(NameIdPair groupMasterNameId)1355 private boolean checkGroupMasterIsAlive(NameIdPair groupMasterNameId) { 1356 1357 DataChannel channel = null; 1358 1359 try { 1360 final InetSocketAddress masterSocket = 1361 masterStatus.getGroupMaster(); 1362 1363 final BinaryNodeStateProtocol protocol = 1364 new BinaryNodeStateProtocol(NameIdPair.NOCHECK, null); 1365 1366 /* Build the connection. Set the parameter connectTimeout.*/ 1367 channel = repImpl.getChannelFactory(). 1368 connect(masterSocket, 1369 new ConnectOptions(). 1370 setTcpNoDelay(true). 1371 setOpenTimeout(5000). 1372 setReadTimeout(5000)); 1373 ServiceDispatcher.doServiceHandshake 1374 (channel, BinaryNodeStateService.SERVICE_NAME); 1375 1376 /* Send a NodeState request to the node. */ 1377 protocol.write 1378 (protocol.new BinaryNodeStateRequest(groupMasterNameId.getName(), 1379 group.getName()), 1380 channel); 1381 1382 /* Get the response and return the NodeState. 
*/ 1383 BinaryNodeStateResponse response = 1384 protocol.read(channel, BinaryNodeStateResponse.class); 1385 1386 ReplicatedEnvironment.State state = response.getNodeState(); 1387 return (state != null) && state.isMaster(); 1388 } catch (Exception e) { 1389 LoggerUtils.info(logger, repImpl, 1390 "Queried master:" + groupMasterNameId + 1391 " unavailable. Reason:" + e); 1392 return false; 1393 } finally { 1394 if (channel != null) { 1395 try { 1396 channel.close(); 1397 } catch (IOException ioe) { 1398 /* Ignore it */ 1399 } 1400 } 1401 } 1402 } 1403 1404 /** 1405 * Sets up a Network Restore, as part of the process of restoring an entire 1406 * group from backup, by producing an appropriate {@code 1407 * InsufficientLogException} if possible. 1408 * <p> 1409 * Queries each of the supplied helper hosts for their notion of the group 1410 * make-up. If any of them consider us to be already in the group, then 1411 * instead of joining the group as a new node we ought to try a Network 1412 * Restore; and the node(s) that do already know of us are the suitable 1413 * suppliers for it. 1414 * 1415 * @throws InsufficientLogException in the successful case, if one or more 1416 * suitable suppliers for a Network Restore can be found; otherwise just 1417 * returns. 
1418 */ findRestoreSuppliers(Set<InetSocketAddress> helpers)1419 public void findRestoreSuppliers(Set<InetSocketAddress> helpers) { 1420 final Set<ReplicationNode> suppliers = new HashSet<ReplicationNode>(); 1421 RepGroupProtocol protocol = 1422 new RepGroupProtocol(group.getName(), nameIdPair, repImpl, 1423 repImpl.getChannelFactory()); 1424 1425 for (InetSocketAddress helper : helpers) { 1426 MessageExchange msg = 1427 protocol.new MessageExchange(helper, 1428 GroupService.SERVICE_NAME, 1429 protocol.new GroupRequest()); 1430 1431 /* 1432 * Just as we did in the queryForMaster() case, quietly ignore any 1433 * unsurprising response error or socket exceptions; we'll retry 1434 * later if we end up not finding any Network Restore suppliers. 1435 */ 1436 msg.run(); 1437 ResponseMessage response = msg.getResponseMessage(); 1438 if (response == null || 1439 protocol.RGFAIL_RESP.equals(response.getOp())) { 1440 continue; 1441 } else if (!protocol.GROUP_RESP.equals(response.getOp())) { 1442 LoggerUtils.warning(logger, repImpl, 1443 "Expected GROUP_RESP, got " + 1444 response.getOp() + ": " + response); 1445 continue; 1446 } 1447 GroupResponse groupResp = (GroupResponse) response; 1448 1449 /* 1450 * If the response from the remote node shows that I am already a 1451 * member of the group, add the node to the list of nodes that will 1452 * serve the Network Restore. 1453 */ 1454 RepGroupImpl groupInfo = groupResp.getGroup(); 1455 RepNodeImpl me = groupInfo.getNode(nameIdPair.getName()); 1456 if (me == null || me.isRemoved() || !me.isQuorumAck()) { 1457 continue; 1458 } 1459 1460 ReplicationNode supplier = groupInfo.getMember(helper); 1461 if (supplier != null) { 1462 suppliers.add(supplier); 1463 } 1464 } 1465 1466 if (suppliers.isEmpty()) { 1467 return; 1468 } 1469 1470 throw new InsufficientLogException(this, VLSN.NULL_VLSN, suppliers); 1471 } 1472 1473 /** 1474 * Elects this node as the master. 
The operation is only valid when the 1475 * group consists of just this node, and when this is an ELECTABLE node. 1476 * 1477 * @throws DatabaseException 1478 * @throws IllegalStateException if the node type is not ELECTABLE 1479 */ selfElect()1480 private void selfElect() 1481 throws DatabaseException { 1482 1483 if (!nodeType.isElectable()) { 1484 throw new IllegalStateException( 1485 "Cannot elect node " + nameIdPair.getName() + 1486 " as master because its node type, " + nodeType + 1487 ", is not ELECTABLE"); 1488 } 1489 nameIdPair.setId(RepGroupImpl.getFirstNodeId()); 1490 1491 /* Master by default of a nascent group. */ 1492 Proposal proposal = new TimebasedProposalGenerator().nextProposal(); 1493 elections.getLearner().processResult(proposal, 1494 suggestionGenerator.get(proposal)); 1495 LoggerUtils.info(logger, repImpl, "Nascent group. " + 1496 nameIdPair.getName() + 1497 " is master by virtue of being the first node."); 1498 masterStatus.sync(); 1499 nodeState.changeAndNotify(MASTER, masterStatus.getNodeMasterNameId()); 1500 repImpl.getVLSNIndex().initAsMaster(); 1501 repGroupDB.addFirstNode(); 1502 refreshCachedGroup(); 1503 /* Unsync so that the run loop does not call for an election. */ 1504 masterStatus.unSync(); 1505 } 1506 1507 /** 1508 * Establishes this node as the master, after re-initializing the group 1509 * with this as the sole node in the group. This method is used solely 1510 * as part of the DbResetRepGroup utility. 1511 * 1512 * @throws IllegalStateException if the node type is not ELECTABLE 1513 */ reinitSelfElect()1514 private void reinitSelfElect() { 1515 if (nodeType.isSecondary()) { 1516 throw new IllegalStateException( 1517 "Cannot elect node " + nameIdPair.getName() + 1518 " as master because its node type, " + nodeType + 1519 ", is not ELECTABLE"); 1520 } 1521 1522 /* Establish an empty group so transaction commits can proceed. 
*/ 1523 group = repGroupDB.emptyGroup; 1524 LoggerUtils.info(logger, repImpl, "Reinitializing group to node " + 1525 nameIdPair); 1526 1527 /* 1528 * Unilaterally transition the nodeState to Master, so that write 1529 * transactions needed to reset the group and establish this node can 1530 * be issued against the environment. 1531 */ 1532 nodeState.changeAndNotify(MASTER, masterStatus.getNodeMasterNameId()); 1533 repImpl.getVLSNIndex().initAsMaster(); 1534 1535 /* 1536 * Start using new log files. The file ensures that we can safely 1537 * truncate the past VLSNs. 1538 */ 1539 repImpl.forceLogFileFlip(); 1540 1541 CheckpointConfig ckptConfig = new CheckpointConfig(); 1542 ckptConfig.setForce(true); 1543 1544 /* 1545 * The checkpoint ensures that we do not have to replay VLSNs from the 1546 * prior group and that we have a complete VLSN index on disk. 1547 */ 1548 repImpl.getCheckpointer().doCheckpoint(ckptConfig, 1549 "Reinit of RepGroup"); 1550 VLSN lastOldVLSN = repImpl.getVLSNIndex().getRange().getLast(); 1551 1552 /* Now create the new rep group on disk. */ 1553 repGroupDB.reinitFirstNode(lastOldVLSN); 1554 refreshCachedGroup(); 1555 1556 long lastOldFile = 1557 repImpl.getVLSNIndex().getLTEFileNumber(lastOldVLSN); 1558 1559 /* 1560 * Discard the VLSN index covering the pre group reset VLSNS, to ensure 1561 * that the pre reset part of the log is never replayed. We don't want 1562 * to replay this part of the log, since it contains references to 1563 * repnodes via node ids that are no longer part of the reset rep 1564 * group. Note that we do not reuse rep node ids, that is, rep node id 1565 * sequence continues across the reset operation and is not itself 1566 * reset. Nodes joining the new group will need to do a network restore 1567 * when they join the group. 1568 * 1569 * Don't perform the truncation if RESET_REP_GROUP_RETAIN_UUID is true. 
1570 * In that case, we are only removing the rep group members, but 1571 * retaining the remaining information, because we will be restarting 1572 * the rep group in place with an old secondary acting as an electable 1573 * node. 1574 */ 1575 final boolean retainUUID = 1576 getConfigManager().getBoolean(RESET_REP_GROUP_RETAIN_UUID); 1577 if (!retainUUID) { 1578 repImpl.getVLSNIndex().truncateFromHead(lastOldVLSN, lastOldFile); 1579 } 1580 1581 elections.startLearner(); 1582 /* Unsync so that the run loop does not call for an election. */ 1583 masterStatus.unSync(); 1584 } 1585 1586 /** 1587 * The top level Master/Feeder or Replica loop in support of replication. 1588 * It's responsible for driving the node level state changes resulting 1589 * from elections initiated either by this node, or by other members of the 1590 * group. 1591 * <p> 1592 * The thread is terminated via an orderly shutdown initiated as a result 1593 * of an interrupt issued by the shutdown() method. Any exception that is 1594 * not handled by the run method itself is caught by the thread's uncaught 1595 * exception handler, and results in the RepImpl being made invalid. In 1596 * that case, the application is responsible for closing the Replicated 1597 * Environment, which will provoke the shutdown. 1598 * <p> 1599 * Note: This method currently runs either the feeder loop or the replica 1600 * loop. With R to R support, it would be possible for a Replica to run 1601 * both. This will be a future feature. 1602 */ 1603 @Override run()1604 public void run() { 1605 /* Set to indicate an error-initiated shutdown. */ 1606 Error repNodeError = null; 1607 try { 1608 LoggerUtils.info(logger, repImpl, 1609 "Node " + nameIdPair.getName() + " started" + 1610 (!nodeType.isElectable() ? 1611 " as " + nodeType : 1612 "")); 1613 while (!isShutdownOrInvalid()) { 1614 if (nodeState.getRepEnvState() != UNKNOWN) { 1615 /* Avoid unnecessary state changes. 
*/ 1616 nodeState.changeAndNotify(UNKNOWN, NameIdPair.NULL); 1617 } 1618 1619 /* 1620 * Initiate elections if we don't have a group master, or there 1621 * is a master, but we were unable to use it. 1622 */ 1623 if (masterStatus.getGroupMasterNameId().hasNullId() || 1624 masterStatus.inSync()) { 1625 1626 /* 1627 * But we can't if we don't have our own node ID yet or if 1628 * we are not ELECTABLE. 1629 */ 1630 if (nameIdPair.hasNullId() || !nodeType.isElectable()) { 1631 queryGroupForMembership(); 1632 } else { 1633 elections.initiateElection(group, electionQuorumPolicy); 1634 1635 /* 1636 * Subsequent elections must always use a simple 1637 * majority. 1638 */ 1639 electionQuorumPolicy = QuorumPolicy.SIMPLE_MAJORITY; 1640 } 1641 /* In case elections were shut down. */ 1642 if (isShutdownOrInvalid()) { 1643 return; 1644 } 1645 } 1646 1647 /* Start syncing this node to the new group master */ 1648 masterStatus.sync(); 1649 1650 if (masterStatus.isNodeMaster()) { 1651 repImpl.getVLSNIndex().initAsMaster(); 1652 replica.masterTransitionCleanup(); 1653 1654 /* Master is ready for business. */ 1655 nodeState.changeAndNotify 1656 (MASTER, masterStatus.getNodeMasterNameId()); 1657 1658 /* 1659 * Update the JE version information stored for the master 1660 * in the RepGroupDB, if needed. 1661 */ 1662 maybeUpdateMasterJEVersion(); 1663 1664 feederManager.runFeeders(); 1665 1666 /* 1667 * At this point, the feeder manager has been shutdown. 1668 * Re-initialize the VLSNIndex put latch mechanism, which 1669 * is present on masters to maintain a tip cache of the 1670 * last record on the replication stream, and by all 1671 * nodes when doing checkpoint vlsn consistency waiting. 1672 * Create a new feeder manager, should this node become a 1673 * master later on. 1674 * Set the node to UNKNOWN state right away, because the 1675 * MasterTxn will use node state to prevent the advent of 1676 * any replicated writes. 
Once the VLSNIndex is 1677 * initialized for replica state, the node will NPE if it 1678 * attempts execute replicated writes. 1679 */ 1680 nodeState.changeAndNotify(UNKNOWN, NameIdPair.NULL); 1681 repImpl.getVLSNIndex().initAsReplica(); 1682 assert runConvertHooks(); 1683 feederManager = new FeederManager(this); 1684 } else { 1685 /* 1686 * Replica will notify us when connection is successfully 1687 * made, and Feeder handshake done, at which point we'll 1688 * update nodeState. 1689 */ 1690 replica.replicaTransitionCleanup(); 1691 replica.runReplicaLoop(); 1692 } 1693 } 1694 } catch (InterruptedException e) { 1695 LoggerUtils.fine(logger, repImpl, 1696 "RepNode main thread interrupted - " + 1697 " forced shutdown."); 1698 } catch (GroupShutdownException e) { 1699 saveShutdownException(e); 1700 LoggerUtils.fine(logger, repImpl, 1701 "RepNode main thread sees group shutdown - " + e); 1702 } catch (InsufficientLogException e) { 1703 saveShutdownException(e); 1704 } catch (RuntimeException e) { 1705 LoggerUtils.fine(logger, repImpl, 1706 "RepNode main thread sees runtime ex - " + e); 1707 saveShutdownException(e); 1708 throw e; 1709 } catch (Error e) { 1710 LoggerUtils.fine(logger, repImpl, e + 1711 " incurred during repnode loop"); 1712 repNodeError = e; 1713 repImpl.invalidate(e); 1714 } finally { 1715 try { 1716 LoggerUtils.info(logger, repImpl, 1717 "RepNode main thread shutting down."); 1718 1719 if (repNodeError != null) { 1720 LoggerUtils.info(logger, repImpl, 1721 "Node state at shutdown:\n"+ 1722 repImpl.dumpState()); 1723 throw repNodeError; 1724 } 1725 Throwable exception = getSavedShutdownException(); 1726 1727 if (exception == null) { 1728 LoggerUtils.fine(logger, repImpl, 1729 "Node state at shutdown:\n"+ 1730 repImpl.dumpState()); 1731 } else { 1732 LoggerUtils.info(logger, repImpl, 1733 "RepNode shutdown exception:\n" + 1734 exception.getMessage() + 1735 repImpl.dumpState()); 1736 } 1737 1738 try { 1739 shutdown(); 1740 } catch (DatabaseException e) { 
1741 RepUtils.chainExceptionCause(e, exception); 1742 LoggerUtils.severe(logger, repImpl, 1743 "Unexpected exception during shutdown" + 1744 e); 1745 throw e; 1746 } 1747 } catch (InterruptedException e1) { 1748 // Ignore exceptions on exit 1749 } 1750 nodeState.changeAndNotify(DETACHED, NameIdPair.NULL); 1751 cleanup(); 1752 } 1753 } 1754 1755 /** 1756 * Update the information stored for the master in the RepGroupDB if 1757 * storing it is supported and the current version is different from the 1758 * recorded version. 1759 */ maybeUpdateMasterJEVersion()1760 private void maybeUpdateMasterJEVersion() { 1761 1762 /* Check if storing JE version information is supported */ 1763 if (group.getFormatVersion() < RepGroupImpl.FORMAT_VERSION_3) { 1764 return; 1765 } 1766 1767 final JEVersion currentJEVersion = repImpl.getCurrentJEVersion(); 1768 final RepNodeImpl node = group.getMember(nameIdPair.getName()); 1769 1770 if (currentJEVersion.equals(node.getJEVersion())) { 1771 return; 1772 } 1773 node.updateJEVersion(currentJEVersion); 1774 repGroupDB.updateMember(node, false); 1775 } 1776 notifyReplicaConnected()1777 void notifyReplicaConnected() { 1778 nodeState.changeAndNotify(REPLICA, masterStatus.getNodeMasterNameId()); 1779 } 1780 1781 /** 1782 * Returns true if the node has been shutdown or if the underlying 1783 * environment has been invalidated. It's used as the basis for exiting 1784 * the FeederManager or the Replica. 1785 */ isShutdownOrInvalid()1786 boolean isShutdownOrInvalid() { 1787 if (isShutdown()) { 1788 return true; 1789 } 1790 if (getRepImpl().wasInvalidated()) { 1791 saveShutdownException(getRepImpl().getInvalidatingException()); 1792 return true; 1793 } 1794 return false; 1795 } 1796 1797 /** 1798 * Used to shutdown all activity associated with this replication stream. 1799 * If method is invoked from different thread of control, it will wait 1800 * until the rep node thread exits. 
If it's from the same thread, it's the 1801 * caller's responsibility to exit the thread upon return from this method. 1802 * 1803 * @throws InterruptedException 1804 * @throws DatabaseException 1805 */ shutdown()1806 public void shutdown() 1807 throws InterruptedException, DatabaseException { 1808 1809 if (shutdownDone()) { 1810 return; 1811 } 1812 1813 LoggerUtils.info(logger, repImpl, "Shutting down node " + nameIdPair); 1814 1815 /* Fire a LeaveGroup if this RepNode is valid. */ 1816 if (repImpl.isValid()) { 1817 monitorEventManager.notifyLeaveGroup(getLeaveReason()); 1818 } 1819 1820 /* Stop accepting any new network requests. */ 1821 serviceDispatcher.preShutdown(); 1822 1823 if (elections != null) { 1824 elections.shutdown(); 1825 } 1826 1827 /* Initiate the FeederManger soft shutdown if it's active. */ 1828 feederManager.shutdownQueue(); 1829 1830 if ((getReplicaCloseCatchupMs() >= 0) && 1831 (nodeState.getRepEnvState().isMaster())) { 1832 1833 /* 1834 * A group shutdown. Shutting down the queue will cause the 1835 * FeederManager to shutdown its feeders and exit. 1836 */ 1837 this.join(); 1838 } 1839 1840 /* Shutdown the replica, if it's active. */ 1841 replica.shutdown(); 1842 1843 shutdownThread(logger); 1844 1845 LoggerUtils.info(logger, repImpl, 1846 "RepNode main thread: " + this.getName() + " exited."); 1847 /* Shut down all other services. */ 1848 utilityServicesShutdown(); 1849 1850 /* Shutdown all the services before shutting down the dispatcher. */ 1851 MasterTransfer mt = getActiveTransfer(); 1852 if (mt != null) { 1853 Exception ex = getSavedShutdownException(); 1854 if (ex == null) { 1855 ex = new MasterTransferFailureException("shutting down"); 1856 } 1857 mt.abort(ex); 1858 } 1859 serviceDispatcher.shutdown(); 1860 LoggerUtils.info(logger, repImpl, 1861 nameIdPair + " shutdown completed."); 1862 masterStatus.setGroupMaster(null, NameIdPair.NULL); 1863 readyLatch.releaseAwait(getSavedShutdownException()); 1864 1865 /* Cancel the TimerTasks. 
*/ 1866 channelTimeoutTask.cancel(); 1867 if (logFlusher != null) { 1868 logFlusher.cancelTask(); 1869 } 1870 timer.cancel(); 1871 } 1872 1873 1874 /** 1875 * Soft shutdown for the RepNode thread. Note that since the thread is 1876 * shared by the FeederManager and the Replica, the FeederManager or 1877 * Replica specific soft shutdown actions should already have been done 1878 * earlier. 1879 */ 1880 @Override initiateSoftShutdown()1881 protected int initiateSoftShutdown() { 1882 return getThreadWaitInterval(); 1883 } 1884 1885 /* Get the shut down reason for this node. */ getLeaveReason()1886 private LeaveReason getLeaveReason() { 1887 LeaveReason reason = null; 1888 1889 Exception exception = getSavedShutdownException(); 1890 if (exception == null) { 1891 reason = LeaveReason.NORMAL_SHUTDOWN; 1892 } else if (exception instanceof GroupShutdownException) { 1893 reason = LeaveReason.MASTER_SHUTDOWN_GROUP; 1894 } else { 1895 reason = LeaveReason.ABNORMAL_TERMINATION; 1896 } 1897 1898 return reason; 1899 } 1900 utilityServicesShutdown()1901 private void utilityServicesShutdown() { 1902 if (ldiff != null) { 1903 ldiff.shutdown(); 1904 } 1905 1906 if (logFeederManager != null) { 1907 logFeederManager.shutdown(); 1908 } 1909 1910 if (binaryNodeStateService != null) { 1911 binaryNodeStateService.shutdown(); 1912 } 1913 1914 if (nodeStateService != null) { 1915 serviceDispatcher.cancel(NodeStateService.SERVICE_NAME); 1916 } 1917 1918 if (groupService != null) { 1919 serviceDispatcher.cancel(GroupService.SERVICE_NAME); 1920 } 1921 } 1922 1923 /** 1924 * Must be invoked on the Master via the last open handle. 1925 * 1926 * Note that the method itself does not shutdown the group. It merely 1927 * sets replicaCloseCatchupMs, indicating that the ensuing handle close 1928 * should shutdown the Replicas. The actual coordination with the closing 1929 * of the handle is implemented by ReplicatedEnvironment.shutdownGroup(). 
1930 * 1931 * @see ReplicatedEnvironment#shutdownGroup(long, TimeUnit) 1932 */ shutdownGroupOnClose(long timeoutMs)1933 public void shutdownGroupOnClose(long timeoutMs) 1934 throws IllegalStateException { 1935 1936 if (!nodeState.getRepEnvState().isMaster()) { 1937 throw new IllegalStateException 1938 ("Node state must be " + MASTER + 1939 ", not " + nodeState.getRepEnvState()); 1940 } 1941 replicaCloseCatchupMs = (timeoutMs < 0) ? 0 : timeoutMs; 1942 } 1943 1944 /** 1945 * JoinGroup ensures that a RepNode is actively participating in a 1946 * replication group. It's invoked each time a replicated environment 1947 * handle is created. 1948 * 1949 * If the node is already participating in a replication group, because 1950 * it's not the first handle to the environment, it will return without 1951 * having to wait. Otherwise it will wait until a master is elected and 1952 * this node is active, either as a Master, or as a Replica. 1953 * 1954 * If the node joins as a replica, it will wait further until it has become 1955 * sufficiently consistent as defined by its consistency argument. By 1956 * default it uses PointConsistencyPolicy to ensure that it is at least as 1957 * consistent as the master as of the time the handle was opened. 1958 * 1959 * A node can also join in the Unknown state if it has been configured to 1960 * do so via ENV_UNKNOWN_STATE_TIMEOUT. 1961 * 1962 * @throws UnknownMasterException If a master cannot be established within 1963 * ENV_SETUP_TIMEOUT, unless ENV_UNKNOWN_STATE_TIMEOUT has 1964 * been set to allow the creation of a handle while in the UNKNOWN state. 
1965 * 1966 * @return MASTER, REPLICA, or UNKNOWN (if ENV_UNKNOWN_STATE_TIMEOUT 1967 * is set) 1968 */ 1969 public ReplicatedEnvironment.State joinGroup(ReplicaConsistencyPolicy consistency, QuorumPolicy initialElectionPolicy)1970 joinGroup(ReplicaConsistencyPolicy consistency, 1971 QuorumPolicy initialElectionPolicy) 1972 throws ReplicaConsistencyException, DatabaseException { 1973 1974 final JoinGroupTimeouts timeouts = 1975 new JoinGroupTimeouts(getConfigManager()); 1976 1977 startup(initialElectionPolicy); 1978 LoggerUtils.finest(logger, repImpl, "joinGroup " + 1979 nodeState.getRepEnvState()); 1980 1981 DatabaseException exitException = null; 1982 int retries=0; 1983 repImpl.getStartupTracker().start(Phase.BECOME_CONSISTENT); 1984 repImpl.getStartupTracker().setProgress 1985 (RecoveryProgress.BECOME_CONSISTENT); 1986 try { 1987 for (retries=0; retries < JOIN_RETRIES; retries++ ) { 1988 try { 1989 /* Wait for Feeder/Replica to be fully initialized. */ 1990 boolean done = getReadyLatch().awaitOrException 1991 (timeouts.getTimeout(), TimeUnit.MILLISECONDS); 1992 1993 /* 1994 * Save the state, and use it from this point forward, 1995 * since the node's state may change again. 1996 */ 1997 final ReplicatedEnvironment.State finalState = 1998 nodeState.getRepEnvState(); 1999 if (!done) { 2000 /* An election or setup, timeout. */ 2001 if (finalState.isReplica()) { 2002 if (timeouts.timeoutIsForUnknownState()) { 2003 /* 2004 * Replica syncing up; move onwards to the 2005 * setup timeout and continue with the syncup. 
2006 */ 2007 timeouts.setSetupTimeout(); 2008 continue; 2009 } 2010 throw new ReplicaConsistencyException 2011 (String.format("Setup time exceeded %,d ms", 2012 timeouts.getSetupTimeout()), 2013 null); 2014 } 2015 2016 if (finalState.isUnknown() && 2017 timeouts.timeoutIsForUnknownState()) { 2018 return UNKNOWN; 2019 } 2020 break; 2021 } 2022 2023 switch (finalState) { 2024 case UNKNOWN: 2025 2026 /* 2027 * State flipped between release of ready latch and 2028 * nodeState.getRepEnvState() above; retry for a 2029 * Master/Replica state. 2030 */ 2031 continue; 2032 2033 case REPLICA: 2034 joinAsReplica(consistency); 2035 break; 2036 2037 case MASTER: 2038 LoggerUtils.info(logger, repImpl, 2039 "Joining group as master"); 2040 break; 2041 2042 case DETACHED: 2043 throw EnvironmentFailureException. 2044 unexpectedState("Node in DETACHED state " + 2045 "while joining group."); 2046 } 2047 2048 return finalState; 2049 } catch (InterruptedException e) { 2050 throw EnvironmentFailureException.unexpectedException(e); 2051 } catch (MasterStateException e) { 2052 /* Transition to master while establishing consistency. */ 2053 LoggerUtils.warning(logger, repImpl, 2054 "Join retry due to master transition: " 2055 + e.getMessage()); 2056 continue; 2057 } catch (RestartRequiredException e) { 2058 LoggerUtils.warning(logger, repImpl, 2059 "Environment needs to be restarted: " + 2060 e.getMessage()); 2061 throw e; 2062 } catch (DatabaseException e) { 2063 Throwable cause = e.getCause(); 2064 if ((cause != null) && 2065 (cause.getClass() == 2066 Replica.ConnectRetryException.class)) { 2067 2068 /* 2069 * The master may have changed. Retry if there is time 2070 * left to do so. It may result in a new master. 
2071 */ 2072 exitException = e; 2073 if (timeouts.getTimeout() > 0) { 2074 LoggerUtils.warning(logger, repImpl, 2075 "Join retry due to exception: " 2076 + cause.getMessage()); 2077 continue; 2078 } 2079 } 2080 throw e; 2081 } 2082 } 2083 } finally { 2084 repImpl.getStartupTracker().stop(Phase.BECOME_CONSISTENT); 2085 } 2086 2087 /* Timed out or exceeded retries. */ 2088 if (exitException != null) { 2089 LoggerUtils.warning(logger, repImpl, "Exiting joinGroup after " + 2090 retries + " retries." + exitException); 2091 throw exitException; 2092 } 2093 throw new UnknownMasterException(null, repImpl.getStateChangeEvent()); 2094 } 2095 2096 /** 2097 * Join the group as a Replica ensuring that the node is sufficiently 2098 * consistent as defined by its consistency policy. 2099 * 2100 * @param consistency the consistency policy to use when joining initially 2101 */ joinAsReplica(ReplicaConsistencyPolicy consistency)2102 private void joinAsReplica(ReplicaConsistencyPolicy consistency) 2103 throws InterruptedException { 2104 2105 if (consistency == null) { 2106 final int consistencyTimeout = 2107 getConfigManager().getDuration(ENV_CONSISTENCY_TIMEOUT); 2108 consistency = new PointConsistencyPolicy 2109 (new VLSN(replica.getMasterTxnEndVLSN()), 2110 consistencyTimeout, TimeUnit.MILLISECONDS); 2111 } 2112 2113 /* 2114 * Wait for the replica to become sufficiently consistent. 2115 */ 2116 consistency.ensureConsistency(repImpl); 2117 2118 /* 2119 * Flush changes to the file system. The flush ensures in particular 2120 * that any member database updates defining this node itself are not 2121 * lost in case of a process crash. See SR 20607. 2122 */ 2123 repImpl.getLogManager().flushNoSync(); 2124 2125 LoggerUtils.info(logger, repImpl, "Joined group as a replica. 
" + 2126 " join consistencyPolicy=" + consistency + 2127 " " + repImpl.getVLSNIndex().getRange()); 2128 } 2129 2130 /** 2131 * Should be called whenever a new VLSN is associated with a log entry 2132 * suitable for Replica/Feeder syncup. 2133 */ trackSyncableVLSN(VLSN syncableVLSN, long lsn)2134 public void trackSyncableVLSN(VLSN syncableVLSN, long lsn) { 2135 cbvlsnTracker.track(syncableVLSN, lsn); 2136 } 2137 2138 /** 2139 * Returns the group wide CBVLSN. The group CBVLSN is computed as the 2140 * minimum of CBVLSNs after discarding CBVLSNs that are obsolete. A CBVLSN 2141 * is considered obsolete, if it has not been updated within a configurable 2142 * time interval relative to the time that the most recent CBVLSN was 2143 * updated. Also considers VLSNs protected by REPLAY_COST_PERCENT. 2144 * 2145 * <p>May return NULL_VLSN. 2146 */ getGroupCBVLSN()2147 public VLSN getGroupCBVLSN() { 2148 return VLSN.min(globalCBVLSN.getCBVLSN(), replayCostMinVLSN); 2149 } 2150 2151 /** 2152 * Marks the start of the search for a matchpoint that happens during a 2153 * syncup. The globalCBVLSN can't be changed when a syncup is in 2154 * progress. A feeder may have multiple syncups in action. The caller 2155 * should call {@link #syncupEnded} when the syncup is done. 2156 */ syncupStarted()2157 public void syncupStarted() { 2158 globalCBVLSN.syncupStarted(); 2159 } 2160 syncupEnded()2161 public void syncupEnded() { 2162 globalCBVLSN.syncupEnded(); 2163 } 2164 2165 /** 2166 * Returns the file number that forms a barrier for the cleaner's file 2167 * deletion activities. Files with numbers >= this file number cannot be 2168 * removed by the cleaner without disrupting the replication stream. 
2169 * 2170 * @return the file number that's the barrier for cleaner file deletion 2171 * 2172 * @throws DatabaseException 2173 */ getCleanerBarrierFile()2174 public long getCleanerBarrierFile() 2175 throws DatabaseException { 2176 2177 /* 2178 * It turns out that this method is only called in contexts that 2179 * specify an active syncup is underway. Neither the global CBVLSN or 2180 * the VLSNIndex can change during an active syncup, so being called 2181 * during an active syncup means we can depend on the global CBVLSN 2182 * having an entry in the VLSNIndex. The check below that there are 2183 * active syncups underway doesn't guarantee that the syncup will 2184 * remain in effect during the whole call, but is a good indication 2185 * that we can depend on the global CBVLSN having an entry in the 2186 * VLSNIndex, as needed later in this method. 2187 */ 2188 if (globalCBVLSN.getActiveSyncups() <= 0) { 2189 throw new IllegalStateException( 2190 "getCleanerBarrierFile should only be called during" + 2191 " an active syncup"); 2192 } 2193 2194 /* 2195 * Take the minimum of GlobalCBVLSN, replayCostMinVLSN, and 2196 * SyncCleanerBarrier 2197 */ 2198 final VLSN globalCBVLSNValue = globalCBVLSN.getCBVLSN(); 2199 final VLSN cbvlsn = VLSN.min(globalCBVLSNValue, replayCostMinVLSN); 2200 2201 if (logger.isLoggable(Level.FINE)) { 2202 LoggerUtils.fine( 2203 logger, repImpl, 2204 "Computing getCleanerBarrierFile:" + 2205 " GlobalCBVLSN=" + globalCBVLSNValue + 2206 " replayCostMinVLSN=" + replayCostMinVLSN + 2207 " CBVLSN=" + cbvlsn); 2208 } 2209 2210 /* The GlobalCBVLSN can be null, putting it outside the VLSN index */ 2211 if (cbvlsn.isNull()) { 2212 return 0; 2213 } 2214 return repImpl.getVLSNIndex().getLTEFileNumber(cbvlsn); 2215 } 2216 2217 /** 2218 * Protects additional files from deletion by returning a subset of the 2219 * specified unprotected files. 
     *
     * <p>The unprotectedFiles parameter provides a collection of files that
     * the caller believes are safe to delete. These files have had their live
     * data migrated by the cleaner, and they are not in use by DbBackup or
     * DataSync. This method decides which of the unprotected files should be
     * removed from the return value to protect them from deletion so that they
     * can be used to stream replication data to replay at replicas.
     *
     * <p>The diagram below represents log files organized by the time they
     * were created, with the oldest file at the left and the newest at the
     * right.
     *
     * <pre>
     *
     *  <-- Older                                              Newer -->
     *
     *  All files:
     *
     *    Region 1  |       Region 2          |  Region 3  |  Region 4
     *  ------------A------------X------------B------------C------------
     *              |            |            |            |
     *          Start of      CBVLSN       Global        Last
     *         VLSN Index        |         CBVLSN        VLSN
     *                           |                         |
     *  From unprotected files:  |                         |
     *                           |                         |
     *     Truncated files       |     Retained files      |  Barren files
     *
     * </pre>
     *
     * <p>Note that the unprotected files represent a subset of the files in
     * each region of the diagram.
     *
     * <p>Files in region 1 are files that are not represented in the VLSN
     * Index. The mappings between LSNs and VLSNs maintained by the VLSN Index
     * are needed to support replays, so these files cannot be used for
     * replication.
     *
     * <p>Region 1 files consist primarily of live files containing data needed
     * for recovery, typically with a high enough percentage of live data that
     * the cleaner does not consider them worth cleaning. There are usually
     * gaps in the sequence of files in this region because some of these older
     * files have been removed after being cleaned.
     *
     * <p>Note that entries are removed from the VLSN Index before the
     * associated log files are deleted.
If log files are found to be in 2266 * use by DbBackup, or the system crashes before the files can be deleted, 2267 * it is possible for cleaned log files to appear in region 1 that do not 2268 * have associated mappings in the VLSN index. That is the reason why it 2269 * is possible for this region to contain some unprotected files. 2270 * 2271 * <p>Files in region 2 are files that are represented in the VLSN index 2272 * and so can be used for replication. Because these files only contain 2273 * VLSNs before the Global CBVLSN, they are not known to be needed to 2274 * support replication for electable replicas that have been in contact 2275 * with the master within the time period represented by the 2276 * REP_STREAM_TIMEOUT parameter, or by any replica currently performing 2277 * replication, but they might still be useful to secondary nodes that are 2278 * out of contact or electable nodes that have been out of contact for 2279 * longer than REP_STREAM_TIMEOUT. 2280 * 2281 * <p>The system uses the values of the REPLAY_COST_PERCENT and 2282 * REPLAY_FREE_DISK_PERCENT parameters to determine if it is worthwhile to 2283 * retain cleaned files in region 2, while also satisfying the requested 2284 * free disk space target. The system needs to retain a contiguous subset 2285 * of the files at the upper end of this range in order for them to be 2286 * useful for replication, since replication requires an uninterrupted 2287 * sequence of VLSNs. The system will select a VLSN between points A and B 2288 * as point X, which represents the lowest VLSN for files that should be 2289 * protected. 2290 * 2291 * <p>Files in region 3 are files that are known to be needed for 2292 * replication by electable nodes, so otherwise unprotected files in this 2293 * range are retained for that reason. 2294 * 2295 * <p>Files in region 4 are files that do not contain any VLSNs. 
     * These files include log entries that represent changes to the structure
     * of the Btree, which are not needed for replication. Cleaned files in this
     * region are candidates for deletion because they are not needed for
     * either replication or network restore.
     *
     * <p>The unprotected files passed to getUnprotectedFileSet consist of
     * cleaned files that appear in the four regions described above.
     *
     * <p>The unprotected files in region 1 will always be removed, and are
     * called "truncated files".
     *
     * <p>Unprotected files in region 2 below point X, the final CBVLSN value,
     * will be removed, and are also called "truncated files". Unprotected
     * files in region 2 at or above point X are retained.
     *
     * <p>Unprotected files in region 3 will be retained because they are known
     * to be needed for replication.
     *
     * <p>Unprotected files in region 4 will always be removed because they are
     * not needed for replication or network restore. These files are called
     * "barren files".
     *
     * <p>Note that the Global CBVLSN increases over time, which moves files
     * from region 3 into region 2, where they can be evaluated for retention
     * based on replay cost. The minimum VLSN of the files protected by replay
     * cost also increases over time, allowing the low end of the VLSN Index to
     * be truncated.
     *
     * @param unprotectedFiles set of file numbers of the files that are
     * potential candidates for deletion, before taking into consideration the
     * sync-up needs of other nodes in the replication group
     *
     * @param cleaner the cleaner, used in case the implementation needs
     * additional information about log files
     *
     * @return a subset of the original set, where files that we want to
     * protect have been removed. We never destructively modify the input set.
2332 * Instead we return either a restricted view of the original set or a new 2333 * set with copies of some of the file numbers from the original set. 2334 */ getUnprotectedFileSet( final NavigableSet<Long> unprotectedFiles, final Cleaner cleaner)2335 public NavigableSet<Long> getUnprotectedFileSet( 2336 final NavigableSet<Long> unprotectedFiles, 2337 final Cleaner cleaner) { 2338 2339 final VLSN globalCBVLSNValue = globalCBVLSN.getCBVLSN(); 2340 final long globalCBVLSNFile = globalCBVLSNValue.isNull() ? 2341 0 : 2342 getVLSNIndex().getLTEFileNumber(globalCBVLSNValue); 2343 final VLSN lastVLSN = getVLSNIndex().getRange().getLast(); 2344 final long lastVLSNFile = getVLSNIndex().getGTEFileNumber(lastVLSN); 2345 2346 /* 2347 * Compute the minimum VLSN associated with replay cost based on 2348 * REPLAY_COST_PERCENT and REPLAY_FREE_DISK_PERCENT 2349 */ 2350 replayCostMinVLSN = computeReplayCostMinVLSN( 2351 unprotectedFiles, cleaner, globalCBVLSNFile, lastVLSNFile); 2352 2353 /* 2354 * Compute the new CBVLSN as the minimum of globalCBVLSN and 2355 * replayCostMinVLSN 2356 */ 2357 final VLSN cbvlsn; 2358 final long cbvlsnFile; 2359 if (!replayCostMinVLSN.isNull() && 2360 (replayCostMinVLSN.compareTo(globalCBVLSNValue) < 0)) { 2361 2362 /* Log if the replayCostMinVLSN wins out */ 2363 if (logger.isLoggable(Level.INFO)) { 2364 LoggerUtils.info( 2365 logger, envImpl, 2366 "Unprotected file set determined by replay cost:" + 2367 " GlobalCBVLSN=" + globalCBVLSNValue + 2368 " replayCostMinVLSN=" + replayCostMinVLSN); 2369 } 2370 cbvlsn = replayCostMinVLSN; 2371 cbvlsnFile = getVLSNIndex().getLTEFileNumber(replayCostMinVLSN); 2372 } else { 2373 cbvlsn = globalCBVLSNValue; 2374 cbvlsnFile = globalCBVLSNFile; 2375 if (logger.isLoggable(Level.FINE)) { 2376 LoggerUtils.fine(logger, envImpl, 2377 "Computing unprotected file set:" + 2378 " GlobalCBVLSN=" + globalCBVLSNValue + 2379 " replayCostMinVLSN=" + replayCostMinVLSN); 2380 } 2381 } 2382 2383 /* 2384 * The files we want to 
preserve range from the one containing the 2385 * CBVLSN, to the one containing the last VLSN. So there could be up 2386 * to three disjoint subsets of the unprotected files: (1) those before 2387 * the CBVLSN (the "truncated files"), (2) those we want to preserve 2388 * (the "retained files"), and (3) those after the last VLSN (the 2389 * "barren files"). Since we need the retained files in (2), the 2390 * unprotected files we return from this method consist of the files in 2391 * (1) and (3). 2392 */ 2393 final NavigableSet<Long> truncatedFiles = 2394 unprotectedFiles.headSet(cbvlsnFile, false); 2395 final NavigableSet<Long> barrenFiles = 2396 unprotectedFiles.tailSet(lastVLSNFile, false); 2397 2398 if (!barrenFiles.isEmpty()) { 2399 if (logger.isLoggable(Level.INFO)) { 2400 LoggerUtils.info 2401 (logger, envImpl, 2402 "CBVLSN file is 0x" + 2403 Long.toHexString(cbvlsnFile) + 2404 " but these files have no VLSNs and can be deleted: " + 2405 enumerateFiles(barrenFiles)); 2406 2407 } 2408 } 2409 2410 /* 2411 * enumerateFiles() can be expensive, so only generate this String 2412 * if logging is set to FINER 2413 */ 2414 if (logger.isLoggable(Level.FINER)) { 2415 if (!truncatedFiles.isEmpty()) { 2416 logger.finer("Known unused files before CBVLSN start: " + 2417 enumerateFiles(truncatedFiles)); 2418 } 2419 logger.finer("Candidates for deletion: " + 2420 enumerateFiles(unprotectedFiles)); 2421 } 2422 2423 NavigableSet<Long> result; 2424 if (barrenFiles.isEmpty()) { 2425 result = truncatedFiles; 2426 } else if (truncatedFiles.isEmpty()) { 2427 result = barrenFiles; 2428 } else { 2429 2430 /* 2431 * The result needs to be made up from two disjoint subsets of the 2432 * original set, so we can't simply return a view of that set. 
2433 */ 2434 result = new TreeSet<Long>(truncatedFiles); 2435 result.addAll(barrenFiles); 2436 } 2437 2438 if (result.isEmpty()) { 2439 LoggerUtils.traceAndLog( 2440 logger, repImpl, Level.WARNING, 2441 "Replication prevents deletion of " + unprotectedFiles.size() + 2442 " files by Cleaner. " + 2443 "Start file=0x" + Long.toHexString(cbvlsnFile) + 2444 " holds CBVLSN " + cbvlsn + 2445 ", end file=0x" + Long.toHexString(lastVLSNFile) + 2446 " holds last VLSN " + lastVLSN); 2447 } 2448 return result; 2449 } 2450 2451 /** 2452 * Returns the minimum VLSN of files that should be retained to support 2453 * replay, or NULL_VLSN if disabled or no files are found to protect. 2454 */ computeReplayCostMinVLSN( final NavigableSet<Long> unprotectedFiles, final Cleaner cleaner, final long globalCBVLSNFile, final long lastVLSNFile)2455 private VLSN computeReplayCostMinVLSN( 2456 final NavigableSet<Long> unprotectedFiles, 2457 final Cleaner cleaner, 2458 final long globalCBVLSNFile, 2459 final long lastVLSNFile) { 2460 2461 /* Check if disabled */ 2462 if (replayCostPercent == 0) { 2463 if (logger.isLoggable(Level.FINE)) { 2464 LoggerUtils.fine(logger, repImpl, 2465 "replayCostPercent is disabled"); 2466 } 2467 return VLSN.NULL_VLSN; 2468 } 2469 2470 /* 2471 * Use FileSummary information to determine log file sizes. Specify 2472 * false for the includeTrackedFiles parameter to say that no 2473 * information is needed about changes made since the last checkpoint 2474 * because this method is called immediately after a checkpoint. 2475 */ 2476 final Map<Long, FileSummary> fileSummaryMap = 2477 cleaner.getUtilizationProfile().getFileSummaryMap(false); 2478 2479 /* Prepare to check free disk space, if enabled */ 2480 SpaceInfo spaceInfo = (replayFreeDiskPercent != 0) ? 
2481 new SpaceInfo(fileSummaryMap, unprotectedFiles, globalCBVLSNFile, 2482 lastVLSNFile) : 2483 null; 2484 2485 /* 2486 * Compute the number of bytes needed for a network restore, including 2487 * files that are already protected, plus any files containing VLSNs 2488 * between the GlobalCBVLSN and the last VLSN, because they are slated 2489 * to be protected. 2490 */ 2491 long networkRestoreBytes = 0; 2492 for (final Entry<Long, FileSummary> ent : fileSummaryMap.entrySet()) { 2493 final long file = ent.getKey(); 2494 if (!unprotectedFiles.contains(file) || 2495 ((file >= globalCBVLSNFile) && (file <= lastVLSNFile))) { 2496 final FileSummary fileSummary = ent.getValue(); 2497 networkRestoreBytes += fileSummary.totalSize; 2498 } 2499 } 2500 2501 /* 2502 * Compute the maximum number of replay bytes as a percentage of the 2503 * network restore bytes 2504 */ 2505 final long maxReplayBytes = 2506 (long) (networkRestoreBytes / (replayCostPercent / 100.0)); 2507 2508 /* 2509 * Iterate backwards over the files, from newest to oldest, counting 2510 * the size in bytes until it reaches the maximum number of replay 2511 * bytes worth retaining, and so long as we have enough free space. 2512 * Don't count barren files, since they are not used for replay and 2513 * will be deleted, or files below the VLSNIndex, since those can't be 2514 * used for replay. 
2515 */ 2516 final VLSN firstVLSN = getVLSNIndex().getRange().getFirst(); 2517 final long firstVLSNFile = getVLSNIndex().getLTEFileNumber(firstVLSN); 2518 final FileSelector fileSelector = cleaner.getFileSelector(); 2519 long replayBytes = 0; 2520 VLSN newReplayCostMinVLSN = VLSN.NULL_VLSN; 2521 for (final long file : unprotectedFiles.descendingSet()) { 2522 2523 /* Ignore barren files */ 2524 if (file > lastVLSNFile) { 2525 continue; 2526 } 2527 2528 /* Done if we pass the start of the VLSN Index */ 2529 if (file < firstVLSNFile) { 2530 break; 2531 } 2532 2533 /* Ignore deleted files */ 2534 final VLSN fileFirstVLSN = fileSelector.getFirstVLSN(file); 2535 if (fileFirstVLSN == null) { 2536 continue; 2537 } 2538 2539 /* Check free disk space */ 2540 final long fileSize = fileSummaryMap.get(file).totalSize; 2541 if (spaceInfo != null) { 2542 final FileStoreSpaceInfo fileInfo = 2543 spaceInfo.getFileInfo(file); 2544 if (fileInfo != null) { 2545 if (fileSize > fileInfo.replaySpace) { 2546 if (logger.isLoggable(Level.INFO)) { 2547 LoggerUtils.info( 2548 logger, envImpl, 2549 String.format( 2550 "Limited free disk space prevented" + 2551 " retaining some log files." + 2552 " Retained %,d bytes, but wanted to" + 2553 " retain %,d bytes based on replay cost." 
+ 2554 " Associated file store: %s", 2555 replayBytes, maxReplayBytes, fileInfo)); 2556 } 2557 break; 2558 } 2559 fileInfo.replaySpace -= fileSize; 2560 } 2561 } 2562 newReplayCostMinVLSN = fileFirstVLSN; 2563 replayBytes += fileSize; 2564 2565 /* Check if we've reached the maximum of useful replay bytes */ 2566 if (replayBytes >= maxReplayBytes) { 2567 break; 2568 } 2569 } 2570 2571 if (logger.isLoggable(Level.FINE)) { 2572 LoggerUtils.fine( 2573 logger, repImpl, 2574 String.format("Computing replayCostMinVLSN:" + 2575 " networkRestoreBytes=%,d" + 2576 " maxReplayBytes=%,d" + 2577 " replayBytes=%,d" + 2578 " firstVLSN=%s" + 2579 " replayCostMinVLSN=%s", 2580 networkRestoreBytes, maxReplayBytes, replayBytes, 2581 firstVLSN, newReplayCostMinVLSN)); 2582 } 2583 return newReplayCostMinVLSN; 2584 } 2585 2586 /** Holds space information for a file store. */ 2587 private class FileStoreSpaceInfo { 2588 private final FileStoreInfo fileStoreInfo; 2589 final long totalSpace; 2590 final long freeSpace; 2591 long replaySpace; 2592 FileStoreSpaceInfo(final FileStoreInfo fileStoreInfo)2593 FileStoreSpaceInfo(final FileStoreInfo fileStoreInfo) 2594 throws IOException { 2595 2596 this.fileStoreInfo = fileStoreInfo; 2597 totalSpace = fileStoreInfo.getTotalSpace(); 2598 freeSpace = fileStoreInfo.getUsableSpace(); 2599 replaySpace = freeSpace - getTargetFreeSpace(); 2600 } 2601 2602 /** 2603 * Returns the target free disk space for the associate file store. 
2604 */ getTargetFreeSpace()2605 private long getTargetFreeSpace() { 2606 return (long) (totalSpace * (replayFreeDiskPercent / 100.0)); 2607 } 2608 2609 @Override toString()2610 public String toString() { 2611 return String.format("%s: totalSpace=%,d freeSpace=%,d" + 2612 " targetFreeSpace=%,d", 2613 fileStoreInfo, totalSpace, freeSpace, 2614 getTargetFreeSpace()); 2615 } 2616 } 2617 2618 /** 2619 * Maintain information about disk space available for log files used for 2620 * replay, so we can decide whether we have enough free space to retain 2621 * them. Information is looked up by file, but the return value expresses 2622 * the free space available on the file store (meaning volume or disk) 2623 * associated with the file. 2624 */ 2625 private class SpaceInfo { 2626 2627 /** 2628 * Maps a file store to an object storing space information for log 2629 * files in that file store. 2630 */ 2631 private final Map<FileStoreInfo, FileStoreSpaceInfo> fileStoreInfoMap = 2632 new HashMap<FileStoreInfo, FileStoreSpaceInfo>(); 2633 2634 /** 2635 * Maps a file number to the object that stores space information for 2636 * all files in the same file store. The value is shared for all 2637 * entries in this map associated with the same file store, and is 2638 * shared with the associated value in fileStoreInfoMap. 2639 */ 2640 private final Map<Long, FileStoreSpaceInfo> fileMap = 2641 new HashMap<Long, FileStoreSpaceInfo>(); 2642 2643 /** The number of IOExceptions logged. 
*/ 2644 private int loggedIOExceptions; 2645 SpaceInfo(final Map<Long, FileSummary> fileSummaryMap, final NavigableSet<Long> unprotectedFiles, final long globalCBVLSNFile, final long lastVLSNFile)2646 SpaceInfo(final Map<Long, FileSummary> fileSummaryMap, 2647 final NavigableSet<Long> unprotectedFiles, 2648 final long globalCBVLSNFile, 2649 final long lastVLSNFile) { 2650 2651 /* 2652 * Tally information for all currently unprotected files, including 2653 * barren files, but not files containing VLSNs between the global 2654 * CBVLSN and the last VLSN, because they will be protected 2655 */ 2656 for (final long file : unprotectedFiles.descendingSet()) { 2657 if ((file > globalCBVLSNFile) && (file <= lastVLSNFile)) { 2658 continue; 2659 } 2660 final FileSummary fileSummary = fileSummaryMap.get(file); 2661 if (fileSummary != null) { 2662 tallyFile(file, fileSummary); 2663 } 2664 } 2665 } 2666 2667 /** 2668 * Returns the disk space information for the file store associated 2669 * with the specified file. 2670 */ getFileInfo(final long file)2671 FileStoreSpaceInfo getFileInfo(final long file) { 2672 return fileMap.get(file); 2673 } 2674 2675 /** 2676 * Create an entry for the file store associated with the file, if not 2677 * already present, and tally the space used by the individual file. 2678 */ tallyFile(final long file, final FileSummary fileSummary)2679 private void tallyFile(final long file, 2680 final FileSummary fileSummary) { 2681 2682 final String fileName = repImpl.getFileManager().getFullFileName( 2683 file, FileManager.JE_SUFFIX); 2684 try { 2685 final FileStoreInfo fileStoreInfo = 2686 FileStoreInfo.getInfo(fileName); 2687 FileStoreSpaceInfo fileInfo = 2688 fileStoreInfoMap.get(fileStoreInfo); 2689 if (fileInfo == null) { 2690 2691 /* 2692 * Set the initial value to the free space available beyond 2693 * the target amount. We will then add to this value the 2694 * amount of space used by replay log files located in this 2695 * file store. 
If the initial value is negative, then the 2696 * result will be the amount of space we have available to 2697 * store log files and still maintain the requested free 2698 * space. The result may be negative, meaning we can't 2699 * meet the requirement even if all these replay files are 2700 * deleted. 2701 */ 2702 fileInfo = new FileStoreSpaceInfo(fileStoreInfo); 2703 fileStoreInfoMap.put(fileStoreInfo, fileInfo); 2704 if (logger.isLoggable(Level.FINE)) { 2705 LoggerUtils.fine( 2706 logger, repImpl, 2707 "Space information for file store " + fileInfo); 2708 } 2709 } 2710 fileMap.put(file, fileInfo); 2711 fileInfo.replaySpace += fileSummary.totalSize; 2712 } catch (IOException e) { 2713 2714 /* 2715 * Problem getting file store information. Leave this file out 2716 * of the summary, which means that it will be retained because 2717 * we can't figure out which file store it is part of. 2718 * 2719 * Only log the first exception at INFO, to reduce clutter. 2720 */ 2721 final Level level = 2722 (loggedIOExceptions == 0) ? Level.INFO : Level.FINE; 2723 if (logger.isLoggable(level)) { 2724 loggedIOExceptions++; 2725 LoggerUtils.logMsg( 2726 logger, repImpl, level, 2727 "Problem accessing file store info for file " + 2728 fileName + ": " + e); 2729 } 2730 } 2731 } 2732 } 2733 enumerateFiles(Set<Long> fileSet)2734 private String enumerateFiles(Set<Long> fileSet) { 2735 StringBuilder sb = new StringBuilder(); 2736 for (Long f : fileSet) { 2737 sb.append(" 0x").append(Long.toHexString(f)); 2738 }; 2739 return sb.toString(); 2740 } 2741 getReplicaCloseCatchupMs()2742 long getReplicaCloseCatchupMs() { 2743 return replicaCloseCatchupMs; 2744 } 2745 getArbiter()2746 public Arbiter getArbiter() { 2747 return arbiter; 2748 } 2749 2750 /** 2751 * Shuts down the Network backup service *before* a rollback is initiated 2752 * as part of syncup, thus ensuring that NetworkRestore does not see an 2753 * inconsistent set of log files. 
Any network backup operations that are in 2754 * progress at this node are aborted. The client of the service will 2755 * experience network connection failures and will retry with this node 2756 * (when the service is re-established at this node), or with some other 2757 * node. 2758 * <p> 2759 * restarNetworkBackup() is then used to restart the service after it was 2760 * shut down. 2761 */ shutdownNetworkBackup()2762 final public void shutdownNetworkBackup() { 2763 logFeederManager.shutdown(); 2764 logFeederManager = null; 2765 } 2766 2767 /** 2768 * Restarts the network backup service *after* a rollback has been 2769 * completed and the log files are once again in a consistent state. 2770 */ restartNetworkBackup()2771 final public void restartNetworkBackup() { 2772 if (logFeederManager != null) { 2773 throw EnvironmentFailureException.unexpectedState(repImpl); 2774 } 2775 logFeederManager= 2776 new com.sleepycat.je.rep.impl.networkRestore.FeederManager 2777 (serviceDispatcher, repImpl, nameIdPair); 2778 } 2779 2780 /* 2781 * Used to create deliberate clock skews for testing purposes. Replicator 2782 * code should use it instead of invoking System.currentTimeMillis() 2783 * directly. 2784 */ 2785 public static class Clock { 2786 private final int skewMs; 2787 Clock(int skewMs)2788 private Clock(int skewMs) { 2789 this.skewMs = skewMs; 2790 } 2791 currentTimeMillis()2792 public long currentTimeMillis() { 2793 return System.currentTimeMillis() + skewMs; 2794 } 2795 } 2796 2797 /** 2798 * Dumps the states associated with any active Feeders as well as 2799 * information pertaining to the group CBVLSN and the composition of the 2800 * group itself. 2801 */ dumpState()2802 public String dumpState() { 2803 return "\n" + feederManager.dumpState(false /* acksOnly */) + 2804 "\nGlobalCBVLSN=" + getGroupCBVLSN() + 2805 "\n" + getGroup(); 2806 } 2807 2808 /** 2809 * Dumps the state associated with all active Feeders that supply 2810 * acknowledgments. 
2811 */ dumpAckFeederState()2812 public String dumpAckFeederState() { 2813 return "\n" + feederManager.dumpState(true /* acksOnly */) + "\n"; 2814 } 2815 getElectionQuorum()2816 public ElectionQuorum getElectionQuorum() { 2817 return electionQuorum; 2818 } 2819 getDurabilityQuorum()2820 public DurabilityQuorum getDurabilityQuorum() { 2821 return durabilityQuorum; 2822 } 2823 setConvertHook(TestHook<Integer> hook)2824 public void setConvertHook(TestHook<Integer> hook) { 2825 if (convertHooks == null) { 2826 convertHooks = new HashSet<TestHook<Integer>>(); 2827 } 2828 convertHooks.add(hook); 2829 } 2830 runConvertHooks()2831 private boolean runConvertHooks () { 2832 if (convertHooks == null) { 2833 return true; 2834 } 2835 2836 for (TestHook<Integer> h : convertHooks) { 2837 assert TestHookExecute.doHookIfSet(h, 0); 2838 } 2839 return true; 2840 } 2841 2842 /** 2843 * Get the group minimum JE version. 2844 * 2845 * <p>Returns the minimum JE version that is required for all nodes that 2846 * join this node's replication group. The version returned is supported 2847 * by all current and future group members. The minimum JE version is 2848 * guaranteed to only increase over time, so long as the data for the 2849 * environment is not rolled back or lost. 2850 * 2851 * @return the group minimum JE version 2852 */ getMinJEVersion()2853 public JEVersion getMinJEVersion() { 2854 synchronized (minJEVersionLock) { 2855 return group.getMinJEVersion(); 2856 } 2857 } 2858 2859 /** 2860 * Checks if all data nodes in the replication group support the specified 2861 * JE version. Updates the group minimum JE version, and the group format 2862 * version, as needed to require all nodes joining the group to be running 2863 * at least the specified JE version. 2864 * 2865 * <p>This method should only be called on the master, because attempts to 2866 * update the rep group DB on an replica will fail. 
2867 * 2868 * @param newMinJEVersion the new minimum JE version 2869 * @throws DatabaseException if an error occurs when accessing the 2870 * replication group database 2871 * @throws MinJEVersionUnsupportedException if the version is not supported 2872 * by one or more current group members 2873 */ setMinJEVersion(final JEVersion newMinJEVersion)2874 public void setMinJEVersion(final JEVersion newMinJEVersion) 2875 throws MinJEVersionUnsupportedException { 2876 2877 /* 2878 * Synchronize here on minJEVersionLock to prevent new secondary nodes 2879 * from being added while updating the minimum JE version. Electable 2880 * nodes are stored in the RepGroupDB, so the check performed on that 2881 * class's setMinJEVersion within a transaction insures that all 2882 * current nodes have been checked before the minimum JE version is 2883 * increased. But secondary nodes are not stored persistently, so 2884 * other synchronization is needed for them. 2885 */ 2886 synchronized (minJEVersionLock) { 2887 2888 /* Check if at least this version is already required */ 2889 final JEVersion groupMinJEVersion = group.getMinJEVersion(); 2890 if (groupMinJEVersion.compareTo(newMinJEVersion) >= 0) { 2891 return; 2892 } 2893 2894 for (final RepNodeImpl node : group.getDataMembers()) { 2895 JEVersion nodeJEVersion = node.getJEVersion(); 2896 if (getNodeName().equals(node.getName())) { 2897 2898 /* Use the current software version for the local node */ 2899 nodeJEVersion = repImpl.getCurrentJEVersion(); 2900 } else { 2901 2902 /* Use the version recorded by the feeder for replicas */ 2903 final Feeder feeder = 2904 feederManager.getFeeder(node.getName()); 2905 if (feeder != null) { 2906 final JEVersion currentReplicaJEVersion = 2907 feeder.getReplicaJEVersion(); 2908 if (currentReplicaJEVersion != null) { 2909 nodeJEVersion = currentReplicaJEVersion; 2910 } 2911 } 2912 } 2913 if ((nodeJEVersion == null) || 2914 (newMinJEVersion.compareTo(nodeJEVersion) > 0)) { 2915 throw new 
MinJEVersionUnsupportedException( 2916 newMinJEVersion, node.getName(), nodeJEVersion); 2917 } 2918 } 2919 repGroupDB.setMinJEVersion(newMinJEVersion); 2920 } 2921 } 2922 2923 /** 2924 * Adds a secondary node to the group. Assign a node ID and add the node 2925 * to the RepGroupImpl. Don't notify the monitor: secondary nodes do not 2926 * generate GroupChangeEvents. 2927 * 2928 * @param node the node 2929 * @throws IllegalStateException if the store does not currently support 2930 * secondary nodes or the node doesn't meet the current minimum JE 2931 * version 2932 * @throws NodeConflictException if the node conflicts with an existing 2933 * persistent node 2934 */ addSecondaryNode(final RepNodeImpl node)2935 public void addSecondaryNode(final RepNodeImpl node) { 2936 if (!node.getType().isSecondary()) { 2937 throw new IllegalArgumentException( 2938 "Attempt to call addSecondaryNode with a" + 2939 " non-SECONDARY node: " + node); 2940 } 2941 final JEVersion requiredJEVersion = 2942 RepGroupImpl.FORMAT_VERSION_3_JE_VERSION; 2943 try { 2944 setMinJEVersion(requiredJEVersion); 2945 } catch (MinJEVersionUnsupportedException e) { 2946 if (e.nodeVersion == null) { 2947 throw new IllegalStateException( 2948 "Secondary nodes are not currently supported." + 2949 " The version running on node " + e.nodeName + 2950 " could not be determined," + 2951 " but this feature requires version " + 2952 requiredJEVersion.getNumericVersionString() + 2953 " or later."); 2954 } 2955 throw new IllegalStateException( 2956 "Secondary nodes are not currently supported." 
+ 2957 " Node " + e.nodeName + " is running version " + 2958 e.nodeVersion.getNumericVersionString() + 2959 ", but this feature requires version " + 2960 requiredJEVersion.getNumericVersionString() + 2961 " or later."); 2962 } 2963 2964 /* 2965 * Synchronize on minJEVersionLock to coordinate with setMinJEVersion 2966 */ 2967 synchronized (minJEVersionLock) { 2968 final JEVersion minJEVersion = group.getMinJEVersion(); 2969 if (node.getJEVersion().compareTo(minJEVersion) < 0) { 2970 throw new IllegalStateException( 2971 "The node does not meet the minimum required version" + 2972 " for the group." + 2973 " Node " + node.getNameIdPair().getName() + 2974 " is running version " + node.getJEVersion() + 2975 ", but the minimum required version is " + 2976 minJEVersion); 2977 } 2978 if (!node.getNameIdPair().hasNullId()) { 2979 throw new IllegalStateException( 2980 "New secondary node " + node.getNameIdPair().getName() + 2981 " already has an ID: " + node.getNameIdPair().getId()); 2982 } 2983 node.getNameIdPair().setId(secondaryNodeIds.allocateId()); 2984 group.addSecondaryNode(node); 2985 } 2986 } 2987 2988 /** 2989 * Removes a secondary node from the group. Remove the node from the 2990 * RepGroupImpl and deallocate the node ID. 2991 * 2992 * @param node the node 2993 */ removeSecondaryNode(final RepNodeImpl node)2994 public void removeSecondaryNode(final RepNodeImpl node) { 2995 if (!node.getType().isSecondary()) { 2996 throw new IllegalArgumentException( 2997 "Attempt to call removeSecondaryNode with a" + 2998 " non-SECONDARY node: " + node); 2999 } 3000 group.removeSecondaryNode(node); 3001 secondaryNodeIds.deallocateId(node.getNodeId()); 3002 } 3003 3004 /** 3005 * Track node IDs for secondary nodes. IDs are allocated from the specified 3006 * number of values at the high end of the range of integers. 
*/
    static class SecondaryNodeIds {

        /** The number of IDs this instance can hand out. */
        private final int size;

        /** Bit {@code pos} is set when ID Integer.MAX_VALUE - pos is in use. */
        private final BitSet bits;

        /** Creates an instance that allocates the specified number of IDs. */
        SecondaryNodeIds(final int size) {
            this.size = size;
            assert size > 0;
            bits = new BitSet(size);
        }

        /**
         * Allocates a free ID, throwing IllegalStateException if none are
         * available. Allocated IDs lie in the range
         * [Integer.MAX_VALUE - size + 1, Integer.MAX_VALUE].
         */
        synchronized int allocateId() {

            /*
             * Note that scanning for the next clear bit is somewhat
             * inefficient, but this inefficiency shouldn't matter given the
             * small number of secondary nodes expected. If needed, the next
             * improvement would probably be to remember the last allocated ID,
             * to avoid repeated scans of an initial range of already allocated
             * bits.
             */
            final int pos = bits.nextClearBit(0);
            if (pos >= size) {
                throw new IllegalStateException("No more secondary node IDs");
            }
            bits.set(pos);
            return Integer.MAX_VALUE - pos;
        }

        /**
         * Deallocates a previously allocated ID, throwing
         * IllegalArgumentException if the argument was not allocated by
         * allocateId or if the ID is not currently allocated.
         */
        synchronized void deallocateId(final int id) {

            /*
             * allocateId only returns IDs strictly greater than
             * Integer.MAX_VALUE - size, so anything at or below that bound is
             * outside the range. (Integer.MAX_VALUE - size cannot overflow
             * because size is positive.)
             */
            if (id <= Integer.MAX_VALUE - size) {
                throw new IllegalArgumentException(
                    "Illegal secondary node ID: " + id);
            }
            final int pos = Integer.MAX_VALUE - id;
            if (!bits.get(pos)) {
                throw new IllegalArgumentException(
                    "Secondary node ID is not currently allocated: " + id);
            }
            bits.clear(pos);
        }
    }
}