1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.impl;
9 
10 import static com.sleepycat.je.rep.NoConsistencyRequiredPolicy.NO_CONSISTENCY;
11 import static com.sleepycat.je.rep.impl.RepParams.GROUP_NAME;
12 import static com.sleepycat.je.rep.impl.RepParams.NODE_NAME;
13 import static com.sleepycat.je.rep.impl.RepParams.RESET_REP_GROUP_RETAIN_UUID;
14 
15 import java.io.File;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.UUID;
19 import java.util.logging.Logger;
20 
21 import com.sleepycat.bind.tuple.StringBinding;
22 import com.sleepycat.bind.tuple.TupleBinding;
23 import com.sleepycat.bind.tuple.TupleInput;
24 import com.sleepycat.bind.tuple.TupleOutput;
25 import com.sleepycat.je.Cursor;
26 import com.sleepycat.je.CursorConfig;
27 import com.sleepycat.je.Database;
28 import com.sleepycat.je.DatabaseConfig;
29 import com.sleepycat.je.DatabaseEntry;
30 import com.sleepycat.je.DatabaseException;
31 import com.sleepycat.je.DatabaseNotFoundException;
32 import com.sleepycat.je.DbInternal;
33 import com.sleepycat.je.Durability;
34 import com.sleepycat.je.Durability.ReplicaAckPolicy;
35 import com.sleepycat.je.Durability.SyncPolicy;
36 import com.sleepycat.je.Environment;
37 import com.sleepycat.je.EnvironmentConfig;
38 import com.sleepycat.je.EnvironmentFailureException;
39 import com.sleepycat.je.JEVersion;
40 import com.sleepycat.je.LockConflictException;
41 import com.sleepycat.je.LockMode;
42 import com.sleepycat.je.OperationStatus;
43 import com.sleepycat.je.Transaction;
44 import com.sleepycat.je.TransactionConfig;
45 import com.sleepycat.je.dbi.DatabaseImpl;
46 import com.sleepycat.je.dbi.DbConfigManager;
47 import com.sleepycat.je.dbi.DbTree;
48 import com.sleepycat.je.dbi.DbType;
49 import com.sleepycat.je.rep.InsufficientAcksException;
50 import com.sleepycat.je.rep.InsufficientReplicasException;
51 import com.sleepycat.je.rep.NodeType;
52 import com.sleepycat.je.rep.impl.RepGroupImpl.BarrierState;
53 import com.sleepycat.je.rep.impl.node.Feeder;
54 import com.sleepycat.je.rep.impl.node.NameIdPair;
55 import com.sleepycat.je.rep.impl.node.RepNode;
56 import com.sleepycat.je.rep.monitor.GroupChangeEvent.GroupChangeType;
57 import com.sleepycat.je.rep.stream.Protocol;
58 import com.sleepycat.je.rep.txn.MasterTxn;
59 import com.sleepycat.je.rep.txn.ReadonlyTxn;
60 import com.sleepycat.je.rep.util.DbResetRepGroup;
61 import com.sleepycat.je.rep.utilint.HostPortPair;
62 import com.sleepycat.je.txn.Txn;
63 import com.sleepycat.je.utilint.LoggerUtils;
64 import com.sleepycat.je.utilint.VLSN;
65 
66 /**
67  * This class is used to encapsulate all access to the rep group data that is
68  * present in every replicated JE environment. The rep group data exists
69  * primarily to support dynamic group membership. Both read and update access
70  * must be done through the APIs provided by this class.
71  *
72  * The database is simply a representation of the RepGroup. Each entry in the
73  * database represents a node in RepGroup; the key is the String node name, and
74  * the data is the serialized ReplicationNode.  There is a special entry keyed
75  * by GROUP_KEY that holds the contents of the RepGroup (excluding the nodes)
76  * itself.
77  *
78  * The database may be modified concurrently by multiple transactions as a
79  * master processes requests to update it. It may also be accessed by multiple
80  * overlapping transactions as a Replica replays the rep stream. These updates
81  * need to be interleaved with operations like getGroup() that create copies of
82  * the RepGroup instance. To avoid deadlocks, entries in the database are
83  * accessed in order of ascending key. GROUP_KEY in particular is associated
84  * with the lowest key value so that it's locked first implicitly as part of
85  * any iteration and any other modifications to the database must first lock it
86  * before making changes to the group itself.
87  *
88  * An instance of this class is created as part of a replication node and is
89  * retained for the entire lifetime of that node.
90  */
91 public class RepGroupDB {
92 
    /* The replicated environment whose group database this object manages. */
    private final RepImpl repImpl;

    /*
     * A convenient, cached empty group, built in the constructor from the
     * configured GROUP_NAME and the current JE version.
     */
    public final RepGroupImpl emptyGroup;

    private final Logger logger;

    /* The key used to store group-wide information in the database. It must
     * be the lowest key in the database, so that it's locked first during
     * database iteration.
     *
     * NOTE(review): this relies on no node name ever sorting below this key
     * in the database's key order; fetchGroup throws IllegalStateException
     * if a node entry is encountered before the group entry. Confirm that
     * node-name validation rules this out.
     */
    public final static String GROUP_KEY = "$$GROUP_KEY$$";
    /*
     * GROUP_KEY pre-encoded as a DatabaseEntry, filled in by the static
     * initializer below. NOTE(review): this is a shared, mutable public
     * object; callers should treat it as read-only.
     */
    public final static DatabaseEntry groupKeyEntry = new DatabaseEntry();

    /* Initialize the entry: encode GROUP_KEY once, at class initialization. */
    static {
        StringBinding.stringToEntry(GROUP_KEY, groupKeyEntry);
    }

    /* The fixed DB ID associated with the internal rep group database. */
    public final static long DB_ID = DbTree.NEG_DB_ID_START - 1;

    /*
     * Number of times to retry for ACKs on the master before returning to
     * the Replica, which will then again retry on some periodic basis.
     */
    private final static int QUORUM_ACK_RETRIES = 5;

    /*
     * Convenience Durability and Config constants. The TransactionConfig
     * objects are created empty here and get their durability assigned in
     * the static block at the end of this group of declarations.
     */
    /* SYNC commits that require acknowledgments from a simple majority. */
    private final static Durability QUORUM_ACK_DURABILITY =
        new Durability(SyncPolicy.SYNC,
                       SyncPolicy.SYNC,
                       ReplicaAckPolicy.SIMPLE_MAJORITY);

    private final static TransactionConfig QUORUM_ACK =
        new TransactionConfig();

    private final static TransactionConfig NO_ACK = new TransactionConfig();

    /*
     * TODO: Change this when we support true read only transactions.
     * Until then READ_ONLY is the very same object as NO_ACK, so it carries
     * NO_ACK's durability.
     */
    final static TransactionConfig READ_ONLY = NO_ACK;

    /* SYNC commits that do not wait for any replica acknowledgments. */
    private final static Durability NO_ACK_DURABILITY =
        new Durability(SyncPolicy.SYNC,
                       SyncPolicy.SYNC,
                       ReplicaAckPolicy.NONE);

    /* NO_SYNC commits that do not wait for any replica acknowledgments. */
    private final static Durability NO_ACK_NO_SYNC_DURABILITY =
        new Durability(SyncPolicy.NO_SYNC,
                       SyncPolicy.NO_SYNC,
                       ReplicaAckPolicy.NONE);

    private final static TransactionConfig NO_ACK_NO_SYNC =
        new TransactionConfig();

    /*
     * Runs after the declarations above (static initializers execute in
     * textual order), so every Durability constant is already set.
     */
    static {
        /* Initialize config constants. */
        QUORUM_ACK.setDurability(QUORUM_ACK_DURABILITY);
        NO_ACK.setDurability(NO_ACK_DURABILITY);
        NO_ACK_NO_SYNC.setDurability(NO_ACK_NO_SYNC_DURABILITY);
    }
156 
157     /**
158      * Create an instance. Note that the database handle is not initialized at
159      * this time, since the state of the node master/replica is not known
160      * at the time the replication node (and consequently this instance) is
161      * created.
162      * @throws DatabaseException
163      */
RepGroupDB(RepImpl repImpl)164     public RepGroupDB(RepImpl repImpl)
165         throws DatabaseException {
166 
167         this.repImpl = repImpl;
168 
169         DbConfigManager configManager = repImpl.getConfigManager();
170         emptyGroup = new RepGroupImpl(configManager.get(GROUP_NAME),
171                                       repImpl.getCurrentJEVersion());
172         logger = LoggerUtils.getLogger(getClass());
173     }
174 
175     /**
176      * Returns all the members that are currently part of the replication
177      * group, using NO_CONSISTENCY. This method can read the database directly,
178      * and can be used when the replicated environment is detached and the
179      * RepNode is null. It's for the latter reason that the method reads
180      * uncommitted data. In detached mode, there may be transactions on the
181      * database that were in progress when the node was last shutdown. These
182      * transactions may have locks which will not be released until after the
183      * node is re-attached and the replication stream is resumed. Using
184      * uncommitted reads avoids use of locks in this circumstance. It's safe to
185      * read these records, since the database will eventually be updated with
186      * these changes.
187      *
188      * @return the group object
189      * @throws DatabaseException if the object could not be obtained
190      */
getGroup(RepImpl rImpl, String groupName)191     public static RepGroupImpl getGroup(RepImpl rImpl,
192                                         String groupName)
193         throws DatabaseException {
194 
195         /* Get persistent nodes from the database */
196         DatabaseImpl dbImpl = null;
197         boolean foundDbImpl = false;
198         try {
199             dbImpl = rImpl.getGroupDb();
200             foundDbImpl = true;
201         } catch (DatabaseNotFoundException e) {
202         }
203         final RepGroupImpl group;
204         if (!foundDbImpl) {
205 
206             /* Creates a temporary placeholder group for use until the real
207              * definition comes over the replication stream as part of the
208              * replicated group database.
209              */
210             group = new RepGroupImpl(groupName, true,
211                                      rImpl.getCurrentJEVersion());
212 
213         } else {
214             final TransactionConfig txnConfig = new TransactionConfig();
215             txnConfig.setDurability(READ_ONLY.getDurability());
216             txnConfig.setConsistencyPolicy(NO_CONSISTENCY);
217             txnConfig.setReadUncommitted(true);
218 
219             Txn txn = null;
220             try {
221                 txn = new ReadonlyTxn(rImpl, txnConfig);
222                 group = fetchGroup(groupName, dbImpl, txn);
223                 /*
224                  * Correct summary info since we are reading uncommitted data
225                  */
226                 group.makeConsistent();
227                 txn.commit();
228                 txn = null;
229             } finally {
230                 if (txn != null) {
231                     txn.abort();
232                 }
233             }
234         }
235 
236         /* Get secondary nodes from their feeders */
237         final RepNode repNode = rImpl.getRepNode();
238         if (repNode != null) {
239             for (final Feeder feeder :
240                      repNode.feederManager().activeReplicasMap().values()) {
241                 final RepNodeImpl node = feeder.getReplicaNode();
242                 if (node.getType().isSecondary()) {
243                     group.addSecondaryNode(node);
244                 }
245             }
246         }
247 
248         return group;
249     }
250 
getGroup()251     public RepGroupImpl getGroup()
252         throws DatabaseException {
253 
254         return getGroup(repImpl,
255                         repImpl.getConfigManager().get(GROUP_NAME));
256     }
257 
258     /**
259      * Sets the minimum JE version required for nodes to join the replication
260      * group and refreshes the group object cached in the rep group.  Throws a
261      * {@link MinJEVersionUnsupportedException} if the requested version is not
262      * supported by current nodes.
263      *
264      * <p>If this method returns successfully, nodes that are running a JE
265      * version older than the one specified will not be permitted to join the
266      * replication group in the future.  Use this method to implement features
267      * that require all group members to meet a minimum version requirement.
268      *
269      * <p>The update attempts to obtain acknowledgments from a simple majority,
270      * to make sure that future masters agree that the update has taken place,
271      * but does not require this.
272      *
273      * @param newMinJEVersion the new minimum JE version
274      * @throws DatabaseException if an error occurs when accessing the
275      * replication group database
276      * @throws MinJEVersionUnsupportedException if the requested version is not
277      * supported
278      */
setMinJEVersion(final JEVersion newMinJEVersion)279     public void setMinJEVersion(final JEVersion newMinJEVersion)
280         throws DatabaseException, MinJEVersionUnsupportedException {
281 
282         final DatabaseImpl groupDbImpl;
283         try {
284             groupDbImpl = repImpl.getGroupDb();
285         } catch (DatabaseNotFoundException e) {
286             /* Should never happen. */
287             throw EnvironmentFailureException.unexpectedException(e);
288         }
289         MasterTxn txn =
290             new MasterTxn(repImpl, QUORUM_ACK, repImpl.getNameIdPair());
291         try {
292             RepGroupImpl repGroup =
293                 fetchGroupObject(txn, groupDbImpl, LockMode.RMW);
294             repGroup = fetchGroup(repGroup.getName(), groupDbImpl, txn);
295             repGroup.setMinJEVersion(newMinJEVersion);
296             saveGroupObject(txn, repGroup, groupDbImpl);
297             txn.commit(QUORUM_ACK_DURABILITY);
298             txn = null;
299         } catch (InsufficientAcksException e) {
300 
301             /*
302              * Didn't receive acknowledgments from a simple majority.  OK to
303              * proceed, since this operation will be repeated if the change is
304              * lost.
305              */
306             LoggerUtils.info(logger, repImpl,
307                              "Proceeding without acks for minimum JE version");
308         } finally {
309             if (txn != null) {
310                 txn.abort();
311             }
312         }
313         repImpl.getRepNode().refreshCachedGroup();
314     }
315 
316     /**
317      * All rep group db access uses cursors with eviction disabled.
318      */
makeCursor(DatabaseImpl dbImpl, Txn txn, CursorConfig cursorConfig)319     static private Cursor makeCursor(DatabaseImpl dbImpl,
320                                      Txn txn,
321                                      CursorConfig cursorConfig) {
322         Cursor cursor = DbInternal.makeCursor(dbImpl,
323                                               txn,
324                                               cursorConfig);
325         DbInternal.getCursorImpl(cursor).setAllowEviction(false);
326         return cursor;
327     }
328 
329     /**
330      * Returns a representation of the nodes of the group stored in the
331      * database, using the txn and handles that were passed in.
332      */
fetchGroup(String groupName, DatabaseImpl dbImpl, Txn txn)333     private static RepGroupImpl fetchGroup(String groupName,
334                                            DatabaseImpl dbImpl,
335                                            Txn txn)
336         throws DatabaseException {
337 
338         final DatabaseEntry keyEntry = new DatabaseEntry();
339         final DatabaseEntry value = new DatabaseEntry();
340         NodeBinding nodeBinding = null;
341         final GroupBinding groupBinding = new GroupBinding();
342 
343         RepGroupImpl group = null;
344         Map <Integer, RepNodeImpl> nodes =
345             new HashMap<Integer, RepNodeImpl>();
346         final CursorConfig cursorConfig = new CursorConfig();
347         cursorConfig.setReadCommitted(true);
348 
349         Cursor mcursor = null;
350 
351         try {
352             mcursor = makeCursor(dbImpl, txn, cursorConfig);
353             while (mcursor.getNext(keyEntry, value, LockMode.DEFAULT) ==
354                    OperationStatus.SUCCESS) {
355 
356                 final String key = StringBinding.entryToString(keyEntry);
357 
358                 if (GROUP_KEY.equals(key)) {
359                     group = groupBinding.entryToObject(value);
360                     if (!group.getName().equals(groupName)) {
361                         throw EnvironmentFailureException.unexpectedState
362                             ("The argument: " + groupName +
363                              " does not match the expected group name: " +
364                              group.getName());
365                     }
366 
367                     /*
368                      * The group entry should always be first, so we can use it
369                      * to provide the group version for reading node entries.
370                      */
371                     nodeBinding = new NodeBinding(group.getFormatVersion());
372                 } else {
373                     if (nodeBinding == null) {
374                         throw new IllegalStateException(
375                             "Found node binding before group binding");
376                     }
377                     final RepNodeImpl node = nodeBinding.entryToObject(value);
378                     nodes.put(node.getNameIdPair().getId(), node);
379                 }
380             }
381             if (group == null) {
382                 throw EnvironmentFailureException.unexpectedState
383                     ("Group key: " + GROUP_KEY + " is missing");
384             }
385             group.setNodes(nodes);
386             return group;
387         } finally {
388             if (mcursor != null) {
389                 mcursor.close();
390             }
391         }
392     }
393 
394     /**
395      * Ensures that information about this node, the current master, is in the
396      * member database. If it isn't, enter it into the database. If the
397      * database does not exist, create it as well.
398      *
399      * <p>Note that this overloading is only used by a node that is the master.
400      *
401      * @throws DatabaseException
402      */
addFirstNode()403     public void addFirstNode()
404         throws DatabaseException {
405 
406         DbConfigManager configManager = repImpl.getConfigManager();
407         String groupName = configManager.get(GROUP_NAME);
408         String nodeName = configManager.get(NODE_NAME);
409 
410         DatabaseImpl groupDbImpl = repImpl.createGroupDb();
411 
412         /* setup the group information as data. */
413         RepGroupImpl repGroup =
414             new RepGroupImpl(groupName, repImpl.getCurrentJEVersion());
415         GroupBinding groupBinding =
416             new GroupBinding(repGroup.getFormatVersion());
417         DatabaseEntry groupEntry = new DatabaseEntry();
418         groupBinding.objectToEntry(repGroup, groupEntry);
419 
420         /* Create the common group entry. */
421         TransactionConfig txnConfig = new TransactionConfig();
422         txnConfig.setDurability(NO_ACK.getDurability());
423         txnConfig.setConsistencyPolicy(NO_CONSISTENCY);
424         Txn txn = null;
425         Cursor cursor = null;
426         try {
427             txn = new MasterTxn(repImpl,
428                                 txnConfig,
429                                 repImpl.getNameIdPair());
430 
431             cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT);
432             OperationStatus status = cursor.put(groupKeyEntry, groupEntry);
433             if (status != OperationStatus.SUCCESS) {
434                 throw EnvironmentFailureException.unexpectedState
435                     ("Couldn't write first group entry " + status);
436             }
437             cursor.close();
438             cursor = null;
439             txn.commit();
440             txn = null;
441         } finally {
442             if (cursor != null) {
443                 cursor.close();
444             }
445 
446             if (txn != null) {
447                 txn.abort();
448             }
449         }
450 
451         ensureMember(new RepNodeImpl(nodeName,
452                                      repImpl.getHostName(),
453                                      repImpl.getPort(),
454                                      repImpl.getCurrentJEVersion()));
455     }
456 
457     /**
458      * Ensures that the membership info for the replica is in the database. A
459      * call to this method is initiated by the master as part of the
460      * feeder/replica handshake, where the replica provides membership
461      * information as part of the handshake protocol. The membership database
462      * must already exist, with the master in it, when this method is invoked.
463      *
464      * <p>This method should not be called for secondary nodes.
465      *
466      * @param membershipInfo provided by the replica
467      *
468      * @throws InsufficientReplicasException upon failure of 2p member update
469      * @throw  InsufficientAcksException upon failure of 2p member update
470      * @throws DatabaseException when the membership info could not be entered
471      * into the membership database.
472      */
ensureMember(Protocol.NodeGroupInfo membershipInfo)473     public void ensureMember(Protocol.NodeGroupInfo membershipInfo)
474         throws InsufficientReplicasException,
475                InsufficientAcksException,
476                DatabaseException {
477 
478         ensureMember(new RepNodeImpl(membershipInfo));
479     }
480 
ensureMember(RepNodeImpl ensureNode)481     void ensureMember(RepNodeImpl ensureNode)
482         throws DatabaseException {
483 
484         if (ensureNode.getType().isSecondary()) {
485             throw new IllegalArgumentException(
486                 "Attempt to call ensureMember on SECONDARY node: " +
487                 ensureNode);
488         }
489         DatabaseImpl groupDbImpl;
490         try {
491             groupDbImpl = repImpl.getGroupDb();
492         } catch (DatabaseNotFoundException e) {
493             /* Should never happen. */
494             throw EnvironmentFailureException.unexpectedException(e);
495         }
496 
497         DatabaseEntry nodeNameKey = new DatabaseEntry();
498         StringBinding.stringToEntry(ensureNode.getName(), nodeNameKey);
499 
500         DatabaseEntry value = new DatabaseEntry();
501         NodeBinding mib = null;
502 
503         Txn txn = null;
504         Cursor cursor = null;
505         try {
506             txn = new ReadonlyTxn(repImpl, NO_ACK);
507 
508             /*
509              * Fetch the group so we know the group format version.  Read the
510              * group before reading the node entry in each case to avoid the
511              * potential of deadlocks caused by reversing the order of lock
512              * acquisition.
513              */
514             final RepGroupImpl repGroup =
515                 fetchGroupObject(txn, groupDbImpl, LockMode.DEFAULT);
516             mib = new NodeBinding(repGroup.getFormatVersion());
517 
518             CursorConfig config = new CursorConfig();
519             config.setReadCommitted(true);
520             cursor = makeCursor(groupDbImpl, txn, config);
521 
522             OperationStatus status =
523                 cursor.getSearchKey(nodeNameKey, value, null);
524             if (status == OperationStatus.SUCCESS) {
525                 /* Let's see if the entry needs updating. */
526                 RepNodeImpl miInDb = mib.entryToObject(value);
527                 if (miInDb.equivalent(ensureNode)) {
528                     if (miInDb.isQuorumAck()) {
529                         /* Present, matched and acknowledged. */
530                         return;
531                     }
532                     ensureNode.getNameIdPair().update(miInDb.getNameIdPair());
533                     /* Not acknowledged, retry the update. */
534                 } else {
535                     /* Present but not equivalent. */
536                     LoggerUtils.warning(logger, repImpl,
537                                         "Incompatible node descriptions. " +
538                                         "Membership database definition: " +
539                                         miInDb.toString() +
540                                         " Transient definition: " +
541                                         ensureNode.toString());
542                     if (ensureNode.getType() != miInDb.getType()) {
543                         throw EnvironmentFailureException.unexpectedState(
544                             "Conflicting node types for node " +
545                             ensureNode.getName() +
546                             ": expected " + ensureNode.getType() +
547                             ", found " + miInDb.getType());
548                     }
549                     throw EnvironmentFailureException.unexpectedState(
550                         "Incompatible node descriptions for node: " +
551                         ensureNode.getName() + ", node ID: " +
552                         ensureNode.getNodeId());
553                 }
554                 LoggerUtils.info(logger, repImpl,
555                                  "Present but not ack'd node: " +
556                                  ensureNode.getNodeId() +
557                                  " ack status: " + miInDb.isQuorumAck());
558             }
559             cursor.close();
560             cursor = null;
561             txn.commit();
562             txn = null;
563         } finally {
564             if (cursor != null) {
565                 cursor.close();
566             }
567 
568             if (txn != null) {
569                 txn.abort();
570             }
571 
572         }
573         createMember(ensureNode);
574 
575         /* Refresh group and Fire an ADD GroupChangeEvent. */
576         refreshGroupAndNotifyGroupChange
577             (ensureNode.getName(), GroupChangeType.ADD);
578     }
579 
refreshGroupAndNotifyGroupChange(String nodeName, GroupChangeType opType)580     private void refreshGroupAndNotifyGroupChange(String nodeName,
581                                                   GroupChangeType opType) {
582         repImpl.getRepNode().refreshCachedGroup();
583         repImpl.getRepNode().getMonitorEventManager().notifyGroupChange
584             (nodeName, opType);
585     }
586 
587     /**
588      * Removes a node from the replication group by marking the node's entry in
589      * the rep group db as removed, and optionally deleting the entry.
590      *
591      * <p>This method should not be called for secondary nodes.
592      */
removeMember(final RepNodeImpl removeNode, final boolean delete)593     public void removeMember(final RepNodeImpl removeNode,
594                              final boolean delete) {
595         LoggerUtils.info(logger, repImpl,
596                          (delete ? "Deleting node: " : "Removing node: ") +
597                          removeNode.getName());
598 
599         if (removeNode.getType().isSecondary()) {
600             throw new IllegalArgumentException(
601                 "Attempt to call removeMember on a SECONDARY node: " +
602                 removeNode);
603         }
604 
605         TwoPhaseUpdate twoPhaseUpdate = new TwoPhaseUpdate(removeNode, true) {
606 
607             @Override
608             void phase1Body() {
609                 final RepGroupImpl repGroup =
610                     fetchGroupObject(txn, groupDbImpl, LockMode.RMW);
611                 int changeVersion = repGroup.incrementChangeVersion();
612                 saveGroupObject(txn, repGroup, groupDbImpl);
613                 node.setChangeVersion(changeVersion);
614                 node.setRemoved(true);
615                 saveNodeObject(txn, node, groupDbImpl, repGroup);
616             }
617             /** Override phase 2 to delete the node entry if delete is true. */
618             @Override
619             void phase2Body() {
620                 if (!delete) {
621                     super.phase2Body();
622                     return;
623                 }
624                 final DatabaseEntry nodeNameKey = new DatabaseEntry();
625                 StringBinding.stringToEntry(removeNode.getName(), nodeNameKey);
626                 final Cursor cursor =
627                     makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT);
628                 try {
629                     final OperationStatus status = cursor.getSearchKey(
630                         nodeNameKey, new DatabaseEntry(), LockMode.RMW);
631                     if (status != OperationStatus.SUCCESS) {
632                         throw EnvironmentFailureException.unexpectedState(
633                             "Node ID: " + removeNode.getNameIdPair() +
634                             " not present in group db");
635                     }
636                     cursor.delete();
637                 } finally {
638                     cursor.close();
639                 }
640             }
641         };
642 
643         twoPhaseUpdate.execute();
644 
645         /* Refresh group and fire a REMOVE GroupChangeEvent. */
646         refreshGroupAndNotifyGroupChange
647             (removeNode.getName(), GroupChangeType.REMOVE);
648 
649         LoggerUtils.info(logger, repImpl,
650                          "Successfully deleted node: " + removeNode.getName());
651     }
652 
653     /* Add a new rep node into the RepGroupDB. */
createMember(final RepNodeImpl node)654     private void createMember(final RepNodeImpl node)
655         throws InsufficientReplicasException,
656                InsufficientAcksException,
657                DatabaseException {
658 
659         LoggerUtils.fine
660             (logger, repImpl, "Adding node: " + node.getNameIdPair());
661 
662         twoPhaseMemberUpdate(node, true);
663 
664         LoggerUtils.info(logger, repImpl,
665                          "Successfully added node:" + node.getNameIdPair() +
666                          " HostPort = " + node.getHostName() + ": " +
667                          node.getPort() + " [" + node.getType() + "]");
668     }
669 
670     /*
671      * Update a current rep node information in the RepGroupDB.
672      *
673      * <p>This method should not be called for secondary nodes.
674      *
675      * @param node the new node information
676      * @param quorumAck whether to require acknowledgments from a quorum
677      */
updateMember(final RepNodeImpl node, final boolean quorumAck)678     public void updateMember(final RepNodeImpl node, final boolean quorumAck)
679         throws InsufficientReplicasException,
680                InsufficientAcksException,
681                DatabaseException {
682 
683         if (node.getType().isSecondary()) {
684             throw new IllegalArgumentException(
685                 "Attempt to call updateMember on a SECONDARY node: " + node);
686         }
687 
688         LoggerUtils.fine(logger, repImpl, "Updating node: " + node);
689 
690         twoPhaseMemberUpdate(node, quorumAck);
691 
692         // TODO: clean up the Monitor interface.  There are several aspects of
693         // that interface that need fixing; but in particular it ought to have
694         // a way to inform listeners that a node has moved to a new network
695         // address.  Once that's done, the following should be replaced by a
696         // full refreshGroupAndNotifyGroupChange().  And actually that
697         // operation should be done closer to where we know the GroupDB has
698         // been changed.  In particular, if the GroupDB update suffers an IAE,
699         // the exception blows by the following, even though the database
700         // actually does now contain the updated value.
701         //
702         repImpl.getRepNode().refreshCachedGroup();
703 
704         LoggerUtils.info(logger, repImpl,
705                          "Successfully updated node: " + node.getNameIdPair() +
706                          " Hostport = " + node.getHostName() + ": " +
707                          node.getPort() + " [" + node.getType() + "]");
708     }
709 
710     /**
711      * Implements the two phase update of membership information.
712      *
713      * In the first phase the master repeatedly tries to commit the "put"
714      * operation until it gets a Quorum of acks, ensuring that the operation
715      * has been made durable. Nodes that obtain this entry will start using it
716      * in elections. However, the node itself will not participate in elections
717      * until it has successfully completed phase 2.
718      *
719      * In the second phase, the entry for the member is updated to note
720      * that a quorum of acks was received.
721      *
722      * Failure leaves the database with the member info absent, or
723      * present but without the update to quorumAcks indicating that a
724      * quorum has acknowledged the change.
725      *
726      * @param node the member info for the node.
727      * @param quorumAck whether to require acknowledgments from a quorum
728      *
729      * @throws DatabaseException upon failure.
730      */
twoPhaseMemberUpdate(final RepNodeImpl node, final boolean quorumAck)731     private void twoPhaseMemberUpdate(final RepNodeImpl node,
732                                       final boolean quorumAck)
733         throws InsufficientReplicasException,
734                InsufficientAcksException,
735                DatabaseException {
736 
737         TwoPhaseUpdate twoPhaseUpdate = new TwoPhaseUpdate(node, quorumAck) {
738 
739             @Override
740             void phase1Body() {
741                 RepGroupImpl repGroup =
742                     fetchGroupObject(txn, groupDbImpl, LockMode.RMW);
743                 repGroup = fetchGroup(repGroup.getName(), groupDbImpl, txn);
744                 repGroup.checkForConflicts(node);
745                 int changeVersion = repGroup.incrementChangeVersion();
746                 if (node.getNameIdPair().hasNullId()) {
747                     node.getNameIdPair().setId(repGroup.getNextNodeId());
748                 }
749                 saveGroupObject(txn, repGroup, groupDbImpl);
750                 node.setChangeVersion(changeVersion);
751                 final RepNodeImpl existingNode =
752                     repGroup.getNode(node.getName());
753                 if ((existingNode != null) && (node.getJEVersion() == null)) {
754                     node.updateJEVersion(existingNode.getJEVersion());
755                 }
756                 saveNodeObject(txn, node, groupDbImpl, repGroup);
757             }
758 
759             @Override
760             void deadlockHandler() {
761                 node.getNameIdPair().revertToNull();
762             }
763 
764             @Override
765             void insufficientReplicasHandler() {
766                 node.getNameIdPair().revertToNull();
767             }
768         };
769 
770         twoPhaseUpdate.execute();
771     }
772 
773     /**
774      * Updates the database entry associated with the node with the new local
775      * CBVLSN, if it can do so without encountering lock contention, and unless
776      * the node is a secondary node.  Also updates the rep node's transient
777      * group information about the global CBVLSN. If it encounters contention,
778      * it returns false, and the caller must retry at some later point in time.
779      *
780      * Note that changes to the local CBVLSN do not update the group version
781      * number since they do not impact group membership.
782      *
783      * @param nameIdPair identifies the node being updated
784      * @param newCBVLSN the new local CBVLSN to be associated with the node.
785      * @param nodeType the node type of the RepNode
786      * @return true if the update succeeded.
787      * @throws DatabaseException
788      */
updateLocalCBVLSN(final NameIdPair nameIdPair, final VLSN newCBVLSN, final NodeType nodeType)789     public boolean updateLocalCBVLSN(final NameIdPair nameIdPair,
790                                      final VLSN newCBVLSN,
791                                      final NodeType nodeType)
792         throws DatabaseException {
793 
794         DatabaseImpl groupDbImpl = null;
795         try {
796             groupDbImpl = repImpl.probeGroupDb();
797         } catch (DatabaseException e) {
798             /* Contention on the groupDbImpl, try later. */
799             return false;
800         }
801 
802         if (groupDbImpl == null) {
803             /* Contention on the groupDbImpl, try later. */
804             return false;
805         }
806 
807         DatabaseEntry nodeNameKey = new DatabaseEntry();
808         StringBinding.stringToEntry(nameIdPair.getName(), nodeNameKey);
809         DatabaseEntry value = new DatabaseEntry();
810         final RepGroupImpl.BarrierState barrierState =
811             new RepGroupImpl.BarrierState(newCBVLSN,
812                                           System.currentTimeMillis());
813         Txn txn = null;
814         Cursor cursor = null;
815         boolean ok = false;
816         try {
817 
818             /*
819              * No database update for secondary nodes, but set ok to true so
820              * that the rep node's group information is updated.
821              */
822             if (nodeType.isSecondary()) {
823                 ok = true;
824                 return true;
825             }
826             txn = new MasterTxn(repImpl,
827                                 NO_ACK_NO_SYNC,
828                                 repImpl.getNameIdPair());
829 
830             /* Read the group first to avoid deadlocks */
831             final RepGroupImpl repGroup =
832                 fetchGroupObject(txn, groupDbImpl, LockMode.DEFAULT);
833             cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT);
834 
835             OperationStatus status =
836                     cursor.getSearchKey(nodeNameKey, value, LockMode.RMW);
837             if (status != OperationStatus.SUCCESS) {
838                 throw EnvironmentFailureException.unexpectedState
839                     ("Node ID: " + nameIdPair + " not present in group db");
840             }
841 
842             /* Let's see if the entry needs updating. */
843             final NodeBinding nodeBinding =
844                 new NodeBinding(repGroup.getFormatVersion());
845             final RepNodeImpl node = nodeBinding.entryToObject(value);
846             final VLSN lastCBVLSN = node.getBarrierState().getLastCBVLSN();
847             if (lastCBVLSN.equals(newCBVLSN)) {
848                 ok = true;
849                 return true;
850             }
851 
852             node.setBarrierState(barrierState);
853             nodeBinding.objectToEntry(node, value);
854             status = cursor.putCurrent(value);
855             if (status != OperationStatus.SUCCESS) {
856                 throw EnvironmentFailureException.unexpectedState
857                     ("Node ID: " + nameIdPair +
858                      " stored localCBVLSN could not be updated. Status: " +
859                      status);
860             }
861             LoggerUtils.fine(logger, repImpl,
862                              "Local CBVLSN updated to " + newCBVLSN +
863                              " for node " + nameIdPair);
864             ok = true;
865         } finally {
866             if (cursor != null) {
867                 cursor.close();
868             }
869 
870             if (txn != null) {
871                 if (ok) {
872                     txn.commit(NO_ACK_NO_SYNC_DURABILITY);
873                 } else {
874                     txn.abort();
875                 }
876                 txn = null;
877             }
878             if (ok) {
879                 /* RepNode may be null during shutdown. [#17424] */
880                 RepNode repNode = repImpl.getRepNode();
881                 if (repNode != null) {
882                     repNode.updateGroupInfo(nameIdPair, barrierState);
883                 }
884             }
885         }
886 
887         return true;
888     }
889 
890     /*
891      * Returns just the de-serialized special rep group object from the
892      * database, using the specified lock mode.
893      */
fetchGroupObject(final Txn txn, final DatabaseImpl groupDbImpl, final LockMode lockMode)894     private RepGroupImpl fetchGroupObject(final Txn txn,
895                                           final DatabaseImpl groupDbImpl,
896                                           final LockMode lockMode)
897         throws DatabaseException {
898 
899         RepGroupDB.GroupBinding groupBinding = new RepGroupDB.GroupBinding();
900         DatabaseEntry groupEntry = new DatabaseEntry();
901 
902         Cursor cursor = null;
903         try {
904             cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT);
905 
906             final OperationStatus status =
907                 cursor.getSearchKey(groupKeyEntry, groupEntry, lockMode);
908 
909             if (status != OperationStatus.SUCCESS) {
910                 throw EnvironmentFailureException.unexpectedState
911                     ("Group entry key: " + GROUP_KEY +
912                      " missing from group database");
913             }
914         } finally {
915             if (cursor != null) {
916                 cursor.close();
917             }
918         }
919 
920         return groupBinding.entryToObject(groupEntry);
921     }
922 
923     /*
924      * Saves the rep group in the database.
925      */
saveGroupObject(Txn txn, RepGroupImpl repGroup, DatabaseImpl groupDbImpl)926     private void saveGroupObject(Txn txn,
927                                  RepGroupImpl repGroup,
928                                  DatabaseImpl groupDbImpl)
929         throws DatabaseException {
930 
931         final GroupBinding groupBinding =
932             new GroupBinding(repGroup.getFormatVersion());
933         DatabaseEntry groupEntry = new DatabaseEntry();
934         groupBinding.objectToEntry(repGroup, groupEntry);
935 
936         Cursor cursor = null;
937         try {
938             cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT);
939 
940             OperationStatus status =  cursor.put(groupKeyEntry, groupEntry);
941             if (status != OperationStatus.SUCCESS) {
942                 throw EnvironmentFailureException.unexpectedState
943                     ("Group entry save failed");
944             }
945         } finally {
946             if (cursor != null) {
947                 cursor.close();
948             }
949         }
950     }
951 
952     /*
953      * Save a ReplicationNode in the database, using the format version
954      * specified by the group.
955      */
saveNodeObject(Txn txn, RepNodeImpl node, DatabaseImpl groupDbImpl, RepGroupImpl repGroup)956     private void saveNodeObject(Txn txn,
957                                 RepNodeImpl node,
958                                 DatabaseImpl groupDbImpl,
959                                 RepGroupImpl repGroup)
960         throws DatabaseException {
961 
962         assert !node.getType().isSecondary();
963 
964         DatabaseEntry nodeNameKey = new DatabaseEntry();
965         StringBinding.stringToEntry(node.getName(), nodeNameKey);
966 
967         final NodeBinding nodeBinding =
968             new NodeBinding(repGroup.getFormatVersion());
969         DatabaseEntry memberInfoEntry = new DatabaseEntry();
970         nodeBinding.objectToEntry(node, memberInfoEntry);
971 
972         Cursor cursor = null;
973         try {
974             cursor = makeCursor(groupDbImpl, txn, CursorConfig.DEFAULT);
975 
976             OperationStatus status =  cursor.put(nodeNameKey, memberInfoEntry);
977             if (status != OperationStatus.SUCCESS) {
978                 throw EnvironmentFailureException.unexpectedState
979                     ("Group entry save failed");
980             }
981         } finally {
982             if (cursor != null) {
983                 cursor.close();
984             }
985         }
986     }
987 
988     /**
989      * Converts a numeric version string to a JEVersion, returning null for an
990      * empty string.
991      */
parseJEVersion(final String versionString)992     static JEVersion parseJEVersion(final String versionString) {
993         return versionString.isEmpty() ?
994             null :
995             new JEVersion(versionString);
996     }
997 
998     /**
999      * Converts a JEVersion to a numeric version string, returning an empty
1000      * string for null.
1001      */
jeVersionString(final JEVersion jeVersion)1002     static String jeVersionString(final JEVersion jeVersion) {
1003         return (jeVersion == null) ?
1004             "" :
1005             jeVersion.getNumericVersionString();
1006     }
1007 
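    /**
     * Supports the serialization/deserialization of the rep group object
     * into and out of the database. An instance is created either for
     * reading (no-argument constructor) or for writing with a specific group
     * format version. RepGroupImpl format version 3 added the minJEVersion
     * field.
     */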
1011     public static class GroupBinding extends TupleBinding<RepGroupImpl>  {
1012 
1013         /**
1014          * The rep group format version to use for writing, or -1 for reading.
1015          */
1016         private final int writeFormatVersion;
1017 
1018         /** Create an instance for reading. */
GroupBinding()1019         public GroupBinding() {
1020             writeFormatVersion = -1;
1021         }
1022 
1023         /**
1024          * Create an instance for writing using the specified group format
1025          * version.
1026          */
GroupBinding(final int writeFormatVersion)1027         GroupBinding(final int writeFormatVersion) {
1028             if (writeFormatVersion < 0) {
1029                 throw new IllegalArgumentException(
1030                     "writeFormatVersion must be non-negative: " +
1031                     writeFormatVersion);
1032             }
1033             this.writeFormatVersion = writeFormatVersion;
1034         }
1035 
1036         @Override
entryToObject(TupleInput input)1037         public RepGroupImpl entryToObject(TupleInput input) {
1038             if (writeFormatVersion >= 0) {
1039                 throw new IllegalStateException(
1040                     "GroupBinding not created for read");
1041             }
1042             final String name = input.readString();
1043             final UUID uuid = new UUID(input.readLong(), input.readLong());
1044             final int formatVersion = input.readInt();
1045             return new RepGroupImpl(
1046                 name,
1047                 uuid,
1048                 formatVersion,
1049                 input.readInt(),
1050                 input.readInt(),
1051                 ((formatVersion < RepGroupImpl.FORMAT_VERSION_3) ?
1052                  RepGroupImpl.MIN_FORMAT_VERSION_JE_VERSION :
1053                  parseJEVersion(input.readString())));
1054         }
1055 
1056         @Override
objectToEntry(RepGroupImpl group, TupleOutput output)1057         public void objectToEntry(RepGroupImpl group, TupleOutput output) {
1058             if (writeFormatVersion < 0) {
1059                 throw new IllegalStateException(
1060                     "GroupBinding not created for write");
1061             }
1062             output.writeString(group.getName());
1063             output.writeLong(group.getUUID().getMostSignificantBits());
1064             output.writeLong(group.getUUID().getLeastSignificantBits());
1065             output.writeInt(writeFormatVersion);
1066             output.writeInt(group.getChangeVersion());
1067             output.writeInt(group.getNodeIdSequence());
1068             if (writeFormatVersion >= RepGroupImpl.FORMAT_VERSION_3) {
1069                 output.writeString(jeVersionString(group.getMinJEVersion()));
1070             }
1071         }
1072     }
1073 
1074     /**
1075      * Supports the serialization/deserialization of node info into and out of
1076      * the database.  Nodes are always saved using the current group format
1077      * version, and the node's format version is checked on reading to make
1078      * sure it is not newer than the current group format version, although
1079      * they could have an older format version if they have not been saved
1080      * recently.
1081      *
1082      * <p>Prior to RepGroupImpl version 3, the second field was always the
1083      * ordinal value of the node type, which was either 0 or 1.  Starting with
1084      * version 3, values greater than 1 are treated as the rep group version of
1085      * the format used to write the node binding, with the node type following
1086      * in the next field, and the jeVersion field added at the end.
1087      */
1088     public static class NodeBinding extends TupleBinding<RepNodeImpl> {
1089 
1090         /** The approximate maximum size of the serialized form. */
1091         static final int APPROX_MAX_SIZE =
1092             40 +                /* node name (guess) */
1093             4 +                 /* node ID */
1094             1 +                 /* group version */
1095             1 +                 /* NodeType */
1096             1 +                 /* quorumAck */
1097             1 +                 /* isRemoved */
1098             40 +                /* hostName (guess) */
1099             4 +                 /* port */
1100             8 +                 /* lastCBVLSN */
1101             8 +                 /* barrierTime */
1102             4 +                 /* changeVersion */
1103             10;                 /* jeVersion (approx) */
1104 
1105         /** The maximum node type value for version 2. */
1106         private static final int V2_MAX_NODE_TYPE = 1;
1107 
1108         /** The group format version to use for reading or writing. */
1109         private final int groupFormatVersion;
1110 
1111         /**
1112          * Create an instance for reading or writing using the specified group
1113          * format version.
1114          */
NodeBinding(final int groupFormatVersion)1115         public NodeBinding(final int groupFormatVersion) {
1116             this.groupFormatVersion = groupFormatVersion;
1117         }
1118 
1119         @Override
entryToObject(final TupleInput input)1120         public RepNodeImpl entryToObject(final TupleInput input) {
1121             final NameIdPair nameIdPair = NameIdPair.deserialize(input);
1122             final int versionOrNodeType = input.readByte();
1123             final boolean v2 = (versionOrNodeType <= V2_MAX_NODE_TYPE);
1124             if (!v2 && (versionOrNodeType > groupFormatVersion)) {
1125                 throw new IllegalStateException(
1126                     "Node entry version " + versionOrNodeType + " for node " +
1127                     nameIdPair.getId() +
1128                     " is illegal because it is newer than group version " +
1129                     groupFormatVersion);
1130             }
1131             final int nodeTypeNum = v2 ? versionOrNodeType : input.readByte();
1132             return new RepNodeImpl(
1133                 nameIdPair,
1134                 NodeType.values()[nodeTypeNum],
1135                 input.readBoolean(),
1136                 input.readBoolean(),
1137                 input.readString(),
1138                 input.readInt(),
1139                 new BarrierState(new VLSN(input.readLong()),
1140                                  input.readLong()),
1141                 input.readInt(),
1142                 v2 ? null : parseJEVersion(input.readString()));
1143         }
1144 
1145         /**
1146          * Returns whether the node can be serialized using the specified group
1147          * format version.
1148          */
supportsObjectToEntry( final RepNodeImpl node, final int groupFormatVersion)1149         public static boolean supportsObjectToEntry(
1150             final RepNodeImpl node,
1151             final int groupFormatVersion) {
1152 
1153             /* Version 2 supports a limited set of node types */
1154             return ((groupFormatVersion > RepGroupImpl.FORMAT_VERSION_2) ||
1155                     (node.getType().compareTo(NodeType.ELECTABLE) <= 0));
1156         }
1157 
1158         @Override
objectToEntry(final RepNodeImpl mi, final TupleOutput output)1159         public void objectToEntry(final RepNodeImpl mi,
1160                                   final TupleOutput output) {
1161             if (!supportsObjectToEntry(mi, groupFormatVersion)) {
1162                 throw new IllegalArgumentException(
1163                     "Node type " + mi.getType() +
1164                     " is not supported for group version " +
1165                     groupFormatVersion);
1166             }
1167             final boolean v2 =
1168                 (groupFormatVersion <= RepGroupImpl.FORMAT_VERSION_2);
1169             final BarrierState syncState = mi.getBarrierState();
1170             mi.getNameIdPair().serialize(output);
1171             if (!v2) {
1172                 output.writeByte(groupFormatVersion);
1173             }
1174             output.writeByte(mi.getType().ordinal());
1175             output.writeBoolean(mi.isQuorumAck());
1176             output.writeBoolean(mi.isRemoved());
1177             output.writeString(mi.getHostName());
1178             output.writeInt(mi.getPort());
1179             output.writeLong(syncState.getLastCBVLSN().getSequence());
1180             output.writeLong(syncState.getBarrierTime());
1181             output.writeInt(mi.getChangeVersion());
1182             if (!v2) {
1183                 output.writeString(jeVersionString(mi.getJEVersion()));
1184             }
1185         }
1186     }
1187 
1188     /**
1189      * Implements two phase updates for membership changes to the group
1190      * database. It compartmentalizes the retry operations and exception
1191      * handling so that it's independent of the core logic.
1192      */
1193     private abstract class TwoPhaseUpdate {
1194 
1195         final RepNodeImpl node;
1196         final boolean quorumAck;
1197         final DatabaseImpl groupDbImpl;
1198 
1199         protected Txn txn;
1200         private DatabaseException phase1Exception = null;
1201 
TwoPhaseUpdate(final RepNodeImpl node, final boolean quorumAck)1202         TwoPhaseUpdate(final RepNodeImpl node, final boolean quorumAck) {
1203             this.node = node;
1204             this.quorumAck = quorumAck;
1205             try {
1206                 groupDbImpl = repImpl.getGroupDb();
1207             } catch (DatabaseNotFoundException e) {
1208                 /* Should never happen. */
1209                 throw EnvironmentFailureException.unexpectedException(e);
1210             }
1211         }
1212 
1213         /* Phase1 exception handlers for phase1Body-specific cleanup */
insufficientReplicasHandler()1214         void insufficientReplicasHandler() {}
1215 
deadlockHandler()1216         void deadlockHandler() {}
1217 
1218         /* The changes to be made in phase1 */
phase1Body()1219         abstract void phase1Body();
1220 
1221         /* The changes to be made in phase2. */
phase2Body()1222         void phase2Body() {
1223             node.setQuorumAck(true);
1224             final RepGroupImpl repGroup =
1225                 fetchGroupObject(txn, groupDbImpl, LockMode.DEFAULT);
1226             saveNodeObject(txn, node, groupDbImpl, repGroup);
1227         }
1228 
phase1()1229         private void phase1()
1230             throws DatabaseException {
1231 
1232             for (int i=0; i < QUORUM_ACK_RETRIES; i++ ) {
1233                 txn = null;
1234                 try {
1235                     txn = new MasterTxn(repImpl,
1236                                         quorumAck ? QUORUM_ACK : NO_ACK,
1237                                         repImpl.getNameIdPair());
1238                     phase1Body();
1239                     txn.commit(
1240                         quorumAck ? QUORUM_ACK_DURABILITY : NO_ACK_DURABILITY);
1241                     txn = null;
1242                     return;
1243                 } catch (InsufficientReplicasException e) {
1244                     phase1Exception = e;
1245                     insufficientReplicasHandler();
1246                     /* Commit was aborted. */
1247                     LoggerUtils.warning(logger, repImpl,
1248                                         "Phase 1 retry; for node: " +
1249                                         node.getName() +
1250                                         " insufficient active replicas: " +
1251                                         e.getMessage());
1252                     continue;
1253                 } catch (InsufficientAcksException e) {
1254                     phase1Exception = e;
1255                     /* Local commit completed but did not get enough acks. */
1256                     LoggerUtils.warning(logger, repImpl,
1257                                         "Phase 1 retry; for node: " +
1258                                         node.getName() +
1259                                         " insufficient acks: " +
1260                                         e.getMessage());
1261                     continue;
1262                 } catch (LockConflictException e) {
1263                     /* Likely a timeout, can't distinguish between them. */
1264                     phase1Exception = e;
1265                     deadlockHandler();
1266                     LoggerUtils.warning(logger, repImpl,
1267                                         "Phase 1 retry; for node: " +
1268                                         node.getName() +
1269                                         " deadlock exception: " +
1270                                         e.getMessage());
1271                     continue;
1272                 } catch (DatabaseException e) {
1273                     LoggerUtils.severe(logger, repImpl,
1274                                        "Phase 1 failed unexpectedly: " +
1275                                        e.getMessage());
1276                     if (txn != null) {
1277                         txn.abort();
1278                     }
1279                     throw e;
1280                 } finally {
1281                     if (txn != null) {
1282                         txn.abort();
1283                     }
1284                 }
1285             }
1286             LoggerUtils.warning(logger,
1287                                 repImpl,
1288                                 "Phase 1 failed: " +
1289                                 phase1Exception.getMessage());
1290             throw phase1Exception;
1291         }
1292 
phase2()1293         private void phase2() {
1294             try {
1295                 txn = new MasterTxn(repImpl, NO_ACK, repImpl.getNameIdPair());
1296                 phase2Body();
1297                 txn.commit();
1298                 txn = null;
1299             } catch (DatabaseException e) {
1300                 LoggerUtils.severe(logger, repImpl,
1301                                    "Unexpected failure in Phase 2: " +
1302                                    e.getMessage());
1303                 throw e;
1304             } finally {
1305                 if (txn != null) {
1306                     txn.abort();
1307                 }
1308             }
1309         }
1310 
execute()1311         void execute() {
1312             phase1();
1313             /* Only executed if phase 1 succeeds. */
1314             phase2();
1315         }
1316     }
1317 
1318     /**
1319      * An internal API used to obtain group information by opening a stand
1320      * alone environment handle and reading the RepGroupDB. Used for debugging
1321      * and utilities.
1322      *
1323      * @param envDir the directory containing the environment log files
1324      *
1325      * @return the group as currently defined by the environment
1326      */
getGroup(final File envDir)1327     public static RepGroupImpl getGroup(final File envDir) {
1328 
1329         EnvironmentConfig envConfig = new EnvironmentConfig();
1330         envConfig.setReadOnly(true);
1331         envConfig.setTransactional(true);
1332         envConfig.setAllowCreate(false);
1333         Environment env = new Environment(envDir, envConfig);
1334         Transaction txn = null;
1335         Database db = null;
1336         try {
1337             DatabaseConfig dbConfig = new DatabaseConfig();
1338             dbConfig.setReadOnly(true);
1339             dbConfig.setTransactional(true);
1340             dbConfig.setAllowCreate(false);
1341             txn = env.beginTransaction(null, null);
1342             db = env.openDatabase(txn, DbType.REP_GROUP.getInternalName(),
1343                                   dbConfig);
1344 
1345             DatabaseEntry groupEntry = new DatabaseEntry();
1346             OperationStatus status = db.get(
1347                 txn, groupKeyEntry, groupEntry, LockMode.READ_COMMITTED);
1348             if (status != OperationStatus.SUCCESS) {
1349                 throw new IllegalStateException
1350                     ("Group entry not found " + status);
1351             }
1352             GroupBinding groupBinding = new GroupBinding();
1353             RepGroupImpl group = groupBinding.entryToObject(groupEntry);
1354 
1355             group = fetchGroup(group.getName(),
1356                                DbInternal.getDatabaseImpl(db),
1357                                DbInternal.getTxn(txn));
1358             txn.commit();
1359             txn = null;
1360             return group;
1361         } finally {
1362             if (txn != null) {
1363                 txn.abort();
1364             }
1365             if (db != null) {
1366                 db.close();
1367             }
1368             env.close();
1369         }
1370     }
1371 
1372     /**
1373      * Deletes all the current members from the rep group database and creates
1374      * a new group, with just the member supplied via the configuration. This
1375      * method exists to support the utility {@link DbResetRepGroup}
1376      * <p>
1377      * The changes proceed in three steps:
1378      *
1379      * 1) Determine the node id sequence number. This is to ensure that rep
1380      * node ids are not reused. Old rep node ids are present in the logs as
1381      * commit records.
1382      *
1383      * 2) A new group object, with the node id sequence number determined
1384      * in step 1), is created and all existing nodes are deleted.
1385      *
1386      * 3) The first node is added to the rep group.
1387      *
1388      * @param lastOldVLSN the VLSN used to associate the new barrier wrt this
1389      * node.
1390      */
reinitFirstNode(VLSN lastOldVLSN)1391     public void reinitFirstNode(VLSN lastOldVLSN) {
1392 
1393         DbConfigManager configManager = repImpl.getConfigManager();
1394         String groupName = configManager.get(GROUP_NAME);
1395         String nodeName = configManager.get(NODE_NAME);
1396         String hostPortPair = configManager.get(RepParams.NODE_HOST_PORT);
1397         String hostname = HostPortPair.getHostname(hostPortPair);
1398         int port = HostPortPair.getPort(hostPortPair);
1399         final boolean retainUUID =
1400             configManager.getBoolean(RESET_REP_GROUP_RETAIN_UUID);
1401 
1402         final DatabaseImpl dbImpl = repImpl.getGroupDb();
1403 
1404         /*
1405          * Retrieve the previous rep group object, so we can use its node
1406          * sequence id.
1407          */
1408         TransactionConfig txnConfig = new TransactionConfig();
1409         txnConfig.setDurability(NO_ACK.getDurability());
1410         txnConfig.setConsistencyPolicy(NO_CONSISTENCY);
1411 
1412         NameIdPair nameIdPair = repImpl.getRepNode().getNameIdPair();
1413         nameIdPair.revertToNull(); /* read transaction, so null id is ok. */
1414 
1415         /* Now delete old nodes and the group, and establish a new group */
1416         Txn txn = new MasterTxn(repImpl, txnConfig, nameIdPair);
1417         RepGroupImpl prevRepGroup =
1418             fetchGroupObject(txn, dbImpl, LockMode.RMW);
1419         txn.commit();
1420 
1421         final int nodeIdSequenceStart = prevRepGroup.getNodeIdSequence();
1422 
1423         final DatabaseEntry keyEntry = new DatabaseEntry();
1424         final DatabaseEntry value = new DatabaseEntry();
1425 
1426         /*
1427          * We have the "predicted" real node id, so set it and it will be used
1428          * in the commit lns that will be written in future.
1429          */
1430         final int firstNodeId = nodeIdSequenceStart + 1;
1431         nameIdPair.setId(firstNodeId);
1432 
1433         RepNodeImpl firstNode = new RepNodeImpl(
1434             nodeName, hostname, port, repImpl.getCurrentJEVersion());
1435         final BarrierState barrierState = new BarrierState(lastOldVLSN,
1436                                                    System.currentTimeMillis());
1437         firstNode.setBarrierState(barrierState);
1438 
1439         txn = new MasterTxn(repImpl, txnConfig, nameIdPair);
1440 
1441         final CursorConfig cursorConfig = new CursorConfig();
1442         cursorConfig.setReadCommitted(true);
1443         Cursor mcursor = makeCursor(dbImpl, txn, cursorConfig);
1444 
1445         while (mcursor.getNext(keyEntry, value, LockMode.DEFAULT) ==
1446                OperationStatus.SUCCESS) {
1447             final String key = StringBinding.entryToString(keyEntry);
1448 
1449             if (GROUP_KEY.equals(key)) {
1450                 final RepGroupImpl repGroup;
1451                 if (retainUUID) {
1452                     repGroup = new GroupBinding().entryToObject(value);
1453                     repGroup.incrementChangeVersion();
1454                 } else {
1455                     repGroup = new RepGroupImpl(
1456                         groupName, repImpl.getCurrentJEVersion());
1457                 }
1458                 GroupBinding groupBinding =
1459                     new GroupBinding(repGroup.getFormatVersion());
1460                 repGroup.setNodeIdSequence(nodeIdSequenceStart);
1461                 DatabaseEntry groupEntry = new DatabaseEntry();
1462                 groupBinding.objectToEntry(repGroup, groupEntry);
1463                 OperationStatus status = mcursor.putCurrent(groupEntry);
1464                 if (!OperationStatus.SUCCESS.equals(status)) {
1465                     throw new IllegalStateException("Unexpected state:" +
1466                                                     status);
1467                 }
1468             } else {
1469                 LoggerUtils.info(logger, repImpl, "Removing node: " + key);
1470                 mcursor.delete();
1471             }
1472         }
1473         mcursor.close();
1474         txn.commit();
1475 
1476         /* Now add the first node of the new group. */
1477         ensureMember(firstNode);
1478         if (firstNodeId != firstNode.getNodeId()) {
1479             throw new IllegalStateException("Expected nodeid:" + firstNodeId +
1480                                             " but found:" +
1481                                             firstNode.getNodeId());
1482         }
1483     }
1484 }
1485