1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.impl; 9 10 import static com.sleepycat.je.rep.NoConsistencyRequiredPolicy.NO_CONSISTENCY; 11 import static com.sleepycat.je.rep.impl.RepParams.GROUP_NAME; 12 import static com.sleepycat.je.rep.impl.RepParams.NODE_NAME; 13 import static com.sleepycat.je.rep.impl.RepParams.RESET_REP_GROUP_RETAIN_UUID; 14 15 import java.io.File; 16 import java.util.HashMap; 17 import java.util.Map; 18 import java.util.UUID; 19 import java.util.logging.Logger; 20 21 import com.sleepycat.bind.tuple.StringBinding; 22 import com.sleepycat.bind.tuple.TupleBinding; 23 import com.sleepycat.bind.tuple.TupleInput; 24 import com.sleepycat.bind.tuple.TupleOutput; 25 import com.sleepycat.je.Cursor; 26 import com.sleepycat.je.CursorConfig; 27 import com.sleepycat.je.Database; 28 import com.sleepycat.je.DatabaseConfig; 29 import com.sleepycat.je.DatabaseEntry; 30 import com.sleepycat.je.DatabaseException; 31 import com.sleepycat.je.DatabaseNotFoundException; 32 import com.sleepycat.je.DbInternal; 33 import com.sleepycat.je.Durability; 34 import com.sleepycat.je.Durability.ReplicaAckPolicy; 35 import com.sleepycat.je.Durability.SyncPolicy; 36 import com.sleepycat.je.Environment; 37 import com.sleepycat.je.EnvironmentConfig; 38 import com.sleepycat.je.EnvironmentFailureException; 39 import com.sleepycat.je.JEVersion; 40 import com.sleepycat.je.LockConflictException; 41 import com.sleepycat.je.LockMode; 42 import com.sleepycat.je.OperationStatus; 43 import com.sleepycat.je.Transaction; 44 import com.sleepycat.je.TransactionConfig; 45 import com.sleepycat.je.dbi.DatabaseImpl; 46 import com.sleepycat.je.dbi.DbConfigManager; 47 import com.sleepycat.je.dbi.DbTree; 48 import com.sleepycat.je.dbi.DbType; 49 import com.sleepycat.je.rep.InsufficientAcksException; 50 import com.sleepycat.je.rep.InsufficientReplicasException; 51 import com.sleepycat.je.rep.NodeType; 52 import com.sleepycat.je.rep.impl.RepGroupImpl.BarrierState; 53 import com.sleepycat.je.rep.impl.node.Feeder; 54 import com.sleepycat.je.rep.impl.node.NameIdPair; 55 import com.sleepycat.je.rep.impl.node.RepNode; 56 import com.sleepycat.je.rep.monitor.GroupChangeEvent.GroupChangeType; 57 import com.sleepycat.je.rep.stream.Protocol; 58 import com.sleepycat.je.rep.txn.MasterTxn; 59 import com.sleepycat.je.rep.txn.ReadonlyTxn; 60 import com.sleepycat.je.rep.util.DbResetRepGroup; 61 import com.sleepycat.je.rep.utilint.HostPortPair; 62 import com.sleepycat.je.txn.Txn; 63 import com.sleepycat.je.utilint.LoggerUtils; 64 import com.sleepycat.je.utilint.VLSN; 65 66 /** 67 * This class is used to encapsulate all access to the rep group data that is 68 * present in every replicated JE environment. The rep group data exists 69 * primarily to support dynamic group membership. Both read and update access 70 * must be done through the APIs provided by this class. 71 * 72 * The database is simply a representation of the RepGroup. Each entry in the 73 * database represents a node in RepGroup; the key is the String node name, and 74 * the data is the serialized ReplicationNode. There is a special entry keyed 75 * by GROUP_KEY that holds the contents of the RepGroup (excluding the nodes) 76 * itself. 77 * 78 * The database may be modified concurrently by multiple transactions as a 79 * master processes requests to update it. It may also be accessed by multiple 80 * overlapping transactions as a Replica replays the rep stream. These updates 81 * need to be interleaved with operations like getGroup() that create copies of 82 * the RepGroup instance. To avoid deadlocks, entries in the database are 83 * accessed in order of ascending key. GROUP_KEY in particular is associated 84 * with the lowest key value so that it's locked first implicitly as part of 85 * any iteration and any other modifications to the database must first lock it 86 * before making changes to the group itself. 87 * 88 * An instance of this class is created as part of a replication node and is 89 * retained for the entire lifetime of that node. 90 */ 91 public class RepGroupDB { 92 93 private final RepImpl repImpl; 94 95 /* A convenient, cached empty group. */ 96 public final RepGroupImpl emptyGroup; 97 98 private final Logger logger; 99 100 /* The key used to store group-wide information in the database. It must 101 * be the lowest key in the database, so that it's locked first during 102 * database iteration. 103 */ 104 public final static String GROUP_KEY = "$$GROUP_KEY$$"; 105 public final static DatabaseEntry groupKeyEntry = new DatabaseEntry(); 106 107 /* Initialize the entry. */ 108 static { StringBinding.stringToEntry(GROUP_KEY, groupKeyEntry)109 StringBinding.stringToEntry(GROUP_KEY, groupKeyEntry); 110 } 111 112 /* The fixed DB ID associated with the internal rep group database. */ 113 public final static long DB_ID = DbTree.NEG_DB_ID_START - 1; 114 115 /* 116 * Number of times to retry for ACKs on the master before returning to 117 * to the Replica, which will then again retry on some periodic basis. 118 */ 119 private final static int QUORUM_ACK_RETRIES = 5; 120 121 /* Convenience Durability and Config constants. */ 122 private final static Durability QUORUM_ACK_DURABILITY = 123 new Durability(SyncPolicy.SYNC, 124 SyncPolicy.SYNC, 125 ReplicaAckPolicy.SIMPLE_MAJORITY); 126 127 private final static TransactionConfig QUORUM_ACK = 128 new TransactionConfig(); 129 130 private final static TransactionConfig NO_ACK = new TransactionConfig(); 131 132 /* 133 * TODO: Change this when we support true read only transactions. 134 */ 135 final static TransactionConfig READ_ONLY = NO_ACK; 136 137 private final static Durability NO_ACK_DURABILITY = 138 new Durability(SyncPolicy.SYNC, 139 SyncPolicy.SYNC, 140 ReplicaAckPolicy.NONE); 141 142 private final static Durability NO_ACK_NO_SYNC_DURABILITY = 143 new Durability(SyncPolicy.NO_SYNC, 144 SyncPolicy.NO_SYNC, 145 ReplicaAckPolicy.NONE); 146 147 private final static TransactionConfig NO_ACK_NO_SYNC = 148 new TransactionConfig(); 149 150 static { 151 /* Initialize config constants. */ 152 QUORUM_ACK.setDurability(QUORUM_ACK_DURABILITY); 153 NO_ACK.setDurability(NO_ACK_DURABILITY); 154 NO_ACK_NO_SYNC.setDurability(NO_ACK_NO_SYNC_DURABILITY); 155 } 156 157 /** 158 * Create an instance. Note that the database handle is not initialized at 159 * this time, since the state of the node master/replica is not known 160 * at the time the replication node (and consequently this instance) is 161 * created. 162 * @throws DatabaseException 163 */ RepGroupDB(RepImpl repImpl)164 public RepGroupDB(RepImpl repImpl) 165 throws DatabaseException { 166 167 this.repImpl = repImpl; 168 169 DbConfigManager configManager = repImpl.getConfigManager(); 170 emptyGroup = new RepGroupImpl(configManager.get(GROUP_NAME), 171 repImpl.getCurrentJEVersion()); 172 logger = LoggerUtils.getLogger(getClass()); 173 } 174 175 /** 176 * Returns all the members that are currently part of the replication 177 * group, using NO_CONSISTENCY. This method can read the database directly, 178 * and can be used when the replicated environment is detached and the 179 * RepNode is null. It's for the latter reason that the method reads 180 * uncommitted data. In detached mode, there may be transactions on the 181 * database that were in progress when the node was last shutdown. These 182 * transactions may have locks which will not be released until after the 183 * node is re-attached and the replication stream is resumed. Using 184 * uncommitted reads avoids use of locks in this circumstance. It's safe to 185 * read these records, since the database will eventually be updated with 186 * these changes. 187 * 188 * @return the group object 189 * @throws DatabaseException if the object could not be obtained 190 */ getGroup(RepImpl rImpl, String groupName)191 public static RepGroupImpl getGroup(RepImpl rImpl, 192 String groupName) 193 throws DatabaseException { 194 195 /* Get persistent nodes from the database */ 196 DatabaseImpl dbImpl = null; 197 boolean foundDbImpl = false; 198 try { 199 dbImpl = rImpl.getGroupDb(); 200 foundDbImpl = true; 201 } catch (DatabaseNotFoundException e) { 202 } 203 final RepGroupImpl group; 204 if (!foundDbImpl) { 205 206 /* Creates a temporary placeholder group for use until the real 207 * definition comes over the replication stream as part of the 208 * replicated group database. 209 */ 210 group = new RepGroupImpl(groupName, true, 211 rImpl.getCurrentJEVersion()); 212 213 } else { 214 final TransactionConfig txnConfig = new TransactionConfig(); 215 txnConfig.setDurability(READ_ONLY.getDurability()); 216 txnConfig.setConsistencyPolicy(NO_CONSISTENCY); 217 txnConfig.setReadUncommitted(true); 218 219 Txn txn = null; 220 try { 221 txn = new ReadonlyTxn(rImpl, txnConfig); 222 group = fetchGroup(groupName, dbImpl, txn); 223 /* 224 * Correct summary info since we are reading uncommitted data 225 */ 226 group.makeConsistent(); 227 txn.commit(); 228 txn = null; 229 } finally { 230 if (txn != null) { 231 txn.abort(); 232 } 233 } 234 } 235 236 /* Get secondary nodes from their feeders */ 237 final RepNode repNode = rImpl.getRepNode(); 238 if (repNode != null) { 239 for (final Feeder feeder : 240 repNode.feederManager().activeReplicasMap().values()) { 241 final RepNodeImpl node = feeder.getReplicaNode(); 242 if (node.getType().isSecondary()) { 243 group.addSecondaryNode(node); 244 } 245 } 246 } 247 248 return group; 249 } 250 getGroup()251 public RepGroupImpl getGroup() 252 throws DatabaseException { 253 254 return getGroup(repImpl, 255 repImpl.getConfigManager().get(GROUP_NAME)); 256 } 257 258 /** 259 * Sets the minimum JE version required for nodes to join the replication 260 * group and refreshes the group object cached in the rep group. Throws a 261 * {@link MinJEVersionUnsupportedException} if the requested version is not 262 * supported by current nodes. 263 * 264 * <p>If this method returns successfully, nodes that are running a JE 265 * version older than the one specified will not be permitted to join the 266 * replication group in the future. Use this method to implement features 267 * that require all group members to meet a minimum version requirement. 268 * 269 * <p>The update attempts to obtain acknowledgments from a simple majority, 270 * to make sure that future masters agree that the update has taken place, 271 * but does not require this. 272 * 273 * @param newMinJEVersion the new minimum JE version 274 * @throws DatabaseException if an error occurs when accessing the 275 * replication group database 276 * @throws MinJEVersionUnsupportedException if the requested version is not 277 * supported 278 */ setMinJEVersion(final JEVersion newMinJEVersion)279 public void setMinJEVersion(final JEVersion newMinJEVersion) 280 throws DatabaseException, MinJEVersionUnsupportedException { 281 282 final DatabaseImpl groupDbImpl; 283 try { 284 groupDbImpl = repImpl.getGroupDb(); 285 } catch (DatabaseNotFoundException e) { 286 /* Should never happen. */ 287 throw EnvironmentFailureException.unexpectedException(e); 288 } 289 MasterTxn txn = 290 new MasterTxn(repImpl, QUORUM_ACK, repImpl.getNameIdPair()); 291 try { 292 RepGroupImpl repGroup = 293 fetchGroupObject(txn, groupDbImpl, LockMode.RMW); 294 repGroup = fetchGroup(repGroup.getName(), groupDbImpl, txn); 295 repGroup.setMinJEVersion(newMinJEVersion); 296 saveGroupObject(txn, repGroup, groupDbImpl); 297 txn.commit(QUORUM_ACK_DURABILITY); 298 txn = null; 299 } catch (InsufficientAcksException e) { 300 301 /* 302 * Didn't receive acknowledgments from a simple majority. OK to 303 * proceed, since this operation will be repeated if the change is 304 * lost. 305 */ 306 LoggerUtils.info(logger, repImpl, 307 "Proceeding without acks for minimum JE version"); 308 } finally { 309 if (txn != null) { 310 txn.abort(); 311 } 312 } 313 repImpl.getRepNode().refreshCachedGroup(); 314 } 315 316 /** 317 * All rep group db access uses cursors with eviction disabled. 318 */ makeCursor(DatabaseImpl dbImpl, Txn txn, CursorConfig cursorConfig)319 static private Cursor makeCursor(DatabaseImpl dbImpl, 320 Txn txn, 321 CursorConfig cursorConfig) { 322 Cursor cursor = DbInternal.makeCursor(dbImpl, 323 txn, 324 cursorConfig); 325 DbInternal.getCursorImpl(cursor).setAllowEviction(false); 326 return cursor; 327 } 328 329 /** 330 * Returns a representation of the nodes of the group stored in the 331 * database, using the txn and handles that were passed in. 332 */ fetchGroup(String groupName, DatabaseImpl dbImpl, Txn txn)333 private static RepGroupImpl fetchGroup(String groupName, 334 DatabaseImpl dbImpl, 335 Txn txn) 336 throws DatabaseException { 337 338 final DatabaseEntry keyEntry = new DatabaseEntry(); 339 final DatabaseEntry value = new DatabaseEntry(); 340 NodeBinding nodeBinding = null; 341 final GroupBinding groupBinding = new GroupBinding(); 342 343 RepGroupImpl group = null; 344 Map <Integer, RepNodeImpl> nodes = 345 new HashMap<Integer, RepNodeImpl>(); 346 final CursorConfig cursorConfig = new CursorConfig(); 347 cursorConfig.setReadCommitted(true); 348 349 Cursor mcursor = null; 350 351 try { 352 mcursor = makeCursor(dbImpl, txn, cursorConfig); 353 while (mcursor.getNext(keyEntry, value, LockMode.DEFAULT) == 354 OperationStatus.SUCCESS) { 355 356 final String key = StringBinding.entryToString(keyEntry); 357 358 if (GROUP_KEY.equals(key)) { 359 group = groupBinding.entryToObject(value); 360 if (!group.getName().equals(groupName)) { 361 throw EnvironmentFailureException.unexpectedState 362 ("The argument: " + groupName + 363 " does not match the expected group name: " + 364 group.getName()); 365 } 366 367 /* 368 * The group entry should always be first, so we can use it 369 * to provide the group version for reading node entries. 370 */ 371 nodeBinding = new NodeBinding(group.getFormatVersion()); 372 } else { 373 if (nodeBinding == null) { 374 throw new IllegalStateException( 375 "Found node binding before group binding"); 376 } 377 final RepNodeImpl node = nodeBinding.entryToObject(value); 378 nodes.put(node.getNameIdPair().getId(), node); 379 } 380 } 381 if (group == null) { 382 throw EnvironmentFailureException.unexpectedState 383 ("Group key: " + GROUP_KEY + " is missing"); 384 } 385 group.setNodes(nodes); 386 return group; 387 } finally { 388 if (mcursor != null) { 389 mcursor.close(); 390 } 391 } 392 } 393 394 /** 395 * Ensures that information about this node, the current master, is in the 396 * member database. If it isn't, enter it into the database. If the 397 * database does not exist, create it as well. 398 * 399 * <p>Note that this overloading is only used by a node that is the master. 400 * 401 * @throws DatabaseException 402 */ addFirstNode()403 public void addFirstNode() 404 throws DatabaseException { 405 406 DbConfigManager configManager = repImpl.getConfigManager(); 407 String groupName = configManager.get(GROUP_NAME); 408 String nodeName = configManager.get(NODE_NAME); 409 410 DatabaseImpl groupDbImpl = repImpl.createGroupDb(); 411 412 /* setup the group information as data. */ 413 RepGroupImpl repGroup = 414 new RepGroupImpl(groupName, repImpl.getCurrentJEVersion()); 415 GroupBinding groupBinding = 416 new GroupBinding(repGroup.getFormatVersion()); 417 DatabaseEntry groupEntry = new DatabaseEntry(); 418 groupBinding.objectToEntry(repGroup, groupEntry); 419 420 /* Create the common group entry. */ 421 TransactionConfig txnConfig = new TransactionConfig(); 422 txnConfig.setDurability(NO_ACK.getDurability()); 423 txnConfig.setConsistencyPolicy(NO_CONSISTENCY); 424 Txn txn = null; 425 Cursor cursor = null; 426 try { 427 txn = new MasterTxn(repImpl, 428 txnConfig, 429 repImpl.getNameIdPair()); 430 431 cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT); 432 OperationStatus status = cursor.put(groupKeyEntry, groupEntry); 433 if (status != OperationStatus.SUCCESS) { 434 throw EnvironmentFailureException.unexpectedState 435 ("Couldn't write first group entry " + status); 436 } 437 cursor.close(); 438 cursor = null; 439 txn.commit(); 440 txn = null; 441 } finally { 442 if (cursor != null) { 443 cursor.close(); 444 } 445 446 if (txn != null) { 447 txn.abort(); 448 } 449 } 450 451 ensureMember(new RepNodeImpl(nodeName, 452 repImpl.getHostName(), 453 repImpl.getPort(), 454 repImpl.getCurrentJEVersion())); 455 } 456 457 /** 458 * Ensures that the membership info for the replica is in the database. A 459 * call to this method is initiated by the master as part of the 460 * feeder/replica handshake, where the replica provides membership 461 * information as part of the handshake protocol. The membership database 462 * must already exist, with the master in it, when this method is invoked. 463 * 464 * <p>This method should not be called for secondary nodes. 465 * 466 * @param membershipInfo provided by the replica 467 * 468 * @throws InsufficientReplicasException upon failure of 2p member update 469 * @throw InsufficientAcksException upon failure of 2p member update 470 * @throws DatabaseException when the membership info could not be entered 471 * into the membership database. 472 */ ensureMember(Protocol.NodeGroupInfo membershipInfo)473 public void ensureMember(Protocol.NodeGroupInfo membershipInfo) 474 throws InsufficientReplicasException, 475 InsufficientAcksException, 476 DatabaseException { 477 478 ensureMember(new RepNodeImpl(membershipInfo)); 479 } 480 ensureMember(RepNodeImpl ensureNode)481 void ensureMember(RepNodeImpl ensureNode) 482 throws DatabaseException { 483 484 if (ensureNode.getType().isSecondary()) { 485 throw new IllegalArgumentException( 486 "Attempt to call ensureMember on SECONDARY node: " + 487 ensureNode); 488 } 489 DatabaseImpl groupDbImpl; 490 try { 491 groupDbImpl = repImpl.getGroupDb(); 492 } catch (DatabaseNotFoundException e) { 493 /* Should never happen. */ 494 throw EnvironmentFailureException.unexpectedException(e); 495 } 496 497 DatabaseEntry nodeNameKey = new DatabaseEntry(); 498 StringBinding.stringToEntry(ensureNode.getName(), nodeNameKey); 499 500 DatabaseEntry value = new DatabaseEntry(); 501 NodeBinding mib = null; 502 503 Txn txn = null; 504 Cursor cursor = null; 505 try { 506 txn = new ReadonlyTxn(repImpl, NO_ACK); 507 508 /* 509 * Fetch the group so we know the group format version. Read the 510 * group before reading the node entry in each case to avoid the 511 * potential of deadlocks caused by reversing the order of lock 512 * acquisition. 513 */ 514 final RepGroupImpl repGroup = 515 fetchGroupObject(txn, groupDbImpl, LockMode.DEFAULT); 516 mib = new NodeBinding(repGroup.getFormatVersion()); 517 518 CursorConfig config = new CursorConfig(); 519 config.setReadCommitted(true); 520 cursor = makeCursor(groupDbImpl, txn, config); 521 522 OperationStatus status = 523 cursor.getSearchKey(nodeNameKey, value, null); 524 if (status == OperationStatus.SUCCESS) { 525 /* Let's see if the entry needs updating. */ 526 RepNodeImpl miInDb = mib.entryToObject(value); 527 if (miInDb.equivalent(ensureNode)) { 528 if (miInDb.isQuorumAck()) { 529 /* Present, matched and acknowledged. */ 530 return; 531 } 532 ensureNode.getNameIdPair().update(miInDb.getNameIdPair()); 533 /* Not acknowledged, retry the update. */ 534 } else { 535 /* Present but not equivalent. */ 536 LoggerUtils.warning(logger, repImpl, 537 "Incompatible node descriptions. " + 538 "Membership database definition: " + 539 miInDb.toString() + 540 " Transient definition: " + 541 ensureNode.toString()); 542 if (ensureNode.getType() != miInDb.getType()) { 543 throw EnvironmentFailureException.unexpectedState( 544 "Conflicting node types for node " + 545 ensureNode.getName() + 546 ": expected " + ensureNode.getType() + 547 ", found " + miInDb.getType()); 548 } 549 throw EnvironmentFailureException.unexpectedState( 550 "Incompatible node descriptions for node: " + 551 ensureNode.getName() + ", node ID: " + 552 ensureNode.getNodeId()); 553 } 554 LoggerUtils.info(logger, repImpl, 555 "Present but not ack'd node: " + 556 ensureNode.getNodeId() + 557 " ack status: " + miInDb.isQuorumAck()); 558 } 559 cursor.close(); 560 cursor = null; 561 txn.commit(); 562 txn = null; 563 } finally { 564 if (cursor != null) { 565 cursor.close(); 566 } 567 568 if (txn != null) { 569 txn.abort(); 570 } 571 572 } 573 createMember(ensureNode); 574 575 /* Refresh group and Fire an ADD GroupChangeEvent. */ 576 refreshGroupAndNotifyGroupChange 577 (ensureNode.getName(), GroupChangeType.ADD); 578 } 579 refreshGroupAndNotifyGroupChange(String nodeName, GroupChangeType opType)580 private void refreshGroupAndNotifyGroupChange(String nodeName, 581 GroupChangeType opType) { 582 repImpl.getRepNode().refreshCachedGroup(); 583 repImpl.getRepNode().getMonitorEventManager().notifyGroupChange 584 (nodeName, opType); 585 } 586 587 /** 588 * Removes a node from the replication group by marking the node's entry in 589 * the rep group db as removed, and optionally deleting the entry. 590 * 591 * <p>This method should not be called for secondary nodes. 592 */ removeMember(final RepNodeImpl removeNode, final boolean delete)593 public void removeMember(final RepNodeImpl removeNode, 594 final boolean delete) { 595 LoggerUtils.info(logger, repImpl, 596 (delete ? "Deleting node: " : "Removing node: ") + 597 removeNode.getName()); 598 599 if (removeNode.getType().isSecondary()) { 600 throw new IllegalArgumentException( 601 "Attempt to call removeMember on a SECONDARY node: " + 602 removeNode); 603 } 604 605 TwoPhaseUpdate twoPhaseUpdate = new TwoPhaseUpdate(removeNode, true) { 606 607 @Override 608 void phase1Body() { 609 final RepGroupImpl repGroup = 610 fetchGroupObject(txn, groupDbImpl, LockMode.RMW); 611 int changeVersion = repGroup.incrementChangeVersion(); 612 saveGroupObject(txn, repGroup, groupDbImpl); 613 node.setChangeVersion(changeVersion); 614 node.setRemoved(true); 615 saveNodeObject(txn, node, groupDbImpl, repGroup); 616 } 617 /** Override phase 2 to delete the node entry if delete is true. */ 618 @Override 619 void phase2Body() { 620 if (!delete) { 621 super.phase2Body(); 622 return; 623 } 624 final DatabaseEntry nodeNameKey = new DatabaseEntry(); 625 StringBinding.stringToEntry(removeNode.getName(), nodeNameKey); 626 final Cursor cursor = 627 makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT); 628 try { 629 final OperationStatus status = cursor.getSearchKey( 630 nodeNameKey, new DatabaseEntry(), LockMode.RMW); 631 if (status != OperationStatus.SUCCESS) { 632 throw EnvironmentFailureException.unexpectedState( 633 "Node ID: " + removeNode.getNameIdPair() + 634 " not present in group db"); 635 } 636 cursor.delete(); 637 } finally { 638 cursor.close(); 639 } 640 } 641 }; 642 643 twoPhaseUpdate.execute(); 644 645 /* Refresh group and fire a REMOVE GroupChangeEvent. */ 646 refreshGroupAndNotifyGroupChange 647 (removeNode.getName(), GroupChangeType.REMOVE); 648 649 LoggerUtils.info(logger, repImpl, 650 "Successfully deleted node: " + removeNode.getName()); 651 } 652 653 /* Add a new rep node into the RepGroupDB. */ createMember(final RepNodeImpl node)654 private void createMember(final RepNodeImpl node) 655 throws InsufficientReplicasException, 656 InsufficientAcksException, 657 DatabaseException { 658 659 LoggerUtils.fine 660 (logger, repImpl, "Adding node: " + node.getNameIdPair()); 661 662 twoPhaseMemberUpdate(node, true); 663 664 LoggerUtils.info(logger, repImpl, 665 "Successfully added node:" + node.getNameIdPair() + 666 " HostPort = " + node.getHostName() + ": " + 667 node.getPort() + " [" + node.getType() + "]"); 668 } 669 670 /* 671 * Update a current rep node information in the RepGroupDB. 672 * 673 * <p>This method should not be called for secondary nodes. 674 * 675 * @param node the new node information 676 * @param quorumAck whether to require acknowledgments from a quorum 677 */ updateMember(final RepNodeImpl node, final boolean quorumAck)678 public void updateMember(final RepNodeImpl node, final boolean quorumAck) 679 throws InsufficientReplicasException, 680 InsufficientAcksException, 681 DatabaseException { 682 683 if (node.getType().isSecondary()) { 684 throw new IllegalArgumentException( 685 "Attempt to call updateMember on a SECONDARY node: " + node); 686 } 687 688 LoggerUtils.fine(logger, repImpl, "Updating node: " + node); 689 690 twoPhaseMemberUpdate(node, quorumAck); 691 692 // TODO: clean up the Monitor interface. There are several aspects of 693 // that interface that need fixing; but in particular it ought to have 694 // a way to inform listeners that a node has moved to a new network 695 // address. Once that's done, the following should be replaced by a 696 // full refreshGroupAndNotifyGroupChange(). And actually that 697 // operation should be done closer to where we know the GroupDB has 698 // been changed. In particular, if the GroupDB update suffers an IAE, 699 // the exception blows by the following, even though the database 700 // actually does now contain the updated value. 701 // 702 repImpl.getRepNode().refreshCachedGroup(); 703 704 LoggerUtils.info(logger, repImpl, 705 "Successfully updated node: " + node.getNameIdPair() + 706 " Hostport = " + node.getHostName() + ": " + 707 node.getPort() + " [" + node.getType() + "]"); 708 } 709 710 /** 711 * Implements the two phase update of membership information. 712 * 713 * In the first phase the master repeatedly tries to commit the "put" 714 * operation until it gets a Quorum of acks, ensuring that the operation 715 * has been made durable. Nodes that obtain this entry will start using it 716 * in elections. However, the node itself will not participate in elections 717 * until it has successfully completed phase 2. 718 * 719 * In the second phase, the entry for the member is updated to note 720 * that a quorum of acks was received. 721 * 722 * Failure leaves the database with the member info absent, or 723 * present but without the update to quorumAcks indicating that a 724 * quorum has acknowledged the change. 725 * 726 * @param node the member info for the node. 727 * @param quorumAck whether to require acknowledgments from a quorum 728 * 729 * @throws DatabaseException upon failure. 730 */ twoPhaseMemberUpdate(final RepNodeImpl node, final boolean quorumAck)731 private void twoPhaseMemberUpdate(final RepNodeImpl node, 732 final boolean quorumAck) 733 throws InsufficientReplicasException, 734 InsufficientAcksException, 735 DatabaseException { 736 737 TwoPhaseUpdate twoPhaseUpdate = new TwoPhaseUpdate(node, quorumAck) { 738 739 @Override 740 void phase1Body() { 741 RepGroupImpl repGroup = 742 fetchGroupObject(txn, groupDbImpl, LockMode.RMW); 743 repGroup = fetchGroup(repGroup.getName(), groupDbImpl, txn); 744 repGroup.checkForConflicts(node); 745 int changeVersion = repGroup.incrementChangeVersion(); 746 if (node.getNameIdPair().hasNullId()) { 747 node.getNameIdPair().setId(repGroup.getNextNodeId()); 748 } 749 saveGroupObject(txn, repGroup, groupDbImpl); 750 node.setChangeVersion(changeVersion); 751 final RepNodeImpl existingNode = 752 repGroup.getNode(node.getName()); 753 if ((existingNode != null) && (node.getJEVersion() == null)) { 754 node.updateJEVersion(existingNode.getJEVersion()); 755 } 756 saveNodeObject(txn, node, groupDbImpl, repGroup); 757 } 758 759 @Override 760 void deadlockHandler() { 761 node.getNameIdPair().revertToNull(); 762 } 763 764 @Override 765 void insufficientReplicasHandler() { 766 node.getNameIdPair().revertToNull(); 767 } 768 }; 769 770 twoPhaseUpdate.execute(); 771 } 772 773 /** 774 * Updates the database entry associated with the node with the new local 775 * CBVLSN, if it can do so without encountering lock contention, and unless 776 * the node is a secondary node. Also updates the rep node's transient 777 * group information about the global CBVLSN. If it encounters contention, 778 * it returns false, and the caller must retry at some later point in time. 779 * 780 * Note that changes to the local CBVLSN do not update the group version 781 * number since they do not impact group membership. 782 * 783 * @param nameIdPair identifies the node being updated 784 * @param newCBVLSN the new local CBVLSN to be associated with the node. 785 * @param nodeType the node type of the RepNode 786 * @return true if the update succeeded. 787 * @throws DatabaseException 788 */ updateLocalCBVLSN(final NameIdPair nameIdPair, final VLSN newCBVLSN, final NodeType nodeType)789 public boolean updateLocalCBVLSN(final NameIdPair nameIdPair, 790 final VLSN newCBVLSN, 791 final NodeType nodeType) 792 throws DatabaseException { 793 794 DatabaseImpl groupDbImpl = null; 795 try { 796 groupDbImpl = repImpl.probeGroupDb(); 797 } catch (DatabaseException e) { 798 /* Contention on the groupDbImpl, try later. */ 799 return false; 800 } 801 802 if (groupDbImpl == null) { 803 /* Contention on the groupDbImpl, try later. */ 804 return false; 805 } 806 807 DatabaseEntry nodeNameKey = new DatabaseEntry(); 808 StringBinding.stringToEntry(nameIdPair.getName(), nodeNameKey); 809 DatabaseEntry value = new DatabaseEntry(); 810 final RepGroupImpl.BarrierState barrierState = 811 new RepGroupImpl.BarrierState(newCBVLSN, 812 System.currentTimeMillis()); 813 Txn txn = null; 814 Cursor cursor = null; 815 boolean ok = false; 816 try { 817 818 /* 819 * No database update for secondary nodes, but set ok to true so 820 * that the rep node's group information is updated. 821 */ 822 if (nodeType.isSecondary()) { 823 ok = true; 824 return true; 825 } 826 txn = new MasterTxn(repImpl, 827 NO_ACK_NO_SYNC, 828 repImpl.getNameIdPair()); 829 830 /* Read the group first to avoid deadlocks */ 831 final RepGroupImpl repGroup = 832 fetchGroupObject(txn, groupDbImpl, LockMode.DEFAULT); 833 cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT); 834 835 OperationStatus status = 836 cursor.getSearchKey(nodeNameKey, value, LockMode.RMW); 837 if (status != OperationStatus.SUCCESS) { 838 throw EnvironmentFailureException.unexpectedState 839 ("Node ID: " + nameIdPair + " not present in group db"); 840 } 841 842 /* Let's see if the entry needs updating. */ 843 final NodeBinding nodeBinding = 844 new NodeBinding(repGroup.getFormatVersion()); 845 final RepNodeImpl node = nodeBinding.entryToObject(value); 846 final VLSN lastCBVLSN = node.getBarrierState().getLastCBVLSN(); 847 if (lastCBVLSN.equals(newCBVLSN)) { 848 ok = true; 849 return true; 850 } 851 852 node.setBarrierState(barrierState); 853 nodeBinding.objectToEntry(node, value); 854 status = cursor.putCurrent(value); 855 if (status != OperationStatus.SUCCESS) { 856 throw EnvironmentFailureException.unexpectedState 857 ("Node ID: " + nameIdPair + 858 " stored localCBVLSN could not be updated. Status: " + 859 status); 860 } 861 LoggerUtils.fine(logger, repImpl, 862 "Local CBVLSN updated to " + newCBVLSN + 863 " for node " + nameIdPair); 864 ok = true; 865 } finally { 866 if (cursor != null) { 867 cursor.close(); 868 } 869 870 if (txn != null) { 871 if (ok) { 872 txn.commit(NO_ACK_NO_SYNC_DURABILITY); 873 } else { 874 txn.abort(); 875 } 876 txn = null; 877 } 878 if (ok) { 879 /* RepNode may be null during shutdown. [#17424] */ 880 RepNode repNode = repImpl.getRepNode(); 881 if (repNode != null) { 882 repNode.updateGroupInfo(nameIdPair, barrierState); 883 } 884 } 885 } 886 887 return true; 888 } 889 890 /* 891 * Returns just the de-serialized special rep group object from the 892 * database, using the specified lock mode. 893 */ fetchGroupObject(final Txn txn, final DatabaseImpl groupDbImpl, final LockMode lockMode)894 private RepGroupImpl fetchGroupObject(final Txn txn, 895 final DatabaseImpl groupDbImpl, 896 final LockMode lockMode) 897 throws DatabaseException { 898 899 RepGroupDB.GroupBinding groupBinding = new RepGroupDB.GroupBinding(); 900 DatabaseEntry groupEntry = new DatabaseEntry(); 901 902 Cursor cursor = null; 903 try { 904 cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT); 905 906 final OperationStatus status = 907 cursor.getSearchKey(groupKeyEntry, groupEntry, lockMode); 908 909 if (status != OperationStatus.SUCCESS) { 910 throw EnvironmentFailureException.unexpectedState 911 ("Group entry key: " + GROUP_KEY + 912 " missing from group database"); 913 } 914 } finally { 915 if (cursor != null) { 916 cursor.close(); 917 } 918 } 919 920 return groupBinding.entryToObject(groupEntry); 921 } 922 923 /* 924 * Saves the rep group in the database. 925 */ saveGroupObject(Txn txn, RepGroupImpl repGroup, DatabaseImpl groupDbImpl)926 private void saveGroupObject(Txn txn, 927 RepGroupImpl repGroup, 928 DatabaseImpl groupDbImpl) 929 throws DatabaseException { 930 931 final GroupBinding groupBinding = 932 new GroupBinding(repGroup.getFormatVersion()); 933 DatabaseEntry groupEntry = new DatabaseEntry(); 934 groupBinding.objectToEntry(repGroup, groupEntry); 935 936 Cursor cursor = null; 937 try { 938 cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT); 939 940 OperationStatus status = cursor.put(groupKeyEntry, groupEntry); 941 if (status != OperationStatus.SUCCESS) { 942 throw EnvironmentFailureException.unexpectedState 943 ("Group entry save failed"); 944 } 945 } finally { 946 if (cursor != null) { 947 cursor.close(); 948 } 949 } 950 } 951 952 /* 953 * Save a ReplicationNode in the database, using the format version 954 * specified by the group. 955 */ saveNodeObject(Txn txn, RepNodeImpl node, DatabaseImpl groupDbImpl, RepGroupImpl repGroup)956 private void saveNodeObject(Txn txn, 957 RepNodeImpl node, 958 DatabaseImpl groupDbImpl, 959 RepGroupImpl repGroup) 960 throws DatabaseException { 961 962 assert !node.getType().isSecondary(); 963 964 DatabaseEntry nodeNameKey = new DatabaseEntry(); 965 StringBinding.stringToEntry(node.getName(), nodeNameKey); 966 967 final NodeBinding nodeBinding = 968 new NodeBinding(repGroup.getFormatVersion()); 969 DatabaseEntry memberInfoEntry = new DatabaseEntry(); 970 nodeBinding.objectToEntry(node, memberInfoEntry); 971 972 Cursor cursor = null; 973 try { 974 cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT); 975 976 OperationStatus status = cursor.put(nodeNameKey, memberInfoEntry); 977 if (status != OperationStatus.SUCCESS) { 978 throw EnvironmentFailureException.unexpectedState 979 ("Group entry save failed"); 980 } 981 } finally { 982 if (cursor != null) { 983 cursor.close(); 984 } 985 } 986 } 987 988 /** 989 * Converts a numeric version string to a JEVersion, returning null for an 990 * empty string. 991 */ parseJEVersion(final String versionString)992 static JEVersion parseJEVersion(final String versionString) { 993 return versionString.isEmpty() ? 994 null : 995 new JEVersion(versionString); 996 } 997 998 /** 999 * Converts a JEVersion to a numeric version string, returning an empty 1000 * string for null. 1001 */ jeVersionString(final JEVersion jeVersion)1002 static String jeVersionString(final JEVersion jeVersion) { 1003 return (jeVersion == null) ? 1004 "" : 1005 jeVersion.getNumericVersionString(); 1006 } 1007 1008 /** 1009 * RepGroupImpl version 3: Add the minJEVersion field 1010 */ 1011 public static class GroupBinding extends TupleBinding<RepGroupImpl> { 1012 1013 /** 1014 * The rep group format version to use for writing, or -1 for reading. 1015 */ 1016 private final int writeFormatVersion; 1017 1018 /** Create an instance for reading. */ GroupBinding()1019 public GroupBinding() { 1020 writeFormatVersion = -1; 1021 } 1022 1023 /** 1024 * Create an instance for writing using the specified group format 1025 * version. 1026 */ GroupBinding(final int writeFormatVersion)1027 GroupBinding(final int writeFormatVersion) { 1028 if (writeFormatVersion < 0) { 1029 throw new IllegalArgumentException( 1030 "writeFormatVersion must be non-negative: " + 1031 writeFormatVersion); 1032 } 1033 this.writeFormatVersion = writeFormatVersion; 1034 } 1035 1036 @Override entryToObject(TupleInput input)1037 public RepGroupImpl entryToObject(TupleInput input) { 1038 if (writeFormatVersion >= 0) { 1039 throw new IllegalStateException( 1040 "GroupBinding not created for read"); 1041 } 1042 final String name = input.readString(); 1043 final UUID uuid = new UUID(input.readLong(), input.readLong()); 1044 final int formatVersion = input.readInt(); 1045 return new RepGroupImpl( 1046 name, 1047 uuid, 1048 formatVersion, 1049 input.readInt(), 1050 input.readInt(), 1051 ((formatVersion < RepGroupImpl.FORMAT_VERSION_3) ? 1052 RepGroupImpl.MIN_FORMAT_VERSION_JE_VERSION : 1053 parseJEVersion(input.readString()))); 1054 } 1055 1056 @Override objectToEntry(RepGroupImpl group, TupleOutput output)1057 public void objectToEntry(RepGroupImpl group, TupleOutput output) { 1058 if (writeFormatVersion < 0) { 1059 throw new IllegalStateException( 1060 "GroupBinding not created for write"); 1061 } 1062 output.writeString(group.getName()); 1063 output.writeLong(group.getUUID().getMostSignificantBits()); 1064 output.writeLong(group.getUUID().getLeastSignificantBits()); 1065 output.writeInt(writeFormatVersion); 1066 output.writeInt(group.getChangeVersion()); 1067 output.writeInt(group.getNodeIdSequence()); 1068 if (writeFormatVersion >= RepGroupImpl.FORMAT_VERSION_3) { 1069 output.writeString(jeVersionString(group.getMinJEVersion())); 1070 } 1071 } 1072 } 1073 1074 /** 1075 * Supports the serialization/deserialization of node info into and out of 1076 * the database. Nodes are always saved using the current group format 1077 * version, and the node's format version is checked on reading to make 1078 * sure it is not newer than the current group format version, although 1079 * they could have an older format version if they have not been saved 1080 * recently. 1081 * 1082 * <p>Prior to RepGroupImpl version 3, the second field was always the 1083 * ordinal value of the node type, which was either 0 or 1. Starting with 1084 * version 3, values greater than 1 are treated as the rep group version of 1085 * the format used to write the node binding, with the node type following 1086 * in the next field, and the jeVersion field added at the end. 1087 */ 1088 public static class NodeBinding extends TupleBinding<RepNodeImpl> { 1089 1090 /** The approximate maximum size of the serialized form. */ 1091 static final int APPROX_MAX_SIZE = 1092 40 + /* node name (guess) */ 1093 4 + /* node ID */ 1094 1 + /* group version */ 1095 1 + /* NodeType */ 1096 1 + /* quorumAck */ 1097 1 + /* isRemoved */ 1098 40 + /* hostName (guess) */ 1099 4 + /* port */ 1100 8 + /* lastCBVLSN */ 1101 8 + /* barrierTime */ 1102 4 + /* changeVersion */ 1103 10; /* jeVersion (approx) */ 1104 1105 /** The maximum node type value for version 2. */ 1106 private static final int V2_MAX_NODE_TYPE = 1; 1107 1108 /** The group format version to use for reading or writing. */ 1109 private final int groupFormatVersion; 1110 1111 /** 1112 * Create an instance for reading or writing using the specified group 1113 * format version. 1114 */ NodeBinding(final int groupFormatVersion)1115 public NodeBinding(final int groupFormatVersion) { 1116 this.groupFormatVersion = groupFormatVersion; 1117 } 1118 1119 @Override entryToObject(final TupleInput input)1120 public RepNodeImpl entryToObject(final TupleInput input) { 1121 final NameIdPair nameIdPair = NameIdPair.deserialize(input); 1122 final int versionOrNodeType = input.readByte(); 1123 final boolean v2 = (versionOrNodeType <= V2_MAX_NODE_TYPE); 1124 if (!v2 && (versionOrNodeType > groupFormatVersion)) { 1125 throw new IllegalStateException( 1126 "Node entry version " + versionOrNodeType + " for node " + 1127 nameIdPair.getId() + 1128 " is illegal because it is newer than group version " + 1129 groupFormatVersion); 1130 } 1131 final int nodeTypeNum = v2 ? versionOrNodeType : input.readByte(); 1132 return new RepNodeImpl( 1133 nameIdPair, 1134 NodeType.values()[nodeTypeNum], 1135 input.readBoolean(), 1136 input.readBoolean(), 1137 input.readString(), 1138 input.readInt(), 1139 new BarrierState(new VLSN(input.readLong()), 1140 input.readLong()), 1141 input.readInt(), 1142 v2 ? null : parseJEVersion(input.readString())); 1143 } 1144 1145 /** 1146 * Returns whether the node can be serialized using the specified group 1147 * format version. 1148 */ supportsObjectToEntry( final RepNodeImpl node, final int groupFormatVersion)1149 public static boolean supportsObjectToEntry( 1150 final RepNodeImpl node, 1151 final int groupFormatVersion) { 1152 1153 /* Version 2 supports a limited set of node types */ 1154 return ((groupFormatVersion > RepGroupImpl.FORMAT_VERSION_2) || 1155 (node.getType().compareTo(NodeType.ELECTABLE) <= 0)); 1156 } 1157 1158 @Override objectToEntry(final RepNodeImpl mi, final TupleOutput output)1159 public void objectToEntry(final RepNodeImpl mi, 1160 final TupleOutput output) { 1161 if (!supportsObjectToEntry(mi, groupFormatVersion)) { 1162 throw new IllegalArgumentException( 1163 "Node type " + mi.getType() + 1164 " is not supported for group version " + 1165 groupFormatVersion); 1166 } 1167 final boolean v2 = 1168 (groupFormatVersion <= RepGroupImpl.FORMAT_VERSION_2); 1169 final BarrierState syncState = mi.getBarrierState(); 1170 mi.getNameIdPair().serialize(output); 1171 if (!v2) { 1172 output.writeByte(groupFormatVersion); 1173 } 1174 output.writeByte(mi.getType().ordinal()); 1175 output.writeBoolean(mi.isQuorumAck()); 1176 output.writeBoolean(mi.isRemoved()); 1177 output.writeString(mi.getHostName()); 1178 output.writeInt(mi.getPort()); 1179 output.writeLong(syncState.getLastCBVLSN().getSequence()); 1180 output.writeLong(syncState.getBarrierTime()); 1181 output.writeInt(mi.getChangeVersion()); 1182 if (!v2) { 1183 output.writeString(jeVersionString(mi.getJEVersion())); 1184 } 1185 } 1186 } 1187 1188 /** 1189 * Implements two phase updates for membership changes to the group 1190 * database. It compartmentalizes the retry operations and exception 1191 * handling so that it's independent of the core logic. 1192 */ 1193 private abstract class TwoPhaseUpdate { 1194 1195 final RepNodeImpl node; 1196 final boolean quorumAck; 1197 final DatabaseImpl groupDbImpl; 1198 1199 protected Txn txn; 1200 private DatabaseException phase1Exception = null; 1201 TwoPhaseUpdate(final RepNodeImpl node, final boolean quorumAck)1202 TwoPhaseUpdate(final RepNodeImpl node, final boolean quorumAck) { 1203 this.node = node; 1204 this.quorumAck = quorumAck; 1205 try { 1206 groupDbImpl = repImpl.getGroupDb(); 1207 } catch (DatabaseNotFoundException e) { 1208 /* Should never happen. */ 1209 throw EnvironmentFailureException.unexpectedException(e); 1210 } 1211 } 1212 1213 /* Phase1 exception handlers for phase1Body-specific cleanup */ insufficientReplicasHandler()1214 void insufficientReplicasHandler() {} 1215 deadlockHandler()1216 void deadlockHandler() {} 1217 1218 /* The changes to be made in phase1 */ phase1Body()1219 abstract void phase1Body(); 1220 1221 /* The changes to be made in phase2. */ phase2Body()1222 void phase2Body() { 1223 node.setQuorumAck(true); 1224 final RepGroupImpl repGroup = 1225 fetchGroupObject(txn, groupDbImpl, LockMode.DEFAULT); 1226 saveNodeObject(txn, node, groupDbImpl, repGroup); 1227 } 1228 phase1()1229 private void phase1() 1230 throws DatabaseException { 1231 1232 for (int i=0; i < QUORUM_ACK_RETRIES; i++ ) { 1233 txn = null; 1234 try { 1235 txn = new MasterTxn(repImpl, 1236 quorumAck ? QUORUM_ACK : NO_ACK, 1237 repImpl.getNameIdPair()); 1238 phase1Body(); 1239 txn.commit( 1240 quorumAck ? QUORUM_ACK_DURABILITY : NO_ACK_DURABILITY); 1241 txn = null; 1242 return; 1243 } catch (InsufficientReplicasException e) { 1244 phase1Exception = e; 1245 insufficientReplicasHandler(); 1246 /* Commit was aborted. */ 1247 LoggerUtils.warning(logger, repImpl, 1248 "Phase 1 retry; for node: " + 1249 node.getName() + 1250 " insufficient active replicas: " + 1251 e.getMessage()); 1252 continue; 1253 } catch (InsufficientAcksException e) { 1254 phase1Exception = e; 1255 /* Local commit completed but did not get enough acks. */ 1256 LoggerUtils.warning(logger, repImpl, 1257 "Phase 1 retry; for node: " + 1258 node.getName() + 1259 " insufficient acks: " + 1260 e.getMessage()); 1261 continue; 1262 } catch (LockConflictException e) { 1263 /* Likely a timeout, can't distinguish between them. */ 1264 phase1Exception = e; 1265 deadlockHandler(); 1266 LoggerUtils.warning(logger, repImpl, 1267 "Phase 1 retry; for node: " + 1268 node.getName() + 1269 " deadlock exception: " + 1270 e.getMessage()); 1271 continue; 1272 } catch (DatabaseException e) { 1273 LoggerUtils.severe(logger, repImpl, 1274 "Phase 1 failed unexpectedly: " + 1275 e.getMessage()); 1276 if (txn != null) { 1277 txn.abort(); 1278 } 1279 throw e; 1280 } finally { 1281 if (txn != null) { 1282 txn.abort(); 1283 } 1284 } 1285 } 1286 LoggerUtils.warning(logger, 1287 repImpl, 1288 "Phase 1 failed: " + 1289 phase1Exception.getMessage()); 1290 throw phase1Exception; 1291 } 1292 phase2()1293 private void phase2() { 1294 try { 1295 txn = new MasterTxn(repImpl, NO_ACK, repImpl.getNameIdPair()); 1296 phase2Body(); 1297 txn.commit(); 1298 txn = null; 1299 } catch (DatabaseException e) { 1300 LoggerUtils.severe(logger, repImpl, 1301 "Unexpected failure in Phase 2: " + 1302 e.getMessage()); 1303 throw e; 1304 } finally { 1305 if (txn != null) { 1306 txn.abort(); 1307 } 1308 } 1309 } 1310 execute()1311 void execute() { 1312 phase1(); 1313 /* Only executed if phase 1 succeeds. */ 1314 phase2(); 1315 } 1316 } 1317 1318 /** 1319 * An internal API used to obtain group information by opening a stand 1320 * alone environment handle and reading the RepGroupDB. Used for debugging 1321 * and utilities. 1322 * 1323 * @param envDir the directory containing the environment log files 1324 * 1325 * @return the group as currently defined by the environment 1326 */ getGroup(final File envDir)1327 public static RepGroupImpl getGroup(final File envDir) { 1328 1329 EnvironmentConfig envConfig = new EnvironmentConfig(); 1330 envConfig.setReadOnly(true); 1331 envConfig.setTransactional(true); 1332 envConfig.setAllowCreate(false); 1333 Environment env = new Environment(envDir, envConfig); 1334 Transaction txn = null; 1335 Database db = null; 1336 try { 1337 DatabaseConfig dbConfig = new DatabaseConfig(); 1338 dbConfig.setReadOnly(true); 1339 dbConfig.setTransactional(true); 1340 dbConfig.setAllowCreate(false); 1341 txn = env.beginTransaction(null, null); 1342 db = env.openDatabase(txn, DbType.REP_GROUP.getInternalName(), 1343 dbConfig); 1344 1345 DatabaseEntry groupEntry = new DatabaseEntry(); 1346 OperationStatus status = db.get( 1347 txn, groupKeyEntry, groupEntry, LockMode.READ_COMMITTED); 1348 if (status != OperationStatus.SUCCESS) { 1349 throw new IllegalStateException 1350 ("Group entry not found " + status); 1351 } 1352 GroupBinding groupBinding = new GroupBinding(); 1353 RepGroupImpl group = groupBinding.entryToObject(groupEntry); 1354 1355 group = fetchGroup(group.getName(), 1356 DbInternal.getDatabaseImpl(db), 1357 DbInternal.getTxn(txn)); 1358 txn.commit(); 1359 txn = null; 1360 return group; 1361 } finally { 1362 if (txn != null) { 1363 txn.abort(); 1364 } 1365 if (db != null) { 1366 db.close(); 1367 } 1368 env.close(); 1369 } 1370 } 1371 1372 /** 1373 * Deletes all the current members from the rep group database and creates 1374 * a new group, with just the member supplied via the configuration. This 1375 * method exists to support the utility {@link DbResetRepGroup} 1376 * <p> 1377 * The changes proceed in three steps: 1378 * 1379 * 1) Determine the node id sequence number. This is to ensure that rep 1380 * node ids are not reused. Old rep node ids are present in the logs as 1381 * commit records. 1382 * 1383 * 2) A new group object, with the node id sequence number determined 1384 * in step 1), is created and all existing nodes are deleted. 1385 * 1386 * 3) The first node is added to the rep group. 1387 * 1388 * @param lastOldVLSN the VLSN used to associate the new barrier wrt this 1389 * node. 1390 */ reinitFirstNode(VLSN lastOldVLSN)1391 public void reinitFirstNode(VLSN lastOldVLSN) { 1392 1393 DbConfigManager configManager = repImpl.getConfigManager(); 1394 String groupName = configManager.get(GROUP_NAME); 1395 String nodeName = configManager.get(NODE_NAME); 1396 String hostPortPair = configManager.get(RepParams.NODE_HOST_PORT); 1397 String hostname = HostPortPair.getHostname(hostPortPair); 1398 int port = HostPortPair.getPort(hostPortPair); 1399 final boolean retainUUID = 1400 configManager.getBoolean(RESET_REP_GROUP_RETAIN_UUID); 1401 1402 final DatabaseImpl dbImpl = repImpl.getGroupDb(); 1403 1404 /* 1405 * Retrieve the previous rep group object, so we can use its node 1406 * sequence id. 1407 */ 1408 TransactionConfig txnConfig = new TransactionConfig(); 1409 txnConfig.setDurability(NO_ACK.getDurability()); 1410 txnConfig.setConsistencyPolicy(NO_CONSISTENCY); 1411 1412 NameIdPair nameIdPair = repImpl.getRepNode().getNameIdPair(); 1413 nameIdPair.revertToNull(); /* read transaction, so null id is ok. */ 1414 1415 /* Now delete old nodes and the group, and establish a new group */ 1416 Txn txn = new MasterTxn(repImpl, txnConfig, nameIdPair); 1417 RepGroupImpl prevRepGroup = 1418 fetchGroupObject(txn, dbImpl, LockMode.RMW); 1419 txn.commit(); 1420 1421 final int nodeIdSequenceStart = prevRepGroup.getNodeIdSequence(); 1422 1423 final DatabaseEntry keyEntry = new DatabaseEntry(); 1424 final DatabaseEntry value = new DatabaseEntry(); 1425 1426 /* 1427 * We have the "predicted" real node id, so set it and it will be used 1428 * in the commit lns that will be written in future. 1429 */ 1430 final int firstNodeId = nodeIdSequenceStart + 1; 1431 nameIdPair.setId(firstNodeId); 1432 1433 RepNodeImpl firstNode = new RepNodeImpl( 1434 nodeName, hostname, port, repImpl.getCurrentJEVersion()); 1435 final BarrierState barrierState = new BarrierState(lastOldVLSN, 1436 System.currentTimeMillis()); 1437 firstNode.setBarrierState(barrierState); 1438 1439 txn = new MasterTxn(repImpl, txnConfig, nameIdPair); 1440 1441 final CursorConfig cursorConfig = new CursorConfig(); 1442 cursorConfig.setReadCommitted(true); 1443 Cursor mcursor = makeCursor(dbImpl, txn, cursorConfig); 1444 1445 while (mcursor.getNext(keyEntry, value, LockMode.DEFAULT) == 1446 OperationStatus.SUCCESS) { 1447 final String key = StringBinding.entryToString(keyEntry); 1448 1449 if (GROUP_KEY.equals(key)) { 1450 final RepGroupImpl repGroup; 1451 if (retainUUID) { 1452 repGroup = new GroupBinding().entryToObject(value); 1453 repGroup.incrementChangeVersion(); 1454 } else { 1455 repGroup = new RepGroupImpl( 1456 groupName, repImpl.getCurrentJEVersion()); 1457 } 1458 GroupBinding groupBinding = 1459 new GroupBinding(repGroup.getFormatVersion()); 1460 repGroup.setNodeIdSequence(nodeIdSequenceStart); 1461 DatabaseEntry groupEntry = new DatabaseEntry(); 1462 groupBinding.objectToEntry(repGroup, groupEntry); 1463 OperationStatus status = mcursor.putCurrent(groupEntry); 1464 if (!OperationStatus.SUCCESS.equals(status)) { 1465 throw new IllegalStateException("Unexpected state:" + 1466 status); 1467 } 1468 } else { 1469 LoggerUtils.info(logger, repImpl, "Removing node: " + key); 1470 mcursor.delete(); 1471 } 1472 } 1473 mcursor.close(); 1474 txn.commit(); 1475 1476 /* Now add the first node of the new group. */ 1477 ensureMember(firstNode); 1478 if (firstNodeId != firstNode.getNodeId()) { 1479 throw new IllegalStateException("Expected nodeid:" + firstNodeId + 1480 " but found:" + 1481 firstNode.getNodeId()); 1482 } 1483 } 1484 } 1485